Вчера более-менее четко оформилась мысль, бродившая в голове уже несколько недель. Попытался ее зафиксировать под катом. На мой взгляд, msg_stream может не только упростить интеграцию частей приложения, написанных на SObjectizer и без него. Но и оказать влияние на разработку SObjectizer-овских агентов.
После добавления в SO-5.5.9 такой штуки, как wrapped_env, стало проще объединять в одном приложении части, написанные на агентах, с частями, написанными без использования SObjectizer. Грубо говоря, теперь не нужно выдумывать велосипеды для того, чтобы сделать что-то вроде:
int main() { // Запускаем написанную на агентах часть приложения // в отдельном потоке... so_5::wrapped_env_t env{ &sobjectizer_part_init }; // ...а на основном потоке производим диалог с пользователем. while( true ) { std::string choice; std::cout << "Please enter action: ..." << std::endl; std::cin >> choice; if( choice == "exit" ) break; else if( choice == ... ) // И так далее... } } |
Когда приложение разбито на SObjectizer-часть и не-SObjectizer-часть, то возникает вопрос: как эти части взаимодействуют друг с другом?
Отсылка команд в SObjectizer-часть выполняется элементарно: задействуются обычные mbox-ы (почтовые ящики) и обычные функции для отсылки сообщений:
int main() { // Запускаем написанную на агентах часть приложения // в отдельном потоке... so_5::wrapped_env_t env{ &sobjectizer_part_init }; // Этот mbox будет использоваться для отсылки команд // в SObjectizer-часть приложения. auto control_mbox = env.environment().create_local_mbox( "control" ); // ...а на основном потоке производим диалог с пользователем. while( true ) { std::string choice; std::cout << "Please enter action: ..." << std::endl; std::cin >> choice; if( choice == "exit" ) break; else if( choice == "start-calc" ) // Инициируем вычисления в SObjectizer-части приложения // посредством отсылки соответствующего сообщения на // управляющий mbox. so_5::send< start_calculation >( control_mbox ); else if( choice == ... ) // И так далее... } } |
Однако, вопрос передачи информации в обратную сторону (т.е. из SObjectizer-части в не-SObjectizer-часть) пока не решен. Агенты оперируют асинхронными сообщениями, но в сторону не-SObjectizer-части отослать асинхронное сообщение они не могут. Просто некуда отсылать такое сообщение!
Собственно, давно появилась смутная идея о том, чтобы предоставить некий аналог mbox-а, куда агенты могут отсылать сообщения, а доставать отосланные сообщения из такого mbox-а будут не-агенты. И описываемое ниже понятие msg_stream как раз является результатом развития этой смутной идеи.
Итак, msg_stream -- это специальный вариант почтового ящика с собственной очередью сообщений. С одной стороны msg_stream выглядит как обычный почтовый ящик, в него можно отсылать сообщения как и в любой другой mbox.
Однако, отосланные в msg_stream сообщения не диспетчируются автоматически и не отдаются на обработку подписчикам. Вместо этого сообщения сохраняются в собственной внутренней очереди msg_stream-а и извлекаются оттуда посредством специальных функций.
Выглядеть это может следующим образом:
int main() { // Запускаем написанную на агентах часть приложения // в отдельном потоке... so_5::wrapped_env_t env{ &sobjectizer_part_init }; // Этот mbox будет использоваться для отсылки команд // в SObjectizer-часть приложения. auto control_mbox = env.environment().create_local_mbox( "control" ); // Этот msg_stream будет использоваться для отсылки ответов // из SObjectizer-части приложения. auto result_stream = env.environment().create_msg_stream( ... /* параметры */ ); // ...а на основном потоке производим диалог с пользователем. while( true ) { std::string choice; std::cout << "Please enter action: ..." << std::endl; std::cin >> choice; if( choice == "exit" ) break; else if( choice == "start-calc" ) // Инициируем вычисления в SObjectizer-части приложения // посредством отсылки соответствующего сообщения на // управляющий mbox. // И сразу сообщаем, куда нужно отсылать ответ. so_5::send< start_calculation >( control_mbox, result_stream ); else if( choice == "get-calc-result" ) { // Пытаемся прочитать ответ. so_5::receive( result_stream, // Обработчик, который будет обрабатывать сообщение из потока. so_5::handler( []( const calculation_result & r ) { std::cout << "calculation result: " << r << std::endl; } ), // Обработчик для другого типа сообщения. so_5::handler( []( const calculation_failure & f ) { std::cout << "calculation failure: " << f << std::endl; } ), // И еще один обработчик, уже для сигнала. so_5::handler< pending >( [] { std::cout << "calculation is still pending, try later..." << std::endl; } ) ); } else if( choice == ... ) // И так далее... } } |
Вот, в двух словах, вся идея вокруг msg_stream-а. Далее начинаются скучные детали :)
Скорее всего для msg_stream-а не будет отдельного типа. Метод create_msg_stream будет возвращать уже привычный mbox_t (т.е. умную ссылку на mbox). Что позволит использовать msg_stream таким же образом, как и другие типы почтовых ящиков (в том числе и для отсылки отложенных и периодических сообщений). Главное отличие -- это невозможность подписки на msg_stream. Т.е. если какой-то агент попытается подписаться на сообщения из msg_stream, то возникнет ошибка (будет брошено исключение).
Поскольку сообщения, отсылаемые в msg_stream, не диспетчируются, а укладываются в отдельную очередь сообщений, то владельцу msg_stream нужно будет периодически вызывать функцию so_5::receive, для того, чтобы извлекать и обрабатывать находящиеся в msg_stream сообщения.
Возможно, имеет смысл завести семейство функций so_5::receive:
// Самый простой вариант: извлечь и обработать всего одно сообщение. // Если сообщения нет, уснуть на неограниченное время. so_5::receive( msg_stream, handler_list... ); // Более сложный вариант: извлечь и обработать всего одно сообщение, // если поток пустой, то ждать не более указанного времени. so_5::receive_wait_for( msg_stream, timeout, handler_list... ); // Извлечь и обработать не менее N сообщений. // Если сообщений недостаточно, ждать их поступления неограниченное время. so_5::receive_n( msg_stream, quantity, handler_list... ); // Извлечь и обработать не менее N сообщений. // Если сообщений недостаточно, то ждать их поступления не более указанного времени. so_5::receive_n_wait_for( msg_stream, quantity, timeout, handler_list... ); |
Или же всего одну функцию, но с возможностью передачи ей дополнительных параметров:
// Самый простой вариант: извлечь и обработать всего одно сообщение. // Если сообщения нет, уснуть на неограниченное время. so_5::receive( msg_stream, handler_list... ); // Более сложный вариант: извлечь и обработать всего одно сообщение, // если поток пустой, то ждать не более указанного времени. so_5::receive( msg_stream, so_5::wait_for(timeout), handler_list... ); // Извлечь и обработать не менее N сообщений. // Если сообщений недостаточно, ждать их поступления неограниченное время. so_5::receive( msg_stream, so_5::at_least(quantity), handler_list... ); // Извлечь и обработать не менее N сообщений. // Если сообщений недостаточно, то ждать их поступления не более указанного времени. so_5::receive( msg_stream, so_5::at_least(quantity), so_5::wait_for(timeout), handler_list... ); |
Но самое интересное, что можно сделать с msg_stream-ами -- это определять их параметры при создании.
Так, можно указать, должна ли очередь сообщений в msg_stream быть неограниченной или ограниченной по длине. А если она ограниченная по длине, то что нужно делать, при попытке запихнуть еще одно сообщение в уже полную очередь:
- прервать работу приложения (вызвать std::abort);
- породить исключение;
- проигнорировать новое сообщение;
- вытолкнуть из очереди самое старое сообщение;
- приостановить операцию send на msg_stream на какое-то время. Если за это время в msg_stream освободится место, то новое сообщение будет помещено в msg_stream. Если нет, то:
- либо прервать работу приложения (вызвать std::abort);
- либо породить исклюение;
- либо проигнорировать новое сообщение.
Т.о. получается возможность иметь над msg_stream практически полноценный overload control, да еще и с обратной связью (т.е. отправитель сообщения может быть приостановлен на send-е, если получатель не успевает разгребать свою входящую очередь).
А это уже делает использование msg_stream-ов интересным не только для не-SObjectizer-частей приложения, но и для самих SObjectizer-овских агентов. Возникает соблазн позволить нескольким агентам распределять нагрузку друг на друга через msg_stream-ы... Однако, тут еще много белых пятен. Поэтому, полагаю, нужно пойти привычным путем: сделать самую первую и простую версию msg_stream и опробовать ее. Когда появится некоторый опыт, тогда уже и думать о том, в какую сторону двигаться дальше.
Вроде как контуры решения с msg_stream выглядят более-менее понятно. Полагаю, что сразу после релиза версии 5.5.12 с минорными обновлениями можно будет приступить к работе над 5.5.13 с msg_stream-ами.
Комментариев нет:
Отправить комментарий