Kafka nightmare replication issues on FreeBSD.

Mon, Jun 25, 2018

I recently created a tool for a client to export some data from a Kafka topic and after waiting about 10 minutes for it to export, the program returned a rather bizarre error:

kafka: error while consuming telemetry/0: kafka server: Message contents does not match its CRC.

Intuition gave me a bad feeling about that error, but I was not prepared for what was going to come next. I decide to look up the error quickly and the error indicated that the CRC calculated by the consumer was not matching the message header. Huh, what was going on here?

Before I dive into this article, it’s helpful to learn a little bit about the software that was being run when we started running into this problem. We had 3 VMs on an Azure cluster that were running FreeBSD. The producers were using https://github.com/Shopify/sarama and the consumers were using https://github.com/bsm/sarama-cluster

What’s also interesting is, no matter how many machines we might have had in the cluster (if they were all FreeBSD), we most likely would have experienced failure across the board. This wasn’t really great since the entire point of replicating the cluster across N machines was to prevent complete system failure. In this case, it really wouldn’t have helped us. A tear falls down my cheek for failed distributed programming promises.

Software Version
FreeBSD 11.0-RELEASE-p8
ZFS -
Kafka 0.10.2
Zookeeper 3.4.10
OpenJDK 1.8.0_121-b13
Sarama 5e8fd95863bd4a894fcd29225547d56967f189ad
Sarama-cluster d98592677c0aa08d8aafb882d344fb461682b722

A little bit after my export tool ran, I got a ping on Slack that one of the services that uses this Kafka topic was no longer working. I check the logs of that service, and sure enough, the same error:

kafka: error while consuming topic/0: kafka server: Message contents does not match its CRC.

Because this was a live running service, my first thought was to use the power of Kafka to shift the offset by 1 for this particular consumer so we can skip this corrupt message and get everything running again quickly.

kafka-consumer-groups.sh is a command line tool that let’s you do exactly this, except it doesn’t work on kafka-0.10.2. That was a little bit of a surprise to me, I assumed one of the coolest things about Kafka is that you can pick an offset to consume from. The libraries we were using also had no way to manually select an offset to start from.

Okay, crap, so I need to upgrade Kafka to shift the offsets. This is a good opportunity to get on 1.0.0 anyway and perhaps restarting the services will actually fix the problem. I update my Ansible scripts for kafka 1.0.1 and start reading the upgrade guide for a live running environment: https://kafka.apache.org/documentation/#upgrade.

Great, we’re on 1.0.1, but the problem still exists. I have the ability to move the offet over, so I shift the offset by 1.

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group "api-consumer-group" --topic "telemetry:0" --reset-offsets --shift-by 1 --execute

I restart the service and the error comes up again. Darn. So it’s not only one message that is corrupt, it’s a range, that’s not great. I decided to commit to this strategy because this service needed to be up and running for a live demo in the coming week.

So instead, my idea was to shift the offset by a larger number until I find a number that actually works. Once I find a number that works, I can do a manual binary search to find the exact offset where the corruption started and shift it by 1.

After doing a manual binary search, I have found that the corrupted records are between 23769420-23772231 inclusive and corrupted, so good messages begin at 23772232 so 2811 corrupted messages. I run ./kafka-consumer-groups.sh again, but this time I specify the exact offset to start from instead of using shift-by

I restart the service again, and it works! Great, let’s hope this monkey patch works until the live demo. Nope, the next day, the same message appears again.

At this point, I’ve had my fair share of complicated problems, and I have a deep gut instinct that it’s most likely not the code we’ve written because the problem started to appear on multiple other topics in the cluster and the problem was produced by multiple independent services.

Of course, the book Pragmatic Programmer still pops up in my head:

“select” Isn’t Broken:

It is rare to find a bug in the OS or the compiler, or even a third-party product or library. The bug is most likely in the application.

So I focus on starting from the application and then working outwards, I upgrade our libraries used in our code. I upgrade Sarama to 1.6.0 and I upgrade sarama-cluster to master. I run our deployment scripts and everything is running again.

I do the ridiculous offset binary-search trick again to shift everything and sure enough the issue comes up again with our Kafka libraries upgraded to the latest version. I decide to look at the logs of the broker themslelves and this is what I see;

