четверг, 19 августа 2021 г.

[prog.thougths] Попытка спроектировать механизм взаимодействия процессов на одной ноде через shared memory по принципу single-producer/multi-consumers

С конца прошлой недели занимаюсь проектированием механизма, посредством которого процесс-producer смог бы передавать через shared memory информацию одному или нескольким процессам-consumer-ам. Вроде бы придумалось какое-то решение, которое выглядит рабочим. Данный пост является попыткой еще одной проверки этого решения: если получится упомянутый механизм внятно описать словами, то есть хорошая вероятность, что он будет воплощен в коде.

Может быть кому-то будет интересно посмотреть на чужой велосипед. Вдруг кто-то из читателей обнаружит незамеченные мной косяки или вообще ткнет пальцем в готовое решение, которое я, в силу узости кругозора, тупо не заметил.

В общем, всех заинтересовавшихся приглашаю под кат.

Типа постановка задачи

Деталей раскрывать не могу, т.к. все это делается не для собственного развлечения. Поэтому постараюсь рассказать общими словами, но максимально подробно.

Есть процесс-producer, который производит некие объекты с информацией. Эта информация должна поступать процессам-consumer-ам. У одного producer-а может быть сразу несколько consumer-ов.

Темп генерации объектов -- не больше 50-60 в секунду на начальном этапе, возможно, затем будет 100-150 в секунду.

Главным образом генерируются объекты размером в несколько мегабайт каждый (вероятно, под двадцать мегабайт максимум). В первую очередь нужно поддержать обмен именно такими большими объектами. Возможно, со временем producer начнет в общем потоке генерировать и объекты меньшего размера (от сотен байт до сотен килобайт), но главная цель сейчас -- это научиться передавать объекты размером в несколько мегабайт в количестве нескольких десятков в секунду.

Собственно, shared memory рассматривается именно из-за размера передаваемых объектов. Как-то не хочется пропихивать сотни мегабайт данных между процессами, работающими на одной ноде, через IPC-механизмы типа сокетов или пайпов.

Обязательные граничные условия

Есть важные особенности, которые оказывают серьезное влияние на итоговое решение.

Во-первых, после того, как producer разместил (будем говорить "опубликовал") в shared memory готовый объект, содержимое этого объекта уже не меняется. Процессы-consumer-ы только читают содержимое опубликованных объектов, а producer после публикации объект не модифицирует. Т.е. опубликованные объекты можно считать константными.

Во-вторых, нет требований надежной доставки объектов. Т.е. если объект опубликовали, но информация об этом не дошла до consumer-ов по каким-то причинам (например, consumer-ов еще нет), то это не страшно. Объект будет просто потерян. Такие потери не есть хорошо, но и страшного в этом нет, т.к. producer все равно постоянно генерирует новые объекты с более актуальной информацией.

В-третьих, и это самое главное, что приходится учитывать: взаимодействующие процессы нельзя считать надежными, т.е. они могут падать и/или подвисать. Надежным мы можем считать лишь реализацию механизма обмена данными через shared memory (т.к. мы эту реализацию делаем сами от и до), но вот остальная часть этих процессов -- это сторонний код, на качество и надежность которого мы повлиять не можем. Так что в любой момент любой из процессов может завершить свою работу потому, что на какой-то из рабочих нитей сторонний код выполнил деление на ноль, обратился по невалидному указателю, переполнил стек или отстрелил себе ногу еще каким-то способом (коих в C++ и теплой ламповой сишечке овердофига).

Так что одно из главных требований к обсуждаемому механизму -- это способность пережить падение+рестарт или зависание любого из процессов (будь то producer или consumer).

Еще одно требование -- механизм обмена данными должен щадяще относиться к потреблению ресурсов. Процессы producer и consumer будут достаточно прожорливыми сами по себе (в смысле потребления CPU), поэтому желательно избежать вещей, которые жрут CPU при реализации межпроцессного обмена (типа busy waiting).

Есть контроль за жизнеспособностью producer-а и consumer-ов

Процессы producer и consumer работают под присмотром супервизора, который их запускает и затем следит за их жизнеспособностью. Так что если какой-то из этих процессов аварийно завершит работу, то супервизор перезапустит упавший процесс.

Предполагается, что супервизор будет иметь какие-то средства для проверки того, что процесс не завис и работает нормально (посредством каких-то прикладных ping-ов). Так что есть надежда, что супервизор сможет перезапускать и зависшие процессы (например, вошедшие в бесконечный цикл или поймавшие дедлок).

Предполагаемый механизм обмена данными

Описание разбито на две части.

В первой части описывается принцип работы обсуждаемого механизма. Этот механизм нуждается в каком-то способе обмена некими управляющими сообщениями между процессами. И этот способ не shared memory, а что-то из категории сокетов, именованных каналов (named pipes) или POSIX message queues. Что именно из IPC будет использоваться не оказывает непосредственного влияния на принцип работы самого механизма, поэтому в первой части описания просто предполагается, что такой способ есть.

Во второй части обсуждается, как именно может быть реализован обмен сообщениями между процессами с использованием имеющихся в Linux/Windows механизмов IPC.

Принцип работы механизма обмена данными через shared memory

Предпосылки

