Execution module

Safe DDS provides a set of interfaces that allow the application to implement its own execution model. This makes it possible to integrate Safe DDS into specific execution models, for example by using the library within threaded environments.

Safe DDS Usage

Note that Safe DDS does not provide any thread locking mechanism. To use Safe DDS in a threaded environment, assume that none of the provided APIs are thread-safe, and implement application-level locking to protect every concurrent API call.
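As a minimal sketch of the application-level locking described above, the following wraps a non-thread-safe object behind a std::mutex so that every call is serialized. Counter is a hypothetical stand-in for any Safe DDS entity, and Locked and run_two_threads are illustrative helpers that are not part of Safe DDS.

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for a non-thread-safe Safe DDS entity.
struct Counter
{
    int value = 0;

    void increment()
    {
        ++value;
    }
};

// Illustrative helper (not part of Safe DDS): serializes every access to the wrapped object.
template <typename T>
class Locked
{
public:

    // Runs fn(object) while holding the mutex and returns its result.
    template <typename Fn>
    auto with_lock(Fn&& fn) -> decltype(fn(std::declval<T&>()))
    {
        std::lock_guard<std::mutex> guard(mutex_);
        return fn(object_);
    }

private:

    std::mutex mutex_;
    T object_{};
};

// Two threads call the same object concurrently; each call is protected by the lock.
inline int run_two_threads(int calls_per_thread)
{
    assert(calls_per_thread >= 0);

    Locked<Counter> counter;

    auto worker = [&counter, calls_per_thread]()
    {
        for (int i = 0; i < calls_per_thread; ++i)
        {
            counter.with_lock([](Counter& c) { c.increment(); });
        }
    };

    std::thread first(worker);
    std::thread second(worker);
    first.join();
    second.join();

    return counter.with_lock([](Counter& c) { return c.value; });
}
```

A single coarse lock such as this one is the simplest option. Finer-grained locking is possible, but every concurrent call into the library still has to be covered by some lock.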

The Safe DDS execution model is based on the ISpinnable interface, which provides:

  • A non-blocking has_pending_work method that enables the entities to check if they have pending work to be done.

  • A spin method, blocking with a timeout, that enables the entities to perform their internal tasks, including transport operations.

By means of these two methods, the entities can be integrated into any kind of execution model, including blocking and non-blocking ones.
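The non-blocking case can be sketched as follows. The Spinnable type below is a hypothetical stand-in that only mirrors the two methods described above, using std::chrono instead of the Safe DDS time types. It is not the real execution::ISpinnable declaration, and QueueEntity and poll_once are illustrative names.

```cpp
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in mirroring the two methods described above.
struct Spinnable
{
    virtual ~Spinnable() = default;
    virtual bool has_pending_work() const = 0;
    virtual void spin(Clock::time_point deadline) = 0;
};

// Toy entity: holds queued work items and processes them until the deadline.
class QueueEntity : public Spinnable
{
public:

    explicit QueueEntity(int items)
        : items_(items)
    {
        assert(items >= 0);
    }

    bool has_pending_work() const override
    {
        return items_ > 0;
    }

    void spin(Clock::time_point deadline) override
    {
        while (items_ > 0 && Clock::now() < deadline)
        {
            --items_;
        }
    }

    int remaining() const
    {
        return items_;
    }

private:

    int items_;
};

// Non-blocking integration: call once per iteration of an application-owned loop.
// Returns true if the entity was spun, false if it had nothing to do.
inline bool poll_once(Spinnable& entity, std::chrono::milliseconds budget)
{
    if (!entity.has_pending_work())
    {
        return false;
    }

    entity.spin(Clock::now() + budget);
    return true;
}
```

A blocking model would instead call spin directly with a deadline and let the entity use the time up to that deadline.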

ISpinnable interface

API Reference

For more information about the execution::ISpinnable interface, see the API Reference for execution::ISpinnable.

In general, most of the entities in Safe DDS are ISpinnable and implement the interface methods.

has_pending_work

This method returns true if the entity has pending work to be done, and false otherwise. It takes no arguments and returns a bool.

get_next_work_timepoint

This method returns a TimePoint indicating when the spinnable entity next has work to do.
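One possible use of this value is to decide how long an idle loop may sleep before spinning again. The sketch below is an assumption about usage rather than documented Safe DDS behaviour: it uses std::chrono in place of execution::TimePoint, and idle_time is a hypothetical helper.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>

using Clock = std::chrono::steady_clock;

// Returns how long the caller may sleep before the next scheduled work.
// The result is zero if the work is already due and never exceeds max_sleep.
inline Clock::duration idle_time(
        Clock::time_point now,
        Clock::time_point next_work,
        Clock::duration max_sleep)
{
    assert(max_sleep >= Clock::duration::zero());

    if (next_work <= now)
    {
        return Clock::duration::zero();
    }

    return std::min(next_work - now, max_sleep);
}
```

Capping the sleep at max_sleep keeps the loop responsive to work that arrives unexpectedly, such as incoming messages.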

spin

This method performs the entity's internal tasks, including possible transport operations. It returns a safedds::ReturnCode indicating the result of the spin operation and takes one argument:

  • An execution::TimePoint that acts as the timeout for the spin operation.

Default Execution Model

Safe DDS provides a default executor in the BasicExecutor class.

API Reference

For more information about the execution::BasicExecutor class, see the API Reference for execution::BasicExecutor.

This executor can be instantiated using the create_default_executor member function of DomainParticipantFactory, and can be used as follows:

dds::DomainParticipantFactory factory{};

/****************************
* CREATE DDS ENTITIES HERE *
****************************/

// Retrieve the default executor from a DomainParticipantFactory.
execution::ISpinnable* executor = factory.create_default_executor();

if (nullptr == executor)
{
    std::cout << "Error creating default executor" << std::endl;
}

// 10 Hz loop; skipped if the executor could not be created
while (nullptr != executor)
{
    execution::TimePoint next_timepoint =
            get_platform().get_current_timepoint() + execution::TimePeriod::from_ms(100);

    // Spin the executor if there is pending work
    if (executor->has_pending_work())
    {
        executor->spin(next_timepoint);
    }

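    // Sleep until next timepoint, unless spin already used up the whole period
    execution::TimePoint now = get_platform().get_current_timepoint();

    if (next_timepoint > now)
    {
        execution::TimePeriod sleep_time = next_timepoint - now;
        usleep(sleep_time.to_us());
    }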
}

Executor example

A naive implementation of a custom executor could be as follows:

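// Standard headers used below (Safe DDS headers omitted)
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

// CustomExecutor that dispatches messages to the participants in order of priority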
class CustomExecutor : public execution::ISpinnable, private transport::IMessageObserver
{
public:

    CustomExecutor(
            std::map<dds::BaseDomainParticipant*, uint8_t>& participants)
        : transport_(get_transport())
    {
        // Fill participants_ vector
        for (auto it = participants.begin(); it != participants.end(); ++it)
        {
            participants_.push_back(it->first);
        }

        // Sort participants_ vector by priority (the mapped value of the argument map), lowest value first
        std::sort(participants_.begin(), participants_.end(),
                [&participants](
                    dds::BaseDomainParticipant* a,
                    dds::BaseDomainParticipant* b)
                {
                    return participants[a] < participants[b];
                });

        // Add elements to spinnables_ vector in priority order
        for (auto participant : participants_)
        {
            spinnables_.push_back(participant);

            for (size_t i = 0; i < participant->get_publishers().size(); i++)
            {
                dds::BasePublisher* publisher = participant->get_publishers().at(i);
                spinnables_.push_back(publisher);
            }

            for (size_t i = 0; i < participant->get_subscribers().size(); i++)
            {
                dds::BaseSubscriber* subscriber = participant->get_subscribers().at(i);
                spinnables_.push_back(subscriber);
            }

            for (size_t i = 0; i < participant->get_datareaders().size(); i++)
            {
                dds::BaseDataReader* reader = participant->get_datareaders().at(i);
                spinnables_.push_back(reader);
            }

            for (size_t i = 0; i < participant->get_datawriters().size(); i++)
            {
                dds::BaseDataWriter* writer = participant->get_datawriters().at(i);
                spinnables_.push_back(writer);
            }

            for (size_t i = 0; i < participant->get_topics().size(); i++)
            {
                dds::BaseTopic* topic = participant->get_topics().at(i);
                spinnables_.push_back(topic);
            }
        }

        // Set the executor as the message observer
        transport_.set_message_observer(*this);
    }

    // ISpinnable interface
    ReturnCode spin(
            const execution::TimePoint& tm) noexcept override
    {
        // Spin the transport
        transport_.spin(tm);

        // Spin the entities if there is time left and they have pending work
        for (auto spinnable : spinnables_)
        {
            if (get_platform().get_current_timepoint() > tm)
            {
                break;
            }

            // Spin the current entity
            if (spinnable->has_pending_work())
            {
                spinnable->spin(tm);
            }
        }

        return ReturnCode::OK;
    }

    bool has_pending_work() const noexcept override
    {
        bool has_pending_work = transport_.has_pending_work();

        for (auto spinnable : spinnables_)
        {
            has_pending_work |= spinnable->has_pending_work();
        }

        return has_pending_work;
    }

    execution::TimePoint get_next_work_timepoint() const noexcept override
    {
        execution::TimePoint next_work_timepoint = transport_.get_next_work_timepoint();

        for (auto spinnable : spinnables_)
        {
            next_work_timepoint = execution::TimePoint::min(
                next_work_timepoint, spinnable->get_next_work_timepoint());
        }

        return next_work_timepoint;
    }

    // IMessageObserver interface
    void on_message_received(
            const memory::IConstByteArrayView& msg,
            const transport::Locator& /* reception_locator */) noexcept override
    {
        for (auto participant : participants_)
        {
            size_t processed_bytes = 0;
            participant->process_message(msg, processed_bytes);
        }
    }

private:

    std::vector<dds::BaseDomainParticipant*> participants_;
    std::vector<execution::ISpinnable*> spinnables_;
    transport::ITransport& transport_;
};
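
The ordering step of the constructor above can be tried in isolation. The following sketch assumes nothing about Safe DDS: Participant is a hypothetical stand-in for dds::BaseDomainParticipant, and order_by_priority is an illustrative helper. As in the constructor, a lower numeric value sorts first, so it is spun first.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for dds::BaseDomainParticipant.
struct Participant
{
    std::string name;
};

// Returns the participants ordered by ascending priority value (lowest value first).
inline std::vector<Participant*> order_by_priority(
        const std::map<Participant*, std::uint8_t>& priorities)
{
    std::vector<Participant*> ordered;
    ordered.reserve(priorities.size());

    for (const auto& entry : priorities)
    {
        ordered.push_back(entry.first);
    }

    // at() is used instead of operator[] so the map can be taken by const reference.
    std::sort(ordered.begin(), ordered.end(),
            [&priorities](Participant* a, Participant* b)
            {
                return priorities.at(a) < priorities.at(b);
            });

    assert(ordered.size() == priorities.size());
    return ordered;
}
```

Note that std::sort does not preserve the relative order of participants that share the same priority value. If that order matters, std::stable_sort is a drop-in alternative.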