четверг, 12 мая 2016 г.

[prog.c++14] Как же выглядят реальные агенты в SObjectizer?

Если не ошибаюсь, уже несколько раз говорил о том, что C++ Actor Framework (он же CAF) производит довольно неоднозначное впечатление. С одной стороны -- это очень удачная попытка изобразить Erlang-овые процессы в виде C++ных акторов. Но, с другой стороны, никогда толком не понимал, как же в CAF-е будет выглядеть что-либо более-менее приближенное к реальности. Т.е. не тестовая программка из категории ping-pong или fixed-stack. А настоящий, живой актор, выполняющий какую-то прикладную логику.

Ведь один из уроков, который мы выучили, уже довольно долго работая с агентами в C++, -- это то, что агенты со временем пухнут в размерах. Тупо код одного агента может занимать не одну сотню строк (а в редких случаях и не одну тысячу строк). Как все это уложить в CAF-овские акторы с кучей лямбда-функций... Это вопрос.

А для того, чтобы было более-менее понятно о чем идет речь, под катом размещен код реального агента, реализованного на SObjectizer. Повторюсь, код реальный, не из игрушечного примерчика. Но это, можно сказать, только самое начало жизни агента. Первая работающая реализация. Которая еще будет дорабатываться.

Полностью описывать что, как, зачем и почему делает этот агент не буду, т.к. пока нет планов открывать проект, в рамках которого он разрабатывался. Поэтому ограничусь самыми общими пояснениями.

Задача этого агента в том, чтобы периодически сканировать некоторый каталог. В этот каталог внешние программулины скидывают результаты своей работы в виде файлов с json-ом внутри. Каждый такой файл нужно поднять, заглянуть вовнутрь, определить, кому следует доставить содержимое и отправить в нужный MQ-шный топик опять же в виде json-а. Если через какое-то время из ответного топика не будет получено подтверждение, то файл должен быть отправлен повторно. Если же подтверждение будет получено, то файл удаляется за ненадобностью. Так же регулярно должны удаляться слишком старые файлы (которые по каким-то причинам не были отправлены).

Агент реализован в виде шаблонного класса (в этом одно из принципиальных отличий SO-5 от предыдущих версий SObjectizer -- в SO-5 агенты могут быть шаблонами). Нужно это потому, что приходится сканировать несколько каталогов, в каждом из которых лежат файлы с разным json-ом. И эти разные json-ы в программе отображаются на объекты разных типов. Плюс к тому в разных каталогах нужно использовать разные политики выборки файлов: в каких-то случаях в первую очередь должны отсылаться самые свежие файлы, в каких-то -- самые старые. Поэтому-то агент a_postman_t параметризуется типом POSTMAN, который отвечает за эти особенности поведения агента-почтальона. И для каждого каталога в программе будет создан свой агент, со своим собственным типом POSTMAN-а.

Привожу только код самого агента, без вспомогательных функций, которые этим агентом используются. Изначально планировалось, что агент будет еще объемнее. Но GCC-ный компилятор, которым мы пока пользуемся, стал падать с segfault-ом на некоторых шаблонных конструкциях, поэтому часть кода из агента была вынесена в свободные функции.

Надеюсь, данный пример создаст представление о том, как выглядят SObjectizer-овские агенты в реальных приложениях. Т.к. примеры и бенчмарки -- это одно. А вот разработка кода для продакшена -- совсем другое. Заодно можно посмотреть, насколько страшно выглядит современный C++14 (хотя я плохо знаю C++, так что это может быть и не самым хорошим примером).

Сразу ЗЫ: если у кого-то возникнет вопрос "а почему это нужно было делать на C++ и SObjectizer, если тоже самое можно было бы сделать на Python, Ruby или Go?" Ответа два. Первый и главный: потому что! :) Второй, политкорректный: на откровенно слабых устройствах планируется запускать комплекс из нескольких процессов, поэтому чем менее ресурсоемким будет каждый из них, тем лучше. Так что выбор, по сути, был между Go и C++. Ну а здесь выбор очевиден ;)

