Настройка Kafka в Docker — создание узла в compose и конфигурация кластера

Настройка Kafka в Docker

Рассматриваем процесс настройки Kafka с использованием Docker и docker-compose, разбираем, как запустить один узел и целый кластер Kafka, а также говорим о конфигурации Producer и Consumer.

Введение

Kafka — это брокер сообщений в стриминговом режиме. Он обеспечивает обмен информацией между сервисами в режиме реального времени. Мы уже подробно рассказывали о технологии Kafka в отдельном тексте, поэтому здесь сосредоточимся на практической настройке.

Если вы не знакомы с Docker, рекомендуем пройти бесплатный курс «Docker с нуля: как работают контейнеры и зачем они нужны». В нем рассказываем, что такое Docker, как запускать контейнеры, собирать образы и использовать Docker Compose, а еще разберемся, чем технология отличается от Kubernetes. Все материалы подкреплены практическими примерами и будут понятны для начинающих.

Настройка одного узла

Первый шаг — запустить сервер Kafka в единственном экземпляре, чтобы убедиться в работоспособности. После этого можно переходить к полноценной реализации кластера.

Запуск сервера ZooKeeper в docker-compose.yml

Рассмотрим, что такое ZooKeeper и зачем он нужен. Apache Kafka использует ZooKeeper для управления ключевыми аспектами работы кластера. ZooKeeper выполняет несколько важных функций.

  • Хранение метаданных о брокерах, топиках и партициях: структуру кластера, список брокеров, какие топики и партиции настроены, какой брокер за что отвечает.
  • Координация лидеров партиций: выбор лидеров для партиций и поддержание согласованности при сбоях, например, если один из брокеров выходит из строя.
  • Хранение информации о доступности и состоянии брокеров: отслеживание статусов всех узлов в кластере и быстрое восстановление работы при ошибках.
  • Синхронизация настроек и событий: все компоненты Kafka получают актуальную информацию о состоянии кластера и синхронизируются между собой.

До версии 2.8 включительно запуск кластера Kafka без ZooKeeper невозможен — он является обязательным внешним сервисом для корректной работы распределенной архитектуры Kafka.

На сервере или виртуальной машине создадим docker-compose.yml и добавим сервис ZooKeeper:


      version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1

Порты для ZooKeeper и Kafka

Так как мы разворачиваем ZooKepeer и Kafka через Docker, важно правильно настроить проброс портов до контейнеров. Это обеспечит взаимодействие между узлами кластера, доступ к ним из других сервисов и извне.

Стандартные порты для наших компонентов:

  • ZooKeeper: 2181 — основной порт, через который Kafka-брокеры и администраторы взаимодействуют с ZooKeeper;
  • Kafka: 9092 — основной порт для связи клиентов (продюсеров, консьюмеров, инструментов мониторинга) с брокером Kafka. Каждый следующий узел Kafka может использовать свой порт (например, 9093 для второго узла).

В примере выше мы пробросили порт 2181 для ZooKeeper. Это значит, что порт контейнера будет доступен на том же порту хоста (нашего сервера). Все остальные контейнеры в одной Docker-сети смогут обращаться к ZooKeeper по внутренним именам и стандартным портам. Например, узлы Kafka будут обращаться к ZooKeper по адресу zookeeper:2181.

Запуск сервера Kafka

1. Добавим Kafka в секцию services нашего docker-compose.yml:


      services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zookeeper:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://{{ VM_IP_ADDRESS }}:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: 
"kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper

Важно: не забудьте заменить VM_IP_ADDRESS на IP-адрес вашей виртуальной машины.

2. Запустим контейнеры следующей командой, где ключ -d запускает их в фоновом режиме:

docker-compose up -d

Подключение с помощью Offset Explorer

Offset Explorer (ранее Kafka Tool) — популярная утилита для работы с кластерами Kafka. Она позволяет:

  • визуализировать архитектуру кластера;
  • просматривать существующих брокеров, группы получателей, отправки сообщений и задержки у консьюмеров;
  • читать сообщения;
  • управлять топиками, группами и ACL.

Для начала скачайте с официального сайта приложение под вашу ОС. После установки нужно лишь добавить новое подключение: укажите адрес вашей ВМ с развернутым брокером Kafka, проброшенный порт 9092, а также параметры подключения к ZooKeeper. 

