Making of Message Gateway with Kafka - Part 3
Balancing service provider connections across application nodes
One of the challenges with cluster-ready applications is balancing specific processes across the cluster nodes. By process I mean a part of the application logic that runs in a separate thread. For example, one process might read data from a database and export it to PDF files; another example is a scheduled job that does something periodically (e.g. generating reports).
In many cases such a process must run on only one node at a time, but if that node fails, the process should be restarted on another node.
Running more than one instance of the same process (for example, one instance per node) causes trouble in many cases. For example, if our process must only handle data in a certain state and update that state after processing, having two processes operate on the same data could lead to serious data inconsistencies.
Our case
Our application has to send messages to several different service providers (SMS centers or hubs), but the number of connections to a specific provider is limited (in most cases to a single connection), so we can't just open connections and send messages for all providers on every node.
For every service provider, we have Kafka topics with messages ready to be sent to that provider.
Our process should create a connection to a given provider and, while the connection is active, read messages from the appropriate topics and send them over that connection. Quite simple, if it only had to run in a single-node application.
But we have n application nodes and m connections, and we should:
1. Open each connection on only one node
2. Balance the m connections evenly across the n nodes
3. Re-balance connections when a node goes down, a new node appears, a new service provider is added, or an existing one is deleted.
This is illustrated in the picture below. Connections marked blue are the ones that had to be moved to another node as a result of re-balancing.
We can't start these processes during node startup, since a starting node has no knowledge of the connections already established on other nodes, so it doesn't know which connections it should establish. We need a cluster-level mechanism that is aware of the available nodes, can "calculate" which connections should be opened on which node, and then notifies all nodes about this calculated "connection deployment schema". After receiving this info, each node establishes the connections assigned to it and starts the sending process.
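To make the "connection deployment schema" concrete before we look at the mechanism itself, here is a minimal sketch of what such a message could carry. The class and field names are hypothetical; our actual class may look different:

data class ConnectionDeploymentScheme(
    // hypothetical shape: node id -> provider connections assigned to it,
    // e.g. "node-1" -> ["providerA", "providerB"], "node-2" -> ["providerC"]
    val assignments: Map<String, List<String>> = emptyMap()
)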
Let's call this mechanism the Rebalancer. It is itself a process, and it should run on only one node at a time: multiple Rebalancers running simultaneously would be inefficient and could cause all sorts of consistency problems.
The Rebalancer's job is to:
- Gather information about the currently active nodes and the connections established on each of them.
- Periodically check whether the connections are distributed evenly.
- If they are not, recalculate the "connection deployment schema" and notify the nodes about it.
Rebalancing connections - Kafka style
First, let's see how the Rebalancer gathers info about available nodes and currently established connections. A new component called HeartBeatSender comes into play here.
A HeartBeatSender instance runs on each node and sends a message to a Kafka topic called "heart-beats" every 500 ms, using a KafkaProducer.
The Rebalancer is a streaming application, also running on every node and reading messages from the "heart-beats" topic. The important things are that:
a) the "heart-beats" topic has only one partition
b) all Rebalancer instances have the same consumer group id
This ensures that only one of the Rebalancer instances actually reads (and processes) heartbeats at any given time.
If the node with the currently active Rebalancer goes down, the Rebalancer on another node automatically takes over, thanks to Kafka's consumer group rebalancing mechanism.
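Since the single-partition requirement is what makes this work, it is worth creating the topic explicitly rather than relying on auto-creation. A minimal sketch using the Kafka AdminClient (not from our code base; the replication factor is an assumption, adjust it per cluster):

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic

fun createHeartBeatsTopic(bootstrapServers: String) {
    val props = mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)
    AdminClient.create(props).use { admin ->
        // exactly one partition guarantees a single active consumer per group;
        // replication factor 3 is an assumed value
        val topic = NewTopic("heart-beats", 1, 3.toShort())
        admin.createTopics(listOf(topic)).all().get()
    }
}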
The heartbeat sender code is pretty simple:
import java.util.TimerTask
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

fun initialize(nodeId: String) {
    // acks = "1" is enough here: losing an occasional heartbeat is harmless
    this.producer = kafkaUtils.createProducer(
        ack = "1",
        keySerializer = StringSerializer::class.java,
        valueSerializer = NodeInfoSerializer::class.java
    )
    // send the first beat immediately, then one every beatFrequencyMS (500 ms)
    this.timer.schedule(SendBeatTask(nodeId), 0, beatFrequencyMS)
}

inner class SendBeatTask(private val nodeId: String) : TimerTask() {
    override fun run() {
        // every beat carries the node id, the connections currently
        // established on this node, and a timestamp
        val nodeInfo = HeartBeat(nodeId, connectionsInfo, System.currentTimeMillis())
        producer.send(ProducerRecord(kafkaUtils.getHeartBeatTopicName(), "heartBeat", nodeInfo))
    }
}
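For completeness, here is a sketch of what the heartbeat payload and its serializer could look like. The field names and the use of Jackson for JSON are assumptions (as is Kafka 2.1+, where Serializer's configure/close have default implementations); our actual classes may differ:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Serializer

// assumed payload shape: node id, currently established connections, timestamp
data class HeartBeat(
    val nodeId: String,
    val connections: List<String>,
    val timestampMS: Long
)

// JSON serializer for heartbeats; Jackson is an assumption, any format works
class NodeInfoSerializer : Serializer<HeartBeat> {
    private val mapper = ObjectMapper()
    override fun serialize(topic: String, data: HeartBeat): ByteArray =
        mapper.writeValueAsBytes(data)
}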
How the Rebalancer works
The Rebalancer repeats the following steps approximately every 5 seconds:
1. Take the node heartbeats received in the last 5 seconds.
2. Keep only the latest heartbeat per node.
3. Resolve the current connection deployment schema from that info (each heartbeat contains info about the connections currently established on its node).
4. If all connections are deployed evenly across the currently active nodes, everything is as it should be: do nothing.
5. If there are connections that are not established on any node, or the connections are unevenly balanced (possible when a new node has appeared since the last re-balance), recalculate the connection deployment schema and publish it so the nodes can read it. Publishing here means writing it to a topic called "supplier-connection-deployment".
The new schema calculation can be done in different ways. The simplest would be to iterate through all connections that should be deployed in the cluster and assign them to the available nodes in round-robin fashion. This simple algorithm has one downside: during re-balancing (a node crashed or a new node appeared), some connections currently active on a surviving node could be assigned to another node, which means they would be closed on one node and opened on another, causing a pause in message sending. This is not a big problem, since it only takes several seconds and happens rarely, but we actually use a slightly more complicated algorithm that takes existing assignments into account and keeps connections where they currently are when possible. A connection is only moved from a surviving node if that makes the distribution more even (because a new node appeared, for example), as sketched below.
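Here is a standalone sketch of that "sticky" calculation. The parameters are made explicit for illustration (in our code the method reads this state from fields), so treat it as a sketch of the idea rather than our actual implementation:

import kotlin.math.ceil

fun calculateRequiredScheme(
    allConnections: Set<String>,
    activeNodes: Set<String>,
    current: Map<String, String>   // connectionId -> nodeId, from latest heartbeats
): Map<String, String> {
    require(activeNodes.isNotEmpty()) { "no active nodes" }
    val target = mutableMapOf<String, String>()
    val load = activeNodes.associateWith { 0 }.toMutableMap()
    // cap per node so the result is as even as possible
    val maxPerNode = ceil(allConnections.size.toDouble() / activeNodes.size).toInt()

    // pass 1: keep existing assignments on surviving nodes, up to the cap
    for ((conn, node) in current) {
        if (conn in allConnections && node in activeNodes && load.getValue(node) < maxPerNode) {
            target[conn] = node
            load[node] = load.getValue(node) + 1
        }
    }
    // pass 2: put every still-unassigned connection on the least loaded node
    for (conn in allConnections - target.keys) {
        val node = load.minByOrNull { it.value }!!.key
        target[conn] = node
        load[node] = load.getValue(node) + 1
    }
    return target
}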
Aggregating heartbeats every 5 seconds
The Kafka Streams DSL supports aggregation by time windows (frames). We group heartbeats produced within the same 5-second time frame, and after a time frame is "finished" we process its data to resolve which nodes are currently available and which connections are established on each of them (this info is contained in the heartbeat records). That sounds great, but there is one problem: Kafka Streams can't notify us when a time window is finished and hand us the aggregated data at that moment. It works differently: we get the updated aggregated state every time it changes (i.e. every time a new heartbeat is read from the stream), together with the time window info (based on the event time of the latest heartbeat). So, in order to process aggregated heartbeats only after a time frame is finished, we take the following steps:
1. We make sure the system time of all nodes is in sync (we don't need millisecond-precision synchronization, but let's keep it under 1 second). This way we can check whether an update was received in the "current" time frame (current from the streaming app's perspective).
2. On every update of the heartbeats aggregation, we check whether it happened in the "current" window, since we are not interested in events that happened longer than 5 seconds ago (updates for "older" windows can arrive: older heartbeats that weren't processed yet because the Rebalancer wasn't running at that time for some reason).
3. If the update is for the "current" window, we store the value (aggregated heartbeats) in the field currentWindowedHeartBeats and the window in the field currentWindow.
4. Before storing values to currentWindowedHeartBeats and currentWindow, we check whether the new update's window comes right after the one already stored in the currentWindow field. If so, we know that currentWindow has just finished (approximately) and it is time to process the currentWindowedHeartBeats data before we overwrite it with the update for the new window. Processing is done by the handleTimeFrameHeartBeats method, as you can see in the code below.
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.state.WindowStore

private fun startHeartBeatsListener() {
    val streamConfig = kafkaUtils.createStreamsConfig(
        applicationId = "supplier-connection-rebalancer",
        keySerde = Serdes.StringSerde::class.java,
        valueSerde = HeartBeatSerde::class.java,
        exactlyOnce = true,
        commitIntervalMS = 100L
    )
    val builder = StreamsBuilder()
    val inputStream = builder.stream<String, HeartBeat>(kafkaUtils.getHeartBeatTopicName())
    inputStream
        .groupByKey()
        // group heartbeats into 5-second time windows
        .windowedBy(TimeWindows.of(timeFrameSizeMS))
        // collect all heartbeats of a window into one HeartBeatsTimeFrame
        .aggregate(
            { HeartBeatsTimeFrame() },
            { _, value, aggregate ->
                aggregate.heartBeats.add(value)
                aggregate
            },
            Materialized.`as`<String, HeartBeatsTimeFrame, WindowStore<Bytes, ByteArray>>("heart-beats-time-framed")
                .withValueSerde(HeartBeatListSerde())
        )
        .toStream()
        .foreach { key, value ->
            // ignore updates for windows older than the current one
            if (isCurrentTimeWindow(key.window())) {
                // a new window means the previous one has just finished:
                // process its aggregated heartbeats before overwriting them
                if (isNewWindow(key.window())) {
                    handleTimeFrameHeartBeats(currentWindowedHeartBeats)
                }
                this.currentWindow = key.window()
                this.currentWindowedHeartBeats = value
            }
        }
    streams = KafkaStreams(builder.build(), streamConfig)
    streams.start()
}
private fun handleTimeFrameHeartBeats(heartBeatsTimeFrame: HeartBeatsTimeFrame) {
    // what is deployed right now (from the heartbeats) vs. what should be
    val currentScheme = resolveCurrentScheme(heartBeatsTimeFrame)
    val requiredScheme = calculateRequiredScheme()
    if (isRebalancingNeeded(currentScheme, requiredScheme))
        producer.send(ProducerRecord(kafkaUtils.getSupplierConnectionDeploymentTopicName(), null, requiredScheme))
    else
        log.debug("Rebalancing NOT needed")
}
If the deployment schema has changed, we send it to the "supplier-connection-deployment" topic so all active nodes can pick it up.
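The isRebalancingNeeded check itself can be as simple as comparing the two schemes. A minimal sketch, assuming both schemes are represented as connection-to-node maps (our actual representation may differ):

private fun isRebalancingNeeded(
    currentScheme: Map<String, String>,    // connectionId -> nodeId, as deployed
    requiredScheme: Map<String, String>    // connectionId -> nodeId, as calculated
): Boolean = currentScheme != requiredScheme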
Opening and closing connections on active nodes
Every node reads connection deployment schema changes from the "supplier-connection-deployment" topic using a KafkaConsumer. These consumer instances all have different group ids, which means every one of them receives each new message (connection deployment schema) from the topic. Each node reads from the schema which connections are assigned to it and compares that with its currently open connections. Connections that are established on the node but no longer assigned to it by the new schema are closed; connections that are assigned to the node but not yet established are opened at that moment, as the sketch below illustrates.
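A sketch of the node-side handler, reusing the hypothetical ConnectionDeploymentScheme shape from earlier; connectionManager is an assumed helper that tracks and opens/closes provider connections:

fun applyScheme(scheme: ConnectionDeploymentScheme, nodeId: String) {
    val assigned = scheme.assignments[nodeId].orEmpty().toSet()
    // connection ids currently established on this node (assumed helper)
    val currentlyOpen: Set<String> = connectionManager.openConnectionIds()

    (currentlyOpen - assigned).forEach { connectionManager.close(it) }   // no longer ours -> close
    (assigned - currentlyOpen).forEach { connectionManager.open(it) }    // newly assigned -> open
}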
And that's it. This mechanism covers all the cases: a single node will have all connections established; adding new nodes will move some connections from the "old" nodes to the new ones; and if a node fails, its connections will be assigned to the surviving nodes. The connection deployment re-balancing process takes about half a minute, due to the Kafka consumer group re-balancing duration and the way we aggregate heartbeats in 5-second time windows.
Notice that there is no direct interaction between the nodes; they are actually unaware of each other's network locations. All data is exchanged through Kafka topics.
There are certainly ways to achieve this functionality without Kafka, but with Kafka we get a very robust and yet pretty elegant solution.