Back to blog results

8월 2, 2018 By Mridul Verma

Building Replicated Stateful Systems using Kafka as a Commit Log

In today’s world, we are building and designing systems at scale like never seen before. Every microservice that we build is either stateful or stateless. And building replicated systems in these states is quite challenging. But before we dig into this more, let’s first look more deeply at these two different states for background and context.

Stateless Service

These services are somewhat easy to manage and scaling semantics are somewhat easy when compared to stateful services. We don’t need to worry about nodes going down or service becoming unserviceable because these services are stateless, so one can directly start using another node.

Stateful Service

These services are more challenging to manage because they have some state so we need to build these services considering the ramifications of nodes going down and service becoming unserviceable. So essentially, we need to make these services fault tolerant.

There might be two kinds of stateful services:

  • Some services serve as a cache in front of another stateful service, to guarantee better performance. These services do have a fallback, so fault tolerance is not that big of an issue for these services strictly from state loss perspective.
  • Some other services do not have any fallback, so state which is present on the service is not present anywhere else. So we need to make sure that we provide durability, so that even if some nodes are down (up to a certain limit), there is no data/state loss.

Building Fault-tolerant Services

In the context of this blog, we are going to talk about how can we make sure that these stateful services without fallback remain fault tolerant.

Let’s first see how can we make these services fault-tolerant:

  • Provide replication support in your service: This means that whenever you make a write request to one of your primary servers, make sure you send this write request to all the other replicas as well.
  • Use an open source solution or service: This can help you out in your use case, but that might involve a lot of changes in the source code of the open source solution. Repurposing an open source solution to one’s use case is somewhat difficult because you need to understand the whole implementation and its intricate details. Also managing a different fork of the open source solution is often difficult.
  • Leverage cloud-based solutions provided by different cloud service providers: DynamoDB, S3 or RDS by Amazon are a few examples. However, these may not always fit your use-cases as sometimes these cloud services (provided by cloud providers) cannot be repurposed for all needs.

The takeaway is that we might need to build our own systems which have to support replication for availability.

A Brief Introduction to Replication

As defined in the book “Designing Data Intensive Applications,” replication is defined as keeping a copy of the same data on several different nodes, potentially in different locations. Replication provides redundancy if some nodes are unavailable, the data can still be served from the remaining nodes. Replication can also help improve performance.

There are two kinds of replication models:

  • Synchronous Replication: This guarantees exactly the same replicas at the expense of higher overhead for write calls to the primary server. This overhead would be because every replica would have its own latencies and all of these latencies would always come in critical path while making a WRITE REQUEST via the primary server.
  • Asynchronous Replication: This guarantees better response time to writes but at the expense of durability because there might be some data loss if the primary dies in between the two syncs. So some data which was there on primary and was not present on its replicas would be lost.

Both of these replication techniques have their downsides. As already mentioned, synchronous replication guarantees no data loss at the expense of higher WRITE latencies and reduced fault-tolerance whereas asynchronous replication provides much better WRITE latencies at the expense of lower number of network calls and batch syncs.

Note: Both of these techniques are not so straightforward to implement and need some deep understanding of the common pitfalls of the replication process in general. See this link for more details.

Kafka to the Rescue

Apache Kafka is an open source stream processing system that makes it much easier to solve replication issues and I’ll walk you through some use cases. As already told, replication is a challenging problem to implement unless you have had experiences with it beforehand at production scale.

Here are the top three replication challenges:

  • Exactly-once Semantics while writing to the Kafka service
  • Ordered delivery of messages
  • Atomic updates to the replicas

Kafka helps us solve these three challenges that we will leverage to implement replication for our service.

  • Exactly-once semantics while writing to the Kafka service: Kafka service along with Kafka client ensures all the writes made to the Kafka service are idempotent, i.e. Kafka makes sure that there are no duplicate messages or no messages which are not committed.
  • Ordered delivery of messages: Kafka service also makes sure that the messages written by producers in a particular order are also read in that particular order by the consumers.
  • Atomic updates to the replicas: Kafka service also makes sure that you can write messages in an atomic fashion to multiple Kafka partitions. Read this for more details.

Note: To fully take advantage of Kafka, you first have to have a basic understanding of Kafka partitions. You can read more here for additional background.

In short, Kafka partitions can be considered as logical entities where producers write data and consumers read data in an orderly fashion and this partition is replicated across different nodes, so you need not worry about the fault tolerance of these partitions. Every data which is ever written to a Kafka partition is written at a particular offset and while reading, consumer specifies the offset from which it wants to consume the data.

Using Kafka to Solve Replication

First, we need to make sure that we have one Kafka partition for every replica for our primary server.

Eg. If you want to have two replicas for every primary server, then you need to have two Kafka partitions, one for each of these replicas.

