понедельник, 21 июля 2014 г.

[prog.c++] Предварительная информация о новом thread_pool-диспетчере в SObjectizer

В рамках разработки SObjectizer 5.4.0 как-то буднично произошло совсем не рядовое событие: полку штатных диспетчеров, доступных пользователю "искаропки" прибыло :) Появился thread_pool-диспетчер, который запускает привязанных к нему агентов на пуле рабочих потоков. До сих пор пользователям было доступно всего три диспетчера, появившихся в SO-4 еще в 2002-2003-м годах, а именно:

  • самый простой диспетчер с одной рабочей нитью (one_thread-диспетчер). Все привязанные к нему агенты работают на одной общей рабочей нити. Если какой-то из агентов "подвиснет", то подвиснут и все остальные агенты на этой рабочей нити;
  • диспетчер с активными объектами (active_obj-диспетчер). Каждому привязанном к диспетчеру агенту выделяется своя собственная рабочая нить. Если агент зависает, то зависает только его нить, на других агентов это не влияет;
  • диспетчер с активными группами (active_group-диспетчер). Рабочая нить выделяется группе агентов. Соответственно, если один агент из группы подвиснет, то подвиснут и остальные члены этой группы. Но другие агенты, принадлежащие другим группам (как и принадлежащие другим диспетчерам), смогут продолжать свою работу на своих рабочих контекстах.

Теперь к этому набору добавился еще один диспетчер. Он создает пул рабочих нитей и заявки привязанных к диспетчеру агентов динамически распределяются между этими нитями. Если какой-то из агентов подвиснет, то это заблокирует одну нить пула. Но работа остальных рабочих нитей будет продолжена.

Релиз SO-5.4.0, по оптимистичным оценкам, состоится не раньше второй половины августа, поэтому "в мир" новый thread_pool-диспетчер выйдет еще не скоро. Но я попробую рассказать о нем подробнее уже сейчас, дабы получить возможность обсудить сделанное и, при необходимости и появлении более удачных идей/предложений, иметь время на переработку. Для заинтересовавшихся рассказ о thread_pool-диспетчере под катом.

В SObjectizer режим работы агента отдан на откуп диспетчеру, к которому агент привязывается при регистрации. Далее диспетчер определяет на каком контексте агент будет работать, когда и как события агентов будут запускаться, будет ли меняться рабочий контекст агента (т.е. будет ли агент перемещаться с одной рабочей нити на другую или нет), будет ли агент обрабатывать свои события по одному или же несколько событий могут быть параллельно запущены на обработку на нескольких рабочих нитях. Ну и т.д.

Все четыре имеющихся на данный момент штатных диспетчера (т.е. one_thread, active_obj, active_group, thread_pool) обеспечивают агентам очень важное свойство: обработчики событий агентов гарантированно запускаются только по одному и только на контексте одной нити. Т.е. если агент работает на одном из таких диспетчеров и обменивается с другими агентами информацией только посредством сообщений, то ему не нужно беспокоится о синхронизации доступа к собственным данным: это не нужно, т.к. агент всегда работает только на одной нити.

Основная разница между thread_pool-диспетчером и тремя остальными диспетчерами заключается в том, что у агента на thread_pool-диспетчере рабочая нить может время от времени меняться. Т.е. сообщение X агент может обработать на нити t1, а следующее сообщение Y -- уже на нити t2. Т.е. в зависимости от загруженности рабочих нитей thread_pool-диспетчера агент может "мигрировать" с одной рабочей нити на другую. Эта миграция производится без ведома агента. При этом гарантируется, что пока агент не закончит обрабатывать сообщение X, его никуда не переместят и обработчик сообщения Y не будет запущен.

Диспетчеры one_thread, active_obj и active_group жестко привязывают агент к одной рабочей нити, мигрировать на какую-то другую нить на этих диспетчерах агент в принципе не может. Иногда это важно. Например, когда агент подключается и работает с какими-то внешними ресурсами (устройствами, БД и т.д.) посредством сторонних библиотек, которые требуют, чтобы подключение, работа и отключение от устройства происходили на одной и той же нити (ЕМНИП, некоторые ODBC-драйвера отличались такой особенностью). Поэтому, если для агента важна стабильность его рабочего контекста, то такой агент на thread_pool-диспетчере лучше не запускать.

