Всё новое - это хорошо забытое старое!

Сейчас многие пишут различных ботов, которые в IM общаются с пользователем и как-то помогают пользователю жить.

Если Вы посмотрите на код многих ботов, то он обычно сводится к одному и тому же паттерну:

  • приходит сообщение
  • оно передаётся пользовательскому обработчику сообщений (callback)

Это в общем-то универсальный способ написания ботов. Он подходит и для чатов с одним человеком и для ботов, подключаемых в группы. С этим способом всё хорошо кроме одного: код даже простых ботов часто бывает довольно запутан.

Давайте попробуем его распутать.

Начну с дисклаймеров:

  1. То что описано в этой статье подходит для ботов вида бот <-> один человек.
  2. Код, который приведён в данной статье - является кодом-скетчем. Написан специально для этой статьи за 15 минут. Так что не судите строго.
  3. Я применял подобный подход в бизнесе: с балансированием нагрузки. Но, увы, мой продакшен код имеет много инфраструктурных зависимостей и так просто его не опубликовать. Поэтому в статье используется этот скетч. Я коснусь вопросов развития парадигмы (опишу куда и как мы развивали).

Ну а теперь поехали.

В качестве опоры рассмотрим асинхронную библиотеку aiogram, python3.7+. По ссылке есть пример простого echo-бота.

Скопирую его сюда:

"""
This is a echo bot.
It echoes any incoming text messages.
"""

import logging

from aiogram import Bot, Dispatcher, executor, types

API_TOKEN = 'BOT TOKEN HERE'

# Configure logging
logging.basicConfig(level=logging.INFO)

# Initialize bot and dispatcher
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)



@dp.message_handler(regexp='(^cat[s]?$|puss)')
async def cats(message: types.Message):
    with open('data/cats.jpg', 'rb') as photo:
        '''
        # Old fashioned way:
        await bot.send_photo(
            message.chat.id,
            photo,
            caption='Cats are here ',
            reply_to_message_id=message.message_id,
        )
        '''

        await message.reply_photo(photo, caption='Cats are here ')


@dp.message_handler()
async def echo(message: types.Message):
    # old style:
    # await bot.send_message(message.chat.id, message.text)

    await message.answer(message.text)


if __name__ == '__main__':
    executor.start_polling(dp, skip_updates=True)

Видим, что организация бота - традиционная. Каждый раз, когда пользователь нам что-то пишет - вызывается функция-обработчик.

Что плохого в этой парадигме?

То, что функция-обработчик для реализации сложных диалогов должна на каждом своём вызове восстанавливать свой стейт из какого-то хранилища.

Если взглянуть на большинство ботов поддерживающих какой-то бизнес (например приём на работу), то они задают пользователю 1..N вопросов, затем по итогу этих вопросов что-то делают (например сохраняют анкету в БД).

Если бы можно было писать бота в традиционном стиле (а не колбечном), то можно было бы хранить данные пользователя прямо на стеке.

Давайте попробуем это сделать.

Я набросал скетч модуля, подключив который можно использовать с этой библиотекой:

# Файл - chat_dispatcher.py
import asyncio

class ChatDispatcher:
    class Timeout(RuntimeError):
        def __init__(self, last_message):
            self.last_message = last_message
            super().__init__('timeout exceeded')

    def __init__(self, *,
                 chatcb,
                 shardcb = lambda message: message.from_user.id,
                 inactive_timeout = 15 * 60):
        self.chatcb = chatcb
        self.shardcb = shardcb
        self.inactive_timeout = inactive_timeout
        self.chats = {}

    async def handle(self, message):
        shard = self.shardcb(message)

        loop = asyncio.get_event_loop()

        if shard not in self.chats:
            self.chats[shard] = {
                'task': self.create_chat(loop, shard),
                'messages': [],
                'wait': asyncio.Event(),
                'last_message': None,
            }
        self.chats[shard]['messages'].append(message)
        self.chats[shard]['wait'].set()


    def create_chat(self, loop, shard):
        async def _chat_wrapper():
            try:
                await self.chatcb(self.get_message(shard))
            finally:
                del self.chats[shard]

        return loop.create_task(_chat_wrapper())

    def get_message(self, shard):
        async def _get_message(inactive_timeout=self.inactive_timeout):
            while True:
                if self.chats[shard]['messages']:
                    last_message = self.chats[shard]['messages'].pop(0)
                    self.chats[shard]['last_message'] = last_message
                    return last_message

                try:
                    await asyncio.wait_for(self.chats[shard]['wait'].wait(),
                                           timeout=inactive_timeout)
                except asyncio.TimeoutError:
                    self.chats[shard]['wait'].set()
                    raise self.Timeout(self.chats[shard]['last_message'])

                if not self.chats[shard]['messages']:
                    self.chats[shard]['wait'].clear()
        return _get_message

Небольшие пояснения:

Инстанцируется класс ChatDispatcher с передачей ему следующих параметров:

  1. функции шардинга входящих сообщений (почему названо шардингом - позднее, когда коснёмся больших нагрузок). Функция возвращает уникализированное число указывающее на диалог. В примере - просто возвращает идентификатор пользователя.
  2. функции которая будет выполнять работу обслуживания чата.
  3. Значение таймаута по неактивности пользователя.

