среда, 2 декабря 2015 г.

[prog.c++11] В SObjectizer задышал механизм mchain-ов (aka msg_stream)

Фактически, дополнение к посту двухнедельной давности, в котором идея механизма взаимодействия между SObjectizer-овскими и не-SObjectizer-овскими частями приложения описывалась под названием msg_stream. За это время механизм сменил несколько названий, называется сейчас mchain, вполне себе работает и, думаю, уже близок к тому, чтобы считать его стабильным и вполне готовым к релизу. Под катом немного информации о том, что получилось.

Так же небольшая просьба к читателям: пара идентификаторов, задействованных в механизме mchain-ов, мне кажется не очень удачной. Но это лучшее, что пока пришло в голову. Если кто-то сможет предложить более благозвучные и понятные варианты, то это будет серьезная помощь проекту.

Итак, mchain -- это такая себе отдельная FIFO-очередь для сообщений, которая не привязана ни к какому из диспетчеров. Т.е. ответственность за выборку и обработку сообщений из mchain-а лежит на программисте. Нужен mchain для того, чтобы один его конец засунуть в SObjectizer-овскую часть приложения дабы SObjectizer-овские агенты могли отсылать в него сообщения как в обычный mbox. А другой конец mchain-а остается снаружи, в не-SObjectizer-овской части приложения и пользователь сам выбирает моменты, когда ему удобно прочитать и обработать содержимое mchain-а.

Сами же mchain-ы делятся на два основных типа -- без ограничений на длину FIFO-очереди и с ограничениями. Если mchain не имеет ограничений на размер, то в него можно запихать столько сообщений, сколько позволяет здравый смысл или объем доступной оперативной памяти. Соответственно, тут сложно говорить о каком-то overload control.

А вот mchain-ы с ограничениями на размер запрещают добавлять в mchain новые сообщения, если mchain уже полностью заполнен. При попытке добавить еще одно сообщение можно притормозить отправителя на некоторое время. Если за это время место в mchain-е не освободилось, то предпринимается одно из указанных при создании mchain-а действий:

  • новое сообщение игнорируется;
  • из mchain-а выбрасывается самое старое сообщение;
  • порождается исключение;
  • работа приложения прерывается посредством вызова std::abort().

В общем, базовый набор инструментов для примитивного, но действенного overload control. Так же нужно отметить, что тайм-аут при попытке вставить новое сообщение в полный mchain не обязателен. Если этот тайм-аут не задан, то одно из четырех вышеописанных действий выполняется сразу же, без каких-либо пауз или задержек.

Так же при создании mchain-а с ограничением на размер можно задать, каким образом будет выделяться память под очередь заявок: динамически по мере роста очереди или же вся очередь создается сразу в виде одного буфера фиксированного размера. Т.е. в первом случае мы экономим память ценой быстродействия, во втором, напротив, увеличиваем быстродействие, но за счет расхода памяти.

Как раз тут всплывает первый из не самых удачных идентификаторов. Способ работы с памятью для mchain-а фиксированного размера задается перечислением storage_memory (со значениями dynamic и preallocated). Выглядит это, приблизительно, вот так:

namespace props = so_5::mchain_props;
auto ch = env.create_mchain(
      so_5::make_limited_without_waiting_mchain_params(
            3,
            props::storage_memory::dynamic,
            props::overflow_reaction::remove_oldest ) );

ИМХО, название storage_memory не точно отражает смысл данного параметра. А вот с другими вариантами что-то туго. Есть мысль на счет memory_consumption, но не уверен.

Выборка и обработка сообщений из mchain-а осуществляется посредством семейства функций receive.

Эти функции извлекают сообщение из mchain-а, ищут для него обработчик в списке предоставленных пользователем и, если обработчик найден, вызывают его. Просто так извлечь экземпляр сообщения из очереди для того, чтобы сохранить его где-то у себя и попробовать что-то сделать с экземпляром по-своему, нельзя. Дело в том, что в mchain агенты могут отсылать не только обычные асинхронные сообщения, но и синхронные запросы (которые в терминологии SO-5 называются service_request-ы). Так вот, обработка service_request-а должна осуществляется по специальным правилам. Внутри функций receive эти правила должным образом соблюдаются. А вот если обработку заявки из mchain-а отдать на волю пользователя, то возможны варианты. Поэтому пока вся обработка происходит внутри вызова receive.

