Работа с Kafka в Unix/Linux

Хотелось бы сделать себе заметку (что-то типа cheat-sheet) с командами для работе с Kafka. Сейчас я приведу готовые примеры использования команд на готовых примера можно будет увидеть работу самой кафки.

Установка Kafka скриптов в Unix/Linux

Чтобы использовать скрипты для работы с Кафка, выполним небольшую установку:

$ KAFKA_VERSION=2.12-2.2.1 && \
wget -q -O - https://www-us.apache.org/dist/kafka/2.2.1/kafka_${KAFKA_VERSION}.tgz | (cd /opt; tar -zxvf -) && cd /opt/kafka_${KAFKA_VERSION}

Для простоты использования, можно добавить PATH (в ~/.bashrc):

export KAFKA_VERSION=2.12-2.2.1
export KAFKA_HEAP_OPTS="-Xmx1g -Xms1g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
export PATH="$PATH:/opt/kafka_${KAFKA_VERSION}/bin"

После этого, стоит обновить файл:

$ . ~/.bashrc

Или:

$ source ~/.bashrc

Полезное чтиво:

Установка Kafka в Unix/Linux

Перейдем к работе!

Работа с Kafka в Unix/Linux

Вывести список топиков:

$ kafka-topics.sh --list \
    --zookeeper zookeeper_host:zookeeper_host_port

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Так же, можно использовать:

$ kafka-topics.sh --list \
    --zookeeper $(cat ZooKeeperList.txt)

Где:

  • ZooKeeperList.txt — это список хостов самого зукипера (zookeeper:port).

Получить информацию о топике:

$ kafka-topics.sh --describe \
     --zookeeper zookeeper_host:zookeeper_host_port \
     --topic test_topic

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Создание топика:

$ kafka-topics.sh --create \
    --zookeeper $(cat ZooKeeperList.txt) \
    --topic test_topic \
    --replication-factor 2 \
    --partitions 3

Вот небольшой скрипт для создания топиков:

ENV=nonprod


# 1: A name for the topic
# 2: A replication factor
# 3: A partition
KAFKA_TOPIC_LIST=(
    "MyKafkaTopic:3:1"
    "MyKafkaTopic2:3:1"
    )


for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
    a_topic=$(echo "$topic"| cut -d ":" -f1)
    a_replication_factor=$(echo "$topic"| cut -d ":" -f2)
    a_partition=$(echo "$topic"| cut -d ":" -f3)

    kafka-topics.sh --create \
        --zookeeper $(cat /opt/kafka_2.12-2.2.1/mskkafkatest-${ENV}_ZooKeeperList.txt) \
        --topic $a_topic \
        --replication-factor $a_replication_factor \
        --partitions $a_partition
done;

Записать в топик меседжы:

$ kafka-console-producer.sh \
--broker-list $(cat BrokersList.txt) \
--producer.config producer.properties \
--topic topic_name_here

Прочитать с топика меседжы:

$ kafka-console-consumer.sh \
    --bootstrap-server $(cat BrokersList.txt) \
    --consumer.config producer.properties \
    --topic topic_name_here \
    --from-beginning

Удалить топик:

$ kafka-topics.sh --delete \
   --zookeeper zookeeper_host:zookeeper_host_port \
   --topic test_topic

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Так, написал небольшой скрипт для удаление:

#--------------
# Delete topics
#--------------
ENV=nonprod


KAFKA_TOPIC_LIST=(
    "MyKafkaTopic2"
    "test"
    "test2"
    )


for topic in "${KAFKA_TOPIC_LIST[@]}" ; do
    echo "Deleting ${topic} ....."
    kafka-topics.sh --delete \
        --zookeeper $(cat ZooKeeperList.txt) \
        -topic $topic
done;

Добавить партицию:

$ kafka-topics.sh --alter \
    --zookeeper zookeeper_host:zookeeper_host_port \
    --topic test_topic \
    --partitions 3

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Посмотреть конфиг для топика:

$ kafka-configs.sh --describe \
      --zookeeper zookeeper_host:zookeeper_host_port \
      --entity-type topics \
      --entity-name test_topic

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Установить время хранения (не рекомендуется) записей в топике:

