При создании управляемых событиями приложений с использованием Kafka на стороне потребителя после получения сообщения Kafka ваше приложение должно что-то с ним сделать. Для этого сообщения в блоге давайте назовем эту часть «Обработка сообщения».

Что такое Кафка?

Но прежде чем мы углубимся в это, давайте убедимся, что мы все на одной странице, и освежим нашу память в том, что такое Кафка.

Apache Kafka — это сверхмощная распределенная платформа потоковой передачи, которая позволяет приложениям обрабатывать, хранить и анализировать большие потоки данных в режиме реального времени. Он создан для обработки больших объемов, высокой пропускной способности и низкой задержки потоков данных.

В Kafka потребитель — это приложение, которое читает сообщения из одной или нескольких тем. Потребители подписываются на одну или несколько тем и потребляют сообщения, созданные в этих темах. Потребители отслеживают смещение сообщений, которые они использовали, чтобы они могли продолжить с того места, на котором они остановились, в случае сбоев или перезапусков.

Потребители также могут быть частью группы потребителей, где каждому потребителю в группе назначается подмножество разделов для темы, что позволяет параллельно потреблять данные.

Работа с примерами

Теперь, когда мы все на одной странице, давайте создадим пример. Представьте, что вы работаете в компании ACME, и у вас есть тема Kafka, которая получает новое сообщение каждый раз, когда создается новый клиент. Этот клиент должен получить электронное письмо о том, что его учетная запись полностью создана и что ему необходимо подтвердить свой адрес электронной почты.

Когда мы читаем что-то вроде «когда что-то происходит», делаем «что-то еще», всегда думаем о событиях! События у Кафки — это сообщения!

Мы собираемся создать микросервис, который будет:

  • Получите это новое сообщение для клиентов от Kafka.
  • Составьте красивое электронное письмо.
  • Отправьте заказчику.
  • Добавить новое сообщение в другую тему о том, что сообщение было отправлено!

Давайте нарисуем это для большей наглядности:

Где повторные попытки?

Круто, но это сообщение в блоге о повторных попытках, верно? Где повторы во всем этом?

Ну, любая часть обработки может пойти не так, и мы не хотим, чтобы наши клиенты пропустили электронные письма о создании своей учетной записи, не так ли? Итак, давайте поговорим о нескольких вещах, которые могут пойти не так, и о том, как мы можем справиться с ними с помощью повторных попыток:

  • База данных шаблонов электронной почты может быть недоступна. Нет проблем, мы просто будем пытаться подключиться, пока он не восстановится.
  • Сохраненный внутри него шаблон может быть недействительным. Мы проверим это и предупредим команду, чтобы обновить его, если это необходимо.
  • Сервер SMTP может быть недоступен или отбросить сообщение. Мы будем пытаться отправить электронное письмо, пока оно не пройдет.
  • Приложение может неправильно составить SMTP-сообщение, и SMTP-сервер отклонит его. Мы перехватим эту ошибку и исправим композицию перед повторной попыткой.
  • Отправка сообщения в другую тему Kafka может вернуть ошибку на стороне сервера Kafka. Мы будем продолжать попытки, пока он не будет успешно отправлен.

Для устранения некоторых из упомянутых ошибок может потребоваться модификация кода или данных, например обновление шаблона или исправление способа составления приложением SMTP-сообщения. Эти типы ошибок могут не подходить для повторных попыток, поскольку они могут потребовать ручного вмешательства.

С другой стороны, другие ошибки, такие как временное время простоя базы данных, ненадежное создание сообщений или перегруженные SMTP-серверы, потенциально могут быть устранены повторной попыткой операции.

Разблокировка повторных попыток потребителя Kafka

В качестве ежедневной проблемы в некоторых наших микросервисах отсутствие встроенного механизма повторных попыток в Kafka может вызывать разочарование. Я экспериментировал с различными программными архитектурами для повторной обработки сообщений, но ни одна из них не была идеальным решением — но есть одна, с которой проще всего справиться.

Прежде чем погрузиться в решение, позвольте мне рассказать о моих предыдущих попытках.

AWS SQS: отличный вариант, но с ограничениями

Как ярый сторонник AWS, при рассмотрении повторных попыток моей первой мыслью был AWS SQS.

