вторник, 24 декабря 2013 г.

[prog.c++] GCC под Windows: удивлен скоростью работы std::mutex/std::condition_variable

В стандартную C++11 входят базовые средства для поддержки многопоточности: std::thread, std::mutex и std::condition_variable (подробнее см.здесь). Но т.к. под Windows я пока пользуюсь MSVS2010, где эти штуки не реализованы, то единственный доступный вариант для работы с std::thread -- GCC, благо в версии 4.8.* это все уже есть (может были и в более ранних, я не исследовал этот вопрос, т.к. имел возможность сразу взять 4.8).

В случае GCC под Linux-ом никаких проблем с использованием thread из C++11 нет. Там все работает с максимальной скоростью. А вот под Windows с использованием GCC все гораздо хуже.

Я использую не родной вариант MinGW, а его порты: TDM-GCC и MinGW-w64. В них реализация std::thread под Windows доступна. При этом для MinGW-w64 есть два варианта: win-thread и posix-thread, так вот std::thread работает только в варианте posix-thread.

Проблемы с производительностью проявились на небольшом синтетическом тесте, который я использовал для своих экспериментов. В начале в тесте использовалась библиотека ACE, но затем захотелось попробовать std::thread.

Исходный текст теста приведен в конце поста под катом. Самое важное, что хоть там и используется очередь сообщений, в которой std::mutex/std::condition_variable задействованны для обеспечения thread safety очереди, но вся работа происходит на одной рабочей нити. Т.е. фактически этот тест показывает накладные расходы на работу внутренностей очереди (и, в том числе, накладные расходы std::mutex).

Так вот под MinGW (как в варианте TDM-GCC, так и MinGW-w64) тест показывает скорость приблизительно в 330K сообщений в секунду. Тогда как вариант с ACE показывал около 6.8M сообщений в секунду. Самое смешное, что тот же тест с использованием std::thread на Linux под управлением виртуалки так же оказывался значительно быстрее: порядка 5.2M сообщений в секунду. Под виртуалкой!

Вероятно, дело в реализации std::thread в libc++, которая идет с этими вариантами GCC. Подозреваю, что она базируется на коде для POSIX Threads, а сами POSIX Threads имитируются через WinAPI. Отсюда и такая низкая производительность. Причем, забавно, тот же тест под Cygwin GCC работает чуть быстрее -- порядка 1.3M msg/sec. Но все равно это далеко от производительности кода, который напрямую использует WinAPI.

В поисках других сборок GCC под Windows нашел еще один интересный дистрибутив: nuwem-mingw. В его основе лежит WinGW-w64, но с win-threads вместо posix-threads. Поэтому в чистом виде мой тест на сборке nuwem-mingw не проверишь. Зато в nuwem-mingw включено еще много чего, в том числе и скомпилированный вариант Boost-1.55. Это дало возможность адаптировать код теста под Boost и проверить его скорость. Результат получился таким же, как и с ACE: около 6.8M msg/sec.

Вот такие дела. Все проверялось на 2-х ядерном Pentium 1.4GHz, 4Gb RAM, Win7 64bit. Версии GCC во всех случаях были 4.8.2, тест собирался в 64-битовом режиме.

Update. Скомпилированный в режиме x86_amd64 компилятором из MSVC2013 Express показывает результат в два раза худший, чем nuwem-mingw с Boost-ом: порядка 3.5M msg/sec.

PS. Кстати, еще один интересный момент. У меня есть два варианта TDM-GCC: основной 64-битовый и точно такой же, но 32-битовый. Так вот 32-х битовый вариант оказался компилировать тест указав, что не знает функцию std::stoul. Хотя точно такой же 64-битовый TDM-GCC собрал код без проблем. На эту же функцию ругнулся и GCC из Cygwin-а. Все остальные варианты, включая GCC на Linux-е, успешно std::stoul проглотили. В общем, C++ скоро четвертый десяток разменяет, а приключения все те же :)

PPS. В Boost-е обнаружились такие нужные мне сейчас вещи, как spin-lock-и и lock-free очереди. Как бы я не превратился со временем в пользователя Boost-а :) Кстати, кто-нибудь проверял масштабируемость таймеров в Boost.Asio? Можно там создавать таймера в количестве тысяч/десятков тысяч штук?

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

typedef void (*handler_pfn_t)(void *);

//
// demand_t
//

struct demand_t
{
   handler_pfn_t m_handler;
   void * m_param;

   demand_t()
      :  m_handler( nullptr )
      ,  m_param( nullptr )
   {}

   demand_t(
      handler_pfn_t handler,
      void * param )
      :  m_handler( handler )
      ,  m_param( param )
   {}
};

typedef std::deque< demand_t > demand_container_t;

//
// demand_queue_t
//

class demand_queue_t
{
   public:
      demand_queue_t();
      ~demand_queue_t();

      void
      push(
         handler_pfn_t handler,
         void * param );

      enum
      {
         demand_extracted = 1,
         shutting_down = 2,
         no_demands = 3
      };

      int
      pop(
         demand_container_t & queue_item );

      void
      start_service();

      void
      stop_service();

      void
      clear();

   private:
      demand_container_t m_demands;

      std::mutex m_lock;
      std::condition_variable m_not_empty;

      bool m_shutting_down;
      bool m_is_waiting;
};

demand_queue_t::demand_queue_t()
   :  m_shutting_down( false )
   ,  m_is_waiting( false )
{
}

