воскресенье, 27 июля 2014 г.

[prog.c++] Чего ждать от lock-free структур при активном использовании динамической памяти?

В последние несколько дней активно экспериментировал с lock-free и не lock-free очередями для передачи данных между нитями. Хотел выжать максимум возможного. Получил несколько обескураживающие результаты и сделал для себя соответствующие выводы. Полагаю, что для опытных в многопоточном программировании людей все нижесказанное будет очевидными и давно известными вещами. К сожалению, таковыми для меня они не оказались. Потому, что вижу, о том и пою, гуру от многопоточности этот текст могут не читать.

Итак, если вы активно используете в своем C++ приложении передачу динамически созданных объектов между нитями, да так, что одна нить создает объект, а вторая нить затем его разрушает, то вы не получите очень уж серьезного выигрыша от lock-free очередей (или других механизмов синхронизации передачи информации между потоками, построенных на низкоуровневых инструментах вроде атомиков).

Просто потому, что весь выигрыш, который дают вам lock-free структуры, будет без следа съеден стоимостью операции delete для объекта, аллоцированного в другом потоке. Т.е. если стоимость деаллокации объекта составляет 10мкс, то стоимость передачи указателя на него в другую нить в районе 1-2мкс -- это не о чем. Ваши потоки будут гораздо больше времени ждать на аллокации/деаллокации, чем на CAS-ах или даже тупых spinlock-ах.

Под катом полностью самодостаточный тест, который можно попробовать у себя. Он делает вот что:

Сначала создает N нитей (задается в командной строке или определяется автоматически). Эти нити в цикле делают две операции: создают несколько объектов demand_t и сохраняет указатели на них объекте to_fill. Затем ждут, пока появятся объекты в объекте to_clear. Как только объект to_clear оказывается заполненным, его содержимое (т.е. все объекты demand_t) удаляются.

Смысл в том, что нить может создавать объекты demand_t для удаления другой нитью. Тут все зависит от того, как именно заданы параметры to_fill и to_clear. Сначала тест запускается так, чтобы нить создавала demand_t в свой to_fill, а удаляла из чужого to_clear. Потом тест запускается так, чтобы и to_fill и to_clear были собственными объектами нити. В этом случае операции new/delete происходят на контексте "родной" нити и, потому, получаются гораздо эффективнее. Такое поведение теста контролируется параметром shift: значение 1 указывает, что to_fill будет "родным", а to_clear -- "чужим". Если же shift=0, то to_fill и to_clear -- это родные объекты нити.

У меня на 4-х аппаратных ядрах и четырех потоках под MSVC++2013 получаются следующие результаты:

$ _vc_12_0_x64/release/_experiments.combined_pass_between.exe 4
threads: 4
...
*** shift=1 ***
allocs: 4000000, total_time: 0.953s
price: 2.3825e-007s
throughtput: 4197271.773 allocs/s
assigns: 4000000, total_time: 0.65s
price: 1.625e-007s
throughtput: 6153846.154 assigns/s

*** shift=0 ***
allocs: 4000000, total_time: 0.168s
price: 4.2e-008s
throughtput: 23809523.81 allocs/s
assigns: 4000000, total_time: 0.015s
price: 3.75e-009s
throughtput: 266666666.7 assigns/s

Здесь при shift=1 пропускная способность для передачи динамически выделенных объектов между нитями получилась порядка 4.2M объектов в секунду (4_197_271). Тогда как при shift=0 нити смогли создавать и удалять объекты с суммарной пропускной способностью 24M объектов в секунду (23_809_523).

Отдельной строкой (помеченной как assigns) стоят результаты еще одной модификации этого теста: вместо реальной аллокации и деаллокации объектов demand_t между нитями передаются указатели на статические объекты. Т.е. вызовов new/delete нет, зато есть ожидание на атомиках. Имхо, такой вариант теста показывает максимальную скорость обмена значениями unsigned int между нитями. Т.е. в случае 4-х потоков, каждый из которых читает и пишет по два unsigned int для взаимодействия с другим потоком, суммарная пропускная способность передачи значений составила 6M в секунду. Для сравнения, когда эти unsigned int-ы просто изменяются одной и той же нитью, которая, к тому же не вызывает new/delete, "суммарная пропускная способность" составляет 267M (хотя в этом случае такой термин вряд ли применим вообще).