У процесса-producer-а есть некая точка входа с фиксированным именем, через которую процессы-consumer-ы могут присылать producer-у свои сообщения. Для того, чтобы consumer мог получать данные от producer-а, consumer должен знать имя точки входа. Каким именно способом consumer получит это имя мы не рассматриваем (через параметры командной строки, из конфигурационного файла, из переменных окружения и т.д.).

У каждого consumer-а есть некий собственный идентификатор. Что-то вроде короткого имени. У всех consumer-ов, которые хотят получать информацию от общего producer-а, должны быть уникальные (не совпадающие) идентификаторы. В противном случае producer не сможет нормально взаимодействовать с consumer-ами. Каким именно способом обеспечивается уникальность идентификаторов consumer-ов мы не рассматриваем, считаем, что это свойство тем или иным способом обеспечивается.

Начальные действия producer-а при старте (после рестарта)

Producer создает отображаемый в память файл с уникальным именем. Отображает этот файл в собственное адресное пространство и производит начальную инициализацию неких структур данных, позволяющих использовать эту область в качестве арены, в которой будут располагаться публикуемые объекты.

Что именно это за структуры и какая политика аллокации будет использоваться мы сейчас не рассматриваем. Суть в том, что producer будет поддерживать внутри арены какой-то список блоков памяти со статусом каждого блока (свободен, выделен-но-не-опубликован, опубликован).

Подключение producer-а к consumer-у

Чтобы получать информацию от producer-а consumer должен подключиться к producer-у. Для этого consumer выполняет следующие действия:

1. Создает для себя новую точку входа с уникальным именем;
2. Отсылает на точку входа producer-а сообщение attach_request, в котором передает свой идентификатор и имя только что созданной точки входа;
3. Producer отсылает на точку входа consumer-а ответное сообщение attach_ack, в котором передает имя файла, который consumer-у нужно отобразить в собственное адресное пространство. Так же в attach_ack сообщается session_id, который используется в последующем обмене сообщениями между producer-ом и consumer-ом;
4. Если consumer не получает ответа от producer-а за положенное время, то consumer возвращается к шагу №1;
5. Если consumer вовремя получил от producer-а attach_ack, то consumer открывает файл с именем, полученным от producer-а, и осуществляет отображение содержимого этого файла в собственное адресное пространство. Если при этом возникает какая-то ошибка, то consumer возвращается к шагу №1.

Процедура публикации новых объектов producer-ом

Публикация объекта выполняется в две стадии.

На первой стадии внутри producer-а кто-то запрашивает блок памяти некоторого размера. Producer пытается найти область подходящего размера внутри арены. Если такая область есть, то она оформляется в виде нового блока, который находится в статусе "выделен-но-не-опубликован".

Если найти область памяти должного размера не удалось, то возвращается код ошибки и запросивший память код сам решает, что ему делать дальше: повторить попытку после тайм-аута, прервать выполнение работы процесса, отказаться от попытки разместить информацию в разделяемой памяти и продолжить свою работу.

Если же память была выделена, то запросивший память код производит запись нужных значений в выделенный блок памяти, после чего дает команду опубликовать блок в качестве готового объекта. Это уже вторая стадия.

На второй стадии producer отсылает всем подключенным consumer-ам сообщение publish_announce. Сообщение publish_announce отсылается в ту точку входа, которую consumer сообщил при подключении. Если запись publish_announce для какого-то consumer-а завершается неудачно, то producer считает этого consumer-а неработоспособным и предпринимает соответствующие действия (описываются ниже).

Каждый consumer должен в течении заданного времени ответить сообщением announce_ack. Всем consumer-ам, которые успели это сделать, отсылается сообщение publish, в котором содержится информация об опубликованном блоке.

Если consumer не успел прислать announce_ack, то сообщение publish такому consumer-у не отсылается.

По результатам отсылки publish-ей для опубликованного объекта выставляется значение счетчика ссылок: счетчик ссылок для опубликованного объекта равен количеству отосланных publish-е (отосланных без ошибок).

Так, если есть пять consumer-ов, на publish_announce было вовремя получено три announce_ack и все три publish-а были отосланы успешно, то счетчик ссылок для опубликованного объекта будет равен трем.

После публикации статус блока памяти в арене, в котором находится опубликованный объект, меняется на "опубликован".

Процесс-producer для каждого consumer-а ведет список объектов, которые были опубликованы для этого consumer-а (т.е. списки объектов, для которых этому consumer-у были отосланы сообщения publish). Благодаря этому списку producer точно знает, какие объекты были опубликованы для конкретного consumer-а.

Подтверждать получение сообщения publish consumer-у не нужно.

Когда consumer заканчивает использовать опубликованный объект, он отсылает producer-у сообщение release, в котором указывается, какой именно объект освобождается.

Получив release producer:

  • вычеркивает объект из списка опубликованных для данного consumer-а объектов;
  • уменьшает счетчик ссылок для опубликованного объекта. Если счетчик ссылок обнуляется, то блок памяти, принадлежавший объекту, помечается как свободный и возвращается в арену для последующего переиспользования.

