Подключение Go к Apache Kafka — создание Producer и Consumer и настройка клиента - Академия Selectel

Подключение Go к Apache Kafka

Разбираем установку клиента Go и запуск кластера Kafka, учим создавать продюсеры и консьюмеры, а также задавать дополнительные настройки клиента.

Введение

При планировании инфраструктуры со множеством сервисом можно столкнуться с проблемой коммуникации между ними. Если в цепочке завязанных друг на друге сервисов один станет недоступен, то процесс обработки информации остановится. Для решения подобной проблемы применяются брокеры сообщений, такие как RabbitMQ или Redis Stream. Это централизованные системы обмена данными между сервисами. 

Один из таких брокеров — Apache Kafka, платформа для обработки потоков данных в реальном времени. Она позволяет приложениям обмениваться сообщениями через систему публикации и подписки, что делает ее незаменимым инструментом для высоконагруженных систем. Вы можете развернуть кластер Apache Kafka и администрировать его самостоятельно или же воспользоваться нашим managed-решением — облаком для Apache Kafka.

В этой статье мы рассмотрим, как настроить и использовать Golang для взаимодействия с Apache Kafka, сосредоточившись на создании Producer и Consumer.

Что такое топик, брокер, потребитель и продюсер 

Чтобы разобраться в теме, нужно знать основные понятия.

Топик  — логическое название очереди сообщений. Топики могут быть разбиты на партиции для масштабируемости. Каждая из них хранит данные последовательно, и сообщения в ней идентифицируются офсетами.

Брокер — сервер, который принимает, хранит и отправляет сообщения, опубликованные в топиках. Kafka-кластер состоит из нескольких брокеров, каждый из которых обслуживает часть топиков.

Продюсер (Producer) — компонент, публикующий данные в топики Kafka. Он отвечает за распределение сообщений по разделам топикам и поддерживает конфигурацию уровня подтверждений.

Потребитель (Consumer) — компонент, читающий данные из топиков. Потребители объединяются в группы для параллельной обработки данных. Каждый раздел топика назначается только одному потребителю в группе.

Представьте систему обработки заказов. Продюсером может быть сервис, записывающий новые заказы в топик orders, а потребителем — служба доставки, которая читает данные из этого топика для выполнения заказов. Такое распределение позволяет обрабатывать большое количество событий одновременно, поддерживая масштабируемость и надежность системы.

Установка клиента Golang

Для работы с Apache Kafka в Go мы будем использовать драйвер Confluent Kafka. Перед началом убедитесь, что у вас установлен Go версии 1.17 или выше. Также необходимо активировать Go-модуль в нашем проекте:


    mkdir kafka-go && cd kafka-go
go mod init kafka-go-getting-started
go get github.com/confluentinc/confluent-kafka-go/v2

Эта библиотека предоставляет API для работы с продюсерами и потребителями, поддерживает высокую производительность и гарантирует доставку сообщений.

Запуск кластера Apache Kafka

Для локальной разработки или тестирования удобно использовать Docker. В нашем тесте мы используем кластер из трех брокеров Kafka и одного сервера ZooKeeper, развернутых в контейнерах. Это позволяет моделировать реальную распределенную архитектуру и при этом экономить потребление памяти. Все-таки развернуть четыре виртуальных машины гораздо затратнее по ресурсам, чем столько же контейнеров, особенно если это разворачивается на вашем рабочем компьютере. 

Иногда хочется просто нажать кнопку — и чтобы все работало. Если вы не готовы заниматься настройкой самостоятельно, можете получить готовый облачный кластер Apache Kafka в панели управления Selectel. Такой подход сэкономит ваше время и ресурсы, потраченные на самостоятельное развертывание, а бонусом вы получите удобный интерфейс, мониторинг, SLA и для любителей Infrastructure as Code — управление через Terraform.

Вернемся к нашему локальному кластеру. После установки Docker и Docker Compose в директории проекта создаем файл docker-compose.yml. Ниже приведен пример файла для docker compose:


    services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.3.2
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888
  kafka1:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
  kafka2:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
  kafka3:
    image: confluentinc/cp-kafka:7.3.2
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
     KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

После создания файла запускаем кластер при помощи docker compose up c ключом -d для работы в фоновом режиме: 


    test@kafka-go:~/docker/kafka-stack-docker-compose$ docker compose up -d
[+] Running 5/5
✔ Network kafka-stack-docker-compose_default  Created                                                                                                                                                       
✔ Container zoo1                              Started                                                                                                                                                          
✔ Container kafka3                            Started                                                                                                                                                          
 ✔ Container kafka2                            Started                                                                                                                                                          
 ✔ Container kafka1                            Started   

Создание Producer

Продюсер является генератором данных и отвечает за их отправку в топики Kafka. В нашем примере мы создадим два продюсера, которые будут отличаться типом записи в топик: синхронным и асинхронным.

В первую очередь необходимо создать файл, например, producer.go. В код импортируем модуль Go для Kafka:


    import (
	…
 	"github.com/confluentinc/confluent-kafka-go/kafka"
…  
)

И далее создаем объект ConfigMap, отвечающий за конфигурацию продюсера: 


     // Настройка конфигурации продюсера
 config := &kafka.ConfigMap{
 "bootstrap.servers": "192.168.18.138:9092,192.168.18.138:9093,192.168.18.138:9094", // Адреса серверов в тестовом кластере Kafka. Поскольку все брокеры расположены на одном хосте, то у них один адрес, но слушают они на разных портах.
 "acks":              "all", // Ожидание подтверждения от всех реплик
 "client.id": 		"myProducer" 
}

Здесь:

  • bootstrap.servers — адреса серверов в тестовом кластере Kafka;
  • acks — включение ожидания подтверждения от всех реплик;
  • client.id — имя продюсера.

Инициализация

В файле producer.go выполним инициализацию продюсера. Для этого можно использовать следующий шаблон:


    // Инициализация продюсера
producer, err := kafka.NewProducer(config)
if err != nil {
    fmt.Printf("Failed to create producer: %s\n", err)
    os.Exit(1)
}

defer producer.Close()
fmt.Println("Producer initialized")

При инициализации мы передаем продюсеру конфигурацию из объекта ConfigMap. После этого переходим к разделу отправки сообщений. Мы можем настроить синхронное или асинхронное взаимодействие в зависимости от требований нашей инфраструктуры. Ниже рассмотрим оба варианта.

Настройка асинхронного взаимодействия — asynchronous writes

Использование такого взаимодействия целесообразно, когда:

  • неважно, чтобы Producer доставил сообщение для Consumer прямо сейчас. Сервисы, принимающие сообщения, могут быть заняты долгими и сложными вычислениями, которые не позволяют выдать ответ сразу же;
  • у сервиса, отправляющего сообщения в топик, нет необходимости получить ответ сразу же. Producer может до получения ответа выполнять другую работу;
  • нам не важен ответ. То есть случаи, когда сообщение даже не было обработано, не влияют на работу.

Фрагмент кода для отправки сообщений в асинхронном виде:


    // Отправка сообщений в топик
    topic := "async-topic"
        for _, word := range []string{"this", "is", "asynchronous", "message", "delivery", "in", "kafka", "with", "Go", "Client"} {
                producer.Produce(&kafka.Message{
                        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                        Value:          []byte(word),
                }, nil)
        }

В Go отправка сообщения в топик производится вызовом метода Produce(), в который мы передаем объект Message. При необходимости можно использовать опциональный канал chan Event, который используется для получения результатов отправки сообщения. Объект Message содержит поле interface{}, которое используется для передачи произвольных данных вместе с сообщением в обработчик событий продюсера.


    // Асинхронная обработка событий продюсера
    go func() {
        for e := range producer.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition.Error)
                } else {
                    fmt.Printf("Successfully produced record to topic %s partition [%d] @ offset %v\n",
                    *ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
                }
            }
        }
    }()

В этом примере асинхронной отправки мы используем горутину (goroutine) для обработки отчетов о доставке сообщений. Если канал доставки Events() обозначен, то отчеты будут отправляться туда.  Для ожидания этих отчетов можно снова использовать горутину. Но даже в этом случае канал Events() должен обрабатываться, так как он содержит ошибки на уровне клиента и управляющие сообщения.

Настройка синхронного взаимодействия— synchronous writes

У синхронной записи есть свое отличие. После отправки сообщения Producer ждет от Kafka подтверждения о том, что оно доставлено и сохранено в топик. При этом Producer блокируется до получения подтверждения. Но как только подтверждение поступит, код продолжает выполняться. 

В этом кроется и преимущество, и недостаток данного типа взаимодействия. С одной стороны, мы получаем высокую надежность и упрощенную локализацию неполадок. С другой, задержка становится выше из-за ожидания подтверждения, а вместе с этим падает производительность при большом количестве сообщений.

Для реализации синхронного взаимодействия необходимо создать канал доставки и для каждого сообщения ожидать подтверждения от Kafka:


    // Создание канала доставки
    deliveryChan := make(chan kafka.Event)

    // Отправка сообщений в синхронном режиме
    topic := "sync-topic"
    for _, word := range []string{"this", "is", "synchronous", "message", "delivery", "in", "kafka", "with", "Go", "Client"} {
        producer.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, deliveryChan)

        // Ожидание события доставки
        event := <-deliveryChan
        m := event.(*kafka.Message)

        if m.TopicPartition.Error != nil {
            fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        } else {
            fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
            	*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        }
    }

    // Закрытие канала подтверждений
    close(deliveryChan)

