Making of Message Gateway with Kafka - Part 5

Handling delivery reports

So much time has passed since the last blog post in this series that I feel any kind of apology would be hypocritical :)
Instead, let's just start with the post...

In the previous post I talked about handling SMS message receipts (known as "submit responses" in the SMS world) coming from the SMS service provider.

Let's just quickly go through all the steps in the SMS delivery process.
  1. An external application sends an SMS message to the message gateway (via the SMPP protocol)
  2. The message gateway sends the SMS message to the SMS service provider (an SMS center or online SMS service)
  3. The SMS service provider sends a so-called "submit response" to the message gateway, confirming that it accepted the message for further processing
  4. After that, the SMS service provider tries to deliver the message to the end user (mobile device) and notifies the message gateway about the delivery result (which can be success, failure, expired, or some intermediate status)
In this post we are going to deal with the processing that happens after step 4 (delivery report received).

Before we dig in, let's take a look at a diagram that visualizes the processing of submit responses, described in detail in the previous blog post. This diagram serves as both a reminder and a simplification - we omit multi-part aggregation and focus only on joins.



Our message gateway uses the WholeMessage class to represent an SMS message, whether it is a single or multi-part SMS message.
Whenever we send a WholeMessage instance, we first store it to the sent-messages topic using its ID as the key (referred to as ourMsgId in the diagram). Then we decide whether one or more parts (SMPP messages) should be sent (represented by the MessagePart class in our diagram).
For each sent MessagePart instance, the SMS service provider will send us back a submit response. The SMPP library keeps track of sent messages and received submit responses and correlates them, so when our code is notified about a submit response, the passed object also contains an (in-memory) reference to the sent MessagePart. And since MessagePart contains the ID of its WholeMessage (ourMsgId), we can use it as the key for writing to the submit-responses topic.
We use the MessagePart instance as the value, updated with the following information from the SubmitResponse:
 - status (tells us whether the message was accepted or rejected)
 - the ID assigned by the SMS service provider (let's call it externalId), which we will need later for correlation with the delivery report
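To make the data flow concrete, here is a minimal sketch of the classes involved. WholeMessage, MessagePart, and SubmitResponse are named in the post; everything else (the exact fields beyond ourMsgId and externalId, and all sample values) is an assumption for illustration.

```java
// Minimal data-model sketch of the classes named in the post.
// Only ourMsgId and externalId come from the text; the other fields
// and sample values are assumptions for illustration.
public class DataModel {

    // The whole SMS message, single or multi-part.
    record WholeMessage(String ourMsgId, String text, String status) {}

    // One SMPP part of a WholeMessage; it carries its parent's ID.
    record MessagePart(String ourMsgId, String externalId, String status) {}

    // What the provider sends back after accepting (or rejecting) a part.
    record SubmitResponse(String externalId, String status) {}

    public static void main(String[] args) {
        WholeMessage msg = new WholeMessage("m-1", "hello", "PENDING");
        MessagePart part = new MessagePart(msg.ourMsgId(), null, "PENDING");
        SubmitResponse resp = new SubmitResponse("ext-42", "ACCEPTED");
        // Update the part with status and externalId from the submit response,
        // as described above, before writing it to submit-responses.
        part = new MessagePart(part.ourMsgId(), resp.externalId(), resp.status());
        System.out.println(part); // prints "MessagePart[ourMsgId=m-1, externalId=ext-42, status=ACCEPTED]"
    }
}
```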

With ourMsgId as the key in both topics, we have everything ready for joining.
Once a submit response arrives, the join is triggered: we take the WholeMessage instance from the right side of the join, update it with the MessagePart submit status (left side of the join), and forward the updated WholeMessage down the stream, where it gets written to the message-store topic.
This topic is compacted and exported to Elasticsearch, so we can search for messages and observe their current status.
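Because both topics share ourMsgId as the key, the join boils down to a keyed lookup. Here is a plain-Java simulation of that logic (not the actual Kafka Streams topology; the class shapes and sample values are assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the first streaming app's join, assuming
// simplified versions of the classes the post describes.
public class SubmitResponseJoin {
    record WholeMessage(String ourMsgId, String status) {}
    record MessagePart(String ourMsgId, String externalId, String status) {}

    public static void main(String[] args) {
        // sent-messages topic materialized as a table: ourMsgId -> WholeMessage
        Map<String, WholeMessage> sentMessages = new HashMap<>();
        sentMessages.put("m-1", new WholeMessage("m-1", "PENDING"));

        // A submit response arrives; the SMPP library has already correlated it
        // with the sent MessagePart, so we have both ourMsgId and externalId.
        MessagePart submitResponse = new MessagePart("m-1", "ext-42", "ACCEPTED");

        // The join: look up the WholeMessage by ourMsgId and update its status,
        // producing the record that gets written to the message-store topic.
        WholeMessage joined = sentMessages.get(submitResponse.ourMsgId());
        WholeMessage updated = new WholeMessage(joined.ourMsgId(), submitResponse.status());

        System.out.println(updated); // prints "WholeMessage[ourMsgId=m-1, status=ACCEPTED]"
    }
}
```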

That's our first streaming application, with a very simple topology, just as we like it.

Finally - delivery reports...

Our second streaming application handles delivery reports, and it's a bit more complicated.
A picture is worth a thousand words, so let's start with a diagram.




Let's see what happens here...
Upon arrival, delivery reports are stored in the delivery-reports topic, but at that moment we don't have any reference to a MessagePart instance; we only have the externalId (the ID assigned by the SMS service provider, remember?) and the delivery status for the sent MessagePart.
externalId is used as the key for writing to the delivery-reports topic.
Our final goal is to join DeliveryReport records from the delivery-reports topic with WholeMessage records from the sent-messages topic in order to update the WholeMessage status and store it to the message-store topic.
It would be a piece of cake if the keys of the delivery-reports topic were ourMsgId, but unfortunately this is not the case: we have externalId as the key, so we have to use the submit-responses topic as a link between the delivery-reports and sent-messages topics.
The submit-responses topic also uses ourMsgId as the key, but it has MessagePart as the value, so we can extract externalId from it.
To do that, we need to group the KStream derived from the submit-responses topic and select MessagePart.externalId as the key.
(Note: the result of the groupBy operation is a KGroupedStream, and we need a KTable, so we use a little trick to get one: we apply a dummy reduce operation.)
That results in a KTable with externalId as the key and MessagePart as the value. Now we can join the delivery-reports topic with it.
The result of that join is a KTable with externalId as the key and the value being a MessagePart with delivery status info added to it.

Next, we need to join it with the sent-messages topic, but the problem is that we still have externalId as the key, while the sent-messages topic has ourMsgId as the key. We must re-group this table once again, so we do it using a groupBy operation and selecting MessagePart.ourMsgId as the key.
This gives us a KTable with ourMsgId as the key and MessagePart as the value, and now we can finally perform the join with the sent-messages topic.
Our ValueJoiner takes the WholeMessage from the right side of the join, updates it with the delivery status info from the MessagePart (left side of the join), and finally outputs the updated WholeMessage to the message-store topic. Done. Phew!
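The key gymnastics of this second topology can be simulated with plain maps. This is not the real Kafka Streams code, just a sketch of the three steps under assumed class shapes: re-key submit-responses by externalId, join with the delivery report, then re-key back to ourMsgId and join with sent-messages.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the delivery-report topology described above.
public class DeliveryReportJoin {
    record WholeMessage(String ourMsgId, String status) {}
    record MessagePart(String ourMsgId, String externalId, String status) {}
    record DeliveryReport(String externalId, String deliveryStatus) {}

    public static void main(String[] args) {
        // sent-messages: ourMsgId -> WholeMessage
        Map<String, WholeMessage> sentMessages = new HashMap<>();
        sentMessages.put("m-1", new WholeMessage("m-1", "ACCEPTED"));

        // submit-responses: ourMsgId -> MessagePart
        Map<String, MessagePart> submitResponses = new HashMap<>();
        submitResponses.put("m-1", new MessagePart("m-1", "ext-42", "ACCEPTED"));

        // Step 1: re-key submit-responses by externalId (the first groupBy;
        // the dummy reduce just keeps the latest value per key).
        Map<String, MessagePart> byExternalId = new HashMap<>();
        for (MessagePart p : submitResponses.values()) {
            byExternalId.put(p.externalId(), p);
        }

        // Step 2: a delivery report arrives; join it with that table on externalId.
        DeliveryReport report = new DeliveryReport("ext-42", "DELIVERED");
        MessagePart part = byExternalId.get(report.externalId());
        MessagePart withDelivery =
                new MessagePart(part.ourMsgId(), part.externalId(), report.deliveryStatus());

        // Step 3: re-key back to ourMsgId (the second groupBy) and join with
        // sent-messages; the ValueJoiner copies the delivery status onto WholeMessage.
        WholeMessage whole = sentMessages.get(withDelivery.ourMsgId());
        WholeMessage updated = new WholeMessage(whole.ourMsgId(), withDelivery.status());

        System.out.println(updated); // prints "WholeMessage[ourMsgId=m-1, status=DELIVERED]"
    }
}
```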

Can we make it simpler?

I always ask myself this question, since simplification always leads to better design.
In this case we have two streaming apps: one is very simple and the other is significantly more complicated (although I wouldn't say too complicated).
We could "move" some complexity from the second streaming app to the first.
How? When a submit response arrives, we get the WholeMessage ID (ourMsgId) from the MessagePart and use it as the key for writing to the submit-responses topic. We could instead use the externalId (which we received as part of the submit response) as the key.
If we did that, we would eliminate the need for the first groupBy operation in our delivery report handling app (the second streaming app), since the submit-responses topic keys would already be externalId, the same as the keys in the delivery-reports topic, so we could join them immediately.
On the other hand, our first streaming app, the one that handles submit responses, would then have to re-group the KStream derived from the submit-responses topic in order to have ourMsgId as the key before joining with the KTable derived from the sent-messages topic.

Another approach would be to have two separate topics for submit responses: one keyed by ourMsgId and used in the first streaming app, and another keyed by externalId and used in the second streaming app. This way the first app stays very simple and the second app is simplified, but now the producer of submit responses gets more complicated, since it has to write to two topics.
Maybe it sounds simpler, but I don't prefer this approach, since the producer has to write to two topics instead of just one, and it must do so transactionally, which complicates things.

Foreign joins in Kafka 2.4

In our streaming app that handles delivery reports, we had to use the groupBy operation twice in order to change keys before we could perform a join. In Kafka 2.4 this need is eliminated by a new feature, foreign-key joins, defined by KIP-213.
Thanks to this feature, we will be able to pass a foreignKeyExtractor function to the join operation. That function extracts the key relevant for joining from the record value, and the join is performed on that (extracted) key instead of the current key of the KTable.
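The idea behind a foreign-key join can be sketched in plain Java: instead of re-keying the left table first, the join takes an extractor function that pulls the joining key out of each left-side value. This is only a toy illustration of the concept, not the actual KIP-213 API; all names and values here are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy sketch of the foreign-key join concept from KIP-213 (not the real API).
public class ForeignKeyJoinSketch {
    record MessagePart(String ourMsgId, String externalId, String status) {}
    record WholeMessage(String ourMsgId, String status) {}

    // For each left record, extract the foreign key from its VALUE and look it
    // up in the right table - no prior re-keying of the left table needed.
    static <K, V, KO, VO, VR> Map<K, VR> fkJoin(Map<K, V> left,
                                                Map<KO, VO> right,
                                                Function<V, KO> foreignKeyExtractor,
                                                BiFunction<V, VO, VR> joiner) {
        Map<K, VR> out = new HashMap<>();
        left.forEach((k, v) -> {
            VO other = right.get(foreignKeyExtractor.apply(v));
            if (other != null) out.put(k, joiner.apply(v, other));
        });
        return out;
    }

    public static void main(String[] args) {
        // Left side: still keyed by externalId, with MessagePart values...
        Map<String, MessagePart> parts =
                Map.of("ext-42", new MessagePart("m-1", "ext-42", "DELIVERED"));
        // ...joined directly against sent-messages (keyed by ourMsgId),
        // with ourMsgId extracted from each MessagePart value.
        Map<String, WholeMessage> sent =
                Map.of("m-1", new WholeMessage("m-1", "ACCEPTED"));

        Map<String, WholeMessage> result = fkJoin(parts, sent,
                MessagePart::ourMsgId,
                (p, w) -> new WholeMessage(w.ourMsgId(), p.status()));

        System.out.println(result.get("ext-42")); // prints "WholeMessage[ourMsgId=m-1, status=DELIVERED]"
    }
}
```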

This is a great improvement that will make streaming apps that use joins easier to write and maintain.
