Подписки с сохранением курсора на клиенте

Не всегда более производительное решение - решение, требующее меньше ресурсов при своей работе, - является лучшим. Часто сопутствующие факторы являются более значимыми: предсказуемость поведения при сбоях, скорость восстановления работоспособности после сбоев и т.п.

Рассмотрим это на примере систем межсервисного взаимодействия.

  • Курьер доставил заказ. По смене статуса заказа надо уведомить заинтересованные стороны об этих событиях.
  • Клиент отправляет сообщение в чат поддержки. Нужно уведомить сервисы поддержки о поступивших данных от клиента.
  • Построение отчёта завершено. Ожидающий отчёт пользователь может его загрузить. Надо его уведомить об этом.

Знакомые/типовые ситуации. Одному сервису надо уведомить другой (другие) о происшедших событиях.

Давайте немного усложним:

Сервер - находится в нашей юрисдикции. Мы следим за тем, чтоб ресурсов ему на всё хватало. Добавляем ноды в кластер и т.п.

Заинтересованные в событиях стороны - находятся “где-то”. На серверах клиентов. Которые экспериментируют, экономят на железе. Используют программные решения, не предназначенные для нагрузки, и т.п.

Какие способы уведомления есть?

Активность со стороны сервера

Это, в общем-то, типовое решение. Сервер держит список заинтересованных сторон. По мере появления событий выполняет HTTP-запросы к клиентам.

Подвариант этого решения: Websocket. Сервер отправляет события в сокеты всем подписанным сторонам.

Какие нюансы возникают при таком подходе?

Повторы, обработка ошибок

Рано или поздно любой TCP/HTTP-канал сталкивается с недоступностью другой стороны. Что делать после возникновения ошибки? Повторять запросы? Что делать с вновь поступающими запросами? Ждать, пока успешно выполнятся предыдущие?

Рассмотрим виды ошибок:

  1. Сетевые
  2. Устранимые (после повтора могут исчезнуть) HTTP (500, 502, 504, и т.п.)
  3. Неустранимые (4xx)

Получив неустранимую ошибку, клиент может только записать её в лог. То есть, если полная остановка доставки сообщений не приемлема, то, получив неустранимую ошибку, типовым решением будет считать, что “уведомление доставлено”, и переходить к доставке следующих уведомлений. Вероятно, это единственный нормальный путь.

Идя по этому пути, надо постоянно и внимательно следить за мониторами таких ошибок. Анализировать трафик на тему “почему возникла неустранимая ошибка?” и “можно ли жить дальше с этой ошибкой”.

Но это не самая большая проблема.

Более интересными являются проблемы:

  • повторов
  • 500-х ошибок

500-е ошибки

Мы выполняем запрос-передачу данных для сервера X. Происходит 500-я ошибка. Что это?

Возможны два варианта:

  1. Сервис-приёмник данных по какой-то причине именно сейчас не работает (перегружается, переключается БД итп). В этом случае повтор запроса в дальнейшем приведёт нас к успеху.
  2. В сервисе допущена ошибка, приводящая к 500. В этом случае, сколько бы повторов мы ни сделали, до исправления кода в приёмнике ситуация не изменится.

То есть, по повторяемости запросов ошибки у нас делятся на три вида:

  1. Те, которым повтор поможет (сетевые, устранимые 500-ки).
  2. Те, которым повтор не поможет, но выглядят как те, которым поможет (неустранимые 500-ки).
  3. Те, которым повтор не поможет (например 40x-ки).

Разрабатывая политику повторов, помимо указанной проблемы, имеем ещё множество других проблем:

  1. Как часто повторять запросы?
  2. Не будем ли мы “укладывать” внешний сервис, повторяя запросы?
  3. Не будем ли сами “укладываться”, если одна из внешних систем по какой-то причине имеет некорректный TCP-стек (iptables DROP)?

Если посмотреть на систему повторов запросов, то обнаружится, что практически в каждом случае она выбирается индивидуально.

Подытожим:

Если сервис, генерирующий событие, и занимается доставкой его до заинтересованных сторон, то имеем

плюсы:

  • минимальный лаг доставки
  • минимальная нагрузка на хранилище сообщений;

минусы:

  • необходимость повторов в случае неуспеха доставок
  • необходимость ведения реестра, кому что доставлено и кому что нужно доставить
  • двусмысленность некоторых ошибок: непонятно, можно (нужно) ли повторять, или нет
  • зависимость от стека TCP на стороне клиентов (iptables -j DROP занимает слот отправки вплоть до таймаута)
  • система повторов может быть причиной DDoS для клиентских сервисов.

Также есть некоторое количество организационных минусов:

  • После того, как клиент прекратил де-факто работу (тут два варианта: сервера выключены, сервера не выключены), система продолжает доставлять ему уведомления.

Вебсокет в режиме клиент-сервер

Часть описанных проблем решает постоянное соединение, инициируемое клиентом. Однако именно часть.