На данный момент есть два варианта receive:

// #1
receive(mchain, timeout, handlers...);
// #2
receive(receive_params, handler...);

Первый вариант, самый простой, берет всего одно сообщение из mchain и пытается его обработать. Если mchain пуст, то ждет в течении timeout-а.

Второй вариант, более сложный, использует список ограничений и обрабатывает все сообщения из mchain до тех пор, пока не сработает какое-то из ограничений. Например:

// Обработать 3 сообщения.
// Если в mchain 3-х сообщений нет, то ждать пока они не появятся.
// Выход из mchain произойдет либо после обработки 3-х сообщений,
// либо если mchain будет закрыт явным образом.
receive( from(chain).handle_n( 3 ),
      handlers... );

// Обработать 3 сообщения, но ждать поступления сообщений не более 200ms.
// Т.е. если mchain оказался пустым и в него ничего не поступило
// в течении 200ms, то произойдет возврат из receive даже если еще
// не было обработано 3 сообщения.
receive( from(chain).handle_n( 3 ).empty_timeout( milliseconds(200) ),
      handlers... );

// Обработать все сообщения, которые поступают в mchain.
// Прервать обработку, если пауза между поступающими сообщениями
// превысит 500ms.
receive( from(chain).empty_timeout( milliseconds(500) ),
      handlers... );

// Обрабатывать все сообщения в течении 2s.
// Выход из receive произойдет либо по истечении 2s (не важно, сколько
// именно сообщений будет обработано за это время), либо если mchain
// будет закрыт явным образом.
receive( from(chain).total_time( seconds(2) ),
      handlers... );

// То же, что и в предыдущем случае, но выход так же происходит после
// извлечения из очереди 1000 заявок.
receive( from(chain).extract_n( 1000 ).total_time( seconds(2) ),
      handlers... );

Следует отметить, что количество извлеченных из mchain заявок и количество обработанных сообщений не обязательно должны совпадать. Сначала из mchain извлекается заявка, потом для нее в списке обработчиков ищется обработчик. Если обработчик найден, то заявка обрабатывается (т.е. обрабатывается сообщение или service_request, для которого была создана заявка). Если же обработчик не найден (например, пользователю не интересны сообщения какого-то типа в данный момент), то заявка выбрасывается. При этом счетчик извлеченных заявок внутри receive инкрементируется для каждой заявки, а вот счетчик обработанных -- только для тех заявок, для которых обработчики были найдены.

Механизма selective receive, как в Erlang-е, нет. Т.е. если для заявки не был найден обработчик, то заявка выбрасывается, обратно в очередь, дабы быть обработанной другим receive, она уже не возвращается.

Функции receive возвращают значение типа mchain_receive_result, из которого можно получить количество извлеченных и обработанных заявок, а так же статус mchain-а (был ли он пуст или же вообще закрыт).

Список обработчиков для receive задается в виде списка лямбда-функций:

so_5::send< a_supervisor::ask_status >( req_mbox );
auto r = receive(
      from( chain ).empty_timeout( milliseconds(200) ),
      []( a_supervisor::status_idle ) {
         cout << "status: IDLE" << endl;
      },
      []( a_supervisor::status_in_progress ) {
         cout << "status: IN PROGRESS" << endl;
      },
      []( a_supervisor::status_finished v ) {
         cout << "status: finished in " << v.m_ms << "ms" << endl;
      } );
if( !r.handled() )
   cout << "--- no response from supervisor ---" << endl;

Типы аргументов для лямбда-функций-обработчиков как раз и определяют, какие типы сообщений будут обрабатываться. Т.е. если внутри receive будет звлечено сообщение, отличное от a_supervisor::status_idle, a_supervisor::status_in_progress и a_supervisor::status_finished, то оно будет выброшено без обработки.

Как раз со вторым, более продвинутым вариантом receive связан еще один не самый удачный идентификатор: empty_timeout. Функция с таким именем сейчас задает время ожидания на пустом mchain. Мне кажется, что это не сильно понятно. Но на что заменить? Может на no_msg_timeout?

Ну и еще несколько слов про другие свойства mchain, полезные и не очень :)

