ManagedT: Compositional resource management

March 3, 2018

When bootstrapping a new application, you’d typically like to nicely structure the way resources (which we’ll define in a moment) are acquired. Resources should be allowed to depend on each other, imposing a sequential order on the startup sequence. This pretty much falls out of the module structure:

import monix.eval.Task

trait KinesisWriter
trait EventWriter
trait App

def acquireKinesis: Task[KinesisWriter] = ???
def acquireEventWriter(k: KinesisWriter): Task[EventWriter] = ???
def runApp(k: KinesisWriter, e: EventWriter): Task[Unit] = ???

Ruling out usage of naughty nulls and vars, it is impossible to avoid sequential ordering of acquireKinesis, acquireEventWriter and runApp when starting up. However, say we also have close methods which must be called on those writers:

trait KinesisWriter {
  def close: Task[Unit] = Task.unit
}

trait EventWriter {
  def close: Task[Unit] = Task.unit
}

It’s pretty important to call these close methods in the right order, which is usually the reverse of the acquisition order. There are no function arguments to guide us in this case, so care must be taken; the EventWriter might be buffering events to be written to Kinesis, and if we close the KinesisWriter out from under it, some writes might be lost.

Composing resource acquisition means composing functions that return Task. That’s monadic composition, and we can use for-comprehensions for that:

def acquireKinesis = Task(new KinesisWriter {})
def acquireEventWriter(kinesis: KinesisWriter) = Task(new EventWriter {})
def runApp(eventWriter: EventWriter) = Task.unit

val appTask = for {
  kinesis     <- acquireKinesis
  eventWriter <- acquireEventWriter(kinesis)
  _           <- runApp(eventWriter)
} yield ()

This doesn’t do anything for the cleanup handlers. Imperative programming offers us the try-catch-finally statement, which looks like this:

val resource = acquire()

try {
  use(resource)
} finally {
  release(resource)
}

This ensures that release(resource) will be called regardless of whether use(resource) throws. If we want to compose resource acquisition, we nest the blocks. This doesn’t handle any sort of effect; errors are signaled through exceptions.
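To make the nesting concrete, here is a minimal sketch using only the standard library. The acquire and release helpers and the log buffer are hypothetical stand-ins for real resources; they only record the order in which things happen.

```scala
import scala.collection.mutable.ListBuffer

object NestedFinally {
  // Records the order of events so it can be inspected afterwards.
  val log = ListBuffer.empty[String]

  // Hypothetical stand-ins for real acquisition and release logic.
  def acquire(name: String): String = { log += s"acquire $name"; name }
  def release(name: String): Unit = { log += s"release $name"; () }

  def run(): List[String] = {
    log.clear()
    val kinesis = acquire("kinesis")
    try {
      // The second resource is acquired inside the first try block,
      // so its finally clause runs before the outer one.
      val events = acquire("events")
      try {
        log += s"use $kinesis and $events"
      } finally {
        release(events)
      }
    } finally {
      release(kinesis)
    }
    log.toList
  }
}
```

Every additional resource adds another level of nesting, and the release order is implied only by how the blocks are nested.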

bracket

bracket is a functional version of try-catch-finally, that can be used with effects. Here’s bracket from monix.eval.Task’s master branch, slightly modified to be non-infix:

object Task {
  // Signature only; the body is elided here.
  def bracket[R, A](acquire: Task[R])(use: R => Task[A])(release: R => Task[Unit]): Task[A] = ???
}

acquire is a value that produces the resource; use is a function that uses the resource to produce an A; release is a function that releases the resource.

We can write our own version of bracket (that is slightly broken in that it doesn’t take cancellation into account, but we can leave this out for now) using combinators from the MonadError[Task, Throwable] typeclass instance:

import cats.implicits._

def bracket[R, A](acquire: Task[R])(use: R => Task[A])(release: R => Task[Unit]): Task[A] = 
  for {
    resource      <- acquire
    resultOrError <- use(resource).attempt
    result        <- resultOrError match {
                       case Right(a) =>
                         release(resource).map(_ => a)

                       case Left(e)  => 
                         release(resource).flatMap(_ => e.raiseError[Task, A])
                     }
  } yield result

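We may want to convince ourselves that this function works given the various failure points it has:

If acquire fails, neither use nor release is run, and the error from acquire is returned.
If use fails and release succeeds, release is run and the error from use is returned.
If use succeeds and release fails, the error from release is returned.
If both use and release fail, the error from release is returned.

The behavior in the last two points might be arguable, in that it may make sense to always return a or to return the error from use, but the important point here is that release will always be called.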
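The failure points can also be checked mechanically. The sketch below is an assumption-laden stand-in, not the Task version: it uses scala.util.Try in place of Task so that it runs with the standard library alone, and the scenario helpers and error values are hypothetical names introduced for illustration.

```scala
import scala.util.{Failure, Success, Try}

object BracketCases {
  // Same shape as the Task-based bracket, with Try standing in for Task (an assumption).
  def bracket[R, A](acquire: => Try[R])(use: R => Try[A])(release: R => Try[Unit]): Try[A] =
    acquire.flatMap { resource =>
      use(resource) match {
        case Success(a) => release(resource).map(_ => a)
        case Failure(e) => release(resource).flatMap(_ => Failure[A](e))
      }
    }

  val acquireError = new RuntimeException("acquire failed")
  val useError     = new RuntimeException("use failed")
  val releaseError = new RuntimeException("release failed")

  // Runs one scenario; returns the overall result and whether release was called.
  def scenario(useFails: Boolean, releaseFails: Boolean): (Try[Int], Boolean) = {
    var released = false
    def use(r: String): Try[Int] =
      if (useFails) Failure(useError) else Success(r.length)
    def release(r: String): Try[Unit] = {
      released = true
      if (releaseFails) Failure(releaseError) else Success(())
    }
    val result = bracket(Try("resource"))(r => use(r))(r => release(r))
    (result, released)
  }

  // When acquisition itself fails there is nothing to release.
  def acquireFails(): (Try[Int], Boolean) = {
    var released = false
    val result = bracket[String, Int](Failure(acquireError))(r => Success(r.length)) { _ =>
      released = true
      Success(())
    }
    (result, released)
  }
}
```

In every scenario where acquisition succeeds, the second element of the returned pair is true: release runs whether or not use failed.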

Note that we only used map, flatMap, attempt and raiseError. That means we can abstract Task out too, and constrain F[_] to have an instance of MonadError:

import cats.MonadError

def bracket[F[_], R, A](acquire: F[R])(use: R => F[A])(
  release: R => F[Unit])(implicit F: MonadError[F, Throwable]): F[A] = 
  for {
    resource      <- acquire
    resultOrError <- use(resource).attempt
    result        <- resultOrError match {
                       case Right(a) =>
                         release(resource).map(_ => a)

                       case Left(e)  => 
                         release(resource).flatMap(_ => e.raiseError[F, A])
                     }
  } yield result

We can now build our application as such:

trait KinesisWriter {
  def close: Task[Unit] = Task.unit
}
object KinesisWriter {
  def acquire: Task[KinesisWriter] = Task(new KinesisWriter {})
}

trait EventWriter {
  def close: Task[Unit] = Task.unit
}
object EventWriter {
  def acquire(k: KinesisWriter): Task[EventWriter] = Task(new EventWriter {})
}

val app = bracket(KinesisWriter.acquire) { kw =>
  bracket(EventWriter.acquire(kw)) { ew =>
    Task.unit
  }(_.close)
}(_.close)

One can only imagine how that would look if we have 15 resources with complex dependencies between them. This is the exact opposite of compositionality; everything has to be lumped together. bracket looks like a good direction, but we have to improve on it to make it actually nice to use.

ManagedT

When in doubt, turn to Haskell: most mundane functional programming problems have already been explored there. Fabio Labella helpfully pointed me to Gabriel Gonzalez’ managed library that does exactly what we’ll construct here.

First, let’s re-arrange the bracket signature slightly:

def bracketCurried[F[_], R, A](acquire: F[R])(release: R => F[Unit])(
  implicit F: MonadError[F, Throwable]): (R => F[A]) => F[A] = { use =>
  bracket(acquire)(use)(release)
}

What we’ve done is return a function that, when given the use action, will run the acquisition, use and cleanup actions. By using the original bracket, we show ourselves that this version is compatible with the previous.

We can make another mechanical change: instead of currying the function, let’s make this a case class (called ManagedT) and move acquire and release to the constructor:

case class ManagedT[F[_], R](acquire: F[R])(release: R => F[Unit])(
  implicit F: MonadError[F, Throwable]) {

  def apply[A](use: R => F[A]): F[A] = bracket(acquire)(use)(release)
}

We also moved A into the method definition, so the same instance of ManagedT can be used with multiple use actions, not necessarily of the same type. Again, still using the original bracket.

What we have now is a data type that represents resource acquisition and release, decoupled from the code that uses it. We now only need to be able to compose several instances of this data type.
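As a rough illustration of that decoupling, here is a sketch in which one managed value is used twice, with use actions of different result types. It assumes scala.util.Try in place of an abstract F so it runs on the standard library alone; ManagedTry, connection and the log are hypothetical names, not part of the library.

```scala
import scala.util.{Success, Try}
import scala.collection.mutable.ListBuffer

object ManagedReuse {
  val log = ListBuffer.empty[String]

  // Try-specialised stand-in for ManagedT (hypothetical name).
  final class ManagedTry[R](acquire: () => Try[R], release: R => Try[Unit]) {
    def apply[A](use: R => Try[A]): Try[A] =
      acquire().flatMap { r =>
        val result = use(r)
        // Release always runs; its error, if any, takes precedence.
        release(r).flatMap(_ => result)
      }
  }

  val connection = new ManagedTry[String](
    () => Try { log += "open"; "conn" },
    _ => Try { log += "close"; () }
  )

  // The same managed value, used with two different result types.
  val size: Try[Int] = connection(c => Success(c.length))
  val shout: Try[String] = connection(c => Success(c.toUpperCase))
}
```

Each application of connection acquires and releases the resource afresh, so the log ends up with two open/close pairs.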

Monad[ManagedT[F, ?]]

When I see a data type that needs to be composed sequentially, monads immediately pop into my mind. Here are the ManagedT instances for the Kinesis and Event writers:

val kinesis = ManagedT(KinesisWriter.acquire)(_.close)
def event(k: KinesisWriter) = ManagedT(EventWriter.acquire(k))(_.close)

The monad smell grows stronger as we see that the form of event is KinesisWriter => ManagedT[Task, EventWriter] - a Kleisli arrow for ManagedT[Task, ?]. Let’s write the pure and flatMap functions required to write the Monad instance.

def pure[F[_], R](r: R)(implicit F: MonadError[F, Throwable]) = 
  ManagedT(r.pure[F])(_ => F.unit)

What pure does for ManagedT is lift a regular value into ManagedT with no cleanup action. That’s simple enough. flatMap is next. We should look at the expected signature first:

def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
  implicit F: MonadError[F, Throwable]): ManagedT[F, R2]

This says that we must be able to use a managed R1 in order to create a managed R2. That sounds like nesting resources. Here’s a first try:

scala> def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
     |   implicit F: MonadError[F, Throwable]): ManagedT[F, R2] =
     |   ManagedT {
     |     fr1 { r1 =>
     |       fr2(r1)(???)
     |     }
     |   } { r2 => ??? }
<console>:22: error: type mismatch;
 found   : R1 => F[Nothing]
 required: R1 => F[A]
           fr1 { r1 =>
                    ^
<console>:22: error: type mismatch;
 found   : F[Nothing]
 required: F[R]
Note: Nothing <: R, but type F is invariant in type _.
You may wish to define _ as +_ instead. (SLS 4.5)
           fr1 { r1 =>
               ^

This is pretty awkward. We’re really contorting ourselves to fit the composed ManagedT into the existing constructor. What we really want is to create a new instance of ManagedT[F, R2] that will reuse fr1 and fr2 upon execution. We can rearrange the class layout again to make this possible:

abstract class ManagedT[F[_], R] {
  def apply[A](use: R => F[A]): F[A]
}
object ManagedT {
  def apply[F[_], R](acquire: => F[R])(cleanup: R => F[Unit])(
    implicit F: MonadError[F, Throwable]): ManagedT[F, R] = 
    new ManagedT[F, R] {
      def apply[A](use: R => F[A]): F[A] = bracket(acquire)(use)(cleanup)
    }
}

Again, purely mechanical. Instead of capturing acquire and cleanup in the class constructor, we capture them when creating an anonymous instance in the ManagedT.apply function. We can now skip providing acquire and cleanup altogether; this is great for flatMap:

def flatMap[F[_], R1, R2](fr1: ManagedT[F, R1])(fr2: R1 => ManagedT[F, R2])(
  implicit F: MonadError[F, Throwable]): ManagedT[F, R2] = 
  new ManagedT[F, R2] {
    def apply[A](use: R2 => F[A]): F[A] = 
      fr1 { r1 =>
        fr2(r1) { r2 =>
          use(r2)
        }
      }
  }

The definition basically falls out of the types. We no longer have to provide an acquire or cleanup action; we just delegate to the instances that were passed as arguments to the function. To convince ourselves that this is doing the right thing, we can trace through what is happening in the following expression:

val composed = flatMap(kinesis)(k => event(k))
val app = composed { eventWriter =>
  Task {
    // do something with eventWriter
    ()
  }
}

The app value is a program that, when executed, will execute the apply function of kinesis (that is, fr1 in flatMap) with a use function that is created by applying event to the created KinesisWriter, which is then applied to the supplied app body.

Whew! It’s mind bending, but it works. Since we composed the usage actions as before, in a nested fashion, the cleanup handlers will also be executed correctly, in reverse order of acquisition. The ManagedT data type and its monad instance for cats is available here.
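The trace can be reproduced with a small logging sketch. It is a simplification under stated assumptions: scala.util.Try stands in for F, release cannot fail, and ManagedTry, managed and the log are hypothetical names rather than the library's API. The flatMap method has the same nesting as the function above.

```scala
import scala.util.{Success, Try}
import scala.collection.mutable.ListBuffer

object FlatMapTrace {
  val log = ListBuffer.empty[String]

  abstract class ManagedTry[R] { self =>
    def apply[A](use: R => Try[A]): Try[A]

    // Same nesting as the flatMap function, written as a method.
    def flatMap[R2](f: R => ManagedTry[R2]): ManagedTry[R2] =
      new ManagedTry[R2] {
        def apply[A](use: R2 => Try[A]): Try[A] =
          self.apply[A](r1 => f(r1).apply[A](use))
      }
  }

  // A managed value that logs its own acquisition and release.
  def managed[R](name: String, value: => R): ManagedTry[R] =
    new ManagedTry[R] {
      def apply[A](use: R => Try[A]): Try[A] =
        Try { log += s"acquire $name"; value }.flatMap { r =>
          val result = Try(use(r)).flatten // also captures exceptions thrown by use
          log += s"release $name"
          result
        }
    }

  val kinesis: ManagedTry[String] = managed("kinesis", "kinesis-writer")
  def event(k: String): ManagedTry[String] = managed("event", s"event-writer over $k")

  val composed: ManagedTry[String] = kinesis.flatMap(k => event(k))

  val result: Try[String] = composed { ew =>
    log += s"use $ew"
    Success(ew)
  }
}
```

After initialization, the log reads: acquire kinesis, acquire event, use event-writer over kinesis-writer, release event, release kinesis.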

Yay, composition!

With just the 25 lines that it takes to define the monad instance, we get a whole host of incredible combinators. We’ve seen flatMap - that means we can do for-comprehensions with ManagedT values. But that’s mundane, so I won’t bore you with that.

How about applicative composition? You’ve got two independent resources, and want to use them? Sure thing:


trait KafkaWriter {
  def close: Task[Unit] = Task.unit
}
object KafkaWriter {
  def acquire: Task[KafkaWriter] = Task(new KafkaWriter {})
}
import com.iravid.managedt.ManagedT
// import com.iravid.managedt.ManagedT

val zipped = (ManagedT(KinesisWriter.acquire)(_.close), 
              ManagedT(KafkaWriter.acquire)(_.close)).tupled
// zipped: com.iravid.managedt.ManagedT[monix.eval.Task,(KinesisWriter, KafkaWriter)] = com.iravid.managedt.ManagedT$$anon$1$$anon$5@218a2462

zipped will acquire and release both the KafkaWriter and KinesisWriter properly. Incidentally, it will do so in order of declaration, but that shouldn’t matter as they are independent. Need to initialize a list of resources of a size only known at runtime? No worries, we’ve got you covered:

def acquireNamed(name: String): ManagedT[Task, KafkaWriter] =
  ManagedT(KafkaWriter.acquire)(_.close)
// acquireNamed: (name: String)com.iravid.managedt.ManagedT[monix.eval.Task,KafkaWriter]

val writers = List("a", "b", "c") traverse acquireNamed
// writers: com.iravid.managedt.ManagedT[monix.eval.Task,List[KafkaWriter]]

Yes, they’ll be acquired as they are specified in the list, and released in reverse order. That trick also works for Option and Either, if you’ve got a resource that should only conditionally be initialized.
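The ordering claim for a list of resources can be checked with a standard-library sketch. It makes several assumptions: scala.util.Try stands in for F, a hand-written traverse for List replaces the cats combinator, and ManagedTry, pure, writer and the log are hypothetical names used only for illustration.

```scala
import scala.util.{Success, Try}
import scala.collection.mutable.ListBuffer

object TraverseSketch {
  val log = ListBuffer.empty[String]

  abstract class ManagedTry[R] { self =>
    def apply[A](use: R => Try[A]): Try[A]

    def map[R2](f: R => R2): ManagedTry[R2] = new ManagedTry[R2] {
      def apply[A](use: R2 => Try[A]): Try[A] = self.apply[A](r => use(f(r)))
    }

    def flatMap[R2](f: R => ManagedTry[R2]): ManagedTry[R2] = new ManagedTry[R2] {
      def apply[A](use: R2 => Try[A]): Try[A] = self.apply[A](r => f(r).apply[A](use))
    }
  }

  // A value with no acquisition or cleanup.
  def pure[R](r: R): ManagedTry[R] = new ManagedTry[R] {
    def apply[A](use: R => Try[A]): Try[A] = use(r)
  }

  // A named writer that logs when it is acquired and released.
  def writer(name: String): ManagedTry[String] = new ManagedTry[String] {
    def apply[A](use: String => Try[A]): Try[A] = {
      log += s"acquire $name"
      val result = Try(use(name)).flatten
      log += s"release $name"
      result
    }
  }

  // Hand-written traverse for List: nests each element around the rest of the list.
  def traverse[X, R](xs: List[X])(f: X => ManagedTry[R]): ManagedTry[List[R]] =
    xs.foldRight(pure(List.empty[R])) { (x, acc) =>
      for { r <- f(x); rs <- acc } yield r :: rs
    }

  val names = List("a", "b", "c") // imagine this is only known at runtime
  val writers: ManagedTry[List[String]] = traverse(names)(n => writer(n))

  val result: Try[Int] = writers { ws =>
    log += s"use ${ws.mkString(",")}"
    Success(ws.size)
  }
}
```

Because each element wraps the remainder of the list, acquisition follows list order and release runs in reverse.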

There’s also a Monoid instance for ManagedT, if you’ve got a resource that has a Monoid instance. You can combine the managed values as much as you like, and the cleanup actions will still be executed:

def resource(i: Int) = ManagedT(Task { 
  println(s"Acquiring ${i}")
  i 
})(_ => Task(println(s"Releasing $i")))
// resource: (i: Int)com.iravid.managedt.ManagedT[monix.eval.Task,Int]

val squashed = (1 to 5).toList.foldMap(resource)
// squashed: com.iravid.managedt.ManagedT[monix.eval.Task,Int] = com.iravid.managedt.ManagedT$$anon$1$$anon$5@37df48a4

val sum = squashed(sum => Task(println(s"Got $sum")))
// sum: monix.eval.Task[Unit] = Task.FlatMap$2133665162

import monix.execution.Scheduler.Implicits.global
// import monix.execution.Scheduler.Implicits.global

import scala.concurrent.Await, scala.concurrent.duration._
// import scala.concurrent.Await
// import scala.concurrent.duration._

Await.result(sum.runAsync, 1.second)
// Acquiring 1
// Acquiring 2
// Acquiring 3
// Acquiring 4
// Acquiring 5
// Got 15
// Releasing 5
// Releasing 4
// Releasing 3
// Releasing 2
// Releasing 1

It’s amazing that 25 lines unlock so much power. This is what I like about functional programming: it’s a programmer’s dream come true; write a measly amount of code, get back a boatload of reusable functionality.

Summary

ManagedT is usable today as a way to elegantly compose resource acquisition in your application. There are a few caveats, though; they are detailed in the README, but the biggest one is that MonadError cannot guarantee cleanup if the underlying F is an effect that models cancellable computations. monix.eval.Task supports cancellation, so all bets are off for your cleanup handlers if the task is cancelled.

cats-effect is about to add the MonadBracket typeclass. When that happens, the MonadError constraint will be replaced with that and we should get stronger guarantees for the cleanup actions.

Enjoy!