Многострадальная фича, которая раньше называлась multi channel receive и которую было не очень понятно как воплощать в жизнь, начала дышать. Но уже под названием multi chain select (по аналогии с конструкцией select из языка Go).
Вызов такого select-а выглядит приблизительно таким образом:
so_5::select( wait_time, case_( ch_one, handlers... ), case_( ch_two, handlers... ), ... case_( ch_last, handlers... ) ); |
Ничего лучше с эстетической точки зрения придумать не удалось. Если у кого-то есть какие-то соображения на этот счет, прошу поделиться в комментариях.
Пока готов только самый простой вариант select-а, который выбирает всего одно сообщение из любого из mchain-ов. С ожиданием появления этого сообщения или без ожидания. После некоторого тестирования приступим к реализации более продвинутого select-а, аналогичного продвинутому receive (где можно будет использовать методы вроде handle_n, extract_n и др.).
Под катом небольшой пример, в котором создается три канала (mchain-а) и три рабочих нити. Каждая рабочая нить получает один из каналов в качестве параметра и пишет в свой канал несколько целых чисел, делая паузу случайной длительности после каждой записи. Затем каждая рабочая нить закрывает свой канал и завершает работу. Главная нить посредством select-а вычитывает числа из этих трех каналов. Работа главной нити завершается, когда обнаруживается, что все каналы закрыты.
#include <so_5/all.hpp> #include <random> using namespace so_5; using namespace std; using namespace std::chrono; void worker_thread( mchain_t ch, unsigned int values ) { default_random_engine rnd_dev; rnd_dev.seed(); uniform_int_distribution<> rnd_gen{ 0, 250 }; for( unsigned int i = 0; i != values; ++i ) { send< unsigned int >( ch, i ); this_thread::sleep_for( milliseconds( rnd_gen( rnd_dev ) ) ); } close_retain_content( ch ); } auto mk_chain( wrapped_env_t & sobj ) { return sobj.environment().create_mchain( make_unlimited_mchain_params() ); } int main() { wrapped_env_t sobj; auto ch1 = mk_chain( sobj ); auto ch2 = mk_chain( sobj ); auto ch3 = mk_chain( sobj ); thread w1{ [ch1]{ worker_thread( ch1, 4 ); } }; thread w2{ [ch2]{ worker_thread( ch2, 5 ); } }; thread w3{ [ch3]{ worker_thread( ch3, 6 ); } }; while( true ) { auto r = select( infinite_wait, case_( ch1, []( unsigned int v ) { cout << "w1: " << v << endl; } ), case_( ch2, []( unsigned int v ) { cout << "w2: " << v << endl; } ), case_( ch3, []( unsigned int v ) { cout << "w3: " << v << endl; } ) ); if( mchain_props::extraction_status_t::chain_closed == r.status() ) break; } w1.join(); w2.join(); w3.join(); } |
Комментариев нет:
Отправить комментарий