Итак, сам код.

templatetypename POSTMAN >
class a_postman_t : public so_5::agent_t
{
   using out_t = typename POSTMAN::out_t;
   using ack_t = typename POSTMAN::ack_t;
   using key_t = typename POSTMAN::key_t;

   //! Тип отложенного сообщения о необходимости повторной отсылки
   //! файла на сервер.
   struct msg_resend_file : public so_5::message_t
   {
      const key_t m_key;

      msg_resend_file( key_t k ) : m_key( std::move(k) ) {}
   };

public :
   //! Инициализирующий конструктор.
   a_postman_t(
      context_t ctx,
      POSTMAN postman,
      mosqt::instance_t transport )
      :  so_5::agent_t( ctx )
      ,  m_postman( std::move(postman) )
      ,  m_transport( std::move(transport) )
      {}

   virtual void
   so_define_agent() override
   {
      msg_ns::topic_subscriber::subscribe(
            m_transport,
            m_postman.ack_topic_name(),
            [this]( const so_5::mbox_t & mbox )
            {
               so_subscribe( mbox ).event( &a_postman_t::evt_ack );
            } );

      so_subscribe_self()
         .template event< details::msg_next_time_to_rescan >(
               &a_postman_t::evt_next_time_to_rescan )
         .template event< details::msg_rescan_now >(
               &a_postman_t::evt_rescan_now )
         .event( &a_postman_t::evt_resend_file );

   }

   virtual void
   so_evt_start() override
   {
      logger().info( "postman started" );

      initiate_rescan_now();

      m_periodic_rescan_timer =
            so_5::send_periodic< details::msg_next_time_to_rescan >(
                  *this,
                  m_postman.dir_rescan_period(),
                  m_postman.dir_rescan_period() );
   }

private :
   //! Тип контейнера для хранения описаний отосланных, но пока еще
   //! не подтвержденых файлов.
   using inflight_message_map_t = std::map< key_t, details::sent_file_info_t >;

   //! Реализация действий почтальона, специфическая для конкретного
   //! типа сообщений.
   POSTMAN m_postman;

   //! MQTT-шный транспорт, через который будет происходить обмен
   //! сообщениями с сервером.
   mosqt::instance_t m_transport;

   //! Таймер для периодического сигнала о необходимости перечитать
   //! каталог с исходящими сообщениями.
   so_5::timer_id_t m_periodic_rescan_timer;

   //! Время, когда в последний раз было пересканирование каталога
   //! с исходящими сообщениями.
   details::clock_type::time_point m_last_rescan_time;

   //! Список известных агенту имен файлов с исходящими сообщениями.
   details::file_name_only_container_t m_known_files;

   //! Сообщения, которые сейчас находятся в стадии доставки.
   inflight_message_map_t m_inflight_messages;

   //! Реакция на получение ответа от сервера.
   void
   evt_ack( const msg_ns::topic_subscriber::msg_type & cmd )
   {
      auto ack = try_extract_ack( cmd );
      if( ack )
         handle_ack( *ack );
   }

   void
   evt_next_time_to_rescan()
   {
      const auto now = details::clock_type::now();
      if( m_last_rescan_time + m_postman.dir_rescan_period() < now )
      {
         do_directory_rescan();
         send_some_messages_if_possible();
      }
   }

   void
   evt_rescan_now()
   {
      do_directory_rescan();
      send_some_messages_if_possible();
   }

