вторник, 31 марта 2015 г.

[prog.c++] Шаблоны против копипасты :)

Был у меня класс. Из которого путем копипасты был сделан другой класс. В течении долгого времени избавиться от копипасты не удавалось, т.к. отличия в реализациях были мелкими, а времени придумать, как эти различия вынести куда-то, не хватало. Но пришла необходимость расширить оба класса практически одинаковой функциональностью.

Сначала был модифицирован первый класс, потом, по образу и подобию, второй. Объем дублируемого кода стал ну совсем уж неприличным. Поэтому пришлось сесть и придумать, как с этим быть. В итоге, вот из этого:

namespace so_5
{

namespace disp
{

namespace thread_pool
{

namespace impl
{

//
// dispatcher_t
//
/*!
 * \since v.5.4.0
 * \brief An implementation of thread pool dispatcher.
 */
class dispatcher_t
  : public so_5::rt::dispatcher_t
  , public tp_stats::stats_supplier_t
  {
  private :
    //! Data for one cooperation.
    struct cooperation_data_t
      {
        //! Event queue for the cooperation.
        agent_queue_shptr_t m_queue;

        //! Count of agents form that cooperation.
        /*!
         * When this counter is zero then cooperation data
         * must be destroyed.
         */
        std::size_t m_agents;

        /*!
         * \since v.5.5.4
         * \brief Description of that queue for run-time monitoring.
         */
        tp_stats::queue_description_holder_ref_t m_queue_desc;

        cooperation_data_t(
          agent_queue_shptr_t queue,
          std::size_t agents,
          const stats::prefix_t & data_source_name_prefix,
          const std::string & coop_name )
          : m_queue( std::move( queue ) )
          , m_agents( agents )
          , m_queue_desc(
              tp_stats::make_queue_desc_holder(
                  data_source_name_prefix,
                  coop_name,
                  agents ) )
          {}

        /*!
         * \since v.5.5.4
         * \brief Update queue information for run-time monitoring.
         */
        void
        update_queue_stats()
          {
            m_queue_desc->m_desc.m_agent_count = m_agents;
            m_queue_desc->m_desc.m_queue_size = m_queue->size();
          }
      };

    //! Map from cooperation name to the cooperation data.
    typedef std::map< std::string, cooperation_data_t >
        cooperation_map_t;

    //! Data for one agent.
    struct agent_data_t
      {
        //! Event queue for the agent.
        /*!
         * It could be individual queue or queue for the whole
         * cooperation (to which agent is belonging).
         */
        agent_queue_shptr_t m_queue;

        /*!
         * \since v.5.5.4
         * \brief Description of that queue for run-time monitoring.
         *
         * \note This description is created only if agent
         * uses individual FIFO.
         */
        tp_stats::queue_description_holder_ref_t m_queue_desc;

        //! Constructor for the case when agent uses cooperation FIFO.
        agent_data_t(
          agent_queue_shptr_t queue )
          : m_queue( std::move( queue ) )
          {}

        //! Constructor for the case when agent uses individual FIFO.
        /*!
         * In this case a queue_description object must be created.
         */
        agent_data_t(
          agent_queue_shptr_t queue,
          const stats::prefix_t & data_source_name_prefix,
          const so_5::rt::agent_t * agent_ptr )
          : m_queue( std::move( queue ) )
          , m_queue_desc(
              tp_stats::make_queue_desc_holder(
                  data_source_name_prefix,
                  agent_ptr ) )
          {}

        /*!
         * \since v.5.5.4
         * \brief Does agent use cooperation FIFO?
         */
        bool
        cooperation_fifo() const
          {
            return !m_queue_desc;
          }

        /*!
         * \since v.5.5.4
         * \brief Update queue description with current information.
         *
         * \attention Must be called only if !cooperation_fifo().
         */
        void
        update_queue_stats()
          {
            m_queue_desc->m_desc.m_agent_count = 1;
            m_queue_desc->m_desc.m_queue_size = m_queue->size();
          }
      };

    //! Map from agent pointer to the agent data.
    typedef std::map< so_5::rt::agent_t *, agent_data_t >
        agent_map_t;

  public :
    dispatcher_t( const dispatcher_t & ) = delete;
    dispatcher_t & operator=( const dispatcher_t & ) = delete;

    //! Constructor.
    dispatcher_t(
      std::size_t thread_count )
      : m_thread_count( thread_count )
      , m_data_source( stats_supplier() )
      {
        m_threads.reserve( thread_count );

        for( std::size_t i = 0; i != m_thread_count; ++i )
          m_threads.emplace_back( std::unique_ptr< work_thread_t >(
                new work_thread_t( m_queue ) ) );
      }

    virtual void
    start( so_5::rt::environment_t & env ) override
      {
        m_data_source.start( env.stats_repository() );

        forauto & t : m_threads )
          t->start();
      }

