Kafka Study Summary

Some basic ideas:

  • A topic may have many partitions.
  • A partition must fit entirely on a single server.
  • Partitions are distributed and replicated across servers.
  • Each partition has a leader server. If the leader fails, one of the followers becomes the new leader.

Partition, Segment:

  • A partition is physically stored as a number of segments.

[Figure: kafka1]
For example, a partition may consist of 4 segments, as shown below.
[Figure: kafka2]

For one segment, the index file and the log file are related like this:
[Figure: kafka3]
Basically, given an offset within the partition, we can find the segment, then the offset within that segment, and finally the message.
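
A minimal sketch of that lookup (class, field, and file names are mine, with made-up base offsets, not Kafka's actual internals): segment files are named after their base offset, so a binary search over the sorted base offsets finds the right segment.

import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the offset -> segment lookup.
public class SegmentLookup {

    // Each segment file is named after its base offset; keep them sorted.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    public SegmentLookup() {
        // A partition stored as 4 segments (made-up base offsets).
        segments.put(0L,      "00000000000000000000.log");
        segments.put(170410L, "00000000000000170410.log");
        segments.put(239430L, "00000000000000239430.log");
        segments.put(737337L, "00000000000000737337.log");
    }

    // Given an absolute offset in the partition, find the segment and
    // the relative offset inside it; the index file then maps the
    // relative offset to a file position.
    public String locate(long offset) {
        Map.Entry<Long, String> e = segments.floorEntry(offset); // greatest base <= offset
        return e.getValue() + " @ relative offset " + (offset - e.getKey());
    }

    public static void main(String[] args) {
        // Prints: 00000000000000170410.log @ relative offset 7
        System.out.println(new SegmentLookup().locate(170417L));
    }
}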

Consumer, Partition and Stream

  • If there are 10 partitions for a topic and 3 consumer instances (C1, C2, C3, started in that order) all belonging to the same consumer group, we can have different consumption models that allow read parallelism, as below.
  • Each consumer uses a single stream. In this model, when C1 starts, all 10 partitions of the topic are mapped to the same stream and C1 starts consuming from that stream. When C2 starts, Kafka rebalances the partitions between the two streams, so each stream is assigned 5 partitions (depending on the rebalance algorithm it might also be 4 vs 6) and each consumer consumes from its stream. Similarly, when C3 starts, the partitions are again rebalanced between the 3 streams. Note that in this model, when consuming from a stream assigned more than one partition, the order of messages will be jumbled between partitions.
  • Each consumer uses more than one stream (say C1 uses 3, C2 uses 3 and C3 uses 4). In this model, when C1 starts, all 10 partitions are assigned to its 3 streams and C1 can consume from the 3 streams concurrently using multiple threads. When C2 starts, the partitions are rebalanced between the 6 streams, and similarly when C3 starts, the partitions are rebalanced between the 10 streams. Each consumer can consume concurrently from multiple streams. Note that the numbers of streams and partitions here are equal. If the number of streams exceeds the number of partitions, some streams will not get any messages, as they will not be assigned any partitions. (See the consumer sketch after this list.)
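
The stream model above belongs to the old high-level consumer API. As a rough modern-equivalent sketch using the newer KafkaConsumer (bootstrap server, group id, and topic name are assumptions), each consumer instance in the same group gets a share of the partitions, and starting another instance triggers a rebalance:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumerSketch implements Runnable {
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "my-group"); // same group => partitions are split across instances
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic
            while (true) {
                // Each poll returns records only from the partitions
                // currently assigned to this consumer instance.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d offset=%d%n", r.partition(), r.offset());
                }
            }
        }
    }
}

Running C1, C2 and C3 as three such instances (threads or processes) reproduces the rebalancing behavior described above.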

Delivery guarantee

  • If a consumer fails and another consumer takes over its partition, Kafka does not guarantee that each message is committed only once.
  • If the application requires exactly-once processing, it should implement that logic itself, e.g. by deduplicating messages (see the sketch below).

[Figure: kafka4]
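
A minimal sketch of such application-side deduplication, assuming each message carries a unique ID (the ID and the in-memory store are my own illustration, not a Kafka API):

import java.util.HashSet;
import java.util.Set;

// Sketch only: remember IDs of messages already processed, so a message
// redelivered after a consumer failover is handled at most once by us.
public class DedupProcessor {

    // In production this would be a durable store, not an in-memory set.
    private final Set<String> processedIds = new HashSet<>();

    public void handle(String messageId, String payload) {
        if (!processedIds.add(messageId)) {
            return; // duplicate delivery: skip
        }
        // ... real processing of payload goes here ...
    }
}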

Acknowledge Offset

  • If the consumer acknowledges a particular message offset, it implies that the consumer has received all messages prior to that offset in the partition.
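
With the Java consumer this is exactly what committing an offset means: committing offset N marks everything before N as consumed. A minimal sketch (topic and partition are assumed):

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitExample {

    // Acknowledge everything up to and including lastProcessedOffset.
    static void ack(KafkaConsumer<String, String> consumer, long lastProcessedOffset) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // assumed topic/partition
        // The committed offset is the NEXT offset to read, hence the +1.
        consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(lastProcessedOffset + 1)));
    }
}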

When a producer produces a record, which partition does it send to?

Take a look at ProducerRecord.java:

A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional partition number, and an optional key and value.

If a valid partition number is specified that partition will be used when sending the record. If no partition is specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is present a partition will be assigned in a round-robin fashion.
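
As a quick illustration, the three cases correspond to the ProducerRecord constructors (topic name and values here are made up):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionChoice {
    public static void main(String[] args) {
        // 1. Explicit partition: record goes to partition 3.
        ProducerRecord<String, String> r1 =
                new ProducerRecord<>("my-topic", 3, "key-a", "value-1");

        // 2. Key only: the partition is chosen by hashing the key,
        //    so the same key always lands on the same partition.
        ProducerRecord<String, String> r2 =
                new ProducerRecord<>("my-topic", "key-a", "value-2");

        // 3. Neither key nor partition: partitions are assigned round-robin.
        ProducerRecord<String, String> r3 =
                new ProducerRecord<>("my-topic", "value-3");
    }
}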

Understand min.compaction.lag.ms, max.compaction.lag.ms

[Figure: kafka_compact2]

  • log.cleaner.min.compaction.lag.ms: setting this value greater than 0 prevents the log cleaner from compacting messages newer than this minimum age, i.e. it allows delaying the compaction of recent records.
  • log.cleaner.max.compaction.lag.ms: the maximum amount of time a message will remain uncompacted. Note that this is not a hard deadline, as it is subject to the availability of log cleaner threads and the actual compaction time.

Log compaction periodically runs in the background by re-copying log segments.
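
As a sketch, the topic-level counterparts of these settings (min.compaction.lag.ms / max.compaction.lag.ms, without the log.cleaner broker prefix) can be set when creating a topic with the AdminClient; the broker address, topic name, and lag values below are assumptions:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption

        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        configs.put("min.compaction.lag.ms", "60000");    // don't compact records younger than 1 min
        configs.put("max.compaction.lag.ms", "86400000"); // aim to compact within 1 day (soft deadline)

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("compacted-topic", 3, (short) 3).configs(configs);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}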

min.insync.replicas

The minimum number of in-sync copies of the data required for the topic to keep accepting writes.

So you can start a 3-node cluster and create a topic with a replication factor of 3. If one of the brokers is down, there are only two copies left “in sync”. So the system will keep running if min.insync.replicas is 2, but it will stop accepting new messages if min.insync.replicas is 3.

Some people do not like to run with min.insync.replicas = 1, because if that machine's storage is lost, the data is lost forever.

With min.insync.replicas = 2 (or higher) there will always be another copy of the data available, so a single disk failure will not cause message loss.
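
Note that min.insync.replicas is only enforced when the producer requests full acknowledgement with acks=all; a minimal producer sketch (broker address and topic are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SafeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // assumption
        props.put("acks", "all"); // wait for all in-sync replicas
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With min.insync.replicas=2, this send fails with
            // NotEnoughReplicasException when fewer than 2 replicas are
            // in sync, instead of silently writing to a single copy.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}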

Why do only certain partitions have higher lag?

Sometimes we see that only certain partitions have higher lag. That is normally because the keys are not evenly distributed. Kafka has multiple partitions, and when a message is published, the producer calculates the target partition from the key. Messages with the same key always go to the same partition. If such messages take the consumer longer to process, that partition will have higher lag than the others.

// Constructor with a key: the partition is chosen by hashing the key,
// so records with the same key always go to the same partition.
public ProducerRecord(String topic, K key, V value) {
    this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}

[Figure: kafka_key_to_same_partition]

Instead, we want messages to be evenly distributed across all partitions. We can use a random number as the key, or set the key to null; the producer will then distribute messages across partitions in a round-robin fashion.

[Figure: kafka_key_to_different_partition]

// Constructor without a key: the partition is assigned round-robin.
public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}
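
A quick sketch of the two workarounds described above (topic name and values are made up):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SpreadKeys {
    public static void main(String[] args) {
        // Option 1: null key -> the producer spreads records across partitions.
        ProducerRecord<String, String> noKey =
                new ProducerRecord<>("my-topic", "some value");

        // Option 2: random key -> the hash of the key varies per record.
        String randomKey = Long.toString(ThreadLocalRandom.current().nextLong());
        ProducerRecord<String, String> randKey =
                new ProducerRecord<>("my-topic", randomKey, "some value");
    }
}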

Below are the links I studied from:
http://research.microsoft.com/en-us/um/people/srikanth/netdb11/netdb11papers/netdb11-final12.pdf
http://tech.meituan.com/kafka-fs-design-theory.html
http://stackoverflow.com/questions/23136500/how-kafka-broadcast-to-many-consumer-groups
https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
https://stackoverflow.com/questions/37511922/topics-partitions-and-keys
https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html

Kafka takes on under-replication in 2.3 release
https://help.aiven.io/en/articles/3995429-topic-log-compaction
https://dzone.com/articles/kafka-architecture-log-compaction
https://www.quora.com/What-is-the-difference-between-min-insync-replicas-and-replication-factor

zookeeper java.net.ConnectException: Connection refused

Today I ran ZooKeeper with zkServer.sh start, but it didn't run; I couldn't see port 2181 with the netstat command. Then I tried zkServer.sh start-foreground, which showed the error below:

2016-02-01 21:38:37,110 - FATAL [main:QuorumPeerMain@83] - Invalid config, exiting abnormally
org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Error processing /home/hadoop/zookeeper/zookeeper-3.3.6/bin/../conf/zoo.cfg
    at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:110)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:99)
    at org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Caused by: java.lang.IllegalArgumentException: /tmp/zookeeper/myid file is missing
    at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parseProperties(QuorumPeerConfig.java:320)
    at org.apache.zookeeper.server.quorum.QuorumPeerConfig.parse(QuorumPeerConfig.java:106)
    ... 2 more

After some googling, I found it was because I needed to put a myid file in dataDir. This is how ZooKeeper knows each server's id.

My configuration: zoo.cfg (all 3 servers have the same file).

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
server.1=centmaster:2888:3888
server.2=centslave1:2888:3888
server.3=centslave2:2888:3888

myid is a file in dataDir containing only the server's id in ASCII. On centmaster it contains “1” (matching server.1 above; centslave1 gets “2” and centslave2 gets “3”):

1

After this configuration, ZooKeeper runs successfully.

Verify Preorder Serialization of a Binary Tree

One way to serialize a binary tree is to use pre-order traversal. When we encounter a non-null node, we record the node's value. If it is a null node, we record a sentinel value such as #.

      _9_
     /   \
    3     2
   / \   / \
  4   1 #   6
 / \ / \   / \
 # # # #   # #
For example, the above binary tree can be serialized to the string “9,3,4,#,#,1,#,#,2,#,6,#,#”, where # represents a null node.

Given a string of comma separated values, verify whether it is a correct preorder traversal serialization of a binary tree. Find an algorithm without reconstructing the tree.

Each comma-separated value in the string must be either an integer or the character ‘#’ representing a null pointer.

You may assume that the input format is always valid, for example it could never contain two consecutive commas such as “1,,3”.

Example 1:
“9,3,4,#,#,1,#,#,2,#,6,#,#”
Return true

Example 2:
“1,#”
Return false

Example 3:
“9,#,#,1”
Return false

Solution. I came up with the first solution naturally, using recursion. The idea: if the current value is ‘#’, simply return true. If it is a number, call validHelper twice, advancing pos each time. In the end, return true only if pos == list.length - 1.

public static boolean isValidSerialization(String preorder) {
    String[] list = preorder.split(",");
    int[] pos = new int[] {-1};
    if (!validHelper(list, pos) || pos[0] != list.length - 1) { // in the end, pos[0] should equal len - 1
        return false;
    }
    return true;
}

public static boolean validHelper(String[] list, int[] pos) {
    pos[0]++;
    if (pos[0] >= list.length) {  // add pos[0] first
        return false;
    }
    if (list[pos[0]].equals("#")) { // if is '#', return true.
        return true;
    }
    // if is a number, call 2 times.
    return validHelper(list, pos) && validHelper(list, pos);
}

There is another cool solution from this post, based on counting slots. Here diff is the number of slots still available for nodes, starting at 1 (one slot for the root). Each node consumes one slot (diff--), and consuming must never drive diff below 0; a non-null (internal) node opens two new slots for its children (diff += 2). The serialization is valid exactly when diff is 0 at the end, which is equivalent to saying the number of null nodes equals the number of internal nodes + 1.

public static boolean isValidSerialization2(String preorder) {
    String[] nodes = preorder.split(",");
    int diff = 1;
    for (String node : nodes) {
        if (--diff < 0) {
            return false;
        }
        if (!node.equals("#")) {
            diff += 2;
        }
    }
    return diff == 0;
}

Check my code on github: link