Reactive Spring Boot 2.0 and Kotlin

A reactive microservice using Spring Boot 2 and Kotlin

Photo by Slava Bowman on Unsplash

Update: This has been updated to work with Spring Boot 2.0.0.RELEASE and Kotlin 1.2.20. And since start.spring.io now adds a library we were manually adding to our build, I have removed those instructions.

Welcome! Lately I’ve been using Kotlin to play with Spring Boot 2.0’s reactive web libraries. I thought I would share some of the things I’ve learned in the hopes that it helps others.

For this post, I’m going to assume you have a cursory understanding of reactive programming (you know what a Mono and a Flux are), as well as a bit about Kotlin. I’m also going to assume that you have Redis set up and running on its default port (6379) on localhost as I will be using the fact that Spring Boot will auto-configure it with sensible defaults. I just run this in a container, but you can do whatever works for you.

If you want to jump straight in, the code for this is in my GitHub repo. Feel free to download it and follow along.

The Plan

We are going to write a very simple reactive microservice. Just about the simplest service you can think of: a controllable counter. It’s an integer that users can either increase, decrease, or just watch. I know you’re thinking “But Todd, that’s boring”. True, but sometimes boring is good. By reducing the business logic to something we all learned in first grade, we can spend our mental energy learning about more exciting things like reactive streams, and Kotlin.

We’re going to be using Spring Boot 2.0.0 and Kotlin 1.2.20.

Before we get into coding, let’s review our requirements:

Users must be able to…

  1. Check the value of the counter.
  2. Increment the counter by one.
  3. Decrement the counter by one.
  4. Watch a stream of updates to the counter.

And technical requirements are…

  1. The microservice must communicate via REST.
  2. The service must be reactive and nonblocking.
  3. The counter should be effectively infinite in either direction (+/-).

More complicated things such as a user interface and usage statistics can all come later, so we will depend on curl and automated tests to make sure this all works properly for now.

When I say “effectively infinite”, I mean that I’m OK with the counter overflowing or underflowing at some point. A Long on the JVM is a 64-bit signed integer, so its maximum value is 2^63 - 1. That means we would have to click it 1,000,000 times a second for nearly 300,000 years for anybody to notice. Personally, I’m OK with that. If some far distant civilization depends on The Counter and runs into this, I’m deeply sorry for my short-sightedness.
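As a rough sanity check on that figure, here is a minimal sketch of the arithmetic in Kotlin. The click rate is the one assumed above, the function and constant names are made up for illustration, and leap years are ignored.

```kotlin
// Assumed click rate from the text: one million increments per second.
const val CLICKS_PER_SECOND = 1_000_000L

// Seconds in a 365-day year (leap years ignored for this estimate).
const val SECONDS_PER_YEAR = 365L * 24 * 60 * 60

// Whole years of continuous clicking before a signed 64-bit counter,
// starting at zero, reaches Long.MAX_VALUE.
fun yearsUntilOverflow(clicksPerSecond: Long = CLICKS_PER_SECOND): Long =
    Long.MAX_VALUE / clicksPerSecond / SECONDS_PER_YEAR

// On the JVM, going past Long.MAX_VALUE wraps around to Long.MIN_VALUE
// instead of throwing an exception.
fun wrapsAround(): Boolean {
    val max = Long.MAX_VALUE
    return max + 1 == Long.MIN_VALUE
}
```

Calling yearsUntilOverflow() returns 292471, which is where “nearly 300,000 years” comes from.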

How Does Reactive Help Us Here?

With The Counter, there are several places where we could block waiting on something to happen. Interacting with Redis and waiting for the event stream to produce the next event are two examples. Instead of blocking on those things, why don’t we handle everything in an asynchronous, non-blocking manner? By using a reactive library here (such as Spring with Reactor), we can do much more work with the same resources.

Having said all that, when writing a fancy new microservice ask yourself if you really need a reactive system. In a lot of cases, you’ll probably find that you don’t really need it. Or your code looks more complicated with it. Or most unfortunately, there isn’t a reactive JDBC driver available (YET?!) so you’ll lose the benefits of reactive anyway.

Design

Now that we know our requirements and what a reactive application is, let’s start with a picture of what we’re going to build:

Super Impressive Architecture Diagram

Simple, right? It’s definitely too simple to really be useful as anything other than a training exercise, but that’s the point. As we get more comfortable we can make the architecture more complex and learn more new things.

We could make this even simpler by not using Redis at all and storing the value of the counter in an AtomicLong, but I want to show how easy it is to consume the reactive libraries that Spring Boot provides.

Setting Things Up

First, head on over to start.spring.io, a website that you can use to get a custom made template for your Spring Boot projects. Pick your preferred build type (I picked Gradle, but Maven is also available), language (Kotlin!) and Spring Boot version. It’s important to pick at least Spring Boot 2.0.0, as that will give us access to the reactive web libraries.

After making those settings changes, the only thing you have to do is add the Reactive Web and Reactive Redis dependencies and name your project.

Here is what my screen looked like when I generated this project:

start.spring.io

If you’ve never been to start.spring.io, I encourage you to explore the wide array of options you have by selecting the “Switch to the full version” link. Then you can see all of the configuration choices and dependencies available.

This is going to crank out a zip file for you, so go ahead and get that unpacked and imported into your IDE. I use IntelliJ so this was quite easy, but you could really use anything.

Let’s Start Coding!

Before we start in on The Counter, let’s make a configuration change to make our life better.

Open application.properties and add these lines:

# Pretty-print JSON
spring.jackson.serialization.indent_output=true

This is to turn on pretty printing for JSON output. It’s subjective and I just find it easier to read if it is indented nicely. Do what works for you.

Configuration

Because our application is so simple, there’s not a lot of configuration to do. The only thing I really had to do was create a ReactiveRedisTemplate, which we use to interact with Redis. The ReactiveRedisConnectionFactory is created for us, thanks to the Spring Data Redis starter we included before. Spring has sensible defaults but the overrides are fully documented and comprehensive if you need them.

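import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.RedisSerializationContext

@Configuration
class Redis {

    // Keys and values are both serialized as plain strings.
    @Bean
    fun template(factory: ReactiveRedisConnectionFactory): ReactiveRedisTemplate<String, String> =
        ReactiveRedisTemplate(factory, RedisSerializationContext.string())
}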

I should point out that this uses the standard Spring Boot @Configuration class that you are used to. However, there is a BeanDefinitionDsl that is worth looking into. Personally, I still find this easier to read for now (see below for conflicting opinions on that!), but you might think differently.

Controller Layer - Routes

Here’s a quick view of the RESTful endpoints we’re going to need to satisfy the requirements.

Purpose                      | URI               | HTTP Method | Accept Header
Current state of the counter | /api/counter      | GET         | application/json
Stream of counter events     | /api/counter      | GET         | text/event-stream
Increment the counter        | /api/counter/up   | PUT         | application/json
Decrement the counter        | /api/counter/down | PUT         | application/json

With Spring MVC, we’re all used to having a @Controller or @RestController class. And we could do that here as well if we were interested in an imperative approach. Instead, I’m going to be using the functional Spring Reactive Web method, which is to define a router function. This appeals to me because when I write controllers, I like them to do just about nothing. Validate and route requests to a service layer - that’s it.

@Bean
fun counterRouter() = router {
    "/api/counter".nest {
        accept(MediaType.APPLICATION_JSON).nest {
            GET("/", counterHandler::get)
            PUT("/up", counterHandler::up)
            PUT("/down", counterHandler::down)
        }
        accept(MediaType.TEXT_EVENT_STREAM).nest {
            GET("/", counterHandler::stream)
        }
    }
}

So what’s up with that syntax? That’s the RouterFunctionDsl at work! I find this very easy to read: all of my API endpoints are in one place, and the function being called to serve each endpoint (more on that below) is clear. You don’t have to limit yourself to one router function; define as many as you find appropriate and Spring will weave them all together. Because this example is so simple, it really doesn’t show off the full power, so check out some examples.

Although it’s not clear from this example, this is fully reactive because each of those routes returns a Mono<ServerResponse>, which you will see below. This router is going to be our controller layer. Nice and simple, not a lot of boilerplate, what’s happening is clear for whoever has to maintain this code in the future.

Service Layer - CounterHandler

Because we are using functional routes rather than imperative controller methods, we’ll continue that down to our service layer, which will be implemented as a series of handler functions. They take a ServerRequest, do their work, and return a Mono<ServerResponse>. Even if the response body is a Flux of something (remember: flux == stream), we’re still only returning one single response; that’s why it’s always a Mono.

I’ll highlight a representative sample of what’s going on in our handler.

fun get(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse
        .ok()
        .body(
            counterRepository
                .get()
                .map { CounterState(it) }
        )

All of the functions in this class use the ServerResponse builder to generate a Mono<ServerResponse>. You can really specify everything here from headers to cookies, but since our requirements are simple, so are our handler functions. In this case, we’re asking the CounterRepository for the current value (which is a Mono<Long> because we’re fully reactive and might have to wait on the call to/from Redis) and mapping it to a data class called CounterState.

The CounterState data class is very simple. Data classes are one of the very best features of Kotlin, if you ask me.

data class CounterState(val value: Long, val asOf: LocalDateTime = LocalDateTime.now())

With that one line of code we have what you’d normally have your IDE write for you when defining a POJO: getters for each field (these are vals, so they are final/immutable), plus toString(), hashCode(), and equals() functions. There’s also a copy method, but we won’t really need it. In our case, the time has a default value because most of the time, that’s what we want. However, if we want to override it for testing, we can.
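To make those generated members concrete, here is a small sketch that exercises them. The fixed timestamp and the sampleState helper are invented for illustration; only CounterState itself comes from the post.

```kotlin
import java.time.LocalDateTime

data class CounterState(val value: Long, val asOf: LocalDateTime = LocalDateTime.now())

// A fixed timestamp, so the results below are repeatable (handy in tests).
val fixedTime: LocalDateTime = LocalDateTime.of(2017, 11, 30, 16, 11, 24)

// Hypothetical helper that overrides the default asOf value.
fun sampleState(): CounterState = CounterState(372, fixedTime)

// equals() and hashCode() compare field by field, not by reference.
fun sameContentIsEqual(): Boolean =
    sampleState() == sampleState() && sampleState().hashCode() == sampleState().hashCode()

// copy() returns a new instance with only the named fields changed.
fun incremented(): CounterState = sampleState().copy(value = 373)
```

Here sampleState().toString() yields CounterState(value=372, asOf=2017-11-30T16:11:24), and CounterState(1) with no second argument picks up the current time.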

The increment and decrement functions are similar, but they pass the value on to the EventBus, which is injected into our handler and we will see more of later. The EventBus is the object that handles constructing the Flux of change events.

fun up(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse.ok()
        .body(
            counterRepository
                .up()
                .map { CounterState(it) }
                .doOnNext { eventBus.publish(CounterUp(it.value)) }
        )

This looks a lot like our get function, don’t you think? Go to Redis, do some work (see Repository below), and map the answer into a CounterState data object. However, in our up and down functions, we pass that value along to the EventBus as well, turning our CounterState into either a CounterUp or CounterDown data class, so consumers can see what kind of event happened, not just the state of the counter. This is all handled asynchronously. If we end up having to wait at any point, we don’t have to worry about it. The calls will all resolve themselves eventually.

And finally, the function that handles the event stream mostly just delegates to the EventBus, by subscribing to it.

fun stream(serverRequest: ServerRequest): Mono<ServerResponse> =
    ServerResponse
        .ok()
        .bodyToServerSentEvents(eventBus.subscribe())

One interesting thing to note is the body and bodyToServerSentEvents functions: Spring provides them as Kotlin extension functions. Because inline functions in Kotlin can have reified type parameters, we don’t have to specify the element type in our code; it’s all inferred! If this were Java, we would have to pass a Class object or a ParameterizedTypeReference along. Yay, brevity!
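If reified type parameters are new to you, here is a tiny stand-alone sketch of the idea. It is not Spring’s code; the describe functions are hypothetical and only show how an inline function can recover a Class that a Java-style API would need to be handed explicitly.

```kotlin
// Java-style: the caller has to hand over the Class explicitly.
fun <T : Any> describe(value: T, type: Class<T>): String =
    "${type.simpleName}($value)"

// Kotlin-style: T is reified, so the Class can be recovered from the
// inferred type argument and passed along on the caller's behalf.
inline fun <reified T : Any> describe(value: T): String =
    describe(value, T::class.java)
```

Both describe("hello", String::class.java) and describe("hello") produce String(hello); the second form simply lets the compiler fill in the type.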

Repository Layer

The actual code of the repository doesn’t do much, thanks to Spring doing most of the heavy lifting for us. The ReactiveRedisTemplate gets injected into our Repository in the constructor, along with whatever we’ve configured for the key to store the counter under. As you can see, each of the functions we’re ultimately calling to talk to Redis requires a ByteBuffer, so we make that conversion from String to ByteBuffer once and store it as a private val.

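import org.springframework.beans.factory.annotation.Value
import org.springframework.data.redis.core.ReactiveRedisTemplate
import org.springframework.data.redis.serializer.StringRedisSerializer
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import java.nio.ByteBuffer

@Repository
class RedisCounterRepository(private val redisTemplate: ReactiveRedisTemplate<String, String>,
                             @Value("\${redis.counter.key:THE_COUNTER}") keyName: String) : CounterRepository {

    // Convert the key once; the low-level commands below work on ByteBuffers.
    private val key = ByteBuffer.wrap(StringRedisSerializer().serialize(keyName))

    override fun up(): Mono<Long> =
        redisTemplate.createMono { it.numberCommands().incr(key) }

    override fun down(): Mono<Long> =
        redisTemplate.createMono { it.numberCommands().decr(key) }

    // Incrementing by zero returns the current value (a missing key counts as 0).
    override fun get(): Mono<Long> =
        redisTemplate.createMono { it.numberCommands().incrBy(key, 0L) }
}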

Each of the functions will create a Mono<Long>, the Long being the value of the counter after we perform an operation or check its value. Again, the code to make the call to Redis and return the result as a Mono is all done for us, so we have the luxury of clearly seeing where our business logic is without much boilerplate. Seven lines of code for all that work isn’t bad!

Why do I have an interface (CounterRepository) for something simple with one implementation? Because when we get to testing we will want to provide another in-memory implementation of this code so we don’t have to boot Redis for testing.
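As an illustration of what such an in-memory implementation might look like, here is a minimal sketch backed by an AtomicLong. The interface shape is inferred from the Redis implementation above, the class name InMemoryCounterRepository is hypothetical, and the version in the actual repository may differ. It assumes reactor-core is on the classpath, which the Reactive Web starter provides.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import reactor.core.publisher.Mono

// Assumed shape of the interface, inferred from RedisCounterRepository.
interface CounterRepository {
    fun up(): Mono<Long>
    fun down(): Mono<Long>
    fun get(): Mono<Long>
}

// Hypothetical in-memory stand-in for tests: no Redis required.
class InMemoryCounterRepository(initial: Long = 0L) : CounterRepository {

    private val counter = AtomicLong(initial)

    // fromSupplier defers the work until someone subscribes, so each
    // subscription performs exactly one increment, decrement, or read.
    override fun up(): Mono<Long> = Mono.fromSupplier { counter.incrementAndGet() }

    override fun down(): Mono<Long> = Mono.fromSupplier { counter.decrementAndGet() }

    override fun get(): Mono<Long> = Mono.fromSupplier { counter.get() }
}
```

Because both classes implement the same interface, the handler does not need to know which one it was given.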

The EventBus

The EventBus is responsible for streaming events to consumers, updating whenever a change is made to the counter. In our case, we’re going to define a ReplayProcessor, which allows us to cache some of the events. That way, when a new consumer joins the stream they don’t have to wait for new events to happen, they’ll have some recent history to view.

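import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.ReplayProcessor

@Service
class EventBus(@Value("\${events.replay.size:16}") replaySize: Int = 16) {

    // Caches up to replaySize recent events for late subscribers.
    private val eventProcessor: ReplayProcessor<CounterEvent> = ReplayProcessor.create(replaySize)

    fun publish(event: CounterEvent) {
        eventProcessor.onNext(event)
    }

    // Slow consumers lose events instead of slowing everyone else down.
    fun subscribe(): Flux<CounterEvent> =
        Flux.merge(eventProcessor)
            .onBackpressureDrop()

}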

Any new events that come in via our publish() method are just added to the replay processor by calling onNext. Additionally, when a new consumer subscribes to the event stream, we create a new Flux<CounterEvent> by merging in the replay processor. If one of our consumers is too slow, we just drop events. Backpressure can be handled in any number of ways, including buffering the events. But since updates to The Counter aren’t critical if consumers are being flooded (sorry again future civilizations), I don’t mind just dropping them. Just be aware that there are many ways of handling these situations, depending on your use case.
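To picture what “cache some of the events” means for a late subscriber, here is a toy model in plain Kotlin. It is emphatically not how Reactor implements ReplayProcessor (there is no backpressure or threading model here), and the ReplayBuffer class is invented for illustration; it only mimics the observable behavior of a bounded replay.

```kotlin
// Toy model of a bounded replay: keeps the last `capacity` events and
// hands them to each new listener before any live events.
class ReplayBuffer<T>(private val capacity: Int) {

    init {
        require(capacity > 0) { "capacity must be positive" }
    }

    private val history = ArrayDeque<T>()
    private val listeners = mutableListOf<(T) -> Unit>()

    @Synchronized
    fun publish(event: T) {
        // Evict the oldest event once the cache is full.
        if (history.size == capacity) history.removeFirst()
        history.addLast(event)
        listeners.forEach { it(event) }
    }

    @Synchronized
    fun subscribe(listener: (T) -> Unit) {
        // Replay the cached history first, then register for live events.
        history.forEach(listener)
        listeners += listener
    }
}
```

With a capacity of 3, a listener that subscribes after events 1 through 5 were published first receives 3, 4, and 5, and then every event published afterwards.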

Don’t Get Run Over By The Event Bus

If you’ve looked at the code for our stream of counter events, you’ll notice that we manually create events with the new counter value after getting that back from Redis. This is easy to add to a small demo like this, but in reality this would cause serious problems and you’d never use an implementation like this in production.

First, updates only come from the server that generated them. So if The Counter sees a frenzy of activity and you scale out by adding more instances, you will never see updates from the other servers. Another drawback is that events might not be in order, even if you do only have one service running. Because generating the new counter value and emitting it to the event stream happen non-atomically, there’s a race condition where updates to the stream can happen out of order.

But because this is a demo and I shrewdly wrote the requirements in a way to avoid any sort of guarantees, this is fine for now. Perhaps if I have time, I’ll write an update to this post with a better implementation of the event stream.
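For the single-instance ordering problem, the underlying idea of a fix is to make “get the new value” and “publish it” one atomic step. The sketch below shows that idea in plain Kotlin with a lock, using an AtomicLong in place of Redis and a list in place of the event stream. All names are hypothetical, a lock like this blocks (so it is not something to drop into a reactive pipeline as-is), and it does nothing for the multi-instance problem.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread

class OrderedCounter {
    private val value = AtomicLong()                 // stands in for Redis
    private val published = mutableListOf<Long>()    // stands in for the event stream
    private val lock = Any()

    // Increment and publish under one lock, so no other caller can
    // publish a newer value in between the two steps.
    fun up(): Long = synchronized(lock) {
        val next = value.incrementAndGet()
        published += next
        next
    }

    fun events(): List<Long> = synchronized(lock) { published.toList() }
}

// Hammer the counter from several threads and return the published events.
fun runConcurrently(threads: Int, incrementsPerThread: Int): List<Long> {
    val counter = OrderedCounter()
    (1..threads)
        .map { thread { repeat(incrementsPerThread) { counter.up() } } }
        .forEach { it.join() }
    return counter.events()
}
```

With the lock in place, the published list always comes out in ascending order; remove it and the two steps can interleave across threads, which is the race described above.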

Using The Counter

Looking at our requirements, we’re just asked to create a RESTful service that we can use curl to interact with. Here are all of our endpoints, doing what we expect them to.

Let’s boot our server from the command line and play with it!

> gradle bootRun

Get the state of the counter

Pretty simple, current state of the counter (372) and when I asked for the data.

// curl http://localhost:8080/api/counter
{
  "value" : 372,
  "asOf" : "2017-11-30T16:11:24.4716191"
}

Increment and decrement the counter

A simple PUT to alter the state of the counter and receive the current state back in return.

// curl -X PUT http://localhost:8080/api/counter/up
{
  "value" : 373,
  "asOf" : "2017-11-30T16:11:37.3301673"
}
// curl -X PUT http://localhost:8080/api/counter/down
{
  "value" : 372,
  "asOf" : "2017-11-30T16:11:58.2711877"
}

Watch the counter events

You’ll have to open two terminal windows for this one. Increment and decrement the counter in one window and you should see a stream of related events in the other. We have to set the Accept header here in order to trigger the correct route; otherwise we will get the non-stream (point in time) version. Notice that our events also have the action performed, either up or down. You should see something like this…

// curl -H Accept:text/event-stream http://localhost:8080/api/counter
data:{
data:  "value" : 373,
data:  "type" : "up",
data:  "at" : "2017-11-30T16:11:37.3301673"
data:}

data:{
data:  "value" : 372,
data:  "type" : "down",
data:  "at" : "2017-11-30T16:11:58.2721698"
data:}
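Each JSON line above carries its own data: prefix because the pretty-printed body spans several lines; in the server-sent events format, consecutive data: lines belong to one event and a blank line ends it. If you want to consume the stream without an SSE library, a minimal sketch of that reassembly could look like this. The parseSseData function is hypothetical and ignores the other SSE fields (event, id, retry) and comment lines.

```kotlin
// Groups raw SSE text into event payloads: "data:" lines accumulate,
// and a blank line marks the end of the current event.
fun parseSseData(raw: String): List<String> {
    val events = mutableListOf<String>()
    val current = mutableListOf<String>()
    for (line in raw.lines()) {
        when {
            // Strip the field name and, per the SSE format, one leading space.
            line.startsWith("data:") ->
                current += line.removePrefix("data:").removePrefix(" ")
            // A blank line dispatches whatever has been collected so far.
            line.isEmpty() && current.isNotEmpty() -> {
                events += current.joinToString("\n")
                current.clear()
            }
        }
    }
    // An unterminated trailing event is intentionally discarded.
    return events
}
```

Feeding it the output shown above produces two strings, each a complete JSON document ready for a JSON parser.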

Integration Testing

For our integration tests, we depend heavily on WebTestClient, a new component in Spring Reactive Web for testing web endpoints. It is built on WebClient, which you can think of as a reactive-capable replacement for RestTemplate. For increment/decrement, assuming we already know the current state of the counter (perhaps in a @Before method), we can easily test that our increment call works:

val state = /* Get state here, removed for brevity */

@Test
fun `increment counter`() {
    val event: String? = webClient.put()
        .uri("/api/counter/up")
        .exchange()
        .expectStatus().isOk
        .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
        .expectBody(String::class.java)
        .returnResult()
        .responseBody

    assertThat(event).isNotNull()
    assertThat(JsonPath.read<Long>(event, "$.value", null)).isEqualTo(state.value.inc())
}

Notice that I’m using my favorite testing library, AssertJ, which is also included in our project thanks to our Spring starter.

I found that testing the stream was a lot harder than I had anticipated, and some of that probably has to do with getting my head around the reactive concepts.

@Test
@DirtiesContext(methodMode = DirtiesContext.MethodMode.BEFORE_METHOD) // Because of ReplayProcessor state.
fun `stream receives events`() {

    // Send in an event, so the stream has something to emit or the next part
    // has trouble when using WebTestClient with this test in isolation for some reason.
    webClient.put().uri("/api/counter/up").exchange()

    val events = webClient.get()
        .uri("/api/counter")
        .accept(MediaType.TEXT_EVENT_STREAM)
        .exchange()
        .expectStatus().isOk
        .expectHeader().contentType(MediaType.TEXT_EVENT_STREAM)
        .returnResult(String::class.java)
        .responseBody

    StepVerifier.create(events)
        .assertNext {
            assertThat(JsonPath.read<String>(it, "$.type", null)).isEqualToIgnoringCase("up")
            assertThat(JsonPath.read<Long>(it, "$.value", null)).isEqualTo(1L)
        }
        .then {
            webClient.put().uri("/api/counter/up").exchange()
        }
        .assertNext {
            assertThat(JsonPath.read<String>(it, "$.type", null)).isEqualToIgnoringCase("up")
            assertThat(JsonPath.read<Long>(it, "$.value", null)).isEqualTo(2L)
        }
        .then {
            webClient.put().uri("/api/counter/down").exchange()
        }
        .assertNext {
            assertThat(JsonPath.read<String>(it, "$.type", null)).isEqualToIgnoringCase("down")
            assertThat(JsonPath.read<Long>(it, "$.value", null)).isEqualTo(1L)
        }
        .thenCancel()
        .verify()
}

See what I mean? Setting up my test seemed like a lot of work, but to me at least, it seems clear what I’m going for:

  1. Make a request to prime the event stream. Subsequent assertions will wait forever otherwise.
  2. Request the event stream.
  3. Make more requests.
  4. Assert that the event stream contains the things we expect.

I suspect that if I could get the Mono<ServerResponse> that the CounterHandler emits into a more usable format, these tests could be rewritten as unit tests. As it stands now, I have had a hard time finding examples of how to get the body out of a ServerResponse.

What’s Next?

There are plenty of things that could be done to make this project more useful: providing a user interface, making the EventBus more accurate, or logging the event stream to something durable for analysis. If you use this project as a basis for your own learning and make changes, I’d love to hear about it!

Parting Thoughts

Notice how little Kotlin we got to write? Part of that has to do with the simplicity of our example, but another big part of that is how easy it is to use Spring Boot with Kotlin. Neither has a lot of ceremonial code, so our business logic is front and center. And although I only used part of them, the idiomatic DSLs in Spring Boot are very nice and show a real dedication to Kotlin from the Spring team.

If you are interested in Spring Boot and Kotlin, you don’t have to use bleeding edge 2.0 bits; Spring Boot 1.5.x works great with Kotlin. However, if you want the reactive extensions, you will need Spring Boot 2.0+.

If you’d like to check out the code for this, it’s up on GitHub.

Thanks for reading!

Resources

  1. Code
  2. Kotlin
  3. Spring Boot
  4. Project Reactor (used by Spring Boot)
  5. Spring Framework 5 Kotlin Support from the Spring team on official support for Kotlin in Spring.
  6. The #Spring channel in the Kotlin Slack is a great place to learn.