    virtual void
    shutdown() override
      {
        m_queue.shutdown();
      }

    virtual void
    wait() override
      {
        forauto & t : m_threads )
          t->join();

        m_data_source.stop();
      }

    virtual void
    set_data_sources_name_base(
      const std::string & name_base ) override
      {
        m_data_source.set_data_sources_name_base(
            "tp"// thread-pool
            name_base,
            this );
      }

    //! Bind agent to the dispatcher.
    so_5::rt::event_queue_t *
    bind_agent(
      so_5::rt::agent_ref_t agent,
      const params_t & params )
      {
        std::lock_guard< spinlock_t > lock( m_lock );

        if( fifo_t::individual == params.query_fifo() )
          return bind_agent_with_inidividual_fifo(
              std::move( agent ), params );

        return bind_agent_with_cooperation_fifo(
            std::move( agent ), params );
      }

    //! Unbind agent from the dispatcher.
    void
    unbind_agent(
      so_5::rt::agent_ref_t agent )
      {
        std::lock_guard< spinlock_t > lock( m_lock );

        auto it = m_agents.find( agent.get() );
        if( it != m_agents.end() )
          {
            if( it->second.cooperation_fifo() )
              {
                auto it_coop = m_cooperations.find(
                    agent->so_coop_name() );
                if( it_coop != m_cooperations.end() &&
                    0 == --(it_coop->second.m_agents) )
                  {
                    // agent_queue object can be destroyed
                    // only when it is empty.
                    it_coop->second.m_queue->wait_for_emptyness();

                    m_cooperations.erase( it_coop );
                  }
              }
            else
              // agent_queue object can be destroyed
              // only when it is empty.
              it->second.m_queue->wait_for_emptyness();

            m_agents.erase( it );
          }
      }

  private :
    //! Queue for active agent's queues.
    dispatcher_queue_t m_queue;

    //! Count of working threads.
    const std::size_t m_thread_count;

    //! Pool of work threads.
    std::vector< std::unique_ptr< work_thread_t > > m_threads;

    //! Object's lock.
    spinlock_t m_lock;

    //! Information about cooperations.
    /*!
     * Information to this map is added only if an agent is
     * using cooperation FIFO mechanism.
     */
    cooperation_map_t m_cooperations;

    //! Information of agents.
    agent_map_t m_agents;

    /*!
     * \since v.5.5.4
     * \brief Data source for the run-time monitoring.
     */
    tp_stats::data_source_t m_data_source;

    //! Creation event queue for an agent with individual FIFO.
    so_5::rt::event_queue_t *
    bind_agent_with_inidividual_fifo(
      so_5::rt::agent_ref_t agent,
      const params_t & params )
      {
        agent_queue_shptr_t queue = make_new_agent_queue( params );

        m_agents.emplace(
            agent.get(),
            agent_data_t{
                queue,
                m_data_source.prefix(),
                agent.get() } );

        return queue.get();
      }

    //! Creation event queue for an agent with individual FIFO.
    /*!
     * If the data for the agent's cooperation is not created yet
     * it will be created.
     */
    so_5::rt::event_queue_t *
    bind_agent_with_cooperation_fifo(
      so_5::rt::agent_ref_t agent,
      const params_t & params )
      {
        auto it = m_cooperations.find( agent->so_coop_name() );
        if( it == m_cooperations.end() )
          it = m_cooperations.emplace(
              agent->so_coop_name(),
              cooperation_data_t(
                  make_new_agent_queue( params ),
                  1,
                  m_data_source.prefix(),
                  agent->so_coop_name() ) )
              .first;
        else
          it->second.m_agents += 1;

        try
          {
            m_agents.emplace( agent.get(),
                agent_data_t( it->second.m_queue ) );
          }
        catch( ... )
          {
            // Rollback m_cooperations modification.
            if0 == --(it->second.m_agents) )
              m_cooperations.erase( it );
            throw;
          }

        return it->second.m_queue.get();

      }

