суббота, 3 сентября 2016 г.

[prog.c++] Нововведения в SObjectizer-5.5.18: сообщения distribution_started и distribution_finished

Возможность получения мониторинговой информации о том, что происходит внутри SObjectizer Environment была добавлена довольно давно, еще в версии 5.5.4. Работает этот механизм на потактовой основе: пользователь может задать темп, с которым он хочет получать мониторинговую информацию. Например, раз в 250ms. И SObjectizer каждые 250ms будет отсылать на специальный mbox целую пачку сообщений с текущими значениями.

Со временем у этого механизма выявился небольшой недостаток: для ряда сценариев обработки мониторинговой информации было очень желательно знать, что очередной такт раздачи данных начался или закончился. В версии 5.5.18 этот недостаток был устранен. В пространство имен so_5::stats::messages добавлены новые сообщения distribution_started и distribution_finished. Сообщение distribution_started отсылается в начале каждого такта раздачи очередной порции мониторинговой информации. Фактически, это сообщение первое, которое отсылается stats_controller-ом в начале такта. Затем уже следуют сообщения от всех источников данных внутри SObjectizer Environment. А закрывает такт сообщение distribution_finished. Это последнее сообщение, после которого уже не будет никаких сообщений, относящихся к только что закончившемуся такту.

Два эти сообщения упрощают обработку сценариев, для которых требуется иметь информацию о моментах начала/завершения тактов. Например, для обновления информации на графиках или для фиксации транзакций с очередной порцией свежих данных в БД.

Под катом пример агента, который инициирует раздачу мониторинговой информации три раза в секунду, после чего обрабатывает только сообщения с данными о длине очередей заявок на рабочих нитях запущенных в SObjectizer Environment диспетчеров (для фильтрации нужных сообщений используется механизм delivery filters).

// Agent for receiving run-time monitoring information.
class a_stats_listener_t : public so_5::agent_t
{
public :
   a_stats_listener_t(
      // Environment to work in.
      context_t ctx,
      // Address of logger.
      so_5::mbox_t logger )
      :  so_5::agent_t( ctx )
      ,  m_logger( std::move( logger ) )
   {}

   virtual void so_define_agent() override
   {
      using namespace so_5::stats;

      auto & controller = so_environment().stats_controller();

      // Set up a filter for messages with run-time monitoring information.
      so_set_delivery_filter(
         // Message box to which delivery filter must be set.
         controller.mbox(),
         // Delivery predicate.
         []( const messages::quantity< std::size_t > & msg ) {
            // Process only messages related to dispatcher's queue sizes.
            return suffixes::work_thread_queue_size() == msg.m_suffix;
         } );

      // We must receive messages from run-time monitor.
      so_default_state()
         .event(
            // This is mbox to that run-time statistic will be sent.
            controller.mbox(),
            &a_stats_listener_t::evt_quantity )
         .event( controller.mbox(),
            [this]( const messages::distribution_started & ) {
               so_5::send< log_message >( m_logger, "--- DISTRIBUTION STARTED ---" );
            } )
         .event( controller.mbox(),
            [this]( const messages::distribution_finished & ) {
               so_5::send< log_message >( m_logger, "--- DISTRIBUTION FINISHED ---" );
            } );
   }

   virtual void so_evt_start() override
   {
      // Change the speed of run-time monitor updates.
      so_environment().stats_controller().set_distribution_period(
            std::chrono::milliseconds( 330 ) );
      // Turn the run-timer monitoring on.
      so_environment().stats_controller().turn_on();
   }

private :
   const so_5::mbox_t m_logger;

   void evt_quantity(
      const so_5::stats::messages::quantity< std::size_t > & evt )
   {
      std::ostringstream ss;

      ss << "stats: '" << evt.m_prefix << evt.m_suffix << "': " << evt.m_value;

      so_5::send< log_message >( m_logger, ss.str() );
   }
};
Отправить комментарий