Фактически, дополнение к посту двухнедельной давности, в котором идея механизма взаимодействия между SObjectizer-овскими и не-SObjectizer-овскими частями приложения описывалась под названием msg_stream. За это время механизм сменил несколько названий, называется сейчас mchain, вполне себе работает и, думаю, уже близок к тому, чтобы считать его стабильным и вполне готовым к релизу. Под катом немного информации о том, что получилось.
Так же небольшая просьба к читателям: пара идентификаторов, задействованных в механизме mchain-ов, мне кажется не очень удачной. Но это лучшее, что пока пришло в голову. Если кто-то сможет предложить более благозвучные и понятные варианты, то это будет серьезная помощь проекту.
Итак, mchain -- это такая себе отдельная FIFO-очередь для сообщений, которая не привязана ни к какому из диспетчеров. Т.е. ответственность за выборку и обработку сообщений из mchain-а лежит на программисте. Нужен mchain для того, чтобы один его конец засунуть в SObjectizer-овскую часть приложения дабы SObjectizer-овские агенты могли отсылать в него сообщения как в обычный mbox. А другой конец mchain-а остается снаружи, в не-SObjectizer-овской части приложения и пользователь сам выбирает моменты, когда ему удобно прочитать и обработать содержимое mchain-а.
Сами же mchain-ы делятся на два основных типа -- без ограничений на длину FIFO-очереди и с ограничениями. Если mchain не имеет ограничений на размер, то в него можно запихать столько сообщений, сколько позволяет здравый смысл или объем доступной оперативной памяти. Соответственно, тут сложно говорить о каком-то overload control.
А вот mchain-ы с ограничениями на размер запрещают добавлять в mchain новые сообщения, если mchain уже полностью заполнен. При попытке добавить еще одно сообщение можно притормозить отправителя на некоторое время. Если за это время место в mchain-е не освободилось, то предпринимается одно из указанных при создании mchain-а действий:
- новое сообщение игнорируется;
- из mchain-а выбрасывается самое старое сообщение;
- порождается исключение;
- работа приложения прерывается посредством вызова std::abort().
В общем, базовый набор инструментов для примитивного, но действенного overload control. Так же нужно отметить, что тайм-аут при попытке вставить новое сообщение в полный mchain не обязателен. Если этот тайм-аут не задан, то одно из четырех вышеописанных действий выполняется сразу же, без каких-либо пауз или задержек.
Так же при создании mchain-а с ограничением на размер можно задать, каким образом будет выделяться память под очередь заявок: динамически по мере роста очереди или же вся очередь создается сразу в виде одного буфера фиксированного размера. Т.е. в первом случае мы экономим память ценой быстродействия, во втором, напротив, увеличиваем быстродействие, но за счет расхода памяти.
Как раз тут всплывает первый из не самых удачных идентификаторов. Способ работы с памятью для mchain-а фиксированного размера задается перечислением storage_memory (со значениями dynamic и preallocated). Выглядит это, приблизительно, вот так:
namespace props = so_5::mchain_props;
auto ch = env.create_mchain(
so_5::make_limited_without_waiting_mchain_params(
3,
props::storage_memory::dynamic,
props::overflow_reaction::remove_oldest ) );ИМХО, название storage_memory не точно отражает смысл данного параметра. А вот с другими вариантами что-то туго. Есть мысль на счет memory_consumption, но не уверен.
Выборка и обработка сообщений из mchain-а осуществляется посредством семейства функций receive.
Эти функции извлекают сообщение из mchain-а, ищут для него обработчик в списке предоставленных пользователем и, если обработчик найден, вызывают его. Просто так извлечь экземпляр сообщения из очереди для того, чтобы сохранить его где-то у себя и попробовать что-то сделать с экземпляром по-своему, нельзя. Дело в том, что в mchain агенты могут отсылать не только обычные асинхронные сообщения, но и синхронные запросы (которые в терминологии SO-5 называются service_request-ы). Так вот, обработка service_request-а должна осуществляется по специальным правилам. Внутри функций receive эти правила должным образом соблюдаются. А вот если обработку заявки из mchain-а отдать на волю пользователя, то возможны варианты. Поэтому пока вся обработка происходит внутри вызова receive.
На данный момент есть два варианта receive:
// #1 receive(mchain, timeout, handlers...); // #2 receive(receive_params, handler...); |
Первый вариант, самый простой, берет всего одно сообщение из mchain и пытается его обработать. Если mchain пуст, то ждет в течении timeout-а.
Второй вариант, более сложный, использует список ограничений и обрабатывает все сообщения из mchain до тех пор, пока не сработает какое-то из ограничений. Например:
// Обработать 3 сообщения. // Если в mchain 3-х сообщений нет, то ждать пока они не появятся. // Выход из mchain произойдет либо после обработки 3-х сообщений, // либо если mchain будет закрыт явным образом. receive( from(chain).handle_n( 3 ), handlers... ); // Обработать 3 сообщения, но ждать поступления сообщений не более 200ms. // Т.е. если mchain оказался пустым и в него ничего не поступило // в течении 200ms, то произойдет возврат из receive даже если еще // не было обработано 3 сообщения. receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ), handlers... ); // Обработать все сообщения, которые поступают в mchain. // Прервать обработку, если пауза между поступающими сообщениями // превысит 500ms. receive( from(chain).empty_timeout( milliseconds(500) ), handlers... ); // Обрабатывать все сообщения в течении 2s. // Выход из receive произойдет либо по истечении 2s (не важно, сколько // именно сообщений будет обработано за это время), либо если mchain // будет закрыт явным образом. receive( from(chain).total_time( seconds(2) ), handlers... ); // То же, что и в предыдущем случае, но выход так же происходит после // извлечения из очереди 1000 заявок. receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ), handlers... ); |
Следует отметить, что количество извлеченных из mchain заявок и количество обработанных сообщений не обязательно должны совпадать. Сначала из mchain извлекается заявка, потом для нее в списке обработчиков ищется обработчик. Если обработчик найден, то заявка обрабатывается (т.е. обрабатывается сообщение или service_request, для которого была создана заявка). Если же обработчик не найден (например, пользователю не интересны сообщения какого-то типа в данный момент), то заявка выбрасывается. При этом счетчик извлеченных заявок внутри receive инкрементируется для каждой заявки, а вот счетчик обработанных -- только для тех заявок, для которых обработчики были найдены.
Механизма selective receive, как в Erlang-е, нет. Т.е. если для заявки не был найден обработчик, то заявка выбрасывается, обратно в очередь, дабы быть обработанной другим receive, она уже не возвращается.
Функции receive возвращают значение типа mchain_receive_result, из которого можно получить количество извлеченных и обработанных заявок, а так же статус mchain-а (был ли он пуст или же вообще закрыт).
Список обработчиков для receive задается в виде списка лямбда-функций:
so_5::send< a_supervisor::ask_status >( req_mbox ); auto r = receive( from( chain ).empty_timeout( milliseconds(200) ), []( a_supervisor::status_idle ) { cout << "status: IDLE" << endl; }, []( a_supervisor::status_in_progress ) { cout << "status: IN PROGRESS" << endl; }, []( a_supervisor::status_finished v ) { cout << "status: finished in " << v.m_ms << "ms" << endl; } ); if( !r.handled() ) cout << "--- no response from supervisor ---" << endl; |
Типы аргументов для лямбда-функций-обработчиков как раз и определяют, какие типы сообщений будут обрабатываться. Т.е. если внутри receive будет звлечено сообщение, отличное от a_supervisor::status_idle, a_supervisor::status_in_progress и a_supervisor::status_finished, то оно будет выброшено без обработки.
Как раз со вторым, более продвинутым вариантом receive связан еще один не самый удачный идентификатор: empty_timeout. Функция с таким именем сейчас задает время ожидания на пустом mchain. Мне кажется, что это не сильно понятно. Но на что заменить? Может на no_msg_timeout?
Ну и еще несколько слов про другие свойства mchain, полезные и не очень :)
Для программиста mchain-ы могут выглядеть практически полностью как mbox-ы: с ними работают send-ы, send_delayed-ы, send_periodic-и, request_future и request_value. У mchain-а есть метод as_mbox(), который возвращает самый обычный mbox, что позволяет скрыть от агентов вообще сам факт того, что агенту подсовывается mchain, а не mbox. Единственная вещь, которую не позволяют делать mchain-ы -- это создавать подписки. Т.е. агент, который получил mchain в качестве параметра, не сможет на него подписаться.
На данный момент mchain позиционируется как Multi-Producer/Single-Consumer механизм. Хотя, в принципе, на receive из одного mchain-а могут повиснуть сразу несколько нитей и никаких катастрофических последствий не будет. Разве что заснув на пустом mchain-е нити не будут должны образом просыпаться по мере наполнения mchain-а, в этом случае возможны некоторые фокусы. В принципе, mchain изначально позиционировался как MPSC-контейнер. Но, если практика использования покажет, что mchain должен быть MPMC-контейнером, то это можно будет сделать. Так что тут будущее полностью зависит от пользователей и от реальных сценариев использования.
К mchain-у можно прицепить свой собственный нотификатор, который будет вызван SObjectizer-ом когда в пустой mchain падает заявка и mchain перестает быть пустым. Эта фича была добавлена с прицелом на упрощение интеграции SObjectizer-а в GUI-приложения. Какой-то GUI-вый Widget может хотеть общаться с SObjectizer-овским агентом, но как это сделать на практике? Widget может отправлять агенту сообщения на mbox агента. А агент будет отвечать сообщениями в mchain Widget-а. А Widget должен как-то узнавать, что mchain не пуст и можно вызвать receive. Для этого с mchain-ом можно связать свой нотификатор, который пихнет Widget-у обычное GUI-овое сообщение, в обработчике этого сообщения Widget и сделает вызов receive для mchain-а. ИМХО, такой способ интеграции GUI-вых и SObjectizer-овских частей приложения намного проще и универсальнее, чем разработка специальных SObjectizer-овских диспетчеров под конкретный GUI-тулкит и наследование Widget-а сразу от двух классов (скажем, от QWidget и agent_t). Так что здесь открываются интересные перспективы, которые нужно поисследовать.
Хоть mchain-ы и привязываются к SObjectizer Environment-у, но, вообще говоря, mchain-ы вполне могут использоваться в качестве эдаких thread-safe message queues для организации взаимодействия между нитями в многопоточных приложений. Т.е. если кто-то хочет заставить свои нити обмениваться асинхронными сообщениями или же синхронными service_request-ами, но не хочет писать SObjectizer-овских агентов, то из SObjectizer-а можно будет взять только mchain-ы и реализовать весь обмен на mchain-ах, не занимаясь написанием очередей сообщений самостоятельно. Например, вот такой тривиальнейший многопоточный ping-pong ;)
void do_check_handle_n( const so_5::mchain & ch1, const so_5::mchain & ch2 ) { std::thread child{ [&] { receive( from( ch1 ).handle_n( 3 ), [&ch2]( int i ) { so_5::send< int >( ch2, i ); } ); } }; so_5::send< int >( ch1, 0 ); auto r = receive( from( ch2 ).handle_n( 2 ), [&ch1]( int i ) { so_5::send< int >( ch1, i + 1 ); } ); child.join(); } |
Вот, как бы, и все основные моменты. Реализация mchain-ов, в принципе, готова. Осталось местами подшлифовать напильником примеры и документацию. Ну и если возникнут еще какие-нибудь мысли/идеи или же будут получены предложения/замечания, то есть время/возможность все это воплотить в жизнь. В граните пока еще ничего не отлито :)
После фиксации функциональности mchain-ов еще будет большой кусок работы по подготовке к переходу на более удобную для большинства C++ников нотацию (например, выбрасывание суффикса _t из имен типов). Новые имена типов будут сделаны уже сейчас, а старые останутся в качестве typedef-ов для совместимости. А уже в версии 5.6.0 останутся только новые имена.
В связи с этим всем релиз версии 5.5.13 можно ожидать к середине или к концу следующей недели.
Комментариев нет:
Отправить комментарий