пятница, 20 марта 2015 г.

[prog.c++11] Многопоточность и Pub-Sub с помощью SObjectizer

Одной из задач, которая возлагалась на SObjectizer, была задача упрощения работы с многопоточностью. Имхо, с этой задачей SObjectizer успешно справляется. Под катом небольшой пример того, как это выглядит на практике.

В рамках поиска способов реализации низкоуровневого мониторинга в SObjectizer потребовалось провести небольшой эксперимент. Суть в том, что в приложении может быть куча "источников данных" (т.е. объектов, которые содержат какие-то интересные с точки зрения мониторинга значения вроде количества рабочих нитей и размеры очередей сообщений). Существует некая сущность-distributor, которая с заданным темпом рассылает сообщения с текущими значениями источников данных. В сообщении нужно как-то указать, к какому именно источнику данных относится текущее значение.

У каждого источника данных есть текстовое имя. Но вопрос заключался в том, как это имя передавать в сообщении?

Можно передавать только указатель на имя. Тогда сообщение будет "легким", т.к. оно содержит лишь указатель. Но доступ к имени может получиться дорогим, т.к. само имя будет лежать "далеко" от текущего значения и обработчику сообщения придется "дотягиваться" до имени. Можно передавать в сообщении полное имя. Тогда сообщение будет "тяжелым". Но зато доступ к имени окажется дешевле за счет того, что имя и значение будут лежать "ближе" друг к другу.

В общем, потребовалось написать небольшую многопоточную программу-тест, в которой создавалось бы N источников данных и несколько потребителей сообщений о значениях этих источников данных. Каждый потребитель должен работать на своей рабочей нити. Плюс distributor на отдельной нити должен периодически рассылать сообщения с текущими значениями источников данных.

Ниже приведен исходный код варианта написанного на SObjectizer (с использованием возможностей находящейся сейчас в разработке версии 5.5.4). Там комментариев написано чуть ли не столько же, сколько и кода, так что при желании можно будет разобраться. Тем не менее, пару пояснений-наблюдений можно высказать.

В принципе, можно было бы сделать так, чтобы агент-distributor сам создал все источники данных, а для того, чтобы в памяти они не располагались последовательно, он бы мог их случайным образом перемешать. Однако, это было бы не очень похоже на то, что будет происходить в реальном SObjectizer-приложении, где источниками данных будут владеть разные сущности, созданные и работающие независимо друг от друга. Поэтому в тесте есть агенты a_data_source_owner_t, которые и владеют источниками данных. Эти агенты сообщают об источниках данных агенту-distributor-у посредством сообщений. Каждое такое сообщение отсылается со случайной задержкой. Тем самым обеспечивается случайный порядок следования описаний источников данных в агенте-distributor-е. А так же лиший раз показывается, насколько просто работать с таймерами в SO-5 ;)

Исходный код агентов получился длиннее, чем я ожидал. Но изрядную часть кода составляют конструкторы и перечень атрибутов агентов. Как этот объем уменьшить пока не представляю. Ведь агент -- это самостоятельная сущность, которой нужно где-то хранить состояние между вызовами событий агента. Кроме как в собственных атрибутах негде. А раз есть атрибуты, значит и появляется их декларация, инициализация и т.д.

Если не пользоваться инструментами вроде SObjectizer, то для такого теста разработчику потребовался бы какой-то вариант thread-safe очереди сообщений. Программист бы создавал несколько экземпляров этих очередей, передавал бы ссылки на них в соответствующие рабочие нити, затем бы опрашивал очереди в цикле. Не могу сказать, какой подход был бы компактнее по объему кода, но по моему прошлому опыту, совершить ошибку при ручной работе с message-queue гораздо проще.

Несколько иная ситуация была бы, если бы разработчик использовал какой-то готовый Publish-Subscribe инструмент. Например, тот же MQTT или даже AMQP (хотя для такой задачи AMQP -- это чистой воды overkill). В этом случае код получился бы, как мне представляется, очень похожим на то, что сделано на SObjectizer-е.

