пятница, 3 февраля 2017 г.

[life.business] Таки своя компания...

Много воды утекло с момента, когда в этом блоге появились первые размышления о создании собственной компании. Посеянные тогда зерна сомнений и надежд таки дали свои всходы и привели к тому, что своя компания у меня таки появилась. Осенью 2016-го года мы с несколькими бывшими коллегами создали ООО "СтифСтрим". Ну, или как задумывалось изначально, stiffstream.

Занимаемся, в первую очередь, SObjectizer-ом. По сути, смысл создания компании был в том, чтобы за SObjectizer-ом появилась некая коммерческая структура, которая могла бы заниматься предоставлением услуг вокруг SObjectizer-а на платной основе. Есть задумки по поводу пары-тройки продуктов над SObjectizer-ом, которые, вероятно, будут распространяться уже под двойной лицензией. Так же думаем эксплуатировать свой большой опыт в разработке софта вообще и софта на C++ в частности: обучение, консультации, небольшие разовые заказные разработки. В общем, если у вас что-то не идет с вашим C++ным кодом, то вы теперь знаете, к кому можно обратиться ;) Контрибьюторов в Boost среди нас нет, но работающий и понятный код на C++ мы писать умеем.

Делаем мы все это за собственный счет, без внешних инвестиций. Поэтому многое приходится делать самим, своими руками. Из-за этого что-то может выглядеть кустарно и убого, но со временем мы напильничком все доработаем.

Ну вот как-то так, можно сказать, что официальное представление состоялось. Добавлю, наверное, вот что. Ведение своего дела, очевидно, занятие более геморройное и неблагодарное, чем работа "на дядю" за фиксированную зарплату. И если бы у нас была возможность заниматься интересными и сложными проектами будучи наемными работниками, мы бы вряд ли в такую авантюру ввязались бы. Работы такой, однако, у нас в городе нет. Поэтому попробуем создать ее для себя сами. Пока есть возможность, нужно пробовать. Вот мы и пробуем.

PS. По поводу названия. У слова stiff много смыслов в английском. Нас почти все устраивают :) Тем не менее, официальная трактовка -- это "тугой/жестки/несгибаемый/упертый поток". Те, кто меня знает и кто со мной плотно работал, думаю, не удивятся "жесткости" и "упертости" ;)

среда, 1 февраля 2017 г.

[prog.c++] Небольшой сниппет ко вчерашней теме (порционное чтение из CSP-канала с учетом тайм-аута)

При написании вчерашнего поста я что-то затормозил. На самом деле организовать чтение пакетов из не более чем 100 сообщений из CSP-канала так, чтобы с момента получения первого сообщения прошло не более 100ms, совсем не сложно. Для этого не требуется второй вспомогательный канал. Достаточно одного канала. Но читать из него нужно дважды:

  • сначала читается первое сообщение из будущего пакета без ограничения времени;
  • затем читаются остальные сообщения будущего пакета но так, чтобы общее время на их чтение не превышало 100ms.

У нас в SO-5 это выглядит вот так:

   for(;;) {
      // Ожидаем первого сообщения.
      const auto r = receive(chain, infinite_wait,
            [&](const kafka_msg & msg) { buffer.push_back(msg); });

      // Если сообщение действительно пришло, то ждем либо остальные
      // сообщения, либо завершение тайм-аута.
      if(r.handled()) {
         receive(from(chain).handle_n(buffer_size - 1).total_time(100ms),
               [&](const kafka_msg & msg) { buffer.push_back(msg); });
         // На этот момент buffer точно не пуст, поэтому сохраняем его.
         persist(buffer);
      }
      else
         // Ничего не вычитали из канала т.к. он закрыт. Продолжать нет смысла.
         break;
   }

Тут первый receive выполняет ожидание и обработку всего одного сообщения. А вот второй receive либо обрабатывает 99 сообщений (т.е. формирует полный пакет), либо же работает 100ms (причем не суть важно на что тратится это время -- на обработку сообщений или же на их ожидание).

Я не очень хорошо знаю Go, но, по-моему, там в select-е можно задать тайм-аут только на ожидание сообщений. Так что SObjectizer-овский total_time() для Go-шного select-а нужно будет написать, из коробки в Go я такого не припомню. Если ошибаюсь, то поправьте меня, плиз.

Полный же код демонстрационного примера можно найти вот в этом репозитории.