Примечание. Если на второй стадии выясняется, что публиковать объект не для кого (т.е. либо сейчас нет подключившихся consumer-ов, либо нет вовремя приславших announce_ack consumer-ов), то объект не публикуется, а блок памяти, выделенный под этот объект сразу же освобождается и возвращается в арену.

Поведение producer-а при отсутствии свободного места в текущей арене

Рано или поздно может возникнуть ситуация, когда consumer-ы будут освобождать опубликованные объекты медленнее, чем producer-у нужно создавать и публиковать новые объекты. Например, если какой-то consumer притормозит, подвиснет или аварийно прекратит свою работу.

В этом случае свободная память в арене закончится и producer не сможет выделять новые блоки памяти под публикуемые данные.

В такой ситуации producer начинает отсчет времени, в течении которого запросы на выделение блока памяти завершаются неудачно. Когда это время превысит некоторый заданный предел, producer выполняет следующее:

  • создает новый файл с уникальным именем, выполняет его отображение в память и инициализирует эту область как новую арену;
  • старую арену producer перестает обслуживать (т.е. отменяет отображение в память для старого файла, закрывает этот файл и делает unlink для него);
  • всем подключившимся consumer-ам отсылается команда reattach;
  • producer удаляет всю информацию, относящуюся к подключившимся consumer-ам (т.е. producer отныне считает, что подключенных consumer-ов нет).

Живые consumer-ы получив команду reattach должны:

  • освободить все опубликованные для них объекты;
  • отменить отображение в память старого файла и закрыть этот файл;
  • выполнить заново процедуру подключения к producer-у (при этом будет получено новое имя файла для отображения в память и новый идентификатор сессии).

Обязательства consumer-а по освобождению опубликованных объектов

Каждый consumer должен контролировать время, в течении которого используются опубликованные объекты. Когда это время превышает некий установленный порог, consumer должен аварийно завершить свою работу, т.к. consumer не может работать нормально (либо у него где-то что-то зависло и он не может обрабатывать публикуемые объекты с должной скоростью, либо же у него произошла утечка ресурсов, среди которых были и опубликованные объекты).

Действия producer-а при получении повторного сообщения attach_request от consumer-а

В сообщении attach_request содержится уникальный идентификатор consumer-а. Это позволяет producer-у определить, считается ли данный consumer уже подключенным или нет.

Если producer получает attach_request от consumer-а, который считается подключенным (и для которого есть список опубликованных объектов), то это означает, что consumer рестартовал и вся имеющаяся информация об этом consumer-е перестала быть актуальной. Поэтому producer:

  • проходит по списку опубликованных для этого consumer-а объектов и уменьшает счетчик ссылок для объектов на единицу. Если для какого-то объекта счетчик ссылок обнуляется, то блок памяти под этот объект объявляется свободным и возвращается в арену для последующего переиспользования;
  • очищает список опубликованных для consumer-а объектов (считаем, что для этой новой инкарнации consumer-а еще ничего не было опубликовано).

Действия producer-а при невозможности отправить исходящее сообщение consumer-у

Если producer предпринимает попытку отослать consumer-у исходящее сообщение (например, publish_announce, publish, ping_ack, reattach) и получает ошибку отсылки (записи) сообщения, то producer специальным образом помечает этого consumer-а и перестает отсылать ему какие-либо сообщения вообще.

Если ошибка записи произошла из-за аварийного завершения работы consumer-а, то consumer рестартует и пришлет новый attach_request. Это даст понять producer-у, что старая информация о consumer-е перестала быть актуальной.

Если же ошибка записи вызвана какими-то другими причинами, а consumer на самом деле все еще работает, то отсутствие активности со стороны producer-а будет диагностирована consumer-ом (см. ниже).

Действия consumer-а при отсутствии активности со стороны producer-а

Каждый подключившийся к producer-у consumer должен контролировать время отсутствия входящих сообщений от producer-а. Если это время превышает некий установленный порог, то consumer должен отослать producer-у сообщение ping.

Если затем в течении длительного времени от producer-а не будет никаких входящих сообщений (будь то ping_ack, publish_announce или reattach), то consumer должен посчитать, что producer недееспособен (завершил работу или завис и вскоре будет перезапущен). Поэтому consumer должен начать процедуру нового подключения к consumer-у.

Может случиться так, что с producer-ом все нормально, просто по каким-то причинам нарушилось взаимодействие между producer-ом и consumer-ом (см. выше). В этом случае переподключение consumer-а к producer-у создаст новый канал связи с producer-ом, а producer актуализирует свою информацию о consumer-е.

При этом суммарное время до отправки ping-а и время ожидания ответа на ping должно превышать время, в течении которого consumer должен освободить опубликованные для него объекты.

Такое требование гарантирует то, что в случае переподключения consumer-а к продолжающему свою работу producer-у, у consumer-а нет ссылок на неосвобожденные опубликованные объекты. Все опубликованные объекты к этому времени уже должны быть освобождены или же consumer должен прекратить свою работу из-за невозможности освободить какой-то из объектов вовремя.

Пояснения по принятым и описанным выше решениям

Взаимное обнаружение неработоспособности

Процесс producer контролирует работоспособность consumer-ов по наличию сообщений release. Если от consumer-а вовремя приходят release для опубликованных объектов, то значит с consumer-ом все нормально. Если ничего не публикуется, то смысла дополнительно следить за consumer-ами нет. Жив ли consumer или нет выяснится после попытки опубликовать новый объект.