   void
   evt_resend_file( const msg_resend_file & cmd )
   {
      logger().info( "time to resend msg: {}", cmd.m_key );

      auto it = m_inflight_messages.find( cmd.m_key );
      if( it != m_inflight_messages.end() )
      {
         auto erase_it_and_send_next = [&] {
            m_inflight_messages.erase( it );
            try_select_and_send_next_message();
         };

         it->second.m_attempts += 1;
         if( it->second.m_attempts > m_postman.resend_attempts() )
         {
            logger().notice( "no more delivery attempts for msg: {}, "
                  "file: {}", cmd.m_key, it->second.m_path );

            erase_it_and_send_next();
         }
         else
         {
            logger().info( "resending msg: {}, attempt: {}, file: {}",
                  cmd.m_key, it->second.m_attempts, it->second.m_path );

            details::try_load_and_process_message< out_t >(
                  it->second.m_path,
                  [&]( const auto & msg ) {
                     msg_ns::topic_publisher::publish(
                           m_transport,
                           m_postman.out_topic_name( msg ),
                           msg );
                  },
                  [&]( const auto & x ) {
                     this->logger().error(
                           "unable to resend msg: {}, error: {}",
                           cmd.m_key, x.what() );
                     erase_it_and_send_next();
                  } );
         }
      }
      else
         logger().notice( "no info about inflight msg: {}", cmd.m_key );
   }

   //! Доступ к логгеру агента.
   spdlog::logger &
   logger() const { return m_postman.logger(); }

   void
   initiate_rescan_now() const
   {
      so_5::send< details::msg_rescan_now >( *this );
   }

   boost::optional< ack_t >
   try_extract_ack( const msg_ns::topic_subscriber::msg_type & cmd )
   {
      try
      {
         return cmd.decode< ack_t >();
      }
      catchconst std::exception & x )
      {
         logger().error( "unable to parse ack message: {}", x.what() );
      }

