воскресенье, 13 июля 2014 г.

[prog.c++] И я присоединился к полку писателей собственных примитивов синхронизации

Не от хорошей жизни, чесслово! :) Просто на некоторых операциях ну уж очень не хочется использовать тяжелые примитивы синхронизации, вроде std::mutex или ACE_RW_Thread_Mutex. А хорошие альтернативные реализации, например, как в библиотеке CDS, потребуют подтягивания дополнительных зависимостей. Вот и ловишь себя на мысли, что какой-нибудь spinlock -- это же совсем маленькая штука, всего на несколько строк (см., например, здесь), так почему бы и не написать... :)

Под катом две реализации RW-lock-а, т.е. примитива синхронизации, который позволяет обращаться к ресурсу либо нескольким "читателям", либо всего одному "писателю". Реализации тривиальные, не обеспечивающие "честности". Кроме того, если захватившая замок нить ломается, то замок остается в занятом состоянии. Однако, для моих нужд этого хватает.

Первая реализация, под названием rw_spinlock -- это примитивная реализация на двух atomic-ах, до которой додумался самостоятельно. Она компактная и понятная. Хотя и не очень эффективная.

Вторая реализация, под названием reader_priority_rw_spinlock -- это переписанный мной вариант класса RWSpinRPrefT из библиотеки CDS. Только моя реализация использует исключительно std-шные штуки и битовые операции, тогда как CDS-ный класс RWSpinRPrefT применяет собственные определения (которые могут, как я понимаю, мапиться на std- или boost-, или еще на что-нибудь), а так же битовые поля и union-ы. В reader_priority_rw_spinlock кода больше, чем в rw_spinlock. Но, надеюсь, ничего я не напутал. Данная реализация местами чуть побыстрее rw_spinlock.

На таком низком уровне я и до перехода в менеджеры не программировал, поэтому сейчас вообще тяжко. Вкуривал значение констант std::memory_order_* долго и упорно, но не уверен, что все воспринял. Кто интересуется этой темой, вот хорошая статья на эту тему на русском языке (как мне показалось ее написал автор библиотеки CDS). Поэтому если кто-то сведующий заметит проблемы в моем коде, прошу сразу же ткнуть меня мордой лица в соответствующие кучи :)

Заодно интересно пообщаться с кем-нибудь вот на какую тему. Допустим, у нас есть несколько рабочих потоков (нитей) и очередей сообщений для обмена сообщениями между потоками, каждому потоку принадлежит своя очередь. Каждый поток зависает на ожидании очередного сообщения, просыпается при его появлении, обрабатывает сообщение и, если очередь оказывается пустой, засыпает вновь.

Простая реализация такой очереди на C++11 требует одного std::mutex-а и одного std::condition_variable.

Однако у такой простой реализации есть проблема: если две нити начнут использовать такую очередь для ping-pong-а единичных сообщений, то производительность будет крайне низкой. Речь будет идти всего о сотнях тысяч сообщений в секунду. На мой взгляд это из-за того, что когда первая нить завершает обработку своего сообщения и ничего больше не обнаруживает, то засыпает на condition_variable слишком надолго. Попутно для работы с condition_variable несколько раз захватывается и освобождается std::mutex, что есть слишком долго и дорого.

Если заставить нити блокировать очереди сообщений посредством spinlock-ов и обходиться без засыпания на condition_variable, то производительность значительно увеличивается.

Но, со spinlock-ами плохо то, что они оправданы, если между нитями действительно идет постоянный интенсивный обмен сообщениями. Если же этот обмен имеет спорадический характер (т.е. временами пусто, временами густо), то spinlock-и будут зазря растрачивать процессорное время в ожидании очередного всплеска обмена сообщениями.

Поэтому на ум приходит тривиальная штука: при ожидании следующего сообщения на пустой очереди сначала провести несколько итераций на spinlock-е, а затем заснуть на std::mutex+condition_variable. Тогда получится, что пока сообщения идут плотным потоком, мы не трогаем "тяжелые" объекты синхронизации. Но, как-то только возникает какая-то заметная пауза, переходим в честный режим ожидания, позволяя ОС выделить освобожденные нами ресурсы другим нитям.

Кто-нибудь сталкивался с таким гибридным подходом? Если да, то можно ли где-то посмотреть (в смысле есть ли OpenSource-проекты с использованием такой схемы, чтобы глянуть на готовые реализации)? Какие были грабли? Оправдан ли такой подход (если нет, то почему)?

Примечание. Приведенный ниже код проверялся только под MSVS 2013 Express.

Upd.С подачи Alex Syrnikov исправлена ошибка в rw_spinlock, возникшая из-за переделки одного класса в другой.

Upd. После пары дней самостоятельной медитации и комментариев читателей блога реализация reader_priority_rw_spinlock была исправлена и переработана.

class rw_spinlock
   {
   private :
      std::atomic_flag m_lock;
      std::atomic_uint m_readers;

      inline void lock_object()
         {
            while( m_lock.test_and_set( std::memory_order_acquire ) )
               ;
         }

      inline void unlock_object()
         {
            m_lock.clear( std::memory_order_release );
         }

   public :
      inline rw_spinlock()
         {
            m_lock.clear( std::memory_order_release );
            m_readers.store( 0, std::memory_order_release );
         }

      inline void lock_shared()
         {
            lock_object();
            ++m_readers;
            unlock_object();
         }

      inline void unlock_shared()
         {
            --m_readers;
         }

      inline void lock()
         {
            lock_object();
            while0 != m_readers.load( std::memory_order_consume ) )
               ;
         }

      inline void unlock()
         {
            unlock_object();
         }
   };

class reader_priority_rw_spinlock
   {
   private :
      std::atomic_uint_fast32_t m_counters;

      static const std::uint_fast32_t writter_bit = 0x80000000u;

   public :
      inline reader_priority_rw_spinlock()
         {
            m_counters.store( 0, std::memory_order_release );
         }

      inline void lock_shared()
         {
            std::uint_fast32_t expected =
               m_counters.load( std::memory_order_acquire );
            std::uint_fast32_t desired;

            do
               {
                  expected &= ~writter_bit;
                  desired = expected + 1;
               }
            while( !m_counters.compare_exchange_weak(
                  expected, desired,
                  std::memory_order_release,
                  std::memory_order_relaxed ) );
         }

      inline void unlock_shared()
         {
            std::uint_fast32_t expected =
               m_counters.load( std::memory_order_acquire );
            std::uint_fast32_t desired;
            do
               {
                  desired = expected - 1;
               }
            while( !m_counters.compare_exchange_weak(
                  expected, desired,
                  std::memory_order_release,
                  std::memory_order_relaxed ) );
         }

      inline void lock()
         {
            std::uint_fast32_t expected;
            std::uint_fast32_t desired;

            do
               {
                  expected = 0;
                  desired = writter_bit;
               }
            while( !m_counters.compare_exchange_weak(
                  expected, desired,
                  std::memory_order_release,
                  std::memory_order_relaxed ) );
         }

      inline void unlock()
         {
            m_counters.store( 0, std::memory_order_release );
         }
   };
Отправить комментарий