Composable Resource Management in Scala

by Bartlomiej Szwej, April 28th, 2020

We’re all using resources on a daily basis. You turn on the water tap, wash your hands for at least 20–30 seconds, and turn it off. You switch the light on if it’s dark, and off when you no longer need it.

Resources have a lifecycle. You open them, use them, and close them afterward. Not only light and water taps fall in that category, but also all kinds of database connections, HTTP servers, clients, files, or streams.

What’s interesting about resources is that they often depend on each other, so we care about the ordering of their acquisition and release. We want to see how we wash our hands, so we switch the light on before turning the water tap. If a database is used for processing HTTP requests, we want to connect to it first and after that, we start the HTTP server. However, during the application shutdown, we want to stop the HTTP server, and then close the database connection. In other words, we often want to release the resources in reverse acquisition order.

Finally, we’d like to automate resource management. We shouldn’t have to go through the code again and again to manually verify that every resource is properly released. Ideally, we should come up with a construct that does that automatically. It’d be awesome if both the light and the water tap turned themselves on and off as you approached!

There are multiple ways to do resource management in Scala. We’ll start with the most basic ones and iteratively improve to achieve a fully compositional and safe way of dealing with them. In the end, we’ll see how to manage resources in purely functional programs with Cats Effect 2.x.

This article is dedicated to beginner and intermediate level Scala programmers. The source code shown in this blog post can be found in this repository. With all of that in mind, let’s start!

It all starts with… try-finally

try-finally is probably the most basic way of managing resources. We try to use them, and finally, whatever happens, we try to close them:

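trait SqsConsumer {
  // Stand-in bodies so the snippet runs; a real consumer would talk to SQS.
  def poll(): Unit  = println("Polling")
  def close(): Unit = println("Closing consumer")
}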
val sqsConsumer = new SqsConsumer {}

try {
  // using sqsConsumer
} finally {
  sqsConsumer.close()
}

The code above doesn’t differ much from its Java equivalent. try-finally is a construct that comes from, and is meant to be used in, the imperative world. This approach has the following disadvantages:

  • It’s not composable.
  • It’s manual. You always have to look at the code and check that all the resources you acquire are closed.
  • It’s easy to forget about proper ordering when closing.
  • If we close several resources in the finally block and one of them throws an exception there, the remaining resources stay open.
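The last point is easy to underestimate. A minimal sketch of it, assuming a hypothetical Res class (not part of the original code) whose close() can be made to fail:

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object FinallyDemo {
  // Hypothetical resource used only for this illustration.
  final class Res(name: String, failOnClose: Boolean, log: ListBuffer[String]) {
    def close(): Unit = {
      if (failOnClose) throw new IllegalStateException(s"cannot close $name")
      log += s"closed $name"
      ()
    }
  }

  // Runs a try-finally over two resources and returns the names that were closed.
  def run(): List[String] = {
    val log    = ListBuffer.empty[String]
    val first  = new Res("first", failOnClose = true, log)
    val second = new Res("second", failOnClose = false, log)
    Try {
      try {
        () // using both resources
      } finally {
        first.close()  // throws, so the next line never runs
        second.close()
      }
    }
    log.toList
  }
}
```

Here run() returns an empty list: the first close() fails before logging anything, and the second resource is never closed.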

How can we improve?

The Loaner Pattern

Next, we can use the so-called Loaner pattern to better manage resources:

def withSqsConsumer[T](resource: SqsConsumer)(handle: SqsConsumer => T): T = {
  try {
    handle(resource)
  } finally  {
    resource.close()
  }
}

withSqsConsumer(new SqsConsumer{}) { consumer: SqsConsumer =>
  consumer.poll
}

The idea is to encapsulate resource management inside a function and delegate its usage to a closure (handle). In other words, to loan the resource to the handler.

It doesn’t differ much from the raw try-finally approach. However, what’s better is that the caller of withSqsConsumer doesn’t have to care about closing the resource.

If we had more resources, we could try to compose them by nesting:

withDB(new DB) { db =>
  withHttpServer(new Server) { httpServer =>
    println("==> Using DB and server")
  }
}

We have a guaranteed ordering when closing resources: the reverse of the acquisition order. In this case, we would observe the following order of acquisition/release:

Opening DB
Opening Http server
==> Using DB and server
Closing Http server
Closing DB
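The nested snippet above leaves withDB, withHttpServer and the resource classes undefined. A self-contained sketch that reproduces this ordering could look as follows; the DB and Server classes here are assumptions that only record what happens to them:

```scala
import scala.collection.mutable.ListBuffer

object LoanerDemo {
  val log = ListBuffer.empty[String]

  // Hypothetical resources: they log on construction and on close.
  final class DB {
    log += "Opening DB"
    def close(): Unit = { log += "Closing DB"; () }
  }
  final class Server {
    log += "Opening Http server"
    def close(): Unit = { log += "Closing Http server"; () }
  }

  def withDB[T](db: DB)(handle: DB => T): T =
    try handle(db) finally db.close()

  def withHttpServer[T](server: Server)(handle: Server => T): T =
    try handle(server) finally server.close()

  def run(): List[String] = {
    log.clear()
    withDB(new DB) { db =>
      withHttpServer(new Server) { httpServer =>
        log += "==> Using DB and server"
      }
    }
    log.toList
  }
}
```

Each loaner closes its own resource in its finally block, so the inner (later acquired) resource is always released first.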

The main drawback is that this approach does not compose well. Imagine how it would look with 10 or more resources. It doesn’t scale in terms of code structure, and would probably end up resembling callback hell.

The Loaner pattern addresses some of the flaws we observed with try-finally. But what’s interesting here is that it looks like something we could generalize even more, and if done properly, we could drastically improve composability.

Introducing Resource

Let’s try to generalize the Loaner pattern to work for all kinds of resources:

def withResource[R, T](resource: => R)(handle: R => T)(close: R => Unit): T = {
  // `resource` is by-name: evaluate it once, so we close the same instance we used
  val r = resource
  try {
    handle(r)
  } finally {
    close(r)
  }
}

What we did here is introduce the type parameter R, which represents the resource, and extract the close function. Notice that resource, handle, and close nicely describe the whole lifecycle of the value R.
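A usage sketch, assuming the by-name resource argument is evaluated exactly once inside the function (otherwise handle and close would each receive a fresh instance). A java.io.BufferedReader over a string stands in for a real resource, and the acquisitions counter is only there to make the single evaluation visible:

```scala
import java.io.{BufferedReader, StringReader}

object WithResourceDemo {
  def withResource[R, T](resource: => R)(handle: R => T)(close: R => Unit): T = {
    val r = resource // evaluate the by-name argument once
    try handle(r) finally close(r)
  }

  // Counts how many times the acquisition block actually ran.
  var acquisitions = 0

  def firstLine(text: String): String =
    withResource {
      acquisitions += 1
      new BufferedReader(new StringReader(text))
    }(_.readLine())(_.close())
}
```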

But… we still didn’t solve the problem of composition!

It turns out we can take the above approach, and turn it into a nice abstraction. Meet the Resource:

trait Resource[R] {
  def use[U](f: R => U): U
}
object Resource {
  def make[R](acquire: => R)(close: R => Unit) = new Resource[R] {
    override def use[U](f: R => U): U = {
      val resource = acquire
      try {
        f(resource)
      } finally {
        close(resource)
      }
    }
  }
}

We can use it in the following way:

val server = Resource.make(new HttpServer {})(_.close())
server.use { httpServer =>
  // ...
}

The resource is meant to be used only inside the use block. Before and after that, it’s closed. If you leak it outside of use, any later attempt to use it might throw an exception.
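To make the leak concrete, here is a small sketch. It repeats the Resource definition so that it is self-contained, and uses a hypothetical Connection class that refuses to work once closed:

```scala
object LeakDemo {
  trait Resource[R] {
    def use[U](f: R => U): U
  }
  object Resource {
    def make[R](acquire: => R)(close: R => Unit): Resource[R] = new Resource[R] {
      def use[U](f: R => U): U = {
        val resource = acquire
        try f(resource) finally close(resource)
      }
    }
  }

  // Hypothetical connection that fails when used after close().
  final class Connection {
    private var open = true
    def query(): String =
      if (open) "result" else throw new IllegalStateException("connection is closed")
    def close(): Unit = { open = false }
  }

  val connection: Resource[Connection] = Resource.make(new Connection)(_.close())

  // Used inside `use`: the connection is still open.
  val ok: String = connection.use(_.query())

  // Returned from `use`: by the time we hold it, it has already been closed.
  val leaked: Connection = connection.use(c => c)
}
```

Calling LeakDemo.leaked.query() throws, because the finally block inside use has already run.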

Now let’s think about how we could approach the problem of composition. As mentioned in the introduction, we often care about the order of acquisition/release, as resources can have dependencies between each other.

That means we’d like to compose our new data type sequentially, and sequential composition is the essence of… a Monad!

Creating a Monad[Resource] is not that hard, and we can follow the types here:

def flatMap[A, B](r: Resource[A])(mapping: A => Resource[B]): Resource[B] = new Resource[B] {
  override def use[U](f: B => U): U =
    r.use(res1 => mapping(res1).use(res2 => f(res2)))
}
def pure[R](r: => R): Resource[R] = Resource.make(r)(_ => ())

Each flatMap invocation creates a new Resource[B]. To open Resource[B], we have to first open Resource[A] with r.use. As you noticed, this is the essence of the sequential composition. We open A, and only after that we open B. Then, we define what happens after r is opened. We apply the mapping function and follow the types till the end.

That’s it! Now we can compose resources sequentially!

val httpServer = Resource.make(new HttpServer {})(_.close())
val mq         = Resource.make(new MQ {})(_.close())
httpServer.flatMap { server =>
  mq.flatMap { mq =>
    Resource.pure(businessLogic(server, mq))
  }
}

Side note on Monads

The Curse of the Monad says that once you get the epiphany, once you understand, you lose the ability to explain it to anybody. That’s probably why so many beginner functional programmers struggle with this concept… while they don’t really have to know anything about Monads in order to write functional programs.

Simply put, a Monad is all about running computations in sequence. In Scala, this sequencing is expressed using the flatMap operation, or its syntactic sugar: the for-comprehension. Option, Either, List, IO, Resource, and many more data types are in fact Monads.

The signature def flatMap(fa: F[A])(f: A => F[B]): F[B] says: to get F[B], it first needs to be returned by f: A => F[B]. But to call f, we need to supply an A. And the only place an A can come from is F[A].

Monads come with laws. They’re there to make your life easier and to avoid unnecessary surprises. Laws guarantee that flatMap behaves consistently across all monads, e.g. Option, Resource, IO, …
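As a small illustration with Option, the sketch below shows the same sequencing written with flatMap and with a for-comprehension. The helper names parse and reciprocal are made up for this example:

```scala
object FlatMapDemo {
  // Each step may fail, so each step returns an Option.
  def parse(s: String): Option[Int] = scala.util.Try(s.trim.toInt).toOption
  def reciprocal(n: Int): Option[Double] = if (n == 0) None else Some(1.0 / n)

  // Explicit sequencing: reciprocal can only run once parse has produced an Int.
  def viaFlatMap(s: String): Option[Double] =
    parse(s).flatMap(n => reciprocal(n).map(r => r * 100))

  // The same computation as a for-comprehension.
  def viaFor(s: String): Option[Double] =
    for {
      n <- parse(s)
      r <- reciprocal(n)
    } yield r * 100
}
```

One of the laws (left identity) can be checked directly here: Some(4).flatMap(reciprocal) gives the same result as reciprocal(4).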

We can easily implement a Functor[Resource] instance as well:

def map[A, B](r: Resource[A])(mapping: A => B): Resource[B] = 
  new Resource[B] {
    override def use[U](f: B => U): U = r.use(a => f(mapping(a)))
  }

Functor, like Monad, is also a recurring pattern in functional programming. Functors are all the data structures that you can map over. In fact, all Monads are also Functors, but not the other way around.

Voilà, now we have flatMap and map operations defined on the Resource, so we can use for-comprehensions:

val resources: Resource[(HttpServer, DB)] =
  for {
    db         <- dbResource
    httpServer <- httpServerResource
  } yield (httpServer, db)

resources.use {
  case (httpServer, db) =>
    println("Running program, that uses http server and db...")
}

And it works exactly as we wanted:

Opening DB
Opening http server
Running program, that uses http server and db...
Closing HTTP Server
Closing DB
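For the for-comprehension to compile, map and flatMap have to be methods on Resource rather than the standalone functions shown earlier. A self-contained sketch under that assumption, with hypothetical DB and HttpServer classes that only record their lifecycle:

```scala
import scala.collection.mutable.ListBuffer

object ResourceDemo {
  trait Resource[A] { self =>
    def use[U](f: A => U): U

    // Open this resource first, then the one produced by `mapping`.
    def flatMap[B](mapping: A => Resource[B]): Resource[B] = new Resource[B] {
      def use[U](f: B => U): U = self.use(a => mapping(a).use(f))
    }

    def map[B](mapping: A => B): Resource[B] = new Resource[B] {
      def use[U](f: B => U): U = self.use(a => f(mapping(a)))
    }
  }

  object Resource {
    def make[A](acquire: => A)(close: A => Unit): Resource[A] = new Resource[A] {
      def use[U](f: A => U): U = {
        val resource = acquire
        try f(resource) finally close(resource)
      }
    }
  }

  val log = ListBuffer.empty[String]

  // Hypothetical resources that log when they are constructed.
  final class DB         { log += "Opening DB" }
  final class HttpServer { log += "Opening http server" }

  val dbResource: Resource[DB] =
    Resource.make(new DB)(_ => log += "Closing DB")
  val httpServerResource: Resource[HttpServer] =
    Resource.make(new HttpServer)(_ => log += "Closing HTTP Server")

  def run(): List[String] = {
    log.clear()
    val resources: Resource[(HttpServer, DB)] =
      for {
        db         <- dbResource
        httpServer <- httpServerResource
      } yield (httpServer, db)

    resources.use { case (httpServer, db) =>
      log += "Running program, that uses http server and db..."
    }
    log.toList
  }
}
```

Nothing is opened until use is called; the for-comprehension only describes the order in which the resources will be acquired.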

We can use the property that every Monad is an Applicative, and turn the above example into a nice applicative composition (if the resources don’t depend on each other). We’ll see such an example later in this article when we talk about resource management with cats-effect.

What we achieved here is:

  • sequential composition
  • an ordered resource release (in reverse order of acquisition)
  • automated resource management
  • closing all resources if our program fails with an exception
  • handling edge situations when closing one of the resources throws an exception. In this case, all other resources are closed, which is good.

It turns out that a similar concept of a resource has existed in Haskell and in Scala libraries for some time.

Of course, the Resource[R] we designed is very simplistic, just to give you an idea. It’d be hard to use when asynchrony comes into play, hence we can’t really use it with modern effect types like ZIO, Monix Task, or Cats IO.

Luckily, all the above effect types have mechanisms to deal with resource management. The good news is that as soon as we see them, we’ll immediately notice some resemblance.

Resource management with Cats Effect

cats-effect is a library that comes with building blocks for modeling effectful computations. In other words, it allows you to write purely functional programs that perform side effects like:

  • Communicating with different services over the network.
  • Keeping and modifying internal state.
  • …

cats-effect consists of two parts:

  • cats IO: a fully composable data type that allows you to hide (suspend) side effects inside. Think of it as an alternative to Monix Task or ZIO, or, very loosely, as a Future.
  • type classes, which describe the shape of an arbitrary effect type. This means you can write programs or libraries in terms of them, and later supply any compatible¹ effect type (e.g. Cats IO, Monix Task, ZIO, …).

¹ Compatible effect types are the ones for which cats-effect type class instances are defined.

Daniel Spiewak defines it in the following way:

Slide from the recent talk Cats Effect 3: What, When, Why presented during Scala Love conference

cats-effect comes with two handy mechanisms of dealing with resources: bracket and resource.

But for now, let’s see how we’d manage resources without them, and what could be the potential problems.

By default, we can just treat resources like any other kind of value:

class SomeAwsSdkJavaClient {
  println("Opening connections")
  def use: Unit = println("Using")
  def close: Unit = println("Closing connections")
}
// we should probably use Blocker here, but let's forget about that detail for now
def program: IO[Unit] =
  for {
    client <- IO(new SomeAwsSdkJavaClient)
    _      <- businessLogic(client)
    _      <- IO(client.close)
  } yield ()

def businessLogic(client: SomeAwsSdkJavaClient): IO[Unit] =
  for {
    _ <- IO(client.use)
  } yield ()

and this could work fine:

Opening connections
Using
Closing connections

Now let’s modify the business logic to raise an unexpected error:

def businessLogic(client: SomeAwsSdkJavaClient): IO[Unit] =
  for {
    _ <- IO.raiseError(new RuntimeException("boom"))
    _ <- IO(client.use)
  } yield ()

As a result, we are not closing the resource:

Opening connections
java.lang.RuntimeException: boom
 at com.example.catsexamples$.businessLogic(catsexamples.scala:22)
 ...

OK, you might say we can just attempt the business logic, and then rethrow the error after we close the resource:

def program: IO[Unit] =
  for {
    client <- IO(new SomeAwsSdkJavaClient)
    e      <- businessLogic(client).attempt
    _      <- IO(client.close)
    _      <- IO.fromEither(e)
  } yield ()

And there we go, the program explodes and the resource is closed:

Opening connections
Closing connections
java.lang.RuntimeException: boom

But... there is one detail we missed. IO is cancellable and that means we can stop it in the middle of execution. In this case, we’d also want to, no matter what, release all the resources involved. We can demonstrate it in the following way:

override def run(args: List[String]): IO[ExitCode] =
  program.timeout(2.seconds).map(_ => ExitCode.Success)

def program: IO[Unit] =
  for {
    client <- IO(new SomeAwsSdkJavaClient)
    _      <- IO.sleep(5.seconds)
    e      <- businessLogic(client).attempt
    _      <- IO(client.close)
    _      <- IO.fromEither(e)
  } yield ()

and what we get is:

Opening connections
java.util.concurrent.TimeoutException: 2 seconds
 at cats.effect.IO.timeout(IO.scala:452)

As you can see, we do not release any resources if IO gets canceled. For that, we’d probably need something more advanced. We could obviously experiment with using timeoutTo and falling back to another IO that does the cleanup, but this is not where we want to go.

Bracket

Meet the bracket. Bracket comes from Haskell, and is a type class in cats-effect dedicated to safe and automatic resource management:

trait Bracket[F[_], E] extends MonadError[F, E] {
  def bracket[A, B](acquire: F[A])(use: A => F[B])
                   (release: A => F[Unit]): F[B]
}

What the docs say is:

Bracket is an extension of MonadError exposing the bracket operation, a generalized abstracted pattern of safe resource acquisition and release in the face of errors or interruption.

That sounds like a solution to the problem with errors and cancellations mentioned above. Let’s see it in action:

def program: IO[Unit] =
  IO(new SomeAwsSdkJavaClient)
    .bracket { client =>
      businessLogic(client)
    }(client => IO(client.close))

Nice! We just squashed a bunch of lines into just a single construct, gaining full safety along the way!
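Because bracket is defined on a type class, the Loaner pattern from the first part of the article can be written once for any effect type. The following is a sketch, assuming Cats Effect 2.x (where Sync extends Bracket[F, Throwable]); loan and Client are hypothetical names introduced for this example:

```scala
import cats.effect.{IO, Sync}

object BracketLoanDemo {
  // Hypothetical Java-style client that remembers whether it was closed.
  final class Client extends AutoCloseable {
    @volatile var closed = false
    def fetch(): String = "payload"
    override def close(): Unit = { closed = true }
  }

  // Loaner pattern for any F with a Sync instance: acquire, use, always release.
  def loan[F[_], R <: AutoCloseable, A](acquire: => R)(use: R => F[A])(implicit F: Sync[F]): F[A] =
    F.bracket(F.delay(acquire))(use)(r => F.delay(r.close()))

  // Returns (did the program fail?, was the client closed?).
  def runFailing(): (Boolean, Boolean) = {
    val client = new Client
    val result =
      loan[IO, Client, String](client)(_ => IO.raiseError(new RuntimeException("boom")))
        .attempt
        .unsafeRunSync()
    (result.isLeft, client.closed)
  }
}
```

Even though the use step fails, the release step still runs, so runFailing() reports both a failure and a closed client.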

When business logic raises an error, then we’re covered:

IO(new HttpServer {})
  .bracket { _ =>
    IO.raiseError(new RuntimeException("boom")) *> IO.unit
  }(x => IO(x.close()))
// Output:
// Opening Http server
// Closing Http server
// java.lang.RuntimeException: boom
//   at com.example.Main3$.$anonfun$run$10(Main.scala:211)

The same applies to cancellations:

IO(new HttpServer {})
  .bracket(_ => IO.sleep(5.seconds))(x => IO(x.close()))
  .map(_ => ExitCode.Success)
  .timeout(1.second)
// Output:
// Opening Http server
// Closing Http server
// java.util.concurrent.TimeoutException: 1 second
//   at cats.effect.IO.timeout(IO.scala:452)

Now let’s add more resources into play, and let’s see how the code would scale:

def program: IO[Unit] =
  IO(new Dynamo)
    .bracket { dynamoClient =>
      IO(new Sqs)
        .bracket { sqsClient =>
          businessLogic(sqsClient, dynamoClient)
        }(sqsClient => IO(sqsClient.close))
    }(dynamoClient => IO(dynamoClient.close))

It turns out that with Bracket we stumble upon the same problem as with the Loaner pattern. Our code becomes unmanageable with more resources due to the lack of composition. Although we have fully automated and safe resource management, we can’t really compose it nicely. And this is where another abstraction comes into play, the Resource.

Note: I’m not saying that using Bracket is wrong. It provides safe resource management when working with IO and is fairly simple. It might work nicely with a single resource, but with more of them, it can become cumbersome to use.

Resource

Meet the Resource. This is a data structure we already discovered before. You already know that Resource forms a Monad, hence it can be composed sequentially. And with this property, we can nicely structure our code no matter how it grows!

The definition of Resource in cats-effect differs only slightly from ours. In the cats-effect world, everything revolves around IO and the corresponding type classes. The same applies to Resource:

abstract class Resource[F[_], A] {
  def use[B](f: A => F[B])(implicit F: Bracket[F, Throwable]): F[B]
}
object Resource {
  def make[F[_], A](open: F[A])(close: A => F[Unit]): Resource[F, A]
}

The structure resembles the Resource we derived in the former part of the article, but this one requires F[_], which denotes the effect type. That’s because Resource in cats-effect is meant to be used with different effect types (Cats IO, ZIO etc.).

Let’s see how to initialise multiple resources:

def businessLogic(dynamo: Dynamo, sqs: Sqs): IO[Unit] =
  for {
    _ <- IO(dynamo.use)
    _ <- IO(sqs.use)
  } yield ()

def program: Resource[IO, Unit] =
  for {
    dynamo <- Resource.make(IO(new Dynamo))(r => IO(r.close()))
    sqs    <- Resource.make(IO(new Sqs))(r => IO(r.close()))
    _      <- Resource.liftF(businessLogic(dynamo, sqs))
  } yield ()

and it works fine. What’s interesting is that now the whole program is in the Resource context, and not IO. That’s why we used liftF to lift our business logic into Resource[F[_], A]. This shouldn’t be a surprise for a cats user.

Now, if the business logic raises an exception:

def businessLogic(dynamo: Dynamo, sqs: Sqs): IO[Unit] =
  for {
    _ <- IO.raiseError(new RuntimeException("boom"))
    _ <- IO(dynamo.use)
    _ <- IO(sqs.use)
  } yield ()

Then all resources are closed:

Opening dynamo connections
Opening sqs connections
Closing sqs connections
Closing dynamo connections
java.lang.RuntimeException: boom
 at com.example.catsexamples$.businessLogic(catsexamples.scala:47)
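The Dynamo and Sqs classes are not shown in the snippets above. A self-contained sketch of the failing scenario, assuming Cats Effect 2.x and a single hypothetical Client class that stands in for both and records its lifecycle in a log:

```scala
import cats.effect.{IO, Resource}
import scala.collection.mutable.ListBuffer

object CatsResourceDemo {
  val log = ListBuffer.empty[String]

  // Hypothetical client that logs when it is opened and closed.
  final class Client(name: String) {
    log += s"Opening $name connections"
    def close(): Unit = { log += s"Closing $name connections"; () }
  }

  def client(name: String): Resource[IO, Client] =
    Resource.make(IO(new Client(name)))(c => IO(c.close()))

  // Business logic that fails before touching the clients.
  def businessLogic(dynamo: Client, sqs: Client): IO[Unit] =
    IO.raiseError(new RuntimeException("boom"))

  val program: Resource[IO, Unit] =
    for {
      dynamo <- client("dynamo")
      sqs    <- client("sqs")
      _      <- Resource.liftF(businessLogic(dynamo, sqs))
    } yield ()

  // `attempt` turns the failure into a Left so it can be inspected.
  def run(): (Either[Throwable, Unit], List[String]) = {
    log.clear()
    val result = program.use(_ => IO.unit).attempt.unsafeRunSync()
    (result, log.toList)
  }
}
```

The result is a Left carrying the exception, and the log shows both clients closed in reverse order of acquisition.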

On the other hand, if one of the release operations raises an exception:

def program =
  for {
    s3 <- Resource.make(IO(new S3))(r => IO(r.close()))
    dynamo <- Resource.make(IO(new Dynamo))(r => IO.raiseError(new RuntimeException("boom")))
    sqs    <- Resource.make(IO(new Sqs))(r => IO(r.close()))
    _      <- Resource.liftF(businessLogic(dynamo, sqs, s3))
  } yield ()

Then the rest would be closed normally:

Opening S3 connections
Opening dynamo connections
Opening sqs connections
Using dynamo
Using sqs
Using S3
Closing sqs connections
Closing S3 connections
java.lang.RuntimeException: boom
 at com.example.catsexamples$.$anonfun$program4$6(catsexamples.scala:55)

One last thing. What if you have a bunch of independent resources you’d like to compose? Again, it’s not a problem for Resource. You can just use applicative composition:

(s3Res, dynamoRes).tupled.use { case (s3, dynamo) =>
  // ...
}
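The tupled syntax comes from cats, so it needs the cats.implicits._ import. A runnable sketch, assuming Cats Effect 2.x; the res helper is hypothetical and models each resource as a plain name that logs its lifecycle:

```scala
import cats.effect.{IO, Resource}
import cats.implicits._
import scala.collection.mutable.ListBuffer

object TupledDemo {
  val log = ListBuffer.empty[String]

  // Hypothetical resource: just a name, logged on open and close.
  def res(name: String): Resource[IO, String] =
    Resource.make(IO { log += s"Opening $name"; name })(n => IO { log += s"Closing $n"; () })

  val s3Res: Resource[IO, String]     = res("S3")
  val dynamoRes: Resource[IO, String] = res("dynamo")

  def run(): List[String] = {
    log.clear()
    (s3Res, dynamoRes).tupled
      .use { case (s3, dynamo) => IO { log += s"Using $s3 and $dynamo"; () } }
      .unsafeRunSync()
    log.toList
  }
}
```

With the default instance, the applicative composition is derived from flatMap, so the resources are still acquired left to right and released in reverse.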

That’s it! Resource gives us extra safety, composes nicely, and does all of that automatically. I encourage you to look into the Resource source code. For example, you can find many useful constructors, like fromAutoCloseable or fromAutoCloseableBlocking, that might help when dealing with Java APIs.
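As a brief sketch of fromAutoCloseable, assuming Cats Effect 2.x: JavaClient below is a hypothetical stand-in for a Java SDK client that implements AutoCloseable.

```scala
import cats.effect.{IO, Resource}

object AutoCloseableDemo {
  // Hypothetical Java-style client that remembers whether it was closed.
  final class JavaClient extends AutoCloseable {
    @volatile var closed = false
    def fetch(): String = "payload"
    override def close(): Unit = { closed = true }
  }

  // Returns the fetched payload and whether the client was closed afterwards.
  def run(): (String, Boolean) = {
    val client = new JavaClient
    val payload =
      Resource
        .fromAutoCloseable(IO(client)) // release calls client.close()
        .use(c => IO(c.fetch()))
        .unsafeRunSync()
    (payload, client.closed)
  }
}
```

No release function is passed here; the constructor relies on the close() method that AutoCloseable already provides.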

Bonus: bootstrapping a microservice using Resource

Below you can find a bigger example of Resource being used to bootstrap a microservice application that uses cats-effect, http4s, and Doobie. The full runnable version can be found here.

object Microservice extends IOApp {

  def createMicroservice[F[_]: ContextShift: ConcurrentEffect: Timer]: Resource[F, Server[F]] =
    for {
      config <- Resource
        .liftF(parser.decodePathF[F, AppConfig]("app"))
      blocker <- Blocker.apply[F]
      kinesis <- Resource
        .fromAutoCloseable(KinesisClient.builder().build().pure[F])
      xa <- Resource.pure[F, Transactor[F]](
        Transactor.fromDriverManager[F]("", "", blocker)
      )
      orderRepo    = OrderRepository[F](xa)
      userRepo     = UserRepository[F](xa)
      orderService = OrderService[F](orderRepo)
      userService  = UserService[F](userRepo, kinesis)
      httpApp = Router(
        "/users"  -> UserEndpoints.endpoints[F](userService),
        "/orders" -> OrderEndpoints.endpoints[F](orderService)
      ).orNotFound
      server <- BlazeServerBuilder[F]
        .bindHttp(config.server.port, config.server.host)
        .withHttpApp(httpApp)
        .resource
    } yield server

  def run(args: List[String]): IO[ExitCode] =
    createMicroservice.use(_ => IO.never).as(ExitCode.Success)

}

Summary

We started with the simplest approach, which is try-finally, and iteratively improved until discovering the Resource. We learned that it’s a data structure that guarantees safety, proper release order, and also composes very well.

In the end, we saw how to deal with resources using Bracket and Resource in purely functional programs using cats-effect. We noticed that those mechanisms are very reliable and they met our initial expectations:

  • Sequential composition.
  • Automatic release in reverse order of acquisition.
  • Closing all resources automatically, no matter whether the program completes successfully, fails with an exception, or is canceled.
  • Handling edge situations when closing one of the resources out of many raises an exception.

Thanks for reading and happy coding!
