среда, 1 февраля 2017 г.

[prog.c++] Небольшой сниппет ко вчерашней теме (порционное чтение из CSP-канала с учетом тайм-аута)

При написании вчерашнего поста я что-то затормозил. На самом деле организовать чтение пакетов из не более чем 100 сообщений из CSP-канала так, чтобы с момента получения первого сообщения прошло не более 100ms, совсем не сложно. Для этого не требуется второй вспомогательный канал. Достаточно одного канала. Но читать из него нужно дважды:

  • сначала читается первое сообщение из будущего пакета без ограничения времени;
  • затем читаются остальные сообщения будущего пакета но так, чтобы общее время на их чтение не превышало 100ms.

У нас в SO-5 это выглядит вот так:

   for(;;) {
      // Ожидаем первого сообщения.
      const auto r = receive(chain, infinite_wait,
            [&](const kafka_msg & msg) { buffer.push_back(msg); });

      // Если сообщение действительно пришло, то ждем либо остальные
      // сообщения, либо завершение тайм-аута.
      if(r.handled()) {
         receive(from(chain).handle_n(buffer_size - 1).total_time(100ms),
               [&](const kafka_msg & msg) { buffer.push_back(msg); });
         // На этот момент buffer точно не пуст, поэтому сохраняем его.
         persist(buffer);
      }
      else
         // Ничего не вычитали из канала т.к. он закрыт. Продолжать нет смысла.
         break;
   }

Тут первый receive выполняет ожидание и обработку всего одного сообщения. А вот второй receive либо обрабатывает 99 сообщений (т.е. формирует полный пакет), либо же работает 100ms (причем не суть важно на что тратится это время -- на обработку сообщений или же на их ожидание).

Я не очень хорошо знаю Go, но, по-моему, там в select-е можно задать тайм-аут только на ожидание сообщений. Так что SObjectizer-овский total_time() для Go-шного select-а нужно будет написать, из коробки в Go я такого не припомню. Если ошибаюсь, то поправьте меня, плиз.

Полный же код демонстрационного примера можно найти вот в этом репозитории.

Этот пример, кстати, демонстрирует возможность использования SObjectizer-5 для многопоточного программирования вообще без акторов. Только CSP-шные каналы.

При этом можно увидеть, что рукопашная работа с рабочими нитями требует изрядной осторожности. Все эти auto_join и auto_close -- они же не от хорошей жизни появились. А для того, чтобы при возникновении исключений каналы закрывались, а рабочие потоки join-ились. Из-за этого, кстати говоря, приходится сначала объявлять объекты типа std::thread и создавать auto_join-ер для них, потом создавать каналы и auto_closer-ы для каналов. И только после этого стартовать рабочие нити. Если поменять последовательность, то при возникновении исключения можно запросто повиснуть на join-е: рабочая нить не завершиться, поскольку никто не закроет канал, на котором она висит в receive.

Подробнее про mchain-ы в SObjectizer (которые и есть CSP-каналы) можно узнать в этой презентации или в этом разделе Wiki.

PS. Ну и не могу не сказать еще раз про современный C++. В примере используется C++14, работать с ним приятно. Теперь бы еще поднять скорость компиляции C++ного кода на порядок, вообще было бы здорово.

Отправить комментарий