Процесс consumer контролирует работоспособность producer-а за счет учета времени отсутствия сообщений от producer-а. Если producer перестал что-либо присылать и не отвечает на ping, значит старый producer прекратил свое существование и нужно начинать попытки подключения к новому producer-у (который либо уже запущен, либо будет запущен со временем).

Зачем нужен reattach и выделение новой арены?

При нормальном режиме работы и правильно рассчитанном размере арены ситуации, когда в арене закончилось свободное место, должны возникать редко и эти ситуации должны нормализоваться естественным образом (из-за нехватки места producer не может опубликовать какие-то данные, но это не страшно, т.к. вскоре будет подготовлена следующая порция данных).

Нас должны волновать две ситуации:

1. Какой-то consumer конкретно завис и его пока еще не рестартовали.
2. Какой-то consumer притормозил и не успевает освобождать опубликованные объекты. Однако, он все еще работает и опубликованные для него объекты этому consumer-у все еще нужны.

Проблема в том, что нет надежных способов отличить одну ситуацию от другой. Кроме того, вторая ситуация может плавно вести к первой: т.е. сперва consumer тормозит немного, затем начинает тормозить все больше и больше, пока он не зависает окончательно.

Мы могли бы подождать какой-то время (большее чем время, которое дается consumer-у на освобождение опубликованных объектов), после чего бы просто посчитали бы, что consumer уже мертв и все, что было опубликовано для consumer-а, должно быть освобождено. Это дало бы нам возможность использовать ту же арену не создавая новую.

Однако, такое ожидание не есть хорошо, т.к. в таком случае пострадают те consumer-ы, которые продолжают работать нормально.

Следовательно, лучше создать новую арену, заставить живых и нормально работающих consumer-ов переподключиться, после чего начать публикацию новых объектов уже через новую арену.

Если те consumer-ы, которые не смогли сразу переподключиться просто временно подтормаживали, то они спокойно дообработают те объекты, которые для них были опубликованы в старой арене и уже затем переключатся на новую арену.

Те же consumer-ы, которые зависли намертво, будут просто рано или поздно убиты супервайзером. Но это уже никак не скажется на публикацию объектов через новую арену.

В приципе, у этого подхода есть опасность того, сперва один consumer начнет притормаживать и из-за него придется создать новую арену. Затем притормозит второй процесс и будет создана еще одна арена (при этом все еще будет существовать первая арена). Затем притормозит третий процесс и т.д. Арены будут плодиться и это будет вести к чрезмерному расходу памяти.

Но эта опасность в данном случае компенсируется тем, что consumer-ов будет мало (считанные единицы) и большому количеству арен просто неоткуда будет взяться.

Защита consumer-ов от перегрузки

В описанном механизме уже заложена одна степень защиты consumer-ов от слишком быстрого потока публикаций: сообщение publish отсылается только тем consumer-ам, которые успели вовремя ответить на publish_announce. Следовательно, если какой-то consumer начинает подтормаживать, то он будет присылать announce_ack-и с опозданием и новые publish-ы для него не будут отсылаться до тех пор, пока consumer не вернется к нормальному ритму и не сможет отсылать announce_ack вовремя.

Но эта защита может не сработать, если в consumer-е взаимодействие с producer-ом и обработка опубликованных объектов разнесена на разные рабочие нити. Так, одна нить общается с producer-ом и складывает указатели на опубликованные объекты в какую-то очередь. А вторая нить берет указатели из очереди и делает прикладную обработку опубликованных объектов.

В таком подходе скорость взаимодействия с producer-ом может не зависеть от скорости прикладной обработки.

Чтобы защититься от подобной реализации consumer-а можно сделать ограничение в producer-е: опубликовать для consumer-а можно не более N объектов. Если consumer "взял" N объектов, но еще не освободил ни один из них, то публикация N+1 объекта для этого consumer-а не выполняется.

Возможные способы реализации обмена сообщениями

Несколько дополнительных вводных, которые не были озвучены ранее

Нужна поддержка Linux и Windows

Первая реализация описанного выше механизма должна сперва заработать под Linux-ом, а после того, как она докажет свою состоятельность на тестах, ее придется портировать еще и под Windows. Так что в основу обмена сообщениями должны быть положены средства, которые есть и в Linux, и в Windows, и которые работают более-менее схожим образом на обоих платформах.

Каждый consumer работает сразу с несколькими producer-ами

У одного producer-а может быть несколько consumer-ов. Об этом уже говорилось выше. Но сейчас нужно добавить, что каждый из предполагаемых consumer-ов должен будет получать информацию сразу от нескольких producer-ов. Т.е. будет какая-то такая схема:

Соответственно, есть вопрос о том, как контролировать IPC для общения сразу с несколькими consumer-ами и producer-ами.

Можно на каждое соединение между producer-ом и consumer-ом выделить отдельную рабочую нить, на которой будут выполняться блокирующие операции read/write (send/receive). Но более привлекательно пока что выглядит модель с использованием одной единственной нити, на которой операции read/write будут выполняться в неблокирующем режиме (посредством poll в Linux, к примеру).

