понедельник, 14 марта 2016 г.

[prog.c++] Механизм multi chain select (он же multi channel receive) начал дышать...

Многострадальная фича, которая раньше называлась multi channel receive и которую было не очень понятно как воплощать в жизнь, начала дышать. Но уже под названием multi chain select (по аналогии с конструкцией select из языка Go).

Вызов такого select-а выглядит приблизительно таким образом:

so_5::select( wait_time,
   case_( ch_one, handlers... ),
   case_( ch_two, handlers... ),
   ...
   case_( ch_last, handlers... ) );

Ничего лучше с эстетической точки зрения придумать не удалось. Если у кого-то есть какие-то соображения на этот счет, прошу поделиться в комментариях.

Пока готов только самый простой вариант select-а, который выбирает всего одно сообщение из любого из mchain-ов. С ожиданием появления этого сообщения или без ожидания. После некоторого тестирования приступим к реализации более продвинутого select-а, аналогичного продвинутому receive (где можно будет использовать методы вроде handle_n, extract_n и др.).

Под катом небольшой пример, в котором создается три канала (mchain-а) и три рабочих нити. Каждая рабочая нить получает один из каналов в качестве параметра и пишет в свой канал несколько целых чисел, делая паузу случайной длительности после каждой записи. Затем каждая рабочая нить закрывает свой канал и завершает работу. Главная нить посредством select-а вычитывает числа из этих трех каналов. Работа главной нити завершается, когда обнаруживается, что все каналы закрыты.

#include <so_5/all.hpp>

#include <random>

using namespace so_5;
using namespace std;
using namespace std::chrono;

void worker_thread( mchain_t ch, unsigned int values )
{
   default_random_engine rnd_dev;
   rnd_dev.seed();
   uniform_int_distribution<> rnd_gen{ 0250 };

   forunsigned int i = 0; i != values; ++i )
   {
      send< unsigned int >( ch, i );
      this_thread::sleep_for( milliseconds( rnd_gen( rnd_dev ) ) );
   }

   close_retain_content( ch );
}

auto mk_chain( wrapped_env_t & sobj )
{
   return sobj.environment().create_mchain( make_unlimited_mchain_params() );
}

int main()
{
   wrapped_env_t sobj;

   auto ch1 = mk_chain( sobj );
   auto ch2 = mk_chain( sobj );
   auto ch3 = mk_chain( sobj );

   thread w1{ [ch1]{ worker_thread( ch1, 4 ); } };
   thread w2{ [ch2]{ worker_thread( ch2, 5 ); } };
   thread w3{ [ch3]{ worker_thread( ch3, 6 ); } };

   whiletrue )
   {
      auto r = select( infinite_wait,
         case_( ch1, []( unsigned int v ) { cout << "w1: " << v << endl; } ),
         case_( ch2, []( unsigned int v ) { cout << "w2: " << v << endl; } ),
         case_( ch3, []( unsigned int v ) { cout << "w3: " << v << endl; } ) );

      if( mchain_props::extraction_status_t::chain_closed == r.status() )
         break;
   }

   w1.join();
   w2.join();
   w3.join();
}
Отправить комментарий