четверг, 27 декабря 2018 г.

[prog.flame] Эх, если бы с многопоточностью все было так просто...

Давно хотел что-то подобное написать, но руки не доходили. А намедни совпало: сперва столкнулся с "хитрым" багом в коде, а затем увидел замечательную по своей незамутненности фразу в комментариях к обсуждавшейся вчера статье:

Ни один мейнстрим язык не защитит вас от проблем с многопоточностью на уровне типов.

Насколько я понимаю, Rust защищает разработчика всего от одного вида проблем в многопоточном коде: от гонок. Причем, насколько мне позволяет судить мое давным давно забытое образование, всего от одного типа гонок. А именно от "проезда" по данным, которые сейчас кто-то другой пишет или читает.

Давеча столкнулся в своем коде с другой разновидностью гонок (если это не гонки, то подскажите, плиз, как это называется).

Для того, чтобы объяснить, что произошло, я постараюсь привести несколько фрагментов C++ кода. Это грубое и примитивное приближение к тому, что было у меня. Показываю все это в коде с комментариями ибо расписать все это словами у меня не получилось. Если кому-то непонятно, что происходит в C++ коде, то задавайте вопросы в комментариях, постараюсь ответить.

Итак, есть объект, предназначенный для совместного использования из нескольких нитей, поэтому его содержимое защищено mutex-ом:

class shared_data_processor {
   // Это замок объекта, который защищает объект от многопоточного доступа.
   std::mutex lock_;
   ...
   // Объект может быть в нескольких состояниях. Данное перечисление
   // описывает эти состояния.
   enum class state {
      passive, // Объект еще не был активирован.
      started, // Объект активирован и работает.
      stopping, // Объект остатавливается, но работу еще не завершил.
      stopped // Объект остановлен, больше не работает.
   };
   // И вот само текущее состояние объекта.
   state state_{ passive };
   ...
   // Список объектов, которым в определенный момент нужно сказать,
   // что объект прекращает свою работу.
   stop_guard_container stop_guards_;
public:
   ...
   void start() { // Запуск объекта.
      std::lock_guard<std::mutex> l{lock_};
      ...
      state_ = state::started;
      ...
   }
   void stop() { // Инициирование останова объекта.
      std::lock_guard<std::mutex> l{lock_};
      if(state::started == state_) {
         ... // Выполнение необходимых действий.
         // Всем stop_guard-ам нужно сказать, что операция stop началась.
         for(auto & sg : stop_guards_)
            sg.notify();
      }
   }
   ...
   // Установка вспомогательного объекта, который должен получить
   // нотификацию при начале операции stop.
   void setup_stop_guard(stop_guard guard) {
      std::lock_guard<std::mutex> l{lock_};
      if(state::stopping == state_ || state::stopped == state_)
         // Операция stop уже была выполнена, устанавливать
         // новый stop_guard нельзя.
         throw std::runtime_error("stop() already called");
      // Все нормально, просто сохраняем этот объект у себя.
      stop_guards_.push_back(guard);
   }
   ...
};

Суть в том, что этот объект может находится в нескольких состояниях. И когда он находится в состояниях stopping/stopped, то операцию setup_stop_guard выполнять нельзя.

Был еще один объект, который внутри себя объединял std::thread и shared_data_processor: в конструкторе запускалась новая рабочая нить, на которой у shared_data_processor вызывался метод start. Что-то в духе:

class data_processor_wrapper {
   shared_data_processor processor_;
   std::thread work_thread_;
   ...
public:
   data_processor_wrapper()
      :  work_thread_{
            // Сразу в конструкторе запускаем новую рабочую нить и
            // передаем ей лямбду, которая и будет главной функцией
            // новой нити.
            [this] {
               // Запускаем processor.
               processor_.start();
               // Далее работаем пока нас не остановят.
               while(!processor.stopped())
                  processor.handle_next();
            }
         }
   {}
   ~data_processor_wrapper()
   {
      // В деструкторе останавливаем рабочую нить.
      processor_.stop();
      work_thread_.join();
   }
   ...
};

Т.е. у data_processor_wrapper такого вида вроде бы ничего сложного. Но на самом деле конструктор data_processor_wrapper был несколько сложнее. Он мог получать дополнительный аргумент -- функцию, которую нужно было на новой рабочей нити сразу после вызова метода start() у shared_data_processor-а. Кроме того, у wrapper-а был метод stop(), который позволял остановить работу извне. Т.е. на самом деле было что-то подобное вот этому:

class data_processor_wrapper {
   shared_data_processor processor_;
   std::thread work_thread_;
   ...
public:
   data_processor_wrapper(
      std::function<void(shared_data_processor &)> tuner)
      :  work_thread_{
            // Сразу в конструкторе запускаем новую рабочую нить и
            // передаем ей лямбду, которая и будет главной функцией
            // новой нити.
            // Аргумент tuner пробрасываем в новую нить через лямбду.
            [this, tuner] {
               // Запускаем processor.
               processor_.start();
               // Если задана функция tuner, то вызываем ее.
               if(tuner)
                  tuner(processor_);
               // Далее работаем пока нас не остановят.
               while(!processor.stopped())
                  processor.handle_next();
            }
         }
   {}
   ~data_processor_wrapper()
   {
      // В деструкторе останавливаем рабочую нить.
      stop();
   }
   ...
   void stop() { // Даем команду на останов и дожидаемся завершения работы.
      processor_.stop();
      if(work_thread.joinable())
         work_thread_.join();
   }
};

Итак, есть объект shared_data_processor и data_processor_wrapper. В реальности, конечно же, они были хитрее, чем показано выше, но сейчас важно то, что написаны они были давно и все это время нормально работали. А потом в один прекрасный момент мне потребовалось сделать специальный wrapper уже для data_processor_wrapper-а. Что-то вроде:

class special_wrapper {
   data_processor_wrapper processor_;
   ...
public:
   special_wrapper()
      :  processor_{
            // Нам нужно при старте рабочей нити установить свой
            // собственный stop_guard. Поэтому в конструктор
            // data_processor_wrapper-а мы передаем функцию, которая
            // именно это и делает.
            [](shared_data_processor & p) {
               p.setup_stop_guard(make_special_stop_guard());
            } 
         }
   {}

   void stop() { processor_.stop(); }
   ...
};

Т.е. по сути мы получаем вот что:

  • запускается новая рабочая нить;
  • на этой нити вызывается shared_data_processor::start();
  • затем на этой нити вызывается shared_data_processor::setup_stop_guard();
  • затем эта нить работает до тех пор, пока не будет вызван stop().

Надеюсь, пока все достаточно просто и понятно. Поэтому вот такой код, который использует special_wrapper не должен вызвать сложности:

void do_something() {
   special_wrapper wrapper;
   ... // Какие-то действия.
   wrapper.stop();
   ... // Еще какие-то действия.
}

Вот у меня он тоже сложностей не вызывал. Пока не начал эпизодически падать из-за того, что возникало исключение std::runtime_error("stop() already called").

Не знаю, кто из читателей сразу может указать причину. Мне потребовалось около часа времени для того, чтобы разобраться что к чему.

Суть в том, что я находился в предположении, что к моменту завершения работы конструктора special_wrapper все начальные операции на новой рабочей нити уже будут выполнены. Т.е., я был уверен, что будет следующая последовательность действий:

  • вызывается конструктор special_wrapper;
  • запускается новая рабочая нить;
  • на этой нити вызывается shared_data_processor::start();
  • затем на этой нити вызывается shared_data_processor::setup_stop_guard();
  • завершается конструктор special_wrapper;
  • затем эта нить работает до тех пор, пока не будет вызван stop().

Что, естественно, было совсем не так :)

На самом деле при завершении конструктора special_wrapper мы можем быть уверены лишь в том, что ОС создала нам новую нить. Но далеко не факт, что эта нить успела начать работать. На что я, собственно говоря, и напоролся.

Получилось следующее:

  • на нити T1 вызывается конструктор special_wrapper;
  • запускается новая рабочая нить T2;
  • на нити T2 вызывается shared_data_processor::start();
  • на нити T1 завершается конструктор special_wrapper;
  • на нити T1 выполняется часть последующих действий, включая вызов stop();
  • на нити T2 вызывается shared_data_processor::setup_stop_guard(). Что и приводит к выбросу исключения, т.к. stop() для shared_data_processor уже вызван, а после этого вызывать setup_stop_guard() уже нельзя.

Причем такое стечение обстоятельств возникает не часто. Я гонял тесты несколько дней, прежде чем у меня это баг проявился в первый раз. А когда занялся его поисками, то для его повторения приходилось делать по несколько десятков запусков теста, чтобы баг возник снова.

Вот такая вот гримаса многопоточности. Вроде как все данные защищены, никакой порчи нет (т.е. нет классических data races). Но из-за того, что мне сложно вообразить все возможные сценарии взаимного исполнения параллельно работающих нитей, возникают некорректные предположения о том, в каком порядке будут выполняться действия на каждой из них. Отсюда и ошибки. Плюс везение в их своевременном обнаружении.


Мораль все этой истории проста. Повторю то, что говорил уже многократно: многопоточность это сложно. Поэтому не нужно думать, что выбрав какой-то мегаинструмент вы оказываетесь надежно защищены от ужасов многопоточности. Там такое большое пространство для граблей, что всегда есть куда вступить и получить по мозгам.

Комментариев нет: