Making of Message Gateway with Kafka - Part 2

Assembling multi-part messages using KafkaStreams API

What and why? 

In the previous post we gave a general overview of the product we are building: a message gateway. In this post we describe how we process message parts and assemble whole messages.

One issue with SMS is that messages are limited to 160 characters. The SMS protocol works around that limitation with so-called "Long SMS messages": multiple SMS messages are sent as parts of a final message, which is displayed to the end user only after all parts arrive.
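To put numbers on this: with the GSM 7-bit alphabet a standalone SMS holds 160 characters, but every part of a concatenated message carries a User Data Header that links the parts together, which leaves 153 characters per part. The sketch below is a hypothetical sender-side helper (`splitIntoParts` is not part of our product) and assumes plain GSM 7-bit text; extension characters that take two septets, or UCS-2 encoded messages, would change the limits.

```kotlin
// Hypothetical helper showing how a sender splits text into "Long SMS" parts,
// assuming the GSM 7-bit alphabet and ignoring two-septet extension characters.
val SINGLE_SMS_LIMIT = 160   // characters that fit in one standalone SMS
val CONCAT_PART_LIMIT = 153  // characters left per part once the concatenation header is added

fun splitIntoParts(text: String): List<String> =
    if (text.length <= SINGLE_SMS_LIMIT) listOf(text)  // short enough: a single SMS
    else text.chunked(CONCAT_PART_LIMIT)               // otherwise: 153-character parts
```

For example, a 161-character text becomes two parts of 153 and 8 characters.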

The SMPP protocol supports this feature, so our product needs to support it as well. Since we are not limited to the SMPP protocol, we made our own abstraction of multi-part messages. It boils down to this: every message is multi-part and has one or more parts. Messages with one part are logically not multi-part, but from a technical perspective it's easier to treat all messages the same way, so we make no structural difference between single-part and multi-part messages.
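A minimal sketch of this abstraction, assuming simplified classes: the field names senderId, senderMsgId, partsCount, totalPartsCount and parts follow the snippets later in this post, while partNumber, text, isComplete and singlePart are hypothetical additions for illustration. The point is that a single-part message is simply a WholeMessage whose only part is already present.

```kotlin
// Illustrative model only; partNumber, text, isComplete and singlePart are hypothetical.
data class MessagePart(
    val senderId: Long,       // customer id
    val senderMsgId: String,  // customer's own message id
    val partNumber: Int,      // index of this part (1-based here)
    val partsCount: Int,      // total number of parts in the message
    val text: String
)

data class WholeMessage(
    val senderId: Long,
    val senderMsgId: String,
    val totalPartsCount: Int,
    val parts: List<MessagePart> = emptyList()
) {
    // A message is complete once every announced part has arrived.
    val isComplete: Boolean
        get() = parts.size == totalPartsCount
}

// A "single-part" message is just a multi-part message with exactly one part.
fun singlePart(senderId: Long, msgId: String, text: String): WholeMessage =
    WholeMessage(senderId, msgId, 1, listOf(MessagePart(senderId, msgId, 1, 1, text)))
```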

As we said in the previous post, our product must be cluster-ready, with multiple active nodes running, and our customers can be connected to any of the active nodes (customers are not really aware of nodes; they always connect to the same virtual IP address, and the load balancer behind it distributes connections to the available nodes in round-robin fashion). We also support customer multi-binding, meaning that the same customer can have more than one active connection simultaneously. A customer can even send different parts of the same multi-part message through different connections.

This means we have to be able to aggregate message parts coming from a customer through one or more connections, each of which can be established to any of our application nodes.

In the previous version of our product we used Hazelcast's distributed maps to achieve this. As parts come in, they are stored in a Hazelcast map, and after the last part is collected, the final message is assembled. This is quite simple, but there are some limitations:
- application nodes must be visible to each other over the network
- if the Hazelcast map is not persisted, we can lose data when some application nodes go down.
We could enable map data persistence, but that requires a "centralized system that is accessible from all Hazelcast members. Persistence to a local file system is not supported.", as stated in the Hazelcast documentation. Persisting to a centralized data store (e.g. a database) would slow things down.

Since we already use Apache Kafka, we decided to use it for aggregating message parts. Compared to the Hazelcast implementation, this is a more robust and fail-safe solution that doesn't require direct data replication between application nodes.

And this is how it works...

We have two topics. The first is called "customer-incoming-message-parts" and, as you can guess, it's used for storing message parts. The second is called "customer-incoming-messages", and this is where final messages (containing all parts) are stored.

When a message is received from a customer, it contains data we can use to determine whether it is a single-part or a multi-part message.

If it's single-part, a new instance of the final message (class "WholeMessage") is created and written directly to the "customer-incoming-messages" topic (using KafkaProducer).

Otherwise (if the message received from the customer is multi-part), we extract info such as the identifier of the message this part belongs to, the number (index) of this part, and the total number of parts the final message contains. Then we simply create an instance of the message part (class "MessagePart"), fill it with that info and write it to "customer-incoming-message-parts", also using KafkaProducer.
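The routing decision itself is small. In this sketch IncomingSms and targetTopic are hypothetical names and only the two topic names come from this post; the real handler would then wrap the object in a ProducerRecord and pass it to KafkaProducer.send().

```kotlin
// Hypothetical view of what the connection handler extracts from an incoming SMS.
data class IncomingSms(
    val customerId: Long,
    val messageId: String,
    val partNumber: Int,
    val partsCount: Int
)

val WHOLE_MESSAGES_TOPIC = "customer-incoming-messages"
val MESSAGE_PARTS_TOPIC = "customer-incoming-message-parts"

// Single-part messages go straight to the final topic as a WholeMessage;
// multi-part ones go to the parts topic as a MessagePart for assembly.
fun targetTopic(sms: IncomingSms): String =
    if (sms.partsCount <= 1) WHOLE_MESSAGES_TOPIC
    else MESSAGE_PARTS_TOPIC
```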

The customer connection handler's job is done here. It doesn't need to worry about how the message parts will be assembled or processed, so the code is pretty simple and clean.

Assembling final messages 

The actual assembling of messages is done with the KafkaStreams API. This picture shows how our data moves around.

CX represents customer id, MX represents message id, P represents message part

What we need to do is:

1. Configure streams application
2. Define the streams application topology using KStream / KTable operations
3. Start streams application

And this is how the code looks (written in Kotlin):

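import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.Serialized

val config = Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-incoming-message-parts-processor")
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde::class.java)
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MessagePartSerde::class.java)
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)

val streamConfig = StreamsConfig(config)

val builder = StreamsBuilder()

builder.stream<Long, MessagePart>("customer-incoming-message-parts")  // source stream: key = null, value = MessagePart
        // group by compound key customer id + message id (exact key format assumed here)
        .groupBy({ _, value -> "${value.senderId}:${value.senderMsgId}" },
                Serialized.with(Serdes.String(), MessagePartSerde()))
        .aggregate(WholeMessageInitializer(), MessagePartAggregator(),
                Materialized.with(Serdes.StringSerde(), WholeMessageSerde()))
        .toStream()
        .filter { _, value -> value.parts.size == value.totalPartsCount }  // complete messages only (assumes a "parts" list)
        .map { _, value -> KeyValue(null as Long?, value) }                 // null key of type Long for the output topic
        .to("customer-incoming-messages",
                Produced.with(Serdes.Long(), WholeMessageSerde()))

val streams = KafkaStreams(builder.build(), streamConfig)
streams.start()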

Configuring streams application

Streams configuration requires the "application id" property. Here "application" doesn't mean your application or some external application; it refers to the set of processes that run this KafkaStreams topology across your application nodes. The property is very similar to the group id in Kafka consumer configuration. Among other things, it controls how multiple instances of this KafkaStreams process share topic partitions. As with consumer groups, two KafkaStreams instances with the same application id never read the same topic partition at the same time, so the number of partitions limits how many streams application instances can read from the topic in parallel.
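A simplified model of why the partition count caps parallelism: every partition has exactly one owner within the group, so instances beyond the partition count receive nothing. The round-robin dealing below (`assignPartitions` is a hypothetical name) is only an illustration; Kafka Streams uses its own partition assignor, which also considers stream tasks and state, so real assignments may differ.

```kotlin
// Simplified model: deal partitions out to instances round-robin.
// Each partition gets exactly one owner; surplus instances stay idle.
fun assignPartitions(partitionCount: Int, instances: List<String>): Map<String, List<Int>> {
    require(instances.isNotEmpty()) { "need at least one instance" }
    val assignment = instances.associateWith { mutableListOf<Int>() }
    for (partition in 0 until partitionCount) {
        assignment.getValue(instances[partition % instances.size]).add(partition)
    }
    return assignment
}
```

With our 6 partitions, three nodes get two partitions each, while eight instances would leave two of them idle.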

The bootstrap servers property is the same one we use when configuring any Kafka producer or consumer. It tells our streams application how to find the Kafka cluster, as one or more broker addresses (comma-separated) in the form {host}:{port}.
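As a small illustration of that format, here is a hypothetical helper that builds the value from a list of brokers (the host names in the test are made up). The list does not have to contain every broker: the client uses these addresses only for the initial connection and discovers the rest of the cluster from there.

```kotlin
// Hypothetical helper: builds the comma-separated "{host}:{port}" list
// expected by the bootstrap.servers property.
fun bootstrapServers(brokers: List<Pair<String, Int>>): String {
    require(brokers.isNotEmpty()) { "at least one broker address is needed" }
    return brokers.joinToString(",") { (host, port) -> "$host:$port" }
}
```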

The default key serde and default value serde properties define which Serdes (serializer and deserializer combined) to use for serializing and deserializing the keys and values of topic records. Kafka ships with a number of predefined serdes, but you can also implement your own.
Setting only the defaults is not enough here: sooner or later we want to transform the key or value of an input topic into some other type on the output, which requires different serdes. In our example we specify those with the "Serialized.with", "Materialized.with" and "Produced.with" methods.
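Kafka's `Serde<T>` simply pairs a `Serializer<T>` and a `Deserializer<T>`, so a custom serde mostly comes down to turning an object into bytes and back. The stdlib-only sketch below shows that round trip for a cut-down, hypothetical `PartRecord`; in a real `MessagePartSerde` this logic would sit inside the `serialize`/`deserialize` methods. We don't show which format our actual serdes use, so the binary layout here is purely illustrative.

```kotlin
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Cut-down, hypothetical part type used only for this example.
data class PartRecord(val senderId: Long, val senderMsgId: String, val partNumber: Int, val partsCount: Int)

// Layout: senderId (8 bytes) | partNumber (4) | partsCount (4) | msgId length (4) | msgId UTF-8 bytes
fun serialize(part: PartRecord): ByteArray {
    val idBytes = part.senderMsgId.toByteArray(StandardCharsets.UTF_8)
    return ByteBuffer.allocate(8 + 4 + 4 + 4 + idBytes.size)
        .putLong(part.senderId)
        .putInt(part.partNumber)
        .putInt(part.partsCount)
        .putInt(idBytes.size)
        .put(idBytes)
        .array()
}

// Reads the fields back in exactly the order they were written.
fun deserialize(bytes: ByteArray): PartRecord {
    val buf = ByteBuffer.wrap(bytes)
    val senderId = buf.long
    val partNumber = buf.int
    val partsCount = buf.int
    val idBytes = ByteArray(buf.int).also { buf.get(it) }
    return PartRecord(senderId, String(idBytes, StandardCharsets.UTF_8), partNumber, partsCount)
}
```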

The "exactly once" processing guarantee tells KafkaStreams that every message from the input topic must be processed by this streams topology exactly once, never more. The default value of this property is "at_least_once": in that mode all messages will be processed, but some messages can be processed twice if an application node crashes. Exactly once is the most desirable behavior. It comes with some overhead (not too high, though), so you need to decide whether it is what you need.

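Why the guarantee matters for this particular topology: appending parts is not idempotent. If part 2 of a 3-part message is processed twice before part 3 arrives, a naive aggregate already holds three entries, so a size-based completeness check would declare the message assembled with part 3 missing. The plain-Kotlin simulation below (no Kafka involved; function names are hypothetical) shows the effect, plus one possible defensive variant that ignores repeated part numbers. That variant is our illustration, not something the topology in this post does.

```kotlin
// Each Int is a part number as seen by the aggregator, duplicates included.

// Naive aggregation: every delivery is appended.
fun naiveAggregate(deliveries: List<Int>): List<Int> =
    deliveries.fold(emptyList<Int>()) { parts, part -> parts + part }

// Defensive variant: skip part numbers that were already collected.
fun dedupAggregate(deliveries: List<Int>): List<Int> =
    deliveries.fold(emptyList<Int>()) { parts, part -> if (part in parts) parts else parts + part }
```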
Defining topology of streams application

The KafkaStreams DSL provides operations for transforming, filtering, aggregating, etc. stream records.
We distinguish stateless from stateful operations. Stateless operations process each record without depending on the results of processing previous records (filtering, mapping...), while stateful operations do depend on them (counting, aggregating...).
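An analogy using plain Kotlin collections rather than the Kafka Streams DSL itself (function names are hypothetical): the stateless function needs nothing but the current record, while the stateful one has to remember counts between records. In Kafka Streams that remembered state lives in a state store backed by a changelog topic, which is what lets a stateful operation survive an instance restart.

```kotlin
// Stateless: the result for each record depends only on that record.
fun dropBlank(records: List<String>): List<String> = records.filter { it.isNotBlank() }

// Stateful: the result for each record depends on what came before,
// so a running count per key must be remembered between records.
fun runningCounts(keys: List<String>): List<Pair<String, Int>> {
    val counts = mutableMapOf<String, Int>()  // the state carried from record to record
    return keys.map { key ->
        val updated = (counts[key] ?: 0) + 1
        counts[key] = updated
        key to updated
    }
}
```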

Our input topic is "customer-incoming-message-parts", which is filled with message parts by multiple customer connection handlers. So this topic contains message parts from different customers, belonging to different "final" messages. The message part object is the value of each record in this topic, and the key is null. Message part objects carry all the information we need (customer id, message id, part number, total parts count). By using a null key, we get partitions filled in round-robin fashion, which balances data more evenly among Kafka brokers and also gives us more parallelism when processing that data. The topic currently has 6 partitions, so up to 6 streams application instances can process data simultaneously, although we will probably run only 3 nodes (instances of the streams app).
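A simplified model of round-robin distribution across our 6 partitions, with hypothetical names. Treat it as an idealization: the exact behavior for null keys depends on the producer version. Since Kafka 2.4 the default partitioner is "sticky" for null keys, filling a batch for one partition before moving to the next, so the spread evens out over many records rather than record by record.

```kotlin
// Idealized null-key partitioning: a counter deals records across partitions in turn.
class RoundRobinPartitioner(private val partitionCount: Int) {
    private var next = 0
    // Returns the current partition and advances the counter.
    fun nextPartition(): Int = next.also { next = (next + 1) % partitionCount }
}

// Counts how many of recordCount records land on each partition.
fun distribute(recordCount: Int, partitionCount: Int): IntArray {
    val partitioner = RoundRobinPartitioner(partitionCount)
    val perPartition = IntArray(partitionCount)
    repeat(recordCount) { perPartition[partitioner.nextPartition()]++ }
    return perPartition
}
```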

In the "customer-incoming-message-parts-processor" application topology, the first thing we define is the input topic from which our streams application is fed. The StreamsBuilder.stream() method creates a so-called "source stream". Records in that stream are the same as in the input topic: key = null, value = message part instance.

Now the most important part: aggregating. To aggregate message parts belonging to the same message, we first group them by a compound key combining customer id and message id. The customer id is necessary because different customers can use the same message ids. The message id type is String, which gives us the most flexibility since we don't know which type customers use for message ids; a String can wrap anything.
Once we have message parts grouped by customer id and message id, we aggregate those parts.
The aggregate() method takes instances of org.apache.kafka.streams.kstream.Initializer and org.apache.kafka.streams.kstream.Aggregator as arguments.
The Initializer creates an "empty" instance of the aggregation result object, while the Aggregator adds parts to the appropriate aggregation result object.
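The compound key can be any string that is unique per (customer, message) pair. A sketch assuming a "customerId:messageId" format (the post does not show the real one, so aggregationKey and parseAggregationKey are hypothetical): because the customer id is numeric and never contains the separator, splitting at the first ':' is unambiguous even when a customer's message id contains ':' itself.

```kotlin
// Hypothetical key format "customerId:messageId".
fun aggregationKey(customerId: Long, messageId: String): String = "$customerId:$messageId"

// Splits at the first ':' only; the numeric customer id can never contain one.
fun parseAggregationKey(key: String): Pair<Long, String> {
    val separator = key.indexOf(':')
    return key.substring(0, separator).toLong() to key.substring(separator + 1)
}
```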

In our case the Initializer simply returns a new, empty instance of the "final" message, with no parts added yet.

inner class WholeMessageInitializer : Initializer<WholeMessage> {
    override fun apply(): WholeMessage {
        return WholeMessage(0, "", 0)  // empty message: no sender, no parts yet
    }
}

And here is our aggregator. Every time a new part arrives, the aggregator's apply method is called with that part and the current aggregation result as arguments. The aggregator must return a new aggregation result object based on those arguments. We simply create another instance of "WholeMessage", add all parts present in the current aggregation, and then add the new part.

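inner class MessagePartAggregator : Aggregator<String, MessagePart, WholeMessage> {
    override fun apply(msgId: String, part: MessagePart, message: WholeMessage): WholeMessage? {
        val updatedMessage = WholeMessage(part.senderId, part.senderMsgId, part.partsCount)
        updatedMessage.parts.addAll(message.parts)  // parts collected so far (assumes a mutable "parts" list)
        updatedMessage.parts.add(part)              // plus the newly arrived part
        return updatedMessage
    }
}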

This solves aggregation. The result of the aggregation operation is a KTable with String keys and WholeMessage values. Now we can convert it to a stream, filter it to keep only fully assembled WholeMessage records, and write those instances to the output topic.

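.toStream()
.filter { _, value -> value.parts.size == value.totalPartsCount }  // only fully assembled messages
.map { _, value -> KeyValue(null as Long?, value) }                // null key of type Long
.to("customer-incoming-messages",
        Produced.with(Serdes.Long(), WholeMessageSerde()))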
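To see the whole groupBy / aggregate / toStream / filter chain without a Kafka cluster, here is a plain-Kotlin simulation with hypothetical stand-ins Part and Assembled for MessagePart and WholeMessage. A mutable map plays the role of the KTable and a list plays the output topic. It only models the logic: in the real topology, record caching may collapse several consecutive updates for the same key before they reach the filter.

```kotlin
// Hypothetical stand-ins for MessagePart and WholeMessage.
data class Part(val senderId: Long, val senderMsgId: String, val partNumber: Int, val partsCount: Int)
data class Assembled(val senderId: Long, val senderMsgId: String, val totalPartsCount: Int, val parts: List<Part>)

fun assemble(incoming: List<Part>): List<Assembled> {
    val table = mutableMapOf<String, Assembled>()  // plays the role of the KTable
    val output = mutableListOf<Assembled>()         // records written to the output topic
    for (part in incoming) {
        val key = "${part.senderId}:${part.senderMsgId}"                      // groupBy (compound key)
        val current = table[key] ?: Assembled(0, "", 0, emptyList())          // Initializer
        val updated = Assembled(part.senderId, part.senderMsgId, part.partsCount,
                                current.parts + part)                         // Aggregator
        table[key] = updated                                                   // KTable update
        if (updated.parts.size == updated.totalPartsCount) output += updated  // filter
    }
    return output
}
```

Feeding it the two parts of one message interleaved with a single-part message from another customer yields two assembled messages, in the order they became complete.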

At the current stage of development, it seems that messages in the output topic don't need to be processed in the exact order they arrived; it is more important to get the maximum possible processing speed, which we achieve by distributing messages evenly across topic partitions with null keys (resulting in round-robin writes to partitions). We may later have to process messages belonging to the same customer sequentially, so we made the output topic key type Long, and we may later use the customer id instead of a null key.

Since the output topic key type is Long (for the reasons described above) and the aggregation key is a String, we need to transform record keys before calling to(). We do this with the map() operation, which replaces each key with a null value of type Long.
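If we do switch to customer id keys later, per-customer ordering follows from two Kafka properties: records with the same key land in the same partition, and order is preserved within a partition. A sketch with hypothetical names; note that hashCode() is only a stand-in here, since Kafka's default partitioner applies murmur2 to the serialized key bytes.

```kotlin
// Hypothetical output record: which customer it belongs to and its arrival sequence.
data class OutgoingMessage(val customerId: Long, val seq: Int)

// Stand-in for key-based partitioning (Kafka itself uses murmur2 on the key bytes).
fun partitionForKey(customerId: Long, partitionCount: Int): Int =
    Math.floorMod(customerId.hashCode(), partitionCount)

// Groups messages by target partition, keeping arrival order within each partition.
fun routeByCustomer(messages: List<OutgoingMessage>, partitionCount: Int): Map<Int, List<OutgoingMessage>> =
    messages.groupBy { partitionForKey(it.customerId, partitionCount) }
```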

Now the output topic contains only fully assembled "WholeMessage" instances, which will be processed by another streams application. We will discuss that in one of our next posts. Stay tuned.