И в этом нет ничего удивительного, т.к. доставка сообщений через почтовые ящики в SObjectizer -- это не что иное, как очень простая форма Publish-Subscribe. А пара из mbox-а и типа сообщения в SObjectizer является аналогом topic-а в Pub-Sub. И код вроде:

so_default_state()
  .event( m_data_mbox,
      [this]( const light_data & evt ) {
        handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge );
      } );

является, по сути, аналогом чего-то вроде:

mq.subscribe( data_topic_name,
  [this]( const message & msg ) {
    auto & evt = dynamic_castconst light_data & >(msg);
    handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge );
  } );

Так что можно сказать, что многопоточное программирование с помощью SObjectizer сильно похоже на многопоточное программирование с использованием Publish-Subscribe модели.

На что еще в данном примере можно обратить внимание?

В программе где все диспетчеры/биндеры-к-диспетчерам/агенты/сообщения являются динамически-созданными объектами, нет ни одного оператора new и delete.

В зависимости от значения константы data_holder_count в функции init() в тесте может быть создано изрядное количество агентов. Например, в данном случае двести тысяч. Не проблема. Хоть миллион.

Количество отложенных сообщений вдвое больше агентов с источниками данных. Т.е. если таких агентов двести тысяч, то отложенных сообщений будет четыреста тысяч. Опять же нет проблем, механизм таймеров в SObjectizer позволяет обрабатывать и не такое количество таймеров.


Ну а теперь, после чуть подзатянувшегося введения, сам исходный текст тестовой программы:

#include <iostream>
#include <cstdlib>
#include <cstdio>
#include <cstring>
#include <string>

#include <so_5/all.hpp>

// Ограничение на длину уникальной части имени источника данных.
const std::size_t name_length = 32;

// Суффиксы для имен источников данных.
// Полное имя формируется из уникального префикса + один из
// заранее определенных и не меняющихся суффиксов.
// Например: data_holder_1.#2.suffix.second
//
// Т.к. во время работы программы зачение суффикса изменится не может,
// то при распространении значения источника данных можно пересылать
// только уникальную часть имени и указатель на константный суффикс.
const char * const first_suffix = ".suffix.first";
const char * const second_suffix = ".suffix.second";

// "Легкое" сообщение с текущим значением источника данных.
// Вместо копии уникальной части имени отсылается указатель.
struct light_data : public so_5::rt::message_t
  {
    const char * m_name;
    const char * m_suffix;

    unsigned int m_gauge;

    light_data(
      const char * name,
      const char * suffix,
      unsigned int gauge )
      : m_name( name )
      , m_suffix( suffix )
      , m_gauge( gauge )
      {}
  };

// "Тяжелое" сообщение с текущим значением источника данных.
// Сообщение содержит копию уникальной части имени источника данных.
struct heavy_data : public so_5::rt::message_t
  {
    char m_name[ name_length ];
    const char * m_suffix;

    unsigned int m_gauge;

    heavy_data(
      const char * name,
      const char * suffix,
      unsigned int gauge )
      : m_suffix( suffix )
      , m_gauge( gauge )
      {
        std::strncpy( m_name, name, sizeof(m_name) );
      }
  };

// Сам источник данных.
struct data_source
  {
    char m_name[ name_length ];
    const char * m_suffix;

    unsigned int m_gauge;

    data_source(
      const char * name,
      const char * suffix,
      unsigned int gauge )
      : m_suffix( suffix )
      , m_gauge( gauge )
      {
        std::strncpy( m_name, name, sizeof(m_name) );
      }
  };

// Сообщение для регистрации очередного источника данных.
// Отсылается агентом data_source_owner-ом агенту distributor-у.
struct msg_register_data_source : public so_5::rt::message_t
  {
    const data_source * m_data;

    msg_register_data_source(
      const data_source * data )
      : m_data( data )
      {}
  };

// Сигнал о том, что очередной data_processor получил все сообщения
// о текущих значениях источников данных в рамках очередной итерации.
struct msg_iteration_finished : public so_5::rt::signal_t {};