После подключения вы увидите свой узел Kafka и сможете управлять им через графический интерфейс.

Скриншот интерфейса Offset Explorer.
Интерфейс Offset Explorer. Источник.

Настройка кластера

Один брокер подходит для тестов, но для отказоустойчивости и масштабируемости нужен кластер. Теперь добавим новые узлы.

Добавление брокеров (узлов)

Ранее в docker-compose.yml для переменной KAFKA_BROKER_ID мы указали значение 1. Этот параметр должен быть уникальным для каждого узла в кластере для внутренней координации брокеров. 

Добавим еще два узла в нашу инсталляцию. Для каждого нужно добавить дополнительную секцию в разделе services файла docker-compose.yml:


      kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://{{ VM_IP_ADDRESS }}:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: 
"kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://{{ VM_IP_ADDRESS }}:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    depends_on:
      - zookeeper

Примечание: параметр KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR должен быть ≤ числу брокеров. При одном узле ставьте 1, при трех — можно использовать 3.

Создание Producer и Consumer

В docker-compose.yml мы указали все необходимое для работы кластера. Теперь нужно проверить обмен сообщениями в нашей инсталляции.

В реальных сценариях в качестве Producer и Consumer могут выступать различные сервисы. Рассмотрим на нескольких примерах.

  • Устройства умного дома отправляют данные телеметрии, а сервер управления умным домом их считывает.
  • Информация о сделках на бирже отправляется в кластер Kafka, а веб-приложение выступает получателем и отображает новые сделки.
  • Сервисы отправляют свои логи в Kafka, а Elasticsearch собирает данные с узлов Kafka.
  • Сервис аналитики отправляет события в Kafka, а ClickHouse считывает их для отчетов в реальном времени.

Но в рамках инструкции не будем работать с подобной инфраструктурой, а создадим простые Producer и Consumer. Продюсер будет раз в 30 секунд отправлять сообщение с порядковым номером, а консьюмер — его считывать.

Для начала организуем директорию для создания Producer и Consumer и добавим в нее две соответствующие директории. Также в каждой из создадим Dockerfile и файл формата .py с названием директории:


      ./producer/Dockerfile
./producer/producer.py
./consumer/Dockerfile
./consumer/consumer.py 

Подготовка Producer

В Dockerfile продюсера (producer/Dockerfile) добавим фрагмент:


      FROM python:3.10-slim
WORKDIR /app
COPY producer.py .
RUN pip install kafka-python
CMD ["python", "producer.py"]

В файл producer/producer.py:


      import time
import sys
from kafka import KafkaProducer

bootstrap_servers = [
    '{{ VM_IP_ADDRESS }}:9092',
    '{{ VM_IP_ADDRESS }}:9093',
    '{{ VM_IP_ADDRESS }}:9094'
]

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
msg_number = 1

while True:
    message = f'This is {msg_number} message'.encode()
    producer.send('events', message)
    producer.flush()
    print(f"Sent: {message.decode()}", flush=True)
    msg_number += 1
    time.sleep(30)

Важно: не забудьте заменить VM_IP_ADDRESS на IP-адрес вашей виртуальной машины.

Подготовка Consumer

В consumer/Dockerfile добавим следующий фрагмент:


      FROM python:3.10-slim
WORKDIR /app
COPY consumer.py .
RUN pip install kafka-python
CMD ["python", "consumer.py"]

В файл consumer/consumer.py добавим:


      import sys
from kafka import KafkaConsumer

bootstrap_servers = [
    '{{ VM_IP_ADDRESS }}:9092',
    '{{ VM_IP_ADDRESS }}:9093',
    '{{ VM_IP_ADDRESS }}:9094'
]

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=bootstrap_servers,
    group_id='test-group',
    auto_offset_reset='earliest'
)

for msg in consumer:
    print(f"[{msg.topic}] partition={msg.partition}, "
          f"offset={msg.offset}, value={msg.value.decode()}",
          flush=True)

Параметр auto_offset_reset='earliest' позволяет прочитать все старые сообщения из топика, даже если консьюмер запущен позже продюсера.

Примечание: не забудьте заменить VM_IP_ADDRESS на IP вашей виртуальной машины.

Добавление producer и consumer в Docker Compose

Совсем скоро мы сможем запустить наш кластер и проверить его работу, но перед этим важно контейниризировать наши Producer и Consumer. Для этого в текущей директории создадим docker-compose.yml и добавим содержимое:


      services:
  producer:
	build: ./producer
  consumer:
	build: ./consumer
	depends_on:
  	- producer

Запуск кластера с docker-compose

Все готово к запуску кластера. На хосте с Kafka перейдем в директорию с docker-compose.yml и запустим наш кластер командой:


      docker compose up -d

Далее на нашем ПК откроем приложение Offset Explorer и добавим в параметры подключения остальные узлы Kafka с портами 9093 и 9094 соответственно. 

Подключение к Kafka из другого Docker-контейнера

Как только в Offset Explorer будут видны все три брокера, запустим Producer и Consumer. Для этого выполним в директории с docker-compose.yml команду:


      docker compose up -d

После запуска вернемся в Offset Explorer и перейдем в Topics → events → Partitions → Partition 0 → Data — здесь нажмем кнопку Retrieve Messages. После этого увидим таблицу с сообщениями, которые Producer уже успел записать в топик. 

В Consumers → test-group → Offsets нажмем Refresh Offsets. Появится статистика по тому, сколько сообщений считал Сonsumer.

Альтернативно можно просмотреть логи прямо из консоли:


      docker compose logs -f

Это позволит отслеживать обмен сообщениями между Producer и Consumer в реальном времени.

Скриншот консоли.
Просмотр логов в консоли.

Остановка Kafka в Docker

Остановить работу кластера можно всего одной командой. Ее нужно выполнить в той же директории, где лежит наш docker-compose.yml для Kafka:


      docker compose down

Аналогичные действия будут и для остановки созданных Producer и Consumer.

Apache Kafka на инфраструктуре Selectel

Что делать, если нет своей виртуальной машины для развертывания кластера Kafka? В этом случае можно воспользоваться инфраструктурой Selectel и развернуть ВМ нужной конфигурации. Дополнительно вы получите удобную панель управления, мониторинг, управление бэкапами, SLA и многое другое.

Но внедрение Kafka может привнести целый ряд технических сложностей, требующих внимательного и компетентного инженерного подхода. Если вы хотите полностью снять с себя заботы по настройке, обслуживанию и оптимизации производительности, используйте облако для Kafka (Kafka-as-a-service). 

Принцип работы решения.
Принцип работы решения.

Среди преимуществ Kafka-as-a-Service: 

  • легкое и надежное масштабирование,
  • высокая производительность,
  • потоковая обработка данных в реальном времени,
  • простая интеграция с другими системами,
  • хранение больших объемов данных.

Сервис разворачивается за несколько кликов в панели управления.

1. Заходим в раздел Облачная платформа → Базы данных и нажимаем Создать кластер.

Скриншот панели управления Selectel. Создание кластера.

2. Вводим имя, выбираем регион и пул. В поле СУБД указываем Kafka.

Скриншот панели управления Selectel. Конфигурирование кластера.

3. Настраиваем конфигурацию, указываем количество vCPU, RAM и размер диска. При необходимости изменить конфигурацию нод можно и после создания кластера.

4. Выбираем или создаем подсеть: приватную — если доступ из интернета не нужен (можно подключить статический публичный IP) или публичную — тогда все адреса публичной подсети будут доступны из интернета. 

5. Проверяем цену и нажимаем Создать кластер

Скриншот панели управления Selectel.

Кластер будет готов к работе, когда перейдет в статус ACTIVE. Подробнее о подключении к кластеру, управлении топиками, мониторинге кластера и не только — в документации. Внутри вы найдете множество пошаговых инструкций, примеров и полезных советов по использованию Kafka-as-a-Service.

Заключение

Мы последовательно прошли путь от развертывания одного брокера Kafka до полноценного кластера с несколькими узлами. Настроили Producer и Consumer, убедились в корректной передаче сообщений и узнали, как контролировать работу кластера через Offset Explorer и логи контейнеров.

При этом рассмотрели альтернативный сценарий — использование Kafka-as-a-Service. Такой подход снимает с разработчиков необходимость управлять инфраструктурой вручную и позволяет сосредоточиться на логике приложений. Вы можете начать с небольшого тестового кластера, а затем масштабировать его под растущие нагрузки, не теряя времени на рутину.