
Reactive Spring Boot with Kotlin Coroutines

Kotlin Coroutines make it even easier to write Reactive Spring Boot applications

Photo by Boris Smokrovic on Unsplash

Over two years ago, I wrote about how easy it is to write reactive servers using Spring Boot and Kotlin. A lot has changed since then, and while the code I wrote then would still be fine today, there is now a better way. Thanks to the introduction of Kotlin coroutines and their integration with Spring Boot, it is even easier than before to write reactive Spring Boot applications with Kotlin. In this post, I will rewrite my original counter application using more up-to-date tools - Spring Boot 2.3, Kotlin coroutines, and Redis.

If you want to jump straight to the code, you can check out the repository for this post on GitHub.

What’s Different This Time?

One of the reasons I’ve avoided writing too many reactive servers is the fact that I find them harder to reason about and explain. Coroutines, on the other hand, let us write non-blocking code using an imperative style. In our previous version of the counter, we used the Reactor libraries directly - dealing with Mono, Flux, and subscriptions to them. The new version of our counter still uses Reactor, but instead of using it directly, we will use Kotlin Coroutines and let the integration between Reactor and Coroutines handle the complicated stuff for us. The result will be code that is more succinct and easier to reason about.

Finally, thanks to updates in Spring Data Redis, we’re able to use Redis publish/subscribe in a reactive way this time. In the version we wrote two years ago, the Redis publish/subscribe commands were blocking and I opted to make the application handle this in memory. Offloading the event publication and subscription to Redis allows us to simplify our application and scale it out if we ever need to. Before, we were limited to one node because the stream was held entirely in memory on a single node. Unfortunately, we still have a race condition where the stream may not reflect the events in the order they were sent, but they should all be there.

The Plan

We’re going to rebuild the counter we wrote two years ago using a Spring/Reactor integration with Kotlin Coroutines to make it look cleaner. The requirements are identical to last time.

Users must be able to…

  1. Check the value of the counter.
  2. Increment the counter by one.
  3. Decrement the counter by one.
  4. Watch a stream of updates to the counter.

And technical requirements are…

  1. The microservice must communicate via REST.
  2. The service must be reactive and nonblocking.
  3. The counter should be effectively infinite in either direction (+/-).

The resulting high level architecture looks similar:

Architecture Diagram

Our technology stack remains mostly the same, just with upgraded versions:

  1. Java 11 (Java 8 will work fine though)
  2. Spring Boot 2.3 (2.2 is fine, but 2.3 just came out so let’s stay current!)
  3. Kotlin 1.3.72
  4. Redis

I am going to assume you have Redis running on localhost on its default port, 6379. I run mine in a Docker container.

The API

Since our requirements aren’t that large, I’m going to skip the formal definition of the API which I would normally use OpenAPI for. Instead, we’ll refer to this handy table:

Purpose                        URI    HTTP Method  Accept Header
Current state of the counter   /      GET          application/json
Stream of counter events       /      GET          text/event-stream
Increment the counter          /up    PUT          application/json
Decrement the counter          /down  PUT          application/json

Getting Started

We’ll use the Spring Initializr to get our project off the ground. This is a great way to start any Spring project as it gives you the minimal amount of setup that just works. It includes all of the dependencies and build settings you need. While I am using the web interface here, you can do this right from your IDE if you want.

Spring Initializr

We will pick options to generate a project for us that will be written in Kotlin, using Spring Boot 2.3.0 (latest as of this writing), Gradle (this is a personal preference, feel free to use Maven, it makes no real difference), and Java 11 (Again, my preference. Java 8 or 14 will both work fine). We declare two dependencies, both reactive - Spring Reactive Web and Spring Data Reactive Redis.

You can click here for a pre-configured link so you don’t have to enter it all yourself. Generate the project and import it into our IDE and we’re ready to get started!

Let’s Start Coding!

Once we’ve opened the project the Spring Initializr has created for us, we will open up application.properties and turn on pretty printing for JSON output. You can skip this if you don’t mind the more compact default format. But since I’m going to be capturing sample output to show you below, I’ll turn this on.

# In application.properties

# Pretty-print JSON
spring.jackson.serialization.indent_output=true

Data Transfer Objects

We’ll use simple Kotlin classes with properties as the return objects from our API. I see a lot of developers instinctively define DTOs as data classes, but these are plain classes because we don’t need any of the autogenerated functions that come with data classes. It probably wouldn’t hurt to define them as data classes, but I like to send a signal to myself that these aren’t being used for anything other than an eventual conversion to JSON (e.g. not used in comparisons).

Three of our API resources return the current state of the counter, so we will define a CounterState class for that purpose. CounterState encapsulates the current time and the current value of the counter. It also has a function to turn a state into an event.

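import java.time.ZoneId
import java.time.ZonedDateTime

class CounterState(
    val value: Long,
    val at: ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
) {
    // Reuses this state's value and timestamp, adding the action that caused it
    fun toEvent(action: CounterAction) = CounterEvent(value, action, at)
}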

To turn a state into an event, we need to attach an action to it, in this case an enum called CounterAction. Once we have that, we can define our CounterEvent, which looks a lot like CounterState, but with an action associated with it.

enum class CounterAction { UP, DOWN }

class CounterEvent(
    val value: Long,
    val action: CounterAction,
    val at: ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
)

I resisted the urge to make CounterEvent descend from CounterState because these objects are both so small and I didn’t want to make this code look confusing.
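To make the relationship between the two classes concrete, here is a small self-contained sketch that repeats the definitions above and converts a state into an event. The main function and the sample value are only there for illustration and are not part of the application.

```kotlin
import java.time.ZoneId
import java.time.ZonedDateTime

enum class CounterAction { UP, DOWN }

class CounterEvent(
    val value: Long,
    val action: CounterAction,
    val at: ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
)

class CounterState(
    val value: Long,
    val at: ZonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
) {
    fun toEvent(action: CounterAction) = CounterEvent(value, action, at)
}

fun main() {
    val state = CounterState(4)
    val event = state.toEvent(CounterAction.UP)

    // The event carries the same value and timestamp as the state it came from.
    println(event.value == state.value) // true
    println(event.at == state.at)       // true
    println(event.action)               // UP

    // Plain classes compare by reference, unlike data classes.
    println(CounterState(4, state.at) == state) // false
}
```

The last line shows the trade-off mentioned earlier: because these are not data classes, two states with identical contents are not considered equal, which is fine as long as they are only ever serialized.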

Talking to Redis

Let’s get some plumbing code out of the way and define a ReactiveRedisTemplate bean which will allow us to communicate with Redis using non-blocking/reactive functions. We can define this in our main class:

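import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer
import org.springframework.data.redis.serializer.RedisSerializationContext.newSerializationContext
import org.springframework.data.redis.serializer.StringRedisSerializer

@SpringBootApplication
class CounterApplication {

    @Bean
    fun reactiveRedisTemplate(
        connectionFactory: ReactiveRedisConnectionFactory,
        objectMapper: ObjectMapper
    ): ReactiveRedisTemplate<String, CounterEvent> {

        // Serialize CounterEvent values as JSON using the application's ObjectMapper
        val valueSerializer = Jackson2JsonRedisSerializer(CounterEvent::class.java).apply {
            setObjectMapper(objectMapper)
        }

        // Keys are plain strings; values go through the JSON serializer above
        return ReactiveRedisTemplate(
            connectionFactory,
            newSerializationContext<String, CounterEvent>(StringRedisSerializer())
                .value(valueSerializer)
                .build()
        )
    }
}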

This code will provide us with a ReactiveRedisTemplate that uses String for its keys and CounterEvent objects, converted to JSON strings, as values. If we didn’t have the requirement to stream events when the counter changes, we could skip some of this serialization code, as the Spring Data Redis library would just use Long for the counter anyway. Defining this bean gives us something we can inject into our repository to talk to Redis reactively.

The Repository

Our data repository will perform two main functions: management of the counter (current state, up, and down) and management of the event stream as a result of counter changes. We will also have the ability to read the event stream through the repository.

Thanks to Spring Data and Kotlin coroutines, this code is surprisingly simple and written in an imperative style:

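import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.core.decrementAndAwait
import org.springframework.data.redis.core.incrementAndAwait
import org.springframework.data.redis.core.listenToChannelAsFlow
import org.springframework.data.redis.core.sendAndAwait
import org.springframework.stereotype.Repository

@Repository
class CounterRepository(
    private val redisTemplate: ReactiveRedisTemplate<String, CounterEvent>
) {

    // Incrementing by zero reads the current value without changing it
    suspend fun get(): CounterState =
        CounterState(redisTemplate.opsForValue().incrementAndAwait(COUNTER_KEY, 0L))

    // Update the counter first, then publish the resulting state as an event
    suspend fun up(): CounterState =
        CounterState(redisTemplate.opsForValue().incrementAndAwait(COUNTER_KEY)).also {
            redisTemplate.sendAndAwait(COUNTER_CHANNEL, it.toEvent(CounterAction.UP))
        }

    suspend fun down(): CounterState =
        CounterState(redisTemplate.opsForValue().decrementAndAwait(COUNTER_KEY)).also {
            redisTemplate.sendAndAwait(COUNTER_CHANNEL, it.toEvent(CounterAction.DOWN))
        }

    // Subscribe to the channel and unwrap each message into its CounterEvent
    suspend fun stream(): Flow<CounterEvent> =
        redisTemplate.listenToChannelAsFlow(COUNTER_CHANNEL).map { it.message }

    companion object {
        private const val COUNTER_CHANNEL = "COUNTER_CHANNEL"
        private const val COUNTER_KEY = "COUNTER"
    }
}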

The first thing to notice in this repository is that each function is marked with suspend and none of the functions returns a Mono or a Flux like you would normally see in a reactive application. They are still there under the covers, but the Spring/Reactor/Coroutines integration code is taking care of the translation for us. The result is a much simpler implementation.

The only place we really have to think about suspension is when we ask Redis to perform an operation. For example, when we ask Redis to increment a value for us, there is a brief period of time where Redis is doing its work and we are waiting around. In a non-reactive application our thread would block and prevent other work from being scheduled. Thanks to Kotlin Coroutines, we suspend instead. Suspending allows another coroutine that needs to get work done to have a turn using the underlying thread. Once our coroutine is ready to resume, it will run again with the same state it had before and continue on its way. We don’t have to do anything explicit to achieve this; Kotlin Coroutines does all of the work for us.
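To see suspension in action outside of Spring, here is a minimal sketch that assumes only the kotlinx-coroutines-core library. The fakeRedisCall function is a hypothetical stand-in for a Redis round trip, using delay() to simulate the wait. Both coroutines run on the single thread owned by runBlocking, yet the fast call finishes while the slow one is still waiting.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a non-blocking Redis call; delay() suspends
// the coroutine without blocking the thread it was running on.
suspend fun fakeRedisCall(name: String, millis: Long, log: MutableList<String>) {
    log += "$name start"
    delay(millis)
    log += "$name end"
}

// Runs two calls concurrently on the single thread owned by runBlocking.
fun interleavedCalls(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        launch { fakeRedisCall("slow", 200, log) }
        launch { fakeRedisCall("fast", 50, log) }
    }
    return log
}

fun main() {
    println(interleavedCalls()) // [slow start, fast start, fast end, slow end]
}
```

If delay() were replaced with Thread.sleep(), the slow call would hold the thread until it finished and the fast call could not even start until then.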

Given that we need to suspend whenever we interact with Redis, we’ll call some extension functions provided by the Kotlin/Reactor/Coroutines integration code. For example, incrementAndAwait(key) is equivalent to increment(key).awaitSingle(), but it looks a lot cleaner. Any place you see AndAwait in the code above is an extension function provided to make life easier. We could instead call the existing reactive function, which returns a Mono<Long>, and call awaitSingle() on it (again, using an extension function provided for us). Keep in mind that if you run into a reactive library that uses Project Reactor but doesn’t ship with handy extension functions like ...AndAwait, you can define your own.
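Defining your own ...AndAwait function takes very little code. The sketch below assumes reactor-core and kotlinx-coroutines-reactive are on the classpath. ReactiveCounterClient is a made-up class standing in for any Reactor-based library that lacks coroutine extensions; only awaitSingle() comes from a real library.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical Reactor-based client with no coroutine support of its own.
// The key is ignored here; a real client would use it to pick the counter.
class ReactiveCounterClient {
    private val counter = AtomicLong()
    fun increment(key: String): Mono<Long> =
        Mono.fromCallable { counter.incrementAndGet() }
}

// Our own extension: suspend until the Mono emits its single value.
suspend fun ReactiveCounterClient.incrementAndAwait(key: String): Long =
    increment(key).awaitSingle()

fun main(): Unit = runBlocking {
    val client = ReactiveCounterClient()
    println(client.incrementAndAwait("COUNTER")) // 1
    println(client.incrementAndAwait("COUNTER")) // 2
}
```

Callers of incrementAndAwait never see the Mono, which is the same effect the extension functions used in the repository give us.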

If we look at the requirements, we need to send a CounterEvent whenever we change the counter state. To do this, we publish an event on a Redis channel after the counter is updated in up and down. We do this in an also call so the whole function is one expression. Note that there is a slight race condition here: incrementing and publishing are two separate Redis commands, so events from concurrent requests may reach the stream out of order. But they should all be there, and I’m happy with that for a project of this size.
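The race is easier to see in a simulation. The sketch below does not talk to Redis at all: FakeCounter is a hypothetical in-memory stand-in, and the pauseBeforePublish parameter artificially models the gap between the increment and the publish. It assumes kotlinx-coroutines-core.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical in-memory stand-in for the Redis counter and channel.
class FakeCounter {
    private val value = AtomicLong()
    val published = mutableListOf<Long>()

    // Increment first, then publish after an artificial pause that
    // models the gap between the two separate commands.
    suspend fun up(pauseBeforePublish: Long): Long {
        val newValue = value.incrementAndGet()
        delay(pauseBeforePublish)
        published += newValue
        return newValue
    }
}

fun simulateRace(): List<Long> {
    val counter = FakeCounter()
    runBlocking {
        launch { counter.up(pauseBeforePublish = 100) } // increments to 1, publishes late
        launch { counter.up(pauseBeforePublish = 0) }   // increments to 2, publishes first
    }
    return counter.published
}

fun main() {
    println(simulateRace()) // [2, 1]
}
```

Both events arrive, but in the opposite order from the increments. Avoiding this would mean making the increment and the publish a single atomic step, which is outside the scope of this post.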

One other thing to notice in this implementation is that the return type for stream() is a Kotlin Flow rather than a reactive Flux. To simplify greatly, a Flow is the Kotlin coroutines equivalent of Project Reactor’s Flux, so we still get the same effect.
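As a rough illustration of how the two types relate, the sketch below converts a Flux into a Flow with asFlow() from kotlinx-coroutines-reactive and then uses ordinary Flow operators on it. The asEventLabels function and the sample values are made up for this example.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux

// Bridge a Reactor Flux into a Kotlin Flow, then transform it with Flow's map.
fun asEventLabels(source: Flux<Long>): Flow<String> =
    source.asFlow().map { "value=$it" }

fun main(): Unit = runBlocking {
    val labels = asEventLabels(Flux.just(4L, 3L, 4L)).toList()
    println(labels) // [value=4, value=3, value=4]
}
```

Like a Flux, the Flow is cold: nothing is requested from the source until a terminal operation such as toList() or collect() runs.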

Before we move on, there are two small implementation details I want to go over. First, I couldn’t find a nice way to read the state of the counter, which is why we increment by zero. Second, we could probably leave off the return types here and let the compiler infer them, but I left them in for clarity.

The Controller

We’re nearly done! The controller is the last thing we need to worry about. There are several Spring-sanctioned ways to write this kind of code, and I’ve chosen one that more closely matches the Spring MVC style (rather than a functional router). Because the repository does most of the real work, our controller is only responsible for handling requests and serializing responses:

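import kotlinx.coroutines.flow.Flow
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RestController

@RestController
class CounterController(private val repo: CounterRepository) {

    @GetMapping("/")
    suspend fun get(): CounterState = repo.get()

    @PutMapping("/up")
    suspend fun up(): CounterState = repo.up()

    @PutMapping("/down")
    suspend fun down(): CounterState = repo.down()

    // Same path as get(), selected when the client accepts text/event-stream
    @GetMapping(value = ["/"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    suspend fun stream(): Flow<CounterEvent> = repo.stream()
}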

Again, notice how each function is marked with suspend and returns a plain CounterState (or a Flow<CounterEvent> for the stream) rather than a Mono<CounterState> or a Flux<CounterEvent>. The Spring/Coroutines integration is doing all of the reactive conversions under the covers for us! This is great because we can write straightforward imperative code without having to write so much reactive boilerplate.

Using The Counter

While I have provided a test suite to prove this all works, let’s do some manual testing. For this, I’m going to use curl, the commands for which are shown in each example.

First, getting the state of the counter:

$ curl localhost:8080

{
  "value" : 3,
  "at" : "2020-05-24T23:07:42.292Z"
}

Increment:

$ curl -X PUT localhost:8080/up

{
  "value" : 4,
  "at" : "2020-05-24T23:09:34.966Z"
}

Decrement:

$ curl -X PUT localhost:8080/down

{
  "value" : 3,
  "at" : "2020-05-24T23:10:06.51Z"
}

Finally, let’s listen to the stream of updates. You’ll have to open another terminal and use the increment and decrement commands to make the counter do something. There is no replay, which means that if we subscribe to the counter stream, we only see events published after we subscribe.

$ curl -H Accept:text/event-stream localhost:8080/

data:{
data:  "value" : 4,
data:  "action" : "UP",
data:  "at" : "2020-05-24T23:11:18.96Z"
data:}

data:{
data:  "value" : 3,
data:  "action" : "DOWN",
data:  "at" : "2020-05-24T23:11:19.974Z"
data:}

data:{
data:  "value" : 4,
data:  "action" : "UP",
data:  "at" : "2020-05-24T23:11:20.638Z"
data:}

data:{
data:  "value" : 5,
data:  "action" : "UP",
data:  "at" : "2020-05-24T23:11:21.228Z"
data:}
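The lack of replay on the stream can be mimicked in memory. The sketch below is only an analogy, not how Redis works internally: it uses MutableSharedFlow from kotlinx-coroutines-core (version 1.4 or later), which by default also keeps no history for late subscribers. The function name and sample values are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun eventsSeenByLateSubscriber(): List<Long> = runBlocking {
    // In-memory stand-in for the channel; replay is 0 by default.
    val channel = MutableSharedFlow<Long>()

    // Published before anyone is listening, so nobody will ever see it.
    channel.emit(4L)

    val received = mutableListOf<Long>()
    // UNDISPATCHED makes the subscription happen before we emit again.
    val subscriber = launch(start = CoroutineStart.UNDISPATCHED) {
        channel.take(2).toList(received)
    }

    channel.emit(3L)
    channel.emit(4L)
    subscriber.join()
    received
}

fun main() {
    println(eventsSeenByLateSubscriber()) // [3, 4]
}
```

The first event is lost because it was published before the subscription existed, which matches what we see with curl: only changes made after connecting show up in the stream.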


Conclusion

Thanks to updates in Spring Data Redis and the integration of Kotlin Coroutines into Spring’s reactive libraries, our application is much simpler than it was two years ago. I’m excited to see where we end up in two more years!