Цифры 4.2M (для allocs) и 6M (assigns) можно интерпретировать по-разному. При желании можно их удвоить, т.к. каждая нить выполняет операции чтения-ожидания-записи над двумя atomic-ами. С другой стороны, если нить пишет в очередь исходящие сообщения и принимает входящие, то как раз два atomic-а и потребуются. Поэтому я считаю именно так: 4.2M обменов сообщениями между нитями. Хотя, даже если цифры и удвоить, результаты получаются совсем не такие впечатляющие, как в случае с локальной работой каждой нити.

Под Linux и GCC 4.9.0 получается аналогичная картинка (хотя я пускаю тест под VirtualBox с эмуляцией двух процессорных ядер, т.ч. на реальном железе показатели могут отличаться в ту или иную сторону):

threads: 2
...
*** shift=1 ***
allocs: 2000000, total_time: 3.067s
price: 1.5335e-06s
throughtput: 652103.0323 allocs/s
assigns: 2000000, total_time: 2.941s
price: 1.4705e-06s
throughtput: 680040.8024 assigns/s

*** shift=0 ***
allocs: 2000000, total_time: 0.134s
price: 6.7e-08s
throughtput: 14925373.13 allocs/s
assigns: 2000000, total_time: 0.022s
price: 1.1e-08s
throughtput: 90909090.91 assigns/s

Какой практический вывод из всего этого напрашивается? Таковых несколько:

  • во-первых, использование динамической памяти желательно сводить к минимуму, если есть необходимость передавать мелкие объекты между сущностями, намного выгоднее это делать посредством копирования значений;
  • во-вторых, чем больше работы можно сделать на контексте одной нити, тем лучше. Задействовать многопоточность нужно в условиях, когда требуется действительно вытесняющая многозадачность и независимое выполнение каких-то операций (вроде синхронного ввода/вывода, обращения к внешним устройствам/службам, работа с СУБД и т.д.). Либо же в случае, если стоимость выполняемой на другой нити операции реально очень высока и выделение ее на отдельное аппаратное ядро принесет выигрыш (скажем, обработка больших объемов данных, которые можно раздробить на части и поручить обработку независимых частей разным ядрам);
  • в-третьих, при передаче информации между независимыми нитями нужно стараться как можно чаще копировать данные, и как можно реже передавать ответственность за освобождение динамической памяти другой нити.

Ну и, в продолжение практических выводов (кто про что, а вшивый про баню -- я про SObjectizer). В случае реализации модели акторов (агентов, активных объектов) передача сообщений между акторами посредством динамически-созданных объектов -- это губительно для масштабирования. По крайней мере в случае, если память освобождает не та нить, которая выделила память. Поэтому идеальный вариант очередей сообщений для акторов/агентов -- это очередь копий экземпляров сообщений, а не очередь указателей на динамически созданные экземпляры. (С другой стороны, тут еще нужно посмотреть, как оно будет, если у сообщения сразу несколько получателей, а размеры сообщений исчисляются десятками килобайт или же сообщения содержат в себе объекты с указателями внутри, как те же std::string-и или std::vector). Либо же, нужны какие-то механизмы очистки памяти (memory reclamation), чтобы заявка, сформированная нитью-отправителем, не уничтожалась нитью-получателем, а специальным образом помечалась. После чего нить-отправитель могла бы ее подчистить.

Тут, кстати говоря, на повестку дня вновь выходит вопрос о контроле за нагрузкой на акторов/агентов. Например, если можно указать, что для нормальной работы агенту нужно всего 300Kb, то для агента выделяется блок в 300Kb, внутри которого аллоцируется память под адресованные агенту сообщения. Получается такая циклическая арена, внутри которой хранятся сообщения агента и работа с которой происходит максимально быстро... Но это уже совсем другая история.

Под катом обещанный код теста. Красотой не блещет, но мне нужен был quick & dirty вариант для быстрых экспериментов, потому марафет не наводился. Тип demand_t имеет такой вид, чтобы быть более-менее похожим на описание заявки в SObjectizer-е.

#include <atomic>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

