При написании вчерашнего поста я что-то затормозил. На самом деле организовать чтение пакетов из не более чем 100 сообщений из CSP-канала так, чтобы с момента получения первого сообщения прошло не более 100ms, совсем не сложно. Для этого не требуется второй вспомогательный канал. Достаточно одного канала. Но читать из него нужно дважды:
- сначала читается первое сообщение из будущего пакета без ограничения времени;
- затем читаются остальные сообщения будущего пакета но так, чтобы общее время на их чтение не превышало 100ms.
У нас в SO-5 это выглядит вот так:
for(;;) { // Ожидаем первого сообщения. const auto r = receive(chain, infinite_wait, [&](const kafka_msg & msg) { buffer.push_back(msg); }); // Если сообщение действительно пришло, то ждем либо остальные // сообщения, либо завершение тайм-аута. if(r.handled()) { receive(from(chain).handle_n(buffer_size - 1).total_time(100ms), [&](const kafka_msg & msg) { buffer.push_back(msg); }); // На этот момент buffer точно не пуст, поэтому сохраняем его. persist(buffer); } else // Ничего не вычитали из канала т.к. он закрыт. Продолжать нет смысла. break; } |
Тут первый receive выполняет ожидание и обработку всего одного сообщения. А вот второй receive либо обрабатывает 99 сообщений (т.е. формирует полный пакет), либо же работает 100ms (причем не суть важно на что тратится это время -- на обработку сообщений или же на их ожидание).
Я не очень хорошо знаю Go, но, по-моему, там в select-е можно задать тайм-аут только на ожидание сообщений. Так что SObjectizer-овский total_time() для Go-шного select-а нужно будет написать, из коробки в Go я такого не припомню. Если ошибаюсь, то поправьте меня, плиз.
Полный же код демонстрационного примера можно найти вот в этом репозитории.
Этот пример, кстати, демонстрирует возможность использования SObjectizer-5 для многопоточного программирования вообще без акторов. Только CSP-шные каналы.
При этом можно увидеть, что рукопашная работа с рабочими нитями требует изрядной осторожности. Все эти auto_join и auto_close -- они же не от хорошей жизни появились. А для того, чтобы при возникновении исключений каналы закрывались, а рабочие потоки join-ились. Из-за этого, кстати говоря, приходится сначала объявлять объекты типа std::thread и создавать auto_join-ер для них, потом создавать каналы и auto_closer-ы для каналов. И только после этого стартовать рабочие нити. Если поменять последовательность, то при возникновении исключения можно запросто повиснуть на join-е: рабочая нить не завершиться, поскольку никто не закроет канал, на котором она висит в receive.
Подробнее про mchain-ы в SObjectizer (которые и есть CSP-каналы) можно узнать в этой презентации или в этом разделе Wiki.
PS. Ну и не могу не сказать еще раз про современный C++. В примере используется C++14, работать с ним приятно. Теперь бы еще поднять скорость компиляции C++ного кода на порядок, вообще было бы здорово.
Комментариев нет:
Отправить комментарий