Что задействовать для обмена сообщениями?

Описанный выше механизм предполагает обмен сообщениями attach_request, attach_ack, publish_announce, announce_ack, publish, release, ping, ping_ack.

Делать обмен этими сообщениями через shared memory не хочется, поскольку:

  • я не припоминаю, чтобы в Linux-е был такой примитив межпроцессной синхронизации, как Event, доступный сразу для нескольких процессов (sem_open, sem_wait и пр. есть, а вот именно Event-а не припомню). Соответственно, мне не понятно, как можно сделать относительно дешевое ожидание поступления нового сообщения для процесса;
  • мне непонятно как сделать так, чтобы один consumer на одной рабочей нити мог ждать уведомления сразу от нескольких producer-ов, если эти уведомления идут через shared memory. Могу ошибаться, но на Windows это можно сделать через именованные Event-ы и WaitForMultipleObject. Но вот под Linux-ом такой функциональности вроде как нет.

Поэтому на первый план выходят такие Linux-овые механизмы, как named pipes и POSIX message queue.

Пока я склоняюсь к тому, чтобы за основу взять named pipes. Из-за того, что контролировать доступность данных для чтения сразу для нескольких пайпов можно посредством poll. Что делает возможным вынесение работы со всеми producer-ами (или со всему consumer-ами) одну общую рабочую нить.

Использование named pipes для обмена сообщениями между producer-ом и consumer-ом может выглядеть следующим образом:

  • процесс-producer создает named pipe с фиксированным для этого producer-а именем. Например, для producer-а с именем InStreamOne это может быть "./myprj-InStreamOne". Имя этого named pipe будет известно всем consumer-ам;
  • процесс-producer открывает свой named pipe в режиме чтения. Процессы-consumer-ы открывают этот named pipe в режиме записи. Все сообщения, которые consumer-ы адресуют producer-у (attach_request, announce_ack, release, ping), записываются в этот named pipe;
  • каждый consumer для взаимодействия с producer-ом создает свой собственный named pipe с уникальным именем. Например, если consumer с уникальным идентификатором ProcessorX хочет взаимодействовать с producer-ом с именем InStreamOne, то создается named pipe с именем "./myprj-ProcessorX-InStreamOne-PID-TIMESTAMP" (где вместо PID и TIMESTAMP подставляются соответствующие значения);
  • consumer открывает свой named pipe в режиме чтения;
  • consumer отсылает имя своего уникального named pipe в сообщении attach_request;
  • producer открывает named pipe очередного consumer-а в режиме записи. Все сообщения, которые адресуются consumer-у (attach_ack, publish_announce, publish, ping_ack, reattach) отсылаются producer-ом в этот named pipe.

Вроде бы такая схема должна работать, причем с использованием неблокирующих операций над named pipe.

Если с ее реализацией возникнут какие-то проблемы, то придется смотреть в сторону POSIX message queue.

Как все это ляжет на имеющиеся в Windows средства я пока не прикидывал, так что даже описанный способ общения через named pipes может претерпеть существенные изменения.

Заключение

Вот такой вот механизм пока что удалось придумать. Вроде бы за время написания текста никаких новых косяков в нем не обнаружил. Если же вам в нем что-то показалось странным или подозрительным, то дайте мне знать, пожалуйста. Ну или может вы знаете какой-то более простой способ для решения подобной задачи?

В любом случае спасибо, что дочитали.

20 комментариев:

Yury Schkatula комментирует...

Собственно, вырысовывается классическая схема "шина данных" + "шина управления". Как в том же VoIP, например: отдельно ходят SIP-сообщения и статусы, и отдельно RTP с голосом/видео/etc

Так и тут: сами данные через shared memory, и отдельный канал для управляющей информации

eao197 комментирует...

@Yury Schkatula:

Я не имел дела с SIP, так что не могу сказать, насколько аналогия точна. Но выглядит похоже.

sv комментирует...

Что-то мне кажется, что подошел бы простой подход как в kafka, когда есть циклическая очередь, в которую продюсер складывает данные. Каждый consumer сам отслеживает свою позицию в этой очереди. Последнюю позицию он может держать например в той же shared memory в отдельном поле, благо места для таких полей нужно совсем немного.

eao197 комментирует...

@Unknown

Я не понимаю как в разделяемой циклической очереди отслеживать смерть кого-то из участников и как делать восстановление после падения.

sv комментирует...

А зачем отслеживать смерть? С восстановлением после падения все просто - вся инфа есть в очереди. То есть, в очереди записан и номер последнего сообщения, который отправил producer, и номера сообщений, которые читали consumer-ы.
Единственная проблема, которая есть - как не дать consumer-у читать уже перезаписанное на новой итерации сообщение. Тут довольно сложно что-то придумать, наверно, consumer-у нужно копировать сообщения себе в память

eao197 комментирует...

@Unknown

Ну вот, например, упал consumer. Что делать с объектами, которые были для него опубликованы, но которые он еще не вернул (а ведь память у нас не безразмерная)? Если producer знает, что consumer мертв, то он может вернуть блоки из под этих объектов обратно в арену.

Или падает producer. Рестартует и должен понять, в каком состоянии арена. Может ли он доверять тому, что осталось в памяти после его падения? Не может.