class benchmarker_t
   {
   public :
      inline void
      start()
         {
            m_start = std::chrono::high_resolution_clock::now();
         }

      inline void
      finish_and_show_stats(
         unsigned long long events,
         const std::string & title )
         {
            auto finish_time = std::chrono::high_resolution_clock::now();
            const double duration =
                  std::chrono::duration_cast< std::chrono::milliseconds >(
                        finish_time - m_start ).count() / 1000.0;
            const double price = duration / events;
            const double throughtput = 1 / price;

            std::cout.precision( 10 );
            std::cout << title << ": " << events
                  << ", total_time: " << duration << "s"
                  << "\n""price: " << price << "s"
                  << "\n""throughtput: " << throughtput << " " << title << "/s"
                  << std::endl;
         }

   private :
      std::chrono::high_resolution_clock::time_point m_start;
   };

const std::size_t cycles_count = 1000000;
const std::size_t vector_size = 1;

struct demand_t
{
   void * m_ptr1 = nullptr;
   void * m_ptr2 = nullptr;
   void * m_ptr3 = nullptr;
   void * m_ptr4 = nullptr;
   void * m_ptr5 = nullptr;
   void * m_ptr6 = nullptr;
};

struct demand_vector_t
{
   std::atomic_uint m_vector_size;

   std::vector< demand_t * > m_demands;
};
using demand_vector_shptr_t = std::shared_ptr< demand_vector_t >;

using alloc_func_t = std::function< demand_t *() >;
using dealloc_func_t = std::function< void( demand_t * ) >;

void
thread_body(
   demand_vector_t * to_fill,
   demand_vector_t * to_clear,
   alloc_func_t alloc_func,
   dealloc_func_t dealloc_func )
{
   for( std::size_t c = 0; c != cycles_count; ++c )
   {
      while0 != to_fill->m_vector_size.load( std::memory_order_acquire ) )
         std::this_thread::yield();

      forauto & p : to_fill->m_demands )
         p = alloc_func();

      to_fill->m_vector_size.store(
            to_fill->m_demands.size(),
            std::memory_order_release );

      while0 == to_clear->m_vector_size.load( std::memory_order_acquire ) )
         std::this_thread::yield();
      forauto & p : to_clear->m_demands )
      {
         dealloc_func( p );
         p = nullptr;
      }

      to_clear->m_vector_size.store( 0, std::memory_order_release );
   }
}

void
do_test(
   unsigned int thread_count,
   unsigned int shift,
   alloc_func_t alloc_func,
   dealloc_func_t dealloc_func,
   const char * operation_name )
{
   std::vector< demand_vector_shptr_t > demand_vectors( thread_count,
         demand_vector_shptr_t() );

   forauto & v : demand_vectors )
   {
      v = demand_vector_shptr_t( new demand_vector_t );
      v->m_vector_size.store( 0, std::memory_order_release );
      v->m_demands.resize( vector_size, nullptr );
   }

   benchmarker_t benchmarker;
   benchmarker.start();

   std::vector< std::thread > threads;
   threads.reserve( thread_count );
   for( std::size_t i = 0; i != thread_count; ++i )
      threads.emplace_back( thread_body,
            demand_vectors[ i ].get(),
            demand_vectors[ (i+shift) % thread_count ].get(),
            alloc_func,
            dealloc_func );

   forauto & t : threads )
      t.join();

   benchmarker.finish_and_show_stats( 
         thread_count * cycles_count, operation_name );
}

demand_t * real_alloc() { return new demand_t; }
void real_dealloc( demand_t * p ) { delete p; }

demand_t * pseudo_alloc()
{
   static demand_t demand;
   return &demand;
}

void pseudo_dealloc( demand_t * p )
{}

int main( int argc, char ** argv )
{
   unsigned int thread_count = std::thread::hardware_concurrency();

   if1 < argc )
      thread_count = std::atoi( argv[ 1 ] );

   std::cout << "threads: " << thread_count << "\n..." << std::endl;

   std::cout << "*** shift=1 ***" << std::endl;
   do_test( thread_count, 1, real_alloc, real_dealloc, "allocs" );
   do_test( thread_count, 1, pseudo_alloc, pseudo_dealloc, "assigns" );

   std::cout << "\n" "*** shift=0 ***" << std::endl;
   do_test( thread_count, 0, real_alloc, real_dealloc, "allocs" );
   do_test( thread_count, 0, pseudo_alloc, pseudo_dealloc, "assigns" );
}
Отправить комментарий