В последние несколько дней активно экспериментировал с lock-free и не lock-free очередями для передачи данных между нитями. Хотел выжать максимум возможного. Получил несколько обескураживающие результаты и сделал для себя соответствующие выводы. Полагаю, что для опытных в многопоточном программировании людей все нижесказанное будет очевидными и давно известными вещами. К сожалению, таковыми для меня они не оказались. Потому, что вижу, о том и пою, гуру от многопоточности этот текст могут не читать.
Итак, если вы активно используете в своем C++ приложении передачу динамически созданных объектов между нитями, да так, что одна нить создает объект, а вторая нить затем его разрушает, то вы не получите очень уж серьезного выигрыша от lock-free очередей (или других механизмов синхронизации передачи информации между потоками, построенных на низкоуровневых инструментах вроде атомиков).
Просто потому, что весь выигрыш, который дают вам lock-free структуры, будет без следа съеден стоимостью операции delete для объекта, аллоцированного в другом потоке. Т.е. если стоимость деаллокации объекта составляет 10мкс, то стоимость передачи указателя на него в другую нить в районе 1-2мкс -- это не о чем. Ваши потоки будут гораздо больше времени ждать на аллокации/деаллокации, чем на CAS-ах или даже тупых spinlock-ах.
Под катом полностью самодостаточный тест, который можно попробовать у себя. Он делает вот что:
Сначала создает N нитей (задается в командной строке или определяется автоматически). Эти нити в цикле делают две операции: создают несколько объектов demand_t и сохраняет указатели на них объекте to_fill. Затем ждут, пока появятся объекты в объекте to_clear. Как только объект to_clear оказывается заполненным, его содержимое (т.е. все объекты demand_t) удаляются.
Смысл в том, что нить может создавать объекты demand_t для удаления другой нитью. Тут все зависит от того, как именно заданы параметры to_fill и to_clear. Сначала тест запускается так, чтобы нить создавала demand_t в свой to_fill, а удаляла из чужого to_clear. Потом тест запускается так, чтобы и to_fill и to_clear были собственными объектами нити. В этом случае операции new/delete происходят на контексте "родной" нити и, потому, получаются гораздо эффективнее. Такое поведение теста контролируется параметром shift: значение 1 указывает, что to_fill будет "родным", а to_clear -- "чужим". Если же shift=0, то to_fill и to_clear -- это родные объекты нити.
У меня на 4-х аппаратных ядрах и четырех потоках под MSVC++2013 получаются следующие результаты:
$ _vc_12_0_x64/release/_experiments.combined_pass_between.exe 4
threads: 4
...
*** shift=1 ***
allocs: 4000000, total_time: 0.953s
price: 2.3825e-007s
throughtput: 4197271.773 allocs/s
assigns: 4000000, total_time: 0.65s
price: 1.625e-007s
throughtput: 6153846.154 assigns/s
*** shift=0 ***
allocs: 4000000, total_time: 0.168s
price: 4.2e-008s
throughtput: 23809523.81 allocs/s
assigns: 4000000, total_time: 0.015s
price: 3.75e-009s
throughtput: 266666666.7 assigns/s
Здесь при shift=1 пропускная способность для передачи динамически выделенных объектов между нитями получилась порядка 4.2M объектов в секунду (4_197_271). Тогда как при shift=0 нити смогли создавать и удалять объекты с суммарной пропускной способностью 24M объектов в секунду (23_809_523).
Отдельной строкой (помеченной как assigns) стоят результаты еще одной модификации этого теста: вместо реальной аллокации и деаллокации объектов demand_t между нитями передаются указатели на статические объекты. Т.е. вызовов new/delete нет, зато есть ожидание на атомиках. Имхо, такой вариант теста показывает максимальную скорость обмена значениями unsigned int между нитями. Т.е. в случае 4-х потоков, каждый из которых читает и пишет по два unsigned int для взаимодействия с другим потоком, суммарная пропускная способность передачи значений составила 6M в секунду. Для сравнения, когда эти unsigned int-ы просто изменяются одной и той же нитью, которая, к тому же не вызывает new/delete, "суммарная пропускная способность" составляет 267M (хотя в этом случае такой термин вряд ли применим вообще).
Цифры 4.2M (для allocs) и 6M (assigns) можно интерпретировать по-разному. При желании можно их удвоить, т.к. каждая нить выполняет операции чтения-ожидания-записи над двумя atomic-ами. С другой стороны, если нить пишет в очередь исходящие сообщения и принимает входящие, то как раз два atomic-а и потребуются. Поэтому я считаю именно так: 4.2M обменов сообщениями между нитями. Хотя, даже если цифры и удвоить, результаты получаются совсем не такие впечатляющие, как в случае с локальной работой каждой нити.
Под Linux и GCC 4.9.0 получается аналогичная картинка (хотя я пускаю тест под VirtualBox с эмуляцией двух процессорных ядер, т.ч. на реальном железе показатели могут отличаться в ту или иную сторону):
threads: 2
...
*** shift=1 ***
allocs: 2000000, total_time: 3.067s
price: 1.5335e-06s
throughtput: 652103.0323 allocs/s
assigns: 2000000, total_time: 2.941s
price: 1.4705e-06s
throughtput: 680040.8024 assigns/s
*** shift=0 ***
allocs: 2000000, total_time: 0.134s
price: 6.7e-08s
throughtput: 14925373.13 allocs/s
assigns: 2000000, total_time: 0.022s
price: 1.1e-08s
throughtput: 90909090.91 assigns/s
Какой практический вывод из всего этого напрашивается? Таковых несколько:
- во-первых, использование динамической памяти желательно сводить к минимуму, если есть необходимость передавать мелкие объекты между сущностями, намного выгоднее это делать посредством копирования значений;
- во-вторых, чем больше работы можно сделать на контексте одной нити, тем лучше. Задействовать многопоточность нужно в условиях, когда требуется действительно вытесняющая многозадачность и независимое выполнение каких-то операций (вроде синхронного ввода/вывода, обращения к внешним устройствам/службам, работа с СУБД и т.д.). Либо же в случае, если стоимость выполняемой на другой нити операции реально очень высока и выделение ее на отдельное аппаратное ядро принесет выигрыш (скажем, обработка больших объемов данных, которые можно раздробить на части и поручить обработку независимых частей разным ядрам);
- в-третьих, при передаче информации между независимыми нитями нужно стараться как можно чаще копировать данные, и как можно реже передавать ответственность за освобождение динамической памяти другой нити.
Ну и, в продолжение практических выводов (кто про что, а вшивый про баню -- я про SObjectizer). В случае реализации модели акторов (агентов, активных объектов) передача сообщений между акторами посредством динамически-созданных объектов -- это губительно для масштабирования. По крайней мере в случае, если память освобождает не та нить, которая выделила память. Поэтому идеальный вариант очередей сообщений для акторов/агентов -- это очередь копий экземпляров сообщений, а не очередь указателей на динамически созданные экземпляры. (С другой стороны, тут еще нужно посмотреть, как оно будет, если у сообщения сразу несколько получателей, а размеры сообщений исчисляются десятками килобайт или же сообщения содержат в себе объекты с указателями внутри, как те же std::string-и или std::vector). Либо же, нужны какие-то механизмы очистки памяти (memory reclamation), чтобы заявка, сформированная нитью-отправителем, не уничтожалась нитью-получателем, а специальным образом помечалась. После чего нить-отправитель могла бы ее подчистить.
Тут, кстати говоря, на повестку дня вновь выходит вопрос о контроле за нагрузкой на акторов/агентов. Например, если можно указать, что для нормальной работы агенту нужно всего 300Kb, то для агента выделяется блок в 300Kb, внутри которого аллоцируется память под адресованные агенту сообщения. Получается такая циклическая арена, внутри которой хранятся сообщения агента и работа с которой происходит максимально быстро... Но это уже совсем другая история.
Под катом обещанный код теста. Красотой не блещет, но мне нужен был quick & dirty вариант для быстрых экспериментов, потому марафет не наводился. Тип demand_t имеет такой вид, чтобы быть более-менее похожим на описание заявки в SObjectizer-е.
#include <atomic> #include <chrono> #include <cstdlib> #include <functional> #include <iostream> #include <memory> #include <string> #include <thread> #include <vector> class benchmarker_t { public : inline void start() { m_start = std::chrono::high_resolution_clock::now(); } inline void finish_and_show_stats( unsigned long long events, const std::string & title ) { auto finish_time = std::chrono::high_resolution_clock::now(); const double duration = std::chrono::duration_cast< std::chrono::milliseconds >( finish_time - m_start ).count() / 1000.0; const double price = duration / events; const double throughtput = 1 / price; std::cout.precision( 10 ); std::cout << title << ": " << events << ", total_time: " << duration << "s" << "\n""price: " << price << "s" << "\n""throughtput: " << throughtput << " " << title << "/s" << std::endl; } private : std::chrono::high_resolution_clock::time_point m_start; }; const std::size_t cycles_count = 1000000; const std::size_t vector_size = 1; struct demand_t { void * m_ptr1 = nullptr; void * m_ptr2 = nullptr; void * m_ptr3 = nullptr; void * m_ptr4 = nullptr; void * m_ptr5 = nullptr; void * m_ptr6 = nullptr; }; struct demand_vector_t { std::atomic_uint m_vector_size; std::vector< demand_t * > m_demands; }; using demand_vector_shptr_t = std::shared_ptr< demand_vector_t >; using alloc_func_t = std::function< demand_t *() >; using dealloc_func_t = std::function< void( demand_t * ) >; void thread_body( demand_vector_t * to_fill, demand_vector_t * to_clear, alloc_func_t alloc_func, dealloc_func_t dealloc_func ) { for( std::size_t c = 0; c != cycles_count; ++c ) { while( 0 != to_fill->m_vector_size.load( std::memory_order_acquire ) ) std::this_thread::yield(); for( auto & p : to_fill->m_demands ) p = alloc_func(); to_fill->m_vector_size.store( to_fill->m_demands.size(), std::memory_order_release ); while( 0 == to_clear->m_vector_size.load( std::memory_order_acquire ) ) std::this_thread::yield(); for( auto & p : to_clear->m_demands ) { dealloc_func( p ); p = nullptr; } to_clear->m_vector_size.store( 0, std::memory_order_release ); } } void do_test( unsigned int thread_count, unsigned int shift, alloc_func_t alloc_func, dealloc_func_t dealloc_func, const char * operation_name ) { std::vector< demand_vector_shptr_t > demand_vectors( thread_count, demand_vector_shptr_t() ); for( auto & v : demand_vectors ) { v = demand_vector_shptr_t( new demand_vector_t ); v->m_vector_size.store( 0, std::memory_order_release ); v->m_demands.resize( vector_size, nullptr ); } benchmarker_t benchmarker; benchmarker.start(); std::vector< std::thread > threads; threads.reserve( thread_count ); for( std::size_t i = 0; i != thread_count; ++i ) threads.emplace_back( thread_body, demand_vectors[ i ].get(), demand_vectors[ (i+shift) % thread_count ].get(), alloc_func, dealloc_func ); for( auto & t : threads ) t.join(); benchmarker.finish_and_show_stats( thread_count * cycles_count, operation_name ); } demand_t * real_alloc() { return new demand_t; } void real_dealloc( demand_t * p ) { delete p; } demand_t * pseudo_alloc() { static demand_t demand; return &demand; } void pseudo_dealloc( demand_t * p ) {} int main( int argc, char ** argv ) { unsigned int thread_count = std::thread::hardware_concurrency(); if( 1 < argc ) thread_count = std::atoi( argv[ 1 ] ); std::cout << "threads: " << thread_count << "\n..." << std::endl; std::cout << "*** shift=1 ***" << std::endl; do_test( thread_count, 1, real_alloc, real_dealloc, "allocs" ); do_test( thread_count, 1, pseudo_alloc, pseudo_dealloc, "assigns" ); std::cout << "\n" "*** shift=0 ***" << std::endl; do_test( thread_count, 0, real_alloc, real_dealloc, "allocs" ); do_test( thread_count, 0, pseudo_alloc, pseudo_dealloc, "assigns" ); } |
Комментариев нет:
Отправить комментарий