    //! Helper method for creating event queue for agents/cooperations.
    agent_queue_shptr_t
    make_new_agent_queue(
      const params_t & params )
      {
        return agent_queue_shptr_t(
            new agent_queue_t(
                m_queue,
                params.query_max_demands_at_once() ) );
      }

    /*!
     * \since v.5.5.4
     * \brief Helper method for casting to stats_supplier-object.
     */
    tp_stats::stats_supplier_t &
    stats_supplier()
      {
        return *this;
      }

    /*!
     * \since v.5.5.4
     * \brief Implementation of stats_supplier-related stuff.
     */
    virtual void
    supply( tp_stats::stats_consumer_t & consumer ) override
      {
        // Statics must be collected on locked object.
        std::lock_guard< spinlock_t > lock( m_lock );

        consumer.set_thread_count( m_threads.size() );

        forauto & q : m_cooperations )
          {
            auto & s = q.second;
            s.update_queue_stats();
            consumer.add_queue( s.m_queue_desc );
          }

        forauto & a : m_agents )
          {
            auto & s = a.second;
            if( !s.cooperation_fifo() )
              {
                s.update_queue_stats();
                consumer.add_queue( s.m_queue_desc );
              }
          }
      }
  };

/* namespace impl */

/* namespace thread_pool */

/* namespace disp */

/* namespace so_5 */
namespace so_5
{

namespace disp
{

namespace adv_thread_pool
{

namespace impl
{

//
// dispatcher_t
//
/*!
 * \since v.5.4.0
 * \brief An implementation of thread pool dispatcher.
 */
class dispatcher_t
  : public so_5::rt::dispatcher_t
  , public tp_stats::stats_supplier_t
  {
  private :
    //! Data for one cooperation.
    struct cooperation_data_t
      {
        //! Event queue for the cooperation.
        agent_queue_ref_t m_queue;

        //! Count of agents form that cooperation.
        /*!
         * When this counter is zero then cooperation data
         * must be destroyed.
         */
        std::size_t m_agents;

        /*!
         * \since v.5.5.4
         * \brief Description of that queue for run-time monitoring.
         */
        tp_stats::queue_description_holder_ref_t m_queue_desc;

        cooperation_data_t(
          agent_queue_ref_t queue,
          std::size_t agents,
          const stats::prefix_t & data_source_name_prefix,
          const std::string & coop_name )
          : m_queue( std::move( queue ) )
          , m_agents( agents )
          , m_queue_desc(
              tp_stats::make_queue_desc_holder(
                  data_source_name_prefix,
                  coop_name,
                  agents ) )
          {}

        /*!
         * \since v.5.5.4
         * \brief Update queue information for run-time monitoring.
         */
        void
        update_queue_stats()
          {
            m_queue_desc->m_desc.m_agent_count = m_agents;
            m_queue_desc->m_desc.m_queue_size = m_queue->size();
          }
      };

    //! Map from cooperation name to the cooperation data.
    typedef std::map< std::string, cooperation_data_t >
        cooperation_map_t;

    //! Data for one agent.
    struct agent_data_t
      {
        //! Event queue for the agent.
        /*!
         * It could be individual queue or queue for the whole
         * cooperation (to which agent is belonging).
         */
        agent_queue_ref_t m_queue;

        /*!
         * \since v.5.5.4
         * \brief Description of that queue for run-time monitoring.
         *
         * \note This description is created only if agent
         * uses individual FIFO.
         */
        tp_stats::queue_description_holder_ref_t m_queue_desc;

        //! Constructor for the case when agent uses cooperation FIFO.
        agent_data_t(
          agent_queue_ref_t queue )
          : m_queue( std::move( queue ) )
          {}

        //! Constructor for the case when agent uses individual FIFO.
        /*!
         * In this case a queue_description object must be created.
         */
        agent_data_t(
          agent_queue_ref_t queue,
          const stats::prefix_t & data_source_name_prefix,
          const so_5::rt::agent_t * agent_ptr )
          : m_queue( std::move( queue ) )
          , m_queue_desc(
              tp_stats::make_queue_desc_holder(
                  data_source_name_prefix,
                  agent_ptr ) )
          {}

        /*!
         * \since v.5.5.4
         * \brief Does agent use cooperation FIFO?
         */
        bool
        cooperation_fifo() const
          {
            return !m_queue_desc;
          }

        /*!
         * \since v.5.5.4
         * \brief Update queue description with current information.
         *
         * \attention Must be called only if !cooperation_fifo().
         */
        void
        update_queue_stats()
          {
            m_queue_desc->m_desc.m_agent_count = 1;
            m_queue_desc->m_desc.m_queue_size = m_queue->size();
          }
      };

    //! Map from agent pointer to the agent data.
    typedef std::map< so_5::rt::agent_t *, agent_data_t >
        agent_map_t;

  public :
    dispatcher_t( const dispatcher_t & ) = delete;
    dispatcher_t & operator=( const dispatcher_t & ) = delete;

    //! Constructor.
    dispatcher_t(
      std::size_t thread_count )
      : m_thread_count( thread_count )
      , m_data_source( stats_supplier() )
      {
        m_threads.reserve( thread_count );

        for( std::size_t i = 0; i != m_thread_count; ++i )
          m_threads.emplace_back( std::unique_ptr< work_thread_t >(
                new work_thread_t( m_queue ) ) );
      }

    virtual void
    start( so_5::rt::environment_t & env ) override
      {
        m_data_source.start( env.stats_repository() );

        forauto & t : m_threads )
          t->start();
      }