Для программиста mchain-ы могут выглядеть практически полностью как mbox-ы: с ними работают send-ы, send_delayed-ы, send_periodic-и, request_future и request_value. У mchain-а есть метод as_mbox(), который возвращает самый обычный mbox, что позволяет скрыть от агентов вообще сам факт того, что агенту подсовывается mchain, а не mbox. Единственная вещь, которую не позволяют делать mchain-ы -- это создавать подписки. Т.е. агент, который получил mchain в качестве параметра, не сможет на него подписаться.

На данный момент mchain позиционируется как Multi-Producer/Single-Consumer механизм. Хотя, в принципе, на receive из одного mchain-а могут повиснуть сразу несколько нитей и никаких катастрофических последствий не будет. Разве что заснув на пустом mchain-е нити не будут должны образом просыпаться по мере наполнения mchain-а, в этом случае возможны некоторые фокусы. В принципе, mchain изначально позиционировался как MPSC-контейнер. Но, если практика использования покажет, что mchain должен быть MPMC-контейнером, то это можно будет сделать. Так что тут будущее полностью зависит от пользователей и от реальных сценариев использования.

К mchain-у можно прицепить свой собственный нотификатор, который будет вызван SObjectizer-ом когда в пустой mchain падает заявка и mchain перестает быть пустым. Эта фича была добавлена с прицелом на упрощение интеграции SObjectizer-а в GUI-приложения. Какой-то GUI-вый Widget может хотеть общаться с SObjectizer-овским агентом, но как это сделать на практике? Widget может отправлять агенту сообщения на mbox агента. А агент будет отвечать сообщениями в mchain Widget-а. А Widget должен как-то узнавать, что mchain не пуст и можно вызвать receive. Для этого с mchain-ом можно связать свой нотификатор, который пихнет Widget-у обычное GUI-овое сообщение, в обработчике этого сообщения Widget и сделает вызов receive для mchain-а. ИМХО, такой способ интеграции GUI-вых и SObjectizer-овских частей приложения намного проще и универсальнее, чем разработка специальных SObjectizer-овских диспетчеров под конкретный GUI-тулкит и наследование Widget-а сразу от двух классов (скажем, от QWidget и agent_t). Так что здесь открываются интересные перспективы, которые нужно поисследовать.

Хоть mchain-ы и привязываются к SObjectizer Environment-у, но, вообще говоря, mchain-ы вполне могут использоваться в качестве эдаких thread-safe message queues для организации взаимодействия между нитями в многопоточных приложений. Т.е. если кто-то хочет заставить свои нити обмениваться асинхронными сообщениями или же синхронными service_request-ами, но не хочет писать SObjectizer-овских агентов, то из SObjectizer-а можно будет взять только mchain-ы и реализовать весь обмен на mchain-ах, не занимаясь написанием очередей сообщений самостоятельно. Например, вот такой тривиальнейший многопоточный ping-pong ;)

void do_check_handle_n( const so_5::mchain & ch1, const so_5::mchain & ch2 )
{
   std::thread child{ [&] {
      receive( from( ch1 ).handle_n( 3 ), [&ch2]( int i ) {
               so_5::send< int >( ch2, i );
         } );
   } };

   so_5::send< int >( ch1, 0 );
   auto r = receive( from( ch2 ).handle_n( 2 ), [&ch1]( int i ) {
         so_5::send< int >( ch1, i + 1 );
      } );

   child.join();
}

Вот, как бы, и все основные моменты. Реализация mchain-ов, в принципе, готова. Осталось местами подшлифовать напильником примеры и документацию. Ну и если возникнут еще какие-нибудь мысли/идеи или же будут получены предложения/замечания, то есть время/возможность все это воплотить в жизнь. В граните пока еще ничего не отлито :)

После фиксации функциональности mchain-ов еще будет большой кусок работы по подготовке к переходу на более удобную для большинства C++ников нотацию (например, выбрасывание суффикса _t из имен типов). Новые имена типов будут сделаны уже сейчас, а старые останутся в качестве typedef-ов для совместимости. А уже в версии 5.6.0 останутся только новые имена.

В связи с этим всем релиз версии 5.5.13 можно ожидать к середине или к концу следующей недели.

Отправить комментарий