вторник, 17 ноября 2015 г.

[prog.c++11] Есть мысль добавить понятие msg_stream в SObjectizer-5.5.

Вчера более-менее четко оформилась мысль, бродившая в голове уже несколько недель. Попытался ее зафиксировать под катом. На мой взгляд, msg_stream может не только упростить интеграцию частей приложения, написанных на SObjectizer и без него. Но и оказать влияние на разработку SObjectizer-овских агентов.

После добавления в SO-5.5.9 такой штуки, как wrapped_env, стало проще объединять в одном приложении части, написанные на агентах, с частями, написанными без использования SObjectizer. Грубо говоря, теперь не нужно выдумывать велосипеды для того, чтобы сделать что-то вроде:

int main()
{
   // Запускаем написанную на агентах часть приложения
   // в отдельном потоке...
   so_5::wrapped_env_t env{ &sobjectizer_part_init };

   // ...а на основном потоке производим диалог с пользователем.
   whiletrue )
   {
      std::string choice;
      std::cout << "Please enter action: ..." << std::endl;
      std::cin >> choice;
      if( choice == "exit" ) break;
      else if( choice == ... ) // И так далее...
   }
}

Когда приложение разбито на SObjectizer-часть и не-SObjectizer-часть, то возникает вопрос: как эти части взаимодействуют друг с другом?

Отсылка команд в SObjectizer-часть выполняется элементарно: задействуются обычные mbox-ы (почтовые ящики) и обычные функции для отсылки сообщений:

int main()
{
   // Запускаем написанную на агентах часть приложения
   // в отдельном потоке...
   so_5::wrapped_env_t env{ &sobjectizer_part_init };
   // Этот mbox будет использоваться для отсылки команд
   // в SObjectizer-часть приложения.
   auto control_mbox = env.environment().create_local_mbox( "control" );

   // ...а на основном потоке производим диалог с пользователем.
   whiletrue )
   {
      std::string choice;
      std::cout << "Please enter action: ..." << std::endl;
      std::cin >> choice;
      if( choice == "exit" ) break;
      else if( choice == "start-calc" )
         // Инициируем вычисления в SObjectizer-части приложения
         // посредством отсылки соответствующего сообщения на
         // управляющий mbox.
         so_5::send< start_calculation >( control_mbox );
      else if( choice == ... ) // И так далее...
   }
}

Однако, вопрос передачи информации в обратную сторону (т.е. из SObjectizer-части в не-SObjectizer-часть) пока не решен. Агенты оперируют асинхронными сообщениями, но в сторону не-SObjectizer-части отослать асинхронное сообщение они не могут. Просто некуда отсылать такое сообщение!

Собственно, давно появилась смутная идея о том, чтобы предоставить некий аналог mbox-а, куда агенты могут отсылать сообщения, а доставать отосланные сообщения из такого mbox-а будут не-агенты. И описываемое ниже понятие msg_stream как раз является результатом развития этой смутной идеи.

Итак, msg_stream -- это специальный вариант почтового ящика с собственной очередью сообщений. С одной стороны msg_stream выглядит как обычный почтовый ящик, в него можно отсылать сообщения как и в любой другой mbox.

Однако, отосланные в msg_stream сообщения не диспетчируются автоматически и не отдаются на обработку подписчикам. Вместо этого сообщения сохраняются в собственной внутренней очереди msg_stream-а и извлекаются оттуда посредством специальных функций.

Выглядеть это может следующим образом:

int main()
{
   // Запускаем написанную на агентах часть приложения
   // в отдельном потоке...
   so_5::wrapped_env_t env{ &sobjectizer_part_init };
   // Этот mbox будет использоваться для отсылки команд
   // в SObjectizer-часть приложения.
   auto control_mbox = env.environment().create_local_mbox( "control" );
   // Этот msg_stream будет использоваться для отсылки ответов
   // из SObjectizer-части приложения.
   auto result_stream = env.environment().create_msg_stream( ... /* параметры */ );

   // ...а на основном потоке производим диалог с пользователем.
   whiletrue )
   {
      std::string choice;
      std::cout << "Please enter action: ..." << std::endl;
      std::cin >> choice;
      if( choice == "exit" ) break;
      else if( choice == "start-calc" )
         // Инициируем вычисления в SObjectizer-части приложения
         // посредством отсылки соответствующего сообщения на
         // управляющий mbox.
         // И сразу сообщаем, куда нужно отсылать ответ.
         so_5::send< start_calculation >( control_mbox, result_stream );
      else if( choice == "get-calc-result" )
      {
         // Пытаемся прочитать ответ.
         so_5::receive( result_stream,
            // Обработчик, который будет обрабатывать сообщение из потока.
            so_5::handler( []( const calculation_result & r ) {
                  std::cout << "calculation result: " << r << std::endl;
               } ),
            // Обработчик для другого типа сообщения.
            so_5::handler( []( const calculation_failure & f ) {
                  std::cout << "calculation failure: " << f << std::endl;
               } ),
            // И еще один обработчик, уже для сигнала.
            so_5::handler< pending >( [] {
                  std::cout << "calculation is still pending, try later..." << std::endl;
               } ) );
      }
      else if( choice == ... ) // И так далее...
   }
}