Необходимость повторов и двусмысленность ошибок - снимаются. Однако, необходимость ведения реестра, кому и что нужно доставить (если мы говорим о системе сообщений “без потерь”) остаётся. Зависимость от стека TCP на стороне клиента снижается, но не до нуля. Система также может быть причиной DDoS для клиентских сервисов.

Пулинг

Достоинства пулинга

  • Если у клиента проблемы со связью, нагрузкой – он просто не делает запросы
  • После того, как клиент отключается (организационный момент), – он перестаёт делать запросы
  • Максимально быстрое восстановление работоспособности после факапов.

Недостатки пулинга

  • минимальный лаг доставки сообщений равен интервалу пулинга, который обычно выбирается ненулевым
  • множество сервисов пулящих один создают существенно бОльшую нагрузку, нежели случай с активным сервисом. Сервисы, для которых нет сейчас никаких сообщений, всё равно создают нагрузку на подсистему доставки сообщений.

Ещё один неочевидный, организационный недостаток пулинга: часто способ получения новой порции данных связан со структурой хранения данных.

Если говорить о межсервисном взаимодействии (невысокое количество клиентов), то получается, что пулинг клиентом – наиболее выгодное решение. Почему?

  • отсутствие двусмысленности, описанной выше
  • наиболее быстрое восстановление работоспособности после сбоя
  • максимальная независимость от сетевого стека TCP на клиенте
  • нет необходимости хранить/майнтенить список клиентов.

Недостатки пулинга

Для чего вводят интервал пулинга? Каждый клиент, делающий 1 запрос за данными в секунду, - это 1RPS нагрузки. Почему нельзя пулить, не используя интервал (делать запрос сразу после получения результатов предыдущего)? Потому что обычно запрос за данными является сравнительно дорогим. А дорогим он является потому, что, как правило, некорректно спроектирован.

Как правило, запросы для пулинга формулируются как “есть ли данные для меня; если есть, то какие?”. Такие запросы (в случае, если они некорректно спроектированы) зачастую имеют следующие проблемы:

  • запрос неиндексирован
  • при перегрузках количество данных в ответе может расти, или время выполнения запроса ухудшаться.

В случае, если получение очередной порции данных сопровождается простой выборкой из BTREE индекса, то и ответ на вопрос “есть ли данные?”, как правило, сравнительно бесплатен. Об индексах поговорим ниже.

А сейчас давайте рассмотрим алгоритм работы традиционного пулера.

  1. Первичная инциализация пулера. index := 0. index – это обобщённая переменная, указывающая на позицию запрашиваемых данных.
  2. Выполняется запрос limit данных с позиции index.
  3. Обрабатываем полученные данные
  4. index := index + 1
  5. Пауза соответствующая интервалу
  6. Перейти к шагу 2

Если рассматривать этот алгоритм, то видим, что переменная index и есть то, что связывает нас со структурой хранения данных.

Такой алгоритм, как правило, используют новички и… приводят себя к трудноустранимой проблеме: запросы с большими значениями index сделать индексируемыми крайне сложно. Почти невозможно.

Почему разработчик попадает в такую ситуацию? Потому что проектирует БД и API отдельно друг от друга. А нужно посмотреть на все компоненты в целом и на влияние их друг на друга.

Проблема состоит в том, что в БД, как правило, данные хранятся в виде плоских таблиц. Когда мы получаем очередную порцию данных с одними и теми же условиями фильтрации, то приходится делать что-то вроде следующего:

SELECT
    *
FROM
    "table"
WHERE
    "somefield" = $1
LIMIT
    100
OFFSET
    $2

То есть, index из алгоритма пересчитывается в смещение ($2). Такой запрос из БД имеет всё более ухудшающийся план выполнения по мере роста смещения (которое растёт с ростом index).

Как сделать план независящим от положения смещения? Использовать вместо смещения выборку из индекса:

SELECT
    *
FROM
    "table"
WHERE
    "id" > $1
ORDER BY
    "id"
LIMIT
    100

В этом случае алгоритм клиента должен поменяться таким образом, чтобы каждой итерации цикла использовать значение index, взятое от предыдущего шага. Перепишем алгоритм:

  1. Первичная инициализация. index := 0
  2. Выполняем запрос limit данных, передавая в запрос index
  3. Вычисляем новое значение index, как максимум от id в ответе
  4. Обрабатываем данные
  5. Пауза, соответствующая интервалу
  6. Перейти к шагу 2

В системе с такой архитектурой, как правило, уже нет существенных препятствий к снижению интервала до минимальных значений (вплоть до нуля).

Но давайте ещё порефлексируем над архитектурой. Что плохого в ней?

  • Алгоритм привязан к структуре данных
  • Выполняется практически полностью на стороне клиента
  • Вследствие предыдущей проблемы сложно, например, централизованно модифицировать его на иную работу после факапов/проблем.
  • Пользователь может сам подставлять в index произвольные значения. Иногда это может быть нежелательно или приводить к багам, которые разработчику сервера сложно понять.

