Под катом чуть более подробное описание решения задачи md5_bruteforce2 на SObjectizer.
Само решение находится вот в этой ветке Svn-репозитория. Единственный исходный файл с решением можно посмотреть либо прямо в репозитории, либо на pastebin.
Сразу предупреждаю, что в коде используются фичи SObjectizer версии 5.5.5, которая сейчас находится в стадии ранней альфы, а в релиз должна уйти где-то ближе к концу мая 2015-го. На стабильной версии 5.5.4 было на на 10-15 строк побольше, за счет более многословного определения типов сообщений.
Суть решения в том, что используются три типа агентов:
- агенты-воркеры, реализуемые C++ным типом worker_t. Реагируют всего на одно сообщение: msg_process_line, в котором содержится прочитанная из stdin строка с заданием. Агент парсит строку, выделяет из ее объект типа task_t, после чего осуществляет поиск пароля в указанном диапазоне. Если при в процессе работы агента возникает исключение, то оно выпускается наружу, что приводит к "падению" агента и дерегистрации кооперации, в которой он находился;
- агент для чтения stdin, реализуемый типом input_loader_t. Реагирует всего на одно сообщение msg_load_next, получив которое пытается прочитать очередную строку с stdin. Но зато имеет два состояния. В обычном, дефолтном состоянии, агент честно пытается прочитать не пустую строку с stdin-а. Если это удается, то прочитанная строка в виде сообщения msg_process_line отправляется агенту-менеджеру. Однако, если в процессе чтения обнаружился EOF, то агент переходит в состояние st_eof и отсылает менеджеру сигнал msg_no_more_lines. Находясь в состоянии st_eof, агент-читатель не пытается читать stdin при получении очередного msg_load_next. Вместо этого сразу же отсылается msg_no_more_lines;
- агент-менеджер (он же мастер), реализуемый C++ным типом manager_t. При своем старте создает столько воркеров, сколько ему было задано (либо значение из командной строки, либо по одному на каждое доступное ядеро). Когда получает подтверждение о готовности очередного воркера, запрашивает у агента-читателя следующую строку с stdin. Когда строка поступает, отдает ее первому из свободных воркеров. Когда от воркера поступает сообщение с результатом обработки (это либо сообщение msg_password_found, либо сообщение msg_password_not_found), воркер помещается в список свободных воркеров, а агенту-читателю отсылается очередное сообщение msg_load_next. Если же менеджер получает уведомление о "падении" воркера, то менеджер выбрасывает информацию о задаче, которая была выдана этому воркеру и создает нового воркера. Работа завершается, когда менеджер получает msg_no_more_lines и при этом не оказывается задач, находящихся в обработке.
В общем, ничего сложного. Но решение использует несколько, если не фокусов, то особенностей реализации.
Во-первых, чтение stdin синхронизировано с результатами поиска паролей. И хотя, физически, чтение stdin происходит на другом потоке, а не на том, где обрабатываются результаты поиска, чтение инициируется только тогда, когда у менеджера появляется свободный воркер. До этого момента содежимое stdin-а не читается.
Во-вторых, агент-читатель может отсылать msg_no_more_lines менеджеру несколько раз. Фактически, менеджер не воспринимает no_more_lines тогда, когда у него есть работающие воркеры. Сообщение доходит до менеджера, менеджер проверяет, что обработка не закончилась, и не реагирует на него. Реакция произойдет только когда освободится последний воркер: менеджер в очередной раз запросит у читателя следующую строку и уже в этот раз, получив no_more_lines, завершит работу программы. Если же последний воркер не завершится нормально, а "упадет", то менеджер все равно создаст нового воркера, затем запросит новую строку, и лишь получив в ответ no_more_lines, остановит приложение. Возможно, если воркеров будет сотня-другая тысяч, то это поведение окажется неэффективным и его нужно будет переделывать. Но пока такая реализация выглядит допустимой.
В-третьих, менедежер не может получить причину "падения" воркера. Воркер выпускает наружу исключение, это исключение логируется самим SObjectizer-ом. А менеджер получает лишь уведомление, что воркер закончил свою работу из-за исключения. Но в данном уведомлении информации об исключении нет. Логирование исключений SObjectizer, по умолчанию, выполняет на stderr. Это дело можно переопределить, назначив собственный exception_logger, но этого в данной реализации не делается.
Пожалуй, самый важный момент реализации связан с агентами-воркерами. Менеджер использует для них отдельный экземпляр thread_pool-диспетчера:
class manager_t : public agent_t { public : manager_t( context_t ctx, mbox_t input_loader, size_t worker_count ) ... , m_worker_disp( thread_pool::create_private_disp( so_environment(), worker_count ) ) {} ... void create_new_worker() { const auto worker_id = "worker-" + to_string( m_worker_ordinal++ ); introduce_child_coop( *this, // Worker id will be name of the worker cooperation. worker_id, // Worker will work on thread pool. m_worker_disp->binder( thread_pool::params_t{} ), [&]( agent_coop_t & coop ) { |
Таким образом воркеры будут привязываться к одному и тому же пулу потоков, новые потоки для воркеров создаваться не будут.
Далее важно то, что менеджер подписывается на уведомления о регистрации/дерегистрации коопераций с воркерами. Каждый воркер -- это отдельная кооперация. И менеджер пользуется функциональностью SObjectizer-овских нотификаторов, которые могут отсылать уведомления после завершения операций регистрации и дерегистрации кооперации:
coop.add_reg_notificator( make_coop_reg_notificator( so_direct_mbox() ) ); coop.add_dereg_notificator( make_coop_dereg_notificator( so_direct_mbox() ) ); |
И следующий важный момент -- это указание того, что нужно делать, когда агент выпускае наружу исключение. Когда агент выбрасывает исключение, SObjectizer спрашивает у агента, как поступить в данном случае. Агент может сказать, мол, проигнорируй его или грохни приложение через std::abort. Но по умолчанию агент отвечает "спроси у кооперации, может она лучше знает". SObjectizer спрашивает у кооперации, которой принадлежит агент. Та так же может ответить, мол, проигнорируй, грохни программу или спроси у того, кому я подчиняюсь. И т.д.
В данном случае нам нужно, чтобы кооперация, в которую входит проблемный агент, была дерегистрирована. Но чтобы приложение продолжило работать. Поэтому мы явно задаем такой тип реакции:
coop.set_exception_reaction( exception_reaction_t::deregister_coop_on_exception ); |
Пожалуй, это все хитрости реализации. Разве что можно сказать, что каждый из агентов работат всего на одном потоке. Для агента-читателя этот поток предоставлен приватным one_thread-диспетчером. Для менеджера этот поток предоставлен диспетчером по умолчанию, который есть в каждом экземпляре SObjectizer Environment. Для воркеров выделяется одна нить из пула. Но, повторюсь, каждый агент работает всего на одном потоке, поэтому агентам нет надобности заботится о синхронизации доступа к своим потрохам.
Немного статистики по объему кода для тех, кто уделяет внимание этому показателю. На ревизии 1396 файл md5_brute_force.cpp имеет объем в 437 строк. А если выбросить пустые строки и строки, не содержащие хотя бы одной буквы или цифры (т.е. строки с открывающими/закрывающими скобками), то останется всего 273 строк, часть из которых комментарии.
Объем можно было бы еще довольно заметно уменьшить, если бы вот вместо такой подписки агента-менеджера:
virtual void so_define_agent() override { so_subscribe_self() .event( &manager_t::evt_next_line_received ) .event< msg_no_more_lines >( &manager_t::evt_no_more_lines ) .event( &manager_t::evt_password_found ) .event( &manager_t::evt_password_not_found ) .event( &manager_t::evt_worker_coop_started ) .event( &manager_t::evt_worker_coop_destroyed ); } |
Использовалось бы что-то вроде:
virtual void so_define_agent() override { so_subscribe_self() .event( &manager_t::evt_next_line_received ) .event< msg_no_more_lines >( [this] { if( m_scheduled_lines.empty() ) so_deregister_agent_coop_normally(); } ) .event( [this]( const msg_password_found & evt ) { handle_worker_result( get<0>(evt), get<1>(evt) ); } ) .event( [this]( const msg_password_not_found & evt ) { handle_worker_result( get<0>(evt), "NOT FOUND" ); } ) .event( &manager_t::evt_worker_coop_started ) .event( &manager_t::evt_worker_coop_destroyed ); } |
Но я придерживаюсь мнения, что сокращение объема кода за счет падения читабельности, понятности и простоты, не стоит затраченных усилий.
В завершении хочу сказать, что реализация примеров md5_bruteforce и md5_bruteforce2 оказалось очень полезной. Получился хороший толчок в сторону сокращения многословности SObjectizer-овского кода. Сухим остатком от этого уже стали методы introduce_coop, introduce_child_coop плюс возможность определять тупл в качестве SObjectizer-сообщения (т.е. шаблон tuple_as_message_t). И это еще не предел, еще несколько мыслей настойчиво крутится в голове и требуют воплощения в коде.
Был приятно удивлен несколькими вещами. Во-первых, тем, насколько востребованы оказались самые разные возможности SObjectizer в одном небольшом примере. В частности, тут есть и сообщения с данными внутри, и сигналы, и безымянные MPSC-почтовые ящики (т.н. direct_mbox-ы), и именованные MPSC-ящики, и явное задание собственных имен для коопераций, и автоматическое именование коопераций, и нотификации о происходящих с кооперациями событиях, и реакция на выброшенные агентами исключения, и явно заданные состояния агентов, и разные типы диспетчеров... В общем все, чем потихонечку наполнялся SObjectizer-5 все эти годы, было использовано и позволило сократить количество труда и забот программиста.
Во-вторых, даже на таком небольшом примере лучше понимаешь, как с ростом объема и сложности приложения, объем связанного с самим SObjectizer-ом кода снижается в относительном выражении. Т.е. в простом HelloWorld-е описания сообщений и агентов занимают изрядную долю кода и потребляют значительную часть времени и внимания разработчика. Тогда как уже в примере на 300-400 строк, связанные с SO-5 вещи становится все менее и менее заметными. Более того, в реализациии данной задачи количество времени (да и кода, пожалуй), которое пришлось потратить на работу с чтением, парсингом и валидацией заданий, оказалось чуть ли не больше, чем количество времени, ушедшего на реализацию и отладку логики работы агентов.
Так что, имхо, тот редкий случай, когда измерение у кого код длинее, оказалось очень полезным на практике.
Комментариев нет:
Отправить комментарий