Producer должен создать новую арену, которую он проинициализирует гарантированно правильно. Но тогда он должен как-то сообщить consumer-ам о том, что дальше нужно работать с другим memory-mapped file.

> Тут довольно сложно что-то придумать, наверно, consumer-у нужно копировать сообщения себе в память

Копирование чего-то в память consumer-а напрочь лишает смысла всю эту затею.

sv комментирует...

> Ну вот, например, упал consumer. Что делать с объектами, которые были для него опубликованы

У нас цикличная очередь. Все объекты рано или поздно перезапишутся.

> Или падает producer. Рестартует и должен понять, в каком состоянии арена

В структуре очереди есть вся необходимая инфа. Индекс (или позиция в файле) последнего элемента, количество элементов и тп

eao197 комментирует...

@Unknown

> У нас цикличная очередь. Все объекты рано или поздно перезапишутся.

Кроме очереди у нас есть арена фиксированного размера. Если арена имеет размер в 150MiB и в ней "застряло" пяток объектов по 15MiB, то вариант ждать "рано или поздно" выглядит сомнительно.

> В структуре очереди есть вся необходимая инфа. Индекс (или позиция в файле) последнего элемента, количество элементов и тп

Эту структуру обслуживает producer. Producer аварийно завершает свою работу, где гарантии, что элементы этой структуры остались в валидном состоянии? Ведь многопоточный producer запросто может упасть именно в тот момент, пока какая-то из его нитей занималась обновлением информации о состоянии объектов в арене.

sv комментирует...

> Кроме очереди у нас есть арена фиксированного размера.
А что за арена кроме очереди? Очередь - и есть арена

> Producer аварийно завершает свою работу, где гарантии, что элементы этой структуры остались в валидном состоянии?
Всегда нужно иметь какие-то гарантии работы. То что элементы структуры останутся в валидном состоянии должно быть одной из гарантий. Если вы будете использовать какую-нибудь кафку, то она же не повредится из-за того, что продюсер упал? Тут тоже самое.
Например, для того, чтобы это гарантировать, можно формировать управляющие элементы очереди (количество элементов, индекс последнего элемента и тп) в отдельной ячейки памяти, а потом атомарно подменять указатель на этот индекс памяти.
Ну в общем способ какой-нибудь есть.

Тоесть единственная реальная проблема - это как не читать уже перезаписанные данные. Но думаю в целом схема более рабочая, чем нерабочая

eao197 комментирует...

@Unknown

> А что за арена кроме очереди? Очередь - и есть арена

ИМХО, делать арену самой очередью, в которой размеры объектов могут отличаться на порядки, не самый удобный вариант. Особенно если нужно учитывать, что какой-то consumer может притормозить. И тогда все (producer и остальные consumer-ы) будут вынуждены работать дальше со скоростью самого медленного consumer-а. А это в принципе не правильно. Пусть лучше медленный consumer теряет объекты, но остальные будут работать с нормальной скоростью.

> Всегда нужно иметь какие-то гарантии работы.

И откуда их взять? Рабочая нить, которой нужно опубликовать объект в арене, принудительно приостанавливает все остальные рабочие нити процесса, дабы ни одна из них случайно не сделала деление на 0? А как защититься от внешних воздействий, когда процесс producer-а кто-то принудительно убивает извне?

> То что элементы структуры останутся в валидном состоянии должно быть одной из гарантий.

У меня проблема с придумыванием способа обеспечения этих самых гарантий.

> Если вы будете использовать какую-нибудь кафку, то она же не повредится из-за того, что продюсер упал?

Так kafka -- это отдельный процесс по отношению к процессу producer-а. Падение producer-а никак не влияет на то, что творится в памяти kafka-брокера.

> Ну в общем способ какой-нибудь есть.

Только вот мне нужно знать этот способ дабы воплотить его в коде. Подозрения на то, что такой способ может быть недостаточно, к сожалению.

sv комментирует...

> ИМХО, делать арену самой очередью, в которой размеры объектов могут отличаться на порядки, не самый удобный вариант.

Почему? Да, структура очереди будет сложнее, нужно будет перечислять размеры всех элементов.

> И тогда все (producer и остальные consumer-ы) будут вынуждены работать дальше со скоростью самого медленного consumer-а

Нет, продюсер ждать никого не будет. Мне кажется, если в процессе эксплуатации возникнет проблема обгона продюсером консюмеров, то решать ее созданием второй очереди и т.п. - худшая идея. Это нужно решать админскими методами, по русски - увеличив размер очереди при следующем запуске. То есть логика такая, что если у нас не хватило памяти на что-то, значит у нас не хватило памяти, значит память нужно увеличивать. Зачем для этого придумывать что-то еще, что усложнит отладку в 500 раз?

> И откуда их взять?

Ну вот я предложил ниже один из способов. Может есть и другие.

> Так kafka -- это отдельный процесс

А в чем разница? Можно считать эту очередь недо-кафкой.

eao197 комментирует...

> Почему? Да, структура очереди будет сложнее, нужно будет перечислять размеры всех элементов.

Ну как раз потому, что "структура очереди будет сложнее".

> Нет, продюсер ждать никого не будет.

