Skip to main content

Asynchronous event publishing using Spring Boot

· 6 min read

I’ve created a sample Spring Boot app that demonstrates useful scenario of asynchronous publish / subscribe model. It could be useful in many ways. Idea is that, for example, you have some API handling some web requests, and apart from functionality it provides, it also publishes some event, so that some other thread can react upon it, with main functionality still being processed with request handling thread. Or even shorter – request handling thread will do it’s job, and publishes some event (using Java annotation), and some other background thread will subscribe to event and process it. You can use it for scenario you register user with your API, and publish user registered event, and in background you can subscribe to it by sending out e-mail to end user.

Easiest way to start your application could be running it via spring-boot:run Maven goal:

mvn spring-boot:run

That will start our demo app on standard 8080 port.

Demo app contains two APIs, get user and update user. Those are just dummy implementations. It also has a User domain model class.

You can then, using command line and curl tool, issue the get user request (I’m just doing pipe to jq tool for nicer response rendering – you can omit that part):

curl -s 'http://localhost:8080/users/1' | jq .
{
"id": 1,
"age": 33,
"name": "Some name"
}

Application code will show something like:

2017-04-23 11:46:59.940 INFO 7421 --- [nio-8080-exec-1] de.odalinho.async.PublishingController : PublishingController::Thread.currentThread().getName() = http-nio-8080-exec-1
2017-04-23 11:46:59.941 INFO 7421 --- [pool-1-thread-3] d.o.a.m.UserNotificationPublisher : UserNotificationPublisher::Thread.currentThread().getName() = pool-1-thread-3
2017-04-23 11:46:59.941 INFO 7421 --- [pool-1-thread-3] d.o.a.m.UserNotificationPublisher : UserNotificationPublisher::event.getMessage() = User(id=1, age=33, name=Some name)

You can see that Tomcat’s request handling thread named http-nio-8080-exec-1 is handling web request, whereas pool-1-thread-3 is handling background event processing. For the remaining, update user API call, similar to the first one, we’re issuing command:

curl -s \
-X POST \
-H 'Accept: application/json' \
-H 'Content-Type: application/json' \
http://localhost:8080/users/1 \
-d '{"id":1,"age":25,"name":"updated-name"}' | jq .

{
"id": 1,
"age": 25,
"name": "updated-name"
}

Application log will show something similar to:

2017-04-23 11:47:17.747 INFO 7421 --- [nio-8080-exec-2] de.odalinho.async.PublishingController : PublishingController::Thread.currentThread().getName() = http-nio-8080-exec-2
2017-04-23 11:47:17.749 INFO 7421 --- [pool-1-thread-3] d.o.a.m.UserNotificationPublisher : UserNotificationPublisher::Thread.currentThread().getName() = pool-1-thread-3
2017-04-23 11:47:17.749 INFO 7421 --- [pool-1-thread-3] d.o.a.m.UserNotificationPublisher : UserNotificationPublisher::event.getMessage() = User(id=1, age=25, name=updated-name)

From the application standpoint, you need PublishingConfiguration config class to configure Spring’s interceptor and background processing. I’m creating thread pool of size 5 for background processing purposes.

In de.odalinho.async.messaging package there’s all required infrastructure to support publish / subscribe functionality. PublishEventAdvisor component is Spring’s AOP component, which is basically just a mapper saying we would like to intercept PublishEvent annotation annotated methods using PublishEventAnnotationProcessor interceptor class. PublishEventAnnotationProcessor uses reflection to fulfill all this magic functionality.

There’s also PublishingController that is simple dummy API to demonstrate functionality:

package de.odalinho.async;

import de.odalinho.async.messaging.PublishEvent;
import de.odalinho.async.model.User;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import static de.odalinho.async.messaging.Event.USER_CREATED;
import static de.odalinho.async.messaging.Event.USER_UPDATED;

@RestController
@Slf4j
public class PublishingController {

@PublishEvent(event = USER_CREATED)
@GetMapping("/users/{userId}")
User getUser(@PathVariable int userId) {
val user = new User().setId(userId).setAge(33).setName("Some name");
log.info("PublishingController::Thread.currentThread().getName() = " + Thread.currentThread().getName());
return user;
}

@PublishEvent(event = USER_UPDATED)
@PostMapping(value = "/users/{userId}", consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
User updateUser(@PathVariable int userId, @RequestBody User user) {
log.info("PublishingController::Thread.currentThread().getName() = " + Thread.currentThread().getName());
return user;
}
}

You could notice these API methods are annotated with@PublishEvent(event = USER_CREATED). Framework will execute this method and return value of method (User model) will be published and available to UserNotificationPublisher, which is background component handling all events in general. It receives UserNotificationEvent which contains getMessage() API which returns our User model.

You can extend this by creating new, custom Events and annotating your API methods with @PublishEvent annotation with your new Event type.

Demo app is using Lombok library to get rid of some boilerplate Java code required.

You can checkout source code from GitHub

That was all for today! Hope you liked it!