Обычно стараюсь писать многопоточный код на базе SObjectizer-а, но не всегда есть такая возможность, иногда приходится колупаться с голыми нитями, mutex-ами и condition_variable. Давеча, как раз довелось взяться за голые нити и, естественно, допустил глупую ошибку, которую пришлось в течении часа отлавливать.
Итак, есть объект acquisition_manager, который владеет несколькими acquisition_thread. Передача информации между acquisition_manager и acquisition_thread происходит через объекты gate: для каждого acquision_thread создается своей gate, ссылка на который отдается в конструктор acquision_thread.
Периодически acquision_manager наполняет объекты gate параметрами, после чего дает сигнал acquision_thread выполнить нужную работу и поместить в gate результаты, а когда acquisition_thread завершает свою часть работы, то acquisition_manager забирает результаты из все того же объекта gate.
Для взаимодействия между acquisition_manager и acquisition_thread у класса acquisition_thread есть методы:
start_acquisition()
wait_acquisition_completion()
Предполагается, что менеджер подготавливает информацию в gate, потом дергает start_acquisition(). После вызова start_acquisition() менеджер не должен обращаться к содержимому gate. Менеджер должен вызывать wait_acquisition_completion() и только после этого менеджер может смотреть, что же в итоге оказалось в gate.
Грубо говоря, у менеджера работа построена следующим образом:
for(;;) { for(auto & g : gates_) { ... // Установка значение в gates. } for(auto & t : threads_) { t.start_acquisition(); } for(auto & t : threads_) { t.wait_acquisition_completion(); } for(auto & g : gates_) { ... // Взятие и обработка значений из gates. } } |
В общем, до вызова start_acquisition() и после вызова wait_acquisition_completion() менеджер распоряжается содержимым gate. А вот в промежутке между вызовами start_acquisition() и wait_acquisition_completion() над содержимым gate властвует acquisition_thread.
Реализация не выглядела сложной, но баг я таки умудрился допустить.
Суть была в том, что wait_acquisition_completion() имел приблизительно такую реализацию:
void acquisition_thread::wait_acquisition_completion() { std::unique_guard<std::mutex> lock{lock_}; if(!data_acquired_) data_acquired_cv_.wait(lock, [this]{ return data_acquired_; }); } |
Т.е. текущая нить приостанавливалась пока в data_acquired_ не оказывалось значение true.
А значение data_acquired_ изменялось в основном теле acquisition_thread:
void acquisition_thread::body() { std::unique_lock<std::mutex> lock{lock_}; for(;;) { if(!params_prepared_) params_prepared_cv_.wait(lock, [this]{ return params_prepared_; }); // Указываем, что данные пока не получены. data_acquired_ = false; // Так же снимаем отметку о том, что параметры подготовлены. params_prepared_ = false; // Освобождаем lock_ на время сбора данных. lock.unlock(); ... // Получение данных. // Все остальное нужно делать при захваченном lock_. lock.lock(); data_acquired_ = true; data_acquired_cv_.notify_one(); // Будим того, кто спит в wait_acquisition_completion. } } |
Чтобы картина стала полной нужно еще показать и метод start_acquisition:
void acquisition_thread::start_acquisition() { std::lock_guard<std::mutex> lock{lock_}; params_prepared_ = true; params_prepared_cv_.notify_one(); // Будим нить сбора данных, которая ждет параметров. } |
Реализация была простой, а вот поведение оказалось непредсказуемым и поиски слишком долгими для такой глупой ошибки.
Я почему-то предполагал, что когда менеджер вызывает start_acquisition() и внутри start_acquisition() происходит взведение params_prepared_cv_, то соответствующая acquisition_thread сразу же пытается захватить acquisition_thread::lock_. И когда этот самый lock_ освобождается при выходе из start_acquisition(), то acquisition_thread тут же захватывает этот lock_. Поэтому, когда менеджер вслед за start_acquisition() вызовет wait_acquisition_completion(), то в wait_acquisition_completion мы заснем на захвате lock_.
Оказалось же, что менеджер успевает выйти из start_acquisition() и зайти в wait_acquisition_completion() еще до того, как нить сбора данных среагирует на взведенный params_prepared_cv_.
Из-за этого в wait_acquision_completion() обнаруживается, что data_acquired_ равно true (это старое значение) и ожидания завершения сбора данных на data_acquired_cv_ не происходит. В таком случае сразу же происходит возврат из wait_acquisition_completion() и менеджер думает, что в gate лежат готовые данные, хотя на самом деле они туда еще и не попали (или попадают прямо в этот момент).
Исправление тривиальное: нужно перенести сброс data_acquired_ из body() в start_acquisition() (или в wait_acquisiton_completion()).
Причем, что забавно, на черновике у меня сброс data_acquired_ был изначально именно в start_acquision(), но при переписывании "начисто" я почему-то решил, что присваивать data_acquired_ значение false нужно именно в body(). Улучшил, так сказать :)
Мораль сей басни уже традиционная: если у вас есть возможность не писать многопоточный код, то не пишите его. Если многопоточность все-таки нужна, то попробуйте сперва какие-нибудь высокоуровневые инструменты. И за голую многопоточность с mutex и condition_variable беритесь лишь тогда, когда другого выбора уже нет.
6 комментариев:
В wait_acquisiton_completion его тоже нельзя переносить, так как acquisition_thread::body может успеть выполниться задолго до того, как дойдет выполнение до wait_acquisiton_completion
@sv
body() не может просто так завершиться, для этого должна быть команда от менеджера. Эта команда может быть дана либо до вызова start_acquisition, либо уже после завершения wait_acquisition_completion.
Под "завершиться" я понимал не полное завершение, а завершение одной итерации цикла (или одного этапа выполнения задачи)
@sv
Когда одна итерация закончится, то body заснет на ожидании новых параметров на следующей итерации. А новые параметры могут быть отданы менеджером только после завершения wait_acquisition_completion. Так что даже если итерация закончится еще до вызова wait_acquisition_completion, то body ничего больше делать не будет.
Да, вижу что код body намного сложнее чем казалось в начале. Мне кажется, неплохо бы его переписать. Хотябы для того, чтобы захват мьютекса не переносился на следущую итерацию, очень уж это не интуитивно
@sv
С захватом mutex-а в body сложнее:
- он захватывается еще перед входом в цикл;
- затем он автоматически освобождается и перезахватывается в процессе ожидания params_prepared_cv_;
- затем он вручную освобождается перед тем, как заняться сбором данных (это может быть длительная операция);
- затем он вручную снова захватывается после окончания сбора данных перед выставлением data_acquired_cv_.
Так что на следующую итерацию цикла мы приходим уже с захваченным mutex-ом.
И это наиболее простая версия body которую я смог придумать :(
Отправить комментарий