Этот пример, кстати, демонстрирует возможность использования SObjectizer-5 для многопоточного программирования вообще без акторов. Только CSP-шные каналы.

При этом можно увидеть, что рукопашная работа с рабочими нитями требует изрядной осторожности. Все эти auto_join и auto_close -- они же не от хорошей жизни появились. А для того, чтобы при возникновении исключений каналы закрывались, а рабочие потоки join-ились. Из-за этого, кстати говоря, приходится сначала объявлять объекты типа std::thread и создавать auto_join-ер для них, потом создавать каналы и auto_closer-ы для каналов. И только после этого стартовать рабочие нити. Если поменять последовательность, то при возникновении исключения можно запросто повиснуть на join-е: рабочая нить не завершиться, поскольку никто не закроет канал, на котором она висит в receive.

Подробнее про mchain-ы в SObjectizer (которые и есть CSP-каналы) можно узнать в этой презентации или в этом разделе Wiki.

PS. Ну и не могу не сказать еще раз про современный C++. В примере используется C++14, работать с ним приятно. Теперь бы еще поднять скорость компиляции C++ного кода на порядок, вообще было бы здорово.

вторник, 31 января 2017 г.

[prog.flame] Непонятое из рассказа про миграцию со Scala на Go (глазами пользователя SObjectizer)

Нашел, кажется, на RSDN-е ссылку на прекрасное: "Making the move from Scala to Go, and why we’re not going back". Там люди рассказывают про то, как они писали-писали на Scala, задолбались и с удовольствием перешли на Go. От чего сейчас буквально писают кипятком очень счастливы.

На мой взгляд, когда люди меняют высокоуровневый язык, вроде Scala, на низкоуровневую примитивщину вроде Go, то это означает, что они что-то делают неправильно. Скорее всего, неправильно сделали с самого начала: выбрали экскаватор вместо лопаты там, где нужно было выкопать яму под нужник на даче. Но это тема отдельного разговора.

В еще большем недоумении меня оставило другое: в статье описывается пример с чтением потока сообщения из Kafka и сохранения накопленных сообщений в БД. Люди попытались примерить к этой задачке Модель Акторов в реализации Akka и что-то у них не получилось. Дословно одна из причин недовольства Akka описана так: "Furthermore, the stream came from a Kafka Consumer, and in our wrapper we needed to provide a `digest` function for each consumed message that ran in a `Future`. Circumventing the issue of mixing Futures and Actors required extra head scratching time." Честно говоря я не понимаю, о чем это все.

Но если посмотреть на пример того, как они обозначают решение этой же задачи на потоках в Go, то мы видим вот что:

buffer := []kafkaMsg{}
bufferSize := 100
timeout := 100 * time.Millisecond

for {
  select {
    case kafkaMsg := <-channel:
      buffer = append(buffer, kafkaMsg)
      if len(buffer) >= bufferSize {
        persist()
      }
    case<-time.After(timeout):
      persist()
  }
}

func persist() {
      insert(buffer)
      buffer = buffer[:0]
}

Честно говоря, я не понимаю, откуда могут возникнуть затруднения в реализации этого же на акторах. Вот, скажем, у нас в SObjectizer-5 это могло бы выглядеть вот так:

class kafka_msg_persister final : public so_5::agent_t {
   std::vector< kafka_msg > buffer_;
   static constexpr std::size_t buffer_size = 100;
   so_5::timer_id_t dump_pause_;

public :
   kafka_msg_persister(context_t ctx) : so_5::agent_t(std::move(ctx)) {
      struct dump_by_timer : public so_5::signal_t {};

      so_subscribe_self()
         .event([this](const kafka_msg & msg) {
            buffer_.push_back(msg);
            if(buffer_.size() >= buffer_size)
               persist();
            else
               dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );
         })
         .event<dump_by_timer>([this]{ persist(); });
   }

private :
   void persist() {
      insert(buffer_);
      buffer_.clear();
      dump_pause_.reset();
   }
};

Если я правильно понимаю пример кода на Go выше, то логика у него какая-то странная: сохранение сообщений в БД осуществляется либо когда буфер полностью заполняется, либо же спустя 100ms после получения последнего сообщения. Странность здесь такая. Обычно тайм-аут для сохранения попавших в буфер сообщений отсчитывают от момента получения первого сообщения. А не от момента получения последнего. Ведь если у нас, скажем, есть поток сообщений с темпом 90ms, то первое сообщение будет сохранено в БД только спустя 9000ms (т.е. 9s) после получения.