[2018-03-17 20:11:58,551] ERROR [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error for partition telemetry-0 to broker 3:org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. (kafka.server.ReplicaFetcherThread)
[2018-03-17 20:11:58,747] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition topic-2 to broker 2:org.apache.kafka.common.errors.CorruptRecordException: This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt. (kafka.server.ReplicaFetcherThread)

Looking at that error message, it appears that the replica thread inside of Kafka is running into exactly the same problem as our consumers. Uh oh, at this point I know it’s definitely not our application code, so much for that Pragmatic Programmer tip, gut instinct all the way!

There is a Kafka tool to let’s you see deeper into the health of your cluster ./kafka-topics.sh --zookeeper localhost:2181 --describe. Here’s the output:

Topic:telemetry PartitionCount:6 ReplicationFactor:3 Configs:
 Topic: telemetry Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
 Topic: telemetry Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 3
 Topic: telemetry Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 1
 Topic: telemetry Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 2
 Topic: telemetry Partition: 4 Leader: 3 Replicas: 3,1,2 Isr: 3
 Topic: telemetry Partition: 5 Leader: 1 Replicas: 1,2,3 Isr: 1

If you look at the Isr column, it stands for In-Sync replica. It appears that this cluster is not healthy because only the leader broker is in sync while all the other brokers cannot replicate that partition. Our goal here is to get that Isr column back to 1,2,3.

So at this point, here are the options:

    • Hardware failure, such as disk failure, or network connectivity issues.
    • The cluster is misbehaving because of resource allocation issues, perhaps it’s going OOM, or we’re out disk space.
    • A bug with the Kafka version we were using
    • An OpenJDK bug

I decide to quickly look into our various servers to see if this is a resource allocation issue that is causing Kafka to misbehave. This was a red herring, one of the servers that was responsible for pushing to this topic actually filled its root partition and for a second I thought that might have been the issue, but that issue was fixed and the issue still remained.

The actual Kafka instances seemed to be fine, they were nowhere near capacity in terms of disk space, and the chances of having hardware failure across 3 machines was unlikely.

it gets a little confusing on what could be wrong and I start to realize that this is becoming a difficult problem and we have to get this live system working. I can’t just adjust things at random and hope that the problem is fixed. It’s time to dig deeper, it’s time to look at what Kafka is actually writing to disk.

Digging deeper

Kafka writes the messages it receives into a log folder. The log folder contains a folder for each topic and partition combo. This is how it might look for this particular topic we’re having issues with:

.
|-- telemetry-0
|   |-- 00000000000000000000.index
|   |-- 00000000000000000000.log
|   |-- 00000000000000000000.timeindex
|   |-- 00000000000003859610.index
|   |-- 00000000000003859610.log
|   |-- 00000000000003859610.snapshot
|   |-- 00000000000003859610.timeindex
|   |-- 00000000000008551431.index
|   |-- 00000000000008551431.log
|   |-- 00000000000008551431.snapshot
|   |-- 00000000000008551431.timeindex
|   |-- 00000000000012458429.snapshot
|   `-- leader-epoch-checkpoint
|-- telemetry-1
|   |-- 00000000000000000000.index
|   |-- 00000000000000000000.log
|   |-- 00000000000000000000.timeindex
|   |-- 00000000000003854233.index
|   |-- 00000000000003854233.log
|   |-- 00000000000003854233.timeindex
|   |-- 00000000000008541867.index
|   |-- 00000000000008541867.log
|   |-- 00000000000008541867.timeindex
|   `-- leader-epoch-checkpoint
|-- telemetry-2
|   |-- 00000000000000000000.index
|   |-- 00000000000000000000.log
|   |-- 00000000000000000000.timeindex
|   |-- 00000000000003850719.index
|   |-- 00000000000003850719.log
|   |-- 00000000000003850719.timeindex
|   |-- 00000000000008543680.index
|   |-- 00000000000008543680.log
|   |-- 00000000000008543680.timeindex
|   `-- leader-epoch-checkpoint

The .log file is where the messages we push to Kafka are stored. We’re going to attempt to extract the offset that is corrupt to see what exactly is corrupt about it. If you’re curious on learning about the internals of Kafka, check out this article, it was a lot simpler than I thought: https://thehoard.blog/how-kafkas-storage-internals-work-3a29b02e026

Kafka provides a tool, kafka.tools.DumpLogSegments that lets you dive into these log files and grab more details about individual records that are in the file. Hilariously enough, when I ran this tool, it bails right when it hits a corrupt message.

Exception in thread "main" org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller than minimum record overhead (14).````

Okay, so the GOOD record right before it hits that is this:

baseOffset: 71233 lastOffset: 71235 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 10285560 CreateTime: -1 isvalid: true size: 471 magic: 2 compresscodec: NONE crc: 491186814

A cool way to dump this on FreeBSD is to use the hexdump command line tool. The position is the file position of this record, and true size is how big the record is.

We run hd -s 10285560 -n 471 /var/db/kafka/influxdb-telemetry-0/00000000000000000000.log to dump the good record. If you want to actually read the header, you can find the wire format over at: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol.

009ce30d  00 00 00 00 00 01 16 27  00 00 01 4b 00 00 00 00  |.......'...K....|
009ce31d  02 ee 03 fd 04 00 00 00  00 00 01 ff ff ff ff ff  |................|
009ce32d  ff ff ff ff ff ff ff ff  ff ff ff ff ff ff ff ff  |................|
009ce33d  ff ff ff ff ff ff ff ff  ff 00 00 00 02 a8 02 00  |................|
009ce34d  00 00 01 9a 02 00 8b 09  89 54 4f d9 dd 82 b2 14  |.........TO.....|

The next record position is the last position + true size, in this case the offset would be 10286031.

The next record output:

$ hd -s 10286031 -n 471 /var/db/kafka/influxdb-telemetry-0/00000000000000000000.log

009cf3cf  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
*
009cf59f

The star here indicates that every line is the same from 009cf3cf to 009cf59f. Why the hell is Kafka writing 471 zeroes to its log files? Did a buffer get flushed to disk early? What’s happening here?

At this point, I decided to open an issue with Kafka, this seems like a really bad bug and I hope I could get some insight from them. You can find my open issue here: https://issues.apache.org/jira/browse/KAFKA-6679, it still remains open.

Is it our filesystem? Is it our operating system?

We’re using ZFS in our environment, perhaps Kafka isn’t equipped to handle ZFS properly? Also, zfs has compression built in and we have it enabled. Let’s try switching the compression algorithm from LZ4 to gzip and restart the cluster.

The problem pops up again, and now I’m strongly convinced the issue is some sort of FreeBSD + ZFS + Kafka combination.

Trial by fire

At this point, I’m pretty annoyed by the problem, and no amount of adjusting of our existing environment will seem to fix the issue. Further, I’m convinced I can find the root of the issue if I continue to dig deeper, but I’m constrainted by having to get the live environment operational and I already have a strong feeling that switching to Linux will fix this issue.

I decided to deploy 3 new brokers and configure them exactly to spec, i.e, Ubuntu 16.04 LTS + ext4. These 3 brokers will join the existing cluster, increasing our cluster count from 3 machines to 6. If our FreeBSD brokers run into the issue again, we will elect our Linux brokers to become the new leaders and we will most likely not bring the FreeBSD brokers up again.

After the 3 Linux servers joined the cluster, I had to use the ./kafka-reassign-partitions.sh to replicate the entire cluster to the new servers. You can actually constrain how quickly it replicates which is really nice, this is what I ran:

./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/rp.json --throttle 50000000 --execute, where --throttle is 50MB/s and the JSON file looked something like this to replicate to all 6 partitions.

{
    "version":1,
    "partitions":[
        {"topic":"telemetry","partition":0,"replicas":[1,2,3,4,5,6]},
        {"topic":"telemetry","partition":1,"replicas":[1,2,3,4,5,6]},
        {"topic":"telemetry","partition":2,"replicas":[1,2,3,4,5,6]},
    ]
}

Of course the real JSON file included all the topics (including __consumer_offsets) topics.

Intermission: Wait a minute, where the hell did all my consumer offsets go?

Because our consumers to our cluster were down for more than 24 hrs, we lost all of our offsets! This has to be one of the most infuriating defaults of Kafka.

There is a Kafka config option called offsets.retention.minutes which defaults to 24 hours. If your consumer does not consume from a consumer group in that time, you lose all your offsets!

So after the new cluster was up and opertional, all the consumers started consuming messages from the beginning of their topic.

I hear this one is being fixed: https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days but who in the world would think this was a sane default?

I ended up recovering all the offsets, some of them by re-running everything, and some of them by looking through log files. The fun never ends!

Our consumers are in the right place now, did we fix it?

Sure enough, the FreeBSD servers had the same issue 24 hrs later, while the Linux servers did not have the issue at all. Success!

I shutdown the FreeBSD servers and decomissioned the FreeBSD servers:

{
    "version":1,
    "partitions":[
        {"topic":"telemetry","partition":0,"replicas":[4,5,6]},
        {"topic":"telemetry","partition":1,"replicas":[4,5,6]},
        {"topic":"telemetry","partition":2,"replicas":[4,5,6]},
    ]
}

This sucked because our Kafka cluster took only 64gb of disk space on Linux + zfs while it’s now taking 260gb of disk space on Linux + ext4.

Advice and next steps

I recommend that you stay far away from running Kafka on FreeBSD. Stick to running it on an operating system and a file system that it is well tested on.

Hopefully the Kafka devs, or I can take the time to find the root cause of the issue and fix it so future users do not have to deal with it.

At minimum, I think Kafka devs should advice users to stay far away from FreeBSD until this problem is fixed.