Давайте ещё раз модифицируем алгоритм. Заменим index на state и управлять им будем с сервера:

  1. Первичная инициализация. state := null.
  2. Выполняем запрос limit данных, передавая в запрос значение state
  3. В каждом ответе, помимо данных, сервер возвращает new_state. state := new_state
  4. Обрабатываем данные
  5. Пауза-интервал
  6. Перейти к шагу 2

Что мы получили? Гибкость.

  • Переменная state определяется только сервером и не обязана быть привязанной к числу смещения. При желании в этой переменной можем хранить JSON со многими полями.
  • При желании можем ограничить возможности пользователя “хачить” запросы (использовать другие значения index, помните выше мы об этом говорили?). Этого можно достичь, например криптоподписывая state.

Если в переменной state хранится не только позиция окна, а, например, и значения фильтров и криптоподпись, то эту переменную имеет смысл называть курсором. Переименуем переменную ещё раз и избавимся от постоянных задержек:

  1. Первичная инициализация. cursor := null, filters = значения_фильтров.
  2. Выполняем запрос limit данных, передавая в запрос значение cursor, filters.
  3. В каждом ответе, помимо данных, сервер возвращает cursor. cursor := response.cursor
  4. Обрабатываем данные
  5. Если данные были, перейти к шагу 2
  6. Пауза-интервал
  7. Перейти к шагу 2

Таким образом, получаем алгоритм, минимизирующий число запросов, если данных для клиента нет, и запрашивающий данные с максимальной производительностью, если таковые имеются.

Рекомендации по работе с курсорами:

  • Поскольку хранением курсора между запросами озадачен клиент, то имеет смысл хранить в курсоре и версию ПО сервера. В этом случае можно написать дополнительный код, обеспечивающий обратную совместимость (конвертацию форматов курсоров).
  • Во избежание трудных багов весь набор фильтров, полученных в первом запросе, хорошо хранить в курсоре и в последующих запросах игнорировать параметры фильтрации не из курсора. Перфекционисты могут даже выделить инициализацию курсора в отдельный запрос.
  • Во избежание введения в соблазн пользователей использовать в своём коде какие-то данные из курсора, лучше не использовать человекочитаемую строку в значении курсора. JSON, пропущенный через base64-кодирование (и криптоподписанный) подходит идеально.

Пример. Изменение алгоритма после сбоя.

Любая система гарантированной доставки сообщений из точки А в точку B в случае факапов будет накапливать пул недоставленных сообщений. После восстановления работоспособности будет период времени, когда приёмник данных сильно отстаёт от источника.

В случае, если порядок доставки сообщений возможно нарушать, то обработчик запроса с курсором может (продетектировав значительное отставание) начать возвращать два потока данных: тот, на который подписан клиент, и тот же, но с более актуальными данными.

То есть, limit делим, например, пополам. Половину limit’а заполняем данными из обычного курсора. А во второй половине начинаем передавать данные, начиная от id, с небольшим отставанием.

Таким образом, пользователи, запросившие отчёт прямо во время факапа, продолжат его ждать (и дождутся). А пользователи, запросившие отчёт после факапа, получат его с небольшой задержкой.

Пример алгоритма серверной стороны, включающего второй поток в случае сильного отставания, приведён на рисунке.

Пофантазировав, схему можно дополнить не одним, а несколькими фолбеками.

Курсорная репликация

Описанные курсоры можно использовать для репликации данных с сервиса на сервис.

Часто один сервис должен иметь у себя кеш/реплику части данных другого сервиса. При этом требований синхронности к этой реплике нет. Поменялись данные в сервисе A. Они должны максимально быстро поменяться и в сервисе B.

Например, мы хотим реплицировать табицу пользователей с сервиса на сервис.

Для такой репликации можно использовать что-то готовое из инструментария баз данных, а можно сделать небольшой “велосипед”. Предположим, что пользователи хранятся в БД PostgreSQL. Тогда делаем следующее:

  • создаём дополнительный столбик SERIAL/BIGSERIAL в таблице users, назовём его lsn (Last sequence number).
  • модифицируем изменяющие пользователей запросы, чтобы на каждое изменение записи пользователя значение lsn устанавливалось бы из растущей последовательности
  • строим по полю lsn (уникальный) BTREE индекс.

В этом случае обновление записи пользователя будет выглядеть примерно так:

UPDATE
   "users"
SET
   "name" = $1,
   ...
   "lsn" = DEFAULT /* последовательность */
WHERE
   "user_id" = $21

А запрос для работы курсора будет выглядеть как-то так:

SELECT
    *
FROM
    "users"
WHERE
    "lsn" > $1
ORDER BY
    "lsn"
LIMIT
    $2

Каждое обновление пользовательской записи будет перемещать её в конец списка lsn. При этом общий размер отставания никогда не превысит размер таблицы пользователей.

Итоги

  1. Почти во всех случаях, когда применяется активная система уведомлений зависимых сервисов, её можно заменить описанной курсорной подпиской.
  2. При этом проблемы доступности клиентов, настроек, работоспособности TCP-стека останутся у клиентов
  3. Максимально быстрое и простое восстановление после простоя/сбоев. Отсутствие двусмысленностей в кодах ошибок.