пятница, 19 декабря 2014 г.

[prog.thoughts] Message-passing: чем примитивнее, тем лучше?

Нижеследующий текст является чем-то вроде рефлексии вокруг механизмов взаимодействия между агентами в SObjectizer. Но, полагаю, изложенный материал будет интересен более широкому кругу читателей, которые разрабатывают собственные инструменты на основе message-passing-а или же используют подобные инструменты в своей работе.

В качестве примера, который будет использоваться на протяжении всей заметки, взят сценарий с двумя агентами a_receiver и a_processor. Агент a_receiver накапливает запросы, которые приходят откуда-то извне. Агент a_processor занимается обработкой этих запросов. Такое разделение обязанностей позволяет защитить обработку запросов от всплесков нагрузки -- a_receiver отбрасывает "лишние" запросы, что позволяет удерживать нагрузку на обработчика на некотором стабильном уровне, даже если временами его производительность "проседает". Чуть подробнее эта идея рассматривается в одной из предшествующих заметок.

Итак, первоначально в SObjectizer агенты могли взаимодействовать только посредством обмена асинхронными сообщениями. Это означало, что если a_processor хочет запросить у a_receiver список накопленных запросов, то a_processor должен отослать сообщение a_receiver и дождаться ответного сообщения от a_receiver-а:

struct msg_take_requests : public so_5::rt::signal_t {};

struct msg_process_requests : public so_5::rt::message_t { ... };

class a_receiver_t : public so_5::rt::agent_t
{
...
   // Обработка просьбы отдать все накопленные запросы для обработки.
   void evt_take_requests()
   {
      // Отсылаем список запросов в виде сообщения.
      so_5::send< msg_process_requests >( m_processor_mbox,
            std::move( m_requests ) );
   }
...
   // Список накопленных запросов, ожидающих обработки.
   std::vector< application_request > m_requests;
};

class a_processor_t : public so_5::rt::agent_t
{
...
   // Очередной такт работы начинается с получения списка запросов.
   void evt_process_requests( const msg_process_requests & evt )
   {
      ... // Обработка полученных запросов.

      // Запрос следующей порции.
      so_5::send< msg_take_requests >( m_receiver_mbox );
   }
};

Схема работы очень тривиальная. Но иногда приводящая к реализации в агентах кучи мелких методов-событий, получающих от кого-то ответ и отсылающих кому-то следующее сообщение. Тем самым размазывая простую линейную логику по разрозненным мелким методам.

Предположим, что a_processor работает на потактовой основе. Каждый такт, скажем, 250ms. В начале такта у a_receiver-а запрашиваются накопленные запросы. Потом они обрабатываются. Если все это заняло меньше 250ms, то a_processor засыпает до начала следующего такта. Если же следующий так уже должен был начаться (не уложились в длительность текущего такта), то сразу же начинаем следующий такт. В коде это может выглядеть следующим образом:

class a_processor_t : public so_5::rt::agent_t
{
...
   // Начало очередного такта.
   void evt_next_turn()
   {
      // Эта метка потребуется для определения времени работы
      // по текущему такту.
      m_turn_started_at = std::chrono::steady_clock::now();
      
      // Запрашиваем очередную порцию данных.
      so_5::send< msg_take_requests >( m_receiver_mbox );
   }

   // Получены подлежащие обработке запросы.
   void evt_process_requests( const msg_process_requests & evt )
   {
      // Обработка запросов.
      ...
      // Определяем, когда должен начаться следующий такт.
      auto turn_duration_left = calc_time_to_next_turn();
      if( !turn_duration_left )
         // Начинаем следующий такт сразу же.
         so_5::send_to_agent< msg_next_turn >( *this );
      else
         // До начала такта можно поспать.
         so_5::send_delayed_to_agent< msg_next_turn >( *this, turn_duration_left );
   }
};

В принципе, с этим вполне можно жить. Настолько вполне, что не смотря на периодически всплывавшие разговоры о добавлении синхронности в SObjectizer, более 10 лет в SObjectizer-5 синхронности не было и это не сказывалось на качестве и скорости разработки прикладного софта.