Вот, в двух словах, вся идея вокруг msg_stream-а. Далее начинаются скучные детали :)

Скорее всего для msg_stream-а не будет отдельного типа. Метод create_msg_stream будет возвращать уже привычный mbox_t (т.е. умную ссылку на mbox). Что позволит использовать msg_stream таким же образом, как и другие типы почтовых ящиков (в том числе и для отсылки отложенных и периодических сообщений). Главное отличие -- это невозможность подписки на msg_stream. Т.е. если какой-то агент попытается подписаться на сообщения из msg_stream, то возникнет ошибка (будет брошено исключение).

Поскольку сообщения, отсылаемые в msg_stream, не диспетчируются, а укладываются в отдельную очередь сообщений, то владельцу msg_stream нужно будет периодически вызывать функцию so_5::receive, для того, чтобы извлекать и обрабатывать находящиеся в msg_stream сообщения.

Возможно, имеет смысл завести семейство функций so_5::receive:

// Самый простой вариант: извлечь и обработать всего одно сообщение.
// Если сообщения нет, уснуть на неограниченное время.
so_5::receive( msg_stream, handler_list... );

// Более сложный вариант: извлечь и обработать всего одно сообщение,
// если поток пустой, то ждать не более указанного времени.
so_5::receive_wait_for( msg_stream, timeout, handler_list... );

// Извлечь и обработать не менее N сообщений.
// Если сообщений недостаточно, ждать их поступления неограниченное время.
so_5::receive_n( msg_stream, quantity, handler_list... );

// Извлечь и обработать не менее N сообщений.
// Если сообщений недостаточно, то ждать их поступления не более указанного времени.
so_5::receive_n_wait_for( msg_stream, quantity, timeout, handler_list... );

Или же всего одну функцию, но с возможностью передачи ей дополнительных параметров:

// Самый простой вариант: извлечь и обработать всего одно сообщение.
// Если сообщения нет, уснуть на неограниченное время.
so_5::receive( msg_stream, handler_list... );

// Более сложный вариант: извлечь и обработать всего одно сообщение,
// если поток пустой, то ждать не более указанного времени.
so_5::receive( msg_stream, so_5::wait_for(timeout), handler_list... );

// Извлечь и обработать не менее N сообщений.
// Если сообщений недостаточно, ждать их поступления неограниченное время.
so_5::receive( msg_stream, so_5::at_least(quantity), handler_list... );

// Извлечь и обработать не менее N сообщений.
// Если сообщений недостаточно, то ждать их поступления не более указанного времени.
so_5::receive( msg_stream, so_5::at_least(quantity), so_5::wait_for(timeout), handler_list... );

Но самое интересное, что можно сделать с msg_stream-ами -- это определять их параметры при создании.

Так, можно указать, должна ли очередь сообщений в msg_stream быть неограниченной или ограниченной по длине. А если она ограниченная по длине, то что нужно делать, при попытке запихнуть еще одно сообщение в уже полную очередь:

  • прервать работу приложения (вызвать std::abort);
  • породить исключение;
  • проигнорировать новое сообщение;
  • вытолкнуть из очереди самое старое сообщение;
  • приостановить операцию send на msg_stream на какое-то время. Если за это время в msg_stream освободится место, то новое сообщение будет помещено в msg_stream. Если нет, то:
    • либо прервать работу приложения (вызвать std::abort);
    • либо породить исклюение;
    • либо проигнорировать новое сообщение.

Т.о. получается возможность иметь над msg_stream практически полноценный overload control, да еще и с обратной связью (т.е. отправитель сообщения может быть приостановлен на send-е, если получатель не успевает разгребать свою входящую очередь).

А это уже делает использование msg_stream-ов интересным не только для не-SObjectizer-частей приложения, но и для самих SObjectizer-овских агентов. Возникает соблазн позволить нескольким агентам распределять нагрузку друг на друга через msg_stream-ы... Однако, тут еще много белых пятен. Поэтому, полагаю, нужно пойти привычным путем: сделать самую первую и простую версию msg_stream и опробовать ее. Когда появится некоторый опыт, тогда уже и думать о том, в какую сторону двигаться дальше.

Вроде как контуры решения с msg_stream выглядят более-менее понятно. Полагаю, что сразу после релиза версии 5.5.12 с минорными обновлениями можно будет приступить к работе над 5.5.13 с msg_stream-ами.

Отправить комментарий