/*
 * Агент, который владеет двумя источниками данных.
 *
 * Никакой полезной работы не делает. Его главная задача -- обеспечить
 * существование источников данных пока тест работает.
 *
 * Выполняет всего одно действие: при старте внутри SObjectizer отсылает
 * информацию о своих источниках данных агенту distributor-у.
 * Отсылка производится посредством отложенных сообщений со случайно
 * выбранной задержкой. Что приводит к тому, что информация об источниках
 * данных в distributor-е будет "перемешанной".
 */
class a_data_source_owner_t : public so_5::rt::agent_t
  {
  public :
    a_data_source_owner_t(
      // На этом контексте агент будет работать в SObjectizer.
      context_t ctx,
      // Почтовый ящик distributor-а, на который нужно отсылать
      // сообщения об источниках данных.
      so_5::rt::mbox_t distributor_mbox,
      // Базовая часть имен для источников данных.
      const std::string & name )
        // Контекст должен быть передан в базовый класс.
      : so_5::rt::agent_t( ctx )
        // Почтовый ящик distributor-а должен быть сохранен,
        // т.к. будет использоваться не в конструкторе, а позже,
        // после регистрации в SObjectizer.
      , m_distributor( std::move( distributor_mbox ) )
        // Инициализируются источники данных.
      , m_first_data_source( make_name( name, 1 ).c_str(), first_suffix, 1 )
      , m_second_data_source( make_name( name, 2 ).c_str(), second_suffix, 2 )
      {}

    // Реакция на начало работы в SObjectizer.
    // Данный метод вызывается автоматически для агента после того,
    // как агент будет успешно зарегистрирован и получит свой собственный
    // рабочий контекст. Это будет первый метод, который будет вызван
    // у агента на этом рабочем контексте.
    virtual void
    so_evt_start() override
      {
        // Вспомогательная локальная функция для вычисления случайной
        // задержки для отложенного сообщения.
        auto delay = [] {
            return std::chrono::milliseconds( 1 + std::rand() % 250 );
        };

        // Регистрация обоих источников данных посредством
        // отложенных сообщений со случайной задержкой.
        so_5::send_delayed< msg_register_data_source >(
            so_environment(),
            m_distributor, delay(), &m_first_data_source );

        so_5::send_delayed< msg_register_data_source >(
            so_environment(),
            m_distributor, delay(), &m_second_data_source );
      }

  private :
    /*
     * Персональные данные агента-владельца источников данных.
     */

    // На этот почтовый ящик нужно отсылать сообщения о
    // регистрации источников данных.
    const so_5::rt::mbox_t m_distributor;

    // Сами источники данных в количестве двух штук.
    const data_source m_first_data_source;
    const data_source m_second_data_source;

    // Вспомогательный метод для генерации уникальной
    // части имени источника данных.
    static std::string
    make_name( const std::string & prefix, int number )
      {
        return prefix + ".#" + std::to_string( number );
      }
  };

/*
 * Агент для обработки собщений о текущих значениях источников данных.
 *
 * Сообщения о текущих значениях поступают в рамках итераций. На каждой
 * итерации приходит по одному сообщению для каждого источника данных.
 * Как только все значения получены, итерация считается завершенной и агент
 * информирует distributor-а о завершении итерации. Что позволяет
 * distributor-у начать новую итерацию или же завершить тест.
 *
 * Агент обрабатывает как light_data-сообщения, так и heavy_data-сообщения.
 * При этом не важно, какое сообщение начало итерацию и какое сообщение
 * завершает итерацию.
 */
