Description

https://kafka.apache.org/documentation/#intro_streaming

https://youtu.be/FKgi3n-FyNU

services:

zookeeper (state) https://zookeeper.apache.org/doc/current/zookeeperStarted.html

kafka

Prerequisites

Configure the hosts file on each node, or add the names to DNS:

cat >> /etc/hosts << EOF
192.168.0.31 kafka-1.home.lan
192.168.0.32 kafka-2.home.lan
192.168.0.33 kafka-3.home.lan
EOF

Install Java and download Kafka:

yum install java-1.8.0-openjdk -y
curl -O http://mirror.vorboss.net/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar xvf kafka_2.13-2.6.0.tgz 

ZooKeeper

A ZooKeeper ensemble needs a minimum of 3 servers; 5 are recommended.
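The sizing advice follows from majority quorum: the ensemble stays available only while more than half of its servers are up. A small sketch of that arithmetic, where `quorum_size` and `failures_tolerated` are hypothetical helper names used only for illustration:

```shell
# Smallest majority of an ensemble with $1 servers.
quorum_size() { echo $(( $1 / 2 + 1 )); }

# Servers that can fail while a majority is still reachable.
failures_tolerated() { echo $(( $1 - ($1 / 2 + 1) )); }

for n in 3 5; do
    echo "$n servers: quorum $(quorum_size "$n"), tolerates $(failures_tolerated "$n") failure(s)"
done
```

With 3 servers one failure is survivable; with 5, two are, which leaves room to take one server down for maintenance and still survive an unplanned failure.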

Configure zookeeper

cat > config/my-zookeeper.properties << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
dataLogDir=/var/log/zookeeper/
server.1=192.168.0.31:2888:3888
server.2=192.168.0.32:2888:3888
server.3=192.168.0.33:2888:3888
EOF
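In this config, `initLimit` and `syncLimit` are counted in ticks, so the real timeouts depend on `tickTime`. A quick sketch of the arithmetic for the values above (the variable names are illustrative only):

```shell
tick_ms=2000   # tickTime, in milliseconds
init_limit=5   # initLimit, in ticks
sync_limit=2   # syncLimit, in ticks

# Time a follower has to connect to the leader and sync initially.
init_timeout_ms=$(( tick_ms * init_limit ))
# How far a follower may lag behind the leader before it is dropped.
sync_timeout_ms=$(( tick_ms * sync_limit ))

echo "initial sync window: ${init_timeout_ms} ms"
echo "follower sync window: ${sync_timeout_ms} ms"
```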

On each host, write a unique id that matches the host number and its server.N entry in the config (1 on kafka-1, 2 on kafka-2, and so on):

mkdir -p /var/lib/zookeeper/
echo "1" > /var/lib/zookeeper/myid
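If the hostnames follow the kafka-N pattern from the hosts file, the id could be derived instead of typed by hand. A minimal sketch, assuming that naming scheme; `host_id` is a hypothetical helper, not part of ZooKeeper or Kafka:

```shell
# Print the trailing number of a hostname such as kafka-2.home.lan.
host_id() {
    short="${1%%.*}"              # kafka-2.home.lan -> kafka-2
    printf '%s\n' "${short##*-}"  # kafka-2 -> 2
}

# Usage on each host (run as root, paths as in the config above):
#   mkdir -p /var/lib/zookeeper
#   host_id "$(hostname -f)" > /var/lib/zookeeper/myid
```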

Start ZooKeeper on each host, each in its own terminal:

bin/zookeeper-server-start.sh config/my-zookeeper.properties

Kafka cluster

Configure each Kafka instance. The config is identical on every host except for broker.id, which must be unique; again, match it to the hostname number.

cat > config/my-server.properties << EOF
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.31:2181,192.168.0.32:2181,192.168.0.33:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF
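Since only broker.id differs between hosts, one option is to keep a single copy of the file and rewrite that line per host. A sketch of that idea; `render_broker_config` and the output filename in the usage comment are hypothetical:

```shell
# Print the properties file with broker.id replaced.
# $1 = path to the shared properties file, $2 = broker id for this host.
render_broker_config() {
    sed "s/^broker\.id=.*/broker.id=$2/" "$1"
}

# Usage on kafka-2:
#   render_broker_config config/my-server.properties 2 > config/my-server-2.properties
```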

Start Kafka on each host:

bin/kafka-server-start.sh config/my-server.properties

Topics

Create a replicated topic

[root@kafka-1 kafka_2.13-2.6.0]# bin/kafka-topics.sh --create \
>     --zookeeper localhost:2181 \
>     --topic testing \
>     --partitions 2 \
>     --replication-factor 2
Created topic testing.
[root@kafka-3 kafka_2.13-2.6.0]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
testing
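To confirm the topic really is replicated, `kafka-topics.sh --describe` lists the leader, replicas and in-sync replicas for each partition. A small wrapper sketch; `describe_topic` is a hypothetical name and `KAFKA_HOME` is an assumed variable pointing at the unpacked kafka_2.13-2.6.0 directory:

```shell
# Assumed location of the Kafka install; defaults to the current directory.
KAFKA_HOME="${KAFKA_HOME:-.}"

# Show partition leaders, replicas and ISR for the topic named in $1.
describe_topic() {
    "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server localhost:9092 --describe --topic "$1"
}

# Usage:
#   describe_topic testing
```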

Producer and consumer

Connect to 2 instances and start a producer on one and a consumer on the other. Each line typed into the producer is sent as a message and printed by the consumer.

[root@kafka-1 kafka_2.13-2.6.0]# bin/kafka-console-producer.sh --topic testing --bootstrap-server localhost:9092
>hello
[root@kafka-3 kafka_2.13-2.6.0]# bin/kafka-console-consumer.sh --topic testing --from-beginning --bootstrap-server localhost:9092
hello

## Production

Tuning

vm.swappiness = 1

vm.dirty_background_ratio = 5 (default 10)
vm.dirty_ratio = 60 to 80 (default 20)

Check the current number of dirty pages with: egrep "dirty|writeback" /proc/vmstat

XFS is the best choice of filesystem, but ext4 is an option.

Mount the data filesystems with noatime.
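To keep the memory settings across reboots they can be written to a sysctl drop-in file. A minimal sketch: `write_vm_tuning` is a hypothetical helper, the file path in the usage comment is an assumption, and 60 is simply one value picked from the 60 to 80 range given for vm.dirty_ratio:

```shell
# Write the VM tuning values to the file named in $1.
write_vm_tuning() {
    cat > "$1" << 'EOF'
vm.swappiness = 1
vm.dirty_background_ratio = 5
vm.dirty_ratio = 60
EOF
}

# Usage (as root):
#   write_vm_tuning /etc/sysctl.d/99-kafka.conf
#   sysctl --system
```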

Networking

net.core.wmem_default = 2097152 (128 KB to 2 MB)
net.core.rmem_default = 2097152 (128 KB to 2 MB)
net.ipv4.tcp_rmem = 4096 65536 2048000
net.ipv4.tcp_wmem = 4096 65536 2048000
net.ipv4.tcp_window_scaling = 1
#net.ipv4.tcp_max_syn_backlog = 1024
#net.core.netdev_max_backlog = 1000

Java - Garbage First (G1) collector.

Adjusts to the workload and keeps GC pause times consistent. Example for a 64GB broker with a 5GB JVM heap:

export JAVA_HOME=/usr/java/jdk1.8.0_51
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true"
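The performance options do not set the heap size itself; kafka-server-start.sh takes that from KAFKA_HEAP_OPTS. A sketch for the 5GB example, where using the same value for the initial and maximum heap is an assumption rather than something stated in these notes:

```shell
# 5 GB heap for the broker JVM; equal -Xms and -Xmx is an assumed choice.
export KAFKA_HEAP_OPTS="-Xms5g -Xmx5g"

# Then start the broker as before:
#   bin/kafka-server-start.sh config/my-server.properties
```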

Stream processing

https://github.com/robinhood/faust

Conclusion

References