И что он будет делать? Будет перезаписывать блоки, которые consumer-ами все еще используются?

> если в процессе эксплуатации возникнет проблема обгона продюсером консюмеров, то решать ее созданием второй очереди и т.п. - худшая идея.

Создание новой арены -- это крайний шаг. Защита от перегрузки в описанном в посте варианте базируется на том, что медленные consumer-ы не будут получать сообщение publish, т.к. не будут вовремя отвечать на publish_announce. Тем самым для медленных consumer-ов сам producer не будет публиковать новые объекты (а только для быстрых consumer-ов).

Создание новой арены происходит только когда основной механизм защиты от перегрузки перестает работать (из-за конкретных подвисаний consumer-ов). И как раз создание новой арены позволяет просто переключить на нее всех живых consumer-ов забив на судьбу старой арены с затормозившими consumer-ами.

> Ну вот я предложил ниже один из способов. Может есть и другие.

Простите, не одного пригодного для воплощения в коде не увидел. Но это, скорее всего, потому что я тупой.

> А в чем разница?

В том, что память kafka-брокера не зависит от того, что творит и в какой именно момент падает producer.

sv комментирует...

> Ну как раз потому, что "структура очереди будет сложнее".

Но не сложнее, чем ваш протокол

> Будет перезаписывать блоки, которые consumer-ами все еще используются?

Да. В перспективе, такого нужно не допускать, выделяя достаточную память.

> Создание новой арены -- это крайний шаг.

Но он все равно есть. Его нужно тестировать, править в нем ошибки. Чем меньше таких "шагов", тем сильно лучше.

> Простите, не одного пригодного для воплощения в коде не увидел

Например, для того, чтобы это гарантировать, можно формировать управляющие элементы очереди (количество элементов, индекс последнего элемента и тп) в отдельной ячейки памяти, а потом атомарно подменять указатель на этот индекс памяти.

> В том, что память kafka-брокера не зависит от того, что творит и в какой именно момент падает producer.

Но в вашей схеме тоже есть shared мемори, которая творится producer-ом. В чем же разница?

eao197 комментирует...

> Но не сложнее, чем ваш протокол

В моем варианте протокол общения и структура блоков в арене вообще никак не связаны друг с другом. И consumer-ы вообще не знают, как и что в арене организовано.

> Да.

"Да" в данном случае будет вести к ППЦ, т.к. producer будет перезаписывать память, которую читает consumer.

> В перспективе, такого нужно не допускать, выделяя достаточную память.

Нет такой штуки, как достаточная память. Если consumer "заснет" на несколько секунд, то и арены в десяток гигабайт не будет достаточно. Что будет весело, если система будет крутиться у пользователя с 8GiB RAM на борту.

> Например, для того, чтобы это гарантировать, можно формировать управляющие элементы очереди (количество элементов, индекс последнего элемента и тп) в отдельной ячейки памяти, а потом атомарно подменять указатель на этот индекс памяти.

Эта отдельная ячейка ведь будет частью арены. Соответственно, к ней так же все те же вопросы по консистентности ее состояния.

> Но в вашей схеме тоже есть shared мемори, которая творится producer-ом. В чем же разница?

Разница в том, что объекты, которые producer публикует, уже гарантированно в нормальном состоянии. Объекты же для которых в арене память выделена, но которые еще не опубликованы, могут находиться в любом состоянии, в том числе и в невалидном. consumer-ы этого не видят.

Если producer падает, то он создает новую арену и проблемы доверия содержимому старой арены ему решать вообще не нужно.

sv комментирует...

> И consumer-ы вообще не знают, как и что в арене организовано.

Знают, не знают, это вопросы абстракции.

> producer будет перезаписывать память, которую читает consumer.

Это да, одна из нерешенных проблем в моей структуре. Но может это можно обойти на чтении данных. Например, перед чтением и после чтения куска данных проверять, чтобы какой-нибудь счетчик совпадал, и если не совпадает, считать то, что было прочитано не валидным. То есть да, это приводит к более тесной интеграции с бизнес-процессами, но мне кажется это может стоить того.

> то и арены в десяток гигабайт не будет достаточно. Что будет весело, если система будет крутиться у пользователя с 8GiB RAM на борту.

Ну дак и что делать в вашем же случае? Вы просто выедете всю память и все.

> Эта отдельная ячейка ведь будет частью арены

Не вижу никакой проблемы. Эта ячейка живет своей жизнью. Валидной она считается только тогда, когда на нее указывает указатель, который заменяется атомарно. Все остальное время она может быть невалидной сколько угодно

> Объекты же для которых в арене память выделена, но которые еще не опубликованы, могут находиться в любом состоянии, в том числе и в невалидном. consumer-ы этого не видят.

У меня тоже самое, не вижу в чем разница

> Если producer падает, то он создает новую арену и проблемы доверия содержимому старой арены ему решать вообще не нужно.

При желании в моей схеме можно сделать также. А можно продолжать писать в ту же очередь, так как эта очередь всегда валидна по определению, данному выше

eao197 комментирует...

> Знают, не знают, это вопросы абстракции.

Когда не знают -- это вопрос абстракции.
Когда знают -- это уже вопрос контракта. И геморроя по его соблюдению. В описанном мной варианте у consumer-ов такого геморроя нет совсем.

