Transactional Outbox pattern step by step with Spring and Kotlin 💫

Alexander Bryksin
17 min read, May 15, 2023


We need the Transactional Outbox because a service often has to publish messages as part of a transaction that updates the database. The database update and the sending of the message must happen atomically. Otherwise, a failure between the two operations can leave the system in an inconsistent state.

The GitHub repository with the source code for this article.

In this article, we will implement it using Reactive Spring and Kotlin with Coroutines. The full list of dependencies: Kotlin with Coroutines, Spring Boot 3, WebFlux, R2DBC, Postgres, MongoDB, Kafka, Grafana, Prometheus, Zipkin, and Micrometer for observability.

The Transactional Outbox pattern fixes a problem with the usual implementation, in which a transaction updates a database table, publishes a message to the broker, and then commits. If the commit fails at that last step, the database changes are rolled back, but the event has already been published to the broker. We need a way to guarantee both the database write and the publication to the broker. The solution is to save to the orders table and to the outbox table in one transaction and commit it. Afterwards, the saved events are published from the outbox table to the broker. There are two ways to do that. The first is a CDC (change data capture) tool such as Debezium, which continuously monitors your databases and lets any of your applications stream every row-level change in the same order it was committed to the database. The second is a polling publisher, which is what this project uses. I highly recommend Chris Richardson's book Microservices Patterns, where the Transactional Outbox pattern is explained very well.
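The two steps just described (an atomic write of the row plus its event, followed by a separate relay) can be illustrated with a small in-memory sketch. It is not the article's Spring/R2DBC code. The "database" is a pair of Kotlin collections, a transaction is simulated by working on copies, and all names (InMemoryDb, OrderRow, OutboxRow, relay) are hypothetical.

```kotlin
// Hypothetical in-memory stand-ins for the orders table, the outbox table and the broker.
data class OrderRow(val id: String, val status: String)
data class OutboxRow(val eventId: String, val aggregateId: String, val eventType: String)

class InMemoryDb {
    val orders = mutableMapOf<String, OrderRow>()
    val outbox = mutableListOf<OutboxRow>()

    // Simulated transaction: writes go to copies that are swapped in only if the block succeeds,
    // so either both the order and its outbox row are stored, or neither is.
    fun transaction(block: (MutableMap<String, OrderRow>, MutableList<OutboxRow>) -> Unit) {
        val ordersCopy = orders.toMutableMap()
        val outboxCopy = outbox.toMutableList()
        block(ordersCopy, outboxCopy) // an exception here leaves the originals untouched ("rollback")
        orders.clear(); orders.putAll(ordersCopy)
        outbox.clear(); outbox.addAll(outboxCopy)
    }
}

// Step 1: the order and its event are written in the same transaction. Nothing is sent to the broker yet.
fun createOrder(db: InMemoryDb, orderId: String) = db.transaction { orders, outbox ->
    orders[orderId] = OrderRow(orderId, "NEW")
    outbox += OutboxRow(eventId = "evt-$orderId", aggregateId = orderId, eventType = "ORDER_CREATED")
}

// Step 2: a relay (here, a polling publisher) publishes committed outbox rows, then deletes them.
fun relay(db: InMemoryDb, publish: (OutboxRow) -> Unit) {
    for (row in db.outbox.toList()) {
        publish(row)          // if this throws, the row stays in the outbox and is retried on the next poll
        db.outbox.remove(row)
    }
}
```

Because the broker is only contacted after the commit, a failed commit can no longer leak an event. The price is that a row can be published and then fail to be deleted, which is why consumers must tolerate duplicates.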

One more important point: we have to be ready for the same event to be published more than once, so the consumer must be idempotent. Idempotence concerns the reliability of messaging in a distributed system, specifically the reception of duplicated messages. Because of retries or message broker features, a message sent once can be received multiple times by consumers. A service is idempotent if processing the same event multiple times results in the same state and output as processing it just once: receiving a duplicated event does not change the application's state or behavior. Most of the time, an idempotent service detects such events and ignores them. Idempotence can be implemented using unique identifiers.
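Deduplication by unique identifier, as mentioned above, can be sketched in a few lines. This is a hypothetical illustration, not the article's consumer. The OrderEvent shape and the in-memory processedIds set are assumptions. In a real service, the processed IDs would live in the database, and recording an ID would be committed in the same transaction as the state change it guards.

```kotlin
// Hypothetical event shape; eventId is the unique identifier used for deduplication.
data class OrderEvent(val eventId: String, val orderId: String, val amount: Long)

class IdempotentConsumer {
    // In-memory for the sketch; in production a table with a unique key on event_id, updated atomically with the state.
    private val processedIds = mutableSetOf<String>()
    val totals = mutableMapOf<String, Long>()

    // Returns true if the event changed state, false if it was a duplicate and was ignored.
    fun handle(event: OrderEvent): Boolean {
        if (!processedIds.add(event.eventId)) return false // add() returns false when the id was already present
        totals[event.orderId] = (totals[event.orderId] ?: 0L) + event.amount
        return true
    }
}
```

Handling the same event twice leaves totals unchanged after the first call, which is exactly the "same state as processing it once" property.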

So let's implement it. The business logic of our example microservice is simple: orders with product items. For simplicity, there are two tables, plus the outbox table, of course. In a typical outbox table, the data field stores the serialized event. JSON is the most common format, but the choice depends on the concrete microservice. The data field can hold only the state changes, or the full, latest version of the updated order domain entity every time. State changes take much less space, but again, it's up to you. The other fields of an outbox table are usually the event type, timestamp, version, and other metadata. This also depends on the concrete implementation, but it is often the required minimum. The version field is used for concurrency control.
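The fields listed above can be written down as a Kotlin data class. This is a sketch under assumptions. The field names mirror the columns used in the article's SQL (event_id, event_type, aggregate_id, version, timestamp), but OutboxRecordSketch and the helper orderStatusChangedJson are hypothetical and not the project's actual OutboxRecord. The JSON is built by hand only to stay dependency-free.

```kotlin
import java.time.Instant
import java.util.UUID

// Hypothetical outbox row; names mirror columns used later in the article, the exact schema is an assumption.
data class OutboxRecordSketch(
    val eventId: UUID = UUID.randomUUID(), // unique id that consumers can use for deduplication
    val aggregateId: String,               // e.g. the order id; also a natural Kafka message key
    val eventType: String,                 // e.g. "ORDER_PAID"; decides the target topic
    val data: String,                      // serialized payload, commonly JSON
    val version: Long,                     // aggregate version, for concurrency control and ordering checks
    val timestamp: Instant = Instant.now(),
)

// Minimal hand-built JSON payload carrying only a state change; a real service would use Jackson or similar.
fun orderStatusChangedJson(orderId: String, status: String): String =
    """{"orderId":"$orderId","status":"$status"}"""
```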

All UIs are available at:

Swagger UI: http://localhost:8000/webjars/swagger-ui/index.html

Prometheus UI: http://localhost:9090

Grafana UI: http://localhost:3000

Kafka UI: http://localhost:8086

Zipkin UI: http://localhost:9411

The docker-compose file for this article includes Postgres, MongoDB, Zookeeper, Kafka, Kafka UI, Zipkin, Prometheus, and Grafana.

For local development, run make local or make develop. The first starts only the docker-compose services, and the second also includes the microservice image.

version: "3.9"

services:
  microservices_postgresql:
    image: postgres:latest
    container_name: microservices_postgresql
    expose:
      - "5432"
    ports:
      - "5432:5432"
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=microservices
      - POSTGRES_HOST=5432
    command: -p 5432
    volumes:
      - ./docker_data/microservices_pgdata:/var/lib/postgresql/data
    networks: [ "microservices" ]

  zoo1:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
    volumes:
      - "./zookeeper:/zookeeper"
    networks: [ "microservices" ]

  kafka1:
    image: confluentinc/cp-kafka:7.3.0
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
      - "9999:9999"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_JMX_PORT: 9999
      KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
    volumes:
      - "./kafka_data:/kafka"
    networks: [ "microservices" ]

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "8086:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092
    networks: [ "microservices" ]

  zipkin-all-in-one:
    image: openzipkin/zipkin:latest
    restart: always
    ports:
      - "9411:9411"
    networks: [ "microservices" ]

  mongo:
    image: mongo
    restart: always
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin
      MONGODB_DATABASE: bank_accounts
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: microservices_node_exporter
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: microservices_grafana
    restart: always
    image: grafana/grafana
    ports:
      - '3000:3000'
    networks: [ "microservices" ]

networks:
  microservices:
    name: microservices

The Postgres database schema for this project is:

The orders domain REST controller has the following methods:

@RestController
@RequestMapping(path = ["/api/v1/orders"])
class OrderController(private val orderService: OrderService, private val or: ObservationRegistry) {

    @GetMapping
    @Operation(method = "getOrders", summary = "get order with pagination", operationId = "getOrders")
    suspend fun getOrders(
        @RequestParam(name = "page", defaultValue = "0") page: Int,
        @RequestParam(name = "size", defaultValue = "20") size: Int,
    ) = coroutineScopeWithObservation(GET_ORDERS, or) { observation ->
        ResponseEntity.ok()
            .body(orderService.getAllOrders(PageRequest.of(page, size))
                .map { it.toSuccessResponse() }
                .also { response -> observation.highCardinalityKeyValue("response", response.toString()) }
            )
    }

    @GetMapping(path = ["{id}"])
    @Operation(method = "getOrderByID", summary = "get order by id", operationId = "getOrderByID")
    suspend fun getOrderByID(@PathVariable id: String) = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
        ResponseEntity.ok().body(orderService.getOrderWithProductsByID(UUID.fromString(id)).toSuccessResponse())
            .also { response ->
                observation.highCardinalityKeyValue("response", response.toString())
                log.info("getOrderByID response: $response")
            }
    }

    @PostMapping
    @Operation(method = "createOrder", summary = "create new order", operationId = "createOrder")
    suspend fun createOrder(@Valid @RequestBody createOrderDTO: CreateOrderDTO) = coroutineScopeWithObservation(CREATE_ORDER, or) { observation ->
        ResponseEntity.status(HttpStatus.CREATED).body(orderService.createOrder(createOrderDTO.toOrder()).toSuccessResponse())
            .also {
                log.info("created order: $it")
                observation.highCardinalityKeyValue("response", it.toString())
            }
    }

    @PutMapping(path = ["/add/{id}"])
    @Operation(method = "addProductItem", summary = "add to the order product item", operationId = "addProductItem")
    suspend fun addProductItem(
        @PathVariable id: UUID,
        @Valid @RequestBody dto: CreateProductItemDTO
    ) = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.addProductItem(dto.toProductItem(id)))
            .also {
                observation.highCardinalityKeyValue("CreateProductItemDTO", dto.toString())
                observation.highCardinalityKeyValue("id", id.toString())
                log.info("addProductItem id: $id, dto: $dto")
            }
    }

    @PutMapping(path = ["/remove/{orderId}/{productItemId}"])
    @Operation(method = "removeProductItem", summary = "remove product from the order", operationId = "removeProductItem")
    suspend fun removeProductItem(
        @PathVariable orderId: UUID,
        @PathVariable productItemId: UUID
    ) = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
        ResponseEntity.ok().body(orderService.removeProductItem(orderId, productItemId))
            .also {
                observation.highCardinalityKeyValue("productItemId", productItemId.toString())
                observation.highCardinalityKeyValue("orderId", orderId.toString())
                log.info("removeProductItem orderId: $orderId, productItemId: $productItemId")
            }
    }

    @PutMapping(path = ["/pay/{id}"])
    @Operation(method = "payOrder", summary = "pay order", operationId = "payOrder")
    suspend fun payOrder(@PathVariable id: UUID, @Valid @RequestBody dto: PayOrderDTO) = coroutineScopeWithObservation(PAY_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.pay(id, dto.paymentId).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("payOrder result: $it")
            }
    }

    @PutMapping(path = ["/cancel/{id}"])
    @Operation(method = "cancelOrder", summary = "cancel order", operationId = "cancelOrder")
    suspend fun cancelOrder(@PathVariable id: UUID, @Valid @RequestBody dto: CancelOrderDTO) = coroutineScopeWithObservation(CANCEL_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.cancel(id, dto.reason).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("cancelOrder result: $it")
            }
    }

    @PutMapping(path = ["/submit/{id}"])
    @Operation(method = "submitOrder", summary = "submit order", operationId = "submitOrder")
    suspend fun submitOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(SUBMIT_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.submit(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("submitOrder result: $it")
            }
    }

    @PutMapping(path = ["/complete/{id}"])
    @Operation(method = "completeOrder", summary = "complete order", operationId = "completeOrder")
    suspend fun completeOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(COMPLETE_ORDER, or) { observation ->
        ResponseEntity.ok().body(orderService.complete(id).toSuccessResponse())
            .also {
                observation.highCardinalityKeyValue("response", it.toString())
                log.info("completeOrder result: $it")
            }
    }
}

As mentioned earlier, the core of the transactional outbox implementation is the first step: write to the orders and outbox tables in one transaction and commit it. There is also an additional, optional optimization. In the same method, after the transaction has been committed successfully, we publish the event and delete it from the outbox table. If publishing to the broker or deleting from the outbox table fails at this point, that's fine, because the polling publisher runs as a scheduled process and will do the job anyway. Again, this is a small improvement and not a mandatory part of the outbox pattern, so it's up to you whether to do it. Try both variants and choose the best one for your case. In our case we use Kafka, so we have to remember that producers have an acks setting:

When acks=0, producers consider messages “written successfully” the moment the message is sent, without waiting for the broker to accept it at all. If the broker goes offline or an exception happens, we won't know and will lose data, so be careful with this setting and don't use acks=0 here.

When acks=1, producers consider messages as “written successfully” when the message was acknowledged by only the leader.

When acks=all, producers consider messages as “written successfully” when the message is accepted by all in-sync replicas (ISR).
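For publishing outbox events, acks=all is the natural choice. Below is a sketch of the relevant producer settings, expressed as plain java.util.Properties so it runs without the Kafka client on the classpath. The keys are standard Kafka producer configuration names. The values and the helper outboxProducerProperties are suggestions, not the article's actual configuration. Note that acks=all is only as strong as the topic's min.insync.replicas setting. Also, enable.idempotence only prevents duplicates caused by the producer's own retries. It does not remove duplicates caused by the outbox being re-published, so consumers still need to be idempotent.

```kotlin
import java.util.Properties

// Hypothetical helper: producer settings for durable publishing of outbox events.
fun outboxProducerProperties(bootstrapServers: String = "localhost:9092"): Properties = Properties().apply {
    setProperty("bootstrap.servers", bootstrapServers)
    setProperty("acks", "all")                // wait until all in-sync replicas have the record
    setProperty("enable.idempotence", "true") // avoid duplicates from the producer's internal retries
    setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
}
```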

In the simplified sequence diagram of the service-layer business logic, steps 5 and 6 are an optional optimization rather than a requirement, because the polling publisher handles publishing anyway:

The order service implementation:

interface OrderService {
    suspend fun createOrder(order: Order): Order
    suspend fun getOrderByID(id: UUID): Order
    suspend fun addProductItem(productItem: ProductItem)
    suspend fun removeProductItem(orderID: UUID, productItemId: UUID)
    suspend fun pay(id: UUID, paymentId: String): Order
    suspend fun cancel(id: UUID, reason: String?): Order
    suspend fun submit(id: UUID): Order
    suspend fun complete(id: UUID): Order

    suspend fun getOrderWithProductsByID(id: UUID): Order
    suspend fun getAllOrders(pageable: Pageable): Page<Order>

    suspend fun deleteOutboxRecordsWithLock()
}

@Service
class OrderServiceImpl(
    private val orderRepository: OrderRepository,
    private val productItemRepository: ProductItemRepository,
    private val outboxRepository: OrderOutboxRepository,
    private val orderMongoRepository: OrderMongoRepository,
    private val txOp: TransactionalOperator,
    private val eventsPublisher: EventsPublisher,
    private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
    private val or: ObservationRegistry,
    private val outboxEventSerializer: OutboxEventSerializer
) : OrderService {

    override suspend fun createOrder(order: Order): Order = coroutineScopeWithObservation(CREATE, or) { observation ->
        // one transaction: insert the order, its product items and the outbox record
        txOp.executeAndAwait {
            orderRepository.insert(order).let {
                val productItemsEntityList = ProductItemEntity.listOf(order.productsList(), UUID.fromString(it.id))
                val insertedItems = productItemRepository.insertAll(productItemsEntityList).toList()

                it.addProductItems(insertedItems.map { item -> item.toProductItem() })

                Pair(it, outboxRepository.save(outboxEventSerializer.orderCreatedEventOf(it)))
            }
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            // optional optimization after commit; the polling publisher covers failures
            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun addProductItem(productItem: ProductItem): Unit = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(UUID.fromString(productItem.orderId))
            order.incVersion()

            val updatedProductItem = productItemRepository.upsert(productItem)

            val savedRecord = outboxRepository.save(
                outboxEventSerializer.productItemAddedEventOf(
                    order,
                    productItem.copy(version = updatedProductItem.version).toEntity()
                )
            )

            orderRepository.updateVersion(UUID.fromString(order.id), order.version)
                .also { result -> log.info("addOrderItem result: $result, version: ${order.version}") }

            savedRecord
        }.run {
            observation.highCardinalityKeyValue("outboxEvent", this.toString())
            publishOutboxEvent(this)
        }
    }

    override suspend fun removeProductItem(orderID: UUID, productItemId: UUID): Unit = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation ->
        txOp.executeAndAwait {
            if (!productItemRepository.existsById(productItemId)) throw ProductItemNotFoundException(productItemId)

            val order = orderRepository.findOrderByID(orderID)
            productItemRepository.deleteById(productItemId)

            order.incVersion()

            val savedRecord = outboxRepository.save(outboxEventSerializer.productItemRemovedEventOf(order, productItemId))

            orderRepository.updateVersion(UUID.fromString(order.id), order.version)
                .also { log.info("removeProductItem update order result: $it, version: ${order.version}") }

            savedRecord
        }.run {
            observation.highCardinalityKeyValue("outboxEvent", this.toString())
            publishOutboxEvent(this)
        }
    }

    override suspend fun pay(id: UUID, paymentId: String): Order = coroutineScopeWithObservation(PAY, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.getOrderWithProductItemsByID(id)
            order.pay(paymentId)

            val updatedOrder = orderRepository.update(order)
            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderPaidEventOf(updatedOrder, paymentId)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun cancel(id: UUID, reason: String?): Order = coroutineScopeWithObservation(CANCEL, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(id)
            order.cancel()

            val updatedOrder = orderRepository.update(order)
            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCancelledEventOf(updatedOrder, reason)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun submit(id: UUID): Order = coroutineScopeWithObservation(SUBMIT, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.getOrderWithProductItemsByID(id)
            order.submit()

            val updatedOrder = orderRepository.update(order)
            updatedOrder.addProductItems(order.productsList())

            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderSubmittedEventOf(updatedOrder)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    override suspend fun complete(id: UUID): Order = coroutineScopeWithObservation(COMPLETE, or) { observation ->
        txOp.executeAndAwait {
            val order = orderRepository.findOrderByID(id)
            order.complete()

            val updatedOrder = orderRepository.update(order)
            log.info("order completed: ${updatedOrder.status} for id: $id")

            Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCompletedEventOf(updatedOrder)))
        }.run {
            observation.highCardinalityKeyValue("order", first.toString())
            observation.highCardinalityKeyValue("outboxEvent", second.toString())

            publishOutboxEvent(second)
            first
        }
    }

    @Transactional(readOnly = true)
    override suspend fun getOrderWithProductsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
        orderRepository.getOrderWithProductItemsByID(id).also { observation.highCardinalityKeyValue("order", it.toString()) }
    }

    override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
        orderMongoRepository.getAllOrders(pageable).also { observation.highCardinalityKeyValue("pageResult", it.toString()) }
    }

    override suspend fun deleteOutboxRecordsWithLock() = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
        outboxRepository.deleteOutboxRecordsWithLock {
            observation.highCardinalityKeyValue("outboxEvent", it.toString())
            // key by aggregate id, as publishOutboxEvent does, so events of one order land in the same partition
            eventsPublisher.publish(getTopicName(it.eventType), it.aggregateId.toString(), it)
        }
    }

    override suspend fun getOrderByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation ->
        orderMongoRepository.getByID(id.toString())
            .also { log.info("getOrderByID: $it") }
            .also { observation.highCardinalityKeyValue("order", it.toString()) }
    }

    private suspend fun publishOutboxEvent(event: OutboxRecord) = coroutineScopeWithObservation(PUBLISH_OUTBOX_EVENT, or) { observation ->
        try {
            log.info("publishing outbox event: $event")

            // publish inside the delete transaction: if publishing fails, the record is not deleted
            outboxRepository.deleteOutboxRecordByID(event.eventId!!) {
                eventsPublisher.publish(getTopicName(event.eventType), event.aggregateId.toString(), event)
            }

            log.info("outbox event published and deleted: $event")
            observation.highCardinalityKeyValue("event", event.toString())
        } catch (ex: Exception) {
            // swallowed on purpose: the polling publisher will pick the record up later
            log.error("exception while publishing outbox event: ${ex.localizedMessage}")
            observation.error(ex)
        }
    }
}
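The publishOutboxEvent step above follows a "best effort now, guaranteed later" idea, which the sketch below isolates. It is a hypothetical, dependency-free illustration (OutboxStore and publishAfterCommit are not from the project). The event is published first and only then removed from the outbox, and any failure is swallowed because the row remains in the outbox for the polling publisher.

```kotlin
// Hypothetical outbox holding event ids that are committed but not yet published.
class OutboxStore { val rows = mutableListOf<String>() }

// Optional post-commit optimization: try to publish immediately, then delete the row.
// Returns true when the event was published and removed, false when it was left for the poller.
fun publishAfterCommit(store: OutboxStore, eventId: String, publish: (String) -> Unit): Boolean =
    try {
        publish(eventId)           // send to the broker first...
        store.rows.remove(eventId) // ...and delete only after a successful send
        true
    } catch (ex: Exception) {
        // Not rethrown: the row is still in the outbox, so the scheduled poller will publish it later.
        System.err.println("publish failed, left in outbox for the poller: ${ex.message}")
        false
    }
```

The order matters. Deleting before publishing could lose the event if the send failed. Publishing before deleting can at worst produce a duplicate, which idempotent consumers absorb.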

The order and product item Postgres repositories combine CoroutineCrudRepository with a custom implementation based on DatabaseClient and R2dbcEntityTemplate, and they support optimistic or pessimistic locking depending on what each method requires.
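The optimistic locking in updateVersion below relies on a compare-and-set on the version column (UPDATE ... SET version = version + 1 WHERE id = :id AND version = :version). The sketch below is an in-memory analogy of that statement, not the repository itself. VersionedOrders is hypothetical, and the 1-or-0 return value stands in for R2DBC's rowsUpdated(). A caller would typically treat a result of 0 as a concurrent modification, for example by failing the transaction and retrying.

```kotlin
// In-memory analogy of: UPDATE orders SET version = version + 1 WHERE id = :id AND version = :expected
class VersionedOrders {
    private val versions = mutableMapOf<String, Long>()

    fun insert(id: String) { versions[id] = 0L }
    fun version(id: String): Long? = versions[id]

    // Returns the number of "rows updated": 1 if the expected version matched, 0 on a conflict.
    @Synchronized
    fun updateVersion(id: String, expected: Long): Long {
        if (versions[id] != expected) return 0L // someone else already bumped the version
        versions[id] = expected + 1
        return 1L
    }
}
```

Two writers that both read version 0 will both try to move it to 1, but only the first succeeds. The second sees 0 rows updated instead of silently overwriting the first writer's change.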

@Repository
interface OrderRepository : CoroutineCrudRepository<OrderEntity, UUID>, OrderBaseRepository

@Repository
interface OrderBaseRepository {
    suspend fun getOrderWithProductItemsByID(id: UUID): Order
    suspend fun updateVersion(id: UUID, newVersion: Long): Long
    suspend fun findOrderByID(id: UUID): Order
    suspend fun insert(order: Order): Order
    suspend fun update(order: Order): Order
}

@Repository
class OrderBaseRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val entityTemplate: R2dbcEntityTemplate,
    private val or: ObservationRegistry
) : OrderBaseRepository {

    override suspend fun updateVersion(id: UUID, newVersion: Long): Long = coroutineScopeWithObservation(UPDATE_VERSION, or) { observation ->
        dbClient.sql("UPDATE microservices.orders SET version = (version + 1) WHERE id = :id AND version = :version")
            .bind(ID, id)
            .bind(VERSION, newVersion - 1)
            .fetch()
            .rowsUpdated()
            .awaitSingle()
            .also { log.info("for order with id: $id version updated to $newVersion") }
            .also {
                observation.highCardinalityKeyValue("id", id.toString())
                observation.highCardinalityKeyValue("newVersion", newVersion.toString())
            }
    }

    override suspend fun getOrderWithProductItemsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation ->
        dbClient.sql(
            """SELECT o.id, o.email, o.status, o.address, o.version, o.payment_id, o.created_at, o.updated_at,
            |pi.id as productId, pi.price, pi.title, pi.quantity, pi.order_id, pi.version as itemVersion, pi.created_at as itemCreatedAt, pi.updated_at as itemUpdatedAt
            |FROM microservices.orders o
            |LEFT JOIN microservices.product_items pi on o.id = pi.order_id
            |WHERE o.id = :id""".trimMargin()
        )
            .bind(ID, id)
            .map { row, _ -> Pair(OrderEntity.of(row), ProductItemEntity.of(row)) }
            .flow()
            .toList()
            .let { orderFromList(it) }
            .also {
                log.info("getOrderWithProductItemsByID order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }

    override suspend fun findOrderByID(id: UUID): Order = coroutineScopeWithObservation(FIND_ORDER_BY_ID, or) { observation ->
        val query = Query.query(Criteria.where(ID).`is`(id))
        entityTemplate.selectOne(query, OrderEntity::class.java).awaitSingleOrNull()?.toOrder()
            .also { observation.highCardinalityKeyValue("order", it.toString()) }
            ?: throw OrderNotFoundException(id)
    }

    override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
        entityTemplate.insert(order.toEntity()).awaitSingle().toOrder()
            .also {
                log.info("inserted order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }

    override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
        entityTemplate.update(order.toEntity()).awaitSingle().toOrder()
            .also {
                log.info("updated order: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
    }
}

interface ProductItemBaseRepository {
    suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity
    suspend fun insertAll(productItemEntities: List<ProductItemEntity>): List<ProductItemEntity>
    suspend fun upsert(productItem: ProductItem): ProductItem
}

@Repository
class ProductItemBaseRepositoryImpl(
    private val entityTemplate: R2dbcEntityTemplate,
    private val or: ObservationRegistry,
) : ProductItemBaseRepository {

    override suspend fun upsert(productItem: ProductItem): ProductItem = coroutineScopeWithObservation(UPDATE, or) { observation ->
        val query = Query.query(
            Criteria.where("id").`is`(UUID.fromString(productItem.id))
                .and("order_id").`is`(UUID.fromString(productItem.orderId))
        )

        val product = entityTemplate.selectOne(query, ProductItemEntity::class.java).awaitSingleOrNull()
        if (product != null) {
            val update = Update
                .update("quantity", (productItem.quantity + product.quantity))
                .set("version", product.version + 1)
                .set("updated_at", LocalDateTime.now())

            val updatedProduct = product.copy(quantity = (productItem.quantity + product.quantity), version = product.version + 1)
            val updateResult = entityTemplate.update(query, update, ProductItemEntity::class.java).awaitSingle()
            log.info("updateResult product: $updateResult")
            log.info("updateResult updatedProduct: $updatedProduct")
            return@coroutineScopeWithObservation updatedProduct.toProductItem()
        }

        entityTemplate.insert(ProductItemEntity.of(productItem)).awaitSingle().toProductItem()
            .also { productItem ->
                log.info("saved productItem: $productItem")
                observation.highCardinalityKeyValue("productItem", productItem.toString())
            }
    }

    override suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity = coroutineScopeWithObservation(INSERT, or) { observation ->
        val product = entityTemplate.insert(productItemEntity).awaitSingle()

        log.info("saved product: $product")
        observation.highCardinalityKeyValue("product", product.toString())
        product
    }

    override suspend fun insertAll(productItemEntities: List<ProductItemEntity>) = coroutineScopeWithObservation(INSERT_ALL, or) { observation ->
        val result = productItemEntities.map { entityTemplate.insert(it) }.map { it.awaitSingle() }
        log.info("inserted product items: $result")
        observation.highCardinalityKeyValue("result", result.toString())
        result
    }
}

An important detail of the outbox repository is that it must handle multiple pod instances processing the outbox table in parallel. Our consumers are idempotent, of course, but we should still avoid processing the same outbox events more than once where we can. To prevent multiple instances from selecting and publishing the same events, we use FOR UPDATE SKIP LOCKED. This combination works as follows: when an instance selects a batch of outbox events, it skips any rows already locked by another instance and takes the next available unlocked rows instead.
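The behavior of FOR UPDATE SKIP LOCKED can be pictured with an in-memory analogy. This is not how Postgres implements row locks. OutboxLocks is a hypothetical class where putIfAbsent plays the role of acquiring a row lock, and a failed attempt simply moves on to the next row instead of waiting. The row ids are assumed to already be in timestamp order, like ORDER BY timestamp ASC LIMIT n.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// In-memory analogy of SELECT ... ORDER BY timestamp ASC LIMIT n FOR UPDATE SKIP LOCKED.
class OutboxLocks(private val rowIds: List<Int>) {
    private val lockedBy = ConcurrentHashMap<Int, String>()

    // Claims up to `limit` rows that no other worker holds, skipping locked rows instead of blocking on them.
    fun claimBatch(worker: String, limit: Int): List<Int> {
        val claimed = mutableListOf<Int>()
        for (id in rowIds) {
            if (claimed.size == limit) break
            if (lockedBy.putIfAbsent(id, worker) == null) claimed += id // null means we acquired the "lock"
        }
        return claimed
    }

    // Stands in for the transaction ending (commit or rollback), which releases its row locks.
    fun release(ids: List<Int>) = ids.forEach { lockedBy.remove(it) }
}
```

Two pods polling at the same time get disjoint batches. Without SKIP LOCKED, the second pod would instead block until the first transaction finished, and with a plain SELECT (no FOR UPDATE) both would read and publish the same rows.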

@Repository
interface OutboxBaseRepository {
    suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long
    suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit)
}

class OutboxBaseRepositoryImpl(
    private val dbClient: DatabaseClient,
    private val txOp: TransactionalOperator,
    private val or: ObservationRegistry,
) : OutboxBaseRepository {

    override suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long =
        coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_BY_ID, or) { observation ->
            withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
                txOp.executeAndAwait {

                    // publish first; if it throws, the transaction rolls back and the record is kept
                    callback()

                    dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
                        .bind("eventId", id)
                        .fetch()
                        .rowsUpdated()
                        .awaitSingle()
                        .also { rowsDeleted ->
                            log.info("outbox event with id: $id deleted, rows: $rowsDeleted")
                            observation.highCardinalityKeyValue("id", id.toString())
                        }
                }
            }
        }

    override suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) =
        coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation ->
            withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) {
                txOp.executeAndAwait {

                    // rows locked by another instance are skipped, not waited on
                    dbClient.sql("SELECT * FROM microservices.outbox_table ORDER BY timestamp ASC LIMIT 10 FOR UPDATE SKIP LOCKED")
                        .map { row, _ -> OutboxRecord.of(row) }
                        .flow()
                        .onEach {
                            log.info("deleting outboxEvent with id: ${it.eventId}")

                            callback(it)

                            dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId")
                                .bind("eventId", it.eventId!!)
                                .fetch()
                                .rowsUpdated()
                                .awaitSingle()

                            log.info("outboxEvent with id: ${it.eventId} published and deleted")
                            observation.highCardinalityKeyValue("eventId", it.eventId.toString())
                        }
                        .collect()
                }
            }
        }
}

The polling publisher is a scheduled process that, at a fixed interval, does the same publish-and-delete job described earlier, using the same service method:

@Component
@ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true")
class OutboxScheduler(private val orderService: OrderService, private val or: ObservationRegistry) {

    @Scheduled(initialDelayString = "\${schedulers.outbox.initialDelayMillis}", fixedRateString = "\${schedulers.outbox.fixedRate}")
    fun publishAndDeleteOutboxRecords() = runBlocking {
        coroutineScopeWithObservation(PUBLISH_AND_DELETE_OUTBOX_RECORDS, or) {
            log.debug("starting scheduled outbox table publishing")
            orderService.deleteOutboxRecordsWithLock()
            log.debug("completed scheduled outbox table publishing")
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(OutboxScheduler::class.java)
        private const val PUBLISH_AND_DELETE_OUTBOX_RECORDS = "OutboxScheduler.publishAndDeleteOutboxRecords"
    }
}
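For readers who want the polling loop without Spring's @Scheduled, here is a plain-JDK sketch of the same fixed-rate idea. startPoller is hypothetical, and pollOnce stands in for a call such as deleteOutboxRecordsWithLock. One documented detail of ScheduledExecutorService matters here. If a run of a periodic task throws, all later runs are suppressed, so the sketch catches exceptions to keep the poller alive after a transient failure such as the broker being briefly unavailable.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Plain-JDK sketch of a polling publisher: every `periodMillis`, run one poll (select batch, publish, delete).
fun startPoller(
    initialDelayMillis: Long,
    periodMillis: Long,
    pollOnce: () -> Unit,
): ScheduledExecutorService {
    // A single thread means polls never overlap; a slow poll just delays the next one.
    val executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleAtFixedRate({
        try {
            pollOnce()
        } catch (ex: Exception) {
            // An exception escaping here would cancel all future runs of this fixed-rate task.
            System.err.println("outbox poll failed, will retry on next tick: ${ex.message}")
        }
    }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS)
    return executor
}
```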

The transactional outbox is more often needed to guarantee data consistency between microservices. Here, for example, consumers in the same microservice process the events and save them to MongoDB. Another important detail is event ordering. Because we process Kafka events in multiple consumer processes, events may be processed out of order. Kafka's message keys help here, because messages with the same key are sent to the same partition. If a broker doesn't have this feature, we have to handle ordering manually. For example, a consumer might try to process event #6 before events #4 and #5 have been processed. For this reason, outbox events carry the domain entity's version field. If our database holds order version #3 but we are now processing the event with version #6, we first need to wait for #4 and #5 and process them. Of course, these details depend on the business logic of each application; the point here is only that this case is possible. One more important detail is retry topics. If messages need to be retried, it's better to create a retry topic and process retries there. How many times to retry, and other advanced logic, depend on your concrete case. In the example, we have two listeners, one of which processes retry topic messages:
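Before the listener code, here is a minimal sketch of the version check described above. It is an illustration, not the project's OrderEventProcessor. VersionCheck and checkVersion are hypothetical names. Their outcomes correspond loosely to the consumer's AlreadyProcessedVersionException branch (acknowledge and skip) and InvalidVersionException branch (send to the retry topic).

```kotlin
// Hypothetical outcome of comparing an incoming event's version with the stored aggregate version.
sealed interface VersionCheck {
    object Apply : VersionCheck                                // the next expected version: process it
    object AlreadyProcessed : VersionCheck                     // duplicate or older event: acknowledge and skip
    data class TooEarly(val missing: LongRange) : VersionCheck // gap detected: retry later
}

fun checkVersion(storedVersion: Long, eventVersion: Long): VersionCheck = when {
    eventVersion <= storedVersion -> VersionCheck.AlreadyProcessed
    eventVersion == storedVersion + 1 -> VersionCheck.Apply
    else -> VersionCheck.TooEarly((storedVersion + 1) until eventVersion)
}
```

With order version #3 stored and an incoming event with version #6, checkVersion(3, 6) returns TooEarly(4..5), which names exactly the events that have to be processed first.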

@Component
class OrderConsumer(
    private val kafkaTopicsConfiguration: KafkaTopicsConfiguration,
    private val serializer: Serializer,
    private val eventsPublisher: EventsPublisher,
    private val orderEventProcessor: OrderEventProcessor,
    private val or: ObservationRegistry,
) {

    // Main listener: consumes every order event topic with manual acknowledgment.
    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:order-service-group-id}",
        topics = [
            "\${topics.orderCreated.name}",
            "\${topics.productAdded.name}",
            "\${topics.productRemoved.name}",
            "\${topics.orderPaid.name}",
            "\${topics.orderCancelled.name}",
            "\${topics.orderSubmitted.name}",
            "\${topics.orderCompleted.name}",
        ],
        id = "orders-consumer"
    )
    fun process(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>) = runBlocking {
        coroutineScopeWithObservation(PROCESS, or) { observation ->
            try {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))

                processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
                ack.acknowledge()

                log.info("committed record: ${getConsumerRecordInfo(consumerRecord)}")
            } catch (ex: Exception) {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
                observation.error(ex)

                // Retrying cannot help: malformed payload, unknown event type, or a duplicate version.
                if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
                    log.error("ack not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    ack.acknowledge()
                    return@coroutineScopeWithObservation
                }

                // Version gap or order not yet in MongoDB: an earlier event may still be on its way.
                if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
                    publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
                    ack.acknowledge()
                    log.warn("ack concurrency write or version exception ${ex.localizedMessage}")
                    return@coroutineScopeWithObservation
                }

                // Any other failure also starts the retry cycle with a retry count of 1.
                publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1)
                ack.acknowledge()
                log.error("ack exception while processing record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}", ex)
            }
        }
    }

    // Retry listener: re-processes records republished by publishRetryTopic.
    @KafkaListener(
        groupId = "\${kafka.consumer-group-id:order-service-group-id}",
        topics = ["\${topics.retryTopic.name}"],
        id = "orders-retry-consumer"
    )
    fun processRetry(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>): Unit = runBlocking {
        coroutineScopeWithObservation(PROCESS_RETRY, or) { observation ->
            try {
                log.warn("processing retry topic record >>>>>>>>>>>>> : ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))

                processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java))
                ack.acknowledge()

                log.info("committed retry record: ${getConsumerRecordInfo(consumerRecord)}")
            } catch (ex: Exception) {
                observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord))
                observation.error(ex)

                // The retry count is stored as a decimal string in the RETRY_COUNT_HEADER header.
                val currentRetry = String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()).toInt()
                observation.highCardinalityKeyValue("currentRetry", currentRetry.toString())

                // Version gaps are republished with the same count, so they keep being retried
                // until the missing versions arrive and never reach the dead letter queue.
                if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) {
                    publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry)
                    log.warn("ack concurrency write or version exception ${ex.localizedMessage},record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    ack.acknowledge()
                    return@coroutineScopeWithObservation
                }

                // Too many attempts: park the record in the dead letter queue.
                if (currentRetry > MAX_RETRY_COUNT) {
                    publishRetryTopic(kafkaTopicsConfiguration.deadLetterQueue.name, consumerRecord, currentRetry + 1)
                    ack.acknowledge()
                    log.error("MAX_RETRY_COUNT exceed, send record to DLQ: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    return@coroutineScopeWithObservation
                }

                if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) {
                    ack.acknowledge()
                    log.error("commit not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                    return@coroutineScopeWithObservation
                }

                // Otherwise retry again with an incremented count.
                log.error("exception while processing: ${ex.localizedMessage}, record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}")
                publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry + 1)
                ack.acknowledge()
            }
        }
    }

    // Republishes the record to the given topic (retry topic or DLQ) with an updated retry count header.
    private suspend fun publishRetryTopic(topic: String, record: ConsumerRecord<String, ByteArray>, retryCount: Int) =
        coroutineScopeWithObservation(PUBLISH_RETRY_TOPIC, or) { observation ->
            observation.highCardinalityKeyValue("topic", record.topic())
                .highCardinalityKeyValue("key", record.key())
                .highCardinalityKeyValue("offset", record.offset().toString())
                .highCardinalityKeyValue("value", String(record.value()))
                .highCardinalityKeyValue("retryCount", retryCount.toString())

            record.headers().remove(RETRY_COUNT_HEADER)
            record.headers().add(RETRY_COUNT_HEADER, retryCount.toString().toByteArray())

            // Retry the publish itself with exponential backoff on transient failures;
            // a SerializationException will not go away on retry, so it fails fast.
            mono { publishRetryRecord(topic, record, retryCount) }
                .retryWhen(
                    Retry.backoff(PUBLISH_RETRY_COUNT, Duration.ofMillis(PUBLISH_RETRY_BACKOFF_DURATION_MILLIS))
                        .filter { it !is SerializationException }
                )
                .awaitSingle()
        }
}
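The routing rules in processRetry are easier to reason about and unit test when pulled out into a pure function. The sketch below is a hypothetical refactoring, not the author's code: the exception classes are stand-ins for the project's SerializationException, InvalidVersionException and friends, MAX_RETRIES = 3 is an assumed value (the article does not show MAX_RETRY_COUNT), and the header helpers assume the retry count is stored as a UTF-8 decimal string, as publishRetryTopic does.

```kotlin
// Hypothetical stand-ins for the project's exception classes.
class SerializationFailure(message: String) : RuntimeException(message)
class UnknownEventType(message: String) : RuntimeException(message)
class AlreadyProcessedVersion(message: String) : RuntimeException(message)
class InvalidVersion(message: String) : RuntimeException(message)
class OrderNotFound(message: String) : RuntimeException(message)

// Assumed limit; the article does not show the value of MAX_RETRY_COUNT.
const val MAX_RETRIES = 3

// What the retry listener should do with a record that failed.
sealed interface RetryDecision {
    object Skip : RetryDecision                                 // ack and drop
    data class Retry(val retryCount: Int) : RetryDecision       // republish to the retry topic
    data class DeadLetter(val retryCount: Int) : RetryDecision  // republish to the DLQ
}

// Same branch order as processRetry: version gaps first, then the DLQ limit,
// then non-retryable errors, then a generic retry with an incremented count.
fun decideRetry(ex: Exception, currentRetry: Int): RetryDecision = when {
    ex is InvalidVersion || ex is NoSuchElementException || ex is OrderNotFound -> RetryDecision.Retry(currentRetry)
    currentRetry > MAX_RETRIES -> RetryDecision.DeadLetter(currentRetry + 1)
    ex is SerializationFailure || ex is UnknownEventType || ex is AlreadyProcessedVersion -> RetryDecision.Skip
    else -> RetryDecision.Retry(currentRetry + 1)
}

// The retry count travels as a UTF-8 decimal string in a record header.
fun encodeRetryCount(count: Int): ByteArray = count.toString().toByteArray()

// Assumption: a missing or unparsable header counts as zero retries (the original code would throw).
fun decodeRetryCount(header: ByteArray?): Int = header?.let { String(it).toIntOrNull() } ?: 0
```

Keeping the branch order identical to processRetry matters: because the version-gap check comes first and keeps the same count, those records bypass the dead letter check entirely, while every other failure eventually lands in the DLQ.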

In this microservice, the order event processor validates the version of each event and then updates the order in MongoDB:

interface OrderEventProcessor {
    suspend fun on(orderCreatedEvent: OrderCreatedEvent)
    suspend fun on(productItemAddedEvent: ProductItemAddedEvent)
    suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent)
    suspend fun on(orderPaidEvent: OrderPaidEvent)
    suspend fun on(orderCancelledEvent: OrderCancelledEvent)
    suspend fun on(orderSubmittedEvent: OrderSubmittedEvent)
    suspend fun on(orderCompletedEvent: OrderCompletedEvent)
}

@Service
class OrderEventProcessorImpl(
    private val orderMongoRepository: OrderMongoRepository,
    private val or: ObservationRegistry,
) : OrderEventProcessor {

    // A created event starts a new document, so there is no previous version to validate.
    override suspend fun on(orderCreatedEvent: OrderCreatedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CREATED_EVENT, or) { observation ->
        orderMongoRepository.insert(orderCreatedEvent.order).also {
            log.info("created order: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    // Every other handler follows the same steps: load, validate version, apply, bump version, save.
    override suspend fun on(productItemAddedEvent: ProductItemAddedEvent): Unit =
        coroutineScopeWithObservation(ON_ORDER_PRODUCT_ADDED_EVENT, or) { observation ->
            val order = orderMongoRepository.getByID(productItemAddedEvent.orderId)
            validateVersion(order.id, order.version, productItemAddedEvent.version)

            order.addProductItem(productItemAddedEvent.productItem)
            order.version = productItemAddedEvent.version

            orderMongoRepository.update(order).also {
                log.info("productItemAddedEvent updatedOrder: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
        }

    override suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent): Unit =
        coroutineScopeWithObservation(ON_ORDER_PRODUCT_REMOVED_EVENT, or) { observation ->
            val order = orderMongoRepository.getByID(productItemRemovedEvent.orderId)
            validateVersion(order.id, order.version, productItemRemovedEvent.version)

            order.removeProductItem(productItemRemovedEvent.productItemId)
            order.version = productItemRemovedEvent.version

            orderMongoRepository.update(order).also {
                log.info("productItemRemovedEvent updatedOrder: $it")
                observation.highCardinalityKeyValue("order", it.toString())
            }
        }

    override suspend fun on(orderPaidEvent: OrderPaidEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PAID_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderPaidEvent.orderId)
        validateVersion(order.id, order.version, orderPaidEvent.version)

        order.pay(orderPaidEvent.paymentId)
        order.version = orderPaidEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderPaidEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderCancelledEvent: OrderCancelledEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CANCELLED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderCancelledEvent.orderId)
        validateVersion(order.id, order.version, orderCancelledEvent.version)

        order.cancel()
        order.version = orderCancelledEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderCancelledEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderSubmittedEvent: OrderSubmittedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_SUBMITTED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderSubmittedEvent.orderId)
        validateVersion(order.id, order.version, orderSubmittedEvent.version)

        order.submit()
        order.version = orderSubmittedEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderSubmittedEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    override suspend fun on(orderCompletedEvent: OrderCompletedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_COMPLETED_EVENT, or) { observation ->
        val order = orderMongoRepository.getByID(orderCompletedEvent.orderId)
        validateVersion(order.id, order.version, orderCompletedEvent.version)

        order.complete()
        order.version = orderCompletedEvent.version

        orderMongoRepository.update(order).also {
            log.info("orderCompletedEvent updatedOrder: $it")
            observation.highCardinalityKeyValue("order", it.toString())
        }
    }

    // Accept only the next version: an equal or older version is a duplicate,
    // a jump of more than one means earlier events have not been processed yet.
    private fun validateVersion(id: Any, currentDomainVersion: Long, eventVersion: Long) {
        log.info("validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
        if (currentDomainVersion >= eventVersion) {
            log.warn("currentDomainVersion >= eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
            throw AlreadyProcessedVersionException(id, eventVersion)
        }
        if ((currentDomainVersion + 1) < eventVersion) {
            log.warn("(currentDomainVersion + 1) < eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion")
            throw InvalidVersionException(eventVersion)
        }
    }
}
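The processor above rejects a version gap with InvalidVersionException and relies on the retry topic to redeliver the event later. As described earlier, the alternative when ordering is not guaranteed is to hold event #6 until #4 and #5 have been applied. A minimal in-memory sketch of that idea follows; VersionedEvent, ReorderBuffer and the starting version 0 for unseen aggregates are assumptions for illustration, while checkVersion reproduces the same two comparisons as validateVersion.

```kotlin
import java.util.TreeMap

data class VersionedEvent(val aggregateId: String, val version: Long, val payload: String)

sealed interface VersionCheck {
    object AlreadyProcessed : VersionCheck // duplicate or older event
    object Gap : VersionCheck              // one or more earlier versions are missing
    object Next : VersionCheck             // exactly currentVersion + 1
}

// Same comparisons as validateVersion in OrderEventProcessorImpl.
fun checkVersion(currentVersion: Long, eventVersion: Long): VersionCheck = when {
    currentVersion >= eventVersion -> VersionCheck.AlreadyProcessed
    currentVersion + 1 < eventVersion -> VersionCheck.Gap
    else -> VersionCheck.Next
}

// Holds early events per aggregate and applies them once the gap is filled.
// Not thread-safe; assumes one consumer sees every event of an aggregate.
class ReorderBuffer(private val handler: (VersionedEvent) -> Unit) {
    // Last applied version per aggregate; unseen aggregates start at 0 (assumed).
    private val versions = mutableMapOf<String, Long>()
    private val pending = mutableMapOf<String, TreeMap<Long, VersionedEvent>>()

    fun offer(event: VersionedEvent) {
        val current = versions.getOrDefault(event.aggregateId, 0L)
        when (checkVersion(current, event.version)) {
            VersionCheck.AlreadyProcessed -> return // idempotent: ignore duplicates
            VersionCheck.Gap -> {
                pending.getOrPut(event.aggregateId) { TreeMap() }[event.version] = event
            }
            VersionCheck.Next -> {
                handler(event)
                versions[event.aggregateId] = event.version
                drain(event.aggregateId)
            }
        }
    }

    // Applies buffered events while the next expected version is available.
    private fun drain(aggregateId: String) {
        val queue = pending[aggregateId] ?: return
        while (true) {
            val next = versions.getValue(aggregateId) + 1
            val event = queue.remove(next) ?: break
            handler(event)
            versions[aggregateId] = next
        }
        // Drop buffered copies of versions that have already been applied.
        queue.headMap(versions.getValue(aggregateId), true).clear()
        if (queue.isEmpty()) pending.remove(aggregateId)
    }
}
```

Unlike the retry-topic approach in the article, this buffer lives in one process's memory, so pending events are lost on restart and it only works if all events of an aggregate reach the same consumer; republishing to a retry topic avoids both limitations at the cost of extra round trips through Kafka.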

The MongoDB repository code is quite simple:

interface OrderMongoRepository {
    suspend fun insert(order: Order): Order
    suspend fun update(order: Order): Order
    suspend fun getByID(id: String): Order
    suspend fun getAllOrders(pageable: Pageable): Page<Order>
}

@Repository
class OrderMongoRepositoryImpl(
    private val mongoTemplate: ReactiveMongoTemplate,
    private val or: ObservationRegistry,
) : OrderMongoRepository {

    override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation ->
        withContext(Dispatchers.IO) {
            mongoTemplate.insert(OrderDocument.of(order)).awaitSingle().toOrder()
                .also { log.info("inserted order: $it") }
                .also { observation.highCardinalityKeyValue("order", it.toString()) }
        }
    }

    // Optimistic locking: match only if the stored version is order.version - 1.
    // No upsert, so a stale or missing document yields null and becomes OrderNotFoundException.
    override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation ->
        withContext(Dispatchers.IO) {
            val query = Query.query(Criteria.where(ID).`is`(order.id).and(VERSION).`is`(order.version - 1))

            val update = Update()
                .set(EMAIL, order.email)
                .set(ADDRESS, order.address)
                .set(STATUS, order.status)
                .set(VERSION, order.version)
                .set(PAYMENT_ID, order.paymentId)
                .set(PRODUCT_ITEMS, order.productsList())

            val options = FindAndModifyOptions.options().returnNew(true).upsert(false)
            val updatedOrderDocument = mongoTemplate.findAndModify(query, update, options, OrderDocument::class.java)
                .awaitSingleOrNull() ?: throw OrderNotFoundException(order.id.toUUID())

            observation.highCardinalityKeyValue("order", updatedOrderDocument.toString())
            updatedOrderDocument.toOrder().also { orderDocument -> log.info("updated order: $orderDocument") }
        }
    }

    // awaitSingle() throws NoSuchElementException when no document exists; the consumer treats it as retryable.
    override suspend fun getByID(id: String): Order = coroutineScopeWithObservation(GET_BY_ID, or) { observation ->
        withContext(Dispatchers.IO) {
            mongoTemplate.findById(id, OrderDocument::class.java).awaitSingle().toOrder()
                .also { log.info("found order: $it") }
                .also { observation.highCardinalityKeyValue("order", it.toString()) }
        }
    }

    override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation ->
        withContext(Dispatchers.IO) {
            val query = Query().with(pageable)
            // Start both queries before awaiting either, so they actually run concurrently.
            val data = async { mongoTemplate.find(query, OrderDocument::class.java).collectList().awaitSingle() }
            val count = async { mongoTemplate.count(Query(), OrderDocument::class.java).awaitSingle() }
            val orders = data.await().map { it.toOrder() }
            val total = count.await()
            PageableExecutionUtils.getPage(orders, pageable) { total }
                .also { observation.highCardinalityKeyValue("pageResult", it.pageable.toString()) }
        }
    }
}
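The update method implements optimistic concurrency control: it matches the document by id and by version == order.version - 1, and with upsert(false) a stale or missing document yields no result, which surfaces as OrderNotFoundException and sends the event to the retry topic. The sketch below reproduces that compare-and-set rule with a ConcurrentHashMap instead of MongoDB so it can run standalone; OrderDoc and InMemoryOrderStore are hypothetical names, not part of the project.

```kotlin
import java.util.concurrent.ConcurrentHashMap

data class OrderDoc(val id: String, val status: String, val version: Long)

// In-memory stand-in for the orders collection (hypothetical, for illustration only).
class InMemoryOrderStore {
    private val docs = ConcurrentHashMap<String, OrderDoc>()

    fun insert(doc: OrderDoc): OrderDoc {
        check(docs.putIfAbsent(doc.id, doc) == null) { "order ${doc.id} already exists" }
        return doc
    }

    // Same rule as update(): replace only if the stored version is exactly doc.version - 1.
    // Returns null when the id is missing or the version does not match (no upsert).
    fun update(doc: OrderDoc): OrderDoc? {
        var updated: OrderDoc? = null
        // computeIfPresent runs atomically per key, so two writers cannot both win.
        docs.computeIfPresent(doc.id) { _, stored ->
            if (stored.version == doc.version - 1) doc.also { updated = it } else stored
        }
        return updated
    }

    fun get(id: String): OrderDoc? = docs[id]
}
```

For example, after inserting version 1, the first update carrying version 2 succeeds, while a second, concurrent update that also carries version 2 finds version 2 already stored and returns null instead of silently overwriting the first write.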

You can find more details and the full source code of the project in the GitHub repository. Of course, real-world applications need many more features, such as Kubernetes health checks, rate limiters, and so on; depending on the project, these can be implemented in different ways, for example with Kubernetes and Istio. I hope this article is useful, and I'd be happy to receive any feedback or questions, so feel free to contact me by email or any messenger :)