Amazon Simple Queue Service (SQS) – это полностью управляемый сервис очередей сообщений, который позволяет разделить и масштабировать микросервисы, распределенные системы и бессерверные приложения. Он поставляется со встроенными возможностями повторных попыток, такими как алгоритм отсрочки, максимальный предел повторных попыток перед переходом в очередь недоставленных сообщений и поддерживает максимальный размер сообщения 256 КБ.

Хотя SQS — отличный вариант для повторных попыток, он не соответствует размеру сообщений. Сообщения Kafka по умолчанию могут иметь размер до 1 МБ и даже больше.

Но затем вы говорите мне: «Хорошо, Алекс, но это не имеет большого значения», ну, это как бы так, потому что у нас нет хорошего способа контролировать размер сообщения, он может вырасти и сломать ваше приложение, даже если вы этого не осознаете! И к тому времени вы потеряете сообщения или у вас будет большой производственный инцидент.

Таким образом, хотя SQS — отличный вариант, он не подходит для наших микросервисов на основе Kafka. Но не волнуйтесь, мы нашли решение, которое работает еще лучше.

Кафка

Да, именно так, раз SQS не подходит, почему бы не использовать саму Кафку?

Я попытался создать внутреннюю тему приложения Kafka, которая будет использоваться только приложением, где приложение будет отправлять ему сообщения, а в случае сбоя оно снова поставит сообщение в очередь с новым атрибутом счетчика в заголовке. Нарисую схему, чтобы было понятнее:

Хотя этот подход действительно работает, он имеет ряд существенных недостатков. Во-первых, настроить темы Kafka не так просто, как SQS, создание нескольких тем может быть несколько громоздким, а работа со всеми потребителями и производителями в коде может стать довольно запутанной.

Кроме того, Kafka не поддерживает встроенные повторные попытки. Это связано с тем, что протокол Kafka предназначен для потоковой передачи данных с высокой пропускной способностью и малой задержкой, а повторные попытки увеличивают нагрузку и усложняют систему.

Базы данных

Отчаянные времена требуют отчаянных мер, верно?

В крайнем случае я решил использовать базы данных для повторных попыток отправки сообщения. Современные базы данных SQL имеют механизмы БЛОКИРОВКИ, которые можно использовать для использования таблицы в качестве очереди.

К моему удивлению, этот подход сработал! Однако это не идеальное решение. Базы данных не предназначены для работы в качестве очередей, и их использование таким образом может оказаться затруднительным. У этого подхода есть несколько недостатков:

  • Базы данных создаются для согласованности с использованием транзакций, что может сделать их производительными, но они никогда не будут работать так же хорошо, как специализированный инструмент, такой как Kafka. Это может легко стать узким местом в системе.
  • Управление всем этим кодом может легко стать проблематичным, и это непростая схема для всей компании.

Итак, что теперь? Память!

Попробовав все вышеперечисленные решения и будучи неудовлетворенным результатами, я решил посмотреть, как другие компании и фреймворки обрабатывают повторные попытки.

Одной из сред, которая мне особенно запомнилась, является Spring, среда Java с открытым исходным кодом, которая предоставляет комплексную модель программирования и конфигурации для создания современных корпоративных приложений.

Весенний путь

Одним из модулей Spring является Spring Kafka, который предоставляет несколько способов обработки повторных попыток при использовании сообщений. К ним относятся использование интерфейсов RetryTemplate и RetryCallback из библиотеки Spring Retry для определения политики повторных попыток, использование аннотации KafkaListener для настройки поведения повторных попыток для каждого метода или использование AOP для обработки повторных попыток с помощью RetryInterceptor.

Spring Kafka позволяет разработчикам выбирать подход, который лучше всего соответствует их варианту использования, будь то глобальная политика повторных попыток или более детальная конфигурация повторных попыток для каждого метода.

Прошло много времени с тех пор, как я не работал с Java, и код Spring далеко не прост для понимания (это довольно продвинутый материал), но после более глубокого погружения в структуру Spring Kafka я обнаружил, что он реализует обработчик ошибок с использованием стратегия обработки исключений, создаваемых потребителями.