After we have this setup ready, then:

  1. The primary server needs to make sure that for every WRITE Request made, apart from updating its internal state, it will also write these WRITE Requests to all the Kafka partitions belonging to the different replicas for this primary server itself.
  2. Every Kafka consumer running on those replicas will make sure that it consumes all the WRITE Requests from the assigned Kafka partition and update its internal state corresponding to the WRITE Request.
  3. So eventually all these replicas will have the same state as of primary server.

Some of the replicas might be lagging behind but this might be because of some of the other systemic issues (which is not at all under our control) but eventually, all of the replicas will have the same state.

Figure: This diagram shows the interaction between the client and the primary server along with the replicas.

Implementation

In this design, we need not worry about the nitty gritty of implementing replication because while using Kafka we get this out of the box.

There are a few things which we might need to implement in this design as well:

  • Mechanism to figure out which are the partitions associated with the replicas and if not already there, create and assign partitions to those replicas.
  • After consuming the write request, making atomic updates to all of the replica partitions. Kafka already exposes APIs for making atomic updates to multiple partitions.
  • Reading at a particular offset from Kafka. This should be a really minimal task, as there are many open source libraries (or code segments) which will do the same stuff for you.

Now let’s discuss the positive points of this design

  • We can easily guarantee eventual consistency with this design, but if you want monotonic consistency, then that can be achieved by making sure that reads are always served by the last updated replica (or by most lagging replica in terms of updates) and this can be easily figured out by checking the uncommitted offsets for all of the replicas of these Kafka partitions.
  • High Write throughput for most of the requests on Kafka cluster, see this for more details. Also in most of the benchmarks done, it seems Kafka provides single digit latencies for most of the requests. Let’s look at the request latencies

Old_Request_Latency = PWT_1 + PWT_2 + ……. + PWT_n
New_Request_Latency = PWT_1 + KWT_2 + …. + KWT_n
where
PWT_1 = Time taken to process request on Node 1
PWT_2 = Time taken to process request on Node 2
and so on
KWT_2 = Time taken to write requst to kafka partition for replica 2
KWT_3 = Time taken to write requst to kafka partition for replica 3
and so on
Old_Request_Latency encapsulated writing requests to all
of the available replicas.
New_Request_Latency includes writing request to one of the
primary servers and making sure the write request is written
on all the concerned partitions

Essentially latencies cannot be compared between these two subsystems, but having said that as there is an extra hop for introducing Kafka, there would be some additional overhead which should be really minimal considering latencies of Kafka.

If one of the replicas is having high latencies at some moment (because of high GC pauses or a disk being slow), then that could increase latencies for the WRITE requests in general. But in our case using Kafka, we can easily get over this issue as we would just be adding the WRITE REQUEST to the REPLICA’S KAFKA partition so it would be the responsibility of the REPLICA to sync itself whenever it has some CPU cycles.

Apart from these pros, there are obviously some cons in this design as well.

  • Although Kafka gives higher write throughput for the majority of the requests, there will be an additional overhead of adding another network hop in this design. The impact should be really less of adding Kafka because Kafka is really sensitive towards WRITE latencies but still there would be some impact nonetheless.

Note: Write latencies for smaller payloads would increase by a higher percentage when compared to latencies for bigger payloads. This is because of the majority of the time for smaller payloads is spent on the network roundTrip and if we increase the number of network roundTrips, then this time is bound to increase. So if we can batch the write requests into a single write request and then write it to a Kafka partition, then the overhead of this approach should be really minimal.

Also, there is one important optimization that we can do in this current design to improve the write latency.

In the current design, we have N Kafka partitions for these n replicas. If for these n replicas we have a single Kafka partition instead of the N Kafka partitions, then we will have better write throughput as we need not make write request to every Kafka partition (n Kafka partitions in total).

The only requirement is that all of these n Kafka consumers (running on n different replicas) should belong to different consumer groups because Kafka does not allow any two consumers of a single consumer group to consume the same KTP.

So having these n Kafka consumers each belonging to n different consumer groups, should improve the write throughput and latency as these n Kafka consumers will read the WRITE Requests from a single KTP.

Figure: Diagram explaining the working of replication with single partition used across all replicas

Conclusion

In every stateful system, we are trying to solve the same problem of replication over and over. However, we now know that we can delegate this meticulous task to some other service (such as Kafka) and ensure all the replicas for your stateful system can reconstruct the state of the primary server.

For additional context, check out this brief library to show you how you can leverage Kafka to build replicated systems. This library can be plugged in any stateful system.

By implementing the APIs for this library in your master and replica nodes, you will get eventually consistent replicas for your primary stateful system out of the box.

Additional Resources

Complete visibility for DevSecOps

Reduce downtime and move from reactive to proactive monitoring.

Sumo Logic cloud-native SaaS analytics

Build, run, and secure modern applications and cloud infrastructures.

Start free trial

Mridul Verma

Mridul Verma is a senior software engineer at Sumo Logic where he focuses on search performance and ensuring customers have the best search experience. Mridul is an enthusiast with a passion for problem solving.

More posts by Mridul Verma.

People who read this also enjoyed