    virtual void
    shutdown() override
      {
        m_queue.shutdown();
      }

    virtual void
    wait() override
      {
        forauto & t : m_threads )
          t->join();

        m_data_source.stop();
      }

    virtual void
    set_data_sources_name_base(
      const std::string & name_base ) override
      {
        m_data_source.set_data_sources_name_base(
            "atp"// adv-thread-pool
            name_base,
            this );
      }

    //! Bind agent to the dispatcher.
    so_5::rt::event_queue_t *
    bind_agent(
      so_5::rt::agent_ref_t agent,
      const params_t & params )
      {
        std::lock_guard< spinlock_t > lock( m_lock );

        if( fifo_t::individual == params.query_fifo() )
          return bind_agent_with_inidividual_fifo(
              std::move( agent ), params );

        return bind_agent_with_cooperation_fifo(
            std::move( agent ), params );
      }

    //! Unbind agent from the dispatcher.
    void
    unbind_agent(
      so_5::rt::agent_ref_t agent )
      {
        std::lock_guard< spinlock_t > lock( m_lock );

        auto it = m_agents.find( agent.get() );
        if( it != m_agents.end() )
          {
            if( it->second.cooperation_fifo() )
              {
                auto it_coop = m_cooperations.find(
                    agent->so_coop_name() );
                if( it_coop != m_cooperations.end() &&
                    0 == --(it_coop->second.m_agents) )
                  {
                    m_cooperations.erase( it_coop );
                  }
              }

            m_agents.erase( it );
          }
      }

  private :
    //! Queue for active agent's queues.
    dispatcher_queue_t m_queue;

    //! Count of working threads.
    const std::size_t m_thread_count;

    //! Pool of work threads.
    std::vector< std::unique_ptr< work_thread_t > > m_threads;

    //! Object's lock.
    spinlock_t m_lock;

    //! Information about cooperations.
    /*!
     * Information to this map is added only if an agent is
     * using cooperation FIFO mechanism.
     */
    cooperation_map_t m_cooperations;

    //! Information of agents.
    agent_map_t m_agents;

    /*!
     * \since v.5.5.4
     * \brief Data source for the run-time monitoring.
     */
    tp_stats::data_source_t m_data_source;

    //! Creation event queue for an agent with individual FIFO.
    so_5::rt::event_queue_t *
    bind_agent_with_inidividual_fifo(
      so_5::rt::agent_ref_t agent,
      const params_t & /*params*/ )
      {
        agent_queue_ref_t queue = make_new_agent_queue();

        m_agents.emplace(
            agent.get(),
            agent_data_t{
                queue,
                m_data_source.prefix(),
                agent.get() } );

        return queue.get();
      }

    //! Creation event queue for an agent with individual FIFO.
    /*!
     * If the data for the agent's cooperation is not created yet
     * it will be created.
     */
    so_5::rt::event_queue_t *
    bind_agent_with_cooperation_fifo(
      so_5::rt::agent_ref_t agent,
      const params_t & /*params*/ )
      {
        auto it = m_cooperations.find( agent->so_coop_name() );
        if( it == m_cooperations.end() )
          it = m_cooperations.emplace(
                agent->so_coop_name(),
                cooperation_data_t(
                    make_new_agent_queue(),
                    1,
                    m_data_source.prefix(),
                    agent->so_coop_name() ) )
              .first;
        else
          it->second.m_agents += 1;

        try
          {
            m_agents.emplace( agent.get(),
                agent_data_t( it->second.m_queue ) );
          }
        catch( ... )
          {
            // Rollback m_cooperations modification.
            if0 == --(it->second.m_agents) )
              m_cooperations.erase( it );
            throw;
          }

