четверг, 26 октября 2023 г.

[prog.c++.sobjectizer] Several aspects of integration with foreign multithreading libraries

During discussion with Marco Arena about a new article from his series "SObjectizer Tales" I was asked:

You can see that gRPC "sync API" manages threading internally. This might be a problem. Speaking of which, I am really interested in your experience on SObjectizer living together with other threading hosts. How do you usually manage things in this situation? Imagine also that gRPC threads are not "exposed" anyhow.

It's a very interesting question, and it's hard to answer in a few words. But I'll try...

Let's imagine a case when we have to use SObjectizer and some other multithreaded library in an application. What kind of problems can we encounter and what can we do with them?

In the following text I'll use the imaginary library "MegaThreads" just for convenience.


The first problem is start/stop order.

Why is it important?

It's because C++ is a language without a Garbage Collector and we can easily get dangling pointers/references.

Let's imagine that we can schedule execution of a lambda function inside MegaThreads and that lambda function may send SObjectizer's messages to SObjectizer's mboxes. Something like that:

int main() {
   // MegaThreads will be started first.
   mega_threads::main_runner mega_runner;
   mega_runner.start();

   // Then SObjectizer will be started.
   so_5::wrapped_env_t sobjectizer;

   // Schedule an action to be run by MegaThread.
   mega_runner.schedule(
     ..., // Some condition
     [&sobjectizer]() {
        // Mbox to be used as the destination.
        const auto dest = sobjectizer.environment().create_mbox("notifications");
        // Send a notification.
        so_5::send<some_important_info>(dest, ...);
     });

   ... // Some more actions.

   // It's time to finish the job. Just return and SObjectizer
   // and MegaThreads will automatically complete their work
   // in the destructors.
   return 0;
}

The problem here is the saved reference to SObjectizer's wrapped_env_t inside the lambda function. This lambda may be called just after completion of wrapped_env_t destruction, but before the destruction of MegaThreads's main_runner. This will lead to UB because of dereference of a dangling reference.

In this simple case the problem may be solved easily: we can just place an instance of wrapped_env_t before an instance of main_runner:

int main() {
   // SObjectizer will be started first.
   so_5::wrapped_env_t sobjectizer;

   // MegaThreads will be started after.
   mega_threads::main_runner mega_runner;
   mega_runner.start();

   ... // As earlier.

   return 0;
}

But there may be more complex cases when SObjectizer's agents hold references to MegaThreads entities, and MegaThreads objects may have references to SObjectizer's mboxes/mchains or something else.

In such cases we have to implement some kind of graceful shutdown before destruction of SObjectizer and MegaThreads instances. For example, we can define a signal `msg_shutdown` that has to be handled by our agents to deregister itself. Something like that:

int main() {
   // SObjectizer will be started first.
   so_5::wrapped_env_t sobjectizer;

   // MegaThreads will be started after.
   mega_threads::main_runner mega_runner;
   mega_runner.start();

   ... // As earlier.

   // It's time to shut down the application.
   // Send a special signal to SObjectizer.
   so_5::send<msg_shutdown>(
      sobjectizer.environment().create_mbox("shutdown_nofity"));

   ... // Wait for the ACK from SObjectizer.
       // This wait may be implemented different ways,
       // just assume that it's possible.

   // Now mega_runner can be destroyed, because there is no
   // references to it inside SObjectizer.

   return 0;
}

Note that it may be not simple to implement such a graceful shutdown, but we won't dig into such details in this post.


The second problem I wanted to discuss is executing agent's methods from outside of the SObjectizer's worker threads.

Let's suppose that MegaThreads dictates us to implement an interface to get important notifications from the MegaThreads. Something like that:

namespace mega_threads {

class notification_listener {
public:
   virtual void handle(const notify *) = 0;
};

}

And we have to receive notifications from MegaThreads by our agent.

There is a temptation to implement this interface by an agent class. Something like:

class my_agent
  : public so_5::agent_t // It's an agent.
  , public mega_threads::notification_listener // It's also a listener.
{
   ...
   void handle(const mega_threads::notify *) override {...}
   ...
   void so_evt_start() override {
      mega_threads::add_listener(*this);
      ...
   }
   void so_evt_finish() override {
      ...
      mega_threads::remove_listener(*this);
   }
};

