Saturday, August 19, 2023

[prog.c++.message-passing] A few words after watching Anthony Williams's "Designing for C++ Concurrency Using Message Passing"

Anthony Williams gave a talk titled "Designing for C++ Concurrency Using Message Passing" at ACCU 2023. It wasn't particularly interesting for me personally because I didn't find anything new in it (not surprising, since I have been responsible for the development of a message passing framework for years).

But I would like to add my 2 cents to the topic. I hope my thoughts will help readers understand how message passing frameworks can differ from each other, and how such differences can affect the design and writing of software.


First of all, entities that receive and process messages (let's call them "actors") can be represented as objects with callbacks or as execution threads.

When actors are objects, the "push" model is used for message delivery: the framework will call a callback from the actor object when there is a message to be processed.

When actors are execution threads (native OS threads or some kind of "green threads" (fibers/stackful coroutines)), the "pull" model is used: an actor periodically calls the "receive" method. If there is a pending message, the "receive" returns it and the actor processes it. If there are no pending messages, the actor is suspended and resumed when a new message arrives.
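
To make the "pull" model concrete, here is a minimal sketch of a blocking mailbox built on a mutex and a condition variable. The names `mailslot`, `msgs`, `logging_actor` and `run_pull_demo` are made up for this illustration and do not come from any particular framework; a real framework would also deal with shutdown, overload and message payloads.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical message set, for illustration only.
enum class msgs { init_device, quit };

// A minimal blocking mailbox shared by the senders and one receiving thread.
class mailslot {
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<msgs> queue_;

public:
  void send(msgs m) {
    {
      std::lock_guard<std::mutex> guard{lock_};
      queue_.push_back(m);
    }
    not_empty_.notify_one();
  }

  // Returns a pending message, or suspends the caller until one arrives.
  msgs receive() {
    std::unique_lock<std::mutex> guard{lock_};
    not_empty_.wait(guard, [this] { return !queue_.empty(); });
    const msgs m = queue_.front();
    queue_.pop_front();
    return m;
  }
};

// The actor is an ordinary function that runs on its own thread.
void logging_actor(mailslot & mslot, std::vector<std::string> & log) {
  for(;;) {
    switch(mslot.receive()) {
    case msgs::init_device:
      log.push_back("init_device");
      break;
    case msgs::quit:
      log.push_back("quit");
      return;
    }
  }
}

std::vector<std::string> run_pull_demo() {
  mailslot mslot;
  std::vector<std::string> log;
  std::thread worker{logging_actor, std::ref(mslot), std::ref(log)};
  mslot.send(msgs::init_device);
  mslot.send(msgs::quit);
  worker.join();  // After join() the worker's writes to `log` are visible here.
  assert(log.size() == 2);  // Both messages were handled.
  return log;
}
```

Calling `run_pull_demo()` starts the actor thread, sends it two messages and returns the list of messages the actor has handled, in the order they were sent.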

This difference affects the way we program actors. Actors in the form of execution threads allow writing more "sequential" code. For example, let's imagine that we have to send command A to an external device, then wait for 0.25s, then send command B, wait for 0.1s, and then read the state of the device. This can be expressed as something like this:

void device_controller(mailslot mslot) {
  for(;;) {
    switch(mslot.receive()) {
    case msgs::init_device:
      send_command_A();
      sleep(250ms);
      send_command_B();
      sleep(100ms);
      read_device_state();
      ...
    break;

    case msgs::quit:
      return;

    case ... // Other messages.
    }
  }
}

When actors are objects with callbacks this simple sequence has to be split into several small message handlers:

class device_controller {
  ...
  void on_message(message msg) override {
    switch(msg) {
    case msgs::init_device:
      send_command_A();
      timer.send_after(250ms, msgs::send_command_B, this);
    break;

    case msgs::send_command_B:
      send_command_B();
      timer.send_after(100ms, msgs::read_device_state, this);
    break;

    case msgs::read_device_state:
      read_device_state();
      ...
    break;

    case ... // Other messages.
    }
  }
};

Such splitting of logic can make code much harder to understand and maintain.

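On the other hand, actors as objects can be simpler in cases where you have to process different types of incoming messages at the same time. For example, let's say we have to process msgs::current_status and msgs::cancel messages even if we're inside the device initialization sequence. With callbacks, these are just two more cases in on_message, whereas a thread blocked in sleep() can't react to them until the pause is over.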
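
One possible way to keep a thread-based actor responsive during the pauses is to replace every sleep() with a receive that has a deadline. The sketch below assumes a hypothetical `mailslot::receive_until` method and a helper `pause_or_cancel`; neither is taken from a real framework, and the "device" is reduced to log entries.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

using namespace std::chrono_literals;
using deadline_t = std::chrono::steady_clock::time_point;

// Hypothetical messages that may arrive during initialization.
enum class msgs { current_status, cancel };

class mailslot {
  std::mutex lock_;
  std::condition_variable not_empty_;
  std::deque<msgs> queue_;

public:
  void send(msgs m) {
    {
      std::lock_guard<std::mutex> guard{lock_};
      queue_.push_back(m);
    }
    not_empty_.notify_one();
  }

  // Waits for a message until `deadline`; an empty optional means a timeout.
  std::optional<msgs> receive_until(deadline_t deadline) {
    std::unique_lock<std::mutex> guard{lock_};
    if(!not_empty_.wait_until(guard, deadline,
                              [this] { return !queue_.empty(); }))
      return std::nullopt;
    const msgs m = queue_.front();
    queue_.pop_front();
    return m;
  }
};

// Stands in for sleep(pause). Returns false if msgs::cancel arrived meanwhile.
bool pause_or_cancel(mailslot & mslot, std::chrono::milliseconds pause,
                     std::vector<std::string> & log) {
  assert(pause >= 0ms);
  const deadline_t deadline = std::chrono::steady_clock::now() + pause;
  for(;;) {
    const auto m = mslot.receive_until(deadline);
    if(!m) return true;                     // The pause elapsed normally.
    if(*m == msgs::cancel) return false;    // Abort the sequence.
    log.push_back("status: initializing");  // Reply to msgs::current_status.
  }
}

// The initialization sequence, shortened to one pause between two commands.
std::vector<std::string> run_init_sequence(mailslot & mslot) {
  std::vector<std::string> log;
  log.push_back("command A");
  if(!pause_or_cancel(mslot, 20ms, log)) {
    log.push_back("cancelled");
    return log;
  }
  log.push_back("command B");
  return log;
}
```

The sequence still reads top to bottom, but every pause now needs such a helper, which is the extra complexity that objects with callbacks avoid.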

There are other advantages/disadvantages to each approach that unfortunately can't be covered in this post (which was planned as a short one). What I want to emphasize is that you will be dealing with either objects with callbacks or execution threads, and I doubt that there is a message passing framework that supports both approaches as first-class citizens. If you know of a message passing framework that supports both approaches, let me know.


How many sources of messages can we use?

Another important aspect is the source of the messages. Typically, an "actor" has only one queue of incoming messages (which may be called "mail-box", "mail-slot", "message-queue", or something similar).

It's a very simple and straightforward approach that is widely used, especially when actors are objects with callbacks.

But there may be a problem with a single source of messages: let's say there are messages M1, M2, M3, M4 and M5 in the inbound message queue, but your actor is only ready to process M2 and M5. What to do with M1, M3 and M4?

One approach is to simply discard them. It's a very practical approach and it works well in most cases. However, sometimes it may be necessary to keep M1, M3 and M4 in the inbound queue and return to them some time later. This can be solved by approaches like "selective receive" a la Erlang or Akka's stashing.
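
The stashing idea can be sketched in a few lines. This is only a single-threaded illustration in the spirit of stashing, not Akka's or Erlang's actual API: the class `stashing_actor`, its methods, and the use of plain strings as messages are all assumptions made for the example.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

class stashing_actor {
  std::deque<std::string> inbox_;
  std::deque<std::string> stash_;
  bool accepts_everything_ = false;

  // In its first state the actor is ready for M2 and M5 only.
  bool ready_for(const std::string & m) const {
    return accepts_everything_ || m == "M2" || m == "M5";
  }

public:
  std::vector<std::string> processed;

  void send(std::string m) { inbox_.push_back(std::move(m)); }

  // Processes the whole inbox, deferring messages the actor is not ready for.
  void drain() {
    while(!inbox_.empty()) {
      std::string m = std::move(inbox_.front());
      inbox_.pop_front();
      if(ready_for(m))
        processed.push_back(std::move(m));
      else
        stash_.push_back(std::move(m));
    }
  }

  // Changes the state and returns the stashed messages to the head of the
  // inbox, in their original order.
  void accept_everything() {
    accepts_everything_ = true;
    inbox_.insert(inbox_.begin(), stash_.begin(), stash_.end());
    stash_.clear();
  }
};

std::vector<std::string> run_stash_demo() {
  stashing_actor a;
  for(const char * m : {"M1", "M2", "M3", "M4", "M5"}) a.send(m);
  a.drain();                        // Handles M2 and M5, stashes the rest.
  assert(a.processed.size() == 2);
  a.accept_everything();
  a.drain();                        // Now handles M1, M3 and M4.
  return a.processed;
}
```

In this sketch the messages end up processed in the order M2, M5, M1, M3, M4: nothing is lost, but the original arrival order is not preserved.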

Alternatively, there can be multiple sources of incoming messages: for example, several message boxes or inbound queues, with the receiver able to read messages from any of them. Sometimes the receiver can decide which inbound queue it wants to read at the moment, for example, if an actor is an execution thread:

struct worker_context {
  mailslot commands_;
  mailslot replies_;
  mailslot data_;
};

void demo_actor(worker_context ctx) {
  // Main loop: reading commands.
  for(;;) {
    switch(ctx.commands_.receive()) {
    case msgs::quit:
      return;

    case msgs::turn_on:
    {
      send_turn_on_command();
      // Nested loop: waiting for replies.
      bool ack_received = false;
      while(!ack_received) {
        switch(ctx.replies_.receive()) {
          case msgs::ack:
            ack_received = true;
          break;

          case ... // Other messages.
        }
      }
    }
    break;

    case ... // Other messages.
    }
  }
}

The number of sources can affect not only the receivers of messages, but also the senders. For example, if each actor has a single incoming queue, how do you implement a 1-to-many interaction? Should a sender perform N sends (one for each recipient)? Or should there be something like a bulletin board where a sender publishes a message and it is automatically delivered to all subscribed receivers?
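
The bulletin-board variant could look roughly like the sketch below. The names `receiver`, `bulletin_board`, `subscribe` and `publish` are hypothetical, messages are plain strings, and thread safety is left out to keep the idea visible.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// A receiver with a single inbound queue.
struct receiver {
  std::deque<std::string> inbox;
};

// The sender talks to the board only and does not know who the receivers are.
class bulletin_board {
  std::vector<receiver *> subscribers_;

public:
  // The receiver must outlive the board in this simplified sketch.
  void subscribe(receiver & r) { subscribers_.push_back(&r); }

  // Copies the message into every subscriber's queue.
  // Returns the number of deliveries.
  std::size_t publish(const std::string & msg) {
    for(receiver * r : subscribers_) r->inbox.push_back(msg);
    return subscribers_.size();
  }
};

// Returns the final inbox sizes of the two receivers.
std::vector<std::size_t> run_board_demo() {
  receiver first, second;
  bulletin_board board;
  board.subscribe(first);
  const std::size_t n1 = board.publish("status changed");  // Only `first`.
  board.subscribe(second);
  const std::size_t n2 = board.publish("shutdown");        // Both receivers.
  assert(n1 == 1 && n2 == 2);
  return {first.inbox.size(), second.inbox.size()};
}
```

With N explicit sends the sender has to maintain the list of recipients itself; with a board that list moves into the framework, at the price of one more entity to manage.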

There are, of course, other aspects, like message priorities, overload control, revoking messages and so on. Unfortunately, they are outside the scope of this post.


How can we associate a handler with a particular message type (or message instance)?

This aspect may not be as important as the previous two, but it can affect the convenience of writing and maintaining message handlers. The simplest way is to use something like this:

void my_actor::on_message(const message & msg) {
  switch(msg.id) {
  case msgs::command_A:
  {
    const auto & cmd_A = dynamic_cast<const command_A &>(msg);
    ... // Working with cmd_A.
  }
  break;

  case msgs::command_B:
  {
    const auto & cmd_B = dynamic_cast<const command_B &>(msg);
    ... // Working with cmd_B.
  }
  break;

  case ... // Other messages.
  }
}

This approach is tedious and, unfortunately, error-prone, especially when maintaining software written years ago by people who have left the project.

It's much better if you can specify handlers for messages you are interested in and the framework automatically calls the appropriate handler when a message arrives. Something like this:

my_actor::my_actor() {
  declare_handler(&my_actor::on_command_A);
  declare_handler(&my_actor::on_command_B);
  ... // Other messages.
}

void my_actor::on_command_A(const command_A & msg) {...}

void my_actor::on_command_B(const command_B & msg) {...}
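
How a framework might implement something like declare_handler is sketched below. This is one possible approach under my own assumptions, not the mechanism of any specific framework: the base class `actor_base`, its `dispatch` method and the extra message type `command_C` are invented for the example, and the message type is deduced from the handler's signature and looked up via `std::type_index`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <vector>

// A polymorphic base, so typeid() reports the dynamic type of a message.
struct message { virtual ~message() = default; };
struct command_A : message { int value = 0; };
struct command_B : message { std::string text; };
struct command_C : message {};  // Nobody handles this one.

template<class Derived>
class actor_base {
  std::unordered_map<std::type_index,
                     std::function<void(const message &)>> handlers_;

protected:
  actor_base() = default;
  actor_base(const actor_base &) = delete;  // Handlers capture `this`.

  // Deduces the message type from the handler's signature.
  template<class Msg>
  void declare_handler(void (Derived::*handler)(const Msg &)) {
    handlers_[std::type_index(typeid(Msg))] =
      [this, handler](const message & m) {
        // Safe: this lambda is stored under the key typeid(Msg) only.
        (static_cast<Derived *>(this)->*handler)(static_cast<const Msg &>(m));
      };
  }

public:
  // Returns false when there is no handler for this message type.
  bool dispatch(const message & m) {
    const auto it = handlers_.find(std::type_index(typeid(m)));
    if(it == handlers_.end()) return false;
    it->second(m);
    return true;
  }
};

class my_actor : public actor_base<my_actor> {
public:
  std::vector<std::string> log;

  my_actor() {
    declare_handler(&my_actor::on_command_A);
    declare_handler(&my_actor::on_command_B);
  }

  void on_command_A(const command_A & msg) {
    log.push_back("A:" + std::to_string(msg.value));
  }
  void on_command_B(const command_B & msg) { log.push_back("B:" + msg.text); }
};

std::vector<std::string> run_dispatch_demo() {
  my_actor actor;
  command_A a; a.value = 42;
  command_B b; b.text = "hello";
  actor.dispatch(a);
  actor.dispatch(b);
  const bool handled = actor.dispatch(command_C{});
  assert(!handled);  // No handler was declared for command_C.
  return actor.log;
}
```

The casts are now written once, inside the framework, instead of being repeated in every switch branch of every actor.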

Sometimes it may be necessary to select a message handler not only by the message type, but also by the message content. A framework can simplify your life if it can do such a selection. However, because C++ doesn't have pattern matching yet, this functionality is rather exotic and may look a bit weird.
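
Without pattern matching, selection by content is usually emulated with guard predicates. The following sketch shows one way this might look; the message `set_speed`, the class `motor_actor` and the two-argument `declare_handler` are hypothetical and serve only to illustrate the idea (the first handler whose guard accepts the message wins).

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message with some content to select on.
struct set_speed { int rpm; };

class motor_actor {
  using guard_fn = std::function<bool(const set_speed &)>;
  using handler_fn = std::function<void(const set_speed &)>;
  std::vector<std::pair<guard_fn, handler_fn>> handlers_;

  void declare_handler(guard_fn guard, handler_fn handler) {
    handlers_.emplace_back(std::move(guard), std::move(handler));
  }

public:
  std::vector<std::string> log;

  motor_actor() {
    declare_handler(
      [](const set_speed & m) { return m.rpm == 0; },
      [this](const set_speed &) { log.push_back("stop"); });
    declare_handler(
      [](const set_speed & m) { return m.rpm > 5000; },
      [this](const set_speed & m) {
        log.push_back("reject " + std::to_string(m.rpm));
      });
    declare_handler(  // Catch-all, must be declared last.
      [](const set_speed &) { return true; },
      [this](const set_speed & m) {
        log.push_back("run " + std::to_string(m.rpm));
      });
  }
  motor_actor(const motor_actor &) = delete;  // Handlers capture `this`.
  motor_actor & operator=(const motor_actor &) = delete;

  // Calls the first handler whose guard accepts the message.
  void on_message(const set_speed & m) {
    for(const auto & [guard, handler] : handlers_) {
      if(guard(m)) { handler(m); return; }
    }
    assert(false && "the catch-all guard should have matched");
  }
};

std::vector<std::string> run_guard_demo() {
  motor_actor motor;
  motor.on_message(set_speed{0});
  motor.on_message(set_speed{1200});
  motor.on_message(set_speed{9000});
  return motor.log;
}
```

The order of declarations matters here, which is part of why this kind of code tends to look a bit weird compared with languages that have real pattern matching.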


How can we maintain the state of an actor?

I think that the worst way to maintain the state of an actor is to keep it in some internal variable and check its value in every message handler. For example:

class device_controller {
  enum class state {
    initial,
    command_A_sent,
    command_B_sent,
    ready,
    ...
  };
  state state_{state::initial};
  ...
  void on_message(message msg) override {
    switch(msg) {
    case msgs::init_device:
      if(state::initial == state_) {
        send_command_A();
        state_ = state::command_A_sent;
        timer.send_after(250ms, msgs::send_command_B, this);
      }
    break;

    case msgs::send_command_B:
      if(state::command_A_sent == state_) {
        send_command_B();
        state_ = state::command_B_sent;
        timer.send_after(100ms, msgs::read_device_state, this);
      }
    break;

    case msgs::read_device_state:
      if(state::command_B_sent == state_) {
        read_device_state();
        ...
        state_ = state::ready;
      }
    break;

    case ... // Other messages.
    }
  }
};

It's much better if the framework allows you to specify a set of possible states in some declarative way. For example:

class device_controller {
  struct state_initial {};
  struct state_command_A_sent {};
  struct state_command_B_sent {};
  struct state_ready {};
  ...
  void on_init_device() {
    send_command_A();
    change_state(state_command_A_sent{});
  }

  void on_send_command_B() {
    send_command_B();
    change_state(state_command_B_sent{});
  }

  void on_read_device_state() {
    read_device_state();
    ...
    change_state(state_ready{});
  }

public:
  device_controller() {
    define_handler<state_initial>(&device_controller::on_init_device);
    define_handler<state_command_A_sent>(&device_controller::on_send_command_B);
    define_handler<state_command_B_sent>(&device_controller::on_read_device_state);
    define_handler<state_ready>(...);
    ...
  }
};

Please note that there are several ways to express an actor's state in a more or less declarative form. The example above is just an illustration.
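
For the curious, here is a compilable sketch of how such a declarative form might work under the hood. It is an assumption-laden illustration rather than a real framework: unlike the example above, `define_handler` here also takes the message id explicitly, handlers are stored in a table keyed by (state, message), the timers are omitted, and device I/O is replaced by log entries.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

enum class msgs { init_device, send_command_B, read_device_state };

class device_controller {
  struct state_initial {};
  struct state_command_A_sent {};
  struct state_command_B_sent {};
  struct state_ready {};

  using handler = void (device_controller::*)();
  // The table replaces the if(state_ == ...) checks inside handlers.
  std::map<std::pair<std::type_index, msgs>, handler> handlers_;
  std::type_index state_{typeid(state_initial)};

  template<class State>
  void define_handler(msgs m, handler h) {
    handlers_[{std::type_index(typeid(State)), m}] = h;
  }

  template<class State>
  void change_state(State) { state_ = std::type_index(typeid(State)); }

  void on_init_device() {
    log.push_back("command A");
    change_state(state_command_A_sent{});
  }
  void on_send_command_B() {
    log.push_back("command B");
    change_state(state_command_B_sent{});
  }
  void on_read_device_state() {
    log.push_back("read state");
    change_state(state_ready{});
  }

public:
  std::vector<std::string> log;

  device_controller() {
    define_handler<state_initial>(
      msgs::init_device, &device_controller::on_init_device);
    define_handler<state_command_A_sent>(
      msgs::send_command_B, &device_controller::on_send_command_B);
    define_handler<state_command_B_sent>(
      msgs::read_device_state, &device_controller::on_read_device_state);
  }

  // Returns false if the message has no handler in the current state.
  bool on_message(msgs m) {
    const auto it = handlers_.find({state_, m});
    if(it == handlers_.end()) return false;
    (this->*(it->second))();
    return true;
  }

  bool is_ready() const {
    return state_ == std::type_index(typeid(state_ready));
  }
};

std::vector<std::string> run_state_demo() {
  device_controller dev;
  const bool too_early = dev.on_message(msgs::read_device_state);
  assert(!too_early);  // Not handled in the initial state.
  dev.on_message(msgs::init_device);
  dev.on_message(msgs::send_command_B);
  dev.on_message(msgs::read_device_state);
  assert(dev.is_ready());
  return dev.log;
}
```

A message that arrives in the wrong state simply finds no entry in the table, so the handlers themselves stay free of state checks.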


Of course, there are many other aspects related to message passing frameworks (such as support for distributed applications, debugging and unit-testing of actors, storing messages in databases, online monitoring, and so on). It's impossible to touch on all of them, even briefly, in a small blog post, so I've only mentioned a few topics that I missed in Anthony Williams's talk.