Тем не менее, для повышения привлекательности SObjectizer в него была добавлена возможность выполнения синхронных запросов к агентам. Агент a_processor может отослать синхронный запрос и тут же подождать результата его выполнения. Хотя сам запрос придет к a_receiver-у в виде обычного асинхронного сообщения и a_receiver даже не будет знать, что его кто-то ждет. Главное отличие для a_receiver будет в том, что ему уже не нужно отсылать ответное сообщение, а достаточно вернуть результат в качестве возвращаемого значения.

С применением синхронного взаимодействия приведенный выше пример примет вот такой вид:

class a_receiver_t : public so_5::rt::agent_t
{
...
   // Обработка просьбы отдать все накопленные запросы для обработки.
   std::vector< application_request > evt_take_requests()
   {
      // Возвращаем накопленные запросы.
      std::vector< application_request > tmp;
      m_requests.swap( tmp );

      return tmp;
   }
...
   // Список накопленных запросов, ожидающих обработки.
   std::vector< application_request > m_requests;
};

class a_processor_t : public so_5::rt::agent_t
{
...
   // Начало очередного такта.
   void evt_next_turn()
   {
      // Эта метка потребуется для определения времени работы
      // по текущему такту.
      m_turn_started_at = std::chrono::steady_clock::now();
      
      // Запрашиваем очередную порцию данных.
      auto requests = m_receiver_mbox
            ->get_one< std::vector< application_request > >()
            .wait_forever()
            .sync_get< msg_take_requests >();

      // Обработка запросов.
      ...
      // Определяем, когда должен начаться следующий такт.
      auto turn_duration_left = calc_time_to_next_turn();
      if( !turn_duration_left )
         // Начинаем следующий такт сразу же.
         so_5::send_to_agent< msg_next_turn >( *this );
      else
         // До начала такта можно поспать.
         so_5::send_delayed_to_agent< msg_next_turn >( *this, turn_duration_left );
   }
};

Т.е. в реализации стало меньше сущностей (нет сообщения msg_process_requests, нет разделения событий a_processor-а на evt_next_turn и evt_process_requests), логика работы a_receiver-а и a_processor-а становится более линейной. Что, в принципе, хорошо.

Лично мне в реализации синхронных запросов в SObjectizer больше всего нравится то, что исполнитель запросов (т.е. a_receiver в данном примере) результат своей операции просто возвращает как возвращаемое значение метода-события. На мой взгляд, это действительно делает работу с агентами проще. Во-первых, за счет того, что не нужно вводить никаких новых сообщений. Во-вторых, не нужно думать, кому именно отсылать ответное сообщение. Так, если посмотреть на самый первый пример кода, то можно увидеть, что a_receiver знает адрес агента a_processor:

// Обработка просьбы отдать все накопленные запросы для обработки.
void evt_take_requests()
{
   // Отсылаем список запросов в виде сообщения.
   so_5::send< msg_process_requests >( m_processor_mbox,
         std::move( m_requests ) );
}

В простом случае, когда один a_receiver жестко связан с одним a_processor, это нормально. Но что, если a_receiver один, а a_processor-ов несколько? И каждый a_processor шлет свои запросы a_receiver-у и ожидат ответное сообщение?

В случае с ответным асинхронным сообщением пришлось бы придумывать механизм передачи адреса a_processor-а в исходном запросе. Т.е. что-то вроде:

struct msg_take_requests : public so_5::rt::message_t
{
   // Адрес, на который следует отсылать ответ.
   const so_5::rt::mbox_t m_reply_to;
   ...
};

struct msg_process_requests : public so_5::rt::message_t { ... };

class a_receiver_t : public so_5::rt::agent_t
{
...
   // Обработка просьбы отдать все накопленные запросы для обработки.
   void evt_take_requests( const msg_take_requests & evt )
   {
      // Отсылаем список запросов в виде сообщения.
      so_5::send< msg_process_requests >( evt.m_reply_to,
            std::move( m_requests ) );
   }
};