class a_data_processor_t : public so_5::rt::agent_t
  {
  public :
    a_data_processor_t(
      // На этом контексте агент будет работать в SObjectizer.
      context_t ctx,
      // Из этого почтового ящика будут поступать текущие значения
      // источников данных.
      so_5::rt::mbox_t data_mbox,
      // На этот почтовый ящик нужно отсылать сигнал о том, что
      // агент завершил очередную итерацию.
      so_5::rt::mbox_t distributor_mbox,
      // Количество сообщений о текущем значении источников данных,
      // которые должны быть обработаны в рамках одной итерации.
      std::size_t iteration_size )
      : so_5::rt::agent_t( ctx )
      , m_data_mbox( std::move( data_mbox ) )
      , m_distributor_mbox( std::move( distributor_mbox ) )
      , m_iteration_size( iteration_size )
      {}

    // Этот метод вызывается в процессе регистрации агента и позволяет
    // агенту настроится для дальнейшей работы.
    // В данном случае выполняется подписка на получаемые агентом
    // сообщения.
    virtual void
    so_define_agent() override
      {
        //
        // Агенту достаточно одного состояния по умолчанию, в котором
        // он будет обрабатывать все свои события.
        //
        // Поскольку реакция на сообщения является элементарной,
        // обработчики сообщений задаются лямбда-функциями прямо
        // при подписке на сообщение.
        //
        // Тип сообщения, на которое выполняется подписка, определяется
        // по сигнатуре лямбда-функций.
        //
        so_default_state()
          .event(
            // Сообщения о значении источника данных прилетают из
            // почтового ящика m_data_mbox.
            m_data_mbox,
            // Отдельная реакция на "легкое" сообщение.
            [this]( const light_data & evt ) {
              handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge );
            } )
          .event(
            m_data_mbox,
            // Отдельная реакция на "тяжелое" сообщение.
            [this]( const heavy_data & evt ) {
              handle_sample( evt.m_name, evt.m_suffix, evt.m_gauge );
            } );
      }

  private :
    /*
     * Персональные данные агента-обработчика.
     */

    // Почтовые ящики с которыми нужно работать.
    const so_5::rt::mbox_t m_data_mbox;
    const so_5::rt::mbox_t m_distributor_mbox;

    // Размер одной итерации (т.е. сколько сообщений нужно получить,
    // чтобы считать итерацию законченной).
    const std::size_t m_iteration_size;

    // Некий буфер, который будет использоваться для имитации обработки
    // значений источников данных.
    char m_last_sample[ name_length * 3 ];

    // Счетчик сообщений для очередной итерации.
    std::size_t m_samples_received = 0;

    // Имитация обработки текущего значения источника данных.
    void
    handle_sample(
      const char * name,
      const char * suffix,
      unsigned int value )
      {
        // Это просто имитация какой-то обработки значения.
        std::sprintf( m_last_sample, "%s%s%u", name, suffix, value );

        // Не пора ли завершать итерацию?
        if( m_iteration_size <= ++m_samples_received )
          {
            // Да, итерацию пора завершать. Возвращаемся в
            // исходное состояние и информируем о завершении итерации
            // агент-distributor.
            m_samples_received = 0;
            so_5::send< msg_iteration_finished >( m_distributor_mbox );
          }
      }
  };

/*
 * Агент, который координирует работу теста.
 *
 * Стартует в состоянии st_wait_data_sources и принимает сообщения о
 * регистрации источников данных. Информация об источниках данных сохраняется
 * в том же порядке, в котором поступает. Т.к. сообщения приходят со
 * случайной задержкой, то и порядок следования описаний источников данных
 * оказывается случайным.
 *
 * После того, как все источники данных будут зарегистрированы, начинается
 * серия итераций с использованием "легких" сообщений. В начале каждой
 * итерации агент отсылает сообщения о значении всех источников данных
 * в специальный почтовый ящик. После чего ждет сигналы о том, что
 * агенты-обработчики итерацию завершили.
 *
 * После того, как завершится серия итераций с "легкими" сообщения проводится
 * аналогичная серия, но с "тяжелыми" сообщениями.
 *
 * Когда завершается серия итераций с "тяжелыми" сообщениями, вся кооперация
 * дерегистрируется и работа теста завершается.
 */
