Одной из задач, которая возлагалась на SObjectizer, была задача упрощения работы с многопоточностью. Имхо, с этой задачей SObjectizer успешно справляется. Под катом небольшой пример того, как это выглядит на практике.
В рамках поиска способов реализации низкоуровневого мониторинга в SObjectizer потребовалось провести небольшой эксперимент. Суть в том, что в приложении может быть куча "источников данных" (т.е. объектов, которые содержат какие-то интересные с точки зрения мониторинга значения вроде количества рабочих нитей и размеры очередей сообщений). Существует некая сущность-distributor, которая с заданным темпом рассылает сообщения с текущими значениями источников данных. В сообщении нужно как-то указать, к какому именно источнику данных относится текущее значение.
У каждого источника данных есть текстовое имя. Но вопрос заключался в том, как это имя передавать в сообщении?
Можно передавать только указатель на имя. Тогда сообщение будет "легким", т.к. оно содержит лишь указатель. Но доступ к имени может получиться дорогим, т.к. само имя будет лежать "далеко" от текущего значения и обработчику сообщения придется "дотягиваться" до имени. Можно передавать в сообщении полное имя. Тогда сообщение будет "тяжелым". Но зато доступ к имени окажется дешевле за счет того, что имя и значение будут лежать "ближе" друг к другу.
В общем, потребовалось написать небольшую многопоточную программу-тест, в которой создавалось бы N источников данных и несколько потребителей сообщений о значениях этих источников данных. Каждый потребитель должен работать на своей рабочей нити. Плюс distributor на отдельной нити должен периодически рассылать сообщения с текущими значениями источников данных.
Ниже приведен исходный код варианта написанного на SObjectizer (с использованием возможностей находящейся сейчас в разработке версии 5.5.4). Там комментариев написано чуть ли не столько же, сколько и кода, так что при желании можно будет разобраться. Тем не менее, пару пояснений-наблюдений можно высказать.
В принципе, можно было бы сделать так, чтобы агент-distributor сам создал все источники данных, а для того, чтобы в памяти они не располагались последовательно, он бы мог их случайным образом перемешать. Однако, это было бы не очень похоже на то, что будет происходить в реальном SObjectizer-приложении, где источниками данных будут владеть разные сущности, созданные и работающие независимо друг от друга. Поэтому в тесте есть агенты a_data_source_owner_t, которые и владеют источниками данных. Эти агенты сообщают об источниках данных агенту-distributor-у посредством сообщений. Каждое такое сообщение отсылается со случайной задержкой. Тем самым обеспечивается случайный порядок следования описаний источников данных в агенте-distributor-е. А так же лиший раз показывается, насколько просто работать с таймерами в SO-5 ;)
Исходный код агентов получился длиннее, чем я ожидал. Но изрядную часть кода составляют конструкторы и перечень атрибутов агентов. Как этот объем уменьшить пока не представляю. Ведь агент -- это самостоятельная сущность, которой нужно где-то хранить состояние между вызовами событий агента. Кроме как в собственных атрибутах негде. А раз есть атрибуты, значит и появляется их декларация, инициализация и т.д.
Если не пользоваться инструментами вроде SObjectizer, то для такого теста разработчику потребовался бы какой-то вариант thread-safe очереди сообщений. Программист бы создавал несколько экземпляров этих очередей, передавал бы ссылки на них в соответствующие рабочие нити, затем бы опрашивал очереди в цикле. Не могу сказать, какой подход был бы компактнее по объему кода, но по моему прошлому опыту, совершить ошибку при ручной работе с message-queue гораздо проще.
Несколько иная ситуация была бы, если бы разработчик использовал какой-то готовый Publish-Subscribe инструмент. Например, тот же MQTT или даже AMQP (хотя для такой задачи AMQP -- это чистой воды overkill). В этом случае код получился бы, как мне представляется, очень похожим на то, что сделано на SObjectizer-е.
И в этом нет ничего удивительного, т.к. доставка сообщений через почтовые ящики в SObjectizer -- это не что иное, как очень простая форма Publish-Subscribe. А пара из mbox-а и типа сообщения в SObjectizer является аналогом topic-а в Pub-Sub. И код вроде:
so_default_state() .event( m_data_mbox, [this]( const light_data & evt ) { handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge ); } ); |
является, по сути, аналогом чего-то вроде:
mq.subscribe( data_topic_name, [this]( const message & msg ) { auto & evt = dynamic_cast< const light_data & >(msg); handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge ); } ); |
Так что можно сказать, что многопоточное программирование с помощью SObjectizer сильно похоже на многопоточное программирование с использованием Publish-Subscribe модели.
На что еще в данном примере можно обратить внимание?
В программе где все диспетчеры/биндеры-к-диспетчерам/агенты/сообщения являются динамически-созданными объектами, нет ни одного оператора new и delete.
В зависимости от значения константы data_holder_count в функции init() в тесте может быть создано изрядное количество агентов. Например, в данном случае двести тысяч. Не проблема. Хоть миллион.
Количество отложенных сообщений вдвое больше агентов с источниками данных. Т.е. если таких агентов двести тысяч, то отложенных сообщений будет четыреста тысяч. Опять же нет проблем, механизм таймеров в SObjectizer позволяет обрабатывать и не такое количество таймеров.
Ну а теперь, после чуть подзатянувшегося введения, сам исходный текст тестовой программы:
#include <iostream> #include <cstdlib> #include <cstdio> #include <cstring> #include <string> #include <so_5/all.hpp> // Ограничение на длину уникальной части имени источника данных. const std::size_t name_length = 32; // Суффиксы для имен источников данных. // Полное имя формируется из уникального префикса + один из // заранее определенных и не меняющихся суффиксов. // Например: data_holder_1.#2.suffix.second // // Т.к. во время работы программы зачение суффикса изменится не может, // то при распространении значения источника данных можно пересылать // только уникальную часть имени и указатель на константный суффикс. const char * const first_suffix = ".suffix.first"; const char * const second_suffix = ".suffix.second"; // "Легкое" сообщение с текущим значением источника данных. // Вместо копии уникальной части имени отсылается указатель. struct light_data : public so_5::rt::message_t { const char * m_name; const char * m_suffix; unsigned int m_gauge; light_data( const char * name, const char * suffix, unsigned int gauge ) : m_name( name ) , m_suffix( suffix ) , m_gauge( gauge ) {} }; // "Тяжелое" сообщение с текущим значением источника данных. // Сообщение содержит копию уникальной части имени источника данных. struct heavy_data : public so_5::rt::message_t { char m_name[ name_length ]; const char * m_suffix; unsigned int m_gauge; heavy_data( const char * name, const char * suffix, unsigned int gauge ) : m_suffix( suffix ) , m_gauge( gauge ) { std::strncpy( m_name, name, sizeof(m_name) ); } }; // Сам источник данных. struct data_source { char m_name[ name_length ]; const char * m_suffix; unsigned int m_gauge; data_source( const char * name, const char * suffix, unsigned int gauge ) : m_suffix( suffix ) , m_gauge( gauge ) { std::strncpy( m_name, name, sizeof(m_name) ); } }; // Сообщение для регистрации очередного источника данных. // Отсылается агентом data_source_owner-ом агенту distributor-у. struct msg_register_data_source : public so_5::rt::message_t { const data_source * m_data; msg_register_data_source( const data_source * data ) : m_data( data ) {} }; // Сигнал о том, что очередной data_processor получил все сообщения // о текущих значениях источников данных в рамках очередной итерации. struct msg_iteration_finished : public so_5::rt::signal_t {}; /* * Агент, который владеет двумя источниками данных. * * Никакой полезной работы не делает. Его главная задача -- обеспечить * существование источников данных пока тест работает. * * Выполняет всего одно действие: при старте внутри SObjectizer отсылает * информацию о своих источниках данных агенту distributor-у. * Отсылка производится посредством отложенных сообщений со случайно * выбранной задержкой. Что приводит к тому, что информация об источниках * данных в distributor-е будет "перемешанной". */ class a_data_source_owner_t : public so_5::rt::agent_t { public : a_data_source_owner_t( // На этом контексте агент будет работать в SObjectizer. context_t ctx, // Почтовый ящик distributor-а, на который нужно отсылать // сообщения об источниках данных. so_5::rt::mbox_t distributor_mbox, // Базовая часть имен для источников данных. const std::string & name ) // Контекст должен быть передан в базовый класс. : so_5::rt::agent_t( ctx ) // Почтовый ящик distributor-а должен быть сохранен, // т.к. будет использоваться не в конструкторе, а позже, // после регистрации в SObjectizer. , m_distributor( std::move( distributor_mbox ) ) // Инициализируются источники данных. , m_first_data_source( make_name( name, 1 ).c_str(), first_suffix, 1 ) , m_second_data_source( make_name( name, 2 ).c_str(), second_suffix, 2 ) {} // Реакция на начало работы в SObjectizer. // Данный метод вызывается автоматически для агента после того, // как агент будет успешно зарегистрирован и получит свой собственный // рабочий контекст. Это будет первый метод, который будет вызван // у агента на этом рабочем контексте. virtual void so_evt_start() override { // Вспомогательная локальная функция для вычисления случайной // задержки для отложенного сообщения. auto delay = [] { return std::chrono::milliseconds( 1 + std::rand() % 250 ); }; // Регистрация обоих источников данных посредством // отложенных сообщений со случайной задержкой. so_5::send_delayed< msg_register_data_source >( so_environment(), m_distributor, delay(), &m_first_data_source ); so_5::send_delayed< msg_register_data_source >( so_environment(), m_distributor, delay(), &m_second_data_source ); } private : /* * Персональные данные агента-владельца источников данных. */ // На этот почтовый ящик нужно отсылать сообщения о // регистрации источников данных. const so_5::rt::mbox_t m_distributor; // Сами источники данных в количестве двух штук. const data_source m_first_data_source; const data_source m_second_data_source; // Вспомогательный метод для генерации уникальной // части имени источника данных. static std::string make_name( const std::string & prefix, int number ) { return prefix + ".#" + std::to_string( number ); } }; /* * Агент для обработки собщений о текущих значениях источников данных. * * Сообщения о текущих значениях поступают в рамках итераций. На каждой * итерации приходит по одному сообщению для каждого источника данных. * Как только все значения получены, итерация считается завершенной и агент * информирует distributor-а о завершении итерации. Что позволяет * distributor-у начать новую итерацию или же завершить тест. * * Агент обрабатывает как light_data-сообщения, так и heavy_data-сообщения. * При этом не важно, какое сообщение начало итерацию и какое сообщение * завершает итерацию. */ class a_data_processor_t : public so_5::rt::agent_t { public : a_data_processor_t( // На этом контексте агент будет работать в SObjectizer. context_t ctx, // Из этого почтового ящика будут поступать текущие значения // источников данных. so_5::rt::mbox_t data_mbox, // На этот почтовый ящик нужно отсылать сигнал о том, что // агент завершил очередную итерацию. so_5::rt::mbox_t distributor_mbox, // Количество сообщений о текущем значении источников данных, // которые должны быть обработаны в рамках одной итерации. std::size_t iteration_size ) : so_5::rt::agent_t( ctx ) , m_data_mbox( std::move( data_mbox ) ) , m_distributor_mbox( std::move( distributor_mbox ) ) , m_iteration_size( iteration_size ) {} // Этот метод вызывается в процессе регистрации агента и позволяет // агенту настроится для дальнейшей работы. // В данном случае выполняется подписка на получаемые агентом // сообщения. virtual void so_define_agent() override { // // Агенту достаточно одного состояния по умолчанию, в котором // он будет обрабатывать все свои события. // // Поскольку реакция на сообщения является элементарной, // обработчики сообщений задаются лямбда-функциями прямо // при подписке на сообщение. // // Тип сообщения, на которое выполняется подписка, определяется // по сигнатуре лямбда-функций. // so_default_state() .event( // Сообщения о значении источника данных прилетают из // почтового ящика m_data_mbox. m_data_mbox, // Отдельная реакция на "легкое" сообщение. [this]( const light_data & evt ) { handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge ); } ) .event( m_data_mbox, // Отдельная реакция на "тяжелое" сообщение. [this]( const heavy_data & evt ) { handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge ); } ); } private : /* * Персональные данные агента-обработчика. */ // Почтовые ящики с которыми нужно работать. const so_5::rt::mbox_t m_data_mbox; const so_5::rt::mbox_t m_distributor_mbox; // Размер одной итерации (т.е. сколько сообщений нужно получить, // чтобы считать итерацию законченной). const std::size_t m_iteration_size; // Некий буфер, который будет использоваться для имитации обработки // значений источников данных. char m_last_sample[ name_length * 3 ]; // Счетчик сообщений для очередной итерации. std::size_t m_samples_received = 0; // Имитация обработки текущего значения источника данных. void handle_sample( const char * name, const char * suffix, unsigned int value ) { // Это просто имитация какой-то обработки значения. std::sprintf( m_last_sample, "%s%s: %u", name, suffix, value ); // Не пора ли завершать итерацию? if( m_iteration_size <= ++m_samples_received ) { // Да, итерацию пора завершать. Возвращаемся в // исходное состояние и информируем о завершении итерации // агент-distributor. m_samples_received = 0; so_5::send< msg_iteration_finished >( m_distributor_mbox ); } } }; /* * Агент, который координирует работу теста. * * Стартует в состоянии st_wait_data_sources и принимает сообщения о * регистрации источников данных. Информация об источниках данных сохраняется * в том же порядке, в котором поступает. Т.к. сообщения приходят со * случайной задержкой, то и порядок следования описаний источников данных * оказывается случайным. * * После того, как все источники данных будут зарегистрированы, начинается * серия итераций с использованием "легких" сообщений. В начале каждой * итерации агент отсылает сообщения о значении всех источников данных * в специальный почтовый ящик. После чего ждет сигналы о том, что * агенты-обработчики итерацию завершили. * * После того, как завершится серия итераций с "легкими" сообщения проводится * аналогичная серия, но с "тяжелыми" сообщениями. * * Когда завершается серия итераций с "тяжелыми" сообщениями, вся кооперация * дерегистрируется и работа теста завершается. */ class a_distributor_t : public so_5::rt::agent_t { public : a_distributor_t( // На этом контексте агент будет работать в SObjectizer. context_t ctx, // В этот почтовый ящик нужно отсылать текущие значения // источников данных. so_5::rt::mbox_t data_mbox, // Сколько источников данных будет задействовано в тесте. // Это нужно знать для того, чтобы определить, когда регистрацию // источников данных следует завершить и начать серию итераций. std::size_t data_sources_count, // Сколько агентов-обработчиков будет задействовано в тесте. // Это нужно знать для того, чтобы определить, когда очередная // итерация была закончена всеми обработчиками. std::size_t data_processor_count, // Сколько итераций нужно провести в каждой серии. std::size_t iteration_count ) : so_5::rt::agent_t( ctx ) , m_data_mbox( std::move( data_mbox ) ) , m_data_sources_count( data_sources_count ) , m_data_processor_count( data_processor_count ) , m_iteration_count( iteration_count ) { // Резервируем память сразу, чтобы не делать это при // последующих push_back-ах. m_data_sources.reserve( m_data_sources_count ); } // Как и агент-обработчик, данный агент нуждается в настройке // перед тем, как он начнет работать внутри SObjectizer. virtual void so_define_agent() override { // Начинать работу нужно в специальном состоянии, в котором // агент ожидает сообщения с информацией об источниках данных. this >>= st_wait_data_sources; // // Все сообщения, на которые подписывается агент, приходят // на его собственный почтовый ящик. Поэтому при подписке // никаких почтовых ящиков явно не указывается. // // В начальном состоянии агент реагирует только на один тип // сообщения. Тип этого сообщения выводится из сигнатуры // метода-обработчика. st_wait_data_sources.event( &a_distributor_t::evt_data_source ); // В состоянии, в котором агент проводит серию итераций // с "легкими" сообщениями, обрабатываются сигналы о // завершении итерации. Эти сигналы обрабатываются // специальным методом, который знает, что речь идет // об итерации с "легкими" сообщениями. st_light_data_iterations.event< msg_iteration_finished >( &a_distributor_t::evt_light_data_iteration_finished ); // Аналогично, сигнал о завершении итерации с "тяжелыми" // сообщении обрабатывается в отдельном состоянии. И реагирует // на сигнал метод, который знает, что речь идет об итерации // с "тяжелыми" сообщениями. st_heavy_data_iterations.event< msg_iteration_finished >( &a_distributor_t::evt_heavy_data_iteration_finished ); } private : /* * Персональные данные агента-distributor-а. */ // Состояния, которые будет использовать агент. const so_5::rt::state_t st_wait_data_sources = so_make_state(); const so_5::rt::state_t st_light_data_iterations = so_make_state(); const so_5::rt::state_t st_heavy_data_iterations = so_make_state(); // Почтовый ящик, в который нужно отсылать сообщения о текущих // значениях источников данных. const so_5::rt::mbox_t m_data_mbox; // Ограничения, в которых работает агент-distributor. const std::size_t m_data_sources_count; const std::size_t m_data_processor_count; const std::size_t m_iteration_count; // Список источников данных. // Список накапливает описания по мере получения // сообщений msg_register_data_source. std::vector< const data_source * > m_data_sources; // Сколько агентов-обработчиков уже завершили свои итерации. std::size_t m_processor_iterations_passed = 0; // Сколько итераций было проведено в рамках очередной серии. std::size_t m_iterations_passed = 0; // Когда была начата очередная серия итераций. std::chrono::high_resolution_clock::time_point m_started_at; // Реакция на получение описания очередного источника данных. void evt_data_source( const msg_register_data_source & evt ) { m_data_sources.push_back( evt.m_data ); // Не пора ли начинать серию итераций? if( m_data_sources_count == m_data_sources.size() ) // Да, начинаем серию итераций с "легкими" сообщениями. start_measure< light_data >( st_light_data_iterations ); } // Реакция на завершение очередным агентом-обработчиком итерации // с "легкими" сообщениями. void evt_light_data_iteration_finished() { handle_iteration_finish< light_data >( // Эта лямбда задает действия, которые нужно выполнить, // если вся серия итераций оказалась законченной. [this] { // Покажем результат серии. show_result( "light_data" ); // Начнем серию итераций с "тяжелыми" сообщениями. start_measure< heavy_data >( st_heavy_data_iterations ); } ); } // Реакция на завершение очередным агентом-обработчиком итерации // с "тяжелыми" сообщениями. void evt_heavy_data_iteration_finished() { handle_iteration_finish< heavy_data >( // Эта лямбда задает действия, которые нужно выполнить, // если вся серия итераций оказалась законченной. [this] { // Покажем результат серии. show_result( "heavy_data" ); // Завершим работу кооперации и теста. so_deregister_agent_coop_normally(); } ); } // Начало очередной серии. // Параметр шаблона указывает, какие именно сообщения // будут отсылаться на каждой итерации. // // А параметр state указывает, в какое состояние агент должен // быть переведен перед началом серии. template< class MSG > void start_measure( const so_5::rt::state_t & state ) { // Настройка состояния агента для проведения серии. this >>= state; m_iterations_passed = 0; m_started_at = std::chrono::high_resolution_clock::now(); // Инициируем первую итерацию серии. next_iteration< MSG >(); } template< typename MSG > void next_iteration() { // В начале каждой итерации нужно обнулять это значение, // т.к. оно модифицируется при обработке сигналов // msg_iteration_finished на предыдущих итерациях. m_processor_iterations_passed = 0; for( auto d : m_data_sources ) // Значение очередного источника данных идет // отдельным сообщением в специально предназначенный // для этого почтовый ящик. // В результате сообщение получат все агенты-обработчики. so_5::send< MSG >( m_data_mbox, d->m_name, d->m_suffix, d->m_gauge ); } template< typename MSG, typename ON_MEASURE_FINISH > void handle_iteration_finish( ON_MEASURE_FINISH reaction ) { if( m_data_processor_count <= ++m_processor_iterations_passed ) { if( m_iteration_count <= ++m_iterations_passed ) reaction(); else next_iteration< MSG >(); } } void show_result( const char * what ) { using namespace std::chrono; // Показываем затраченное на серию итераций время в миллисекундах, // но с дробной частью. std::cout << what << ": " << duration_cast< microseconds >( high_resolution_clock::now() - m_started_at ).count() / 1000.0 << "ms" << std::endl; } }; // Функция для выполнения стартовых действий внутри SObjectizer. // Она создает всех прикладных агентов теста и регистрирует их // в SObjectizer в виде одной кооперации. void init( // Конкретный экземпляр SObjectizer, внутри которого будет // выполняться работа теста. so_5::rt::environment_t & env ) { // Ограничения для теста. const std::size_t data_holder_count = 200000; const std::size_t data_source_count = data_holder_count * 2; const std::size_t processor_count = 2; const std::size_t iterations_count = 10; std::cout << "*** holders: " << data_holder_count << ", processors: " << processor_count << ", iterations: " << iterations_count << std::endl; // Приступаем к формировании кооперации прикладных агентов. // Имя для кооперации назначит сам SObjectizer. // Т.к. никаких дополнительных параметров нет, основным рабочим // контекстом для кооперации будет диспетчер по умолчанию. auto coop = env.create_coop( so_5::autoname ); // Это специальный почтовый ящик, в который агент-distributor // будет отсылать значения источников данных. // Из этого почтового ящика агенты-обработчики будут получать // отправленные distributor-ом сообщения. auto data_mbox = env.create_local_mbox(); // Агент-distributor. Будет работать на диспетчере по умолчанию, // который является основным диспетчером для кооперации. // Метод make_agent возвращает указатель на созданного агента. // Этот указатель нам нужен в дальнейшем для доступа к персональному // почтовому ящику агента-distributor-а. auto distributor = coop->make_agent< a_distributor_t >( data_mbox, data_source_count, processor_count, iterations_count ); // Агенты-владельцы источников данных должны вполнять свои // стартовые действия на отдельном пуле рабочих потоков. // Для чего создается частный диспетчер для этих агентов. // Количество рабочих потоков будет определено автоматически // (либо по количеству доступных ядер, либо же два потока, если // работа идет на одноядерной машине). auto data_holder_disp = so_5::disp::thread_pool::create_private_disp(); // Создаем столько владельцев источников данных, сколько // нужно для теста. for( std::size_t i = 0; i != data_holder_count; ++i ) coop->make_agent_with_binder< a_data_source_owner_t >( // Каждый агент привязывается к частному диспетчеру // с пулом рабочих потоков. // При этом нужно указать, что каждый агент использует // свою FIFO очередь и не зависит от других агентов. // Это позволит диспетчеру распределять агентов, которые // входят в одну кооперацию, по всем своим рабочим нитям. // Если бы агенты использовали общую FIFO очередь кооперации, // то они бы все работали последовательно и только на одной // рабочей нити из пула. data_holder_disp->binder( so_5::disp::thread_pool::params_t{}.fifo( so_5::disp::thread_pool::fifo_t::individual ) ), // В качестве почтового ящика, на который нужно отсылать // сообщения об источниках данных, используется персональный // почтовый ящик агента-distributor-а. distributor->so_direct_mbox(), // Базовая части уникального имени источников данных // для этого агента. std::string( "data_holder_" ) + std::to_string( i ) ); // Создаем столько агентов-обработчиков, сколько нужно для теста. for( std::size_t i = 0; i != processor_count; ++i ) coop->make_agent_with_binder< a_data_processor_t >( // Каждый агент привязывается к своему частному // диспетчеру с одной рабочей нитью. // Т.е. каждый агент-обработчик будет иметь в качестве // рабочего контекста свою собственную рабочую нить. so_5::disp::one_thread::create_private_disp()->binder(), // Почтовый ящик из которого будут приходить сообщения // об очередном текущем значении очередного источника данных. data_mbox, // В качестве почтового ящика, на который нужно отсылать // сигналы о завершении итерации, используется персональный // почтовый ящик агента-distributor-а. distributor->so_direct_mbox(), data_source_count ); // Все агенты созданы. Осталось только зарегистрировать кооперацию. env.register_coop( std::move( coop ) ); } int main() { try { // // Запуск SObjectizer и теста внутри него. // Функция launch сама создает экземпляр SObjectizer, // настроит его и вызовет стартовую функцию init, передав // в нее ссылку на этот экземпляр. // // Возврат из launch произойдет после того, как завершит // свою работу созданная внутри init() кооперация. // // Информирование об ошибках происходит посредством исключений. // so_5::launch( &init ); return 0; } catch( const std::exception & x ) { std::cerr << "Exception: " << x.what() << std::endl; } return 2; } |
Комментариев нет:
Отправить комментарий