> Например, перед чтением и после чтения куска данных проверять, чтобы какой-нибудь счетчик совпадал, и если не совпадает

Мне думается, что вы не представляете себе картину происходящего. Consumer получает некий указатель + размер. И начинает по этой области памяти ходить взад-назад в течении, скажем 5ms. Сколько и каких итераций он сделает по дороге -- заранее неизвестно. И если producer начнет перезаписывать память, по которой сейчас ползает consumer-а, то последствия будут такие, что даже сложно себе представить.

> Ну дак и что делать в вашем же случае?

В худшем случае арен будет столько, сколько будет consumer-ов. Т.е. есть один нормальный consumer и (N-1) затормозивших на разных стадиях. При этом если супервизор диагностирует зависание очередного consumer-а, то этот consumer убивается и арена, которая удерживалась этим consumer-ом, автоматически возвращается ОС.

> Все остальное время она может быть невалидной сколько угодно

Чтобы это было так, вам нужно делать копию блока, описывающего текущее состояние арены, на каждую модификацию арены. Затем должным образом модифицировать эту копию и атомарно заменять указатель на текущий описатель состояния арены.

> У меня тоже самое, не вижу в чем разница

Я вроде бы объяснил достаточно. Не знаю как сделать это еще подробнее.

sv комментирует...

> В описанном мной варианте у consumer-ов такого геморроя нет совсем.

Зато есть геморрой принять сообщение, вовремя на него ответить, вовремя послать пинг...

> И если producer начнет перезаписывать память

Тут нужно искать компромиссы с бизнесом. Не со мной, а с бизнесом. Возможно, бизнес-код такой, что он довольно быстро заметит повреждение данных, например, сравнив чек-суммы. А если не заметит, но например ничего не успеет с этими данными сделать (например, если обработка данных делится на фазы парсинг -> работа), то об устаревании данных можно предупредить уже после парсинга. Да, это требует взаимодействия с бизнес-логикой продукта, но у вас нету цели написать универсальную библиотеку на все. К тому же в своих предположениях вы и так опирались на некоторые бизнес-логику.

Может решение лежит в плоскости копировать нужные данные в память процесса. Да, это замедлит работу, но 1) сильно ли? 2) возможно это замедление не скажется на времени всей обработки

> В худшем случае арен будет столько, сколько будет consumer-ов

Нет ничего хуже кода, который выполняется раз в 500 лет. Пусть он или выполняется всегда, или никогда

> Чтобы это было так, вам нужно делать копию блока

Если мы пишем в арену несколько мегабайт, то скопировать мелкий заголовок не проблема. Плюс я не утверждаю, что это единственный верный способ, можно попробовать что-то еще. Например, сделать очередь в виде списка, где каждый элемент знает свое начало и конец.

eao197 комментирует...

> Может решение лежит в плоскости копировать нужные данные в память процесса. Да, это замедлит работу, но 1) сильно ли? 2) возможно это замедление не скажется на времени всей обработки

Это решение уже принято. Объекты, которые формируются producer-ом, дорого копировать в память producer-ов. Не нужно такое делать. Отсюда и все пляски с бубном.

> Нет ничего хуже кода, который выполняется раз в 500 лет.

Поэтому нужно забить на обработку ошибок и проблем, так?

> Если мы пишем в арену несколько мегабайт, то скопировать мелкий заголовок не проблема.

А почему вы решили, что указателю на этот заголовок (а ведь указатель в той же самой арене) вообще можно доверять?

Ну вот рестартует producer, открывает тот же самый memory-mapped file, читает первые 8-мь байт (считаем, что там лежит указатель на описание состояния арены). Получаем некоторое значение X. Откуда мы знаем, что оно валидно?

sv комментирует...

> Поэтому нужно забить на обработку ошибок и проблем, так?

нет, нужно сделать так, чтобы это не происходило раз в 500 лет.
Например, в вашей схеме, можно создать заранее 10 арен, и заполнять каждую арену до упора, после чего брать следущую свободную арену из этих 10. Свободной считается та, над которой не сидит никакой сервис. В результате и упрощается сама арена (вместо арены можно взять тупую очередь), и главное, больше нет кода, который исполняется раз за 500 лет

> Откуда мы знаем, что оно валидно?

Мы на это должны полагаться. Иначе ничего не выйдет. В вашей схеме продюсер, точно также падая, может закорраптить уже существующие данные в арене, которые потом попытаются прочитать консьюмеры, пока продюсер не успел подняться

eao197 комментирует...

> нет, нужно сделать так, чтобы это не происходило раз в 500 лет.

Лучше быть богатым и здоровым, чем бедным и больным, понятно.

> Например, в вашей схеме, можно создать заранее 10 арен

Если размер одной арены для нормальной работы 500MiB и дополнительные нужно создавать лишь изредка, когда случается форсмажор, то это плохо. А сразу создать 10 арен по 500Mib -- это хорошо. Ну OK.

> Мы на это должны полагаться.

А чем вы гарантируете это "должны"?
Например, есть ли гарантия от ОС, что когда вы делаете маппинг файла в память в первый раз, то первые 8 байт будут иметь нулевые значения?