Описание работы:

  1. В ответ на первое сообщение пользователя создаётся асинхронная задача, которая будет обслуживать диалог. Эта задача будет работать до тех пор пока диалог не завершится.
  2. Чтобы получить сообщение пользователя мы запрашиваем его в явной форме. Пример чата echo: python async def chat(get_message): message = await get_message() await message.answer(message.text)
  3. Отвечаем на сообщения так, как предлагает нам библиотека (message.answer).

Давайте попробуем написать бота в этой парадигме

# Файл bot.py

import asyncio
import re
from .chat_dispatcher import ChatDispatcher
import logging
from aiogram import Bot, Dispatcher, executor, types

API_TOKEN ='Сюда впишите токен Вашего бота'

logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)


async def chat(get_message):
    try:
        message = await get_message()
        await message.answer('Умею складывать числа, введите первое число')

        first = await get_message()
        if not re.match('^\d+$', str(first.text)):
            await first.answer('это не число, начните сначала: /start')
            return

        await first.answer('Введите второе число')
        second = await get_message()

        if not re.match('^\d+$', str(second.text)):
            await second.answer('это не число, начните сначала: /start')
            return

        result = int(first.text) + int(second.text)
        await second.answer('Будет %s (/start - сначала)' % result)

    except ChatDispatcher.Timeout as te:
        await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
        await te.last_message.answer('сначала - /start')


chat_dispatcher = ChatDispatcher(chatcb=chat,
                                 inactive_timeout=20)

@dp.message_handler()
async def message_handle(message: types.Message):
    await chat_dispatcher.handle(message)


if __name__ == '__main__':
    executor.start_polling(dp, skip_updates=True)

Написанный пример бота - просто складывает пару чисел и выдаёт результат.

Выглядит результат работы так:

Ну а теперь рассмотрим поближе код. Инстанцирование не должно вызывать вопросы.

Интеграция с нашим скетчем сделана так что в стандартном обработчике мы вызываем await chat_dispatcher.handle(message). А чат мы описали в функции chat, повторю сюда его код:

async def chat(get_message):
    try:
        message = await get_message()
        await message.answer('Умею складывать числа, введите первое число')

        first = await get_message()
        if not re.match('^\d+$', str(first.text)):
            await first.answer('это не число, начните сначала: /start')
            return

        await first.answer('Введите второе число')
        second = await get_message()

        if not re.match('^\d+$', str(second.text)):
            await second.answer('это не число, начните сначала: /start')
            return

        result = int(first.text) + int(second.text)
        await second.answer('Будет %s (/start - сначала)' % result)

    except ChatDispatcher.Timeout as te:
        await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
        await te.last_message.answer('сначала - /start')

Код обслуживания чата - просто запрашивает один за другим данные у пользователя. Ответы пользователя просто складируются на стеке (переменные first, second, message).

Функция get_message может выбросить исключение, если пользователь ничего не вводит в течение установленного таймаута (и ей же можно передать таймаут по месту).

Стейт диалога - прямо связан с номером строки внутри этой функции. Продвигаясь вниз по коду - мы продвигаемся по схеме диалога. Внести изменения в ветку диалога - не просто, а очень просто! Таким образом стейт-машины не нужны. В этой парадигме можно писать очень сложные диалоги и понимать их код будет значительно проще чем код с callback'ами.

Недостатки

Куда ж без них.

  1. На каждого активного пользователя приходится одна таск-корутина. В среднем один CPU нормально обслуживает около 1000 пользователей, потом начинаются задержки.
  2. Рестарт всего демона - приводит к прекращению всех диалогов (и перезапуску их).
  3. Код [из примера] не приспособлен к масштабированию нагрузки и интернационализации.

Если со второй проблемой понятно что делать: перехватить сигнал останова и сообщить пользователям "у меня тут ЧП, пожар, вернусь немного позднее". То последняя проблема может вызывать сложности. Давайте рассмотрим её:

Масштабирование нагрузки

Очевидно, нагруженные боты надо пускать на многих бакендах сразу. Соответственно будет использоваться webHook режим работы.

Если просто балансировать webHook между скажем двумя бакендами, то очевидно нужно как-то обеспечить чтобы один и тот же пользователь приходил к одной и той же корутине, которая ведёт с ним диалог.

Мы это сделали следующим образом.

  1. На балансере парсим JSON входящего сообщения (message)
  2. Выбираем из него идентификатор пользователя
  3. По идентификатору вычисляем номер бакенда (== шарда). Например по алгоритму user_id % Nshards.
  4. Перенаправляем запрос шарду.

Идентификатор пользователя - становится ключем шардирования между корутинами диалогов и основой для вычисления шард-номера бакенда в балансере.

Код такого балансера простой - пишется на любом языке за 10 минут. Не буду его приводить.

Заключение

Если писать ботов в этой парадигме, то можно довольно просто переделывать диалоги с одного на другой. При этом что важно - новый программист легко разбирается в коде диалогов, которые кто-то сделал до него.

Почему большинство пишет ботов в колбечной архитектуре - я не знаю.

Раньше писали в такой парадигме. Обслуживание чатов в таком стиле было принято в эпоху IRC и ботов для него. Так что я не претендую на какую-то новизну.

И ещё. Если использовать эту парадигму на языке с оператором goto, то это будет как раз красивый пример применения goto (циклы в диалогах красиво делаются на goto). К сожалению это не о Python.