class a_distributor_t : public so_5::rt::agent_t
  {
  public :
    a_distributor_t(
      // На этом контексте агент будет работать в SObjectizer.
      context_t ctx,
      // В этот почтовый ящик нужно отсылать текущие значения
      // источников данных.
      so_5::rt::mbox_t data_mbox,
      // Сколько источников данных будет задействовано в тесте.
      // Это нужно знать для того, чтобы определить, когда регистрацию
      // источников данных следует завершить и начать серию итераций.
      std::size_t data_sources_count,
      // Сколько агентов-обработчиков будет задействовано в тесте.
      // Это нужно знать для того, чтобы определить, когда очередная
      // итерация была закончена всеми обработчиками.
      std::size_t data_processor_count,
      // Сколько итераций нужно провести в каждой серии.
      std::size_t iteration_count )
      : so_5::rt::agent_t( ctx )
      , m_data_mbox( std::move( data_mbox ) )
      , m_data_sources_count( data_sources_count )
      , m_data_processor_count( data_processor_count )
      , m_iteration_count( iteration_count )
      {
        // Резервируем память сразу, чтобы не делать это при
        // последующих push_back-ах.
        m_data_sources.reserve( m_data_sources_count );
      }

    // Как и агент-обработчик, данный агент нуждается в настройке
    // перед тем, как он начнет работать внутри SObjectizer.
    virtual void
    so_define_agent() override
      {
        // Начинать работу нужно в специальном состоянии, в котором
        // агент ожидает сообщения с информацией об источниках данных.
        this >>= st_wait_data_sources;

        //
        // Все сообщения, на которые подписывается агент, приходят
        // на его собственный почтовый ящик. Поэтому при подписке
        // никаких почтовых ящиков явно не указывается.
        //

        // В начальном состоянии агент реагирует только на один тип 
        // сообщения. Тип этого сообщения выводится из сигнатуры
        // метода-обработчика.
        st_wait_data_sources.event( &a_distributor_t::evt_data_source );

        // В состоянии, в котором агент проводит серию итераций
        // с "легкими" сообщениями, обрабатываются сигналы о
        // завершении итерации. Эти сигналы обрабатываются
        // специальным методом, который знает, что речь идет
        // об итерации с "легкими" сообщениями.
        st_light_data_iterations.event< msg_iteration_finished >(
            &a_distributor_t::evt_light_data_iteration_finished );

        // Аналогично, сигнал о завершении итерации с "тяжелыми"
        // сообщении обрабатывается в отдельном состоянии. И реагирует
        // на сигнал метод, который знает, что речь идет об итерации
        // с "тяжелыми" сообщениями.
        st_heavy_data_iterations.event< msg_iteration_finished >(
            &a_distributor_t::evt_heavy_data_iteration_finished );
      }

  private :
    /*
     * Персональные данные агента-distributor-а.
     */

    // Состояния, которые будет использовать агент.
    const so_5::rt::state_t st_wait_data_sources = so_make_state();
    const so_5::rt::state_t st_light_data_iterations = so_make_state();
    const so_5::rt::state_t st_heavy_data_iterations = so_make_state();

    // Почтовый ящик, в который нужно отсылать сообщения о текущих
    // значениях источников данных.
    const so_5::rt::mbox_t m_data_mbox;

    // Ограничения, в которых работает агент-distributor.
    const std::size_t m_data_sources_count;
    const std::size_t m_data_processor_count;
    const std::size_t m_iteration_count;

    // Список источников данных.
    // Список накапливает описания по мере получения
    // сообщений msg_register_data_source.
    std::vector< const data_source * > m_data_sources;

    // Сколько агентов-обработчиков уже завершили свои итерации. 
    std::size_t m_processor_iterations_passed = 0;
    // Сколько итераций было проведено в рамках очередной серии.
    std::size_t m_iterations_passed = 0;

    // Когда была начата очередная серия итераций.
    std::chrono::high_resolution_clock::time_point m_started_at;

    // Реакция на получение описания очередного источника данных.
    void
    evt_data_source( const msg_register_data_source & evt )
      {
        m_data_sources.push_back( evt.m_data );

        // Не пора ли начинать серию итераций?
        if( m_data_sources_count == m_data_sources.size() )
          // Да, начинаем серию итераций с "легкими" сообщениями.
          start_measure< light_data >( st_light_data_iterations );
      }

    // Реакция на завершение очередным агентом-обработчиком итерации
    // с "легкими" сообщениями.
    void
    evt_light_data_iteration_finished()
      {
        handle_iteration_finish< light_data >(
            // Эта лямбда задает действия, которые нужно выполнить,
            // если вся серия итераций оказалась законченной.
            [this] {
              // Покажем результат серии.
              show_result( "light_data" );
              // Начнем серию итераций с "тяжелыми" сообщениями.
              start_measure< heavy_data >( st_heavy_data_iterations );
            } );
      }

    // Реакция на завершение очередным агентом-обработчиком итерации
    // с "тяжелыми" сообщениями.
    void
    evt_heavy_data_iteration_finished()
      {
        handle_iteration_finish< heavy_data >(
            // Эта лямбда задает действия, которые нужно выполнить,
            // если вся серия итераций оказалась законченной.
            [this] {
              // Покажем результат серии.
              show_result( "heavy_data" );
              // Завершим работу кооперации и теста.
              so_deregister_agent_coop_normally();
            } );
      }

    // Начало очередной серии.
    // Параметр шаблона указывает, какие именно сообщения
    // будут отсылаться на каждой итерации.
    // 
    // А параметр state указывает, в какое состояние агент должен
    // быть переведен перед началом серии.
    templateclass MSG >
    void
    start_measure( const so_5::rt::state_t & state )
      {
        // Настройка состояния агента для проведения серии.
        this >>= state;
        m_iterations_passed = 0;
        m_started_at = std::chrono::high_resolution_clock::now();

        // Инициируем первую итерацию серии.
        next_iteration< MSG >();
      }

    templatetypename MSG >
    void
    next_iteration()
      {
        // В начале каждой итерации нужно обнулять это значение,
        // т.к. оно модифицируется при обработке сигналов
        // msg_iteration_finished на предыдущих итерациях.
        m_processor_iterations_passed = 0;

        forauto d : m_data_sources )
          // Значение очередного источника данных идет
          // отдельным сообщением в специально предназначенный
          // для этого почтовый ящик.
          // В результате сообщение получат все агенты-обработчики.
          so_5::send< MSG >( m_data_mbox,
              d->m_name, d->m_suffix, d->m_gauge );
      }

    templatetypename MSG, typename ON_MEASURE_FINISH >
    void
    handle_iteration_finish( ON_MEASURE_FINISH reaction )
      {
        if( m_data_processor_count <= ++m_processor_iterations_passed )
          {
            if( m_iteration_count <= ++m_iterations_passed )
              reaction();
            else
              next_iteration< MSG >();
          }
      }

    void
    show_result( const char * what )
      {
        using namespace std::chrono;

        // Показываем затраченное на серию итераций время в миллисекундах,
        // но с дробной частью.
        std::cout << what << ": "
          << duration_cast< microseconds >( high_resolution_clock::now() -
                m_started_at ).count() / 1000.0 << "ms"
          << std::endl;
      }
  };

