Making of Message Gateway with Kafka - Part 3

Balancing service provider connections across application nodes

One of the challenges with cluster-ready applications is balancing specific processes across the cluster nodes. By process I mean a part of the application logic that runs in a separate thread. For example, one process might read data from a database and export it to PDF files; another example is a scheduled job that does something periodically (e.g. generating reports).

In many cases such processes must run on only one node at a time, but if that node fails, the process should be restarted on another node.
Running more than one instance of the same process (for example, one instance on each node) will cause trouble in many cases. For example, if our process must only process data in a certain state, and after processing the data its state should be updated, having two processes operating on the same data could lead to serious data inconsistencies.

Our case

Our application has to send messages to several different service providers (SMS centers or hubs), but the number of connections to a specific provider is limited (in most cases to a single connection), so we can't just open connections and send messages to all providers on every node.

For every service provider, we have Kafka topics with messages ready to be sent to that provider.
Our process should create a connection to the given provider and, while the connection is active, read messages from the appropriate topics and send them over this connection. That would be quite simple to implement in a single-node application.

But we have n application nodes and m connections, and we need to:

1. Open each connection on only one node
2. Balance the m connections evenly across the n nodes
3. Re-balance connections when a node goes down or a new node appears, or when a service provider is added or deleted.
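To make requirements 1 and 2 concrete, here is a minimal Kotlin sketch of a balance check. It assumes the "connection deployment schema" is modeled as a map from node id to the set of connection ids that node should keep open; the DeploymentSchema type and the isEvenlyBalanced function are hypothetical names used only for illustration.

```kotlin
// Hypothetical model: node id -> ids of the provider connections that node keeps open.
typealias DeploymentSchema = Map<String, Set<String>>

// True when requirements 1 and 2 hold: every required connection is assigned to
// exactly one active node, and per-node counts differ by at most one, which means
// each node holds floor(m/n) or ceil(m/n) connections.
fun isEvenlyBalanced(
    schema: DeploymentSchema,
    activeNodes: Set<String>,
    requiredConnections: Set<String>
): Boolean {
    if (activeNodes.isEmpty()) return requiredConnections.isEmpty()
    // Only active nodes may hold connections.
    if (!activeNodes.containsAll(schema.keys)) return false
    val assigned = schema.values.flatten()
    // Requirement 1: no connection appears on more than one node.
    if (assigned.size != assigned.toSet().size) return false
    // Every required connection is placed, and nothing else is.
    if (assigned.toSet() != requiredConnections) return false
    // Requirement 2: the busiest and the idlest node differ by at most one connection.
    val counts = activeNodes.map { schema[it]?.size ?: 0 }
    return counts.maxOf { it } - counts.minOf { it } <= 1
}
```

For two nodes and three connections, a 2/1 split passes, while 3/0 or any duplicated connection fails.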

This is illustrated in the picture below. Connections marked in blue are the ones that had to be moved to another node as a result of re-balancing.




We can't start our processes during node startup, since we don't have any knowledge of the connections already established on other nodes, and therefore we don't know which connections should be established on this node. So we need a cluster-level mechanism that is aware of the available nodes, can "calculate" which connections should be opened on which node, and then notifies all nodes about this calculated "connection deployment schema". After receiving this info, each node establishes the connections assigned to it and starts the sending process.

Let's call this mechanism the Rebalancer. It is actually also a process, and it should run on only a single node at a time. Otherwise, if multiple Rebalancers were running at the same time, it would be inefficient and could lead to many other problems, such as inconsistent information.

Rebalancer's job is to:
  1. Gather information about the currently active nodes and the connections established on each of them.
  2. Periodically check whether connections are distributed evenly.
  3. If connections are not distributed properly, calculate a new "connection deployment schema" and notify the nodes about it.

Rebalancing connections - Kafka style

First, let's see how the Rebalancer gathers info on available nodes and currently established connections. A new component called HeartBeatSender comes into play here.

A HeartBeatSender instance runs on each node and sends a message to a Kafka topic called "heart-beats" every 500 ms, using a KafkaProducer.
The Rebalancer is a Kafka Streams application, also running on each node, that reads messages from the "heart-beats" topic. The important things are:
a) the "heart-beats" topic must have only one partition
b) all Rebalancer instances have the same group id (for a Kafka Streams application, that is its application.id)
This ensures that only one of the Rebalancer instances actually reads (and processes) heartbeats at any given time.
If the node running the currently active Rebalancer goes down, the Kafka consumer group rebalancing mechanism automatically hands the work over to the Rebalancer on another node.
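As a rough illustration of point b), the Streams settings every node could pass to its Rebalancer are sketched below as plain java.util.Properties. The helper name rebalancerStreamsProperties is hypothetical and only the settings relevant here are shown; application.id, bootstrap.servers and commit.interval.ms are standard Kafka Streams configuration keys, and the values mirror the Rebalancer code later in this post. Point a) is not a client setting: the "heart-beats" topic itself has to be created with a single partition.

```kotlin
import java.util.Properties

// Hypothetical helper: the same properties are used on every node. Because all
// instances share one application.id, they form one consumer group, and with a
// single-partition "heart-beats" topic only the instance that owns that
// partition processes heartbeats; the others stay idle until a group rebalance.
fun rebalancerStreamsProperties(bootstrapServers: String): Properties =
    Properties().apply {
        // In Kafka Streams, application.id is also used as the consumer group id.
        setProperty("application.id", "supplier-connection-rebalancer")
        setProperty("bootstrap.servers", bootstrapServers)
        // Commit (and forward cached aggregate updates) frequently.
        setProperty("commit.interval.ms", "100")
    }
```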

The heartbeat sender code is pretty simple:

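    import java.util.TimerTask
    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer

    fun initialize(nodeId: String) {
        // kafkaUtils.createProducer is an application helper around KafkaProducer.
        // ack = "1": only the partition leader confirms a write, which is acceptable
        // here since a lost beat is followed by another one 500 ms later.
        this.producer = kafkaUtils.createProducer(
                ack = "1",
                keySerializer = StringSerializer::class.java,
                valueSerializer = NodeInfoSerializer::class.java
        )
        // First beat immediately, then one every beatFrequencyMS (500 ms).
        this.timer.schedule(SendBeatTask(nodeId), 0, beatFrequencyMS)
    }

    inner class SendBeatTask(private val nodeId: String) : TimerTask() {
        override fun run() {
            // connectionsInfo: the connections currently open on this node
            val nodeInfo = HeartBeat(nodeId, connectionsInfo, System.currentTimeMillis())
            // Constant key: all beats end up in one aggregate in the Rebalancer.
            producer.send(ProducerRecord(kafkaUtils.getHeartBeatTopicName(), "heartBeat", nodeInfo))
        }
    }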

How the Rebalancer works

The Rebalancer repeats the following steps approximately every 5 seconds:

  1. Take the node heartbeats received in the last 5 seconds.
  2. Keep only the latest heartbeat per node.
  3. Resolve the current connection deployment schema from that info (a heartbeat contains info about the connections currently established on its node).
  4. If all connections are deployed evenly on the currently active nodes, everything is as it should be: do nothing.
  5. If there are connections that are not established on any node, or connections are unevenly balanced (possibly because a new node appeared since the last re-balance), calculate a new connection deployment schema and publish it so the nodes can read it. Publishing here means writing it to a topic called "supplier-connection-deployment".
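Steps 1 to 3, plus the "not established on any node" check from step 5, can be sketched in plain Kotlin without any Kafka types. The HeartBeat class below is a hypothetical simplification (node id, set of open connection ids, send time in epoch milliseconds), and resolveCurrentSchema and missingConnections are illustrative names.

```kotlin
// Hypothetical, simplified heartbeat: who sent it, what is open there, and when.
data class HeartBeat(val nodeId: String, val connections: Set<String>, val timestamp: Long)

// Steps 1-3: keep heartbeats from the last windowMs milliseconds, take the latest
// one per node, and turn them into the current schema (node id -> open connections).
fun resolveCurrentSchema(
    heartBeats: List<HeartBeat>,
    nowMs: Long,
    windowMs: Long = 5_000L
): Map<String, Set<String>> =
    heartBeats
        .filter { nowMs - it.timestamp <= windowMs }                            // step 1
        .groupBy { it.nodeId }
        .mapValues { (_, beats) -> beats.maxByOrNull { it.timestamp }!!.connections } // steps 2-3

// Part of step 5: required connections that no active node reports as open.
fun missingConnections(current: Map<String, Set<String>>, required: Set<String>): Set<String> =
    required - current.values.flatten().toSet()
```

A node whose last heartbeat is older than the window simply disappears from the resolved schema, so its connections show up as missing and get reassigned.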

The new schema can be calculated in different ways. The simplest would be to iterate through all connections that should be deployed in the cluster and assign them to the available nodes in round-robin fashion. This simple algorithm has one downside: when re-balancing (a node crashed or a new node appeared), some connections currently active on a surviving node could be assigned to another node, which means they will be closed on one node and opened on another, causing a pause in sending messages. This is not a very big problem, since it only takes several seconds and happens rarely, but we actually use a slightly more complicated algorithm that takes existing assignments into account and keeps connections where they currently are whenever possible. A connection is moved away from a surviving node only if that makes the distribution more even (for example, because a new node appeared).
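The difference between the two approaches can be sketched as follows. This is one possible way to implement a "sticky" assignment, not necessarily the exact algorithm described above; roundRobin and stickyAssign are hypothetical names, and a schema is again assumed to be a map from node id to connection ids. The sticky version lets every surviving node keep what it already has, trims nodes above their fair share (floor(m/n) or ceil(m/n)), and only then hands out the remaining connections.

```kotlin
// Simple approach: deal connections out like cards, ignoring the current state.
fun roundRobin(nodes: List<String>, connections: List<String>): Map<String, Set<String>> {
    require(nodes.isNotEmpty()) { "at least one active node is needed" }
    val result = nodes.associateWith { mutableSetOf<String>() }
    connections.forEachIndexed { i, c -> result.getValue(nodes[i % nodes.size]).add(c) }
    return result
}

// Sticky approach: move as few connections as possible while still balancing.
fun stickyAssign(
    current: Map<String, Set<String>>,
    activeNodes: Set<String>,
    required: Set<String>
): Map<String, Set<String>> {
    require(activeNodes.isNotEmpty()) { "at least one active node is needed" }
    val nodes = activeNodes.sorted()
    val base = required.size / nodes.size
    val extra = required.size % nodes.size

    // 1. Keep what surviving nodes already hold, if still required;
    //    claimed.add() returns false for a connection already kept by another node.
    val claimed = mutableSetOf<String>()
    val keep: Map<String, MutableList<String>> = nodes.associateWith { node ->
        current[node].orEmpty().filter { it in required }.sorted()
            .filter { claimed.add(it) }.toMutableList()
    }

    // 2. The `extra` nodes that already hold the most connections get one more
    //    than the others, so fewer connections have to move.
    val quota: Map<String, Int> = nodes.sortedByDescending { keep.getValue(it).size }
        .withIndex()
        .associate { (i, node) -> node to (if (i < extra) base + 1 else base) }

    // 3. Release connections above a node's quota so they can move elsewhere.
    for (node in nodes) {
        val kept = keep.getValue(node)
        while (kept.size > quota.getValue(node)) kept.removeAt(kept.size - 1)
    }

    // 4. Hand every unplaced connection to a node that still has room.
    //    Quotas sum to m, so the pool is exactly large enough.
    val pool = (required - keep.values.flatten().toSet()).sorted()
    var next = 0
    for (node in nodes) {
        val kept = keep.getValue(node)
        while (kept.size < quota.getValue(node)) kept.add(pool[next++])
    }
    return keep.mapValues { it.value.toSet() }
}
```

For example, with connections a to d all on n1 and a new node n2 joining, stickyAssign keeps a and b on n1 and moves only c and d, while an already balanced schema is returned unchanged.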


Aggregating heartbeats every 5 seconds

The Kafka Streams DSL supports aggregation by time windows (frames). We group heartbeats produced in the same 5-second time frame and, once a time frame is "finished", process this data to resolve which nodes are currently available and which connections are currently established on which node (this info is contained in the heartbeat records). That sounds great, but there is one problem: with the plain windowed aggregation, Kafka Streams doesn't notify us when a time window is finished and hand over the aggregated data at that moment (newer versions offer suppress(Suppressed.untilWindowCloses(...)) for emitting only a window's final result, but the approach below does not rely on it). It works differently: we get the updated aggregated state every time it changes (a new heartbeat is read from the stream), together with the time window info (based on the event time of the latest heartbeat). So, in order to process aggregated heartbeats only after a time frame is finished, we take the following steps:

1. We make sure the system clocks of all nodes are in sync (we don't need millisecond precision, but let's keep the difference under 1 second). This way we can check whether an update was received in the "current" time frame (current from the streaming app's perspective).

2. On every update of the heartbeat aggregation, we check whether it belongs to the "current" window, since we are not interested in events that happened more than 5 seconds ago. (Updates for "older" windows can be received: older heartbeats that were not processed yet because the Rebalancer was not running at that time for some reason.)

3. If the update is for the "current" window, we store the value (aggregated heartbeats) in the field currentWindowedHeartBeats and the window in the field currentWindow.

4. Before storing values in currentWindowedHeartBeats and currentWindow, we check whether the window of the new update comes right after the one already stored in currentWindow. If so, we know that currentWindow has just finished (approximately), and it is time to process the currentWindowedHeartBeats data before we overwrite it with the update for the new window. Processing is done by the handleTimeFrameHeartBeats method, as you can see in the code below.
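The window bookkeeping from steps 2 to 4 can be tested in isolation. In the sketch below, TimeFrame is a hypothetical stand-in for the Kafka Streams Window (start inclusive, end exclusive), the 1-second clockToleranceMs reflects the clock-sync assumption from step 1, and WindowTracker is an illustrative name. The exact rule for "current" is an assumption, since the post does not show isCurrentTimeWindow.

```kotlin
// Hypothetical stand-in for the Kafka Streams Window: [startMs, endMs).
data class TimeFrame(val startMs: Long, val endMs: Long)

// Mirrors the foreach logic: remembers the latest aggregate of the current
// tumbling window and hands it to onWindowFinished when the first update for
// the directly following window arrives.
class WindowTracker<T : Any>(
    private val clockToleranceMs: Long = 1_000L,
    private val onWindowFinished: (T) -> Unit
) {
    private var currentWindow: TimeFrame? = null
    private var currentValue: T? = null

    // Step 2 (assumed rule): wall-clock "now" must fall inside the window,
    // widened by the clock-skew tolerance; older windows are stale replays.
    private fun isCurrentTimeWindow(w: TimeFrame, nowMs: Long): Boolean =
        nowMs >= w.startMs - clockToleranceMs && nowMs < w.endMs + clockToleranceMs

    // Step 4: tumbling windows are adjacent, so the next one starts where the stored one ends.
    private fun isNewWindow(w: TimeFrame): Boolean =
        currentWindow?.let { w.startMs == it.endMs } ?: false

    fun onUpdate(window: TimeFrame, value: T, nowMs: Long) {
        if (!isCurrentTimeWindow(window, nowMs)) return
        if (isNewWindow(window)) currentValue?.let(onWindowFinished)
        // Step 3: remember the newest aggregate and its window.
        currentWindow = window
        currentValue = value
    }
}
```

Feeding it two updates for the window [0, 5000) and then one for [5000, 10000) triggers onWindowFinished exactly once, with the last aggregate of the first window; a late update for [0, 5000) arriving at 9000 ms is ignored.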


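    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.common.utils.Bytes
    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.Materialized
    import org.apache.kafka.streams.kstream.TimeWindows
    import org.apache.kafka.streams.state.WindowStore

    private fun startHeartBeatsListener() {
        // All Rebalancer instances share this applicationId, i.e. one consumer group.
        val streamConfig = kafkaUtils.createStreamsConfig(
                applicationId = "supplier-connection-rebalancer",
                keySerde = Serdes.StringSerde::class.java,
                valueSerde = HeartBeatSerde::class.java,
                exactlyOnce = true,
                commitIntervalMS = 100L
        )
        val builder = StreamsBuilder()
        val inputStream = builder.stream<String, HeartBeat>(kafkaUtils.getHeartBeatTopicName())

        inputStream
                .groupByKey()
                // Tumbling windows of timeFrameSizeMS (5 s). Newer Kafka versions deprecate
                // TimeWindows.of in favour of TimeWindows.ofSizeWithNoGrace(Duration).
                .windowedBy(TimeWindows.of(timeFrameSizeMS))
                .aggregate(
                        { HeartBeatsTimeFrame() },          // empty aggregate for each new window
                        { _, value, aggregate ->            // add every heartbeat to its window
                            aggregate.heartBeats.add(value)
                            aggregate
                        },
                        Materialized.`as`<String, HeartBeatsTimeFrame, WindowStore<Bytes, ByteArray>>("heart-beats-time-framed")
                                .withValueSerde(HeartBeatListSerde())
                )
                .toStream()
                // Called on every aggregate update, not once per finished window.
                .foreach { key, value ->
                    if (isCurrentTimeWindow(key.window())) {
                        // First update of the next window: the stored window is finished.
                        if (isNewWindow(key.window())) {
                            handleTimeFrameHeartBeats(currentWindowedHeartBeats)
                        }
                        this.currentWindow = key.window()
                        this.currentWindowedHeartBeats = value
                    }
                }

        streams = KafkaStreams(builder.build(), streamConfig)
        streams.start()
    }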

The handleTimeFrameHeartBeats method resolves the current connection deployment schema (based on the aggregated heartbeat info) and checks whether the deployment schema needs to be changed. Changes might be needed because a new node came into play, an existing node crashed, new service providers were added, existing service providers were deleted, etc.

    private fun handleTimeFrameHeartBeats(heartBeatsTimeFrame: HeartBeatsTimeFrame) {
        // node -> connections, as reported in the latest heartbeat of each node
        val currentScheme = resolveCurrentScheme(heartBeatsTimeFrame)
        // how the connections should be spread over the currently active nodes
        val requiredScheme = calculateRequiredScheme()
        if (isRebalancingNeeded(currentScheme, requiredScheme)) {
            producer.send(ProducerRecord(kafkaUtils.getSupplierConnectionDeploymentTopicName(), null, requiredScheme))
        } else {
            log.debug("Rebalancing NOT needed")
        }
    }

If the deployment schema has to change, we send the new one to the "supplier-connection-deployment" topic so that all active nodes receive it.
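The post does not show isRebalancingNeeded. A minimal sketch, assuming both schemas are maps from node id to connection ids, is to compare them after dropping nodes without connections, so that an idle node listed with an empty set and an idle node that is simply absent count as the same state. The Schema alias and this comparison rule are assumptions for illustration.

```kotlin
// Hypothetical schema shape: node id -> connection ids.
typealias Schema = Map<String, Set<String>>

// Rebalancing is needed when what the nodes report (current) differs from what
// the calculation says they should run (required), ignoring empty entries.
fun isRebalancingNeeded(current: Schema, required: Schema): Boolean {
    fun Schema.normalized(): Schema = filterValues { it.isNotEmpty() }
    return current.normalized() != required.normalized()
}
```

One consequence of this rule: while nodes are still opening or closing connections, their heartbeats differ from the published schema, so the Rebalancer may publish the same schema again. That is harmless as long as nodes apply a schema idempotently.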

Opening and closing connections on active nodes

Every node reads connection deployment schema changes from the "supplier-connection-deployment" topic using a KafkaConsumer. These consumer instances all have different group ids, which means that each of them receives every new message (connection deployment schema) from the "supplier-connection-deployment" topic. Each node reads from the schema which connections are assigned to it and compares them with its currently open connections. Connections that are established on this node but, according to the new schema, no longer assigned to it are closed. Conversely, connections assigned to this node but not yet established are opened at this point.
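The comparison each node performs boils down to two set differences. ConnectionChanges and diffForNode are hypothetical names, and the schema is assumed to be a map from node id to connection ids.

```kotlin
// Hypothetical result of comparing the published schema with local state.
data class ConnectionChanges(val toOpen: Set<String>, val toClose: Set<String>)

// Each node looks up its own entry in the new schema and compares it with the
// connections it currently has open.
fun diffForNode(nodeId: String, schema: Map<String, Set<String>>, openNow: Set<String>): ConnectionChanges {
    val assigned = schema[nodeId].orEmpty()
    return ConnectionChanges(
        toOpen = assigned - openNow,   // assigned here but not open yet
        toClose = openNow - assigned   // open here but moved elsewhere or removed
    )
}
```

When a connection moves, the old node closes it and the new node opens it independently, so with a provider that accepts only one connection, the opening side may need to retry until the other node has closed its connection.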

And that's it. This mechanism covers all cases: a single node will have all connections established, and adding new nodes will result in moving some connections from the "old" nodes to the new one. If a node fails, its connections will be assigned to the surviving nodes. The connection deployment re-balancing process takes about half a minute, due to the duration of Kafka consumer group re-balancing and the way we aggregate heartbeats in 5-second time windows.

Notice that there is no direct interaction between nodes; they are actually unaware of each other's network locations. All data is exchanged through Kafka topics.
There are certainly ways to achieve this functionality without Kafka, but with Kafka we get a very robust and yet pretty elegant solution.
