суббота, 7 мая 2022 г.

[prog.c++] Еще один связанный с многопоточностью баг

Обычно стараюсь писать многопоточный код на базе SObjectizer-а, но не всегда есть такая возможность, иногда приходится колупаться с голыми нитями, mutex-ами и condition_variable. Давеча, как раз довелось взяться за голые нити и, естественно, допустил глупую ошибку, которую пришлось в течении часа отлавливать.

Итак, есть объект acquisition_manager, который владеет несколькими acquisition_thread. Передача информации между acquisition_manager и acquisition_thread происходит через объекты gate: для каждого acquision_thread создается своей gate, ссылка на который отдается в конструктор acquision_thread.

Периодически acquision_manager наполняет объекты gate параметрами, после чего дает сигнал acquision_thread выполнить нужную работу и поместить в gate результаты, а когда acquisition_thread завершает свою часть работы, то acquisition_manager забирает результаты из все того же объекта gate.

Для взаимодействия между acquisition_manager и acquisition_thread у класса acquisition_thread есть методы:

start_acquisition()
wait_acquisition_completion()

Предполагается, что менеджер подготавливает информацию в gate, потом дергает start_acquisition(). После вызова start_acquisition() менеджер не должен обращаться к содержимому gate. Менеджер должен вызывать wait_acquisition_completion() и только после этого менеджер может смотреть, что же в итоге оказалось в gate.

Грубо говоря, у менеджера работа построена следующим образом:

for(;;) {
  for(auto & g : gates_) {
    ... // Установка значение в gates.
  }
  for(auto & t : threads_) {
    t.start_acquisition();
  }
  for(auto & t : threads_) {
    t.wait_acquisition_completion();
  }
  for(auto & g : gates_) {
    ... // Взятие и обработка значений из gates.
  }
}

В общем, до вызова start_acquisition() и после вызова wait_acquisition_completion() менеджер распоряжается содержимым gate. А вот в промежутке между вызовами start_acquisition() и wait_acquisition_completion() над содержимым gate властвует acquisition_thread.

Реализация не выглядела сложной, но баг я таки умудрился допустить.

Суть была в том, что wait_acquisition_completion() имел приблизительно такую реализацию:

void acquisition_thread::wait_acquisition_completion() {
  std::unique_guard<std::mutex> lock{lock_};
  if(!data_acquired_)
    data_acquired_cv_.wait(lock, [this]{ return data_acquired_; });
}

Т.е. текущая нить приостанавливалась пока в data_acquired_ не оказывалось значение true.

А значение data_acquired_ изменялось в основном теле acquisition_thread:

void acquisition_thread::body() {
  std::unique_lock<std::mutex> lock{lock_};
  for(;;) {
    if(!params_prepared_)
      params_prepared_cv_.wait(lock, [this]{ return params_prepared_; });
    // Указываем, что данные пока не получены.
    data_acquired_ = false;
    // Так же снимаем отметку о том, что параметры подготовлены.
    params_prepared_ = false;

    // Освобождаем lock_ на время сбора данных.
    lock.unlock();

    ... // Получение данных.

    // Все остальное нужно делать при захваченном lock_.
    lock.lock();
    data_acquired_ = true;
    data_acquired_cv_.notify_one(); // Будим того, кто спит в wait_acquisition_completion.
  }
}

Чтобы картина стала полной нужно еще показать и метод start_acquisition:

void acquisition_thread::start_acquisition() {
  std::lock_guard<std::mutex> lock{lock_};
  params_prepared_ = true;
  params_prepared_cv_.notify_one(); // Будим нить сбора данных, которая ждет параметров.
}

Реализация была простой, а вот поведение оказалось непредсказуемым и поиски слишком долгими для такой глупой ошибки.

Я почему-то предполагал, что когда менеджер вызывает start_acquisition() и внутри start_acquisition() происходит взведение params_prepared_cv_, то соответствующая acquisition_thread сразу же пытается захватить acquisition_thread::lock_. И когда этот самый lock_ освобождается при выходе из start_acquisition(), то acquisition_thread тут же захватывает этот lock_. Поэтому, когда менеджер вслед за start_acquisition() вызовет wait_acquisition_completion(), то в wait_acquisition_completion мы заснем на захвате lock_.

Оказалось же, что менеджер успевает выйти из start_acquisition() и зайти в wait_acquisition_completion() еще до того, как нить сбора данных среагирует на взведенный params_prepared_cv_.

Из-за этого в wait_acquision_completion() обнаруживается, что data_acquired_ равно true (это старое значение) и ожидания завершения сбора данных на data_acquired_cv_ не происходит. В таком случае сразу же происходит возврат из wait_acquisition_completion() и менеджер думает, что в gate лежат готовые данные, хотя на самом деле они туда еще и не попали (или попадают прямо в этот момент).

Исправление тривиальное: нужно перенести сброс data_acquired_ из body() в start_acquisition() (или в wait_acquisiton_completion()).

Причем, что забавно, на черновике у меня сброс data_acquired_ был изначально именно в start_acquision(), но при переписывании "начисто" я почему-то решил, что присваивать data_acquired_ значение false нужно именно в body(). Улучшил, так сказать :)


Мораль сей басни уже традиционная: если у вас есть возможность не писать многопоточный код, то не пишите его. Если многопоточность все-таки нужна, то попробуйте сперва какие-нибудь высокоуровневые инструменты. И за голую многопоточность с mutex и condition_variable беритесь лишь тогда, когда другого выбора уже нет.

6 комментариев:

sv комментирует...

В wait_acquisiton_completion его тоже нельзя переносить, так как acquisition_thread::body может успеть выполниться задолго до того, как дойдет выполнение до wait_acquisiton_completion

eao197 комментирует...

@sv

body() не может просто так завершиться, для этого должна быть команда от менеджера. Эта команда может быть дана либо до вызова start_acquisition, либо уже после завершения wait_acquisition_completion.

sv комментирует...

Под "завершиться" я понимал не полное завершение, а завершение одной итерации цикла (или одного этапа выполнения задачи)

eao197 комментирует...

@sv

Когда одна итерация закончится, то body заснет на ожидании новых параметров на следующей итерации. А новые параметры могут быть отданы менеджером только после завершения wait_acquisition_completion. Так что даже если итерация закончится еще до вызова wait_acquisition_completion, то body ничего больше делать не будет.

sv комментирует...

Да, вижу что код body намного сложнее чем казалось в начале. Мне кажется, неплохо бы его переписать. Хотябы для того, чтобы захват мьютекса не переносился на следущую итерацию, очень уж это не интуитивно

eao197 комментирует...

@sv

С захватом mutex-а в body сложнее:

- он захватывается еще перед входом в цикл;
- затем он автоматически освобождается и перезахватывается в процессе ожидания params_prepared_cv_;
- затем он вручную освобождается перед тем, как заняться сбором данных (это может быть длительная операция);
- затем он вручную снова захватывается после окончания сбора данных перед выставлением data_acquired_cv_.

Так что на следующую итерацию цикла мы приходим уже с захваченным mutex-ом.

И это наиболее простая версия body которую я смог придумать :(