Крамольная идея о декларативном описании конвейеров для обработки сообщений (т.е. о передаче сообщения от одной стадии к другой), была подкинута в обсуждении на LOR-е. Первая пробная реализация была сделана почти сразу. Но потом захотелось большего :)
И вот что буквально только что заработало в качестве прототипа:
virtual void so_evt_start() override { const auto self = so_direct_mbox(); auto first_pipeline = make_pipeline( *this, src | first_stage( "=" ) | second_stage( "-" ) | broadcast( src | third_stage( self ), src | length_calculator() | length_printer() ), autoname ); auto second_pipeline = make_pipeline( *this, src | first_stage( "#" ) | second_stage( "*" ) | third_stage( self ), autoname, // Every stage will work on its own thread. disp::active_obj::create_private_disp( so_environment() )->binder() ); send< first >( first_pipeline, " one " ); send< first >( second_pipeline, " two " ); } |
Здесь создается два конвейера, с несколькими стадиями обработки внутри. Каждая стадия представляется в виде небольшой лямбды. При этом во время компиляции на уровне типов проверяется, чтобы то, что выдает в качестве результата предыдущая стадия, принималось на вход следующей стадией. Т.е. если первая стадия принимает сообщение типа first, а генерирует сообщение типа second, то следующая стадия должна принимать second на вход.
Для каждой стадии создается собственный агент, работа которого заключается в получении очередного сообщения, вызова заданной лямбды и передачи возвращенного из лямбды сообщения дальше. Если лямбда ничего не вернула, то ничего дальше и не отсылается. Что позволяет создавать, например, фильтры.
Каждый конвейер представляется в виде отдельной кооперации. Соответственно, можно назначить кооперации свой диспетчер. Это можно увидеть при создании second_pipeline, там идет привязка к диспетчеру с активными объектами. Так что каждая стадия второго конвейера будет представлена своей собственной рабочей нитью.
Но самое интересное происходит при создании first_pipeline :)
Там после второй стадии идет разделение обработки на два потока. Т.е. инструкция broadcasts() запускает обработку результата second_stage по двум параллельным, независимым веткам. Хотя, за счет C++ных variadic templates, можно делать и больше параллельных веток.
Конечно, это всего лишь прототип и тут есть еще над чем подумать. Например, как сделать так, чтобы при описании конвейера не нужно было начинать со специального маркера src. И как упростить описание стадий, чтобы не нужно было писать вот в таком духе:
stage_handler_t< third, fourth > length_calculator() { return []( const third & msg ) { return make_message< fourth >( msg.m_data.length() ); }; } |
Но еще интереснее придумать, как сделать так, чтобы по конвейеру можно было передавать сигнал EOF.
Это может потребоваться вот в каком случае: допустим, есть конвейер, одной из стадий которого является буфферизатор. Т.е. стадия, которая накапливает несколько мелких сообщений, а дальше отдает одно большое. Когда поток заканчивается, нужно флушировать содержимое буфферизатора. Вопрос в том, как это сделать.
Еще одна интересная задачка -- вставка одной стадии сразу в несколько конвейеров (т.е. создание точки пересечения/объединения конвейеров). И передача в стадию нескольких разных входящих сообщений (например, основное прикладное сообщение и сигнал от таймера). Да, в общем-то, можно и еще чего-нибудь нафантазировать :)
Комментариев нет:
Отправить комментарий