Поэтому мне кажется, что правильнее было бы отсчитывать тайм-аут от момента получения первого сообщения. Для этого можно было бы переделать показанного выше агента так:

class kafka_msg_persister2 final : public so_5::agent_t {
   std::vector< kafka_msg > buffer_;
   static constexpr std::size_t buffer_size = 100;
   so_5::timer_id_t dump_pause_;

public :
   kafka_msg_persister2(context_t ctx) : so_5::agent_t(std::move(ctx)) {
      struct dump_by_timer : public so_5::signal_t {};

      so_subscribe_self()
         .event([this](const kafka_msg & msg) {
            if(buffer_.empty())
               dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );

            buffer_.push_back(msg);
            if(buffer_.size() >= buffer_size)
               persist();
         })
         .event<dump_by_timer>([this]{ persist(); });
   }

private :
   void persist() {
      insert(buffer_);
      buffer_.clear();
      dump_pause_.reset();
   }
};

Либо же можно было бы использовать факт того, что агент в SO-5 -- это конечный автомат, на переходы между состояниями которого можно повесить обработчики:

class kafka_msg_persister3 final : public so_5::agent_t {
   std::vector< kafka_msg > buffer_;
   static constexpr std::size_t buffer_size = 100;

   state_t empty{this}, not_empty{this};

   so_5::timer_id_t dump_pause_;

public :
   kafka_msg_persister3(context_t ctx) : so_5::agent_t(std::move(ctx)) {
      struct dump_by_timer : public so_5::signal_t {};

      this >>= empty;

      empty
         .on_enter([this]{ dump_pause_.reset(); })
         .transfer_to_state<kafka_msg>(not_empty);

      not_empty
         .on_enter([this]{
            dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );
         })
         .event([this](const kafka_msg & msg) {
            buffer_.push_back(msg);
            if(buffer_.size() >= buffer_size) {
               persist();
               this >>= empty;
            }
         });
         .event<dump_by_timer>([this]{ persist(); });
   }

private :
   void persist() {
      insert(buffer_);
      buffer_.clear();
   }
};

Т.е. при входе в состояние not_empty мы взводим таймер на 100ms, при входе в empty сбрасываем таймер, поскольку сейчас он нам уже не нужен.

Очевидно, что примеры на C++ и SO-5 более многословны, чем код на Go. Но задача была в том, чтобы показать, что на акторах накопление сообщений в буфер и затем сброс накопленных сообщений в БД по таймеру или по исчерпанию буфера -- это не сложно. Откуда у людей с этим возникли проблемы не понятно. Может быть дело в Akka, может быть они просто Akka готовить не умеют. Не знаю.

Очевидно, что есть люди, которым модель акторов в принципе не нравится. И которые предпочитают использовать CSP-ные каналы. Ну чтож, попробуем изобразить этот же пример на CSP-шных каналах, в SObjectizer-5:

std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;

for(;;) {
   receive(from(chain).handle_n(buffer_size).empty_timeout(100ms),
      [&](const kafka_msg & msg) {
         buffer.push_back(msg);
      });
   if(!buffer.empty())
      persist(buffer);
}

Здесь мы выходим из receive либо после получения 100 сообщений kafka_msg, либо после того, как канал был пуст в течении 100ms. Как раз то, что было в исходном примере на Go.

А вот получить на каналах поведение, когда тайм-аут нужно отсчитывать не от последнего сообщения, а от первого полученного, с ходу не получается. Тут нужно думать. И, как мне кажется, на акторах такое делается проще, чем на каналах. Ну либо нужно использовать сразу несколько каналов. Очень грубо это может выглядеть так:

std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;

struct dump_by_timer : public so_5::signal_t {};
auto timer_chain = create_mchain(env);

select( so_5::from_all(),
   case_(chain, [&](const kafka_msg & msg) {
         if(buffer.empty())
            so_5::send_delayed<dump_by_timer>(timer_chain, 100ms);
         buffer.push_back(msg);
         if(buffer.size() >= buffer_size)
            persist(buffer);
      }),
   case_(timer_chain, [&](mhood_t<dump_by_timer>) {
         if(!buffer.empty())
            persist(buffer);
      }) );