demand_queue_t::~demand_queue_t()
{
   m_demands.clear();
}

void
demand_queue_t::push(
   handler_pfn_t handler,
   void * param )
{
   std::unique_lock< std::mutex > lock( m_lock );

   if( !m_shutting_down )
   {
      m_demands.push_back( demand_t( handler, param ) );

      if( m_is_waiting )
      {
         // Someone is waiting...
         // It should be informed about new demands.
         m_is_waiting = false;
         m_not_empty.notify_one();
      }
   }
}

int
demand_queue_t::pop(
   demand_container_t & demands )
{
   std::unique_lock< std::mutex > lock( m_lock );

   whiletrue )
   {
      if( m_shutting_down )
         return shutting_down;
      else if( !m_demands.empty() )
      {
         demands.swap( m_demands );
         break;
      }
      else
      {
         // Queue is empty. We should wait for a demand or
         // a shutdown signal.
         m_is_waiting = true;
         m_not_empty.wait( lock );
      }
   }

   return demand_extracted;
}

void
demand_queue_t::start_service()
{
}

void
demand_queue_t::stop_service()
{
   std::unique_lock< std::mutex > lock( m_lock );

   m_shutting_down = true;

   // If the demands queue is empty then someone is waiting
   // for new demands inside pop().
   if( m_is_waiting )
      m_not_empty.notify_one();
}

void
demand_queue_t::clear()
{
   std::unique_lock< std::mutex > lock( m_lock );
   m_demands.clear();
}

//
// work_thread_t
//

class work_thread_t
{
   public:
      work_thread_t();
      ~work_thread_t();

      void
      put_event(
         handler_pfn_t handler,
         void * param );

      void
      start();

      void
      shutdown();

      void
      wait();

   protected:
      void
      body();

      void
      serve_demands_block(
         demand_container_t & executed_demands );

      static void
      entry_point( work_thread_t * self_object );

   private:
      demand_queue_t m_queue;

      std::atomic< bool > m_continue;

      std::unique_ptr< std::thread > m_thread;
};

work_thread_t::work_thread_t()
{
}

work_thread_t::~work_thread_t()
{
}

void
work_thread_t::put_event(
   handler_pfn_t handler,
   void * param )
{
   m_queue.push( handler, param );
}

void
work_thread_t::start()
{
   m_queue.start_service();
   m_continue = true;

   m_thread.reset(
         new std::thread( &work_thread_t::entry_point, this ) );
}

void
work_thread_t::shutdown()
{
   m_continue = false;
   m_queue.stop_service();
}

void
work_thread_t::wait()
{
   m_thread->join();

   m_queue.clear();
}

void
work_thread_t::body()
{
   // Local demands queue.
   demand_container_t demands;

   int result = demand_queue_t::no_demands;

   while( m_continue )
   {
      if( demands.empty() )
         result = m_queue.pop( demands );

      // Serve demands if any.
      if( demand_queue_t::demand_extracted == result )
         serve_demands_block( demands );
   }
}

inline void
work_thread_t::serve_demands_block(
   demand_container_t & demands )
{
   while( !demands.empty() )
   {
      demand_t demand = demands.front();
      demands.pop_front();

      (*demand.m_handler)(demand.m_param);
   }
}

void
work_thread_t::entry_point( work_thread_t * self_object )
{
   self_object->body();
}
 
using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::steady_clock;

class agent_imitator_t
{
   private :
      work_thread_t & m_context;

      const size_t m_iteration_count;
      size_t m_current_iteration;

      steady_clock::time_point m_start_time;
      steady_clock::time_point m_finish_time;

   public :
      agent_imitator_t(
         work_thread_t & context,
         size_t iteration_count )
         :  m_context( context )
         ,  m_iteration_count( iteration_count )
         ,  m_current_iteration( 0 )
      {}

      uint64_t
      duration() const
      {
         return duration_cast< milliseconds >( m_finish_time - m_start_time )
               .count();
      }

      static void
      on_start( void * param )
      {
         auto p = reinterpret_cast< agent_imitator_t * >(param);
         p->m_start_time = steady_clock::now();

         p->m_context.put_event( &on_demand, param );
      }

      static void
      on_demand( void * param )
      {
         auto p = reinterpret_cast< agent_imitator_t * >(param);

         if( ++(p->m_current_iteration) < p->m_iteration_count )
            p->m_context.put_event( &on_demand, param );
         else
            p->m_context.put_event( &on_finish, param );
      }

      static void
      on_finish( void * param )
      {
         auto p = reinterpret_cast< agent_imitator_t * >(param);

         p->m_finish_time = steady_clock::now();
         p->m_context.shutdown();
      }
};

size_t
iteration_count( int argc, char ** argv )
{
   if( argc > 1 )
   {
      return std::stoul( argv[ 1 ] );
   }

   return 1000000;
}

int
main(int argc, char ** argv)
{
   work_thread_t context;

   const size_t iterations = iteration_count( argc, argv );
   std::cout << "iterations: " << iterations << std::endl;

   agent_imitator_t agent( context, iterations );

   context.put_event( &agent_imitator_t::on_start, &agent );
   context.start();
   context.wait();

   auto duration_sec = agent.duration() / 1000.0;
   auto price = duration_sec / iterations;
   auto per_sec = 1 / price;

   std::cout.precision( 12 );
   std::cout << "Time: " << duration_sec << "s, per demand: "
         << price << "s, per sec: " << per_sec
         << std::endl;

   return 0;
}

Комментариев нет: