Задачу подготовил Артём Шумейко — внештатный автор, амбассадор Selectel и автор YouTube-канала о разработке.
Условие
Компания решили внедрить брокер сообщений RabbitMQ, так как в последнее время нагрузка на сервисы возросла.
Вы пишете сервис, который рассылает электронные письма пользователям. Для этого вы постоянно слушаете RabbitMQ на предмет новых сообщений и при получении сообщения отправляете рассылку. Но иногда ваш сервис не может обработать запрос на рассылку по неизвестной причине — и что еще хуже, вы не можете исправить этот баг. В результате часть email не отправляются, а пользователи пропускают важные новости о компании.
Задача
Разработайте механизм, который позволяет правильно обрабатывать сообщения в RabbitMQ. При успешной обработке сообщения должны удаляться из очереди, а в случае ошибки — возвращаться в очередь и обрабатываться повторно.
Решение
У RabbitMQ есть встроенные механизмы для подтверждения отправки сообщений, которые помогают решать проблему потерянных писем. Основная идея заключается в использовании механизма ACK/NACK. Когда потребитель получает сообщение из очереди, он либо подтверждает его успешную обработку (ACK), либо сообщает о неудаче и просит отправить обратно в очередь (NACK). Это гарантирует, что сообщение не потеряется.
Посмотрим, как это работает.
Получение сообщений и подтверждение успешной обработки. Когда сообщение обрабатывается правильно, потребитель должен отправить RabbitMQ сигнал, используя basic_ack. Это удалит сообщение из очереди.
def callback(ch, method, properties, body):
try:
process_message(body) # Ваш код обработки
ch.basic_ack(delivery_tag=method.delivery_tag) # Подтверждение успешной обработки
except Exception as e:
# Если что-то пошло не так, зафиксируем ошибку
print(f"Ошибка при обработке сообщения: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # Сообщение вернется в очередь
Обработка ошибок и повторная отправка. Если при обработке сообщения возникает ошибка, оно возвращается в очередь с помощью команды basic_nack. Ключевой параметр здесь — requeue=True, который указывает RabbitMQ, что сообщение нужно поместить обратно в очередь для повторной обработки.
Это решение помогает избежать потери сообщений, особенно в сценариях, когда система временно испытывает проблемы. Например, когда сеть или база данных недоступны. Сообщения, обработка которых завершилась сбоем, повторно попадают в очередь и будут обработаны, как только система восстановит работоспособность.