Thursday, May 17, 2018

[prog.c++] Data exchange between threads without pain? CSP-channels to the rescue

Development of multi-threaded code is a complex task. Really complex. Fortunately, there are some high-level abstractions that were invented long ago, for example: task-based parallelism, map-reduce/fork-join, CSP, actors and so on.

But it seems that many C++ developers just don't know anything other than std::thread and std::mutex. There are too many questions like: "I have to launch three work threads. The first must do this, the second must do that, and the third must do something else. I launch them this way and exchange information between them that way. Am I doing it right?"

It is a pity that many C++ developers, especially novices, know about std::thread but don't know about ready-to-use tools which can simplify their lives (like Intel TBB, HPX, QP/C++, Boost.Fiber, FastFlow, CAF, SObjectizer, etc.).

It seems that a simple example of using an existing library can help explain how high-level abstractions reduce the complexity of multi-threaded development. Because we develop SObjectizer as a tool for simplifying the development of multi-threaded code in C++, we will try to show how CSP-channels implemented in SObjectizer help to avoid developers' headaches.

A Simple Demo Example

This repository contains a very simple demo example. It is a small application which carries on a "dialog" with a user on the main thread. When the user enters 'exit', the application finishes its work.

The application has two additional work threads. One of them simulates data acquisition from some sensor. The second one "saves" the "acquired" data to a file.

There is no real work in the application: sleep operations are used to block the work threads for some time. That is how we simulate interaction with an external device and the file system.

This is a very crude imitation, but we hope it won't confuse readers. Our goal is to show interaction between work threads via CSP-channels (they are called mchains in SObjectizer), not to implement integration with a real external device.

A Few Words About The Example's Working Principle

We have three threads: the main application thread and two additional work threads.

The first additional thread, which we will call meter_reader_thread, is dedicated to data "acquisition" from the "sensor". This thread requires two mchains. The first mchain is used for sending commands to meter_reader_thread itself; for example, it is the target for messages of type acquisition_turn. When meter_reader_thread receives a message of that type, it performs data "acquisition".

The second mchain is required for sending the "acquired" data to the second additional work thread. The second thread, which we will call file_writer_thread, is dedicated to "storing" the "acquired" data to a file. file_writer_thread reads commands from its mchain and "performs" them. The thread sleeps while the mchain is empty.

So we have the following simple scheme:

Both additional threads finish their work when the mchains are closed in the main thread.

Explanations For A Simple Example

You can find the source code of the simple example here.

We will go from simple things to more complex ones. First we will look at the code of the meter_reader_thread and file_writer_thread functions, and then we will go to the main() function, where we will talk about some multithreading-related tricks.

file_writer_thread() Function

The file_writer_thread() function is the simplest function in the example. Here is its full source code:

void file_writer_thread(
      // A channel for incoming commands.
      so_5::mchain_t file_write_ch) {
   // Read all messages from the channel until the channel is closed.
   receive(from(file_write_ch),
         // A handler for write_data messages.
         [&](so_5::mhood_t<write_data> cmd) {
            // Simulate data storing.
            std::cout << cmd->file_name_ << ": write started" << std::endl;
            std::this_thread::sleep_for(350ms);
            std::cout << cmd->file_name_ << ": write finished" << std::endl;
         });
}

All that file_writer_thread() does is wait for receive() to return. The receive() function waits for a message to arrive in the channel and, when one arrives, finds the handler for that message among the handlers passed to receive().

In this case we use just one handler: for messages of type write_data. This handler is called when a message of this type is sent into the channel. It contains all the "business logic": the simulation of writing data to a file.

There are two versions of receive() in SObjectizer. The first version extracts just one message from a channel. In this example, however, we use the other version, which extracts all messages and returns only when the channel is closed. It means that file_writer_thread() returns only after receive() finishes its work, and that happens only when someone closes the file_write_ch channel.
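To make the "receive until closed" behaviour concrete, here is a minimal sketch built only on the C++17 standard library. It is not SObjectizer's mchain: simple_chain, receive_one() and receive_all() are hypothetical names chosen for illustration. To keep it simple, close() keeps already queued messages so that receivers can drain them before returning.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal closable channel (NOT SObjectizer's mchain).
template <typename T>
class simple_chain {
   std::mutex lock_;
   std::condition_variable not_empty_;
   std::deque<T> queue_;
   bool closed_ = false;

public:
   // Returns false if the channel is already closed.
   bool send(T value) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         if(closed_)
            return false;
         queue_.push_back(std::move(value));
      }
      not_empty_.notify_one();
      return true;
   }

   void close() {
      {
         std::lock_guard<std::mutex> guard{lock_};
         closed_ = true;
      }
      // Wake up every receiver so it can notice that the channel is closed.
      not_empty_.notify_all();
   }

   // "Receive one": waits for a single message.
   // Returns an empty optional if the channel is closed and empty.
   std::optional<T> receive_one() {
      std::unique_lock<std::mutex> guard{lock_};
      not_empty_.wait(guard, [this] { return closed_ || !queue_.empty(); });
      if(queue_.empty())
         return std::nullopt;
      std::optional<T> result{std::move(queue_.front())};
      queue_.pop_front();
      return result;
   }

   // "Receive all": handles messages until the channel is closed and drained.
   template <typename Handler>
   std::size_t receive_all(Handler && handler) {
      std::size_t handled = 0;
      while(auto msg = receive_one()) {
         handler(*msg);
         ++handled;
      }
      return handled;
   }
};

// Usage: a writer thread drains the channel; close() lets it return.
std::vector<std::string> run_writer_demo() {
   simple_chain<std::string> file_write_ch;
   std::vector<std::string> written;
   std::thread writer{[&] {
      file_write_ch.receive_all([&](const std::string & name) {
         written.push_back(name);
      });
   }};
   file_write_ch.send("data_0.dat");
   file_write_ch.send("data_1.dat");
   file_write_ch.close();  // without this call, join() would block forever
   writer.join();
   return written;
}
```

Calling run_writer_demo() from main() returns both file names. If the close() call is removed, writer.join() never returns, which is exactly the situation described for file_writer_thread().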

meter_reader_thread() Function

The meter_reader_thread() function is a bit more complex:

void meter_reader_thread(
      // A channel for timer signals.
      so_5::mchain_t timer_ch,
      // A channel for communication with file_writer_thread.
      so_5::mchain_t file_write_ch) {

   // A type for periodic timer signal.
   struct acquisition_turn : public so_5::signal_t {};

   // Just a counter for generation of new file names.
   int ordinal = 0;

   // Start periodic timer.
   auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 750ms);

   // Read all messages from the channel until the channel is closed.
   receive(from(timer_ch),
         // A handler for acquisition_turn signal.
         [&](so_5::mhood_t<acquisition_turn>) {
            // Simulate data acquisition.
            std::cout << "meter read started" << std::endl;
            std::this_thread::sleep_for(50ms);
            std::cout << "meter read finished" << std::endl;

            // Send a command to write a new file.
            so_5::send<write_data>(file_write_ch,
                  "data_" + std::to_string(ordinal) + ".dat");
            ++ordinal;
         });
}

Here we define a type for the acquisition_turn signal. This signal will be sent periodically to simulate data "acquisition".

Then we start a periodic signal of type acquisition_turn by calling the send_periodic() function. SObjectizer will send an instance of acquisition_turn into timer_ch every 750ms, starting immediately because the initial delay is 0ms.

Then we call receive() just as we did above. We pass it a handler that implements the reaction to acquisition_turn. This handler simulates data "acquisition" and then asks file_writer_thread to store the "acquired" data by sending a message of type write_data into the file_write_ch channel.

It means that meter_reader_thread() sleeps inside receive() and periodically wakes up when SObjectizer sends acquisition_turn. When meter_reader_thread wakes up, it sends a write_data message to file_writer_thread and then falls asleep again. This goes on until someone closes the timer_ch channel.

main() Function

Before we discuss the code of main(), it is necessary to explain some tricky moments, because without these explanations some fragments of main() could be unclear.

The main problem we face when working with threads and CSP-channels is the correct and timely completion of work threads. If we start a thread, we must call the std::thread::join() method to wait for the completion of that thread (we don't use detached threads here). The simplest way is to call std::thread::join() manually at the end of main(). Something like this:

int main() {
   ...
   std::thread file_writer{file_writer_thread};
   ...
   file_writer.join();
}

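But this naive approach doesn't protect us from exceptions and other ways of leaving the scope: if an exception escapes before join() is called, the destructor of a still joinable std::thread calls std::terminate().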

A utility class that calls std::thread::join() in its destructor can help us. We could do something like:

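class auto_joiner {
   std::thread & t_;
public:
   auto_joiner(std::thread & t) : t_{t} {}
   // Copy/move are disabled: only one object is responsible for join().
   auto_joiner(const auto_joiner &) = delete;
   auto_joiner & operator=(const auto_joiner &) = delete;
   // joinable() check: the thread object might never have been started.
   ~auto_joiner() { if(t_.joinable()) t_.join(); }
};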

int main() {
   ...
   std::thread file_writer{file_writer_thread};
   auto_joiner file_writer_joiner{file_writer};
   ...
}

There is no need to implement such a utility class: SObjectizer already provides a similar tool. We will see it in the code below. The main difference is that SObjectizer's utility class can call join() for several std::thread objects.
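Below is a hypothetical sketch of a joiner for several threads. The name multi_joiner is ours, and the code is not SObjectizer's implementation; it only illustrates the idea. The joinable() check lets it tolerate thread objects that were never started, and joins_on_exception() shows why the RAII approach matters: leaving the scope via an exception still joins the thread instead of triggering std::terminate().

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical joiner for several std::thread objects (not SObjectizer code).
class multi_joiner {
   std::vector<std::thread *> threads_;

public:
   template <typename... Threads>
   explicit multi_joiner(Threads &... threads) : threads_{&threads...} {}

   multi_joiner(const multi_joiner &) = delete;
   multi_joiner & operator=(const multi_joiner &) = delete;

   ~multi_joiner() {
      for(std::thread * t : threads_)
         if(t->joinable())  // a thread object may never have been started
            t->join();
   }
};

// Two threads are started after the joiner exists; both are joined on exit.
int run_two_threads() {
   std::atomic<int> finished{0};
   {
      std::thread meter_reader, file_writer;
      multi_joiner joiner{meter_reader, file_writer};
      meter_reader = std::thread{[&finished] { ++finished; }};
      file_writer = std::thread{[&finished] { ++finished; }};
   }  // joiner's destructor joins both threads here
   return finished.load();
}

// Leaves the scope via an exception; returns true if the thread was joined.
bool joins_on_exception() {
   std::atomic<bool> done{false};
   try {
      std::thread worker;
      multi_joiner joiner{worker};
      worker = std::thread{[&done] { done = true; }};
      throw std::runtime_error{"leaving the scope early"};
   }
   catch(const std::runtime_error &) {
   }
   return done.load();
}
```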

But there is a nuance: if a thread calls receive(), then we must close the channel. Otherwise receive() never returns and we will block in join() forever.

It means that we should take care of automatically closing the channel before exiting main(). We will use an approach similar to the one for calling join() on a work thread: a helper object that calls close() for a channel in its destructor. So we will do something like:

int main() {
   ...
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
}

As with auto_joiner, there is no need to implement the auto_closer class, because SObjectizer already has an appropriate implementation.
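For illustration only, an auto_closer can be as small as the template below. It assumes that the channel type has a close() member function; fake_channel is a hypothetical stand-in that just counts close() calls. SObjectizer's own helpers are separate, more complete implementations.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical RAII helper: calls close() on the channel in its destructor.
template <typename Channel>
class auto_closer {
   Channel & ch_;

public:
   explicit auto_closer(Channel & ch) : ch_{ch} {}
   auto_closer(const auto_closer &) = delete;
   auto_closer & operator=(const auto_closer &) = delete;
   ~auto_closer() { ch_.close(); }
};

// Hypothetical stand-in for a channel: only counts close() calls.
struct fake_channel {
   int close_calls = 0;
   void close() { ++close_calls; }
};

// Leaves the scope through an exception; returns how often close() ran.
int close_calls_after_exception() {
   fake_channel ch;
   try {
      auto_closer<fake_channel> ch_closer{ch};
      throw std::runtime_error{"early exit from the scope"};
   }
   catch(const std::runtime_error &) {
   }
   return ch.close_calls;
}
```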

We have discussed how to take care of calling join() for threads and close() for channels. But there is still another very important nuance: the order in which join() and close() are called. If we write a simple sequence like this:

int main() {
   ...
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
   std::thread work_thread{[ch]{ receive(from(ch), ...); }};
   auto_joiner work_thread_joiner{work_thread};
   ...
}

we will get a classic deadlock and will be stuck in the destructor of auto_joiner.

This happens because the destructor of auto_joiner is called before the destructor of auto_closer (objects are destroyed in the reverse order of their creation). It means that we call join() for a thread which sleeps in receive() on a channel that is still open.

To resolve this problem, we should change the order in which objects are created in main():

int main() {
   ...
   // Create an object for the work thread. The thread itself is not started yet.
   std::thread work_thread;
   auto_joiner work_thread_joiner{work_thread};
   ...
   // Now we can create a channel for the work thread.
   auto ch = so_5::create_mchain(...);
   auto_closer ch_closer{ch};
   ...
   // And now we can start new work thread.
   work_thread = std::thread{[ch]{ receive(from(ch), ...); }};
   ...
}
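The effect of the declaration order can be checked with a self-contained sketch that uses only the standard library. Everything in it is hypothetical: int_chain is a toy channel, and std::shared_ptr plays the role of so_5::mchain_t (which is also a handle to a channel), so the thread lambda can capture the channel by value as in the fragment above. Because objects are destroyed in reverse order of declaration, ch_closer runs before work_thread_joiner.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical toy channel: receive_all() returns after close() once drained.
class int_chain {
   std::mutex lock_;
   std::condition_variable cv_;
   std::deque<int> queue_;
   bool closed_ = false;

public:
   void send(int value) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         queue_.push_back(value);
      }
      cv_.notify_one();
   }
   void close() {
      {
         std::lock_guard<std::mutex> guard{lock_};
         closed_ = true;
      }
      cv_.notify_all();
   }
   // Returns the sum of all received values.
   int receive_all() {
      int sum = 0;
      std::unique_lock<std::mutex> guard{lock_};
      for(;;) {
         cv_.wait(guard, [this] { return closed_ || !queue_.empty(); });
         if(queue_.empty())
            return sum;  // closed and fully drained
         sum += queue_.front();
         queue_.pop_front();
      }
   }
};

// A shared handle stands in for so_5::mchain_t.
using chain_handle = std::shared_ptr<int_chain>;

struct auto_joiner {  // hypothetical
   std::thread & t;
   ~auto_joiner() { if(t.joinable()) t.join(); }
};
struct auto_closer {  // hypothetical
   chain_handle ch;
   ~auto_closer() { ch->close(); }
};

int run_correct_order() {
   int result = 0;
   {
      std::thread work_thread;                      // not started yet
      auto_joiner work_thread_joiner{work_thread};  // destroyed after ch_closer
      auto ch = std::make_shared<int_chain>();
      auto_closer ch_closer{ch};                    // destroyed first
      work_thread = std::thread{[ch, &result] { result = ch->receive_all(); }};
      ch->send(1);
      ch->send(2);
      ch->send(3);
   }  // ~auto_closer closes ch, receive_all() returns, ~auto_joiner joins
   return result;
}
```

Declaring the joiner after the closer instead would make ~auto_joiner wait for a thread that is still blocked in receive_all(), which is the deadlock described above.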

Now we can take a look at the code of main() function:

int main() {
   // Launch SObjectizer.
   so_5::wrapped_env_t sobj;

   // Objects for the threads are created now...
   std::thread meter_reader, file_writer;
   // ...just to create the auto_joiner object before the auto_closer object.
   auto joiner = so_5::auto_join(meter_reader, file_writer);

   // Create channels for our threads.
   auto timer_ch = so_5::create_mchain(sobj);
   auto writer_ch = so_5::create_mchain(sobj);
   // Channels must be automatically closed on exit.
   auto closer = so_5::auto_close_drop_content(timer_ch, writer_ch);

   // Now we can start our threads.
   meter_reader = std::thread(meter_reader_thread, timer_ch, writer_ch);
   file_writer = std::thread(file_writer_thread, writer_ch);

   // We should work until the user enters 'exit' or closes the input stream.
   std::cout << "Type 'exit' to quit:" << std::endl;

   std::string cmd;
   while(std::getline(std::cin, cmd)) {
      if("exit" == cmd)
         break;
      else
         std::cout << "Type 'exit' to quit" << std::endl;
   }

   // Just returns from main(). All channels will be closed automatically
   // (because of closer object) and all threads will be joined automatically
   // (because of joiner object).
   return 0;
}

We hope that this code is obvious now. Two things may require some explanation.

The first one is the creation of a so_5::wrapped_env_t object at the beginning of main(). It is an instance of the SObjectizer Environment. This Environment is necessary both for the creation of mchains and for serving timers (the call to send_periodic() in meter_reader_thread() hides access to SObjectizer's timer).

The second one is the call to auto_close_drop_content(). On the one hand, it's obvious: this function returns an auto_closer object that will close the mchains in its destructor. But, on the other hand, what does 'drop_content' mean?

There are two modes of the channel close operation. In the first mode, all unprocessed messages are removed from the mchain. For example, if there are 100500 messages in the mchain and close() is called, then all those messages are just dropped. This mode is called 'drop_content', and auto_close_drop_content() creates an auto_closer object which uses that mode.

The second mode works the opposite way: it keeps all unprocessed messages in the mchain. This allows receive() to complete processing of the messages in the mchain, but new messages can't be placed into the closed mchain. This mode is called 'retain_content'.
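A toy model of the two close modes follows. The close_mode enum and the mode_chain class are hypothetical and only mimic the behaviour described above: in both modes a closed channel rejects new messages, but only retain_content leaves already queued messages for the receiver.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical close modes mirroring the two behaviours from the text.
enum class close_mode { drop_content, retain_content };

// Hypothetical toy channel (not SObjectizer's mchain).
class mode_chain {
   std::mutex lock_;
   std::condition_variable cv_;
   std::deque<int> queue_;
   bool closed_ = false;

public:
   // New messages are rejected once the channel is closed, in both modes.
   bool send(int value) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         if(closed_)
            return false;
         queue_.push_back(value);
      }
      cv_.notify_one();
      return true;
   }

   void close(close_mode mode) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         closed_ = true;
         if(mode == close_mode::drop_content)
            queue_.clear();  // unprocessed messages are simply discarded
      }
      cv_.notify_all();
   }

   // Processes messages until the channel is closed and empty;
   // returns how many messages were handled.
   std::size_t receive_all() {
      std::size_t handled = 0;
      std::unique_lock<std::mutex> guard{lock_};
      for(;;) {
         cv_.wait(guard, [this] { return closed_ || !queue_.empty(); });
         if(queue_.empty())
            return handled;
         queue_.pop_front();
         ++handled;
      }
   }
};

// Sends three messages, closes the channel, then counts what is still handled.
std::size_t handled_after_close(close_mode mode) {
   mode_chain ch;
   ch.send(1);
   ch.send(2);
   ch.send(3);
   ch.close(mode);
   return ch.receive_all();
}
```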

Both modes are used in different cases. We need the drop_content mode here, and that is why we use the auto_close_drop_content() function.

A Result Of The First Example

If we run the first example, we will see the expected results:

We see sequential "acquisitions" and "writes".

The Second Example: Make The Simple Example More Complex With Overload Control For file_writer_thread

The full source code of the second example can be found here.

The first version of our example is a bit idealistic: we assume that writing the "acquired" data always completes before the next "acquisition". But in real life this is not always possible: the time of communication with external devices or the file system can vary widely. It means we have to take care of cases when a "writing" operation takes much more time and the number of messages in file_write_ch grows.

To simulate problematic cases, we slightly modify the meter_reader_thread() and file_writer_thread() functions. We shorten the period of the acquisition_turn signal:

auto timer = so_5::send_periodic<acquisition_turn>(timer_ch, 0ms, 300ms);

We also modify file_writer_thread() to choose a random duration of the "write" operation from the range [295ms, 1s]. It means that sometimes a "write" operation will complete before the next "acquisition", but in most cases it won't. Sometimes a "write" operation will last much longer than the pause between "acquisitions". So we rewrite file_writer_thread() as follows:

void file_writer_thread(
      // A channel for incoming commands.
      so_5::mchain_t file_write_ch) {
   // Helpers for random values generation.
   std::mt19937 rd_gen{std::random_device{}()};
   // Pauses will be generated from a range [295ms, 1s].
   std::uniform_int_distribution<int> rd_dist{295, 1000};

   // Read all messages from the channel until the channel is closed.
   receive(from(file_write_ch),
         // A handler for write_data messages.
         [&](so_5::mhood_t<write_data> cmd) {
            // Generate random pause for simulation.
            const auto pause = rd_dist(rd_gen);
            // Simulate data storing.
            std::cout << cmd->file_name_ << ": write started (pause:"
                  << pause << "ms)" << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds{pause});
            std::cout << cmd->file_name_ << ": write finished" << std::endl;
         });
}

It means that file_write_ch can now accumulate unprocessed messages. This is the well-known overload problem: a producer generates more data than the consumer can process. It is an unpleasant problem and we have to cope with it.

For example, it is possible to implement a "back pressure" mechanism: when a producer starts to overload a consumer, the consumer tells the producer about it. With CSP-channels, a natural way to do that is to block the producer when it tries to write to a full channel. The blocked producer is unblocked when the consumer is able to take the next bunch of messages from the producer.

So we want meter_reader_thread to fall asleep if file_writer_thread can't process incoming messages at the current speed. Can SObjectizer provide this for us?

Yes!

It is necessary to specify additional parameters when creating an mchain. In the first version of our example we used the simplest way of creating an mchain:

auto writer_ch = so_5::create_mchain(sobj);

In that case an mchain without any limits is created. This mchain can hold as many messages as the available amount of RAM allows.

But we want "back pressure", so we can't use an unlimited mchain. We need to limit the capacity of the new mchain.

We also want the producer to fall asleep when it tries to push a message into a full channel. That isn't a problem. But in SObjectizer we must limit this sleeping time, for example: fall asleep, but for no more than five seconds.

SObjectizer requires a time limit because without one it is very easy to get a deadlock. For example, thread T1 tries to write a message to the overloaded channel C1 for thread T2. Thread T2 tries to write a message to the overloaded channel C2 for thread T3. And thread T3 tries to write a message to the overloaded channel C0 for thread T1. If there are time limits, the deadlock is broken automatically.
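Back pressure with a time limit can be sketched with std::condition_variable::wait_for(). The bounded_chain class below is a hypothetical illustration, not SObjectizer code. Its send() waits while the channel is full, but gives up after max_wait and reports failure, which is the point where a real mchain would apply its overflow reaction. Because no producer can stay blocked forever, a cycle like the T1/T2/T3 one above cannot hang permanently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded channel with time-limited back pressure.
class bounded_chain {
   std::mutex lock_;
   std::condition_variable not_full_;
   std::condition_variable not_empty_;
   std::deque<int> queue_;
   const std::size_t capacity_;

public:
   explicit bounded_chain(std::size_t capacity) : capacity_{capacity} {}

   // Blocks the producer while the channel is full, but for no longer than
   // max_wait. Returns false if the channel is still full after that time.
   bool send(int value, std::chrono::milliseconds max_wait) {
      std::unique_lock<std::mutex> guard{lock_};
      if(!not_full_.wait_for(guard, max_wait,
            [this] { return queue_.size() < capacity_; }))
         return false;
      queue_.push_back(value);
      guard.unlock();
      not_empty_.notify_one();
      return true;
   }

   int receive() {
      std::unique_lock<std::mutex> guard{lock_};
      not_empty_.wait(guard, [this] { return !queue_.empty(); });
      const int value = queue_.front();
      queue_.pop_front();
      guard.unlock();
      not_full_.notify_one();  // a blocked producer may continue now
      return value;
   }
};
```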

So far so good: we can limit the capacity of a channel and we can limit the waiting time. But there is a question: "What to do if the channel is still full after the waiting time has elapsed?"

We can choose what to do in such a case. For example, we can remove the oldest message in the channel. Or we can ignore the newest message. Or we can throw an exception.

We will use the "remove oldest" policy in our case. It is rather logical, because we already have new data from the sensor, and storing this data is more relevant than storing the oldest data. That is why we create file_write_ch the following way:

auto writer_ch = so_5::create_mchain(sobj,
      // Wait on a full channel for no more than 300ms.
      300ms,
      // No more than 2 messages in the channel.
      2,
      // Memory will be preallocated.
      so_5::mchain_props::memory_usage_t::preallocated,
      // The oldest message should be removed on overflow.
      so_5::mchain_props::overflow_reaction_t::remove_oldest);

Perhaps some additional explanation is required for the so_5::mchain_props::memory_usage_t::preallocated value. This parameter tells how the memory for the message queue inside an mchain is managed. Because our channel is very small, it is better to preallocate memory for that queue. That is why we use memory_usage_t::preallocated here.
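The overflow reactions and the preallocated storage can be illustrated together with a small fixed-capacity ring buffer. The names overflow_reaction and overflow_queue are hypothetical, and the sketch skips waiting entirely: it shows only what happens once a push finds the queue full.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical names; they only mirror the policies described in the text.
enum class overflow_reaction { drop_newest, remove_oldest, throw_exception };

// Fixed-capacity queue whose storage is preallocated once (a ring buffer).
class overflow_queue {
   std::vector<int> buffer_;  // allocated in the constructor, never grows
   std::size_t head_ = 0;     // index of the oldest element
   std::size_t size_ = 0;
   overflow_reaction reaction_;

public:
   overflow_queue(std::size_t capacity, overflow_reaction reaction)
      : buffer_(capacity), reaction_{reaction} {
      if(capacity == 0)
         throw std::invalid_argument{"capacity must be positive"};
   }

   void push(int value) {
      if(size_ == buffer_.size()) {
         switch(reaction_) {
         case overflow_reaction::drop_newest:
            return;  // the new value is ignored
         case overflow_reaction::remove_oldest:
            head_ = (head_ + 1) % buffer_.size();  // forget the oldest value
            --size_;
            break;
         case overflow_reaction::throw_exception:
            throw std::overflow_error{"channel is full"};
         }
      }
      buffer_[(head_ + size_) % buffer_.size()] = value;
      ++size_;
   }

   // Current contents, oldest first.
   std::vector<int> contents() const {
      std::vector<int> out;
      for(std::size_t i = 0; i != size_; ++i)
         out.push_back(buffer_[(head_ + i) % buffer_.size()]);
      return out;
   }
};
```

With capacity 1 and drop_newest the same queue keeps at most one pending signal, which is the behaviour wanted for timer_ch.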

Limitations For Timer Channel For meter_reader_thread

We have introduced limits for file_write_ch. But we also have timer_ch. Does it make sense to limit this channel?

It does. A channel with capacity for just one message is enough for storing acquisition_turn signals: if the channel already contains an instance of that signal, new signals can be safely ignored.

That is why we rewrite the timer_ch creation code:

auto timer_ch = so_5::create_mchain(sobj,
      // Just one message can wait in the channel.
      1,
      // Memory will be preallocated.
      so_5::mchain_props::memory_usage_t::preallocated,
      // Newest message will be ignored if the channel is full.
      so_5::mchain_props::overflow_reaction_t::drop_newest);

There are two important distinctions here:

  • there is no waiting on an attempt to push a message into the full channel. Such waiting is useless. Moreover, SObjectizer's timer will push messages into that channel, and the timer can't wait in principle (otherwise it couldn't do its work);
  • we use the 'drop_newest' policy for handling overload. It means that when the timer pushes a new message into the full channel, that message is ignored.

A Result Of The Second Example

After launching the second example we can see the following picture:

One may see that some file numbers are missing from the debugging output of file_writer_thread. For example, data_26.dat follows data_24.dat, but there is no data_25.dat. This is because the write_data message for data_25.dat was removed when the channel overflowed.

One can also see that meter_reader_thread can perform several "acquisitions" while file_writer_thread performs a long "write".

The Third Example: Add Some Control For meter_reader_thread

The full source code of the third example can be found here.

It is hard to resist the desire to make the example even more complex. This time we will add some control over meter_reader_thread. Really, what could stop us from adding the possibility to increase or decrease the data acquisition period? Let's do it.

Let the main thread accept two new commands: `inc` for increasing the time between acquisitions and `dec` for decreasing it.

The main question here is how to deliver the new commands to meter_reader_thread. But it is not a hard question: we simply add two new message types:

// A command to decrease data acquisition period.
struct dec_read_period : public so_5::signal_t {};

// A command to increase data acquisition period.
struct inc_read_period : public so_5::signal_t {};

The main thread will send such messages to meter_reader_thread when the user enters the corresponding command:

// We should work until the user enters 'exit' or closes the input stream.
bool stop_execution = false;
while(!stop_execution) {
   std::cout << "Type 'exit' to quit, 'inc' or 'dec':" << std::endl;

   std::string cmd;
   if(std::getline(std::cin, cmd)) {
      if("exit" == cmd)
         stop_execution = true;
      else if("inc" == cmd)
         so_5::send<inc_read_period>(control_ch);
      else if("dec" == cmd)
         so_5::send<dec_read_period>(control_ch);
   }
   else
      stop_execution = true;
}

But which channel will be used for these commands? This is an interesting question.

Of course, we could use timer_ch for sending the new commands. But we want to show more of SObjectizer's features, and that is why we will use two channels for meter_reader_thread:

  • the first one (named control_ch) will be used for inc_/dec_read_period messages. It will be the simplest channel, without any restrictions;
  • the second one (named timer_ch) will be used for periodic acquisition_turn messages. This will be a fixed-size mchain with the 'drop_newest' policy.

For simplicity, these channels will be created in the main() function and passed to the meter_reader_thread() function as parameters:

// Control channel for meter_reader_thread. Without any restrictions.
auto control_ch = so_5::create_mchain(sobj);
// A timer channel for meter_reader_thread.
auto timer_ch = so_5::create_mchain(control_ch->environment(),
      // Just one message can wait in the channel.
      1,
      // Memory will be preallocated.
      so_5::mchain_props::memory_usage_t::preallocated,
      // Newest message will be ignored if the channel is full.
      so_5::mchain_props::overflow_reaction_t::drop_newest);
...
// Channels must be automatically closed on exit.
auto closer = so_5::auto_close_drop_content(control_ch, timer_ch, writer_ch);

// Now we can start our threads.
meter_reader = std::thread(meter_reader_thread, control_ch, timer_ch, writer_ch);

Modified Version Of meter_reader_thread() Function

There are two major changes in meter_reader_thread() in comparison with the first and second versions of the example.

The first change: now we can increase or decrease the acquisition period. Because of that we won't use the send_periodic() function anymore. Instead, we will track the actual "acquisition" time. If this time is greater than or equal to the acquisition period, we send the next acquisition_turn without any delay. But if the actual "acquisition" time is less than the acquisition period, we send acquisition_turn as a delayed message.

That is why the handling of acquisition_turn now looks like this:

// A handler for acquisition_turn signal.
[&](so_5::mhood_t<acquisition_turn>) {
   // We should know how much time was spent.
   // Take the first timepoint.
   const auto started_at = std::chrono::steady_clock::now();

   // Simulate data acquisition.
   std::cout << "meter read started" << std::endl;
   std::this_thread::sleep_for(50ms);
   std::cout << "meter read finished" << std::endl;

   // Send a command to write a new file.
   so_5::send<write_data>(file_write_ch,
         "data_" + std::to_string(ordinal) + ".dat");
   ++ordinal;

   // Now we can calculate time spent.
   const auto duration = std::chrono::steady_clock::now() - started_at;
   // If the procedure lasts too long then the next acquisition
   // must be scheduled immediately.
   if(duration >= current_period) {
      std::cout << "period=" << current_period.count()
            << "ms, no sleep" << std::endl;
      so_5::send<acquisition_turn>(timer_ch);
   }
   else {
      // Otherwise we can sleep for some time.
      const auto sleep_time = to_ms(current_period - duration);
      std::cout << "period=" << current_period.count() << "ms, sleep="
            << sleep_time.count() << "ms" << std::endl;
      so_5::send_delayed<acquisition_turn>(timer_ch,
            current_period - duration);
   }
}),
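The scheduling decision in this handler can be isolated into a pure function that is easy to test. The snippet calls to_ms() without showing its definition, so the version below is an assumption (a plain truncating duration_cast). next_turn_delay() is a hypothetical name, and current_period is assumed to be a std::chrono::milliseconds value, as the "ms" in its printout suggests.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper: assumed to truncate a duration to whole milliseconds.
template <typename Rep, typename Period>
std::chrono::milliseconds to_ms(std::chrono::duration<Rep, Period> d) {
   return std::chrono::duration_cast<std::chrono::milliseconds>(d);
}

// The decision from the handler as a pure function: zero means
// "send acquisition_turn right now", anything else is the delay
// to use for a delayed acquisition_turn.
std::chrono::milliseconds next_turn_delay(
      std::chrono::milliseconds current_period,
      std::chrono::steady_clock::duration spent) {
   if(spent >= current_period)
      return std::chrono::milliseconds::zero();
   return to_ms(current_period - spent);
}
```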

The second change: now we have to deal with two channels. That is why we will use so_5::select() instead of so_5::receive(). The select() function is similar to receive(), but it can receive and handle messages from several channels.

As a result, we will use select() this way (it is a very schematic example without specific details):

// Read all messages from the channels until all of them are closed.
so_5::select(so_5::from_all(),
   // Handlers for timer channel.
   case_(timer_ch,
      [&](so_5::mhood_t<acquisition_turn>) {...}),
   // Handlers for commands from control channel.
   case_(control_ch,
      [&](so_5::mhood_t<inc_read_period>) {...},
      [&](so_5::mhood_t<dec_read_period>) {...})
);

This code says: we will wait for messages from two channels until all channels are closed, and there are two sets of message handlers (one set for each channel). For timer_ch there is just one handler: for the acquisition_turn signal. And there are two handlers for messages from control_ch: for inc_/dec_read_period.

It means that this version of meter_reader_thread() returns only when both channels are closed in the main() function.
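What select() does for meter_reader_thread can be approximated with the standard library by letting one mutex and one condition variable guard both queues. The two_channel_inbox class below is a hypothetical, much simplified stand-in for select(from_all(), case_(...), case_(...)). Channel 0 plays the role of timer_ch and channel 1 the role of control_ch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical two-channel inbox: one lock and one condition variable guard
// both queues, so a single thread can wait for a message from either channel.
class two_channel_inbox {
   std::mutex lock_;
   std::condition_variable cv_;
   std::deque<std::string> queues_[2];
   bool closed_[2] = {false, false};

   bool all_closed_and_empty() const {
      return closed_[0] && closed_[1] && queues_[0].empty() && queues_[1].empty();
   }

public:
   // channel must be 0 or 1.
   void send(int channel, std::string msg) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         if(closed_[channel])
            return;  // closed channels reject new messages
         queues_[channel].push_back(std::move(msg));
      }
      cv_.notify_one();
   }

   void close(int channel) {
      {
         std::lock_guard<std::mutex> guard{lock_};
         closed_[channel] = true;
      }
      cv_.notify_all();
   }

   // Calls on_first/on_second for messages of channel 0/1 and returns only
   // when both channels are closed and drained.
   template <typename First, typename Second>
   void select_all(First && on_first, Second && on_second) {
      std::unique_lock<std::mutex> guard{lock_};
      for(;;) {
         cv_.wait(guard, [this] {
            return !queues_[0].empty() || !queues_[1].empty() ||
                  (closed_[0] && closed_[1]);
         });
         if(all_closed_and_empty())
            return;
         for(int i = 0; i != 2; ++i) {
            if(queues_[i].empty())
               continue;
            std::string msg = std::move(queues_[i].front());
            queues_[i].pop_front();
            guard.unlock();  // run the handler without holding the lock
            if(i == 0)
               on_first(msg);
            else
               on_second(msg);
            guard.lock();
         }
      }
   }
};
```

Closing only one of the two channels keeps select_all() running, which matches the rule that meter_reader_thread() returns only when both channels are closed.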

A Result Of The Third Example

After launching the third example and issuing several 'inc' commands, we can see the following picture:

Conclusion

Someone may say that the examples described above are not interesting and are far from real life. Maybe so; we won't argue. Our goal was to show some of SObjectizer's features which can make the development of multi-threaded code easier. That is why we used an artificial example that is easy to understand. But if a reader proposes a more interesting example, we will try to implement it and describe it in a new article.

It is important to note another thing: SObjectizer is not a unique tool. Similar features may be found in other libraries (like HPX or Boost.Fiber). Other frameworks can also have additional features which could be more appropriate for your tasks than those we have implemented in our tool.

So an additional point of this article is: look around, there are probably a lot of ready-to-use tools which can greatly simplify your work and save you many hours of writing, debugging, documenting and maintaining homegrown "solutions".

In conclusion, we want to invite everyone to try SObjectizer and share their experience. Your feedback is very important to us and helps us make SObjectizer better.
