Using cats with reactive-kafka

Posted on July 7, 2017

At my current job, we’re big fans of using Kafka to replicate data between services. In fact, Kafka is the single most important input mechanism for our services: we structure them using CQRS, so their internal state is determined entirely by the contents of the commands topic. To process these topics, we use Akka Streams, and specifically the wonderful reactive-kafka library.

Having written several services in this form, we realized that although Akka Streams and Reactive Kafka are very capable and expressive, we were still duplicating copious amounts of code: batch committing, structuring graphs for at-least-once processing, etc. Apart from that, we had some issues around conditionally producing to topics - I’ll touch on that in a moment.

To get this post going, I’d like to start with a motivating example showing how we use reactive-kafka.

A motivating example

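To provide some context, we’re talking about a service that continually performs the following set of actions: it reads a message from a Kafka topic, deserializes it from JSON, validates it, writes the result to a database, produces a message to an output topic and commits the offset of the original message.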

These map nicely to an Akka Streams graph, so let’s see how that looks. First, some imports and definitions:

import akka.Done
import akka.stream.scaladsl._
import akka.kafka.{ 
  ConsumerSettings => AkkaConsumerSettings, 
  ProducerSettings => AkkaProducerSettings, 
  _ 
}
import akka.kafka.scaladsl._
import play.api.libs.json._
import scala.concurrent.{ ExecutionContext, Future }

// The data type we'll deserialize from Kafka
case class Message()
object Message {
  implicit val reads: Reads[Message] = ???
}

// The data type that will result from our validation
case class Result()
object Result {
  def fromMessage(msg: Message): Either[String, Result] = ???
}

// Some sort of storage mechanism
trait Database {
  def write(result: Result): Future[Unit]
}

// We'll always be using byte arrays with Kafka, so there's no point in carrying around the
// big types provided by the library. We'll fix everything to byte arrays.
type ConsumerSettings = AkkaConsumerSettings[Array[Byte], Array[Byte]]
type ProducerSettings = AkkaProducerSettings[Array[Byte], Array[Byte]]
type CommittableMessage = ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]]
type ProducerMessage[P] = ProducerMessage.Message[Array[Byte], Array[Byte], P]
type ProducerResult[P] = ProducerMessage.Result[Array[Byte], Array[Byte], P]

def toProducerMessage(kafkaMsg: CommittableMessage, result: Result, 
  destTopic: String): ProducerMessage[CommittableMessage] = ???

And the graph itself:

object ProcessingGraph {
  def apply(consumerSettings: ConsumerSettings, producerSettings: ProducerSettings,
    db: Database)(implicit ec: ExecutionContext): RunnableGraph[(Consumer.Control, Future[Done])] = 
    Consumer.committableSource(consumerSettings, Subscriptions.topics("input"))
      // JSON deserialization
      .mapConcat { kafkaMsg =>
        Json.parse(kafkaMsg.record.value).validate[Message] match {
          case JsSuccess(v, _) => List((kafkaMsg, v))
          case e: JsError      => Nil
        }
      }
      // Message validation
      .mapConcat { case (kafkaMsg, msg) => 
        Result.fromMessage(msg) match {
          case Right(result) => List((kafkaMsg, result))
          case Left(err)     => Nil
        }
      }
      // Database write
      .mapAsync(1) {
        case (kafkaMsg, result) => 
          db.write(result).map(_ => (kafkaMsg, result))
      }
      // ProducerMessage creation
      .map {
        case (kafkaMsg, result) => toProducerMessage(kafkaMsg, result, "output")
      }
      // Topic output
      .via(Producer.flow(producerSettings))
      // Message commit
      .mapAsync(1)(_.message.passThrough.committableOffset.commitScaladsl())
      .toMat(Sink.ignore)(Keep.both)
}

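There are 2 problems that I’d like to point out with this formulation: messages that fail deserialization or validation are dropped from the stream by mapConcat, so no downstream stage (including the commit stage) ever sees them; and every stage has to carry the original Kafka message along by hand so that it can be committed at the end.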

So how do we solve these 2 problems? Let’s tackle them separately.

Creating an optional producer

Before we begin, I must say that we are going to re-implement some functionality from reactive-kafka. As far as I understand, there is no way to do what we want with the current interface provided by the library.

Let’s assume, for simplicity, that our validation procedure returns an Option instead of an Either:

def fromMessage(msg: Message): Option[Result]

To write a message to a Kafka topic, we use the ProducerMessage data type. This type contains the destination topic, the partition, the data itself, some more metadata, and a passthrough of type P: a value that’ll be attached to the producer’s result.

Since Option has a Functor instance, we can use map to convert the Result using toProducerMessage:

val kafkaMsg: CommittableMessage
val maybeResult: Option[Result]
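// toProducerMessage also needs the destination topic, and uses the Kafka message as the passthrough
val maybeProducerMessage: Option[ProducerMessage[CommittableMessage]] =
  maybeResult.map(toProducerMessage(kafkaMsg, _, "output"))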

Great! Now, how do we actually write this to a topic? Let’s assume we have a plain old Future-based function for writing:

def produce[P](producerMsg: ProducerMessage[P]): Future[ProducerResult[P]]

How can we use produce on the Option? Let’s pattern match and work our way through the cases:

val result: Future[ProducerResult[P]] = maybeProducerMessage match {
  // This is the easy case - run the function:
  case Some(pm) => produce(pm)
  case None     => // What now?
}

If our desired return type is Future[ProducerResult[P]], we’re in trouble for the None case, as there is no ProducerMessage to write.

We can return a Future.failed(new NoSuchElementException), but if we’re going to use this function with mapAsync from Akka Streams, that means going through stream supervision or using the .recover combinator to catch the specific exception and replace it with a placeholder element.

Possible, but dirty.

Instead, let’s adjust our return type to be Future[Option[ProducerResult[P]]]. This makes more sense, as an optional ProducerMessage means we should get an optional ProducerResult. Here’s the adjusted expression:

val result: Future[Option[ProducerResult[P]]] = maybeProducerMessage match {
  case Some(pm) => produce(pm).map(Some(_))
  case None     => Future.successful(None)
}

Looks much better. mapAsync will unwrap the Future when running this in the stream, so we can continue working with the Option throughout the stream.

This operation is called traverse. We’re taking an Option[ProducerMessage], running a function ProducerMessage => Future[ProducerResult] inside it, and then turning the result inside out into a Future[Option[ProducerResult]].

More abstractly, this works for all F[_], G[_], A, B, where G[_] is an Applicative Functor and F[_] is a Traversable Functor; traverse is then of the form:

def traverse[F[_], G[_], A, B](fa: F[A])(f: A => G[B]): G[F[B]]
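To make the shape of that signature concrete, here is a minimal sketch that fixes F to Option and leaves G abstract. It uses only the Scala standard library; the Applicative trait, its two instances and the demo methods are hypothetical stand-ins written for this illustration, not the definitions that cats ships.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TraverseShapeSketch {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Minimal stand-in for cats.Applicative (an assumption for this sketch).
  trait Applicative[G[_]] {
    def pure[A](a: A): G[A]
    def map2[A, B, C](ga: G[A], gb: G[B])(f: (A, B) => C): G[C]
    // map is derivable from pure and map2.
    def map[A, B](ga: G[A])(f: A => B): G[B] =
      map2(ga, pure(()))((a, _) => f(a))
  }

  implicit val futureApplicative: Applicative[Future] = new Applicative[Future] {
    def pure[A](a: A): Future[A] = Future.successful(a)
    def map2[A, B, C](ga: Future[A], gb: Future[B])(f: (A, B) => C): Future[C] =
      ga.zipWith(gb)(f)
  }

  implicit val listApplicative: Applicative[List] = new Applicative[List] {
    def pure[A](a: A): List[A] = List(a)
    def map2[A, B, C](ga: List[A], gb: List[B])(f: (A, B) => C): List[C] =
      for (a <- ga; b <- gb) yield f(a, b)
  }

  // traverse with F fixed to Option: run f inside the Option,
  // then move the effect G to the outside.
  def traverseOption[G[_], A, B](fa: Option[A])(f: A => G[B])(
      implicit G: Applicative[G]): G[Option[B]] =
    fa match {
      case Some(a) => G.map[B, Option[B]](f(a))(Some(_))
      case None    => G.pure[Option[B]](None)
    }

  // G = Future: the same result as the hand-written pattern match.
  def viaFuture: Option[Int] =
    Await.result(
      traverseOption[Future, String, Int](Some("hello"))(s => Future(s.length)),
      5.seconds)

  // G = List: the same function works for a different effect.
  def viaList: List[Option[Int]] =
    traverseOption[List, Int, Int](Some(1))(a => List(a, a + 1))

  def emptyViaList: List[Option[Int]] =
    traverseOption[List, Int, Int](None)(a => List(a, a + 1))
}
```

Only the Applicative instance differs between the two calls; the traversal itself is written once.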

These typeclasses are included in cats, so we’ll import the required implicits and use the syntax enrichments:

import cats.implicits._

// Important to have an EC in scope, otherwise the Applicative instance 
// for Future can't be derived
implicit val ec: ExecutionContext = ???

val result: Future[Option[ProducerResult[P]]] = maybeProducerMessage.traverse(produce)

The added benefit is that we can now work with any effect that has a Traverse instance. This includes Either[E, ?], Try, List and even play-json’s JsResult, as long as a Traverse instance for it is in scope.
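As a rough illustration of that point, the sketch below swaps the container while keeping the effect fixed to Future. It deliberately avoids the cats dependency so it stays self-contained: FutureTraverse, ErrorOr, traverseF and the String-based produce are hypothetical names invented for this example, and cats.Traverse is considerably more general.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MiniTraverseSketch {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Simplified Traverse with the effect fixed to Future (an assumption;
  // cats.Traverse abstracts over any Applicative).
  trait FutureTraverse[F[_]] {
    def traverse[A, B](fa: F[A])(f: A => Future[B]): Future[F[B]]
  }

  type ErrorOr[A] = Either[String, A]

  implicit val optionInstance: FutureTraverse[Option] = new FutureTraverse[Option] {
    def traverse[A, B](fa: Option[A])(f: A => Future[B]): Future[Option[B]] =
      fa match {
        case Some(a) => f(a).map(Some(_))
        case None    => Future.successful(None)
      }
  }

  implicit val listInstance: FutureTraverse[List] = new FutureTraverse[List] {
    def traverse[A, B](fa: List[A])(f: A => Future[B]): Future[List[B]] =
      Future.traverse(fa)(f)
  }

  implicit val eitherInstance: FutureTraverse[ErrorOr] = new FutureTraverse[ErrorOr] {
    def traverse[A, B](fa: ErrorOr[A])(f: A => Future[B]): Future[ErrorOr[B]] =
      fa match {
        case Right(a) => f(a).map(Right(_))
        case Left(e)  => Future.successful(Left(e)) // nothing to run, keep the error
      }
  }

  // One generic function, usable with every container that has an instance.
  def traverseF[F[_], A, B](fa: F[A])(f: A => Future[B])(
      implicit T: FutureTraverse[F]): Future[F[B]] =
    T.traverse(fa)(f)

  // Hypothetical stand-in for producing a message to a topic.
  def produce(msg: String): Future[Int] = Future(msg.length)

  private def await[A](fa: Future[A]): A = Await.result(fa, 5.seconds)

  def demoOption: Option[Int] =
    await(traverseF[Option, String, Int](Some("abc"))(produce(_)))
  def demoList: List[Int] =
    await(traverseF[List, String, Int](List("a", "bb"))(produce(_)))
  def demoLeft: ErrorOr[Int] =
    await(traverseF[ErrorOr, String, Int](Left("bad json"))(produce(_)))
}
```

The Left case is the interesting one for our stream: nothing is produced, but the element survives with its error intact.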

We can capture this generic produce in another method:

import cats.Traverse

def produceF[F[_]: Traverse, P](fpm: F[ProducerMessage[P]])(
  implicit ec: ExecutionContext): Future[F[ProducerResult[P]]] = 
  fpm.traverse(produce)

And now, we can use it in our stream without dropping bad messages:

Consumer.committableSource(consumerSettings, Subscriptions.topics("input"))
  // JSON deserialization
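  .map { kafkaMsg =>
    // Render the JSON errors as a String so the Left type lines up with Result.fromMessage
    (kafkaMsg, Json.parse(kafkaMsg.record.value).validate[Message].asEither.left.map(_.toString))
  }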
  // Message validation - changed to keep the Either
  .map { case (kafkaMsg, maybeMsg) => 
    (kafkaMsg, maybeMsg.flatMap(Result.fromMessage))  
  }
  // Database write - changed to use traverse
  .mapAsync(1) {
    case (kafkaMsg, maybeResult) => 
      maybeResult
        .traverse(db.write(_))
        // db.write yields Unit, so pass the original Either downstream
        .map(_ => (kafkaMsg, maybeResult))
  }
  // ProducerMessage creation
  .map {
    case (kafkaMsg, result) => 
      result.map(toProducerMessage(kafkaMsg, _, "output"))
  }
  // Topic output - changed to use produceF
  .mapAsync(1)(produceF(_))
  // Message commit - changed to use traverse (looks a bit noisier,
  // but this is just boilerplate added by the closures)
  .mapAsync(1) { maybeProducerResult =>
    maybeProducerResult.traverse { producerResult =>
      producerResult.message.passThrough.committableOffset.commitScaladsl()
    }
  }
  .toMat(Sink.ignore)(Keep.both)

Note the changes in the validation, database write, topic output and commit stages.

For completeness, here’s a version of produceF, using the raw KafkaProducer from the kafka-clients package:

import org.apache.kafka.clients.producer.{ Callback, KafkaProducer, RecordMetadata }
import scala.concurrent.Promise

def produceF[F[_]: Traverse, T](producer: KafkaProducer[Array[Byte], Array[Byte]],
  message: F[ProducerMessage[T]])(implicit ec: ExecutionContext): Future[F[ProducerResult[T]]] =
  message.traverse { t =>
    val promise = Promise[ProducerResult[T]]()

    // send is asynchronous; the callback completes the promise when the send finishes
    producer.send(t.record,
      new Callback {
        // kafka-clients signals success by passing a null exception
        @SuppressWarnings(Array("org.wartremover.warts.Null"))
        def onCompletion(recordMetadata: RecordMetadata, exception: Exception): Unit =
          if (exception != null)
            promise.failure(exception)
          else
            promise.success((recordMetadata, t.passThrough))
      })

    promise.future
  }

Here, ProducerResult[T] is a tuple of RecordMetadata and T.
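The Promise-based bridging used there is a general pattern for adapting callback APIs to Future. The sketch below isolates it without the Kafka dependency: SendCallback and FakeProducer are hypothetical stand-ins shaped loosely like the kafka-clients Callback and producer, and the fake completes synchronously, which a real producer would not.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object CallbackToFutureSketch {
  // Hypothetical callback interface: exactly one of the arguments is null.
  trait SendCallback {
    def onCompletion(metadata: String, exception: Exception): Unit
  }

  // Hypothetical client that invokes the callback immediately.
  class FakeProducer {
    def send(record: String, callback: SendCallback): Unit =
      if (record.isEmpty)
        callback.onCompletion(null, new IllegalArgumentException("empty record"))
      else
        callback.onCompletion(s"stored:$record", null)
  }

  // Bridge the callback to a Future, pairing the metadata with the passthrough.
  def sendAsync[P](producer: FakeProducer, record: String, passThrough: P): Future[(String, P)] = {
    val promise = Promise[(String, P)]()
    producer.send(record, new SendCallback {
      def onCompletion(metadata: String, exception: Exception): Unit =
        if (exception != null) promise.failure(exception)
        else promise.success((metadata, passThrough))
    })
    promise.future
  }

  def demoSuccess: (String, Long) =
    Await.result(sendAsync(new FakeProducer, "abc", 42L), 5.seconds)

  // True when the returned Future completed with a failure.
  def demoFailure: Boolean =
    Await.ready(sendAsync(new FakeProducer, "", 42L), 5.seconds).value.exists(_.isFailure)
}
```

Because a Promise can be completed only once, this assumes the callback fires exactly once per send.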

Carrying around a context

We can now deal with the other problem - the boilerplate of carrying around the original Kafka message.

To begin with, we can examine the data type we’re working with - (CommittableMessage, Result). As only the second element is changing, we can add a type parameter and see what we get:

type Message[T] = (CommittableMessage, T)

And where there are type parameters, there are (usually) functors, too:

val functor = new Functor[Message] {
  def map[T, U](fa: Message[T])(f: T => U): Message[U] = (fa._1, f(fa._2))
}

It turns out that cats has us covered, and helpfully provides a functor instance for (T, ?):

import cats.implicits._
val ourMsg: Message[Result] = (kafkaMsg, result)
val mapped: Message[String] = ourMsg.map(_.toString)

Now, unless there’s a monoid instance for the left side of the tuple, we can’t write an applicative or monad instance. But we can get a Traverse instance. What does that mean?

val ourMsg: Message[Result] = (kafkaMsg, result)
val mappedFuture: Future[Message[String]] = ourMsg.traverse(r => Future(r.toString))

It means we can carry our context (remember, this is just a tuple of the Kafka message and the value) into the future. Not very exciting, but given the fact that traversable functors compose, we can rewrite our database writing stage more succinctly.
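Spelled out by hand, traversing the tuple amounts to running the function on the second element and re-attaching the first once the Future completes. The following is a minimal sketch with the effect fixed to Future; Context is a hypothetical stand-in for CommittableMessage, and traverseMessage is an illustrative name rather than anything cats defines.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object TupleTraverseSketch {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Hypothetical stand-in for CommittableMessage: just an offset.
  final case class Context(offset: Long)

  type Message[T] = (Context, T)

  // traverse for the tuple: only the value is touched, the context rides along.
  def traverseMessage[A, B](msg: Message[A])(f: A => Future[B]): Future[Message[B]] =
    f(msg._2).map(b => (msg._1, b))

  def demo: Message[Int] =
    Await.result(traverseMessage((Context(7L), 21))(n => Future(n * 2)), 5.seconds)
}
```

No Monoid is needed for the context because the traversal never has to invent or combine one; it always has exactly one to hand back.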

We want to traverse two layers at once, given Message[F[A]] where F[_] has a Traverse instance as well. To make this clear to scalac, we need to use cats.data.Nested, which wraps a value of type F[G[A]]. For example, here’s how it works for Message[Either[String, Result]]:

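// Database write - type annotations added for clarity
// Requires: import cats.data.Nested
.mapAsync(1) {
  msg: Message[Either[String, Result]] => // Reminder - (CommittableMessage, Either[String, Result])

    // db.write returns Future[Unit], so we map the written Result back in
    // to keep it available for the downstream stages
    val result: Future[Message[Either[String, Result]]] = 
      Nested(msg).traverse(r => db.write(r).map(_ => r)).map(_.value)

    result
}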

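The traverse call did the following: it reached through both layers, the tuple and the Either, ran the database write on the Result when there was one (a Left passes through untouched), and turned the structure inside out so that the Future ends up on the outside.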

We had to wrap and unwrap the Nested data type, which is unfortunate, but at least that’s tucked away nicely inside the stage. Trying to implicitly derive a Traverse[λ[ɑ => Message[F[ɑ]]]] does not work.
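To see why traversable functors compose, it can help to write the two-layer traversal by hand: the outer traversal simply uses the inner traversal as its effectful function. This is a sketch under simplifying assumptions, with the effect fixed to Future; Context, traverseMessage, traverseEither, traverseNested and the counting write function are hypothetical names for this illustration, not what Nested does internally.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object NestedTraverseSketch {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Hypothetical stand-in for CommittableMessage.
  final case class Context(offset: Long)
  type Message[T] = (Context, T)

  def traverseMessage[A, B](msg: Message[A])(f: A => Future[B]): Future[Message[B]] =
    f(msg._2).map(b => (msg._1, b))

  def traverseEither[E, A, B](fa: Either[E, A])(f: A => Future[B]): Future[Either[E, B]] =
    fa match {
      case Right(a) => f(a).map(Right(_))
      case Left(e)  => Future.successful(Left(e))
    }

  // Composition: traverse the tuple, and for its value traverse the Either.
  def traverseNested[E, A, B](msg: Message[Either[E, A]])(
      f: A => Future[B]): Future[Message[Either[E, B]]] =
    traverseMessage(msg)(inner => traverseEither(inner)(f))

  // Runs a fake database write and reports how many writes actually happened.
  def demo(input: Message[Either[String, Int]]): (Message[Either[String, Int]], Int) = {
    val writes = new AtomicInteger(0)
    def write(n: Int): Future[Int] = Future { writes.incrementAndGet(); n }
    val out = Await.result(traverseNested(input)(n => write(n)), 5.seconds)
    (out, writes.get)
  }
}
```

A Right triggers exactly one write and a Left triggers none, while the context is preserved in both cases.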

Summary

In retrospect, I think this is a clear example of how constructs such as functors and traversables show up in the most mundane code, and how the surrounding infrastructure from cats can just make tons of boilerplate disappear.

To close the post, here’s what the graph looks like after the improvements:

Consumer.committableSource(consumerSettings, Subscriptions.topics("input"))
  // JSON deserialization
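  .map { kafkaMsg =>
    // Render the JSON errors as a String so the Left type lines up with Result.fromMessage
    (kafkaMsg, Json.parse(kafkaMsg.record.value).validate[Message].asEither.left.map(_.toString))
  }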
  // Message validation
  .map(_.map(_.flatMap(Result.fromMessage)))
  // Database write
  .mapAsync(1)(msg => Nested(msg).traverse(db.write(_)).map(_ => msg))
  // ProducerMessage creation
  .map { case (kafkaMsg, result) => 
    result.map(toProducerMessage(kafkaMsg, _, "output"))
  }
  // Topic output
  .mapAsync(1)(produceF(_))
  // Message commit
  .mapAsync(1) { maybeProducerResult =>
    maybeProducerResult.traverse { producerResult =>
      producerResult.message.passThrough.committableOffset.commitScaladsl()
    }
  }
  .toMat(Sink.ignore)(Keep.both)

We still had to dismantle the Message at a few stages, but this can be solved using more specialized stages for producing to topics, committing, etc.

Since this pattern of Akka Streams graphs with reactive-kafka is very common in the services we write, we ended up packaging this in a wrapper library, along with a typeclass mechanism for deserializing and serializing messages, some more useful combinators and syntax enrichments to Akka Streams. These make the above graph more declarative and clear.

This library is not published currently; ping me on Twitter (@itrvd) if there’s interest and we’ll try and open-source it.

Thanks for reading!