Do you really need Await ?
When doing asyc processing in Scala, you might use Future and Await. But you can avoid the latter. Here are some explanations.
If you are doing asynchronous processing in Scala, chances are high that you met Future
. If you are lucky enough, you do not because you work with cats-effect or a similar effect library. There are many pitfalls when working with Future
and in this article, we will talk about a very obvious one, which is even clearly mentioned in the standard library : Await
. From time to time, I encounter it in production code and so, I replace it with safer patterns. Here are some of them.
Await
as a way to get the result of a Future
Most of the time, this is the situation where I meet Await
in production code:
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[String] = Future {
val inner = Future {
Thread.sleep(1000)
"Done inner"
}
val res = Await.result(inner, Duration.Inf)
s"We did it : $res"
}
Await.result(future, Duration.Inf)
Why is it dangerous ?
Depending on your ExecutionContext
you can create a deadlock, especially if this whole thing ends up being in a future itself, as in this example:
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[String] = Future {
val inner = Future {
Thread.sleep(1000)
"Done inner"
}
val res = Await.result(inner, Duration.Inf)
s"We did it : $res"
}
Await.result(future, Duration.Inf)
If you use the provided global
execution context, you will be fine. But if you use a custom context with a single thread, it will never terminate:
import scala.concurrent.duration._
import scala.concurrent._
import java.util.concurrent.Executors
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
val future: Future[String] = Future {
val inner = Future {
Thread.sleep(1000)
"Done inner"
}
val res = Await.result(inner, Duration.Inf)
s"We did it : $res"
}
Await.result(future, Duration.Inf)
In this situation, we artificially created a lack of threads to execute futures. This highlights two things :
- blocking in
Future
should be avoided Await
blocks !
This example is very simplistic, but it may have several forms in real word.
Overcoming
If your intent is to just use the result of a future in a non-effectful function (a function whose type looks like A => B
, then the combinator you are looking for is map
. If you want to use the result as the input of a computation returning a Future
, you want to use flatMap
.
So, instead of the code above, we could just :
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[String] = Future {
Thread.sleep(1000) // Or better, some real computation that takes some time
"done"
}.map(doSomething)
Or if doSomething
returns a future itself, we can use flatMap
like this:
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
def doSomething(str: String): Future[String] = // Some meaningful computation
val future: Future[String] = Future {
Thread.sleep(1000) // Or better, some real computation that takes some time
"done"
}.flatMap(doSomething)
But you may find something else of interest with Await
, which is the ability to set a timeout. This is the other use-case that can easily be replaced.
Using Await
as a timeout
Sometimes, the useful thing in Await.result
is its second parameter which is the maximum duration to wait for a Future
to complete (either successfully or with failure). But this behavior can also be replaced to stay in the context of Future
. We just need to provide a mechanism to race two Future
s. Fortunately this already exists and it is called firstCompletedOf
. This way we can have a delayed Future
which will complete with a failure acting as the timeout. This can be achieved with a ScheduledExecutorService
when using standard Scala, or with a Scheduler
when akka is involved.
Note that in this implementation, timeout does not mean cancel. Achieving proper cancellation with Future
is a tough topic which must be handled carefully. Here is however two implementations of a timeout mechanism. For both use examples, we use an implicit class to provide a nice syntax.
Basic timeout
import scala.concurrent.duration._
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
val scheduler = Executors.newSingleThreadScheduledExecutor()
def failAfter[A](duration: FiniteDuration): Future[A] = {
val p = Promise[A]()
scheduler.schedule(
() => p.failure(new TimeoutException("Timeout") with NoStackTrace),
duration.toMillis,
java.util.concurrent.TimeUnit.MILLISECONDS
)
p.future
}
implicit class FutureOps[A](val f: Future[A]) extends AnyVal {
def timeout(after: FiniteDuration): Future[A] =
Future.firstCompletedOf(List(failAfter[A](after), f))
}
val never = Promise[Int]().future
never.timeout(3.seconds).onComplete {
case Failure(exception) => println(s"Error ${exception.getMessage}")
case Success(value) => println("Success")
}
Timeout with akka
import akka.actor.ActorSystem
import scala.concurrent.duration._
import scala.concurrent._
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val ec: ExecutionContext = actorSystem.dispatcher
def failAfter[A](duration: FiniteDuration): Future[A] =
akka.pattern.after(duration)(Future.failed(new TimeoutException("Timeout") with NoStackTrace))
implicit class FutureOps[A](val f: Future[A]) extends AnyVal {
def timeout(after: FiniteDuration): Future[A] = Future.firstCompletedOf(List(failAfter[A](after), f))
}
val never = Promise[Int]().future
never.timeout(3.seconds).onComplete {
case Failure(exception) => println(s"Error: ${exception.getMessage}")
case Success(value) => println("Success")
}
Usage in akka-stream
Last but not least, I sometimes encouter Await
in custom akka stream stages. This happens when the stage needs to call some external API or perform an async processing. Let's say we have a def fetch[A](): Future[A]
method that we want to call from a custom stage; in this case, calling the push
method directly like this fetch().map(e => push(out, e))
won't work. The temptation to use Await
is high but it is forgetting that akka provides us with the necessary tool : getAsyncCallback
.
Conclusion
Except at the edges of your program, it is perfectly fine to stay in the Future
context. We saw how everything is done to make working with them easily. Test frameworks like scalatest or munit provide first class support for Future
so even in tests, you do not need Await
. One thing we do not discuss in this blog post is cancellation : in our samples, we saw how to timeout but not how to stop computation on timeout. This is a though issue where some answers can be found in Viktor Klang's post. Alternatively, you can also drop Future
from your codebase and use cats-effect
, a runtime providing a pure asynchronous runtime.