На данный момент thread_pool-диспетчер реализован очень просто. При старте он создает столько рабочих нитей, сколько ему было указано пользователем (если пользователь отказался задавать конкретное значение, то создается std::thread::hardware_concurrency() нитей). Все нити создаются сразу. Если для них будет недостаточно работы, то часть из них будет "спать" в ожидании потока заявок. Т.е. thread_pool не занимается динамическим увеличением или уменьшением пула рабочих потоков.

Диспетчер использует систему из двух типов очередей. Первый тип очереди -- это agent_queue, очередь для заявок на обработку событий агентов. Каждому агенту, когда он привязывается к thread_pool-диспетчеру, назначается экземпляр agent_queue. Заявки агента сначала сохраняются в его agent_queue, а когда наступает время обработки заявок агента, они последовательно извлекаются из agent_queue и обрабатываются агентом. Очередей типа agent_queue может быть очень много, в пределе по одной очереди на каждого привязанного к диспетчеру агента.

Второй тип очередей -- это dispatcher_queue. Это очередь из agent_queue. Очередь типа dispatcher_queue всего одна на весь диспетчер. В dispatcher_queue хранятся указатели только на не пустые (активные) agent_queue.

Принцип работы у thread_pool-диспетчера получается следующий. Когда агенту отсылается сообщение, оно укладывается в agent_queue этого агента. Если оказывается, что до этого очередь агента была пустой, то указатель на ставшую активной очередь помещается в конец dispatcher_queue. После чего рано или поздно до этой очереди доберется какая-то из рабочих нитей диспетчера и приступит к обработке хранящихся в agent_queue сообщений.

Т.е. рабочие нити диспетчера висят на dispatcher_queue и ждут, пока в dispatcher_queue попадет указатель на активную agent_queue. Как только активная agent_queue появляется, её захватывает какая-то рабочая нить, а остальные рабочие нити остаются ждать, пока в dispatcher_queue не упадет еще что-то.

Когда рабочая нить завершает обработку agent_queue, она вновь обращается к dispatcher_queue. Если dispatcher_queue не пуста, то рабочая нить забирает первый элемент из dispatcher_queue и приступает к его обработке. Если же dispatcher_queue пуста, то рабочая нить засыпает до появление чего-нибудь в dispatcher_queue.

Как только что было сказано, в dispatcher_queue хранятся не экземпляры сообщений для обработки, а указатели на agent_queue. А уже в agent_queue хранятся сообщения. Причем к моменту, когда какая-то рабочая нить возьмет из dispatcher_queue указатель на очередную активную agent_queue может оказаться, что в agent_queue лежит не одно, а несколько сообщений. Что в этом случае будет делать рабочая нить?

Когда рабочая нить берет на обработку не пустую agent_queue, она последовательно обрабатывает N заявок из очереди. Значение N задается в параметрах привязки агента к диспетчеру. Т.е. для какого-то агента можно задать N=1, а для какого-то N=1000. По умолчанию сейчас N задается равным 4 (параметр max_demands_at_once).

Т.о., если рабочая нить получает agent_queue в которой N или более заявок, то N заявок будут обработаны последовательно, без перерывов и промежуточных обращений к dispatcher_queue. Если после обработки N заявок обнаруживается, что agent_queue все еще не пуста, то указатель на эту agent_queue помещается в конец dispatcher_queue. А сама рабочая нить заново обращается к dispatcher_queue за новой активной agent_queue из головы dispatcher_queue. Такой режим работы позволяет более-менее равномерно обрабатывать события всех привязанных к диспетчеру агентов. А то ведь может быть какой-то слишком загруженный агент, к котором сообщения сыпятся постоянно и, если не ограничивать N, то рабочая нить может быть полностью занята только сообщениями этого агента. Кроме того, установка N большего 1 благоприятно влияет на производительность диспетчера (подробнее на эту тему ниже).

Однако поведение, когда для агента последовательно обрабатывается N заявок, вне зависимости от того, когда они появились по отношению к заявкам других агентов, может привести к неожиданным для разработчика эффектам. Допустим, кто-то подряд отсылает сообщения X, Y и снова X. На X подписаны агенты a1 и a2, на Y -- только a2. Пользователь может ожидать, что при deliver_message(X1), deliver_message(Y1), deliver_message(X2), последовательность запуска обработчиков будет такой: a1.X1, a2.X1, a2.Y1, a1.X2, a2.X2. Однако, если параметр N больше 1, то последовательность вполне может быть вот такой: a1.X1, a2.X1, a2.Y1, a2.X2, a1.X2. Т.е. второе сообщение X сначала обработает a2, а не a1.