      return boost::optional< ack_t >();
   }

   void
   handle_ack( const ack_t & ack )
   {
      const auto key = m_postman.extract_key( ack );
      logger().info( "ack received, key: {}", key );

      auto it = m_inflight_messages.find( key );
      if( it != m_inflight_messages.end() )
      {
         auto file = it->second.m_path;
         m_inflight_messages.erase( it );

         utilize_delivered_message( file );
         try_select_and_send_next_message();
      }
      else
         logger().notice( "ack for unknown msg: {}", key );
   }

   void
   do_directory_rescan()
   {
      logger().debug( "directory rescan started" );

      // Берем полное оглавление...
      auto full_list = details::get_full_directory_content( m_postman );
      // ...затем убираем из него те файлы, которые доставляются сейчас...
      remove_inflight_files( full_list );
      // ...затем те, что уже слишком устарели...
      utilize_too_old_files( full_list );
      // ...а оставшиеся упорядочиваем так, как это требуется
      // согласно логики доставки.
      m_postman.sort_files( full_list );

      // Оставляем только имена файлов, все остальное нам не нужно.
      auto known_files = transform_full_file_list( std::move(full_list) );

      m_last_rescan_time = details::clock_type::now();

      m_known_files.swap( known_files );

      logger().debug( "directory rescan finished, known_files: {}",
            m_known_files.size() );
   }

   void
   send_some_messages_if_possible()
   {
      while( !m_known_files.empty() &&
            m_inflight_messages.size() < m_postman.inflight_messages() )
      {
         select_and_send_next_message();
      }
   }

   void
   remove_inflight_files(
      details::full_file_info_container_t & what )
   {
      if( !m_inflight_messages.empty() )
      {
         // Сначала соберем имена всех файлов, которые
         // находятся в процессе доставки.
         std::vector< fs::path > inflight_files;
         inflight_files.reserve( m_inflight_messages.size() );
         transform(
               begin(m_inflight_messages), 
               end(m_inflight_messages),
               back_inserter(inflight_files),
               []( const auto & map_item ) {
                  return map_item.second.m_path;
               } );

         // Затем отсортируем их для того, чтобы затем можно
         // было использовать двоичный поиск.
         sort( begin(inflight_files), end(inflight_files) );
         // Теперь можно вычеркнуть те имена, которые используются
         // в данный момент для доставки на сервер.
         what.erase(
               std::remove_if( begin(what), end(what),
                  [&inflight_files]( const auto & info ) {
                     return std::binary_search(
                           begin(inflight_files),
                           end(inflight_files),
                           info.m_path );
                  } ),
               end(what) );
      }
   }

   void
   utilize_too_old_files(
      details::full_file_info_container_t & full_list )
   {
      using namespace std::chrono;

      auto now = std::time(nullptr);
      // Сначала найдем все файлы, чье время жизни истекло.
      auto it = remove_if( begin(full_list), end(full_list),
            [this, now]( const auto & info ) {
               return (info.m_mtime + duration_cast< seconds >(
                     m_postman.message_file_lifetime() ).count()) < now;
            } );
      // Есть ли вообще что-то для удаления?
      auto count = distance( it, end(full_list) );
      if( count )
      {
         std::vector< fs::path > old_files;
         old_files.reserve( count );
         transform( it, end(full_list), back_inserter(old_files),
               []( const auto & info ) { return info.m_path; } );

         // Теперь имена слишком старых файлов из исходного списка
         // можно удалить. Они там больше не нужны.
         full_list.erase( it, end(full_list) );

         // Осталось удалить слишком старые файлы.
         remove_too_old_files( old_files );
      }
   }

   void
   utilize_delivered_message(
      const fs::path & file )
   {
      logger().info( "remove file with delivered msg, file: {}", file );
      try
      {
         fs::remove( file );
      }
      catchconst std::exception & x )
      {
         logger().error( "unable to remove file: {}, error: {}",
               file, x.what() );
      }
   }

   void
   remove_too_old_files(
      const std::vector< fs::path > & files )
   {
      for_each( begin(files), end(files), [this]( const auto & f ) {
         try
         {
            this->logger().info( "remove too old file: {}", f );
            fs::remove( f );
         }
         catchconst std::exception & x )
         {
            this->logger().error( "unable to remove file: {}, error: {}",
                  f, x.what() );
         }
      } );
   }

   void
   try_select_and_send_next_message()
   {
      if( m_known_files.empty() )
         initiate_rescan_now();
      else
         select_and_send_next_message();
   }

   void
   select_and_send_next_message()
   {
      cpp_util_3::ensure< std::runtime_error >( !m_known_files.empty(),
            []{ return "m_known_files must have some items!"; } );

      auto f = m_known_files.front();
      m_known_files.pop_front();

      load_and_send_message_from_file( f );
   }

   void
   load_and_send_message_from_file( const fs::path & file )
   {
      details::try_load_and_process_message< out_t >( file,
         [&]( const out_t & msg ) {
            // Основные действия вынесены в отдельный метод,
            // дабы у компилятора не сносило крышу и он не падал
            // из-за internal compiler error.
            send_message_first_time( file, msg );
         },
         [&]( const std::exception & x ) {
            logger().error( "unable to load and sent message from file: {}, "
                  "error: {}", file, x.what() );
         } );
   }

   void
   send_message_first_time(
      const fs::path & file,
      const out_t & msg )
   {
      const auto key = m_postman.extract_key( msg );

      // Сразу же отсылаем отложенное сообщение для перепосылки
      // этого файла, если от сервера не придет подтверждение.
      auto resend_timer = so_5::send_periodic< msg_resend_file >(
            *this,
            m_postman.resend_period(),
            m_postman.resend_period(),
            key );

      // Сохраняем информацию о том, что файл ушел на сервер.
      m_inflight_messages.emplace( key,
            details::sent_file_info_t{ file, std::move(resend_timer) } );

      logger().info( "sending msg: {}, from file: {}", key, file );

      // Осталось опубликовать сообщение.
      msg_ns::topic_publisher::publish(
            m_transport,
            m_postman.out_topic_name( msg ),
            msg );
   }
};

Комментариев нет: