Why Does Kafka Lose Messages?


In this article we put a Kafka cluster through some rough treatment to work out exactly under which conditions Kafka can lose data. Before reading on, it is worth reading up on Kafka's fault-tolerance features, because this article assumes you already understand the concepts of acks (acknowledgements) and replication.

I will simulate as many of the possible failure cases as I can. In each scenario I write messages to a topic called test1 and, once the run finishes, check how many messages were lost in total. Unstable networks (latency, packet loss and so on) are not considered here; testing under those conditions will be covered separately in a follow-up.

A set of helper bash scripts performs the test chores, such as recreating the cluster between scenarios and creating topics. If you want to run the tests yourself, you can find these scripts on GitHub.

After downloading the scripts, and while no Kafka cluster is running on the host yet, initialize the environment with the following commands:

$ blockade up
$ bash update-hosts.sh
$ bash create-topic.sh kafka1 test1

Whenever the cluster needs to be recreated, run:

$ bash reset-cluster.sh
$ bash create-topic.sh kafka1 test1

Scenario 0: acks = all, with transactions enabled

There is a consumer-side case where, even though a transaction fails, the offset commit still goes through and the consumer's position advances. On the next poll the consumer no longer resumes from the failed offset but from a later one, so the messages in between are effectively lost.

The fix is, when transactions are in use, to disable automatic offset commits by setting enable.auto.commit = false and to commit offsets only after the messages have actually been processed.
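
As an illustration (my own sketch, not taken from the original post), a confluent-kafka consumer with auto-commit disabled that commits only after a message has been processed might look like this:

from confluent_kafka import Consumer, KafkaException

def process(value: bytes) -> None:
    # Stand-in for the application's real processing / transactional work.
    print(value.decode('utf-8', errors='replace'))

# Hypothetical settings; the broker address and group id are placeholders.
consumer = Consumer({
    'bootstrap.servers': 'kafka1:9092',
    'group.id': 'test1-consumer',
    'enable.auto.commit': False,    # never commit offsets automatically
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test1'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process(msg.value())
        # Commit only after processing succeeded, so a failure can never advance
        # the consumer's position past an unprocessed message.
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()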

Scenario 1: acks = 0, a node fails and the partition leader fails over

In this scenario we send 100,000 messages with acks = 0, meaning the producer does not wait for any acknowledgement from the broker. At around the 30,000-message mark I kill the node hosting the partition leader.

First, initialize the environment with the following command.

$ bash create-topic.sh kafka1 test1
Created topic "test1".
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:
Topic: test1    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2

Next we start producing. The producer sends 100,000 messages to test1 at roughly 10,000 messages per second, using acks mode 0 (fire-and-forget).

$ python producer.py 100000 0.0001 0 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%3|1537022288.472|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: Receive failed: Connection reset by peer
%3|1537022288.472|ERROR|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: Receive failed: Connection reset by peer
%3|1537022288.572|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: Connect to ipv4#172.17.0.5:9094 failed: Connection refused
%3|1537022288.572|ERROR|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: Connect to ipv4#172.17.0.5:9094 failed: Connection refused
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent: 100000
Delivered: 100000
Failed: 0
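
The producer.py script itself is not reproduced here (it lives in the GitHub repo mentioned above). As a rough sketch, and purely my own assumption based on the librdkafka log lines in the output, it might look something like this, taking the message count, the delay between sends, the acks mode and the topic as arguments:

import sys
import time
from confluent_kafka import Producer

# Arguments as used throughout this article: count, delay (seconds), acks, topic.
msg_count = int(sys.argv[1])
delay = float(sys.argv[2])
acks = sys.argv[3]          # '0', '1' or 'all'
topic = sys.argv[4]

delivered = 0
failed = 0

def on_delivery(err, msg):
    # Called by librdkafka once the fate of each message is known.
    global delivered, failed
    if err is None:
        delivered += 1
    else:
        failed += 1

producer = Producer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9093,kafka3:9094',  # placeholder addresses
    'acks': acks,
})

for i in range(1, msg_count + 1):
    producer.produce(topic, value=str(i), on_delivery=on_delivery)
    producer.poll(0)        # serve delivery callbacks
    time.sleep(delay)
    if i % 10000 == 0:
        print(f"Success: {delivered} Failed: {failed}")

producer.flush()            # wait for all outstanding messages
print(f"Sent: {msg_count}")
print(f"Delivered: {delivered}")
print(f"Failed: {failed}")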

When roughly 30,000 messages had been sent, we kill the partition leader.

$ blockade kill kafka3

Once all the messages have been sent, we run a script that reads the current high watermark (the latest offset) to see how many messages the topic actually retained. print-hw.sh takes three arguments: the name of the broker on which to run the command, its port, and the topic name.

$ bash print-hw.sh kafka2 19093 test1
test1:0:93236

print-hw.sh looks like this:

#!/bin/bash
# $1 = broker name, $2 = broker port, $3 = topic name.
# Find the container running the given broker, then print the latest offset
# (high watermark) of the topic via Kafka's GetOffsetShell tool.
CONTAINER_ID=$(docker ps | grep $1 | awk '{ print $1 }')
docker exec -t $CONTAINER_ID kafka-run-class kafka.tools.GetOffsetShell --broker-list $1:$2 --time -1 --topic $3

We can see that 6,764 of the messages the producer sent were lost. The loss came from a combination of the broken connection and the leader fail-over (the partition leadership moving to one of the follower replicas).

Scenario 2: acks = 1, a node fails and the partition leader fails over

The producer setting request.required.acks = 1 tells the broker to acknowledge a message as soon as the partition leader has written it to its local log. Messages can still be lost in this mode, but fewer than with acks = 0, because now data is lost only when the leader fails over.

We recreate the Kafka cluster:

$ bash reset-cluster.sh 
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION
kafka1          e180fe0cc4fa    UP      172.17.0.3      NORMAL
kafka2          637e7ef260ba    UP      172.17.0.4      NORMAL
kafka3          ccffaab3d0c0    UP      172.17.0.5      NORMAL
zk1             d751f9f7aaac    UP      172.17.0.2      NORMAL 
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3

$ bash create-topic.sh kafka1 test1
Created topic "test1".
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test1    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2

This time the leader lives on broker 1. We again send 100,000 messages, now with acks = 1:

$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
%3|1537091610.422|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Receive failed: Connection reset by peer
%3|1537091610.422|ERROR|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Receive failed: Connection reset by peer
Success: 35187 Failed: 4813
%3|1537091610.527|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: Connection refused
%3|1537091610.527|ERROR|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: Connect to ipv4#172.17.0.3:9092 failed: Connection refused
Success: 39356 Failed: 10644
Success: 49356 Failed: 10644
Success: 59356 Failed: 10644
Success: 69356 Failed: 10644
Success: 79356 Failed: 10644
Success: 89356 Failed: 10644
Sent: 100000
Delivered: 89356
Failed: 10644

After roughly 30,000 messages have been sent, we kill broker 1:

$ blockade kill kafka1

With acks = 1, only 89,356 messages were acknowledged, and we expect all, or almost all, of those acknowledged messages to survive the fail-over.

$ bash print-hw.sh kafka2 19093 test1
test1:0:89356

We see that every acknowledged message was retained.

Let's rerun the scenario, this time with ten producers sending concurrently, each publishing 100,000 messages. I launch them in the background from a script (a sketch follows below).
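
concurrent-producer.sh is not reproduced in the post; as an assumption on my part, a simple launcher that starts ten copies of producer.py with the same arguments and waits for them all could look like this:

import subprocess
import sys

# Pass through the same arguments used above: count, delay, acks, topic.
count, delay, acks, topic = sys.argv[1:5]

# Start ten producer.py processes concurrently ...
procs = [subprocess.Popen(['python', 'producer.py', count, delay, acks, topic])
         for _ in range(10)]

# ... and wait for all of them to finish.
for p in procs:
    p.wait()

print('Runs complete')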

$ bash concurrent-producer.sh 100000 0.0001 1 test1
Run ID: 2 Success: 10000 Failed: 0
Run ID: 4 Success: 10000 Failed: 0
Run ID: 6 Success: 10000 Failed: 0
Run ID: 10 Success: 10000 Failed: 0
Run ID: 10 Success: 20000 Failed: 0
Run ID: 3 Success: 10000 Failed: 0
Run ID: 3 Success: 20000 Failed: 0
Run ID: 3 Success: 30000 Failed: 0
Run ID: 3 Success: 40000 Failed: 0
Run ID: 3 Success: 50000 Failed: 0
Run ID: 1 Success: 10000 Failed: 0
Run ID: 1 Success: 20000 Failed: 0
Run ID: 1 Success: 30000 Failed: 0
Run ID: 9 Success: 10000 Failed: 0
Run ID: 9 Success: 20000 Failed: 0
Run ID: 9 Success: 30000 Failed: 0
Run ID: 9 Success: 40000 Failed: 0
Run ID: 9 Success: 50000 Failed: 0
Run ID: 9 Success: 60000 Failed: 0
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.199|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.201|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.201|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.198|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.199|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.201|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.201|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.202|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.206|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.206|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.209|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
%3|1537094097.209|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Receive failed: Connection reset by peer
Run ID: 2 Success: 19930 Failed: 70
%3|1537094097.337|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.337|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.338|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.338|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.344|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.344|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
Run ID: 8 Success: 8495 Failed: 1505
%3|1537094097.354|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.355|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.358|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.358|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
Run ID: 7 Success: 7435 Failed: 2565
%3|1537094097.362|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.364|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
Run ID: 5 Success: 6966 Failed: 3034
%3|1537094097.386|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.394|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.387|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.394|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.395|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.395|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.404|FAIL|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
%3|1537094097.404|ERROR|rdkafka#producer-1| [thrd:172.17.0.4:9093/bootstrap]: 172.17.0.4:9093/2: Connect to ipv4#172.17.0.4:9093 failed: Connection refused
Run ID: 1 Success: 37279 Failed: 2721
Run ID: 6 Success: 14414 Failed: 5586
Run ID: 10 Success: 23463 Failed: 6537
Run ID: 4 Success: 12325 Failed: 7675
Run ID: 9 Success: 62589 Failed: 7411
Run ID: 2 Success: 19930 Failed: 10070
Run ID: 3 Success: 51539 Failed: 8461
Run ID: 9 Success: 62589 Failed: 17411
Run ID: 8 Success: 8495 Failed: 11505
Run ID: 7 Success: 7435 Failed: 12565
Run ID: 3 Success: 51539 Failed: 18461
Run ID: 5 Success: 6966 Failed: 13034
Run ID: 1 Success: 37279 Failed: 12721
Run ID: 10 Success: 23463 Failed: 16537
Run ID: 6 Success: 14414 Failed: 15586
Run ID: 7 Success: 7435 Failed: 22565
Run ID: 4 Success: 12325 Failed: 17675
Run ID: 2 Success: 19930 Failed: 20070
Run ID: 1 Success: 37279 Failed: 22721
Run ID: 8 Success: 8495 Failed: 21505
Run ID: 10 Success: 23463 Failed: 26537
Run ID: 5 Success: 6966 Failed: 23034
Run ID: 6 Success: 14414 Failed: 25586
Run ID: 7 Success: 7435 Failed: 32565
Run ID: 1 Success: 37279 Failed: 32721
Run ID: 10 Success: 23463 Failed: 36537
Run ID: 2 Success: 19930 Failed: 30070
Run ID: 4 Success: 12325 Failed: 27675
Run ID: 7 Success: 7435 Failed: 42565
Run ID: 8 Success: 8495 Failed: 31505
Run ID: 5 Success: 6966 Failed: 33034
Run ID: 1 Success: 37279 Failed: 42721
Run ID: 6 Success: 14414 Failed: 35586
Run ID: 7 Success: 7435 Failed: 52565
Run ID: 2 Success: 19930 Failed: 40070
Run ID: 4 Success: 12325 Failed: 37675
Run ID: 8 Success: 8495 Failed: 41505
Run ID: 5 Success: 6966 Failed: 43034
Run ID: 6 Success: 14414 Failed: 45586
Run ID: 4 Success: 12325 Failed: 47675
Run ID: 8 Success: 8495 Failed: 51505
Run ID: 5 Success: 6966 Failed: 53034
Run ID: 5 Success: 6966 Failed: 63034
Run ID: 2 Success: 29608 Failed: 40392
Run ID: 3 Success: 52695 Failed: 27305
Run ID: 3 Success: 62695 Failed: 27305
Run ID: 4 Success: 16069 Failed: 53931
Run ID: 6 Success: 17998 Failed: 52002
Run ID: 7 Success: 10298 Failed: 59702
Run ID: 9 Success: 63620 Failed: 26380
Run ID: 1 Success: 46565 Failed: 43435
Run ID: 10 Success: 27688 Failed: 42312
Run ID: 2 Success: 39608 Failed: 40392
Run ID: 2 Success: 49608 Failed: 40392
Run ID: 2 Success: 59608 Failed: 40392
Run ID: 2 Sent: 100000 Delivered: 59608 Failed: 40392
Run ID: 4 Success: 26069 Failed: 53931
Run ID: 3 Success: 72695 Failed: 27305
Run ID: 3 Sent: 100000 Delivered: 72695 Failed: 27305
Run ID: 5 Success: 16393 Failed: 63607
Run ID: 4 Success: 36069 Failed: 53931
Run ID: 5 Success: 26393 Failed: 63607
Run ID: 4 Success: 46069 Failed: 53931
Run ID: 4 Sent: 100000 Delivered: 46069 Failed: 53931
Run ID: 5 Success: 36393 Failed: 63607
Run ID: 5 Sent: 100000 Delivered: 36393 Failed: 63607
Run ID: 6 Success: 27998 Failed: 52002
Run ID: 6 Success: 37998 Failed: 52002
Run ID: 6 Success: 47998 Failed: 52002
Run ID: 6 Sent: 100000 Delivered: 47998 Failed: 52002
Run ID: 7 Success: 20298 Failed: 59702
Run ID: 7 Success: 30298 Failed: 59702
Run ID: 8 Success: 16788 Failed: 53212
Run ID: 7 Success: 40298 Failed: 59702
Run ID: 7 Sent: 100000 Delivered: 40298 Failed: 59702
Run ID: 8 Success: 26788 Failed: 53212
Run ID: 8 Success: 36788 Failed: 53212
Run ID: 8 Success: 46788 Failed: 53212
Run ID: 8 Sent: 100000 Delivered: 46788 Failed: 53212
Run ID: 9 Success: 73620 Failed: 26380
Run ID: 9 Sent: 100000 Delivered: 73620 Failed: 26380
Run ID: 1 Success: 56565 Failed: 43435
Run ID: 1 Sent: 100000 Delivered: 56565 Failed: 43435
Run ID: 10 Success: 37688 Failed: 42312
Run ID: 10 Success: 47688 Failed: 42312
Run ID: 10 Success: 57688 Failed: 42312
Run ID: 10 Sent: 100000 Delivered: 57688 Failed: 42312
Runs complete

We killed the leader mid-run. The total number of acknowledged messages across all ten producers was 537,722.

$ bash print-hw.sh kafka1 19092 test1
test1:0:537717

So out of 1,000,000 messages sent by the ten producers, 5 messages that had already been acknowledged were lost!

Let's run the test case once more.

$ bash concurrent-producer-silent.sh 100000 0.0001 1 test1
Runs complete
Acknowledged total: 766804

$ bash print-hw.sh kafka1 19092 test1
test1:0:766793

This time 11 acknowledged messages were lost. Not too bad overall.

Scenario 3: acks = all, a node fails and the partition leader fails over

We repeat the same concurrent test: ten producers sending 1,000,000 messages in total, killing the leader partway through.

$ bash concurrent-producer-silent.sh 100000 0.0001 all test1
Runs complete
Acknowledged total: 889751

About ten seconds in, we killed the leader. The final count is as follows:

$ bash print-hw.sh kafka2 19093 test1
test1:0:889758

With acks = all we lost no data; in fact the topic contains 7 extra messages that were never acknowledged. So with acks = all, a leader failure does not lose acknowledged messages.

Scenario 4: acks = 1, the leader is isolated from Zookeeper and from the other nodes

A Kafka leader losing contact with Zookeeper should cause even more message loss. Losing contact here means losing it completely, i.e. the node is partitioned away (isolated).

In this scenario I send 100,000 messages at roughly 10,000 messages per second, and at around the 30,000-message mark I use a blockade command to cut the Kafka leader off from the rest of the cluster, including Zookeeper.

$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... (over 33000 error lines here)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 66453 Failed: 33547
Sent: 100000
Delivered: 66453
Failed: 33547

Partway through, kafka2 (the partition leader) is isolated, losing its connection to Zookeeper and to the other brokers:

$ blockade partition kafka2

We can see that timeout errors begin after about 60,000 acknowledgements; by the end, 66,453 messages had been acknowledged in total.

In the output below we see that leadership failed over to kafka3, but only 34,669 messages remain in the topic, which means 31,784 acknowledged messages were lost.

$ bash print-topic-details.sh kafka3 test1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test1    Partition: 0    Leader: 3    Replicas: 2,3,1    Isr: 3,1

$ bash print-hw.sh kafka1 19092 test1
test1:0:34669

Let's dig deeper into what happened. I have a slow-producer.py script that publishes one message per second and prints extra diagnostics. We reset the cluster, run this new script, and isolate the leader again.

The script adds the following output:

  • prints the leader's broker id at start-up and whenever it changes
  • prints each message's value and offset once the message has been acknowledged
  • detects when an acknowledged offset is lower than the highest offset seen so far and reports how many messages were lost in the fail-over (a sketch of this check follows below)
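
slow-producer.py itself is not shown in the post; a minimal sketch of the offset-regression check described above (my own assumption, using the confluent-kafka Python client and omitting the leader-id reporting) might look like this:

import sys
import time
from confluent_kafka import Producer

# Arguments as in the article: message count, delay (seconds), acks mode, topic.
count, delay, acks, topic = int(sys.argv[1]), float(sys.argv[2]), sys.argv[3], sys.argv[4]

highest_offset = -1   # highest offset acknowledged so far

def on_delivery(err, msg):
    global highest_offset
    if err is not None:
        print(f"ERROR! Value: {msg.value().decode()} Offset: none Error: {err}")
        return
    # An acknowledged offset lower than one we have already seen means the new
    # leader's log is missing messages the old leader had acknowledged.
    if msg.offset() < highest_offset:
        print(f"Partition fail-over, messages lost: {highest_offset - msg.offset()}")
    highest_offset = msg.offset()
    print(f"Value: {msg.value().decode()} Offset: {msg.offset()}")

producer = Producer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9093,kafka3:9094',  # placeholder addresses
    'acks': acks,
})

for i in range(1, count + 1):
    producer.produce(topic, value=str(i), on_delivery=on_delivery)
    producer.poll(0)
    time.sleep(delay)

producer.flush()
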
$ python slow-producer.py 100 1 1 test1
Current leader: 1
Value: 1 Offset: none
Value: 2 Offset: 1
Value: 3 Offset: 2
Value: 4 Offset: 3
Value: 5 Offset: 4
Value: 6 Offset: 5
Value: 7 Offset: 6
Value: 8 Offset: 7
Value: 9 Offset: 8
Value: 10 Offset: 9
Value: 11 Offset: 10
Value: 12 Offset: 11
Value: 13 Offset: 12
Value: 14 Offset: 13
Value: 15 Offset: 14
Value: 16 Offset: 15
Value: 17 Offset: 16
Value: 18 Offset: 17
Value: 19 Offset: 18
Value: 20 Offset: 19
Value: 21 Offset: 20
Value: 22 Offset: 21
Value: 23 Offset: 22
Value: 24 Offset: 23
Value: 25 Offset: 24
Value: 26 Offset: 25
Value: 27 Offset: 26
Value: 28 Offset: 27
%3|1537197027.978|FAIL|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 1 request(s) timed out: disconnect
%3|1537197027.978|ERROR|rdkafka#producer-1| [thrd:172.17.0.3:9092/bootstrap]: 172.17.0.3:9092/1: 1 request(s) timed out: disconnect
Current leader: 2
ERROR! Value: 29 Offset: none Error: Local: Message timed out
ERROR! Value: 30 Offset: none Error: Local: Message timed out
ERROR! Value: 31 Offset: none Error: Local: Message timed out
ERROR! Value: 32 Offset: none Error: Local: Message timed out
ERROR! Value: 33 Offset: none Error: Local: Message timed out
ERROR! Value: 34 Offset: none Error: Local: Message timed out
ERROR! Value: 35 Offset: none Error: Local: Message timed out
ERROR! Value: 36 Offset: none Error: Local: Message timed out
ERROR! Value: 37 Offset: none Error: Local: Message timed out
ERROR! Value: 38 Offset: none Error: Local: Message timed out
ERROR! Value: 39 Offset: none Error: Local: Message timed out
ERROR! Value: 40 Offset: none Error: Local: Message timed out
ERROR! Value: 41 Offset: none Error: Local: Message timed out
ERROR! Value: 42 Offset: none Error: Local: Message timed out
ERROR! Value: 43 Offset: none Error: Local: Message timed out
ERROR! Value: 44 Offset: none Error: Local: Message timed out
ERROR! Value: 45 Offset: none Error: Local: Message timed out
ERROR! Value: 46 Offset: none Error: Local: Message timed out
ERROR! Value: 47 Offset: none Error: Local: Message timed out
ERROR! Value: 48 Offset: none Error: Local: Message timed out
ERROR! Value: 49 Offset: none Error: Local: Message timed out
ERROR! Value: 50 Offset: none Error: Local: Message timed out
ERROR! Value: 51 Offset: none Error: Local: Message timed out
ERROR! Value: 52 Offset: none Error: Local: Message timed out
ERROR! Value: 53 Offset: none Error: Local: Message timed out
ERROR! Value: 54 Offset: none Error: Local: Message timed out
ERROR! Value: 55 Offset: none Error: Local: Message timed out
ERROR! Value: 56 Offset: none Error: Local: Message timed out
ERROR! Value: 57 Offset: none Error: Local: Message timed out
ERROR! Value: 58 Offset: none Error: Local: Message timed out
ERROR! Value: 59 Offset: none Error: Local: Message timed out
ERROR! Value: 60 Offset: none Error: Local: Message timed out
ERROR! Value: 61 Offset: none Error: Local: Message timed out
ERROR! Value: 62 Offset: none Error: Local: Message timed out
ERROR! Value: 63 Offset: none Error: Local: Message timed out
ERROR! Value: 64 Offset: none Error: Local: Message timed out
ERROR! Value: 65 Offset: none Error: Local: Message timed out
ERROR! Value: 66 Offset: none Error: Local: Message timed out
ERROR! Value: 67 Offset: none Error: Local: Message timed out
ERROR! Value: 68 Offset: none Error: Local: Message timed out
ERROR! Value: 69 Offset: none Error: Local: Message timed out
ERROR! Value: 70 Offset: none Error: Local: Message timed out
ERROR! Value: 71 Offset: none Error: Local: Message timed out
ERROR! Value: 72 Offset: none Error: Local: Message timed out
ERROR! Value: 73 Offset: none Error: Local: Message timed out
ERROR! Value: 74 Offset: none Error: Local: Message timed out
ERROR! Value: 75 Offset: none Error: Local: Message timed out
ERROR! Value: 76 Offset: none Error: Local: Message timed out
ERROR! Value: 77 Offset: none Error: Local: Message timed out
ERROR! Value: 78 Offset: none Error: Local: Message timed out
ERROR! Value: 79 Offset: none Error: Local: Message timed out
ERROR! Value: 80 Offset: none Error: Local: Message timed out
ERROR! Value: 81 Offset: none Error: Local: Message timed out
ERROR! Value: 82 Offset: none Error: Local: Message timed out
ERROR! Value: 83 Offset: none Error: Local: Message timed out
ERROR! Value: 84 Offset: none Error: Local: Message timed out
ERROR! Value: 85 Offset: none Error: Local: Message timed out
ERROR! Value: 86 Offset: none Error: Local: Message timed out
ERROR! Value: 87 Offset: none Error: Local: Message timed out
ERROR! Value: 88 Offset: none Error: Local: Message timed out
Partition fail-over, messages lost: 12
Value: 89 Offset: 15
Value: 90 Offset: 16
Value: 91 Offset: 17
Value: 92 Offset: 18
Value: 93 Offset: 19
Value: 94 Offset: 20
Value: 95 Offset: 21
Value: 96 Offset: 22
Value: 97 Offset: 23
Value: 98 Offset: 24
Value: 99 Offset: 25
Value: 100 Offset: 26
Sent: 100
Delivered: 40
Failed: 60

So we see that a fully isolated leader is worse than a plain node failure with acks = 1, because it takes the broker a while to detect that it has lost its Zookeeper session. Out of 100,000 messages we lost 31,784 acknowledged messages, because even after a new leader had been elected, the old leader kept accepting writes (kafka1 in the slow-producer run above).

Scenario 5: acks = all, the leader is isolated from Zookeeper and from the other nodes

The steps are identical to scenario 4, except that we set acks = all.

$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... (many many timeout errors here)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 37640 Failed: 62360
Sent: 100000
Delivered: 37640
Failed: 62360

In the end, 37,640 messages were acknowledged.

$ bash print-topic-details.sh kafka1 test1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test1    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1,3
$ bash print-hw.sh kafka1 19092 test1
test1:0:37641

We see that kafka1 became the new leader, and no acknowledged messages were lost. In fact the topic even contains one extra message that was never acknowledged.

So acks = all is required to avoid losing data during leader fail-overs, whether they are caused by a network partition or by a node failure.

Rerunning the ten-producer test gives the same picture: far fewer messages get acknowledged, but none of the acknowledged messages are lost.

Scenario 6: acks = 1, the leader loses its connection to Zookeeper but keeps its connections to the other nodes

In this scenario we isolate the leader from Zookeeper only, not from the other Kafka nodes.

$ bash create-topic.sh kafka3 test1
Created topic "test1".
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
$ bash print-controller.sh kafka2
1

We start sending messages:

$ python producer.py 100000 0.0001 1 test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... (many many time out errors)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 89027 Failed: 10973
Sent: 100000
Delivered: 89027
Failed: 10973

Partway through, we cut kafka2 off from Zookeeper:

$ blockade partition kafka1,kafka2,kafka3 kafka1,kafka3,zk1

The final count shows 89,027 acknowledged messages, but only 45,518 made it into the topic, meaning 43,509 acknowledged messages were lost.

$ bash print-hw.sh kafka1 19092 test1
test1:0:45518

Once again we combine slow-producer.py with the broker logs to confirm what happened:

$ python slow-producer.py 100 1 1 test1
Current leader: 3
Value: 1 Offset: none
Value: 2 Offset: 1
Value: 3 Offset: 2
Value: 4 Offset: 3
Value: 5 Offset: 4
Value: 6 Offset: 5
Value: 7 Offset: 6
Value: 8 Offset: 7
Value: 9 Offset: 8
Value: 10 Offset: 9
Value: 11 Offset: 10
Value: 12 Offset: 11
Value: 13 Offset: 12
Value: 14 Offset: 13
Value: 15 Offset: 14
Value: 16 Offset: 15
Value: 17 Offset: 16
Value: 18 Offset: 17
Value: 19 Offset: 18
Value: 20 Offset: 19
Value: 21 Offset: 20
Value: 22 Offset: 21
Value: 23 Offset: 22
Value: 24 Offset: 23
Value: 25 Offset: 24
Value: 26 Offset: 25
Value: 27 Offset: 26
Value: 28 Offset: 27
Value: 29 Offset: 28
Value: 30 Offset: 29
Value: 31 Offset: 30
Value: 32 Offset: 31
Value: 33 Offset: 32
%3|1537212099.509|FAIL|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: 1 request(s) timed out: disconnect
%3|1537212099.509|ERROR|rdkafka#producer-1| [thrd:172.17.0.5:9094/bootstrap]: 172.17.0.5:9094/3: 1 request(s) timed out: disconnect
Current leader: 1
ERROR! Value: 34 Offset: none Error: Local: Message timed out
ERROR! Value: 35 Offset: none Error: Local: Message timed out
ERROR! Value: 36 Offset: none Error: Local: Message timed out
ERROR! Value: 37 Offset: none Error: Local: Message timed out
ERROR! Value: 38 Offset: none Error: Local: Message timed out
ERROR! Value: 39 Offset: none Error: Local: Message timed out
...
ERROR! Value: 90 Offset: none Error: Local: Message timed out
ERROR! Value: 91 Offset: none Error: Local: Message timed out
ERROR! Value: 92 Offset: none Error: Local: Message timed out
ERROR! Value: 93 Offset: none Error: Local: Message timed out
Partition fail-over, messages lost: 12
Value: 94 Offset: 20
Value: 95 Offset: 21
Value: 96 Offset: 22
Value: 97 Offset: 23
Value: 98 Offset: 24
Value: 99 Offset: 25
Value: 100 Offset: 26
Sent: 100
Delivered: 40
Failed: 60

kafka3 log:

[2018-09-16 19:20:31,877] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:37,884] WARN Client session timed out, have not heard from server in 7327ms for sessionid 0x165e8aa663e0005 (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:37,884] INFO Client session timed out, have not heard from server in 7327ms for sessionid 0x165e8aa663e0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:39,034] INFO [Partition test1-0 broker=3] Shrinking ISR from 3,1,2 to 3 (kafka.cluster.Partition)
[2018-09-16 19:20:39,609] INFO Opening socket connection to server zk1/172.17.0.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:43,991] WARN Client session timed out, have not heard from server in 6005ms for sessionid 0x165e8aa663e0005 (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:43,991] INFO Client session timed out, have not heard from server in 6005ms for sessionid 0x165e8aa663e0005, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-09-16 19:20:44,095] INFO [ZooKeeperClient] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)

We see that kafka3, the original leader, loses its connection to Zookeeper. Soon after, it tries to shrink the ISR to just itself but cannot, and declares that it will wait to be reconnected.

kafka2 log:

[2018-09-16 19:20:26,154] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions test1-0 (kafka.server.ReplicaFetcherManager)
[2018-09-16 19:20:26,185] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions List([test1-0, initOffset 20 to broker BrokerEndPoint(1,kafka1,19092)] ) (kafka.server.ReplicaFetcherManager)
[2018-09-16 19:20:26,186] INFO [ReplicaAlterLogDirsManager on broker 2] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2018-09-16 19:20:26,192] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Starting (kafka.server.ReplicaFetcherThread)
[2018-09-16 19:20:26,199] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2018-09-16 19:20:26,209] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1307994263, epoch=10152) to node 3: java.nio.channels.ClosedSelectorException. (org.apache.kafka.clients.FetchSessionHandler)
[2018-09-16 19:20:26,210] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
[2018-09-16 19:20:26,211] INFO [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
[2018-09-16 19:20:26,245] INFO [Log partition=test1-0, dir=/var/lib/kafka/data] Truncating to 20 has no effect as the largest offset in the log is 19 (kafka.log.Log)

We see that kafka2 removes its existing fetcher and creates a new one pointed at the new leader, starting at offset 20.

kafka1 log:

[2018-09-16 19:20:26,024] INFO [Controller id=1] Newly added brokers: , deleted brokers: 3, all live brokers: 1,2 (kafka.controller.KafkaController)
[2018-09-16 19:20:26,025] INFO [RequestSendThread controllerId=1] Shutting down (kafka.controller.RequestSendThread)
[2018-09-16 19:20:26,029] INFO [RequestSendThread controllerId=1] Stopped (kafka.controller.RequestSendThread)
[2018-09-16 19:20:26,031] INFO [RequestSendThread controllerId=1] Shutdown completed (kafka.controller.RequestSendThread)
[2018-09-16 19:20:26,069] INFO [Controller id=1] Broker failure callback for 3 (kafka.controller.KafkaController)
[2018-09-16 19:20:26,074] INFO [Controller id=1] Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)
[2018-09-16 19:20:26,148] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka2:19093 (id: 2 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2018-09-16 19:20:26,171] INFO [RequestSendThread controllerId=1] Controller 1 connected to kafka1:19092 (id: 1 rack: null) for sending state change requests (kafka.controller.RequestSendThread)
[2018-09-16 19:20:26,180] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions test1-0 (kafka.server.ReplicaFetcherManager)
[2018-09-16 19:20:26,181] INFO [Partition test1-0 broker=1] test1-0 starts at Leader Epoch 1 from offset 20. Previous Leader Epoch was: 0 (kafka.cluster.Partition)

We see that kafka1 learns via Zookeeper that kafka3 is gone and shuts down the fetcher that pointed at it. The partition replica on kafka1 is elected as the new leader.

Note that the fail-over happened at 19:20:26, yet kafka3 still believed it was the leader at 19:20:39 and only stopped accepting writes at around 19:20:40, a full 14 seconds after the fail-over.

The conclusion: even when a Kafka node is isolated only from Zookeeper, messages are lost with acks = 1.

Scenario 7: acks = all, the leader loses its connection to Zookeeper but keeps its connections to the other nodes

Cutting straight to the conclusion: no data was lost.

We repeat the same steps as in scenario 6, except that we set acks = all.

$ python producer.py 100000 0.0001 all test1
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=REQUEST_TIMED_OUT,val=7,str="Broker: Request timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
... (many many time out errors)
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}
Success: 46419 Failed: 53581
Sent: 100000
Delivered: 46419
Failed: 53581

We ended up with 46,419 acknowledged messages.

$ bash print-hw.sh kafka1 19092 test1
test1:0:46419

The final check shows that none of the acknowledged messages were lost.

Conclusion

Message loss from a fail-over caused by a plain node failure with acks = 1 is fairly small; in part two we will see how degraded network conditions affect that number. Using acks = all is the equivalent of RabbitMQ's publisher confirms.

Bottom line: if you must not lose messages, set `acks = all`.
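
As a closing illustration of that recommendation (my own sketch, not part of the original tests), a durability-oriented producer configuration with the confluent-kafka client might look like this; the broker addresses are placeholders:

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka1:9092,kafka2:9093,kafka3:9094',
    'acks': 'all',               # leader waits for all in-sync replicas to persist the message
    'enable.idempotence': True,  # retries cannot introduce duplicates or reordering
})

def on_delivery(err, msg):
    if err is not None:
        # A failed delivery was never acknowledged, so the application must
        # retry it or handle the loss explicitly.
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}")

producer.produce('test1', value='hello', on_delivery=on_delivery)
producer.flush()

On the topic or broker side this pairs with min.insync.replicas (for example 2), so that an acknowledgement under acks = all means at least two replicas have persisted the message.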

Reference: https://jack-vanlightly.com/blog/2018/9/14/how-to-lose-messages-on-a-kafka-cluster-part1