В принципе, ничего особенного в этом нет, вполне нормальный сценарий для обработки сообщений несколькими независимыми агентами, когда сообщения отсылает кто-то еще. Более того, если a1 и a2 выглядят для thread_pool-диспетчера независимыми, то они могут работать параллельно друг другу на разных рабочих нитях и тогда последовательность обработчиков может быть вообще чуть ли не произвольная. Например, a2.X1, a2.Y1, a1.X1, a1.X2, a2.X2.

Однако, агенты a1 и a2 вполне могут быть зависимыми друг от друга. Например, они могут разделять между собой общие данные и тогда нужно как-то решать вопрос синхронизации доступа к этим данным.

Сделать это можно следующим образом:

  • во-первых, агенты a1 и a2 должны быть членами одной кооперации агентов. Здесь ничего нового, это обычная практика при использовании SObjctizer: агенты, разделяющие общие данные, объявляются членами одной кооперации;
  • во-вторых, при привязке агентов к thread_pool-диспетчеру нужно указать, что агенты используют режим cooperation FIFO.

Когда диспетчер обнаруживает, что агент использует режим cooperation FIFO, диспетчер создает одну agent_queue на всю кооперацию и назначает эту общую agent_queue всем агентам кооперации (точнее тем агентам, которые требуют режим cooperation FIFO). Т.о. если cooperation FIFO задан и для a1, и для a2, то оба эти агента будут иметь общую agent_queue. Что означает, что события этих агентов будут выполняться только последовательно и только на одной нити. Параллельной обработки не будет, не получится такого, что a1 работает на нити t1, тогда как a2 -- на нити t2. При этом параметр N работает не для агентов, а для agent_queue (т.е. для кооперации). Это означает, что для a1 и a2 последовательность запуска обработчиков будет более предсказуемой. Т.е. могут быть вариации между a1.X1, a2.X1,... и a2.X1, a1.X1,..., но не может быть так, что a2 обработает X1, Y1 и X2 еще до того, как a1 получит X1.

По-умолчанию thread_pool-диспетчер использует режим cooperation FIFO для всех агентов. Это позволяет обезопасить агентов одной кооперации, т.е. если агенты зависимы друг от друга и разделяют общие данные, но пользователь об этом забыл, то ничего страшного не произойдет: агенты одной кооперации будут работать как единая активная группа на диспетчере active_group (хотя периодически могут менять свой контекст из-за миграции между нитями).

Так же, для отдельных агентов кооперации или же для всех агентов кооперации можно назначить режим individual FIFO. В этом случае у каждого агента будет своя agent_queue и каждый агент будет работать сам по себе. Если между агентами были какие-то зависимости по общим разделяемым данным или по последовательности реакции на внешние воздействия, но пользователь об этом забыл и задал individual FIFO, то тут SObjectizer уже ничем помочь не сможет.

Теперь можно перейти к самому грустному :) На данный момент для dispatcher_queue используется очень простая реализация на основе std::mutex и std::condition_variable. Поэтому, если к диспетчеру будет привязано много агентов с режимом individual FIFO и маленьким значением N (max_demands_at_once), то накладные расходы на работу с dispatcher_queue вырастают в разы, а производительность в разы падает. Хотя, если N большой (например, 32 и выше), и агенты довольно плотно получают сообщения, то производительность thread_pool-диспетчера вполне на уровне других штатных диспетчеров. Но тут явно есть непаханное поле для улучшений.

Начать с самой простой реализации было решено из-за того, что более ценно сейчас просто появление стабильно и корректно работающего нового диспетчера. После чего в процессе его эксплуатации будет накапливаться информация, на основании которой можно будет судить, нужен ли он вообще. И, если нужен, то насколько удобен. А если в чем-то неудобен, то как это исправить. И т.д. В общем, сначала нужно ввязаться драку, а там уже будем посмотреть :)

Возможно, в процессе подготовки SO-5.4.0 появятся более интересные и удачные способы организации работы thread_pool-диспетчера. И, в зависимости от их трудоемкости, что-то может быть воплощено в жизнь. Или же отложено до 5.4.1 и последующих версий. Поскольку работы и так много. В частности, следующая штука по плану -- это первый диспетчер для поддержки "многопоточных" агентов, т.е. агентов, которые способны параллельно обрабатывать свои события на нескольких нитях одновременно.

Отправить комментарий