А вот при синхронном запросе к a_receiver ничего подобного выдумывать не нужно. Получив msg_take_requests агент a_receiver просто возвращает вектор запросов, а уже сам SObjectizer отдает этот вектор именно тому a_processor-у, которые инициировал запрос.

Такая внезапная удобность синхронных запросов заставила задуматься о том, что было бы полезно воспользоваться ей и в других сценариях, когда синхронный запрос сделать нельзя. Ведь чтобы синхронный запрос отработал требуется, чтобы инициатор и исполнитель запроса работали на разных рабочих нитях. А это не всегда возможно. Например, в ситуации, когда SObjectizer будет приспособлен для исключительно однопоточных приложений. Тогда вообще все агенты будут работать на одной, главной нити приложения.

В этих случаях может быть востребована огранизация цепочек сообщений. Т.е. агент a_processor отсылает запрос, агент a_receiver его обрабатывает, а результат обработки автоматически конвертируется SObjectizer-ом в ответное сообщение для a_processor-а.

Нормальных имен для элементов данного механизма пока еще не придумано, в текущем приближении это могло бы выглядеть следующим образом:

class a_receiver_t : public so_5::rt::agent_t
{
...
   // Обработка просьбы отдать все накопленные запросы для обработки.
   std::vector< application_request > evt_take_requests()
   {
      // Возвращаем накопленные запросы.
      std::vector< application_request > tmp;
      m_requests.swap( tmp );

      return tmp;
   }
...
   // Список накопленных запросов, ожидающих обработки.
   std::vector< application_request > m_requests;
};

class a_processor_t : public so_5::rt::agent_t
{
...
   // Начало очередного такта.
   void evt_next_turn()
   {
      // Эта метка потребуется для определения времени работы
      // по текущему такту.
      m_turn_started_at = std::chrono::steady_clock::now();
      
      // Запрашиваем очередную порцию данных.
      so_5::ask< msg_take_requests, std::vector< application_request > >(
            m_receiver_mbox );
   }

   // Получены подлежащие обработке запросы.
   void evt_process_requests(
      const so_5::rt::answer< std::vector< application_request > > & evt )
   {
      // Обработка запросов.
      ...
      // Определяем, когда должен начаться следующий такт.
      auto turn_duration_left = calc_time_to_next_turn();
      if( !turn_duration_left )
         // Начинаем следующий такт сразу же.
         so_5::send_to_agent< msg_next_turn >( *this );
      else
         // До начала такта можно поспать.
         so_5::send_delayed_to_agent< msg_next_turn >( *this, turn_duration_left );
   }
};

Т.е. здесь a_receiver-у все равно, дергают ли его evt_take_requests из-за синхроного запроса, из-за обычного асинхронного сообщения или же это часть цепочки сообщений. Он просто возвращает то, что накопил, и больше ни о чем не думает.

Если данная идея с организацией цепочки сообщений будет реализована, то она позволит решить еще одну частную проблемку, которая изредка, но возникает при прикладной разработке. А именно: необходимость узнать, что отправленное какому-то агенту сообщение дошло до получателя и было им воспринято.

Допустим, есть агент, который парсит log-файл и выбирает оттуда данные для сохранения в БД. Но сам он в БД не ходит. Вместо этого собранные данные отсылаются агенту-коммитеру, который пишет их в БД. В принципе, агент-парсер должен дождаться момента фиксации очередной порции, и только после этого сможет продолжать свою работу. Но как узнать, что сообщение до агента-коммитера дошло и было успешно обработано?

На сегодняшний день было всего два механизма: ответное асинхронное сообщение и выполнение синхронного запроса. В первом случае нужно писать больше кода. Во втором случае нужно, во-первых, чтобы агенты работали на разных контекстах. И, во-вторых, нужно, чтобы агент-парсер при выполнении запроса к коммитеру никому больше не мешал. Хотя, на самом-то деле, на этом же контексте могут работать и другие агенты. Работа которых будет приостановлена пока агент-парсер ждет ответа от агента-коммитера.

На мой вгляд, цепочки сообщений в этом случае будут удобнее: нет необходимости в новом сообщении и рабочий контекст агент-парсер не блокирует.

