Some basic ideas:
- A topic may have many partitions.
- A partition has to fit on a single server.
- Partitions are distributed and replicated over servers.
- Each partition has a leader server. If the leader server fails, one of the followers becomes the new leader.
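As a small illustration of these ideas, a topic with multiple replicated partitions can be created through the AdminClient; the broker address, topic name, partition count, and replication factor below are placeholder examples.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // 10 partitions, each replicated on 3 brokers; one replica of each partition acts as leader.
            NewTopic topic = new NewTopic("demo-topic", 10, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}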
Partition, Segment:
- A partition is physically stored as a sequence of segments.
For example, a partition might consist of 4 segments, each segment file named after the first (base) offset it contains.
Each segment has an index file and a log file; the index maps relative offsets within the segment to physical positions in the log file.
Basically, given an offset within a partition, we can find the segment, then the position within that segment, and finally the message.
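A minimal sketch of that lookup (illustrative code, not Kafka's internal API), assuming each segment is keyed by its base offset:

import java.util.Map;
import java.util.TreeMap;

// Sketch only: segments of one partition, keyed by their base offsets.
class SegmentLookupSketch {
    private final TreeMap<Long, String> segmentsByBaseOffset = new TreeMap<>();

    void addSegment(long baseOffset, String logFileName) {
        segmentsByBaseOffset.put(baseOffset, logFileName);
    }

    // The segment containing an offset is the one with the largest base offset <= offset;
    // the difference is the relative offset used to search that segment's index file.
    String findSegment(long offset) {
        Map.Entry<Long, String> entry = segmentsByBaseOffset.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("offset is before the first segment");
        }
        long relativeOffset = offset - entry.getKey();
        System.out.println("relative offset within segment: " + relativeOffset);
        return entry.getValue();
    }
}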
Consumer, Partition and Stream
- If there are 10 partitions for a topic and 3 consumer instances (C1, C2, C3, started in that order) all belonging to the same consumer group, we can have different consumption models that allow read parallelism, as below:
- Each consumer uses a single stream. In this model, when C1 starts, all 10 partitions of the topic are mapped to the same stream and C1 starts consuming from that stream. When C2 starts, Kafka rebalances the partitions between the two streams, so each stream is assigned 5 partitions (depending on the rebalance algorithm it might also be 4 vs 6) and each consumer consumes from its stream. Similarly, when C3 starts, the partitions are again rebalanced between the 3 streams. Note that in this model, when consuming from a stream assigned more than one partition, the order of messages will be jumbled between partitions.
- Each consumer uses more than one stream (say C1 uses 3, C2 uses 3, and C3 uses 4). In this model, when C1 starts, all 10 partitions are assigned to its 3 streams and C1 can consume from the 3 streams concurrently using multiple threads. When C2 starts, the partitions are rebalanced between the 6 streams, and similarly when C3 starts, the partitions are rebalanced between the 10 streams. Each consumer can consume concurrently from multiple streams. Note that the number of streams and partitions here are equal. If the number of streams exceeds the number of partitions, some streams will not get any messages because they will not be assigned any partitions.
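Note that "streams" here come from the old high-level consumer API; with the modern KafkaConsumer, partitions are assigned directly to the consumer instances in a group. A minimal sketch under that model (broker address, topic, and group name are placeholders): running three copies of this program with the same group.id splits the 10 partitions among them.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");               // same group for C1, C2 and C3
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // poll() only returns records from the partitions currently assigned to this
                // instance; Kafka rebalances the assignment as instances join or leave the group.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}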
Delivery guarantee
- If a consumer fails and another consumer takes over the partition, Kafka does not guarantee that each message is committed (processed) only once.
- If the application requires exactly-once processing, it must implement that logic itself, for example by deduplicating on a business key.
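A minimal sketch of that approach, assuming the record key can serve as an idempotency key (broker address, topic, and group name are placeholders): offsets are committed only after processing, and records redelivered after a failover are skipped by checking a set of already-processed keys (in a real application this set would live in durable storage).

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DedupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "dedup-group");
        props.put("enable.auto.commit", "false");          // commit only after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Set<String> processedKeys = new HashSet<>(); // sketch: would be durable storage in practice
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // If the same record is redelivered after a failover, skip it.
                    if (processedKeys.add(record.key())) {
                        process(record.value());
                    }
                }
                consumer.commitSync(); // commit offsets only after the batch is processed
            }
        }
    }

    private static void process(String value) { /* application logic */ }
}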
Acknowledge Offset
- If the consumer acknowledges a particular message offset, it implies that the consumer has received all messages prior to that offset in the partition.
When a producer produces a record, which partition does it send to?
Take a look at ProducerRecord.java:
A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.
If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
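For illustration, the three cases map to three ProducerRecord constructors (topic name, partition number, key, and value below are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordExamples {
    public static void main(String[] args) {
        // Explicit partition: the record always goes to partition 3 of demo-topic.
        ProducerRecord<String, String> withPartition =
                new ProducerRecord<>("demo-topic", 3, "user-42", "payload");

        // Key but no partition: the partition is chosen by hashing the key,
        // so records with the same key always land on the same partition.
        ProducerRecord<String, String> withKey =
                new ProducerRecord<>("demo-topic", "user-42", "payload");

        // Neither key nor partition: the producer assigns partitions itself,
        // spreading records across the topic's partitions.
        ProducerRecord<String, String> noKey =
                new ProducerRecord<>("demo-topic", "payload");
    }
}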
Understand min.compaction.lag.ms, max.compaction.lag.ms
log.cleaner.min.compaction.lag.ms: setting this value greater than 0 prevents the log cleaner from compacting messages newer than this minimum message age. It allows delaying the compaction of records.
log.cleaner.max.compaction.lag.ms: the maximum amount of time a message will remain uncompacted. Note that this is not a hard deadline, as it is subject to the availability of log cleaner threads and the actual compaction time.
Log compaction periodically runs in the background by re-copying log segments.
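The same pair of settings also exists per topic as min.compaction.lag.ms and max.compaction.lag.ms. A minimal sketch of setting them through the AdminClient (topic name, broker address, and the two lag values are placeholder assumptions):

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class CompactionLagConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
            Collection<AlterConfigOp> ops = List.of(
                    // Do not compact records younger than 1 minute.
                    new AlterConfigOp(new ConfigEntry("min.compaction.lag.ms", "60000"),
                            AlterConfigOp.OpType.SET),
                    // Aim to compact records within 1 hour (best effort, not a hard deadline).
                    new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "3600000"),
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}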
min.insync.replicas
The minimum number of in-sync replicas that must acknowledge a write for the write to succeed (this applies to producers using acks=all).
So you can start a 3 node cluster and create a topic with a replication factor of 3. If one of the brokers is down, there are only two copies left “in sync”. The system will keep running if min.insync.replicas is 2, but it will stop accepting new messages if min.insync.replicas is 3.
Some people do not like to run with min.insync.replicas = 1 because if that machine’s storage is lost, the data will be lost forever.
With min.insync.replicas = 2 (or higher) there will always be another copy of the data available so a disk failure will not cause message loss.
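Note that min.insync.replicas only takes effect for producers configured with acks=all. A minimal sketch of such a producer (broker address, topic, key, and value are placeholders); if fewer than min.insync.replicas replicas are in sync, the send fails instead of silently losing data:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksAllProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("acks", "all"); // wait for all in-sync replicas to acknowledge the write
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With min.insync.replicas=2 on the topic and fewer than 2 replicas in sync,
            // this send fails (NotEnoughReplicas) rather than being acknowledged.
            producer.send(new ProducerRecord<>("demo-topic", "user-42", "payload"));
            producer.flush();
        }
    }
}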
Why do only certain partitions have higher lag?
Sometimes we see that only certain partitions have higher lag. That is normally because the keys are not evenly distributed. Kafka has multiple partitions, and when a message is published the producer calculates which partition to send it to based on the key, so messages with the same key always go to the same partition. If such messages take longer for the consumer to process, that partition will have higher lag than the others.
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}
If instead we want messages to be evenly distributed across all partitions, we can use a random value as the key, or simply set the key to null; in that case the producer assigns partitions itself and spreads the messages across them.
public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}
Below are the links I studied from:
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
http://tech.meituan.com/kafka-fs-design-theory.html
http://stackoverflow.com/questions/23136500/how-kafka-broadcast-to-many-consumer-groups
https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
https://stackoverflow.com/questions/37511922/topics-partitions-and-keys
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
https://help.aiven.io/en/articles/3995429-topic-log-compaction
https://dzone.com/articles/kafka-architecture-log-compaction
https://www.quora.com/What-is-the-difference-between-min-insync-replicas-and-replication-factor