Был у меня класс. Из которого путем копипасты был сделан другой класс. В течении долгого времени избавиться от копипасты не удавалось, т.к. отличия в реализациях были мелкими, а времени придумать, как эти различия вынести куда-то, не хватало. Но пришла необходимость расширить оба класса практически одинаковой функциональностью.
Сначала был модифицирован первый класс, потом, по образу и подобию, второй. Объем дублируемого кода стал ну совсем уж неприличным. Поэтому пришлось сесть и придумать, как с этим быть. В итоге, вот из этого:
namespace so_5 { namespace disp { namespace thread_pool { namespace impl { // // dispatcher_t // /*! * \since v.5.4.0 * \brief An implementation of thread pool dispatcher. */ class dispatcher_t : public so_5::rt::dispatcher_t , public tp_stats::stats_supplier_t { private : //! Data for one cooperation. struct cooperation_data_t { //! Event queue for the cooperation. agent_queue_shptr_t m_queue; //! Count of agents form that cooperation. /*! * When this counter is zero then cooperation data * must be destroyed. */ std::size_t m_agents; /*! * \since v.5.5.4 * \brief Description of that queue for run-time monitoring. */ tp_stats::queue_description_holder_ref_t m_queue_desc; cooperation_data_t( agent_queue_shptr_t queue, std::size_t agents, const stats::prefix_t & data_source_name_prefix, const std::string & coop_name ) : m_queue( std::move( queue ) ) , m_agents( agents ) , m_queue_desc( tp_stats::make_queue_desc_holder( data_source_name_prefix, coop_name, agents ) ) {} /*! * \since v.5.5.4 * \brief Update queue information for run-time monitoring. */ void update_queue_stats() { m_queue_desc->m_desc.m_agent_count = m_agents; m_queue_desc->m_desc.m_queue_size = m_queue->size(); } }; //! Map from cooperation name to the cooperation data. typedef std::map< std::string, cooperation_data_t > cooperation_map_t; //! Data for one agent. struct agent_data_t { //! Event queue for the agent. /*! * It could be individual queue or queue for the whole * cooperation (to which agent is belonging). */ agent_queue_shptr_t m_queue; /*! * \since v.5.5.4 * \brief Description of that queue for run-time monitoring. * * \note This description is created only if agent * uses individual FIFO. */ tp_stats::queue_description_holder_ref_t m_queue_desc; //! Constructor for the case when agent uses cooperation FIFO. agent_data_t( agent_queue_shptr_t queue ) : m_queue( std::move( queue ) ) {} //! Constructor for the case when agent uses individual FIFO. /*! * In this case a queue_description object must be created. */ agent_data_t( agent_queue_shptr_t queue, const stats::prefix_t & data_source_name_prefix, const so_5::rt::agent_t * agent_ptr ) : m_queue( std::move( queue ) ) , m_queue_desc( tp_stats::make_queue_desc_holder( data_source_name_prefix, agent_ptr ) ) {} /*! * \since v.5.5.4 * \brief Does agent use cooperation FIFO? */ bool cooperation_fifo() const { return !m_queue_desc; } /*! * \since v.5.5.4 * \brief Update queue description with current information. * * \attention Must be called only if !cooperation_fifo(). */ void update_queue_stats() { m_queue_desc->m_desc.m_agent_count = 1; m_queue_desc->m_desc.m_queue_size = m_queue->size(); } }; //! Map from agent pointer to the agent data. typedef std::map< so_5::rt::agent_t *, agent_data_t > agent_map_t; public : dispatcher_t( const dispatcher_t & ) = delete; dispatcher_t & operator=( const dispatcher_t & ) = delete; //! Constructor. dispatcher_t( std::size_t thread_count ) : m_thread_count( thread_count ) , m_data_source( stats_supplier() ) { m_threads.reserve( thread_count ); for( std::size_t i = 0; i != m_thread_count; ++i ) m_threads.emplace_back( std::unique_ptr< work_thread_t >( new work_thread_t( m_queue ) ) ); } virtual void start( so_5::rt::environment_t & env ) override { m_data_source.start( env.stats_repository() ); for( auto & t : m_threads ) t->start(); } virtual void shutdown() override { m_queue.shutdown(); } virtual void wait() override { for( auto & t : m_threads ) t->join(); m_data_source.stop(); } virtual void set_data_sources_name_base( const std::string & name_base ) override { m_data_source.set_data_sources_name_base( "tp", // thread-pool name_base, this ); } //! Bind agent to the dispatcher. so_5::rt::event_queue_t * bind_agent( so_5::rt::agent_ref_t agent, const params_t & params ) { std::lock_guard< spinlock_t > lock( m_lock ); if( fifo_t::individual == params.query_fifo() ) return bind_agent_with_inidividual_fifo( std::move( agent ), params ); return bind_agent_with_cooperation_fifo( std::move( agent ), params ); } //! Unbind agent from the dispatcher. void unbind_agent( so_5::rt::agent_ref_t agent ) { std::lock_guard< spinlock_t > lock( m_lock ); auto it = m_agents.find( agent.get() ); if( it != m_agents.end() ) { if( it->second.cooperation_fifo() ) { auto it_coop = m_cooperations.find( agent->so_coop_name() ); if( it_coop != m_cooperations.end() && 0 == --(it_coop->second.m_agents) ) { // agent_queue object can be destroyed // only when it is empty. it_coop->second.m_queue->wait_for_emptyness(); m_cooperations.erase( it_coop ); } } else // agent_queue object can be destroyed // only when it is empty. it->second.m_queue->wait_for_emptyness(); m_agents.erase( it ); } } private : //! Queue for active agent's queues. dispatcher_queue_t m_queue; //! Count of working threads. const std::size_t m_thread_count; //! Pool of work threads. std::vector< std::unique_ptr< work_thread_t > > m_threads; //! Object's lock. spinlock_t m_lock; //! Information about cooperations. /*! * Information to this map is added only if an agent is * using cooperation FIFO mechanism. */ cooperation_map_t m_cooperations; //! Information of agents. agent_map_t m_agents; /*! * \since v.5.5.4 * \brief Data source for the run-time monitoring. */ tp_stats::data_source_t m_data_source; //! Creation event queue for an agent with individual FIFO. so_5::rt::event_queue_t * bind_agent_with_inidividual_fifo( so_5::rt::agent_ref_t agent, const params_t & params ) { agent_queue_shptr_t queue = make_new_agent_queue( params ); m_agents.emplace( agent.get(), agent_data_t{ queue, m_data_source.prefix(), agent.get() } ); return queue.get(); } //! Creation event queue for an agent with individual FIFO. /*! * If the data for the agent's cooperation is not created yet * it will be created. */ so_5::rt::event_queue_t * bind_agent_with_cooperation_fifo( so_5::rt::agent_ref_t agent, const params_t & params ) { auto it = m_cooperations.find( agent->so_coop_name() ); if( it == m_cooperations.end() ) it = m_cooperations.emplace( agent->so_coop_name(), cooperation_data_t( make_new_agent_queue( params ), 1, m_data_source.prefix(), agent->so_coop_name() ) ) .first; else it->second.m_agents += 1; try { m_agents.emplace( agent.get(), agent_data_t( it->second.m_queue ) ); } catch( ... ) { // Rollback m_cooperations modification. if( 0 == --(it->second.m_agents) ) m_cooperations.erase( it ); throw; } return it->second.m_queue.get(); } //! Helper method for creating event queue for agents/cooperations. agent_queue_shptr_t make_new_agent_queue( const params_t & params ) { return agent_queue_shptr_t( new agent_queue_t( m_queue, params.query_max_demands_at_once() ) ); } /*! * \since v.5.5.4 * \brief Helper method for casting to stats_supplier-object. */ tp_stats::stats_supplier_t & stats_supplier() { return *this; } /*! * \since v.5.5.4 * \brief Implementation of stats_supplier-related stuff. */ virtual void supply( tp_stats::stats_consumer_t & consumer ) override { // Statics must be collected on locked object. std::lock_guard< spinlock_t > lock( m_lock ); consumer.set_thread_count( m_threads.size() ); for( auto & q : m_cooperations ) { auto & s = q.second; s.update_queue_stats(); consumer.add_queue( s.m_queue_desc ); } for( auto & a : m_agents ) { auto & s = a.second; if( !s.cooperation_fifo() ) { s.update_queue_stats(); consumer.add_queue( s.m_queue_desc ); } } } }; } /* namespace impl */ } /* namespace thread_pool */ } /* namespace disp */ } /* namespace so_5 */ |
namespace so_5 { namespace disp { namespace adv_thread_pool { namespace impl { // // dispatcher_t // /*! * \since v.5.4.0 * \brief An implementation of thread pool dispatcher. */ class dispatcher_t : public so_5::rt::dispatcher_t , public tp_stats::stats_supplier_t { private : //! Data for one cooperation. struct cooperation_data_t { //! Event queue for the cooperation. agent_queue_ref_t m_queue; //! Count of agents form that cooperation. /*! * When this counter is zero then cooperation data * must be destroyed. */ std::size_t m_agents; /*! * \since v.5.5.4 * \brief Description of that queue for run-time monitoring. */ tp_stats::queue_description_holder_ref_t m_queue_desc; cooperation_data_t( agent_queue_ref_t queue, std::size_t agents, const stats::prefix_t & data_source_name_prefix, const std::string & coop_name ) : m_queue( std::move( queue ) ) , m_agents( agents ) , m_queue_desc( tp_stats::make_queue_desc_holder( data_source_name_prefix, coop_name, agents ) ) {} /*! * \since v.5.5.4 * \brief Update queue information for run-time monitoring. */ void update_queue_stats() { m_queue_desc->m_desc.m_agent_count = m_agents; m_queue_desc->m_desc.m_queue_size = m_queue->size(); } }; //! Map from cooperation name to the cooperation data. typedef std::map< std::string, cooperation_data_t > cooperation_map_t; //! Data for one agent. struct agent_data_t { //! Event queue for the agent. /*! * It could be individual queue or queue for the whole * cooperation (to which agent is belonging). */ agent_queue_ref_t m_queue; /*! * \since v.5.5.4 * \brief Description of that queue for run-time monitoring. * * \note This description is created only if agent * uses individual FIFO. */ tp_stats::queue_description_holder_ref_t m_queue_desc; //! Constructor for the case when agent uses cooperation FIFO. agent_data_t( agent_queue_ref_t queue ) : m_queue( std::move( queue ) ) {} //! Constructor for the case when agent uses individual FIFO. /*! * In this case a queue_description object must be created. */ agent_data_t( agent_queue_ref_t queue, const stats::prefix_t & data_source_name_prefix, const so_5::rt::agent_t * agent_ptr ) : m_queue( std::move( queue ) ) , m_queue_desc( tp_stats::make_queue_desc_holder( data_source_name_prefix, agent_ptr ) ) {} /*! * \since v.5.5.4 * \brief Does agent use cooperation FIFO? */ bool cooperation_fifo() const { return !m_queue_desc; } /*! * \since v.5.5.4 * \brief Update queue description with current information. * * \attention Must be called only if !cooperation_fifo(). */ void update_queue_stats() { m_queue_desc->m_desc.m_agent_count = 1; m_queue_desc->m_desc.m_queue_size = m_queue->size(); } }; //! Map from agent pointer to the agent data. typedef std::map< so_5::rt::agent_t *, agent_data_t > agent_map_t; public : dispatcher_t( const dispatcher_t & ) = delete; dispatcher_t & operator=( const dispatcher_t & ) = delete; //! Constructor. dispatcher_t( std::size_t thread_count ) : m_thread_count( thread_count ) , m_data_source( stats_supplier() ) { m_threads.reserve( thread_count ); for( std::size_t i = 0; i != m_thread_count; ++i ) m_threads.emplace_back( std::unique_ptr< work_thread_t >( new work_thread_t( m_queue ) ) ); } virtual void start( so_5::rt::environment_t & env ) override { m_data_source.start( env.stats_repository() ); for( auto & t : m_threads ) t->start(); } virtual void shutdown() override { m_queue.shutdown(); } virtual void wait() override { for( auto & t : m_threads ) t->join(); m_data_source.stop(); } virtual void set_data_sources_name_base( const std::string & name_base ) override { m_data_source.set_data_sources_name_base( "atp", // adv-thread-pool name_base, this ); } //! Bind agent to the dispatcher. so_5::rt::event_queue_t * bind_agent( so_5::rt::agent_ref_t agent, const params_t & params ) { std::lock_guard< spinlock_t > lock( m_lock ); if( fifo_t::individual == params.query_fifo() ) return bind_agent_with_inidividual_fifo( std::move( agent ), params ); return bind_agent_with_cooperation_fifo( std::move( agent ), params ); } //! Unbind agent from the dispatcher. void unbind_agent( so_5::rt::agent_ref_t agent ) { std::lock_guard< spinlock_t > lock( m_lock ); auto it = m_agents.find( agent.get() ); if( it != m_agents.end() ) { if( it->second.cooperation_fifo() ) { auto it_coop = m_cooperations.find( agent->so_coop_name() ); if( it_coop != m_cooperations.end() && 0 == --(it_coop->second.m_agents) ) { m_cooperations.erase( it_coop ); } } m_agents.erase( it ); } } private : //! Queue for active agent's queues. dispatcher_queue_t m_queue; //! Count of working threads. const std::size_t m_thread_count; //! Pool of work threads. std::vector< std::unique_ptr< work_thread_t > > m_threads; //! Object's lock. spinlock_t m_lock; //! Information about cooperations. /*! * Information to this map is added only if an agent is * using cooperation FIFO mechanism. */ cooperation_map_t m_cooperations; //! Information of agents. agent_map_t m_agents; /*! * \since v.5.5.4 * \brief Data source for the run-time monitoring. */ tp_stats::data_source_t m_data_source; //! Creation event queue for an agent with individual FIFO. so_5::rt::event_queue_t * bind_agent_with_inidividual_fifo( so_5::rt::agent_ref_t agent, const params_t & /*params*/ ) { agent_queue_ref_t queue = make_new_agent_queue(); m_agents.emplace( agent.get(), agent_data_t{ queue, m_data_source.prefix(), agent.get() } ); return queue.get(); } //! Creation event queue for an agent with individual FIFO. /*! * If the data for the agent's cooperation is not created yet * it will be created. */ so_5::rt::event_queue_t * bind_agent_with_cooperation_fifo( so_5::rt::agent_ref_t agent, const params_t & /*params*/ ) { auto it = m_cooperations.find( agent->so_coop_name() ); if( it == m_cooperations.end() ) it = m_cooperations.emplace( agent->so_coop_name(), cooperation_data_t( make_new_agent_queue(), 1, m_data_source.prefix(), agent->so_coop_name() ) ) .first; else it->second.m_agents += 1; try { m_agents.emplace( agent.get(), agent_data_t( it->second.m_queue ) ); } catch( ... ) { // Rollback m_cooperations modification. if( 0 == --(it->second.m_agents) ) m_cooperations.erase( it ); throw; } return it->second.m_queue.get(); } //! Helper method for creating event queue for agents/cooperations. agent_queue_ref_t make_new_agent_queue() { return agent_queue_ref_t( new agent_queue_t( m_queue ) ); } /*! * \since v.5.5.4 * \brief Helper method for casting to stats_supplier-object. */ tp_stats::stats_supplier_t & stats_supplier() { return *this; } /*! * \since v.5.5.4 * \brief Implementation of stats_supplier-related stuff. */ virtual void supply( tp_stats::stats_consumer_t & consumer ) override { // Statics must be collected on locked object. std::lock_guard< spinlock_t > lock( m_lock ); consumer.set_thread_count( m_threads.size() ); for( auto & q : m_cooperations ) { auto & s = q.second; s.update_queue_stats(); consumer.add_queue( s.m_queue_desc ); } for( auto & a : m_agents ) { auto & s = a.second; if( !s.cooperation_fifo() ) { s.update_queue_stats(); consumer.add_queue( s.m_queue_desc ); } } } }; } /* namespace impl */ } /* namespace adv_thread_pool */ } /* namespace disp */ } /* namespace so_5 */ |
Получилось вот это:
namespace so_5 { namespace disp { namespace thread_pool { namespace impl { // // adaptation_t // /*! * \since v.5.5.4 * \brief Adaptation of common implementation of thread-pool-like dispatcher * to the specific of this thread-pool dispatcher. */ struct adaptation_t { static const char * dispatcher_type_name() { return "tp"; // thread_pool. } static bool is_individual_fifo( const params_t & params ) { return fifo_t::individual == params.query_fifo(); } static void wait_for_queue_emptyness( agent_queue_t & queue ) { queue.wait_for_emptyness(); } }; // // dispatcher_t // /*! * \since v.5.4.0 * \brief Actual type of this thread-pool dispatcher. */ using dispatcher_t = common_implementation::dispatcher_t< work_thread_t, dispatcher_queue_t, agent_queue_t, params_t, adaptation_t >; } /* namespace impl */ } /* namespace thread_pool */ } /* namespace disp */ } /* namespace so_5 */ |
namespace so_5 { namespace disp { namespace adv_thread_pool { namespace impl { // // adaptation_t // /*! * \since v.5.5.4 * \brief Adaptation of common implementation of thread-pool-like dispatcher * to the specific of this thread-pool dispatcher. */ struct adaptation_t { static const char * dispatcher_type_name() { return "atp"; // adv_thread_pool. } static bool is_individual_fifo( const params_t & params ) { return fifo_t::individual == params.query_fifo(); } static void wait_for_queue_emptyness( agent_queue_t & /*queue*/ ) { // This type of agent_queue doesn't require waiting for emptyness. } }; // // dispatcher_t // /*! * \since v.5.4.0 * \brief Actual type of this thread-pool dispatcher. */ using dispatcher_t = so_5::disp::thread_pool::common_implementation::dispatcher_t< work_thread_t, dispatcher_queue_t, agent_queue_t, params_t, adaptation_t >; } /* namespace impl */ } /* namespace adv_thread_pool */ } /* namespace disp */ } /* namespace so_5 */ |
Вынесенную в шаблон общую часть реализации можно увидеть здесь.
Еще раз убедился в том, что шаблоны в C++ рулят и бибикают (в лучшем смысле этого слова, конечно же). И очень хорошо, что шаблоны в C++ -- это совсем не то, что генерики в Java/C#. Т.к. попытка сделать все то же самое на чистом ООП потребовала бы больше времени и сил. Т.к. не смотря на использование в обоих случаях одинаковых имен вроде work_thread_t, agent_queue_t и params_t, все эти типы определены в разных пространствах имен и они разные (а имена совпадают потому, что они решают одни и те же задачи, но по-другому).
Комментариев нет:
Отправить комментарий