Но здесь возможно множественное срабатывание таймеров. Например, если идет очень большая последовательность сообщений, то на каждое 1-ое, 101-ое, 201-ое и т.д. сообщения будет отсылаться отложенный сигнал. А потом в какой-то прекрасный момент эти сигналы начнут приходить. Что не есть большая проблема, но и не есть хорошо. Поэтому придется немного поколупаться с отменой ранее отосланных отложенных сигналов. И более-менее реальный код выглядел бы как-то так:

std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;

struct dump_by_timer : public so_5::signal_t {};

auto timer_chain = create_mchain(env,
      // Нам нужно хранить не более одного сигнала в этом канале.
      1,
      // Место под сигнал выделим сразу.
      so_5::mchain_props::memory_usage_t::preallocated,
      // При поступлении нового сигнала старый выбрасываем за ненадобностью.
      so_5::mchain_props::overflow_reaction_t::remove_oldest);

// Идентификатор таймера нам нужен дабы была возможность отменять
// доставку отложенного сигнала.
so_5::timer_id_t dump_timer;

select( so_5::from_all(),
   case_(chain, [&](const kafka_msg & msg) {
         if(buffer.empty())
            dump_timer = so_5::send_periodic<dump_by_timer>(timer_chain, 100ms, 0s);
         buffer.push_back(msg);
         if(buffer.size() >= buffer_size) {
            dump_timer.reset();
            persist(buffer);
         }
      }),
   case_(timer_chain, [&](mhood_t<dump_by_timer>) {
         if(!buffer.empty())
            persist(buffer);
      }) );

К чему я это все веду (ну, естественно, за вычетом маркетинга SO-5)? К тому, что язык высокого уровня дает разработчику возможность создавать те абстракции, которые ему нужны для решения задачи. Нужны акторы -- можно сделать акторов, нужны CSP-шные каналы -- можно сделать каналы. Если же возможность по созданию абстракций под задачу не нужна, значит задача вполне себе решается более простым и примитивным языком. Т.е. изначально людям нужен был Go, а не Scala. И нахрена было тянуть в проект Scala, дабы затем плакаться о том, что "кололись, но жрали" -- не понятно.

Впрочем, если посмотреть на бэкграунд разработчиков:

то все встает на свои места. Лишь у одного было знакомство с Java за плечами. В общем, люди изначально не видели инструментов, которые специально создавались для нормальной разработки софта, обычными программистами, а не хипстерами. Но за Scala взялись. От и результат такой вот и получился ;)

[life.cinema] Очередной кинообзор (2017/01)

Как-то очень быстро пролетел январь и подошло время публиковать очередной кинообзор. Как обычно в начале списка идут фильмы, которые понравились больше, а в конце -- которые понравились меньше.

Пассажиры (Passengers, 2016). Кино, конечно же, для девочек. Но блин, как же классно оно сделано! Посмотреть можно только для того, чтобы окунуться в мир очень качественно снятой фантастики.

Великая стена (The Great Wall, 2016). Ни в коем разе не шедевр. А вот в качестве красочного фэнтезийного боевика -- хорош.

Война против всех (War on Everyone, 2016). Посмотреть можно. Но лично я ждал большего.

Уличный кот по кличке Боб (A Street Cat Named Bob, 2016). Хороший, добрый фильм. На 80% держится на обаянии главного героя -- рыжего кота.

Пустыня (Desierto, 2015). На удивление неплохо. Финалом лично я несколько разочарован, но все-таки на удивление неплохо.

Демон внутри (The Autopsy of Jane Doe, 2016). Весьма достойный представитель жанра мистических ужастиков. Не могу сказать, что мне прям понравилось-понравилось. Но посмотрел с удовольствием.

Шпионы по соседству (Keeping Up with the Joneses, 2016). Странное впечатление. Похоже, авторы балансировали между тем, чтобы сделать нормальный фильм и тем, чтобы опустить возрастное ограничение для зрителей как можно ниже. В результате получилось впихнуть фильм в категорию PG-13, что на пользу фильму явно не пошло.

Автобан (Collide, 2016). Начало было многообещающим, но финал фильма все испортил.

Под покровом ночи (Nocturnal Animals, 2016). Операторская работа в фильме выше всяких похвал. По после просмотра без ответа остаются следующие вопросы: что это было? Почему у этого такой высокий рейтинг на Кинопоиске? Зачем я это посмотрел?