Нашел, кажется, на RSDN-е ссылку на прекрасное: "Making the move from Scala to Go, and why we’re not going back". Там люди рассказывают про то, как они писали-писали на Scala, задолбались и с удовольствием перешли на Go. От чего сейчас буквально писают кипятком очень счастливы.
На мой взгляд, когда люди меняют высокоуровневый язык, вроде Scala, на низкоуровневую примитивщину вроде Go, то это означает, что они что-то делают неправильно. Скорее всего, неправильно сделали с самого начала: выбрали экскаватор вместо лопаты там, где нужно было выкопать яму под нужник на даче. Но это тема отдельного разговора.
В еще большем недоумении меня оставило другое: в статье описывается пример с чтением потока сообщения из Kafka и сохранения накопленных сообщений в БД. Люди попытались примерить к этой задачке Модель Акторов в реализации Akka и что-то у них не получилось. Дословно одна из причин недовольства Akka описана так: "Furthermore, the stream came from a Kafka Consumer, and in our wrapper we needed to provide a `digest` function for each consumed message that ran in a `Future`. Circumventing the issue of mixing Futures and Actors required extra head scratching time." Честно говоря я не понимаю, о чем это все.
Но если посмотреть на пример того, как они обозначают решение этой же задачи на потоках в Go, то мы видим вот что:
buffer := []kafkaMsg{}
bufferSize := 100
timeout := 100 * time.Millisecond
for {
select {
case kafkaMsg := <-channel:
buffer = append(buffer, kafkaMsg)
if len(buffer) >= bufferSize {
persist()
}
case<-time.After(timeout):
persist()
}
}
func persist() {
insert(buffer)
buffer = buffer[:0]
}
|
Честно говоря, я не понимаю, откуда могут возникнуть затруднения в реализации этого же на акторах. Вот, скажем, у нас в SObjectizer-5 это могло бы выглядеть вот так:
class kafka_msg_persister final : public so_5::agent_t {
std::vector< kafka_msg > buffer_;
static constexpr std::size_t buffer_size = 100;
so_5::timer_id_t dump_pause_;
public :
kafka_msg_persister(context_t ctx) : so_5::agent_t(std::move(ctx)) {
struct dump_by_timer : public so_5::signal_t {};
so_subscribe_self()
.event([this](const kafka_msg & msg) {
buffer_.push_back(msg);
if(buffer_.size() >= buffer_size)
persist();
else
dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );
})
.event<dump_by_timer>([this]{ persist(); });
}
private :
void persist() {
insert(buffer_);
buffer_.clear();
dump_pause_.reset();
}
};
|
Если я правильно понимаю пример кода на Go выше, то логика у него какая-то странная: сохранение сообщений в БД осуществляется либо когда буфер полностью заполняется, либо же спустя 100ms после получения последнего сообщения. Странность здесь такая. Обычно тайм-аут для сохранения попавших в буфер сообщений отсчитывают от момента получения первого сообщения. А не от момента получения последнего. Ведь если у нас, скажем, есть поток сообщений с темпом 90ms, то первое сообщение будет сохранено в БД только спустя 9000ms (т.е. 9s) после получения.
Поэтому мне кажется, что правильнее было бы отсчитывать тайм-аут от момента получения первого сообщения. Для этого можно было бы переделать показанного выше агента так:
class kafka_msg_persister2 final : public so_5::agent_t {
std::vector< kafka_msg > buffer_;
static constexpr std::size_t buffer_size = 100;
so_5::timer_id_t dump_pause_;
public :
kafka_msg_persister2(context_t ctx) : so_5::agent_t(std::move(ctx)) {
struct dump_by_timer : public so_5::signal_t {};
so_subscribe_self()
.event([this](const kafka_msg & msg) {
if(buffer_.empty())
dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );
buffer_.push_back(msg);
if(buffer_.size() >= buffer_size)
persist();
})
.event<dump_by_timer>([this]{ persist(); });
}
private :
void persist() {
insert(buffer_);
buffer_.clear();
dump_pause_.reset();
}
};
|
Либо же можно было бы использовать факт того, что агент в SO-5 -- это конечный автомат, на переходы между состояниями которого можно повесить обработчики:
class kafka_msg_persister3 final : public so_5::agent_t {
std::vector< kafka_msg > buffer_;
static constexpr std::size_t buffer_size = 100;
state_t empty{this}, not_empty{this};
so_5::timer_id_t dump_pause_;
public :
kafka_msg_persister3(context_t ctx) : so_5::agent_t(std::move(ctx)) {
struct dump_by_timer : public so_5::signal_t {};
this >>= empty;
empty
.on_enter([this]{ dump_pause_.reset(); })
.transfer_to_state<kafka_msg>(not_empty);
not_empty
.on_enter([this]{
dump_pause_ = so_5::send_periodic<dump_by_timer>(*this, 100ms, 0ms );
})
.event([this](const kafka_msg & msg) {
buffer_.push_back(msg);
if(buffer_.size() >= buffer_size) {
persist();
this >>= empty;
}
});
.event<dump_by_timer>([this]{ persist(); });
}
private :
void persist() {
insert(buffer_);
buffer_.clear();
}
};
|
Т.е. при входе в состояние not_empty мы взводим таймер на 100ms, при входе в empty сбрасываем таймер, поскольку сейчас он нам уже не нужен.
Очевидно, что примеры на C++ и SO-5 более многословны, чем код на Go. Но задача была в том, чтобы показать, что на акторах накопление сообщений в буфер и затем сброс накопленных сообщений в БД по таймеру или по исчерпанию буфера -- это не сложно. Откуда у людей с этим возникли проблемы не понятно. Может быть дело в Akka, может быть они просто Akka готовить не умеют. Не знаю.
Очевидно, что есть люди, которым модель акторов в принципе не нравится. И которые предпочитают использовать CSP-ные каналы. Ну чтож, попробуем изобразить этот же пример на CSP-шных каналах, в SObjectizer-5:
std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;
for(;;) {
receive(from(chain).handle_n(buffer_size).empty_timeout(100ms),
[&](const kafka_msg & msg) {
buffer.push_back(msg);
});
if(!buffer.empty())
persist(buffer);
}
|
Здесь мы выходим из receive либо после получения 100 сообщений kafka_msg, либо после того, как канал был пуст в течении 100ms. Как раз то, что было в исходном примере на Go.
А вот получить на каналах поведение, когда тайм-аут нужно отсчитывать не от последнего сообщения, а от первого полученного, с ходу не получается. Тут нужно думать. И, как мне кажется, на акторах такое делается проще, чем на каналах. Ну либо нужно использовать сразу несколько каналов. Очень грубо это может выглядеть так:
std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;
struct dump_by_timer : public so_5::signal_t {};
auto timer_chain = create_mchain(env);
select( so_5::from_all(),
case_(chain, [&](const kafka_msg & msg) {
if(buffer.empty())
so_5::send_delayed<dump_by_timer>(timer_chain, 100ms);
buffer.push_back(msg);
if(buffer.size() >= buffer_size)
persist(buffer);
}),
case_(timer_chain, [&](mhood_t<dump_by_timer>) {
if(!buffer.empty())
persist(buffer);
}) );
|
Но здесь возможно множественное срабатывание таймеров. Например, если идет очень большая последовательность сообщений, то на каждое 1-ое, 101-ое, 201-ое и т.д. сообщения будет отсылаться отложенный сигнал. А потом в какой-то прекрасный момент эти сигналы начнут приходить. Что не есть большая проблема, но и не есть хорошо. Поэтому придется немного поколупаться с отменой ранее отосланных отложенных сигналов. И более-менее реальный код выглядел бы как-то так:
std::vector<kafka_msg> buffer;
static constexpr std::size_t buffer_size = 100;
struct dump_by_timer : public so_5::signal_t {};
auto timer_chain = create_mchain(env,
// Нам нужно хранить не более одного сигнала в этом канале.
1,
// Место под сигнал выделим сразу.
so_5::mchain_props::memory_usage_t::preallocated,
// При поступлении нового сигнала старый выбрасываем за ненадобностью.
so_5::mchain_props::overflow_reaction_t::remove_oldest);
// Идентификатор таймера нам нужен дабы была возможность отменять
// доставку отложенного сигнала.
so_5::timer_id_t dump_timer;
select( so_5::from_all(),
case_(chain, [&](const kafka_msg & msg) {
if(buffer.empty())
dump_timer = so_5::send_periodic<dump_by_timer>(timer_chain, 100ms, 0s);
buffer.push_back(msg);
if(buffer.size() >= buffer_size) {
dump_timer.reset();
persist(buffer);
}
}),
case_(timer_chain, [&](mhood_t<dump_by_timer>) {
if(!buffer.empty())
persist(buffer);
}) );
|
К чему я это все веду (ну, естественно, за вычетом маркетинга SO-5)? К тому, что язык высокого уровня дает разработчику возможность создавать те абстракции, которые ему нужны для решения задачи. Нужны акторы -- можно сделать акторов, нужны CSP-шные каналы -- можно сделать каналы. Если же возможность по созданию абстракций под задачу не нужна, значит задача вполне себе решается более простым и примитивным языком. Т.е. изначально людям нужен был Go, а не Scala. И нахрена было тянуть в проект Scala, дабы затем плакаться о том, что "кололись, но жрали" -- не понятно.
Впрочем, если посмотреть на бэкграунд разработчиков:
то все встает на свои места. Лишь у одного было знакомство с Java за плечами. В общем, люди изначально не видели инструментов, которые специально создавались для нормальной разработки софта, обычными программистами, а не хипстерами. Но за Scala взялись. От и результат такой вот и получился ;)