It may look like a simple solution but there are dragons 🙁

The MegaThreads calls listeners from one of its worker threads. It may lead to a case when SObjectizer invokes an agent event-handler on one thread at the same time when MegaThreads invokes `handle()` on another thread. And it may be a problem because SObjectizer's user usually assumes that an agent works on just one thread and there is no need for protection of the agent's state.

If an agent modifies its state then this modification has to be protected by mutex/spin-lock or something like that.

Even if the agent has such a protection, something like that:

class my_agent
  : public so_5::agent_t // It's an agent.
  , public mega_threads::notification_listener // It's also a listener.
{
   std::mutex m_lock;
   ...
   void handle(const mega_threads::notify * nt) override {
      std::lock_guard l{m_lock};
      ... // Modification of agent's state.
   }
   ...
   void evt_something_happen(mhood_t<some_msg>) {
      std::lock_guard l{m_lock};
      ... // Modification of agent's state.
   }
};

there is another problem: SObjectizer allows some operations (like subscriptions/unsubscriptions and switching the agent's state) only on SObjectizer's worker thread. It means that such an attempt:

class my_agent
  : public so_5::agent_t // It's an agent.
  , public mega_threads::notification_listener // It's also a listener.
{
   std::mutex m_lock;
   state_t st_wait_notification{...};
   state_t st_notified{...};
   ...
   void handle(const mega_threads::notify * nt) override {
      std::lock_guard l{m_lock};
      st_notified.activate(); // (1)
      ...
   }
};

will fail at point (1). SObjectizer will check the ID of the thread on which `st_notified.activate()` is invoked and will see that it is not SObjectizer's thread and an exception will be thrown.

What can we do in such circumstances?

May be the best solution is to make a dispatcher that will integrate with MegaThreads and will allow to bind an agent to one of MegaThreads's thread.

An example of such approach can be seen in Asio's based dispatchers in so5extra companion library (one_thread- and thread_pool-dispatchers).

But not all multithreaded libraries allow such a trick.

So the remaining solution is to make a proxy object that will send messages to an agent:

class my_agent_notification_listener
   : public mega_threads::notification_listener
{
   const so_5::mbox_t m_agent_mbox;
public:
   my_agent_notification_listener(so_5::mbox_t m)
      : m_agent_mbox{std::move(m)}
   {}

   void handle(const mega_threads::notify * nt) override {
      so_5::send<msg_take_notify>(m_agent_mbox, nt);
   }
};

class my_agent : public so_5::agent_t
{
   my_agent_notification_listener m_listener;
public:
   my_agent(context_t ctx)
      : so_5::agent_t{std::move(ctx)}
      , m_listener{so_direct_mbox()}
   {}

   void so_define_agent() override {
      so_subscribe_self()
         // Subscribe actual handler of a notification.
         .event([this](mhood_t<msg_take_notify> cmd) {...})
         ...;
   }
   void so_evt_start() override {
      mega_threads::add_listener(m_listener);
      ...
   }
   void so_evt_finish() override {
      ...
      mega_threads::remove_listener(m_listener);
   }
};

Please note that this example shows an asynchronous interaction between the listener and the agent. It is not always possible, in many cases the agent has to process notification synchronously. In such cases so_5::extra::sync can be used:

struct msg_take_notify {...};
struct msg_notify_result {...};

class my_agent_notification_listener
   : public mega_threads::notification_listener
{
   const so_5::mbox_t m_agent_mbox;
public:
   my_agent_notification_listener(so_5::mbox_t m)
      : m_agent_mbox{std::move(m)}
   {}

   // Let's suppose that handle() has to return a value.
   mega_threads::handling_result handle(
      const mega_threads::notify * nt) override
   {
      auto result = so_5::extra::sync::request_value<msg_take_notify, msg_notify_result>(
         m_agent_mbox, 20s, nt);
      return result.m_handling_result;
   }
};

class my_agent : public so_5::agent_t
{
   my_agent_notification_listener m_listener;
public:
   my_agent(context_t ctx)
      : so_5::agent_t{std::move(ctx)}
      , m_listener{so_direct_mbox()}
   {}

   void so_define_agent() override {
      so_subscribe_self()
         // Subscribe actual handler of a notification.
         .event([this](so_5::extra::sync::mutable_mhood_t<msg_take_notify, msg_notify_result> cmd) {...})
         ...;
   }
   ...
};

There is no need to use so_5::extra::sync, of course. Something similar can be created on top of SObjectizer's mchains or even via std::promise/std::future.


The last aspect is the interaction between SObjectizer agents and other parts of the application.

Fortunately, it's not a problem for most cases.

It's easy to send a message into SObjectizer, because so_5::send() can be used from any worker thread of the application.

SObjectizer's mchains can be used for delivering information from SObjectizer's agents to consumers. Or SObjectizer's agents can use MegaThreads's API to deliver necessary data to appropriate entities inside MegaThreads. Or a user may implement own mbox (or msink) that will transfer information from SObjectizer to MegaThreads transparently...

But it is important to mention two cases where additional care has to be taken.

The first case is the lifetime of data received from a foreign library. Let's see an example from above yet another time:

struct msg_take_notify {
   // NOTE: it's a pointer, not a copy.
   const mega_threads::notify * m_nt;
};

class my_agent_notification_listener
   : public mega_threads::notification_listener
{
   const so_5::mbox_t m_agent_mbox;
public:
   my_agent_notification_listener(so_5::mbox_t m)
      : m_agent_mbox{std::move(m)}
   {}

   void handle(const mega_threads::notify * nt) override {
      // Message will transfer a pointer only, not a copy.
      so_5::send<msg_take_notify>(m_agent_mbox, nt);
   }
};

class my_agent : public so_5::agent_t
{
   my_agent_notification_listener m_listener;
public:
   my_agent(context_t ctx)
      : so_5::agent_t{std::move(ctx)}
      , m_listener{so_direct_mbox()}
   {}

   void so_define_agent() override {
      so_subscribe_self()
         // Subscribe actual handler of a notification.
         .event([this](mhood_t<msg_take_notify> cmd) {
               cmd->m_nt->some_method(); // (1)
            })
         ...;
   }
   void so_evt_start() override {
      mega_threads::add_listener(m_listener);
      ...
   }
   void so_evt_finish() override {
      ...
      mega_threads::remove_listener(m_listener);
   }
};

Because msg_take_notify is handled asynchronously, it's possible that my_agent gets a dangling pointer in msg_take_notify at the point (1). Unfortunately, the invocation of the event handler in my_agent may happen before the completion of my_agent_notification_listener::handle(), and this problem may be hidden from a developer for some time.

The second case can be encountered when some methods of MegaThreads's object may be called from the thread where this object was initialized. Something like that:

class my_agent : public so_5::agent_t {
   mega_threads::renderer & m_renderer;
   ...
   void evt_data_to_render(mhood_t<msg_data> cmd) {
      // NOTE: this call will be made on a SObjectizer's thread.
      m_renderer.render(cmd->data_to_render());
   }
};

int main() {
   // SObjectizer will be started first.
   so_5::wrapped_env_t sobjectizer;

   // MegaThreads will be started after.
   mega_threads::main_runner mega_runner;
   mega_runner.start();

   // Create and initialize the render.
   mega_runner::renderer renderer(mega_runner);
   renderer.initialize(); // Initialization on the main thread.

   // Create an agent that will use renderer.
   sobjectizer.environment().introduce_coop([&](so_5::coop_t & coop) {
         coop->make_agent<my_agent>(renderer, ...);
         ...
      });

   ... // Some work.

   return 0;
}

Objects like `renderer` may bind themselves to the thread on which initialize() was called (by utilizing thread-local storage, for example). When the render() method is called from a different thread an error may occur (and it's good if such error will be detected and reported by MegaThreads, sometimes it just leads to UB). Fortunately, such cases are rare.


I hope this information will be useful and will simplify integration of SObjectizer and other multithreaded libraries in an application. If you have any questions feel free to ask in comments or by opening an issue on GitHub.

I recommend taking a look at Marco's "SObjectizer Tales" series -- it's really well written and touches on various aspects of using SObjectizer with a rather simple, but realistic example.


This post is sponsored 😉 by our little software company "StiffStream", which supports SObjectizer and RESTinio. We (re)invent bicycles for ourselves, can (re)invent them for you 😎

Комментариев нет:

Отправить комментарий