Tuesday, January 29, 2019

[prog.c++] "Modern" Dining Philosophers in C++ with Actors and CSP (part 2)

In the previous post we discussed several implementations of the "dining philosophers problem" based on the Actor Model. In this post I will try to describe some implementations based on ideas from CSP. Forks, philosophers and the waiter will be represented as std::threads and will communicate with each other via CSP channels only.

The source code can be found in the same repository.

CSP-based Implementations

As in the previous post, we will start with an implementation of Dijkstra's solution, then move to a simple solution that puts down the first fork if an attempt to take the second fork fails (without an arbiter), and then to a solution with a waiter/arbiter. In other words, we will see the same solutions as in the previous post, but reimplemented without Actors. Moreover, the same set of messages (e.g. take_t, taken_t, busy_t, put_t) from the Actor-based implementations will be reused "as is" in the CSP-based implementations.

As in the previous post, the SObjectizer framework is used, but now I will use only the CSP-related features of SObjectizer, so we will see SObjectizer-based code that doesn't use agents at all. I hope the code with my comments and explanations will be understandable. If not, feel free to ask questions in the comments.

Dijkstra's Solution

The source code of the implementation of Dijkstra's solution can be found here.

Fork Thread

The fork thread in the implementation of Dijkstra's solution does a very simple thing: it reads and handles take_t and put_t messages until the input channel is closed. The code of the fork thread looks like this:

void fork_process(
   so_5::mchain_t fork_ch )
{
   // State of the fork.
   bool taken = false;

   // Queue of waiting philosophers.
   std::queue< so_5::mbox_t > wait_queue;

   // Receive and handle all messages until the channel is closed.
   so_5::receive( so_5::from( fork_ch ),
         [&]( so_5::mhood_t<take_t> cmd ) {
            if( taken )
               // Fork already taken. The requester should be stored in queue.
               wait_queue.push( cmd->m_who );
            else
            {
               // Fork can be acquired by the requester.
               taken = true;
               so_5::send< taken_t >( cmd->m_who );
            }
         },
         [&]( so_5::mhood_t<put_t> ) {
            if( wait_queue.empty() )
               taken = false; // No more waiting philosophers. Fork is free again.
            else
            {
               // The first philosopher from queue should be notified.
               const auto who = wait_queue.front();
               wait_queue.pop();
               so_5::send< taken_t >( who );
            }
         } );
}

Function fork_process is the main function of the fork thread. It receives just one argument: the input channel that was created somewhere else.

Then some variables are declared. These variables contain the state of the "fork".

And then there is a "loop" that runs until the channel is closed:

so_5::receive( so_5::from( fork_ch ),
      [&]( so_5::mhood_t<take_t> cmd ) {...},
      [&]( so_5::mhood_t<put_t> ) {...} );

This "loop" is just a single call to SObjectizer's function receive(). There are several forms of receive() function in SObjectizer and here we see just one on them. It reads all messages from specified channel and calls appropriate message handlers for messages read. If there is no a message handler for a message this message will not be handled and will just be thrown out.

Message handlers are specified as lambda functions. These lambdas are twins of the corresponding lambdas from the Actor-based solution: they implement exactly the same logic.

Philosopher Thread

The philosopher thread is implemented as the philosopher_process function:

void philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )
{
   int meals_eaten{ 0 };

   random_pause_generator_t pause_generator;

   // This channel will be used for replies from forks.
   auto self_ch = so_5::create_mchain( control_ch->environment() );

   while( meals_eaten < meals_count )
   {
      tracer.thinking_started( philosopher_index, thinking_type_t::normal );

      // Simulate thinking by suspending the thread.
      std::this_thread::sleep_for(
            pause_generator.think_pause( thinking_type_t::normal ) );

      // Try to get the left fork.
      tracer.take_left_attempt( philosopher_index );
      so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

      // Request sent, wait for a reply.
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         [&]( so_5::mhood_t<taken_t> ) {
            // Left fork is taken.
            // Try to get the right fork.
            tracer.take_right_attempt( philosopher_index );
            so_5::send< take_t >(
                  right_fork, self_ch->as_mbox(), philosopher_index );

            // Request sent, wait for a reply.
            so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               [&]( so_5::mhood_t<taken_t> ) {
                  // Both forks are taken. We can eat.
                  tracer.eating_started( philosopher_index );

                  // Simulate eating by suspending the thread.
                  std::this_thread::sleep_for( pause_generator.eat_pause() );

                  // One step closer to the end.
                  ++meals_eaten;

                  // Right fork should be returned after eating.
                  so_5::send< put_t >( right_fork );
               } );

            // Left fork should be returned too.
            so_5::send< put_t >( left_fork );
         } );

   }

   // Notify about the completion of the work.
   tracer.philosopher_done( philosopher_index );
   so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

There is plenty of code here, so let's deal with it part by part. We will start with the prototype of the philosopher_process function:

void philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )

There are a lot of arguments and some of them need to be explained.

There are no SObjectizer agents in the CSP-based solutions, so we can't use agent_state_listener_t to form traces of the philosophers' actions. Because of that, a philosopher has to tell explicitly what is happening to it, as in this fragment:

tracer.thinking_started( philosopher_index, thinking_type_t::normal );

So 'tracer' is a reference to an object that forms the traces we can see and analyze after a simulation.

The 'control_ch' argument is an output channel to which a philosopher_done_t message should be written when the philosopher's work is completed. This channel will be used to detect the moment when all philosophers have completed the simulation.

The 'left_fork' and 'right_fork' arguments identify the channels of the left and right forks. These channels will be used for sending take_t and put_t messages. But if 'left_fork' and 'right_fork' are channels, why do they have type mbox_t instead of mchain_t?

That is a good question, but we won't discuss it now; the answer is delayed until we see the solution with a waiter. It is enough to say that every channel is a kind of mbox, and because of that we can hold a reference to a channel via an mbox_t handle.
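
A tiny sketch of that relationship (assuming 'env' is a reference to the so_5::environment_t):

// Every mchain is a kind of mbox, so a reference to it can be held
// via an mbox_t handle:
so_5::mchain_t ch = so_5::create_mchain( env );
so_5::mbox_t ch_as_mbox = ch->as_mbox(); // messages sent here end up in ch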

Then some variables are defined:

int meals_eaten{ 0 };

random_pause_generator_t pause_generator;

// This channel will be used for replies from forks.
auto self_ch = so_5::create_mchain( control_ch->environment() );

They form the state of the "philosopher".

Maybe the most important variable here is 'self_ch'. It is the personal channel of the "philosopher". A reference to that channel will be passed in take_t messages, and the channel will be used for reading replies from the "forks".

Then we see the main loop that simulates the philosopher's logic: thinking, grabbing forks, eating.

In contrast to the Actor-based implementation, this_thread::sleep_for() is used to suspend the thread when thinking and eating are simulated.

An attempt to take a fork is expressed almost the same way as in the Actor-based implementation, by sending a take_t message:

so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

Please note that the same take_t type is used here. This type has a field of type mbox_t, but the 'self_ch' variable has type mchain_t. So we can't pass 'self_ch' to take_t directly; we have to convert the reference to the mchain into a reference to an mbox via the as_mbox() method.
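
For reference, take_t (it was defined in the previous post) could look like this sketch; the field names are taken from how the message is used later in this post:

// A sketch of take_t, reconstructed from its usage in this post.
struct take_t final : public so_5::message_t
{
   // Where taken_t (or busy_t) replies should be sent.
   const so_5::mbox_t m_who;
   const std::size_t m_philosopher_index;

   take_t( so_5::mbox_t who, std::size_t philosopher_index )
      :  m_who{ std::move(who) }
      ,  m_philosopher_index{ philosopher_index }
   {}
};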

Here we also see a call to the receive() function:

so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
      [&]( so_5::mhood_t<taken_t> ) {...} );

This invocation of receive() returns only when a message of type taken_t has been read and handled, or when the channel is closed. So here we wait until a fork sends us a taken_t reply.

That is almost all that can be said about the philosopher_process function. But one interesting point should be mentioned -- the nested call of receive() for the same channel:

so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
      [&]( so_5::mhood_t<taken_t> ) {
         ...
         so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               [&]( so_5::mhood_t<taken_t> ) {...} );
         ...
      } );

This allows us to express the philosopher's logic in a straightforward manner.

Run Simulation Routine

We didn't discuss the routines that run the simulations in the Actor-based implementations. But we have to do that for the CSP-based Dijkstra's solution, because the run_simulation() function contains some important moments mentioned but not shown before, and because its code shows how hard it is to write correct multi-threaded code.

Here is the full code of the run_simulation() function:

void run_simulation(
   so_5::environment_t & env,
   const names_holder_t & names ) noexcept
{
   const auto table_size = names.size();
   const auto join_all = []( std::vector<std::thread> & threads ) {
      for( auto & t : threads )
         t.join();
   };

   trace_maker_t tracer{
         env,
         names,
         random_pause_generator_t::trace_step() };

   // Create forks.
   std::vector< so_5::mchain_t > fork_chains;
   std::vector< std::thread > fork_threads( table_size );

   for( std::size_t i{}; i != table_size; ++i )
   {
      // Personal channel for fork.
      fork_chains.emplace_back( so_5::create_mchain(env) );
      // Run fork as a thread.
      fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
   }

   // Chain for acks from philosophers.
   auto control_ch = so_5::create_mchain( env );

   // Create philosophers.
   const auto philosopher_maker =
         [&](auto index, auto left_fork_idx, auto right_fork_idx) {
            return std::thread{
                  philosopher_process,
                  std::ref(tracer),
                  control_ch,
                  index,
                  fork_chains[ left_fork_idx ]->as_mbox(),
                  fork_chains[ right_fork_idx ]->as_mbox(),
                  default_meals_count };
         };
   std::vector< std::thread > philosopher_threads( table_size );
   for( std::size_t i{}; i != table_size - 1u; ++i )
   {
      // Run philosopher as a thread.
      philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
   }
   // The last philosopher should take forks in opposite direction.
   philosopher_threads[ table_size - 1u ] = philosopher_maker(
         table_size - 1u,
         table_size - 1u,
         0u );

   // Wait while all philosophers completed.
   so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
         [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
            fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
         } );

   // Wait for completion of philosopher threads.
   join_all( philosopher_threads );

   // Close channels for all forks.
   for( auto & ch : fork_chains )
      so_5::close_drop_content( ch );

   // Wait for completion of fork threads.
   join_all( fork_threads );

   // Show the result.
   tracer.done();

   // Stop the SObjectizer.
   env.stop();
}

I hope most of run_simulation()'s code is quite understandable, so I will comment on only some fragments of it.

We have to create threads for the "forks", and every fork_process should receive a channel as an argument. The following fragment does exactly that:

// Create forks.
std::vector< so_5::mchain_t > fork_chains;
std::vector< std::thread > fork_threads( table_size );

for( std::size_t i{}; i != table_size; ++i )
{
   // Personal channel for fork.
   fork_chains.emplace_back( so_5::create_mchain(env) );
   // Run fork as a thread.
   fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
}

We create channels and spawn threads here. We collect the channels because the philosophers need references to them. We also collect the threads because they should be joined before returning from run_simulation().

Then we have to create threads for the philosophers. We collect these threads too, because we have to join them later. This is done in the following fragment:

std::vector< std::thread > philosopher_threads( table_size );
for( std::size_t i{}; i != table_size - 1u; ++i )
{
   // Run philosopher as a thread.
   philosopher_threads[ i ] = philosopher_maker( i, i, i+1u );
}
// The last philosopher should take forks in opposite direction.
philosopher_threads[ table_size - 1u ] = philosopher_maker(
      table_size - 1u,
      table_size - 1u,
      0u );

Then we should give the philosophers some time to run the simulation. At the end of the simulation every philosopher sends a philosopher_done_t message to the control channel. Because of that, we wait until we receive all the philosopher_done_t messages:

so_5::receive( so_5::from( control_ch ).handle_n( table_size ),
      [&names]( so_5::mhood_t<philosopher_done_t> cmd ) {
         fmt::print( "{}: done\n", names[ cmd->m_philosopher_index ] );
      } );

This call to receive() returns only when exactly table_size messages of type philosopher_done_t have been received and handled.

After receiving all the philosopher_done_t messages we can do the cleanup actions.

First of all we join all philosopher threads:

join_all( philosopher_threads );

Then we should join all the fork threads. But if we simply call join_all for fork_threads we will hang forever, because every fork thread sleeps in its call to receive() and will never be woken up.

Because of that, we close all the fork channels first, and only then join the fork threads:

// Close channels for all forks.
for( auto & ch : fork_chains )
   so_5::close_drop_content( ch );

// Wait for completion of fork threads.
join_all( fork_threads );

Some Words About noexcept

I hope the code of run_simulation() is fully understandable now, so I can try to explain why run_simulation() is marked as noexcept. This is because there is no exception safety at all, and the only good way to cope with exceptions in such a trivial implementation is to terminate the whole application.

Why is run_simulation() not exception safe?

Let's suppose we have created some fork threads and then get an exception during the creation of the next fork thread. To provide the basic exception safety guarantee we should finish the work of the already spawned fork threads. One might think that the fork-thread creation fragment should look like this:

try
{
   for( std::size_t i{}; i != table_size; ++i )
   {
      // Personal channel for fork.
      fork_chains.emplace_back( so_5::create_mchain(env) );
      // Run fork as a thread.
      fork_threads[ i ] = std::thread{ fork_process, fork_chains.back() };
   }
}
catch( ... )
{
   for( std::size_t i{}; i != fork_chains.size(); ++i )
   {
      so_5::close_drop_content( fork_chains[ i ] );
      if( fork_threads[ i ].joinable() )
         fork_threads[ i ].join();
   }
   throw;
}

Unfortunately, this is an obvious but incorrect solution, because an exception can also be thrown after the creation of the fork threads. So it is better to do something like this:

struct fork_threads_stuff_t {
   so_5::environment_t & m_env;
   std::vector< so_5::mchain_t > m_fork_chains;
   std::vector< std::thread > m_fork_threads;

   fork_threads_stuff_t( so_5::environment_t & env, std::size_t table_size )
      :  m_env{ env }
      ,  m_fork_threads( table_size )
   {}
   ~fork_threads_stuff_t()
   {
      for( std::size_t i{}; i != m_fork_chains.size(); ++i )
      {
         so_5::close_drop_content( m_fork_chains[ i ] );
         if( m_fork_threads[ i ].joinable() )
            m_fork_threads[ i ].join();
      }
   }

   void run()
   {
      for( std::size_t i{}; i != m_fork_threads.size(); ++i )
      {
         // Personal channel for fork.
         m_fork_chains.emplace_back( so_5::create_mchain(m_env) );
         // Run fork as a thread.
         m_fork_threads[ i ] = std::thread{ fork_process, m_fork_chains.back() };
      }
   }
} fork_threads_stuff{ env, table_size }; // Preallocate resources.
fork_threads_stuff.run(); // Create channels and run threads.
   // All necessary cleanup will be done in the destructor of fork_threads_stuff.

Or we can use some kind of scope-exit trick to run the cleanup code at the end of the scope (Boost's ScopeExit, GSL's finally(), or other variations on this topic).
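
For example, a minimal sketch with GSL's finally() (assuming the Microsoft GSL is available) could look like this:

#include <gsl/gsl_util> // for gsl::finally

// ...inside run_simulation(), right after fork_chains and fork_threads
// are created:
auto fork_cleanup = gsl::finally( [&]() noexcept {
      // Close the channels first so that sleeping fork threads wake up...
      for( auto & ch : fork_chains )
         so_5::close_drop_content( ch );
      // ...and only then join the threads.
      for( auto & t : fork_threads )
         if( t.joinable() )
            t.join();
   } );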

We have the same issue with the philosopher threads, and it can be solved in a similar way.

Such cleanup code makes run_simulation() bigger and harder to understand. Because run_simulation() is just an example, without any pretence of being production-ready, I decided to skip exception handling and mark run_simulation() as noexcept: any exception will crash the whole example, and I suppose this is appropriate for this kind of application.

But the problem is that without some experience with multithreading it is hard to understand which cleanup code will work and which will not. For example, a novice programmer can write cleanup code that joins the spawned threads but forgets to close the channels before calling join. That is a subtle bug that is hard to find, as the sketch below shows.
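
A sketch of that mistake, reusing the names from run_simulation():

// WRONG ORDER: every fork thread is still blocked inside receive(),
// so the very first join() never returns and the channels below are
// never closed.
join_all( fork_threads );

for( auto & ch : fork_chains )
   so_5::close_drop_content( ch ); // never reached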

Simple Solution Without Waiter

Now let's see what the simple solution, with the returning of forks and without a waiter, looks like.

The source code of this implementation can be found here.

Fork Thread

The fork thread is implemented as a fork_process function that looks like this:

void fork_process(
   so_5::mchain_t fork_ch )
{
   // State of the fork.
   bool taken = false;

   // Receive and handle all messages until the channel is closed.
   so_5::receive( so_5::from( fork_ch ),
         [&]( so_5::mhood_t<take_t> cmd ) {
            if( taken )
               so_5::send< busy_t >( cmd->m_who );
            else
            {
               taken = true;
               so_5::send< taken_t >( cmd->m_who );
            }
         },
         [&]( so_5::mhood_t<put_t> ) {
            if( taken )
               taken = false;
         } );
}

We can see that it is simpler than the one in Dijkstra's solution. We saw the same picture when discussing the Actor-based implementations.

Philosopher Thread

The philosopher thread is implemented as a philosopher_process function. It is similar to the one from Dijkstra's solution:

void philosopher_process(
   trace_maker_t & tracer,
   so_5::mchain_t control_ch,
   std::size_t philosopher_index,
   so_5::mbox_t left_fork,
   so_5::mbox_t right_fork,
   int meals_count )
{
   int meals_eaten{ 0 };

   // This flag is necessary for tracing of philosopher actions.
   thinking_type_t thinking_type{ thinking_type_t::normal };

   random_pause_generator_t pause_generator;

   // This channel will be used for replies from forks.
   auto self_ch = so_5::create_mchain( control_ch->environment() );

   while( meals_eaten < meals_count )
   {
      tracer.thinking_started( philosopher_index, thinking_type );

      // Simulate thinking by suspending the thread.
      std::this_thread::sleep_for( pause_generator.think_pause( thinking_type ) );

      // For the case if we can't take forks.
      thinking_type = thinking_type_t::hungry;

      // Try to get the left fork.
      tracer.take_left_attempt( philosopher_index );
      so_5::send< take_t >( left_fork, self_ch->as_mbox(), philosopher_index );

      // Request sent, wait for a reply.
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         []( so_5::mhood_t<busy_t> ) { /* nothing to do */ },
         [&]( so_5::mhood_t<taken_t> ) {
            // Left fork is taken.
            // Try to get the right fork.
            tracer.take_right_attempt( philosopher_index );
            so_5::send< take_t >(
                  right_fork, self_ch->as_mbox(), philosopher_index );

            // Request sent, wait for a reply.
            so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
               []( so_5::mhood_t<busy_t> ) { /* nothing to do */ },
               [&]( so_5::mhood_t<taken_t> ) {
                  // Both forks are taken. We can eat.
                  tracer.eating_started( philosopher_index );

                  // Simulate eating by suspending the thread.
                  std::this_thread::sleep_for( pause_generator.eat_pause() );

                  // One step closer to the end.
                  ++meals_eaten;

                  // Right fork should be returned after eating.
                  so_5::send< put_t >( right_fork );

                  // Next thinking will be normal, not 'hungry_thinking'.
                  thinking_type = thinking_type_t::normal;
               } );

            // Left fork should be returned.
            so_5::send< put_t >( left_fork );
         } );

   }

   // Notify about the completion of the work.
   tracer.philosopher_done( philosopher_index );
   so_5::send< philosopher_done_t >( control_ch, philosopher_index );
}

But there are two important distinctions.

The first one is the thinking_type variable. It is used for two purposes: for tracing the philosopher's actions and for calculating the sleeping time when the philosopher is in the 'thinking' state.

The second one is the handling of busy_t messages. We can see it in the calls to receive():

so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
   []( so_5::mhood_t<busy_t> ) { /* nothing to do */ },
   [&]( so_5::mhood_t<taken_t> ) {
      ...
      so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
         []( so_5::mhood_t<busy_t> ) { /* nothing to do */ },
         [&]( so_5::mhood_t<taken_t> ) {...} );
      ...
   } );

The message handlers for busy_t are empty. That is normal in this case: there is nothing more to do when busy_t is received; all necessary actions were performed before the call to receive() or will be performed just after the return from receive(). The presence of those empty handlers is mandatory, however, because they tell receive() to return control when a busy_t is extracted from the channel.
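
To illustrate (a sketch, not code from the repository): handle_n(1) counts only handled messages, so without a busy_t handler the reply would be extracted and thrown away, and receive() would keep waiting:

// WRONG (sketch): if the fork replies with busy_t, that message is
// extracted but not handled, handle_n(1) is not satisfied, and this
// receive() keeps blocking on self_ch.
so_5::receive( so_5::from( self_ch ).handle_n( 1u ),
   [&]( so_5::mhood_t<taken_t> ) {...} );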

Waiter With Timestamps

There is also a solution with a waiter that I want to discuss briefly. Speaking about the Actor-based implementations, we saw the waiter_with_queue solution and I also mentioned the waiter_with_timestamps solution. A special trick was used in those implementations: the "waiter" entity created additional mboxes that were used as the mboxes of the "forks", but the messages from those mboxes were handled by the "waiter". This trick allowed the same implementation of the "philosopher" actor to be used without "fork" actors.

A similar trick is necessary in a CSP-based implementation to allow the reuse of the already written philosopher_process routine. Can the waiter create a set of mchains that will be used by the philosophers as the channels of the "forks", and can the waiter then read messages from those mchains?

Unfortunately, no.

There is no problem in creating several mchains. The problem is in reading from several mchains.

SObjectizer has a select() function that allows reading and handling messages from several mchains, for example:

so_5::select( so_5::from_all(),
   case_(ch1, one_handler_1, one_handler_2, one_handler_3, ...),
   case_(ch2, two_handler_1, two_handler_2, two_handler_3, ...),
   ...);

But the list of "cases" has to be known at compile time. In my implementations of the "dining philosophers problem" the list of channels to read is known only at run-time, so the existing select() function can't be used. Hence we can't use the trick from the Actor-based implementation in its original form.
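
For example, with just two fork channels fixed at compile time, the waiter could be written as a plain select (a sketch under that assumption; fork_ch_0 and fork_ch_1 are hypothetical names):

// This works only because fork_ch_0 and fork_ch_1 are known at compile
// time; with a run-time vector of chains this form can't be written.
so_5::select( so_5::from_all(),
      so_5::case_( fork_ch_0,
            [&]( so_5::mhood_t<take_t> cmd ) {...},
            [&]( so_5::mhood_t<put_t> ) {...} ),
      so_5::case_( fork_ch_1,
            [&]( so_5::mhood_t<take_t> cmd ) {...},
            [&]( so_5::mhood_t<put_t> ) {...} ) );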

But we can rethink it.

The main problem is that the original take_t and put_t messages carry no fork index inside. It means we have to receive those indexes somehow. And since we can't place those indexes inside the original take_t and put_t, we can introduce new versions of those messages:

struct extended_take_t final : public so_5::message_t
{
   const so_5::mbox_t m_who;
   const std::size_t m_philosopher_index;
   const std::size_t m_fork_index;

   extended_take_t(
      so_5::mbox_t who,
      std::size_t philosopher_index,
      std::size_t fork_index )
      :  m_who{ std::move(who) }
      ,  m_philosopher_index{ philosopher_index }
      ,  m_fork_index{ fork_index }
   {}
};

struct extended_put_t final : public so_5::message_t
{
   const std::size_t m_fork_index;

   extended_put_t(
      std::size_t fork_index )
      :  m_fork_index{ fork_index }
   {}
};

Note. There is no need to derive messages from the so_5::message_t class, but I usually do that because it has some minor benefits. In this case I used inheritance just to show this way of defining SObjectizer's messages.
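
For comparison, a message type that doesn't derive from so_5::message_t is just a plain struct (a hypothetical sketch, the name plain_put_t is mine):

// The same data as extended_put_t, but as a plain struct.
// SObjectizer accepts such types as messages too.
struct plain_put_t final
{
   std::size_t m_fork_index;
};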

A "waiter" entity can read those messages from a single input channel. So our task is transparently translate original take_t and put_t messages to their extended versions and redirect extended messages to another destination.

To do that we can implement our own mbox type:

class wrapping_mbox_t final : public so_5::extra::mboxes::proxy::simple_t
{
   using base_type_t = so_5::extra::mboxes::proxy::simple_t;

   // Where messages should be delivered.
   const so_5::mbox_t m_target;
   // Index of fork to be placed into extended_take_t and extended_put_t.
   const std::size_t m_fork_index;

   // IDs of types to be intercepted and translated.
   static std::type_index original_take_type;
   static std::type_index original_put_type;

public :
   wrapping_mbox_t(
      const so_5::mbox_t & target,
      std::size_t fork_index )
      :  base_type_t{ target }
      ,  m_target{ target }
      ,  m_fork_index{ fork_index }
   {}

   // This is the main method of so_5::abstract_message_box_t for message
   // delivery. We override it to intercept and translate some types of
   // messages.
   void do_deliver_message(
      const std::type_index & msg_type,
      const so_5::message_ref_t & message,
      unsigned int overlimit_reaction_deep ) const override
   {
      if( original_take_type == msg_type )
      {
         // Take access to the content of original message.
         const auto & original_msg = so_5::message_payload_type<::take_t>::
               payload_reference( *message );
         // Send new message instead of original one.
         so_5::send< extended_take_t >(
               m_target,
               original_msg.m_who,
               original_msg.m_philosopher_index,
               m_fork_index );
      }
      else if( original_put_type == msg_type )
      {
         // Send new message instead of original signal.
         so_5::send< extended_put_t >( m_target, m_fork_index );
      }
      else
         base_type_t::do_deliver_message(
               msg_type,
               message,
               overlimit_reaction_deep );
   }

   // Factory method just for simplicity of wrapping_mbox_t creation.
   static auto make(
      const so_5::mbox_t & target,
      std::size_t fork_index )
   {
      return so_5::mbox_t{ new wrapping_mbox_t{ target, fork_index } };
   }
};

std::type_index wrapping_mbox_t::original_take_type{ typeid(::take_t) };
std::type_index wrapping_mbox_t::original_put_type{ typeid(::put_t) };

A simple way of defining a custom mbox is used here: a helper wrapper from the companion project so_5_extra significantly reduces the amount of work needed to implement proxy mboxes. Without this class I would have to derive from so_5::abstract_message_box_t directly and implement several pure virtual methods of the base class.
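
The wiring of these wrapping mboxes could then look like the following sketch (the waiter_ch name is my assumption; the actual setup lives in the repository):

// One wrapping mbox per fork index, all delivering to the single
// input channel of the waiter.
auto waiter_ch = so_5::create_mchain( env );
std::vector< so_5::mbox_t > fork_mboxes;
fork_mboxes.reserve( table_size );
for( std::size_t i{}; i != table_size; ++i )
   fork_mboxes.push_back(
         wrapping_mbox_t::make( waiter_ch->as_mbox(), i ) );
// fork_mboxes[ i ] is then passed to philosopher_process as a fork mbox.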

Anyway, there is a wrapping_mbox_t now, and we can create a set of wrapping_mbox_t instances -- one for every fork index -- as sketched above. Philosophers will receive references to those wrapping mboxes, while the waiter will read incoming messages from a single input channel. The waiter_process function will be as simple as:

void waiter_process(
   so_5::mchain_t waiter_ch,
   details::waiter_logic_t & logic )
{
   // Receive and handle all messages until the channel is closed.
   so_5::receive( so_5::from( waiter_ch ),
         [&]( so_5::mhood_t<details::extended_take_t> cmd ) {
            logic.on_take_fork( std::move(cmd) );
         },
         [&]( so_5::mhood_t<details::extended_put_t> cmd ) {
            logic.on_put_fork( std::move(cmd) );
         } );
}

Of course, all the waiter's logic is implemented elsewhere, and that code is not as simple, but we won't dive into it. The full source code of the waiter_with_timestamps solution can be found here.

Now we can answer the question "why are the forks' channels passed to philosopher_process as mboxes?" It is because I created wrapping_mbox_t for the waiter_with_timestamps solution.

I could have created a custom mchain instead of a custom mbox, but that would require more work than creating a custom mbox with the help of the so_5::extra::mboxes::proxy tools. And because I didn't want to spend a lot of time implementing different solutions for the "dining philosophers problem", I decided to use mboxes instead of mchains. Maybe there will be an so_5::extra::mchains::proxy in some future version of so_5_extra...

Conclusion

That's all I wanted to tell about my Actor- and CSP-based implementations of the "dining philosophers problem". My goal was to show what those solutions can look like. I hope it was interesting for you.

If you are interested in SObjectizer and want to know more, please take a look at our slides about SObjectizer-5.5 and SObjectizer's documentation. SObjectizer itself can be obtained from SourceForge or from the GitHub mirror. I want to mention that for the last two years SObjectizer's development has been driven mainly by users' requests. So if you want to see something in SObjectizer, just tell us and your feature request may be fulfilled in an upcoming SObjectizer release.

That's all, thanks for your time. I'll be glad to answer questions in the comments.
