
Хотелось бы сделать себе заметку (что-то типа cheat-sheet) с командами для работе с Kafka. Сейчас я приведу готовые примеры использования команд на готовых примера можно будет увидеть работу самой кафки.
Установка Kafka скриптов в Unix/Linux
Чтобы использовать скрипты для работы с Кафка, выполним небольшую установку:
$ KAFKA_VERSION=2.12-2.2.1 && \
wget -q -O - https://www-us.apache.org/dist/kafka/2.2.1/kafka_${KAFKA_VERSION}.tgz | (cd /opt; tar -zxvf -) && cd /opt/kafka_${KAFKA_VERSION}
Для простоты использования, можно добавить PATH (в ~/.bashrc):
export KAFKA_VERSION=2.12-2.2.1
export KAFKA_HEAP_OPTS="-Xmx1g -Xms1g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
export PATH="$PATH:/opt/kafka_${KAFKA_VERSION}/bin"
После этого, стоит обновить файл:
$ . ~/.bashrc
Или:
$ source ~/.bashrc
Полезное чтиво:
Перейдем к работе!
Работа с Kafka в Unix/Linux
Вывести список топиков:
$ kafka-topics.sh --list \
--zookeeper zookeeper_host:zookeeper_host_port
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Так же, можно использовать:
$ kafka-topics.sh --list \
--zookeeper $(cat ZooKeeperList.txt)
Где:
- ZooKeeperList.txt — это список хостов самого зукипера (zookeeper:port).
Получить информацию о топике:
$ kafka-topics.sh --describe \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic test_topic
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Создание топика:
$ kafka-topics.sh --create \
--zookeeper $(cat ZooKeeperList.txt) \
--topic test_topic \
--replication-factor 2 \
--partitions 3
Вот небольшой скрипт для создания топиков:
ENV=nonprod
# 1: A name for the topic
# 2: A replication factor
# 3: A partition
KAFKA_TOPIC_LIST=(
"MyKafkaTopic:3:1"
"MyKafkaTopic2:3:1"
)
for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
a_topic=$(echo "$topic"| cut -d ":" -f1)
a_replication_factor=$(echo "$topic"| cut -d ":" -f2)
a_partition=$(echo "$topic"| cut -d ":" -f3)
kafka-topics.sh --create \
--zookeeper $(cat /opt/kafka_2.12-2.2.1/mskkafkatest-${ENV}_ZooKeeperList.txt) \
--topic $a_topic \
--replication-factor $a_replication_factor \
--partitions $a_partition
done;
Записать в топик меседжы:
$ kafka-console-producer.sh \
--broker-list $(cat BrokersList.txt) \
--producer.config producer.properties \
--topic topic_name_here
Прочитать с топика меседжы:
$ kafka-console-consumer.sh \
--bootstrap-server $(cat BrokersList.txt) \
--consumer.config producer.properties \
--topic topic_name_here \
--from-beginning
Удалить топик:
$ kafka-topics.sh --delete \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic test_topic
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Так, написал небольшой скрипт для удаление:
#--------------
# Delete topics
#--------------
ENV=nonprod
KAFKA_TOPIC_LIST=(
"MyKafkaTopic2"
"test"
"test2"
)
for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
echo "Deleting ${topic} ....."
kafka-topics.sh --delete \
--zookeeper $(cat ZooKeeperList.txt) \
-topic $topic
done;
Добавить партицию:
$ kafka-topics.sh --alter \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic test_topic \
--partitions 3
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Посмотреть конфиг для топика:
$ kafka-configs.sh --describe \
--zookeeper zookeeper_host:zookeeper_host_port \
--entity-type topics \
--entity-name test_topic
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Установить время хранения (не рекомендуется) записей в топике:
$ kafka-topics.sh --alter \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic test_topic \
--config retention.ms=1000
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Установить время хранения (современный способ) записей в топике:
$ kafka-configs.sh --alter \
--zookeeper zookeeper_host:zookeeper_host_port \
--entity-type topics \
--entity-name test_topic \
--add-config retention.ms=1000
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Если вам нужно удалить все сообщения в теме, вы можете использовать данную проперти и задать время хранения. Сначала установите время хранения на очень низкое значение (1000 мс), подождите несколько секунд, а затем верните время хранения обратно к предыдущему значению.
Примечание: Время хранения по умолчанию составляет 24 часа (86400000 миллисекунд).
Можно вернуть все как и было:
$ kafka-topics.sh --alter \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic mytopic \
--delete-config retention.ms
Показать список сообщений для конкретного топика:
$ kafka-console-consumer.sh \
--zookeeper zookeeper_host:zookeeper_host_port \
--topic test_topic \
--from-beginning
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Или:
$ kafka-console-consumer.sh \
--bootstrap-server $(cat BrokersList.txt) \
--consumer.config producer.properties \
--topic jive_invoices_staging \
--from-beginning
Чтобы просмотреть offset позиции для consumer группы (для каждой из партиций):
$ kafka-consumer-offset-checker.sh \
--zookeeper zookeeper_host:zookeeper_host_port \
--group group_ID_here \
--topic your_topic_here
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Чтобы начать заново (сбросьте смещение на 0):
$ kafka-streams-application-reset.sh \
--input-topics your_topic_here \
--application-id group_ID_here \
--bootstrap-servers bootstrap_host:bootstrap_port
Где:
- your_topic_here — Топик.
- group_ID_here — ИД группы.
- bootstrap_host — Хост.
- bootstrap_port — Порт.
Получить самое раннее смещение в топике:
$ kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic mytopic \
--time -2
Получить последнее смещение еще в топике:
$ kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic mytopic \
--time -1
Получить потребительские смещения (consumer offsets) для топика:
$ kafka-consumer-offset-checker.sh \
--zookeeper=zookeeper_host:zookeeper_host_port \
--topic=mytopic \
--group=my_consumer_group
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Считать из __consumer_offsets:
$ kafka-console-consumer.sh \
--consumer.config config/consumer.properties \
--from-beginning \
--topic __consumer_offsets \
--zookeeper zookeeper_host:zookeeper_host_port \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Вывести список consumer групп:
$ kafka-consumer-groups.sh --list \
--zookeeper zookeeper_host:zookeeper_host_port
Или:
kafka-consumer-groups.sh --list \
--new-consumer \
--bootstrap-server localhost:9092
Просмотр сведений о consumer группе:
$ kafka-consumer-groups.sh --describe \
--zookeeper zookeeper_host:zookeeper_host_port \
--group group_name
Где:
- zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
- zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.
Посмотр сообщений в топике:
$ kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e
Или:
$ kafka-console-consumer.sh \
--bootstrap-server $(cat BrokersList.txt) \
--consumer.config producer.properties \
--topic test2 \
--from-beginning
Чтобы записать в топик:
$ kafka-console-producer.sh \
--broker-list $(cat BrokersList.txt) \
--producer.config producer.properties \
--topic test2
Запускаем Zookeeper shell:
$ zookeeper-shell.sh \
zookeeper_host:zookeeper_host_port
Провести перформенс тесты:
$ kafka-producer-perf-test.sh \
--topic topic_here \
--num-records 1 \
--record-size 2048 \
--throughput -1 \
--producer.config producer.properties \
--producer-props \
acks=1 \
buffer.memory=67108864 \
compression.type=none \
batch.size=8196
Вот и все, статья «Работа с Kafka в Unix/Linux» завершена.