Kotlin, Spring Reactive, EventSourcing, and CQRS with PostgreSQL, Kafka, MongoDB, tracing and monitoring 👋✨💫
👨‍💻 Full list of what has been used:
Spring Boot
Spring WebFlux
PostgreSQL with R2DBC, the Reactive Relational Database Connectivity driver
Spring Data MongoDB for the MongoDB read-model projections
Kafka as a message broker
Zipkin for open source, end-to-end distributed tracing
Prometheus for monitoring and alerting
Grafana to compose observability dashboards with everything from Prometheus
Spring Cloud Sleuth for distributed tracing
Swagger OpenAPI 3, the library for OpenAPI 3 documentation
Flyway for database migrations
You can find the source code in the GitHub repository. The main idea of this project is the implementation of Event Sourcing and CQRS using Kotlin, Spring Reactive WebFlux, and PostgreSQL with Kafka as the event store, with MongoDB for the read projections. I have previously written articles implementing the same microservice using Go and EventStoreDB, Go Event Sourcing with PostgreSQL, and Spring. As written before, I still think EventStoreDB is the best choice for event sourcing, but in real life some projects have business restrictions and, for example, usage of EventStoreDB may not be allowed; in that case I think Postgres and Kafka are a good alternative for implementing our own event store. If you are not familiar with the Event Sourcing and CQRS patterns, the best place to read about them is microservices.io; the blog and documentation on the eventstore site are very good too, and I highly recommend Alexey Zimarev's book "Hands-on Domain-Driven Design with .NET Core".
In this project, we have a microservice with an event store implemented using PostgreSQL and Kafka, and MongoDB as the read database for projections. Some descriptions in this article repeat earlier ones, because this is another implementation of the same idea, this time on top of Postgres and Kafka.
All UI interfaces will be available on ports:
Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html
Zipkin UI: http://localhost:9411
Prometheus UI: http://localhost:9090
Grafana UI: http://localhost:3005
Docker compose file for this project:
version: "3.9"
services:
es_postgesql:
image: postgres:14.4
container_name: es_postgesql
expose:
- "5432"
ports:
- "5432:5432"
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=bank_accounts
- POSTGRES_HOST=5432
command: -p 5432
volumes:
- ./microservices_pgdata:/var/lib/postgresql/data
networks: [ "microservices" ]
zookeeper:
image: 'bitnami/zookeeper:3.8.0'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
volumes:
- "./zookeeper:/zookeeper"
networks: [ "microservices" ]
kafka:
image: 'bitnami/kafka:3.2.0'
ports:
- "9092:9092"
- "9093:9093"
volumes:
- "./kafka_data:/bitnami"
environment:
- KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
    networks: [ "microservices" ]

  mongodb:
    image: mongo:latest
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: microservices
    ports:
      - "27017:27017"
    volumes:
      - ./mongodb_data_container:/data/db
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

  zipkin:
    image: openzipkin/zipkin:latest
    restart: always
    container_name: zipkin
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

volumes:
  mongodb_data_container:

networks:
  microservices:
    name: microservices
In Event Sourcing we store the history of all the actions that have occurred to an entity and derive the state from that history; it is possible to read back through it to establish what the state was at any given point in time. It is a pattern for storing data as events in an append-only log.
Every new event is a change. The AggregateRoot should keep track of all the changes that happen during the command execution flow, so we can persist those changes in the command handler. Aggregates take the current state, verify the business rules for the particular operation, and apply the business logic that returns the new state. The important part of this process is storing all or nothing: all aggregate data needs to be saved successfully, and if one rule or operation fails, the whole state change is rejected. An AggregateRoot can be implemented in different ways; its main responsibilities are loading events, applying them, and raising changes. When we fetch the aggregate from the database, instead of reading its state as one record in a table or document, we read all the events that were saved before and call the when method for each of them. After all these steps we will have recovered the full history of the given aggregate and brought it to its latest state.
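The AggregateRoot can be sketched as a small abstract base class that keeps a list of uncommitted changes and replays history through a when handler. This is a minimal illustrative sketch, not the project's exact code; names and fields are assumptions:

// Minimal illustrative AggregateRoot: tracks uncommitted changes and replays history.
abstract class AggregateRoot(val aggregateId: String, val aggregateType: String) {

    // Current version of the aggregate, i.e. the number of events applied to it.
    var version: Long = 0

    // Domain events raised during command handling but not yet persisted.
    val changes: MutableList<Any> = mutableListOf()

    // Each concrete aggregate mutates its own state depending on the event type.
    protected abstract fun `when`(event: Any)

    // Apply a new event: mutate the state and remember it as an uncommitted change.
    fun apply(event: Any) {
        `when`(event)
        changes.add(event)
        version++
    }

    // Replay an already persisted event while rehydrating the aggregate from its stream.
    fun raiseEvent(event: Any) {
        `when`(event)
        version++
    }

    fun clearChanges() = changes.clear()
}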
An event represents a fact that took place in the domain. Events are the source of truth: the current state is derived from them. They are immutable and represent business facts. In Event Sourcing, each operation made on the aggregate should result in a new event. It means that we never change or remove anything in the database; we only append new events.
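For the bank account domain the events can be plain immutable data classes; the names below are illustrative, not necessarily the ones used in the repository:

import java.math.BigDecimal

// Illustrative domain events for the bank account aggregate.
data class BankAccountCreatedEvent(val aggregateId: String, val email: String, val currency: String)
data class BalanceDepositedEvent(val aggregateId: String, val amount: BigDecimal)
data class EmailChangedEvent(val aggregateId: String, val newEmail: String)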
Snapshots are the representation of the current state at a certain point in time. If we follow the Event Sourcing pattern literally, we need to replay all these transactions to calculate the current account's balance, which is not efficient. Your first thought to make this more efficient may be caching the latest state somewhere: instead of retrieving all these events, we could retrieve one record and use it for our business logic. This is a snapshot. The general logic is: read the snapshot (if it exists), then read events from the event store; if a snapshot exists, read only the events appended since the stream revision at which the snapshot was created, otherwise read all events. In our microservice we store a snapshot every N events. Snapshots may not be needed at all if performance is good enough without them.
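A snapshot can be modeled as the serialized aggregate state plus the version at which it was taken, with a frequency constant deciding how often to store one. A minimal sketch under these assumptions:

import java.time.LocalDateTime

// Illustrative snapshot model: the serialized aggregate state at a given version.
data class Snapshot(
    val aggregateId: String,
    val aggregateType: String,
    val data: ByteArray,   // serialized aggregate state
    val version: Long,
    val timestamp: LocalDateTime = LocalDateTime.now(),
)

// Store a new snapshot only every N events.
const val SNAPSHOT_FREQUENCY = 3L

fun shouldTakeSnapshot(aggregate: AggregateRoot): Boolean =
    aggregate.version % SNAPSHOT_FREQUENCY == 0L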
The event store is a key element of the system. Each change that took place in the domain is recorded in the database. It is specifically designed to store the history of changes; the state is represented by an append-only log of events, and the events are immutable: they cannot be changed. The AggregateStore implementation consists of Load, Save, and Exists methods. Load and Save accept an aggregate and then load or apply events using the event store. The Load method finds out the stream name for an aggregate, reads all the events from the aggregate stream, loops through them, and calls the RaiseEvent handler for each of them. The Save method persists aggregates by saving the history of changes and handles concurrency: when you retrieve a stream from the event store, you take note of the current version number, so when you save it back you can determine whether somebody else has modified the record in the meantime.
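The store can be expressed as a persistence-level event envelope (the row we write to Postgres) plus a small interface. A sketch, with names that are assumptions rather than the repository's actual types:

import java.time.LocalDateTime

// Persistence-level event envelope: what actually goes into the events table.
data class Event(
    val aggregateId: String,
    val type: String,      // domain event type name
    val data: ByteArray,   // serialized domain event payload
    val version: Long,
    val timestamp: LocalDateTime = LocalDateTime.now(),
)

// Illustrative aggregate store contract.
interface AggregateStore {

    // Append the uncommitted changes; versions are used for optimistic concurrency.
    suspend fun <T : AggregateRoot> save(aggregate: T)

    // Rehydrate an aggregate: snapshot first (if any), then replay the newer events.
    suspend fun <T : AggregateRoot> load(aggregateId: String, clazz: Class<T>): T

    suspend fun exists(aggregateId: String): Boolean
}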
And the implementation using R2DBC and PostgreSQL: the load aggregate method first loads the snapshot and then the events appended after it, while the save aggregate method appends the new events and, every N events, saves a fresh snapshot.
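A hedged sketch of what such a store could look like, assuming an events table (aggregate_id, event_type, data, version), a snapshots table, Spring's reactive DatabaseClient, and the Serializer abstraction shown in the next snippet; the actual schema, SQL, and mapping in the repository may differ:

import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.stereotype.Component

// Illustrative R2DBC-based event store; table and column names are assumptions,
// not necessarily the schema used in the repository.
@Component
class PostgresAggregateStore(
    private val db: DatabaseClient,
    private val serializer: Serializer,
) : AggregateStore {

    // Load aggregate: read the snapshot (if any), then replay the newer events on top of it.
    override suspend fun <T : AggregateRoot> load(aggregateId: String, clazz: Class<T>): T {
        val snapshot = loadSnapshot(aggregateId)
        // start from the snapshot if one exists, otherwise from an empty aggregate
        val aggregate = snapshot?.let { serializer.deserializeAggregate(it.data, clazz) }
            ?: clazz.getConstructor(String::class.java).newInstance(aggregateId)
        loadEvents(aggregateId, aggregate.version).forEach { aggregate.raiseEvent(serializer.deserialize(it)) }
        return aggregate
    }

    // Save aggregate: append the uncommitted changes and store a snapshot every N events.
    // After a successful commit the events are also published to Kafka for the projections (omitted here).
    override suspend fun <T : AggregateRoot> save(aggregate: T) {
        var nextVersion = aggregate.version - aggregate.changes.size
        val events = aggregate.changes.map { change ->
            Event(aggregate.aggregateId, change::class.simpleName!!, serializer.serialize(change), ++nextVersion)
        }
        saveEvents(events)
        aggregate.clearChanges()
        if (shouldTakeSnapshot(aggregate)) saveSnapshot(aggregate)
    }

    override suspend fun exists(aggregateId: String): Boolean =
        db.sql("SELECT 1 FROM events WHERE aggregate_id = :id LIMIT 1")
            .bind("id", aggregateId)
            .map { _, _ -> true }.first().awaitFirstOrNull() ?: false

    // Load snapshot: the latest serialized state of the aggregate, if one exists.
    private suspend fun loadSnapshot(aggregateId: String): Snapshot? =
        db.sql("SELECT aggregate_id, aggregate_type, data, version FROM snapshots WHERE aggregate_id = :id")
            .bind("id", aggregateId)
            .map { row, _ -> Snapshot(
                row.get("aggregate_id", String::class.java)!!,
                row.get("aggregate_type", String::class.java)!!,
                row.get("data", ByteArray::class.java)!!,
                row.get("version", Number::class.java)!!.toLong()) }
            .first().awaitFirstOrNull()

    // Load events: everything appended after the given version, in order.
    private suspend fun loadEvents(aggregateId: String, fromVersion: Long): List<Event> =
        db.sql("SELECT aggregate_id, event_type, data, version FROM events WHERE aggregate_id = :id AND version > :version ORDER BY version")
            .bind("id", aggregateId).bind("version", fromVersion)
            .map { row, _ -> Event(
                row.get("aggregate_id", String::class.java)!!,
                row.get("event_type", String::class.java)!!,
                row.get("data", ByteArray::class.java)!!,
                row.get("version", Number::class.java)!!.toLong()) }
            .all().asFlow().toList()

    // Save events: plain inserts; a unique (aggregate_id, version) constraint gives
    // optimistic concurrency, a concurrent writer violates it and the insert fails.
    private suspend fun saveEvents(events: List<Event>) = events.forEach { event ->
        db.sql("INSERT INTO events (aggregate_id, event_type, data, version) VALUES (:id, :type, :data, :version)")
            .bind("id", event.aggregateId).bind("type", event.type)
            .bind("data", event.data).bind("version", event.version)
            .fetch().rowsUpdated().awaitSingle()
    }

    // Save snapshot: upsert the serialized aggregate state at its current version.
    private suspend fun saveSnapshot(aggregate: AggregateRoot) {
        db.sql("INSERT INTO snapshots (aggregate_id, aggregate_type, data, version) VALUES (:id, :type, :data, :version) " +
            "ON CONFLICT (aggregate_id) DO UPDATE SET data = :data, version = :version")
            .bind("id", aggregate.aggregateId).bind("type", aggregate.aggregateType)
            .bind("data", serializer.serializeAggregate(aggregate)).bind("version", aggregate.version)
            .fetch().rowsUpdated().awaitSingle()
    }
}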
For event serialization and deserialization we need an implementation of the Serializer interface, with a concrete serializer implementation for the bank account aggregate:
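(A minimal sketch under the assumption that events and snapshots are serialized as JSON with Jackson; the repository's actual serializer may differ.)

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.stereotype.Component

// Illustrative serializer contract: domain events <-> persistence envelope,
// plus aggregate state <-> snapshot bytes.
interface Serializer {
    fun serialize(event: Any): ByteArray
    fun deserialize(event: Event): Any
    fun serializeAggregate(aggregate: AggregateRoot): ByteArray
    fun <T : AggregateRoot> deserializeAggregate(data: ByteArray, clazz: Class<T>): T
}

// A simple Jackson-based implementation for the bank account events.
@Component
class BankAccountEventSerializer(private val mapper: ObjectMapper = jacksonObjectMapper()) : Serializer {

    override fun serialize(event: Any): ByteArray = mapper.writeValueAsBytes(event)

    // Map the stored event type name back to the concrete event class.
    override fun deserialize(event: Event): Any = when (event.type) {
        BankAccountCreatedEvent::class.simpleName -> mapper.readValue(event.data, BankAccountCreatedEvent::class.java)
        BalanceDepositedEvent::class.simpleName -> mapper.readValue(event.data, BalanceDepositedEvent::class.java)
        EmailChangedEvent::class.simpleName -> mapper.readValue(event.data, EmailChangedEvent::class.java)
        else -> throw IllegalArgumentException("unknown event type: ${event.type}")
    }

    override fun serializeAggregate(aggregate: AggregateRoot): ByteArray = mapper.writeValueAsBytes(aggregate)

    override fun <T : AggregateRoot> deserializeAggregate(data: ByteArray, clazz: Class<T>): T =
        mapper.readValue(data, clazz)
}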
As the next step, let's create a BankAccountAggregate:
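(A trimmed, illustrative version built on the AggregateRoot sketched earlier; the aggregate in the repository has more fields and operations.)

import java.math.BigDecimal

// Illustrative bank account aggregate.
class BankAccountAggregate(aggregateId: String) : AggregateRoot(aggregateId, "BankAccount") {

    var email: String = ""
    var currency: String = ""
    var balance: BigDecimal = BigDecimal.ZERO

    // Command methods validate business rules and apply new events.
    fun createBankAccount(email: String, currency: String) =
        apply(BankAccountCreatedEvent(aggregateId, email, currency))

    fun depositBalance(amount: BigDecimal) {
        require(amount > BigDecimal.ZERO) { "deposit amount must be positive" }
        apply(BalanceDepositedEvent(aggregateId, amount))
    }

    fun changeEmail(newEmail: String) = apply(EmailChangedEvent(aggregateId, newEmail))

    // State transitions: how each event type mutates the aggregate state.
    override fun `when`(event: Any) {
        when (event) {
            is BankAccountCreatedEvent -> { email = event.email; currency = event.currency }
            is BalanceDepositedEvent -> balance = balance.add(event.amount)
            is EmailChangedEvent -> email = event.newEmail
            else -> throw IllegalArgumentException("unknown event: $event")
        }
    }
}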
Our microservice accepts HTTP requests: the bank account REST controller accepts requests, validates them using hibernate validator, and then calls a command or query service. The main reason for CQRS gaining popularity is the ability to handle reads and writes separately, due to severe differences in optimization techniques for those much more distinct operations.
Swagger OpenAPI create method:
Spring WebFlux REST controller supports suspend functions:
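(A minimal sketch of the create endpoint with a springdoc @Operation annotation and a suspend handler; the DTO, path and service names are assumptions, and the command service is sketched a bit further below.)

import io.swagger.v3.oas.annotations.Operation
import javax.validation.Valid
import javax.validation.constraints.Email
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

// Illustrative request DTO validated by hibernate validator.
data class CreateBankAccountRequest(@field:Email val email: String, val currency: String)

@RestController
@RequestMapping("/api/v1/bank")
class BankAccountController(private val commandService: BankAccountCommandService) {

    // Suspend handler: WebFlux runs it on the non-blocking reactive pipeline.
    @Operation(summary = "Create bank account", operationId = "createBankAccount")
    @PostMapping
    suspend fun createBankAccount(@Valid @RequestBody request: CreateBankAccountRequest): ResponseEntity<String> {
        val aggregateId = commandService.handle(CreateBankAccountCommand(request.email, request.currency))
        return ResponseEntity.status(HttpStatus.CREATED).body(aggregateId)
    }
}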
The main attribute of a command is that when the command is successfully executed, the system transitions to a new state. Command handlers are responsible for handling commands, mutating state, or producing other side effects. The command service handles CQRS commands: it loads the aggregate from the event store and calls its methods depending on the business logic of the application; the aggregate applies these changes, and then we save the resulting list of event changes in the event store.
Bank account command service:
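(A minimal sketch: commands as data classes and a service that loads the aggregate, runs the business operation, and persists the new events; the names are assumptions.)

import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.util.UUID

// Illustrative CQRS commands.
data class CreateBankAccountCommand(val email: String, val currency: String)
data class DepositBalanceCommand(val aggregateId: String, val amount: BigDecimal)

// Illustrative command service: load the aggregate, apply business logic, save the changes.
@Service
class BankAccountCommandService(private val aggregateStore: AggregateStore) {

    suspend fun handle(command: CreateBankAccountCommand): String {
        val aggregate = BankAccountAggregate(UUID.randomUUID().toString())
        aggregate.createBankAccount(command.email, command.currency)
        aggregateStore.save(aggregate)
        return aggregate.aggregateId
    }

    suspend fun handle(command: DepositBalanceCommand) {
        val aggregate = aggregateStore.load(command.aggregateId, BankAccountAggregate::class.java)
        aggregate.depositBalance(command.amount)
        aggregateStore.save(aggregate)
    }
}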
In Event Sourcing, Projections (also known as View Models or Query Models) provide a view of the underlying event-based data model. Often they represent the logic of translating the source write model into the read model. The idea is that the projection will receive all the events that it is able to project and will perform the normal CRUD operations on the read model it controls, using the CRUD operations provided by the read model database. Projections aren't limited to processing events of a single entity and can assemble and aggregate data for multiple entities, even for different types of entities. Events appended to the event store trigger the projection logic that creates or updates the read model. We subscribe our projections to the aggregate's stream events. When we execute a command, the aggregate generates a new event that represents the state transition of the aggregate. Those events are committed to the store, which appends them to the end of the aggregate stream. A projection receives these events and updates its read models using a when method; like the aggregate, it applies changes depending on the event type:
Bank account Mongo subscription using Kafka:
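(A minimal sketch of the consumer side, assuming the event store publishes the persistence envelope as JSON to a Kafka topic; the topic name, group id, and payload format are assumptions, and the projection itself is sketched right below.)

import com.fasterxml.jackson.databind.ObjectMapper
import kotlinx.coroutines.runBlocking
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Illustrative Kafka subscription: consume events appended to the event store
// and forward them to the MongoDB projection.
@Component
class BankAccountMongoSubscription(
    private val projection: BankAccountMongoProjection,
    private val serializer: Serializer,
    private val mapper: ObjectMapper,
) {

    @KafkaListener(topics = ["bank-account-event-store"], groupId = "bank-account-mongo-projection")
    fun onEvent(payload: ByteArray) = runBlocking {
        // Deserialize the envelope, then the domain event inside it, and project it.
        // Simplified for the sketch: a real subscription adds error handling and retries.
        val event = mapper.readValue(payload, Event::class.java)
        projection.`when`(serializer.deserialize(event))
    }
}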
Implementing the when method: handle events and apply changes like the aggregate does, then save the result using the MongoDB repository:
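(A sketch of the projection handler; the repository and document it uses are sketched at the end of the article.)

import org.springframework.stereotype.Component
import java.math.BigDecimal

// Illustrative MongoDB projection: like the aggregate it switches on the event type,
// but instead of mutating in-memory state it updates read-model documents.
@Component
class BankAccountMongoProjection(private val repository: BankAccountMongoRepository) {

    suspend fun `when`(event: Any) {
        when (event) {
            is BankAccountCreatedEvent -> repository.insert(
                BankAccountDocument(
                    aggregateId = event.aggregateId,
                    email = event.email,
                    currency = event.currency,
                    balance = BigDecimal.ZERO,
                )
            )
            is BalanceDepositedEvent -> repository.updateBalance(event.aggregateId, event.amount)
            is EmailChangedEvent -> repository.updateEmail(event.aggregateId, event.newEmail)
            else -> Unit // this projection is not interested in other events
        }
    }
}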
Queries in CQRS represent the intention to get data and are responsible for returning the result of the requested query. The read model can be, but doesn't have to be, derived from the write model. It's a transformation of the results of the business operation into a readable form. One of the great outcomes of having an event-sourced system is the ability to create new read models at will, at any time, without affecting anything else. Then we can retrieve the projection data using a query:
Bank account query service:
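(A minimal sketch; the query object and service names are assumptions, and the MongoDB repository and document are sketched right below.)

import org.springframework.stereotype.Service

// Illustrative CQRS query and its handler: read straight from the MongoDB read model.
data class GetBankAccountByIdQuery(val aggregateId: String)

@Service
class BankAccountQueryService(private val repository: BankAccountMongoRepository) {

    suspend fun handle(query: GetBankAccountByIdQuery): BankAccountDocument =
        repository.findByAggregateId(query.aggregateId)
            ?: throw IllegalArgumentException("bank account not found: ${query.aggregateId}")
}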
Bank account Mongo repository methods using the official Spring Data MongoDB: ReactiveMongoOperations is used here instead of ReactiveMongoRepository as an example; I personally prefer ReactiveMongoTemplate or ReactiveMongoOperations, but it's up to you which one to choose:
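(A sketch built on ReactiveMongoOperations with Kotlin coroutines; the document, collection, and field names are assumptions.)

import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactor.awaitSingle
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.ReactiveMongoOperations
import org.springframework.data.mongodb.core.mapping.Document
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.stereotype.Repository
import java.math.BigDecimal

// Illustrative read-model document.
@Document(collection = "bankAccounts")
data class BankAccountDocument(
    @Id val id: String? = null,
    val aggregateId: String,
    val email: String,
    val currency: String,
    val balance: BigDecimal,
)

// Illustrative repository built on ReactiveMongoOperations instead of ReactiveMongoRepository.
@Repository
class BankAccountMongoRepository(private val mongo: ReactiveMongoOperations) {

    suspend fun insert(document: BankAccountDocument): BankAccountDocument =
        mongo.insert(document).awaitSingle()

    suspend fun findByAggregateId(aggregateId: String): BankAccountDocument? =
        mongo.findOne(byAggregateId(aggregateId), BankAccountDocument::class.java).awaitFirstOrNull()

    suspend fun updateBalance(aggregateId: String, amount: BigDecimal) {
        mongo.findAndModify(byAggregateId(aggregateId), Update().inc("balance", amount), BankAccountDocument::class.java)
            .awaitFirstOrNull()
    }

    suspend fun updateEmail(aggregateId: String, email: String) {
        mongo.findAndModify(byAggregateId(aggregateId), Update().set("email", email), BankAccountDocument::class.java)
            .awaitFirstOrNull()
    }

    private fun byAggregateId(aggregateId: String) =
        Query(Criteria.where("aggregateId").`is`(aggregateId))
}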
You can find more details and the source code of the full project here. Of course, in real-world applications the business logic and infrastructure code are much more complicated, and we would have to implement many more features. I hope this article is useful and helpful; I would be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)