        return it->second.m_queue.get();

      }

    //! Helper method for creating event queue for agents/cooperations.
    agent_queue_ref_t
    make_new_agent_queue()
      {
        return agent_queue_ref_t( new agent_queue_t( m_queue ) );
      }

    /*!
     * \since v.5.5.4
     * \brief Helper method for casting to stats_supplier-object.
     */
    tp_stats::stats_supplier_t &
    stats_supplier()
      {
        return *this;
      }

    /*!
     * \since v.5.5.4
     * \brief Implementation of stats_supplier-related stuff.
     */
    virtual void
    supply( tp_stats::stats_consumer_t & consumer ) override
      {
        // Statics must be collected on locked object.
        std::lock_guard< spinlock_t > lock( m_lock );

        consumer.set_thread_count( m_threads.size() );

        forauto & q : m_cooperations )
          {
            auto & s = q.second;
            s.update_queue_stats();
            consumer.add_queue( s.m_queue_desc );
          }

        forauto & a : m_agents )
          {
            auto & s = a.second;
            if( !s.cooperation_fifo() )
              {
                s.update_queue_stats();
                consumer.add_queue( s.m_queue_desc );
              }
          }
      }
  };

/* namespace impl */

/* namespace adv_thread_pool */

/* namespace disp */

/* namespace so_5 */

Получилось вот это:

namespace so_5
{

namespace disp
{

namespace thread_pool
{

namespace impl
{

//
// adaptation_t
//
/*!
 * \since v.5.5.4
 * \brief Adaptation of common implementation of thread-pool-like dispatcher
 * to the specific of this thread-pool dispatcher.
 */
struct adaptation_t
  {
    static const char *
    dispatcher_type_name()
      {
        return "tp"// thread_pool.
      }

    static bool
    is_individual_fifo( const params_t & params )
      {
        return fifo_t::individual == params.query_fifo();
      }

    static void
    wait_for_queue_emptyness( agent_queue_t & queue )
      {
        queue.wait_for_emptyness();
      }
  };

//
// dispatcher_t
//
/*!
 * \since v.5.4.0
 * \brief Actual type of this thread-pool dispatcher.
 */
using dispatcher_t =
    common_implementation::dispatcher_t<
        work_thread_t,
        dispatcher_queue_t,
        agent_queue_t,
        params_t,
        adaptation_t >;

/* namespace impl */

/* namespace thread_pool */

/* namespace disp */

/* namespace so_5 */
namespace so_5
{

namespace disp
{

namespace adv_thread_pool
{

namespace impl
{

//
// adaptation_t
//
/*!
 * \since v.5.5.4
 * \brief Adaptation of common implementation of thread-pool-like dispatcher
 * to the specific of this thread-pool dispatcher.
 */
struct adaptation_t
  {
    static const char *
    dispatcher_type_name()
      {
        return "atp"// adv_thread_pool.
      }

    static bool
    is_individual_fifo( const params_t & params )
      {
        return fifo_t::individual == params.query_fifo();
      }

    static void
    wait_for_queue_emptyness( agent_queue_t & /*queue*/ )
      {
        // This type of agent_queue doesn't require waiting for emptyness.
      }
  };

//
// dispatcher_t
//
/*!
 * \since v.5.4.0
 * \brief Actual type of this thread-pool dispatcher.
 */
using dispatcher_t =
    so_5::disp::thread_pool::common_implementation::dispatcher_t<
        work_thread_t,
        dispatcher_queue_t,
        agent_queue_t,
        params_t,
        adaptation_t >;

/* namespace impl */

/* namespace adv_thread_pool */

/* namespace disp */

/* namespace so_5 */

Вынесенную в шаблон общую часть реализации можно увидеть здесь.

Еще раз убедился в том, что шаблоны в C++ рулят и бибикают (в лучшем смысле этого слова, конечно же). И очень хорошо, что шаблоны в C++ -- это совсем не то, что генерики в Java/C#. Т.к. попытка сделать все то же самое на чистом ООП потребовала бы больше времени и сил. Т.к. не смотря на использование в обоих случаях одинаковых имен вроде work_thread_t, agent_queue_t и params_t, все эти типы определены в разных пространствах имен и они разные (а имена совпадают потому, что они решают одни и те же задачи, но по-другому).

Отправить комментарий