Как можно увидеть, в выполняемом цикле переход к следующей итерации выполняется только после получения подтверждения через канал kafka.Event.

Создание Consumer

Роль консьюмера подразумевает считывание сообщений из топиков Kafka. Инициация считывания происходит со стороны консьюмера, то есть к брокеру сообщений направляется запрос на получение новых сообщений. При этом каждый консьюмер должен принадлежать какой-либо консьюмер-группе.

Для создания consumer создадим файл consumer.go, в который также импортируем модуль Go для Kafka.


    import (
	…
 	"github.com/confluentinc/confluent-kafka-go/kafka"
…  
)

Создадим ConfigMap для нашего консьюмера:


    // Настройка конфигурации консьюмера
 config := &kafka.ConfigMap{
 "bootstrap.servers": "192.168.18.138:9092,192.168.18.138:9093,192.168.18.138:9094", 
  "group.id":          "myGroup", 
      "auto.offset.reset": "smallest", 
        })

Здесь:

  • bootstrap.servers — адреса Kafka-брокеров;
  • group.id — идентификатор группы потребителей;
  • auto.offset.reset — стратегия поведения при отсутствии смещения, например, smallest — для чтения с начала.

 Инициализация

Для инициализации consumer используем следующий фрагмент кода:


    // Инициализация консьюмера
    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        panic(fmt.Sprintf("Failed to create consumer: %v", err))
    }
    
    if err != nil {
		panic(err)
    }
   
    err = consumer.SubscribeTopics([]string{"async-topic", "sync-topic"}, nil) // Указываем подписку на топики async-topic и sync-topic.

    if err != nil {
            panic(err)
    }
    fmt.Println("Consumer initialized")

Методу Consumer.SubscribeTopics мы передали названия топиков, из которых нужно получать сообщения.

После инициализации потребителя основная логика приложения строится вокруг цикла, который обрабатывает сообщения. Для этого используется метод consumer.Poll, который возвращает сообщения или события по мере их поступления. Также это может произойти по таймауту, который указывается в миллисекундах.


    for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        // application-specific processing
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

consumer.Close()

Рекомендуется в конце цикла консьюмера использовать метод consumer.Close — он гарантирует закрытие активных сокетов и очистку состояния. Это необходимо для правильной балансировки передачи партиции другим консьюмерам, участникам одной и той же группы. В ином случае ребалансировка произойдет только после завершения сессии по таймауту. 

Настройка синхронных коммитов — synchronous commits

В Kafka важно фиксировать смещения (offsets), чтобы отслеживать, какие сообщения уже обработаны. Go-клиент предоставляет метод consumer.Commit для синхронного коммита. Вот пример:


    msg_count := 0
for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        msg_count += 1
        if msg_count % MIN_COMMIT_COUNT == 0 {
            consumer.Commit()
        }
        fmt.Printf("%% Message on %s:\n%s\n",
            e.TopicPartition, string(e.Value))

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Здесь синхронный коммит выполняется каждые MIN_COMMIT_COUNT сообщений. Также мы можем выполнять коммит по таймауту, чтобы гарантировать постоянное обновление смещения.

Гарантии доставки — delivery guarantees

Kafka предоставляет разные уровни гарантий доставки сообщений.

  • At least once  — такой способ гарантирует, что сообщение будет доставлено как минимум один раз. Смещение фиксируется после успешной обработки. Это исключает потери, но может привести к дубликатам, если обработка завершится успешно, а коммит не сработает.
  • At most once — сообщение может быть доставлено потребителю не более одного раза. Смещение фиксируется до обработки сообщения, что добавляет производительности, но несет риски потери данных.
  • Exactly Once -— сообщение доставляется и обрабатывается ровно один раз. Подобная стратегия исключает как потери, так и дубликаты, но при этом требует больше ресурсов и сложнее в реализации.

Ниже пример реализации варианта At most once, в котором мы фиксируем смещение до обработки сообщения:


    for run == true {
    ev := consumer.Poll(100)
    switch e := ev.(type) {
    case *kafka.Message:
        err = consumer.CommitMessage(e)
        if err == nil {
            msg_process(e)
        }

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Printf("Ignored %v\n", e)
    }
}

Настройка асинхронных коммитов — asynchronous commits

Чтобы выполнить коммит асинхронно, можно взять пример из настройки синхронных коммитов и вызвать метод consumer.Commit в горутине:


    if msg_count % MIN_COMMIT_COUNT == 0 {
            go func() {
                offsets, err := consumer.Commit()
            }()
        }

Также при использовании асинхронных коммитов стоит озаботиться событиями ребалансировки, возвращаемых методом consumer.Poll.