Когда возникает исключение, не являющееся BatchListenerFailedException (которое, как знает Spring, невозможно повторить), обработчик ошибок повторяет попытку пакета записей из памяти. Это предотвращает повторную балансировку потребителей во время расширенной последовательности повторных попыток и обеспечивает более элегантное решение.

По моему честному мнению, среда Spring Kafka предлагает надежный и гибкий подход к обработке повторных попыток, который стоит рассмотреть для любого корпоративного приложения.

Он предоставляет ряд вариантов для выбора разработчиками и позволяет более детально контролировать поведение повторных попыток, что делает его отличным выбором для любой организации, стремящейся внедрить механизм повторных попыток для своей системы обмена сообщениями.

Воссоздание механизма повторных попыток Spring Kafka в Go

В настоящее время я работаю (и люблю 💙) с Go, и, насколько я могу судить, я не нашел переписывания фреймворка Spring Kafka и Retry в Go, так что давайте создадим свой собственный!

Здесь я хочу указать на одно ключевое отличие моей реализации от подхода Spring Kafka. Я ценю идею приостановить Потребитель и вернуться к предыдущей записи партии в случае катастрофического сбоя. Однако я не считаю это необходимым и решил реализовать его по-другому.

Мой подход включает в себя:

  • Создание Consumer, который читает из темы.
  • Создание канала Go, который обрабатывает повторные попытки.
  • Если сообщение не удается обработать, оно отправляется в канал повторных попыток.
  • Основной потребитель продолжает обрабатывать основную тему, в то время как другие сообщения повторяются.
  • Если повторные попытки исчерпаны, сообщение отправляется в постоянный DLQ для последующего изучения.

⚠️ Важно отметить, что этот подход предполагает, что ваше приложение не сильно зависит от порядка сообщений. Если это так, вам нужно будет остановить потребителя при повторной попытке сообщения. Этот подход, однако, позволяет добиться прогресса в теме, пока предпринимаются повторные попытки.

Вот код

Объяснение обычным текстом

Пакет kafka_retry_dlq предоставляет механизм для использования сообщений из темы Kafka с механизмом повторных попыток в случае ошибок обработки.

Типы

  • ProcessRetryHandler: интерфейс, который должен быть реализован типами, которые будут обрабатывать сообщения. Он имеет следующие методы:
  • Ошибка Process(context.Context, kafka.Message): обрабатывает сообщение, возвращает ошибку, если обработка не удалась.
  • MoveToDLQ(context.Context, kafka.Message): перемещает сообщение в очередь недоставленных сообщений.
  • MaxRetries() int: возвращает максимальное количество повторных попыток для сообщения.
  • Backoff() backoff.BackOff: возвращает стратегию отсрочки для повторной попытки сообщения.

Переменные

  • retryQueue: канал kafka.Message с буфером на 1000 сообщений.

Функции

  • NewConsumerWithRetry(ctx context.Context, Brokers []string, тематическая строка, раздел int, обработчик ProcessRetryHandler): создает нового потребителя Kafka с механизмом повторных попыток.
  • Он создает новую программу чтения Kafka с предоставленными брокерами, темой, разделом и размером буфера.
  • Он запускает горутину, которая читает сообщения из топика Kafka и обрабатывает их с помощью предоставленного обработчика.
  • Если обработка сообщения возвращает ошибку, сообщение отправляется в канал retryQueue.
  • Он запускает другую горутину, которая прослушивает канал retryQueue и обрабатывает сообщения с помощью предоставленного обработчика, пока не будет достигнуто максимальное количество повторных попыток или сообщение не будет успешно обработано.
  • Если достигнуто максимальное количество повторных попыток, сообщение перемещается в очередь недоставленных сообщений с помощью метода обработчика MoveToDLQ.

В заключение

Повторы играют важную роль в повышении отказоустойчивости систем, управляемых событиями, позволяя им обрабатывать непредвиденные сбои и ошибки. Однако очень важно также сочетать повторные попытки с метриками, целевыми показателями уровня обслуживания (SLO) и соглашениями об уровне обслуживания (SLA), чтобы гарантировать, что система не будет все время находиться в неисправном состоянии. Цель состоит в том, чтобы повторные попытки приложения выполнялись только в исключительных случаях, и сочетание повторных попыток с SLO и SLA помогает гарантировать это.