$ kafka-topics.sh --alter \
    --zookeeper zookeeper_host:zookeeper_host_port \
    --topic test_topic \
    --config retention.ms=1000

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Установить время хранения (современный способ) записей в топике:

$ kafka-configs.sh --alter \
      --zookeeper zookeeper_host:zookeeper_host_port \
      --entity-type topics \
      --entity-name test_topic \
      --add-config retention.ms=1000

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Если вам нужно удалить все сообщения в теме, вы можете использовать данную проперти и задать время хранения. Сначала установите время хранения на очень низкое значение (1000 мс), подождите несколько секунд, а затем верните время хранения обратно к предыдущему значению.

Примечание: Время хранения по умолчанию составляет 24 часа (86400000 миллисекунд).

Можно вернуть все как и было:

$ kafka-topics.sh --alter \
     --zookeeper zookeeper_host:zookeeper_host_port \
     --topic mytopic \
     --delete-config retention.ms

Показать список сообщений для конкретного топика:

$ kafka-console-consumer.sh \
    --zookeeper zookeeper_host:zookeeper_host_port \
    --topic test_topic \
    --from-beginning

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Или:

$ kafka-console-consumer.sh \
    --bootstrap-server $(cat BrokersList.txt) \
    --consumer.config producer.properties \
    --topic jive_invoices_staging \
    --from-beginning

Чтобы просмотреть offset позиции для consumer группы (для каждой из партиций):

$ kafka-consumer-offset-checker.sh \
     --zookeeper zookeeper_host:zookeeper_host_port \
     --group group_ID_here \
     --topic your_topic_here

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Чтобы начать заново (сбросьте смещение на 0):

$ kafka-streams-application-reset.sh \
    --input-topics your_topic_here \
    --application-id group_ID_here \
    --bootstrap-servers bootstrap_host:bootstrap_port

Где:

  • your_topic_here — Топик.
  • group_ID_here — ИД группы.
  • bootstrap_host — Хост.
  • bootstrap_port — Порт.

Получить самое раннее смещение в топике:

$ kafka-run-class.sh kafka.tools.GetOffsetShell \
       --broker-list localhost:9092 \
       --topic mytopic \
       --time -2

Получить последнее смещение еще в топике:

$ kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list localhost:9092 \
      --topic mytopic \
      --time -1

Получить потребительские смещения (consumer offsets) для топика:

$ kafka-consumer-offset-checker.sh \
     --zookeeper=zookeeper_host:zookeeper_host_port \
     --topic=mytopic \
     --group=my_consumer_group

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Считать из __consumer_offsets:

$ kafka-console-consumer.sh \
      --consumer.config config/consumer.properties \
      --from-beginning \
      --topic __consumer_offsets \
      --zookeeper zookeeper_host:zookeeper_host_port \
      --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Вывести список consumer групп:

$ kafka-consumer-groups.sh --list \
     --zookeeper zookeeper_host:zookeeper_host_port 

Или:

kafka-consumer-groups.sh --list \
      --new-consumer \
     --bootstrap-server localhost:9092 

Просмотр сведений о consumer группе:

$ kafka-consumer-groups.sh --describe \
    --zookeeper zookeeper_host:zookeeper_host_port \
    --group group_name

Где:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Посмотр сообщений в топике:

$ kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e

Или:

$ kafka-console-consumer.sh \
    --bootstrap-server $(cat BrokersList.txt) \
    --consumer.config producer.properties \
    --topic test2 \
    --from-beginning

Чтобы записать в топик:

$ kafka-console-producer.sh \
    --broker-list $(cat BrokersList.txt) \
    --producer.config producer.properties \
    --topic test2

Запускаем Zookeeper shell:

$ zookeeper-shell.sh \
     zookeeper_host:zookeeper_host_port

Провести перформенс тесты:

$ kafka-producer-perf-test.sh \
  --topic topic_here \
  --num-records 1 \
  --record-size 2048 \
  --throughput -1 \
  --producer.config producer.properties \
  --producer-props \
    acks=1 \
    buffer.memory=67108864 \
    compression.type=none \
    batch.size=8196

Вот и все, статья «Работа с Kafka в Unix/Linux» завершена.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.