Данный пост является попыткой осмысления новой фичи для SObjectizer. Но, надеюсь, он так же будет интересен и программистам, занимающимся разработкой многопоточного софта вообще. Будь то на C++, на Java или Scala. Или на Go. Мнение Go-шных разработчиков, кстати говоря, будет наиболее интересным.
Итак, некоторое время назад в SO-5 были добавлены message chain-ы (mchains), которые очень похожи на Go-шные каналы (вот здесь можно посмотреть, как один и тот же простенький пример выглядит с Go-шными каналами и с mchain-ами). Главные отличия между каналами в Go и mchain-ами заключаются в том, что mchain-ы не ограничены по типу данных (т.е. в одном mchain-е могут храниться сообщения разных типов) и в SO-5 операция receive имеет более продвинутые возможности, чем чтение из канала в Go.
Принципиальное ограничение текущей процедуры receive в том, что receive делает выборку сообщений только из одного mchain. А хочется сделать возможность выполнения операции receive сразу из нескольких mchain-ов.
Где это может потребоваться?
Допустим, у нас есть две подзадачи, работающие параллельно и независимо друг от друга. Для общения с каждой из задач мы используем свой исходящий канал и свой входящий канал. Что-то вроде:
// Запускаем первую задачу. auto task1_cmd = create_mchain(...); auto task1_res = create_mchain(...); thread task1{ [&task1_cmd, &task1_res] {...} }; // Запускаем вторую задачу. auto task2_cmd = create_mchain(...); auto task2_res = create_mchain(...); thread task2{ [&task2_cmd, &task2_res] {...} }; // Получаем результат первой задачи. receive( from(task1_res), ... ); // Получаем результат второй задачи. receive( from(task2_res), ... ); |
Проблема здесь в том, что мы читаем результаты работы задач строго по очереди и строго в зафиксированной последовательности. Что не всегда есть хорошо. Ведь если вторая задача прислала свой результат намного раньше, чем первая, то мы не сможем его обработать пока не дождемся результата от первой задачи.
Хотелось бы иметь возможность обрабатывать результаты тогда, когда они появляются и в том порядке, в котором они появляются. Т.е. нужна возможность запустить receive на двух mchain-ах:
// Получаем и обрабатываем результаты задач по мере их поступления. receive( from_any_of(task1_res, task2_res), ... ); |
Здесь нужно сделать маленькое лирическое отступление. Лучше всего эта ситуация разрешается, если у нас есть возможность передать в обе задачи один и тот же mchain для сбора результатов. Тогда работает обычный receive из одного mchain-а и проблем нет.
Однако, такое простое решение не всегда будет возможно. Например, если реакция на результат зависит от того, от какой именно задачи этот результат был получен. Когда у нас всего один mchain с результатами, мы можем вообще не узнать, какая задача нам свой результат прислала.
Еще одна ситуация, в которой у нас нет большого выбора -- это когда задачи и mchain-ы для них создает кто-то другой. Т.е. мы вынуждены работать с сторонним API. Что-то вроде:
// Запускаем задачи используя сторонний API.
auto task1_controls = some_lib::create_worker(...);
auto task2_controls = another_lib::worker_factory(...).create_task(...);
// Получаем и обрабатываем результаты задач по мере их поступления.
receive(
from_any_of(
// Канал результатов первой задачи.
task1_controls.query_result_channel(),
// Канал результатов второй задачи.
task2_controls.worker_info().results_accessor().channel() ),
... );
Из вышеизложенного получается, что нечто под названием multi channel receive, необходимо. А раз так, значит нужно делать. Но тут есть вопросы.
Первый вопрос: должен ли multi channel receive поддерживать разные наборы обработчиков сообщений для каждого канала или же все обработчики должны быть общими для всех каналов? Т.е. можно писать вот так:
receive( from_any_of(task1_res, task2_res), handler_one, handler_two, handler_three, ... ); |
И тогда handler_one будет запускаться для сообщения соответствующего типа вне зависимости от того, из какого канала оно было получено. При этом мы теряем возможность по-разному реагировать на сообщения из разных каналов. Или же мы можем писать вот так:
receive( from_any(), // Обработчики сообщений из первого канала. for_all_from( task1_res, handler_one, handler_two, handler_three, ... ), // Обработчики сообщений из второго канала. for_all_from( task2_res, handler_one, handler_two, handler_three, ... ) ); |
При этом у нас появляется возможность обрабатывать однотипные сообщения из разных каналов по-разному. Но ценой дублирования одинаковых обработчиков.
В принципе, я пока склоняюсь именно ко второму варианту. Т.к. этот сценарий, т.е. разная реакция на сообщения из разных каналов, мне представляется наиболее востребованным для multi channel receive.
При этом, наверное, остается возможность уменьшить количество писанины для случая, когда обработчики сообщений из нескольких каналов тем или иным образом группируются:
receive( from_any(), // Списки обработчиков для двух каналов совпадают. for_all_from( task1_res, task2_res, handler_one, handler_two, handler_three, ... ), // А у третьего канала список обработчиков будет своим. for_all_from( task3_res, different_handler_one, different_handler_two, different_handler_three, ... ) ); |
Второй вопрос больше технический, нежели идеологический. Он связан с тем, что для реализации multi channel receive нужно научиться как-то мультиплексировать несколько condition_variable в одном wait-е, оперируюя при этом только возможностями стандартной библиотеки C++11. А в этой библиотеке аналога WaitForMultipleObjects из Win32 API, насколько я помню, нет. Соответственно, есть некая алгоритмическая задачка о том, как разбудить multi channel receive, заснувший на N каналах, когда в какой-то из этих каналов производится запись сообщения.
Некое решение этого вопроса в голове крутится. Но оно требует ограничения: канал можно задействовать только в одном multi channel receive. Т.е. если две разных нити захотят использовать один и тот же канал в своих собственных multi channel receive, то это будет невозможно.
Более того, будет невозможным и вот такой фокус:
receive( from_any(), // Для большинства сообщений из двух каналов обработчики // будут одинаковыми... for_all_from( task1_res, task2_res, handler_one, handler_two, handler_three, ... ), // Но есть одно сообщение из первого канала, которое должно // быть обработано уникальным образом. for_all_from( task1_res, very_different_handler ) ); |
Другими словами, при той схеме реализации multi channel receive, которая пока мне представляется возможной, mchain-ы в контексте multi channel receive опять оказываются Multi-Producer/Single-Consumer сущностями (а ведь они уже успели перестать быть таковыми для обычного receive).
Посему вопрос: не кажется ли такое ограничение слишком уж драконовским?
Третий вопрос вырисовался: допустим, сделан вызов multi channel receive на трех каналах. Все три каналы пусты, поэтому receive спит до получения первого сообщения из какого-либо канала. Тут один из этих трех каналов закрывается. Нужно ли выходить из receive?
Комментариев нет:
Отправить комментарий