Durable event streams with Kafka clustering
Description
https://kafka.apache.org/documentation/#intro_streaming
https://youtu.be/FKgi3n-FyNU
services:
zookeeper (state) https://zookeeper.apache.org/doc/current/zookeeperStarted.html
kafka
Prerequisites
configure hosts files or add to DNS:
cat >> /etc/hosts << EOF
192.168.0.31 kafka-1.home.lan
192.168.0.32 kafka-2.home.lan
192.168.0.33 kafka-3.home.lan
EOF
install software
yum install java-1.8.0-openjdk -y
curl -O http://mirror.vorboss.net/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar xvf kafka_2.13-2.6.0.tgz
ZooKeeper
ZooKeeper needs a minimum of 3 nodes to form a quorum; 5 are recommended for production so the ensemble can tolerate two failures.
Configure zookeeper
cat > config/my-zookeeper.properties << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
dataLogDir=/var/log/zookeeper/
server.1=192.168.0.31:2888:3888
server.2=192.168.0.32:2888:3888
server.3=192.168.0.33:2888:3888
EOF
On each kafka host, create a myid file containing a unique id matching the host number (kafka-1 gets 1, and so on)
mkdir /var/lib/zookeeper/
echo "1" > /var/lib/zookeeper/myid
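Rather than typing the id on every node, it can be derived from the hostname; a minimal sketch, assuming the kafka-N.home.lan naming scheme from /etc/hosts above:

```shell
# Derive the ZooKeeper id from the hostname (a sketch; on a real node
# use host=$(hostname -f) and write the result to /var/lib/zookeeper/myid).
host="kafka-2.home.lan"
id="${host#kafka-}"   # strips the prefix -> 2.home.lan
id="${id%%.*}"        # strips the domain -> 2
echo "$id"
```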
Start each zookeeper instance in a terminal
bin/zookeeper-server-start.sh config/my-zookeeper.properties
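To verify the ensemble formed, ZooKeeper 3.5.x (the version bundled with Kafka 2.6.0) answers the srvr four-letter-word command on the client port; other commands such as ruok must first be added to 4lw.commands.whitelist in the properties file. A sketch:

```shell
# On a healthy 3-node ensemble, one node reports Mode: leader
# and the other two report Mode: follower.
echo srvr | nc localhost 2181 | grep Mode
```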
Kafka cluster
Configure each kafka instance. The config is identical except for broker.id, which must be unique; again, match the hostname number.
cat > config/my-server.properties << EOF
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.31:2181,192.168.0.32:2181,192.168.0.33:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
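Since only broker.id differs per host, it too can be stamped in from the hostname; a sketch demonstrated on a scratch copy (on a real node, point sed at config/my-server.properties and take host from hostname -s):

```shell
# Demonstrate the substitution on a temporary file.
cfg=$(mktemp)
echo "broker.id=1" > "$cfg"
host="kafka-3"                                  # real node: host=$(hostname -s)
sed -i "s/^broker\.id=.*/broker.id=${host#kafka-}/" "$cfg"
cat "$cfg"                                      # broker.id=3
```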
Start the kafka instances
bin/kafka-server-start.sh config/my-server.properties
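Once all three are up, the broker registrations can be checked in ZooKeeper (zookeeper-shell.sh ships with the Kafka distribution):

```shell
# Should list all broker ids, e.g. [1, 2, 3], once every node has started.
bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids
```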
Topics
Create a replicated topic
[root@kafka-1 kafka_2.13-2.6.0]# bin/kafka-topics.sh --create \
> --zookeeper localhost:2181 \
> --topic testing \
> --partitions 2 \
> --replication-factor 2
Created topic testing.
[root@kafka-3 kafka_2.13-2.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
testing
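kafka-topics.sh can also show where the partitions landed; --describe prints the leader, replica set and in-sync replicas (ISR) for each partition:

```shell
bin/kafka-topics.sh --describe --topic testing --bootstrap-server localhost:9092
```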
Producer and consumer
Connect to two instances and start a producer on one and a consumer on the other. Each line sent from the producer is printed by the consumer.
[root@kafka-1 kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --topic testing --bootstrap-server localhost:9092
>hello
[root@kafka-3 kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --topic testing --from-beginning --bootstrap-server localhost:9092
hello
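With replication-factor 2, the topic survives the loss of a broker; a quick check (a sketch) is to stop one node and read the topic from another:

```shell
# On one broker, e.g. kafka-2:
bin/kafka-server-stop.sh
# From a surviving node, the messages are still readable:
bin/kafka-console-consumer.sh --topic testing --from-beginning --bootstrap-server localhost:9092
```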
Production
Tuning
vm.swappiness = 1
vm.dirty_background_ratio = 5 (default 10)
vm.dirty_ratio = 60 to 80 (default 20)
To check dirty-page activity: egrep "dirty|writeback" /proc/vmstat
XFS performs best, but ext4 is an option.
Mount the data filesystems with noatime.
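An example mount entry for a dedicated data disk (a sketch; the device and mount point are assumptions):

```shell
# /etc/fstab entry: XFS data disk mounted with noatime.
echo "/dev/sdb1  /var/lib/kafka  xfs  defaults,noatime  0 0" >> /etc/fstab
```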
networking
net.core.wmem_default = 2097152 (raised from 128KB to 2MB)
net.core.rmem_default = 2097152 (raised from 128KB to 2MB)
net.core.wmem_max = 2097152
net.core.rmem_max = 2097152
net.ipv4.tcp_wmem = "4096 65536 2048000" (the min/default/max triple belongs to the tcp_* settings; the net.core.*_max keys take a single value)
net.ipv4.tcp_rmem = "4096 65536 2048000"
net.ipv4.tcp_window_scaling = 1
#net.ipv4.tcp_max_syn_backlog = 1024
#net.core.netdev_max_backlog = 1000
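The kernel settings in this section can be persisted in one drop-in so they survive reboots (a sketch using the values given above; the file name is an assumption):

```shell
# Persist the Kafka host tunings and apply them.
cat > /etc/sysctl.d/90-kafka.conf << EOF
vm.swappiness = 1
vm.dirty_background_ratio = 5
vm.dirty_ratio = 60
net.core.wmem_default = 2097152
net.core.rmem_default = 2097152
net.ipv4.tcp_wmem = 4096 65536 2048000
net.ipv4.tcp_rmem = 4096 65536 2048000
net.ipv4.tcp_window_scaling = 1
EOF
sysctl --system
```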
Java - Garbage First (G1) collector.
G1 adapts to the workload while keeping GC pause times consistent. Example for a 64GB broker running a 5GB JVM heap:
export JAVA_HOME=/usr/java/jdk1.8.0_51
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
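The GC flags do not set the heap itself; the 5GB heap from the example goes in KAFKA_HEAP_OPTS, which kafka-server-start.sh reads (it defaults to a 1GB heap if unset):

```shell
export KAFKA_HEAP_OPTS="-Xms5g -Xmx5g"
```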
stream processing
https://github.com/robinhood/faust