// Функция для выполнения стартовых действий внутри SObjectizer.
// Она создает всех прикладных агентов теста и регистрирует их
// в SObjectizer в виде одной кооперации.
void
init(
  // Конкретный экземпляр SObjectizer, внутри которого будет
  // выполняться работа теста.
  so_5::rt::environment_t & env )
  {
    // Ограничения для теста.
    const std::size_t data_holder_count = 200000;
    const std::size_t data_source_count = data_holder_count * 2;
    const std::size_t processor_count = 2;
    const std::size_t iterations_count = 10;

    std::cout << "*** holders: " << data_holder_count
        << ", processors: " << processor_count
        << ", iterations: " << iterations_count << std::endl;

    // Приступаем к формировании кооперации прикладных агентов.
    // Имя для кооперации назначит сам SObjectizer.
    // Т.к. никаких дополнительных параметров нет, основным рабочим
    // контекстом для кооперации будет диспетчер по умолчанию.
    auto coop = env.create_coop( so_5::autoname );

    // Это специальный почтовый ящик, в который агент-distributor
    // будет отсылать значения источников данных.
    // Из этого почтового ящика агенты-обработчики будут получать
    // отправленные distributor-ом сообщения.
    auto data_mbox = env.create_local_mbox();

    // Агент-distributor. Будет работать на диспетчере по умолчанию,
    // который является основным диспетчером для кооперации.
    // Метод make_agent возвращает указатель на созданного агента.
    // Этот указатель нам нужен в дальнейшем для доступа к персональному
    // почтовому ящику агента-distributor-а.
    auto distributor = coop->make_agent< a_distributor_t >(
        data_mbox,
        data_source_count,
        processor_count,
        iterations_count );

    // Агенты-владельцы источников данных должны вполнять свои
    // стартовые действия на отдельном пуле рабочих потоков.
    // Для чего создается частный диспетчер для этих агентов.
    // Количество рабочих потоков будет определено автоматически
    // (либо по количеству доступных ядер, либо же два потока, если
    // работа идет на одноядерной машине).
    auto data_holder_disp = so_5::disp::thread_pool::create_private_disp();

    // Создаем столько владельцев источников данных, сколько
    // нужно для теста.
    for( std::size_t i = 0; i != data_holder_count; ++i )
      coop->make_agent_with_binder< a_data_source_owner_t >(
          // Каждый агент привязывается к частному диспетчеру
          // с пулом рабочих потоков.
          // При этом нужно указать, что каждый агент использует
          // свою FIFO очередь и не зависит от других агентов.
          // Это позволит диспетчеру распределять агентов, которые
          // входят в одну кооперацию, по всем своим рабочим нитям.
          // Если бы агенты использовали общую FIFO очередь кооперации,
          // то они бы все работали последовательно и только на одной
          // рабочей нити из пула.
          data_holder_disp->binder(
            so_5::disp::thread_pool::params_t{}.fifo(
                so_5::disp::thread_pool::fifo_t::individual ) ),
          // В качестве почтового ящика, на который нужно отсылать
          // сообщения об источниках данных, используется персональный
          // почтовый ящик агента-distributor-а.
          distributor->so_direct_mbox(),
          // Базовая части уникального имени источников данных
          // для этого агента.
          std::string( "data_holder_" ) + std::to_string( i ) );

    // Создаем столько агентов-обработчиков, сколько нужно для теста.
    for( std::size_t i = 0; i != processor_count; ++i )
      coop->make_agent_with_binder< a_data_processor_t >(
          // Каждый агент привязывается к своему частному
          // диспетчеру с одной рабочей нитью.
          // Т.е. каждый агент-обработчик будет иметь в качестве
          // рабочего контекста свою собственную рабочую нить.
          so_5::disp::one_thread::create_private_disp()->binder(),
          // Почтовый ящик из которого будут приходить сообщения
          // об очередном текущем значении очередного источника данных.
          data_mbox,
          // В качестве почтового ящика, на который нужно отсылать
          // сигналы о завершении итерации, используется персональный
          // почтовый ящик агента-distributor-а.
          distributor->so_direct_mbox(),
          data_source_count );

    // Все агенты созданы. Осталось только зарегистрировать кооперацию.
    env.register_coop( std::move( coop ) );
  }

int
main()
  {
    try
      {
        //
        // Запуск SObjectizer и теста внутри него.
        // Функция launch сама создает экземпляр SObjectizer,
        // настроит его и вызовет стартовую функцию init, передав
        // в нее ссылку на этот экземпляр.
        //
        // Возврат из launch произойдет после того, как завершит
        // свою работу созданная внутри init() кооперация.
        //
        // Информирование об ошибках происходит посредством исключений.
        //
        so_5::launch( &init );

        return 0;
      }
    catchconst std::exception & x )
      {
        std::cerr << "Exception: " << x.what() << std::endl;
      }

    return 2;
  }

Комментариев нет:

Отправить комментарий