import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}
// 동기 실행 (블로킹)
def fetchUserSync(id: Int): User = {
Thread.sleep(1000) // 네트워크 요청 시뮬레이션
User(id, s"User$id")
}
val start = System.currentTimeMillis()
val user1 = fetchUserSync(1)
val user2 = fetchUserSync(2)
val end = System.currentTimeMillis()
println(s"Time: ${end - start}ms") // ~2000ms
// 비동기 실행 (논블로킹)
def fetchUserAsync(id: Int): Future[User] = Future {
Thread.sleep(1000)
User(id, s"User$id")
}
val startAsync = System.currentTimeMillis()
val future1 = fetchUserAsync(1)
val future2 = fetchUserAsync(2)
// 결과 대기
Future.sequence(List(future1, future2)).onComplete {
case Success(users) =>
val endAsync = System.currentTimeMillis()
println(s"Time: ${endAsync - startAsync}ms") // ~1000ms (병렬 실행)
case Failure(ex) => println(s"Error: $ex")
}
case class User(id: Int, name: String)
// Java CompletableFuture와 비교
import java.util.concurrent.CompletableFuture;
CompletableFuture<User> future1 = CompletableFuture.supplyAsync(() -> fetchUser(1));
CompletableFuture<User> future2 = CompletableFuture.supplyAsync(() -> fetchUser(2));
CompletableFuture.allOf(future1, future2).thenRun(() -> {
// 완료 처리
});
val future: Future[Int] = Future {
Thread.sleep(500)
42
}
// onComplete: Success/Failure 모두 처리
future.onComplete {
case Success(value) => println(s"Result: $value")
case Failure(ex) => println(s"Error: ${ex.getMessage}")
}
// foreach: 성공 시만 처리
future.foreach(value => println(s"Success: $value"))
// failed: 실패 시만 처리
future.failed.foreach(ex => println(s"Failed: ${ex.getMessage}"))
// map: 결과 변환
val future1: Future[Int] = Future(42)
val future2: Future[String] = future1.map(_.toString)
future2.foreach(println) // "42"
// flatMap: Future 체이닝
def getUser(id: Int): Future[User] = Future(User(id, s"User$id"))
def getOrders(userId: Int): Future[List[Order]] = Future(List(Order(1, userId, 100)))
val result: Future[List[Order]] = for {
user <- getUser(1)
orders <- getOrders(user.id)
} yield orders
result.foreach(println)
case class Order(id: Int, userId: Int, amount: Int)
val future: Future[Int] = Future(42)
// filter: 조건 불만족 시 NoSuchElementException
val filtered = future.filter(_ > 40)
filtered.foreach(println) // 42
// collect: 부분 함수 적용
val collected = future.collect {
case x if x > 40 => s"Large: $x"
}
collected.foreach(println) // "Large: 42"
def riskyOperation(): Future[Int] = Future {
if (scala.util.Random.nextBoolean()) throw new Exception("Random failure")
else 42
}
// recover: 동기 복구
val recovered1 = riskyOperation().recover {
case _: Exception => 0
}
// recoverWith: 비동기 복구
val recovered2 = riskyOperation().recoverWith {
case _: Exception => Future(0)
}
// fallbackTo: 대체 Future
val primary = riskyOperation()
val fallback = Future(0)
val result = primary.fallbackTo(fallback)
// 기본 ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future {
println(s"Running on: ${Thread.currentThread().getName}")
42
}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
// 고정 크기 스레드 풀
val fixedPool: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(4)
)
// 캐시된 스레드 풀
val cachedPool: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
// 단일 스레드
val singleThread: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newSingleThreadExecutor()
)
// 사용
implicit val ec: ExecutionContext = fixedPool
val future = Future {
// fixedPool에서 실행
42
}
import scala.concurrent.{Future, blocking, ExecutionContext}
// 블로킹 작업용 ExecutionContext
implicit val blockingEC: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
def readFile(path: String): Future[String] = Future {
blocking { // 블로킹 작업 명시
scala.io.Source.fromFile(path).mkString
}
}(blockingEC)
// CPU 집약적 작업은 별도 ExecutionContext 사용
implicit val cpuIntensiveEC: ExecutionContext = ExecutionContext.fromExecutor(
Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
)
// sequence: List[Future[A]] => Future[List[A]]
val futures: List[Future[Int]] = List(
Future(1),
Future(2),
Future(3)
)
val combined: Future[List[Int]] = Future.sequence(futures)
combined.foreach(println) // List(1, 2, 3)
// traverse: List[A] => Future[List[B]]
val ids = List(1, 2, 3)
val users: Future[List[User]] = Future.traverse(ids)(id => getUser(id))
val future1 = Future(10)
val future2 = Future(20)
// zip: Future[(A, B)]
val zipped: Future[(Int, Int)] = future1.zip(future2)
zipped.foreach { case (a, b) => println(s"$a, $b") } // 10, 20
// zipWith: 결과 변환
val sum: Future[Int] = future1.zipWith(future2)(_ + _)
sum.foreach(println) // 30
// firstCompletedOf: 가장 먼저 완료된 Future
val futures = List(
Future { Thread.sleep(100); 1 },
Future { Thread.sleep(50); 2 },
Future { Thread.sleep(200); 3 }
)
val first = Future.firstCompletedOf(futures)
first.foreach(println) // 2 (가장 빠름)
// find: 조건을 만족하는 첫 번째 결과
val found = Future.find(futures)(_ > 1)
found.foreach(println) // Some(2)
import scala.concurrent.Promise
// Promise 생성
val promise = Promise[Int]()
val future = promise.future
// 완료 처리
future.foreach(v => println(s"Result: $v"))
// Promise 완료
promise.success(42)
// 또는 실패
// promise.failure(new Exception("Failed"))
// 한 번만 완료 가능
// promise.success(100) // IllegalStateException
import scala.concurrent.duration._
def withTimeout[T](future: Future[T], timeout: FiniteDuration): Future[T] = {
val promise = Promise[T]()
// 원본 Future 결과 전달
future.onComplete(promise.tryComplete)
// 타임아웃 스케줄링
val scheduler = Executors.newScheduledThreadPool(1)
scheduler.schedule(new Runnable {
def run(): Unit = promise.tryFailure(new TimeoutException(s"Timeout after $timeout"))
}, timeout.length, timeout.unit)
promise.future
}
// 사용
val slowOperation = Future {
Thread.sleep(5000)
42
}
val result = withTimeout(slowOperation, 2.seconds)
result.onComplete {
case Success(v) => println(s"Result: $v")
case Failure(ex) => println(s"Failed: ${ex.getMessage}") // Timeout after 2 seconds
}
case class UserProfile(id: Int, name: String, email: String)
case class UserStats(userId: Int, posts: Int, followers: Int)
case class UserSettings(userId: Int, theme: String, notifications: Boolean)
// 개별 API 호출
def fetchProfile(id: Int): Future[UserProfile] = Future {
Thread.sleep(100)
UserProfile(id, s"User$id", s"user$id@example.com")
}
def fetchStats(id: Int): Future[UserStats] = Future {
Thread.sleep(100)
UserStats(id, 42, 100)
}
def fetchSettings(id: Int): Future[UserSettings] = Future {
Thread.sleep(100)
UserSettings(id, "dark", true)
}
// 병렬 조합
case class CompleteUser(
profile: UserProfile,
stats: UserStats,
settings: UserSettings
)
def fetchCompleteUser(id: Int): Future[CompleteUser] = {
val profileF = fetchProfile(id)
val statsF = fetchStats(id)
val settingsF = fetchSettings(id)
for {
profile <- profileF
stats <- statsF
settings <- settingsF
} yield CompleteUser(profile, stats, settings)
}
// 사용
val start = System.currentTimeMillis()
fetchCompleteUser(1).onComplete {
case Success(user) =>
val end = System.currentTimeMillis()
println(s"Fetched user in ${end - start}ms") // ~100ms (병렬)
println(user)
case Failure(ex) => println(s"Error: ${ex.getMessage}")
}
def step1(): Future[Int] = Future(10)
def step2(x: Int): Future[Int] = Future {
if (x < 20) throw new Exception("Too small")
else x * 2
}
def step3(x: Int): Future[String] = Future(s"Result: $x")
// 중간 에러는 자동 전파
val pipeline: Future[String] = for {
a <- step1()
b <- step2(a) // 에러 발생
c <- step3(b) // 실행 안 됨
} yield c
pipeline.recover {
case ex => s"Error: ${ex.getMessage}"
}.foreach(println) // "Error: Too small"
def fetchMultipleUsers(ids: List[Int]): Future[List[Either[Throwable, User]]] = {
val futures = ids.map { id =>
getUser(id)
.map(Right(_): Either[Throwable, User])
.recover { case ex => Left(ex) }
}
Future.sequence(futures)
}
// 사용
fetchMultipleUsers(List(1, 2, 999)).foreach { results =>
val (errors, users) = results.partition(_.isLeft)
println(s"Success: ${users.size}, Errors: ${errors.size}")
}
import scala.concurrent.Await
import scala.concurrent.duration._
val future = Future(42)
// 블로킹 대기 (프로덕션에서는 피해야 함)
val result = Await.result(future, 5.seconds)
println(result) // 42
// 완료 여부만 확인
val ready = Await.ready(future, 5.seconds)
// 대량 작업을 배치로 분할
def processBatch[A, B](items: List[A], batchSize: Int)(f: A => Future[B]): Future[List[B]] = {
items.grouped(batchSize).foldLeft(Future.successful(List.empty[B])) { (accF, batch) =>
for {
acc <- accF
results <- Future.sequence(batch.map(f))
} yield acc ++ results
}
}
// 사용: 100개씩 배치 처리
val ids = (1 to 1000).toList
processBatch(ids, 100)(getUser).foreach { users =>
println(s"Processed ${users.size} users")
}
기능 | Scala Future | Java CompletableFuture |
---|---|---|
생성 | Future { ... } |
CompletableFuture.supplyAsync |
변환 | map , flatMap |
thenApply , thenCompose |
조합 | zip , sequence |
thenCombine , allOf |
에러 처리 | recover , recoverWith |
exceptionally , handle |
콜백 | onComplete , foreach |
thenAccept , whenComplete |
val future = Future { ... } // 즉시 실행 시작
def myFunc()(implicit ec: ExecutionContext): Future[Int] = Future(42)
여러 파일을 병렬로 읽고 처리하세요:
def processFiles(paths: List[String]): Future[Map[String, Int]] = ???
// 각 파일의 단어 수 계산
// 결과: Map(파일명 -> 단어수)
실패 시 재시도하는 Future 래퍼:
def retry[T](times: Int)(f: => Future[T]): Future[T] = ???
// 최대 3회 재시도
val result = retry(3) {
fetchFromUnreliableAPI()
}
Future 결과를 캐싱하는 시스템:
class AsyncCache[K, V] {
def get(key: K)(compute: => Future[V]): Future[V] = ???
}
// 같은 키의 중복 요청 방지
val cache = new AsyncCache[Int, User]()
val user1 = cache.get(1)(fetchUser(1))
val user2 = cache.get(1)(fetchUser(1)) // 캐시에서 반환
이번 챕터에서 학습한 내용:
다음 챕터 예고: Chapter 13에서는 매크로와 리플렉션을 활용한 메타프로그래밍을 학습합니다.