Making of Message Gateway with Kafka - Part 4

Using aggregate and join operations to handle message receipt confirmations

It's been a while since the last blog post in this series, but now we continue our journey.

After sending SMS messages to service providers, the next thing we have to do is keep track of their delivery status.
There are two asynchronous responses that we should receive from our SMS service providers (aka "suppliers") after we send them a message:

1. The supplier sends us a so-called "submit response", which confirms that the SMS message has been received and accepted for further processing on their side.

2. If we requested it in our send request, the supplier also sends us a delivery report for the SMS message. This report describes the status of delivery to the mobile user.

In this post we deal with the first one: submit responses.

Before we go further, note that we support so-called "long SMS": messages that are sent as several SMPP messages, each representing one part of the whole message.

To support that, we have an abstraction of the "whole" message. Let's say that the class representing a "long SMS" message is called WholeMessage. We also have message parts, which are the actual SMPP messages sent to suppliers; the class representing these parts is called MessagePart.
WholeMessage is our central entity. It is the message from the business perspective, and it can represent either a "long SMS" or a "normal SMS" message.

We track the status of WholeMessage instances because that is the message in the business sense, while message parts (the SMPP messages that are actually sent) are a technical detail of how the message gets delivered. Of course, the status of the whole message depends on the statuses of all message parts that belong to it.
This means that on every submit response arriving for a specific part, we need to check whether the submit responses for all other parts have arrived, and if so, we can update the status of the WholeMessage instance to "SENT" (the initial status is "PENDING").
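The status rule can be sketched in plain Kotlin, with no Kafka involved. The `PENDING` and `SENT` values come from the post; the fields (`id`, `totalPartsCount`, `wholeMessageId`, `partNumber`, `supplierMessageId`) and the `statusFor` function are hypothetical, since the post does not show the real WholeMessage and MessagePart classes.

```kotlin
// Hypothetical model: field names are assumptions, not the gateway's real classes.
enum class MessageStatus { PENDING, SENT }

data class WholeMessage(
    val id: String,            // generated by our application, used as the Kafka key
    val totalPartsCount: Int   // 1 for a "normal sms", more than 1 for a "long sms"
)

data class MessagePart(
    val wholeMessageId: String,    // ID of the WholeMessage this part belongs to
    val partNumber: Int,           // 1-based position of this part
    val supplierMessageId: String  // supplier-generated ID taken from the submit response
)

// The whole message is SENT only when a submit response has arrived for every part.
fun statusFor(message: WholeMessage, submitResponses: Collection<MessagePart>): MessageStatus {
    val confirmed = submitResponses
        .filter { it.wholeMessageId == message.id }
        .map { it.partNumber }
        .toSet()
    val allConfirmed = (1..message.totalPartsCount).all { it in confirmed }
    return if (allConfirmed) MessageStatus.SENT else MessageStatus.PENDING
}
```

In a single process this check could run against an in-memory collection; the Kafka Streams topology in step 4 keeps the same per-message state in a state store instead, so it survives restarts and scales with the number of partitions.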

The picture below illustrates what we do.



1. When sending WholeMessage instances, we set their status to PENDING and store them in a Kafka topic called "sent-messages", with the WholeMessage ID (generated by our application) as the key and the WholeMessage instance itself as the value.

2. We split WholeMessage instances into one or more MessagePart instances (depending on how long the message content is) and send them as SMPP messages to the appropriate supplier.

3. When we receive a "submit response" for a specific SMPP message, it contains a reference to the MessagePart instance we sent as well as the supplier-generated ID for that SMPP message. Keeping the MessagePart instance in memory between sending the message and receiving its submit response is handled by the SMPP library (we use the Cloudhopper library). Once we get the submit response, we update the corresponding MessagePart instance with the supplier-generated ID and store it in a Kafka topic called "submit-responses", using the WholeMessage instance ID as the key and the MessagePart itself as the value.

4. Using the Kafka Streams API, we aggregate the MessagePart instances from the "submit-responses" topic that belong to the same WholeMessage and keep only the aggregates for which all parts have arrived. The result of this operation is a KTable with the WholeMessage ID as key and a MessagePartsAggregate as value. By joining this KTable with the KTable built from the "sent-messages" topic, we get the corresponding WholeMessage instance, and since we now know that all parts of this message were successfully sent, we can update its status to "SENT" and send it to another topic or to an external system used to store the latest state of messages.

The code for step 4 follows. The code for sending WholeMessage instances to the "sent-messages" topic (step 1) and MessagePart instances to the "submit-responses" topic (step 3) is not shown here, since it is very simple and needs no explanation.



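import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Produced

val builder = StreamsBuilder()

// Latest WholeMessage per WholeMessage ID, as written in step 1
val sentMessages = builder.table<String, WholeMessage>("sent-messages",
                               Materialized.with(Serdes.String(), WholeMessageSerde()))
// MessagePart records from step 3; relies on the default serdes configured in StreamsConfig
val inputStream = builder.stream<String, MessagePart>("submit-responses")

inputStream
        .groupByKey()
        // Collect all MessagePart instances that share the same WholeMessage ID
        .aggregate(
                MessagePartAggregateInitializer(),
                MessagePartAggregator(),
                Materialized.with(Serdes.String(), MessagePartsAggregateSerde())
        )
        // Keep only aggregates for which every part's submit response has arrived
        .filter { _, aggregate -> aggregate.parts.size == aggregate.totalPartsCount }
        // Look up the WholeMessage with the same key and mark it as SENT
        .join(
                sentMessages,
                { _, message ->
                    message.updateStatus(MessageStatus.SENT, "Sent successfully")
                    message
                }, Materialized.with(Serdes.String(), WholeMessageSerde())
        )
        .toStream()
        // KTable filter/join can forward tombstones (null values) for incomplete messages; drop them
        .filter { _, message -> message != null }
        .to("messages-store", Produced.with(Serdes.String(), WholeMessageSerde()))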
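The post does not show `MessagePartAggregateInitializer`, `MessagePartAggregator` or `MessagePartsAggregate`. Below is a dependency-free sketch of the logic they might contain, assuming each MessagePart also carries the total number of parts of its message. In a real topology these functions would be wrapped in Kafka Streams' `Initializer` (`apply()`) and `Aggregator` (`apply(key, value, aggregate)`) interfaces; here `groupBy` plus `fold` merely stands in for the per-key state that `groupByKey().aggregate(...)` maintains, and `filterValues` mirrors the `.filter` step. All field and function names are hypothetical.

```kotlin
// Hypothetical shapes: the real classes are not shown in the post.
data class MessagePart(
    val wholeMessageId: String,
    val partNumber: Int,
    val totalPartsCount: Int,      // assumed to be carried by every part
    val supplierMessageId: String
)

data class MessagePartsAggregate(
    val parts: Map<Int, MessagePart> = emptyMap(), // keyed by part number
    val totalPartsCount: Int = 0
)

// Role of MessagePartAggregateInitializer: empty aggregate for a new WholeMessage ID.
fun initialAggregate(): MessagePartsAggregate = MessagePartsAggregate()

// Role of MessagePartAggregator: fold one submit response into the running aggregate.
// `key` is unused; it only mirrors the signature of Aggregator.apply(key, value, aggregate).
fun aggregate(key: String, part: MessagePart, agg: MessagePartsAggregate): MessagePartsAggregate =
    MessagePartsAggregate(
        parts = agg.parts + (part.partNumber to part),
        totalPartsCount = part.totalPartsCount
    )

// Mirrors the topology's .filter step, plus a guard against an empty aggregate.
fun isComplete(agg: MessagePartsAggregate): Boolean =
    agg.totalPartsCount > 0 && agg.parts.size == agg.totalPartsCount

// Batch stand-in for groupByKey().aggregate(...).filter { ... }.
fun completeAggregates(records: List<Pair<String, MessagePart>>): Map<String, MessagePartsAggregate> =
    records.groupBy({ it.first }, { it.second })
        .mapValues { (key, parts) -> parts.fold(initialAggregate()) { acc, p -> aggregate(key, p, acc) } }
        .filterValues(::isComplete)
```

Keying `parts` by part number is a deliberate choice in this sketch: if the same submit response is stored twice (for example after a producer retry), a plain list would grow past `totalPartsCount` and the `parts.size == totalPartsCount` check would never match again, while a map simply overwrites the duplicate.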


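In order to join records from two KTables or KStreams, those records must have the same key. This is why we used the WholeMessage instance ID as the key in both the "sent-messages" and "submit-responses" topics. The two input topics must also be co-partitioned, i.e. have the same number of partitions and use the same partitioning strategy, so that records with the same key end up in the same stream task.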
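Why the shared key matters can be shown without Kafka. In this sketch two `Map`s stand in for the two KTables (latest value per key), and an inner join emits a result only for keys present on both sides, which is the semantics of the KTable-KTable `join` used in the topology. The `WholeMessage` shape and the `innerJoin` helper are hypothetical simplifications.

```kotlin
enum class MessageStatus { PENDING, SENT }

// Hypothetical, simplified WholeMessage with an immutable status field.
data class WholeMessage(val id: String, val status: MessageStatus)

// Inner join on key: a result exists only when both tables hold a value for the same key.
fun <K, L, R, O> innerJoin(left: Map<K, L>, right: Map<K, R>, joiner: (L, R) -> O): Map<K, O> =
    left.mapNotNull { (key, l) -> right[key]?.let { r -> key to joiner(l, r) } }.toMap()
```

For example, joining complete aggregates keyed by WholeMessage ID with the "sent-messages" table yields an updated message only for IDs that appear on both sides. If "submit-responses" were keyed by the supplier-generated ID instead, no key would ever match and the join would produce nothing.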

As you can see, the amount of code is quite small, yet it does the job in a scalable and fault-tolerant way.
In the next post we will talk about handling delivery reports, also using the join and aggregate operations of the Kafka Streams API.