class a_log_parser_t : public so_5::rt::agent_t
{
...
   void evt_parse()
   {
      ... // Подготовка очередной порции данных.
      // Отсылка данных коммитеру с получением подтверждения
      // о том, что сообщение дошло.
      so_5::ask< msg_commit_data, void >( m_commiter, ... );
   }

   void evt_commit_data_result( const so_5::rt::answer< void > & evt )
   {
      if( evt.processed() )
         // Сообщение было получено и обработано агентом.
         ...
      else
         ... // Обработка неудачной доставки/обработки сообщения.
   }
};

С одной стороны, все, что было описано выше на счет синхронного взаимодействия агентов и нового механизма организации цепочек сообщений, мне нравится. И, надеюсь, что механизм цепочки сообщений будет реализован в следующей версии SObjectizer. Но есть и другая сторона.

Чтобы перейти к обсуждению другой стороны медали, нужно провести хорошую аналогию. Телефонный звонок против электронного письма.

Менеджеры очень любят телефонные звонки и не любят писать/читать письма. Инженеры-разработчики, напротив, не любят звонки и предпочитают почту. Почему так? Потому, что во время телефонного звонка нужно давать ответы сразу, времени подумать практически нет. Это как раз то, что нравится менеджерам ("результат здесь и сейчас"), но не нравится разработчикам ("откуда я тебе готовые ответы возьму, здесь подумать нужно, в коде покопаться, с коллегами обсудить"). Фокус в том, что нужно и то, и другое. И вопрос заключается в удачной пропорции и уместного употребления этих способов общения в зависимости от ситуации.

Но вот в SObjectizer-5 сейчас синхронные запросы и упомянутый механизм цепочки сообщений -- это как телефонный звонок: агент, к которому поступил запрос, должен дать ответ здесь и сейчас. Агент-исполнитель не может сохранить запрос и вернуться к нему чуть позже. Как и не может делегировать выполнение запроса кому-то еще. И это печально.

Ведь одна из интересных сторон агентного программирования -- это то, что интерфейсы взаимодействия между агентами определены на уровне асинхронных сообщений. Т.е. известно, какое сообщение нужно отослать и что прилетит в ответ. Но никаких деталей реализации нет. Это сильно отличается от привычных разработчику синхронных API-шных интерфейсов.

С синхронным API все привычно: чтобы вызвать метод, нужно иметь ссылку на реализующий интерфейс объект. На время вызова метода работа прерывается до тех пор, пока управление из метода не вернется. А вот с обменом сообщениями картина другая. Сообщение ушло и все. Куда оно ушло, что стоит за его обработкой -- не понятно. Пока ответ не получен, можно заниматься какими-то своими делами.

Это дает возможность написать проксирующего агента, который получает исходное сообщение, определяет, кто из подчиненных агентов лучше всего с сообщением справится, и пересылает сообщение дальше. А там может быть еще один проксирующий агент и т.д. и т.п.

В случае с асинхронными сообщениями оказывается, что вообще все агенты -- будь то прокси или реальные исполнители -- работают по одним и тем же законам: получил сообщение, отослал сообщение. Не может отослать ответ, сохранил исходный объект и вернулся к нему чуть позже.

А вот когда появляются дополнительные механизмы, вроде синхронных запросов и цепочек сообщений, ситуация перестает быть настолько простой. Проксирующий агент должен понимать, что сейчас к нему пришел синхронный запрос. И делегирование этого запроса должно выполняться вот таким вот способом. А сейчас к нему пришла цепочка сообщений и он должен работать по другому алгоритму...

В общем получается, что чем более удобные для некоторых ситуаций инструменты предлагаешь разработчику, тем больше сложностей оказывается с этими инструментами в других ситуациях. Пытаешься их разрешить, оглядываешься назад, во времена, когда кроме примитивного асинхронного обмена сообщениями ничего не было, и понимаешь, что гибкости-то там было гораздо больше. Пусть даже за эту гибкость приходилось платить чуть большим объемом прикладного кода.

Комментариев нет: