Как работать с темами в Kafka
Разбираем, что такое топики (темы) и партиции в Kafka и как с ними работать.
Что такое «партиция», «топик или тема» в Apache Kafka
Apache Kafka — это распределенная потоковая платформа, предназначенная для обработки данных в реальном времени. Ее основные концепции — топики и партиции.
Топик, или тема, в Kafka — это логическая единица хранения сообщений. Каждый топик представляет собой категорию или поток данных, куда производители (producers) отправляют свои сообщения, а потребители (consumers, консьюмеры) читают их.
Партиция — это физическая единица внутри топика, которая разбивает топик на несколько логически независимых частей. Каждая представляет собой упорядоченный, неизменяемый набор записей, которые добавляются в конец партиции.
Партиции позволяют распределить данные топика по нескольким брокерам, обеспечивая параллелизм и масштабируемость. У нас доступна только однонодная Кафка (на момент выхода текста). Но здесь партиции также можно применить для параллельной обработки данных несколькими консьюмерами.
Как создать, настроить и обновить тему
Посмотрим, как создавать топик в панели управления my.selectel. Для этого переходим во вкладку Облачная платформа → Базы данных, нажимаем Создать кластер и выбираем Kafka.
После создания можно будет настроить темы.
Переходим во вкладку Топики. Задаем имя темы и количество разделов (партиций):
Впоследствии вы сможете увеличить количество разделов в топике.
Для подключения к топику нужно создать пользователя и выдать ему необходимую роль:
- продюсер — позволяет посылать данные в Kafka,
- консьюмер — позволяет читать данные из Kafka
- сразу обе.
Создадим в качестве примера двух пользователей (это можно сделать в одноименной вкладке). Один будет продюсером, другой — консьюмером:
Впоследствии у пользователя можно добавить или убрать роль. Также можно выбрать один из доступных типов доступа:
- ко всем топикам,
- по имени топика,
- по префиксу.
Подключение к кластеру
Примеры можно посмотреть во вкладке Подключение. Для простоты будем использовать консольную утилиту kafkacat.
Пример подключения консьюмера:
kafkacat -C -b master.152f5a93-8dc3-4b72-a1fc-eb953ec8b382.c.dbaas.selcloud.ru:9092 -t test_topic -X sasl.username=consumer -X sasl.password=<consumer_password> -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=SCRAM-SHA-512
После подключения вы увидите сообщения:
% Reached end of topic test_topic [0] at offset 0
% Reached end of topic test_topic [1] at offset 0
Это значит, что консьюмер успешно подключился к теме и ее партициям.
Пример подключения продюсера:
kafkacat -P -b master.152f5a93-8dc3-4b72-a1fc-eb953ec8b382.c.dbaas.selcloud.ru:9092 -t test_topic -X sasl.username=producer -X sasl.password=<producer_password> -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=SCRAM-SHA-512
После подключения продюсера в терминале можно начать вводить какие-то значения, они должны появиться в терминале с консьюмером:
% Reached end of topic test_topic [0] at offset 0
% Reached end of topic test_topic [1] at offset 0
2
1
% Reached end of topic test_topic [0] at offset 1
% Reached end of topic test_topic [1] at offset 1
3
% Reached end of topic test_topic [1] at offset 2
4
% Reached end of topic test_topic [0] at offset 2
6
% Reached end of topic test_topic [0] at offset 3
7
% Reached end of topic test_topic [0] at offset 4
8
% Reached end of topic test_topic [1] at offset 3
9
% Reached end of topic test_topic [0] at offset 5
0
% Reached end of topic test_topic [1] at offset 4
Можно заметить, что значения записываются в разные партиции.
Чтобы посмотреть, какие значения лежат в определенных партициях, используйте флаг -p <partition_idx>:
kafkacat -C -b master.152f5a93-8dc3-4b72-a1fc-eb953ec8b382.c.dbaas.selcloud.ru:9092 -t test_topic -X sasl.username=consumer -X sasl.password=<consumer_password> -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=SCRAM-SHA-512 -o beginning -p 1
1
3
8
0
a
d
3
5
8
9
11
1
1
1
1
% Reached end of topic test_topic [1] at offset 15
Попробуем увеличить число партиций в теме с двух до трех:
После обновления и переподключения можно увидеть, что продюсер начал писать в новую партицию. То есть партиции можно представить как отдельные бакеты, которые связаны с конкретным топиком. У каждой есть свой offset, который инкрементируется самой Kafka.
C помощью флага -o <offset_position> можно прочитать данные с определенного офсета:
kafkacat -C -b master.152f5a93-8dc3-4b72-a1fc-eb953ec8b382.c.dbaas.selcloud.ru:9092 -t test_topic -X sasl.username=consumer -X sasl.password=<consumer_password> -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=SCRAM-SHA-512 -o beginning -p 1 -o 20
2
1
1
1
1
% Reached end of topic test_topic [1] at offset 27
Таким образом можно смотреть сообщения из разных партиций топика и перечитывать их с какой-то позиции (офсета).
Как настроить topic retention
Topic retention — это политика хранения сообщений в теме. Она определяет, как долго сообщения остаются в топике до их удаления.
Основные параметры, влияющие на retention:
- retention.ms — максимальное время хранения сообщений в миллисекундах;
- retention.bytes — максимальный размер данных, который может храниться в топике.
На момент создания текста эти настройки можно менять только на уровне кластера. По дефолту стоит значение 168 часов. По истечении этого времени ранее созданные сообщения удалятся.
Поменяем значение параметра log.retention.ms на 10000 ms:
Теперь Kafka будет удалять сообщения, которые созданы более 10 секунд назад. После перезапуска консьюмера с начальным офсетом в терминале не должно быть сообщений:
kafkacat -C -b master.152f5a93-8dc3-4b72-a1fc-eb953ec8b382.c.dbaas.selcloud.ru:9092 -t test_topic -X sasl.username=consumer -X sasl.password=secret123 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=SCRAM-SHA-512 -o beginning
% Reached end of topic test_topic [0] at offset 45
% Reached end of topic test_topic [1] at offset 44
% Reached end of topic test_topic [2] at offset 37
Но если опять записать какие-то сообщения и подождать 10 секунд, сообщения останутся в терминале. Дело в том, что в Kafka есть настройка log.retention.check.interval.ms, которая по дефолту равна пяти минутам.
log cleaner кафки проверяет сообщения только раз в пять минут.
Заключение
Apache Kafka предоставляет мощные возможности для обработки потоков данных. Понимание концепций топиков и партиций, умение создавать и настраивать топики, а также подключать продюсеров и консьюмеров — ключевые навыки для эффективного использования Kafka в различных сценариях. А настройки retention позволяют контролировать жизненный цикл сообщений и обеспечивают гибкость в управлении данными.