среда, 15 апреля 2015 г.

[prog] Паттерн Async Completion Token

Есть такой простой и удобный, в некоторых случаях, паттерн под названием Async Completion Token. Но так как он удобен только в некоторых случаях, то не всегда вовремя вспоминается. А жаль. Поскольку иногда он позволяет перейти от stateful-модели к stateless.

В общем случае этот паттерн хорош для ситуаций, когда некоторая сущность A получает задачу t и после выполнения цепочки шагов результат обработки t должен дойти до сущности B. Но фокус в том, что обработка t подразумевает асинхронное общение еще с несколькими сущностями C, D, E... Т.е. имеем приблизительно такой сценарий:


    A     B     C     D     E
   ===   ===   ===   ===   ===
t ->|           |     |     |
    |---------->|     |     |
    |<----------|     |     |
    |                 |     |
    |---------------->|     |
    |<----------------|     |
    |                       |
    |---------------------->|
    |<----------------------|
    |
    |--->|

Паттерн ACT позволяет не хранить внутри A информации о том, что происходит с t. Вся необходимая информация передается с самой задачей t. Так, когда A отсылает t сущности C, заявка на обработку t может выглядеть как (payload: t, reply_to: A, act: (next: D, dest: B)) (т.е. обработай t, верни результат A, ACT-ом является значение (next: D, dest: B)). Когда к A приходит результат обработки в виде (result: t', act: (next: D, dest: B)), А расшифровав ACT понимает, что далее t' нужно передавать D и отсылает запрос вида (payload: t', reply_to: A, act: (next: E, dest: B)). В ответ A получит что-то вроде (result: t'', act: (next: E, dest: B)). Последует запрос к E вида (payload: t'', act: (dest: B)). Получив от E ответ вида (result: t''', act: (dest: B)) сущность A просто перешлет t''' адресату B. При этом никакого промежуточного хранения информации внутри A не нужно.

О паттерне Async Completion Token я узнал чуть более 10 лет назад из работы профессора Дугласа Шмидта (одного из разработчиков фреймворка ACE). И даже в свое время написал пространный пост на RSDN, рассказав в нем об одном из случаев использования ACT в продакшене (правда, там сильно много лишних деталей, но так уж у меня получается). Повторюсь, временами очень удобная штука. Но из-за того, что временами, то не всегда вовремя про нее вспоминаешь.

На практике при использовании ACT возникает два основных вопроса:

  1. В каком виде представляется ACT. Пока асинхронно взаимодействующие сущности общаются между собой внутри одного процесса, особых проблем нет. Но вот когда ACT нужно передавать между процессами или даже узлами сети, тут возникают вопросы формата и способа сериализации:
    • во-первых, это вопросы включения ACT-ов друг в друга. Так, когда A отдает запрос C, ACT может иметь вид (next: D, dest: B). Но если C, в свою очередь, нужно передать запрос сущности F, то ACT должен трансформироваться во что-то вроде (dest: A, act: (next: D, dest: B)). Если F в свою очередь перешлет запрос G, то ACT трансформируется во что-то вроде (dest: F, act: (dest: A, act: (next: D, dest: B))) и т.д. Соответственно, при движении в обратном направлении должна происходить корректная распаковка ACT-ов;
    • во-вторых, нужно заботится о версионности. Так, пока запрос от A к C гулял где-то между F и G, компонент A обновили до новой версии, в которой ACT уже имеет другой формат. Получив старое значение ACT вида (next: D, dest: B) обновленный A должен суметь с этим справиться.

    В случае использования текстового представления нужно заботиться о том, чтобы значения внутри ACT-ов не мешали их правильно парсить и упаковывать/распаковывать вложенные друг в друга ACT-ы. В случае двоичной сериализации очень здорово, когда система сериализации позволяет оперировать абстрактными типами данных. Например, есть базовый тип act_t от которого делаются ACT-ы наследники, скажем, A::act_t, C::act_t, F::act_t и т.д. В этом случае создание ACT-а с чужим ACT-ом внутри происходит автоматически. Достаточно, чтобы система сериализации понимала конструкции вида:
    type F::act_t {
      string dest;
      act_t * external_act;
    };
    
    Если система двоичной сериализации такие вещи понимает, упаковка/распаковка чужих ACT-ов -- это одно удовольствие :)
  2. Stateless-работа с ACT -- это, конечно, хорошо. Особенно когда прикладная система способна переживать промежуточные потери сообщений. Например, когда пользователь запрашивает у сервера Web-страничку с новыми анекдотами, а сервер из-за нагрузки или профилактических работ не может ее собрать из отдельных частей. Здесь A, C, D, E -- это компоненты, работающие на стороне Web-сервера, сообщения между которыми могут бесследно потеряться. Если сообщение от A к C потеряется, то ничего страшного не произойдет. Просто пользователь не увидит своей странички, может быть попробует еще раз позже.
    Но что, если речь идет о проведении списания средств при выполнении платежной транзакции? Как в этих случаях обеспечивать возможность рестартов и продолжения выполнения операции? Где-то в цепочке должны быть stateful-компоненты, которые могут взять и сохранить прикладное сообщение в какой-то БД, а затем восстановить его и продолжить обработку. Насколько удобно работать с ACT-ом в БД? В каком виде его хранить? Сколько места отводить под него?

В принципе, это все не такие сложные проблемы, как может показаться с моих слов. Просто они есть и, решая использовать ACT, лучше знать о них заранее :)

Ну и в качестве закрепления материала маленький пример использования ACT-ов для игрушечной задачки md5_bruteforce2 (в частности, ее решении на SObjectizer-е).

Есть менеждер, распоряжающийся несколькими воркерами. Для каждого воркера нужно запросить новую задачу. Но прийти на обработку новая задача может с некоторым опозданием. Вопрос в том, как при поступлении задачи определить воркера, которому эта задача должна быть отправлена.

Первое, что приходит в голову -- это завести у менеджера список свободных воркеров. Воркеры помещаются туда при создании. Затем извлекаются оттуда при поступлении очередной задачи. Ивозвращаются при завершении выполнения ранее полученных задач:

class manager_t : public agent_t
{
private :
  ...
  // Список свободных воркеров, на которых можно распределять
  // полученные задачи.
  list< string > m_free_workers;

  // Поступила новая задача, которую нужно кому-то отдать.
  void evt_next_line_received( const msg_process_line & evt )
  {
    // Берем первого свободного воркера...
    auto worker_id = m_free_workers.front();
    // ...в списке его больше быть не должно...
    m_free_workers.pop_front();
    // ...и отдаем задачу ему...
    ...
  }

  // Поступило уведомление, что очередной воркер стартовал и готов к работе.
  void evt_worker_coop_started( const msg_coop_registered & evt )
  {
    // Сохраняем его в списке свободных воркеров...
    m_free_workers.push_back( evt.m_coop_name );
    // ...и запрашиваем новую задачу.
    request_next_task();
  }

  // Обрабатываем результат выполнения задачи.
  void handle_worker_result(
    const string & worker_id, const string & result )
  {
    ... // Какая-то прикладная обработка...

    // Теперь воркер должен быть возвращен в список свободных воркеров.
    m_free_workers.push_back( worker_id );
    // Т.к. есть свободный воркер, нужно запрашивать очередную задачу.
    request_next_task();
  }
  ...
};

Можно заметить, что новая задача запрашивается только тогда, когда есть свободные воркеры. Но для кого именно задача запрашивается не важно. Когда она придет, будет просто взят первый свободный из списка и все.

При использовании ACT список свободных воркеров не нужен.

Нужно запрашивать новую задачу сразу для конкретного воркера. Т.е. имя свободного воркера передается читателю задач вместе с запросом новой задачи. Когда читатель получает новую задачу, он возвращает ее назад менеджеру вместе с именем воркера.

Имя воркера в этом случае будет выполнять роль того самого токена, который передается между асинхронно взаимодействующими между друг с другом сущностями:

class manager_t : public agent_t
{
private :
  // Списка свободных воркеров больше нет.
  ...

  // Поступила новая задача, которую нужно кому-то отдать.
  void evt_next_line_received( const msg_line_loaded & evt )
  {
    // Имя воркера, которому предназначается задача, приходит
    // вместе с самой задачей.
    const auto & worker_id = get<0>( evt );
    const auto & line = get<1>( evt );
    ...
  }

  // Поступило уведомление, что очередной воркер стартовал и готов к работе.
  void evt_worker_coop_started( const msg_coop_registered & evt )
  {
    // Сразу же запрашиваем новую задачу именно для этого воркера.
    request_next_task( evt.m_coop_name );
  }

  // Обрабатываем результат выполнения задачи.
  void handle_worker_result(
    const string & worker_id, const string & result )
  {
    ... // Какая-то прикладная обработка...

    // Для этого свободного воркера нужно запросить новую задачу.
    request_next_task( worker_id );
  }
...
};

Полный вариант решения md5_bruteforce2 с использованием ACT можно найти здесь.

Отправить комментарий