Храним состояние чата на стеке
Всё новое - это хорошо забытое старое!
Сейчас многие пишут различных ботов, которые в IM общаются с пользователем и как-то помогают пользователю жить.
Если Вы посмотрите на код многих ботов, то он обычно сводится к одному и тому же паттерну:
- приходит сообщение
- оно передаётся пользовательскому обработчику сообщений (
callback
)
Это в общем-то универсальный способ написания ботов. Он подходит и для чатов с одним человеком и для ботов, подключаемых в группы. С этим способом всё хорошо кроме одного: код даже простых ботов часто бывает довольно запутан.
Давайте попробуем его распутать.
Начну с дисклаймеров:
- То что описано в этой статье подходит для ботов вида
бот <-> один человек
. - Код, который приведён в данной статье - является кодом-скетчем. Написан специально для этой статьи за 15 минут. Так что не судите строго.
- Я применял подобный подход в бизнесе: с балансированием нагрузки. Но, увы, мой продакшен код имеет много инфраструктурных зависимостей и так просто его не опубликовать. Поэтому в статье используется этот скетч. Я коснусь вопросов развития парадигмы (опишу куда и как мы развивали).
Ну а теперь поехали.
В качестве опоры рассмотрим асинхронную библиотеку aiogram, python3.7+. По ссылке есть пример простого echo-бота.
Скопирую его сюда:
"""
This is a echo bot.
It echoes any incoming text messages.
"""
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = 'BOT TOKEN HERE'
# Configure logging
logging.basicConfig(level=logging.INFO)
# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
@dp.message_handler(regexp='(^cat[s]?$|puss)')
async def cats(message: types.Message):
with open('data/cats.jpg', 'rb') as photo:
'''
# Old fashioned way:
await bot.send_photo(
message.chat.id,
photo,
caption='Cats are here ',
reply_to_message_id=message.message_id,
)
'''
await message.reply_photo(photo, caption='Cats are here ')
@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)
await message.answer(message.text)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)
Видим, что организация бота - традиционная. Каждый раз, когда пользователь нам что-то пишет - вызывается функция-обработчик.
Что плохого в этой парадигме?
То, что функция-обработчик для реализации сложных диалогов должна на каждом своём вызове восстанавливать свой стейт из какого-то хранилища.
Если взглянуть на большинство ботов поддерживающих какой-то бизнес (например приём на работу), то они задают пользователю 1..N вопросов, затем по итогу этих вопросов что-то делают (например сохраняют анкету в БД).
Если бы можно было писать бота в традиционном стиле (а не колбечном), то можно было бы хранить данные пользователя прямо на стеке.
Давайте попробуем это сделать.
Я набросал скетч модуля, подключив который можно использовать с этой библиотекой:
# Файл - chat_dispatcher.py
import asyncio
class ChatDispatcher:
class Timeout(RuntimeError):
def __init__(self, last_message):
self.last_message = last_message
super().__init__('timeout exceeded')
def __init__(self, *,
chatcb,
shardcb = lambda message: message.from_user.id,
inactive_timeout = 15 * 60):
self.chatcb = chatcb
self.shardcb = shardcb
self.inactive_timeout = inactive_timeout
self.chats = {}
async def handle(self, message):
shard = self.shardcb(message)
loop = asyncio.get_event_loop()
if shard not in self.chats:
self.chats[shard] = {
'task': self.create_chat(loop, shard),
'messages': [],
'wait': asyncio.Event(),
'last_message': None,
}
self.chats[shard]['messages'].append(message)
self.chats[shard]['wait'].set()
def create_chat(self, loop, shard):
async def _chat_wrapper():
try:
await self.chatcb(self.get_message(shard))
finally:
del self.chats[shard]
return loop.create_task(_chat_wrapper())
def get_message(self, shard):
async def _get_message(inactive_timeout=self.inactive_timeout):
while True:
if self.chats[shard]['messages']:
last_message = self.chats[shard]['messages'].pop(0)
self.chats[shard]['last_message'] = last_message
return last_message
try:
await asyncio.wait_for(self.chats[shard]['wait'].wait(),
timeout=inactive_timeout)
except asyncio.TimeoutError:
self.chats[shard]['wait'].set()
raise self.Timeout(self.chats[shard]['last_message'])
if not self.chats[shard]['messages']:
self.chats[shard]['wait'].clear()
return _get_message
Небольшие пояснения:
Инстанцируется класс ChatDispatcher
с передачей ему следующих
параметров:
- функции шардинга входящих сообщений (почему названо шардингом - позднее, когда коснёмся больших нагрузок). Функция возвращает уникализированное число указывающее на диалог. В примере - просто возвращает идентификатор пользователя.
- функции которая будет выполнять работу обслуживания чата.
- Значение таймаута по неактивности пользователя.
Описание работы:
- В ответ на первое сообщение пользователя создаётся асинхронная задача, которая будет обслуживать диалог. Эта задача будет работать до тех пор пока диалог не завершится.
- Чтобы получить сообщение пользователя мы запрашиваем его в явной
форме. Пример чата
echo
:
async def chat(get_message):
message = await get_message()
await message.answer(message.text)
- Отвечаем на сообщения так, как предлагает нам библиотека
(
message.answer
).
Давайте попробуем написать бота в этой парадигме
# Файл bot.py
import asyncio
import re
from .chat_dispatcher import ChatDispatcher
import logging
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN ='Сюда впишите токен Вашего бота'
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
async def chat(get_message):
try:
message = await get_message()
await message.answer('Умею складывать числа, введите первое число')
first = await get_message()
if not re.match('^\d+$', str(first.text)):
await first.answer('это не число, начните сначала: /start')
return
await first.answer('Введите второе число')
second = await get_message()
if not re.match('^\d+$', str(second.text)):
await second.answer('это не число, начните сначала: /start')
return
result = int(first.text) + int(second.text)
await second.answer('Будет %s (/start - сначала)' % result)
except ChatDispatcher.Timeout as te:
await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
await te.last_message.answer('сначала - /start')
chat_dispatcher = ChatDispatcher(chatcb=chat,
inactive_timeout=20)
@dp.message_handler()
async def message_handle(message: types.Message):
await chat_dispatcher.handle(message)
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)
Написанный пример бота - просто складывает пару чисел и выдаёт результат.
Выглядит результат работы так:
Ну а теперь рассмотрим поближе код. Инстанцирование не должно вызывать вопросы.
Интеграция с нашим скетчем сделана так что в стандартном обработчике мы
вызываем await chat_dispatcher.handle(message)
. А чат мы описали в
функции chat
, повторю сюда его код:
async def chat(get_message):
try:
message = await get_message()
await message.answer('Умею складывать числа, введите первое число')
first = await get_message()
if not re.match('^\d+$', str(first.text)):
await first.answer('это не число, начните сначала: /start')
return
await first.answer('Введите второе число')
second = await get_message()
if not re.match('^\d+$', str(second.text)):
await second.answer('это не число, начните сначала: /start')
return
result = int(first.text) + int(second.text)
await second.answer('Будет %s (/start - сначала)' % result)
except ChatDispatcher.Timeout as te:
await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
await te.last_message.answer('сначала - /start')
Код обслуживания чата - просто запрашивает один за другим данные у
пользователя. Ответы пользователя просто складируются на стеке
(переменные first
, second
, message
).
Функция get_message
может выбросить исключение, если пользователь
ничего не вводит в течение установленного таймаута (и ей же можно
передать таймаут по месту).
Стейт диалога - прямо связан с номером строки внутри этой функции.
Продвигаясь вниз по коду - мы продвигаемся по схеме диалога. Внести
изменения в ветку диалога - не просто, а очень просто! Таким образом
стейт-машины не нужны. В этой парадигме можно писать очень сложные
диалоги и понимать их код будет значительно проще чем код с
callback
‘ами.
Недостатки
Куда ж без них.
- На каждого активного пользователя приходится одна таск-корутина. В среднем один CPU нормально обслуживает около 1000 пользователей, потом начинаются задержки.
- Рестарт всего демона - приводит к прекращению всех диалогов (и перезапуску их).
- Код [из примера] не приспособлен к масштабированию нагрузки и интернационализации.
Если со второй проблемой понятно что делать: перехватить сигнал останова и сообщить пользователям “у меня тут ЧП, пожар, вернусь немного позднее”. То последняя проблема может вызывать сложности. Давайте рассмотрим её:
Масштабирование нагрузки
Очевидно, нагруженные боты надо пускать на многих бакендах сразу.
Соответственно будет использоваться webHook
режим работы.
Если просто балансировать webHook
между скажем двумя бакендами, то
очевидно нужно как-то обеспечить чтобы один и тот же пользователь
приходил к одной и той же корутине, которая ведёт с ним диалог.
Мы это сделали следующим образом.
- На балансере парсим JSON входящего сообщения (
message
) - Выбираем из него идентификатор пользователя
- По идентификатору вычисляем номер бакенда (== шарда).
Например по алгоритму
user_id % Nshards
. - Перенаправляем запрос шарду.
Идентификатор пользователя - становится ключем шардирования между корутинами диалогов и основой для вычисления шард-номера бакенда в балансере.
Код такого балансера простой - пишется на любом языке за 10 минут. Не буду его приводить.
Заключение
Если писать ботов в этой парадигме, то можно довольно просто переделывать диалоги с одного на другой. При этом что важно - новый программист легко разбирается в коде диалогов, которые кто-то сделал до него.
Почему большинство пишет ботов в колбечной архитектуре - я не знаю.
Раньше писали в такой парадигме. Обслуживание чатов в таком стиле было принято в эпоху IRC и ботов для него. Так что я не претендую на какую-то новизну.
И ещё. Если использовать эту парадигму на языке с оператором goto
, то
это будет как раз красивый пример применения goto
(циклы в диалогах
красиво делаются на goto
). К сожалению это не о Python.