Дополнительные настройки клиента

Для продюсера и консюмера можно настроить разные опции. Их полный список вы найдете в документации к Apache Kafka — мы рассмотрим только наиболее важные. 

Буферизация байтов

Эта опция позволяет задать минимальное количество байтов информации, которое нам необходимо для получения. Таким образом, консюмер будет опрашивать брокер на наличие новых данных. Если их объем будет равен или больше заданного количества, брокер отправит эти данные общим пакетом.


    r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{kafka-1, kafka-2, kafka-3},
	Topic:    $topic,
	GroupID:  "myGroup",
	MinBytes: 5,
	MaxBytes: 1e6,
})

При этом, если указать опцию минимального количества байтов — MinBytes, то Kafka потребует добавить и верхнюю границу в виде максимального количества байтов, которые кластер может отправить консьюмеру — MaxBytes.

Максимальное время ожидания

При использовании буферизации байтов можно столкнуться с проблемой: некоторые данные застревают на брокере, пока не наберется минимальный объем. Чтобы ее решить, используйте настройку максимального времени ожидания между полученными сообщениями — MaxWait. По прошествии заданного времени новое сообщение будет получено, даже если его объем меньше указанного минимального объема.


    r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{kafka-1, kafka-2, kafka-3},
	Topic:    $topic,
	GroupID:  "myGroup",
	MinBytes: 5,
	MaxBytes: 1e6,
	MaxWait: 3 * time.Second,
})

Смещение позиции сообщения — offset

Когда новый консьюмер добавляется к топику, у него есть две опции получения данных:

  • Earliest — консьюмер начнет получать данные с первого доступного сообщения в топике;
  • Latest — консьюмеру будут поступать только те данные, которые появились после его добавления к топику.

Для этой настройки используются константы FirstOffset и LastOffset: 


    r := kafka.NewReader(kafka.ReaderConfig{
	Brokers:  []string{kafka-1, kafka-2, kafka-3},
	Topic:    $topic,
	GroupID:  "myGroup",
	StartOffset: kafka.FirstOffset,
	})

Заметьте, что такая опция применяется только к новым группам консьюмеров. Если вы уже получали данные с тем же GroupID, то продолжите с того места, где остановились.

Пакетирование сообщений

Ранее мы указали настройки для консьюмеров, теперь рассмотрим опции для продюсеров.

Пакетирование сообщений сходно с настройками буферизации байтов и максимального времени ожидания, поскольку выполняет те же действия, но со стороны продюсера.

Главные параметры здесь — Batch Size и Batch Timeout:

  • Batch Size — определяет количество сообщений, которые должны быть отправлены в буфер перед записью в брокер;
  • Batch Timeout: — определяет время между отправкой сообщений.

Аналогично консьюмеру, данные отправляются по истечении времени, даже если их объем не удовлетворяет минимальному. 

w := kafka.NewWriter(kafka.WriterConfig{


    w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{kafka-1, kafka-2, kafka-3},
	Topic:   $topic,
	BatchSize: 10,
	BatchTimeout: 2 * time.Second,
})

Вывод подтверждений

В кластере Kafka сообщения распределяются между несколькими брокерами. При этом один из брокеров будет лидером, а остальные — репликами. Исходя из этого есть три варианта вывода:

  • подтверждение от всех брокеров, что сообщение получено — это соответствует значению -1;
  • только подтверждение от лидера о получении сообщения — значение 1. Реплики также получат сообщение, однако мы не будем ждать их ответа;
  • ни один брокер не подтверждает получение — соответствует значению 0.

Вывод подтверждений настраивается с помощью опции RequiredAcks:


    w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{kafka-1, kafka-2, kafka-3},
	Topic:   $topic,
	RequiredAcks: 1,
})

Заключение

Мы рассмотрели, как подключить Go к Kafka, как создавать Producer и Consumer, а также некоторые аспекты тонкой настройки клиента. И вот на этом этапе мы пока что остановимся, поскольку изучать эту тему можно на протяжении многих статей. Надеемся, что этот материал поможет вам интегрировать Apache Kafka в ваши Go-приложения и желаем успехов в ваших проектах.

В Selectel вы можете воспользоваться облаком для Apache Kafka. Мы возьмем на себя развертывание и администрирование Kafka в облаке, а также настройку и оптимизацию производительности сервиса. А еще позаботимся об оборудовании, его модернизации и замене комплектующих. Вы получите полностью готовый к работе сервис Kafka с оплатой только за потребленные ресурсы. Работать с сервисом можно из панели управления, через API или Terraform.

Пример пошаговой реализации проекта на Kafka в Selectel вы найдете в Академии, а подробности о подключении к кластеру — в нашей документации.