Transport module


The Transport module is responsible for interfacing with the underlying transport system. This module provides the following interfaces:

The transport module is related to the following layers:

Default transport

If Safe DDS has been built with built-in transport support through the SAFEDDS_TRANSPORT option (see CMake options), the default transport can be retrieved using the system-wide get_transport method. Similarly, a system-wide default transport can be set using the set_transport method. The DomainParticipantFactory uses this API internally to instantiate DomainParticipant entities.

// Retrieve the default transport (depends on the transport implementation built).
transport::ITransport& transport = get_transport();

// Setting a default transport at runtime.
CustomTransport custom_transport;
set_transport(custom_transport);
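The following is a minimal sketch of how a system-wide default could be kept, assuming it is stored as a non-owning pointer. It is not the Safe DDS implementation: SketchTransportBase, LoopbackSketch and the sketch namespace are hypothetical stand-ins, and here get_transport returns a pointer (rather than a reference) only so that the "nothing set yet" case can be represented. The point it illustrates is that, under this assumption, the object passed to set_transport must outlive every user of the default transport.

```cpp
// Stand-in for transport::ITransport (hypothetical, heavily simplified).
class SketchTransportBase
{
public:
    virtual ~SketchTransportBase() = default;
    virtual const char* name() const noexcept = 0;
};

namespace sketch {

// Single process-wide slot holding a non-owning pointer to the default transport.
inline SketchTransportBase*& default_transport_slot() noexcept
{
    static SketchTransportBase* slot = nullptr;
    return slot;
}

// Registers `transport` as the default. Only its address is stored,
// so the caller keeps ownership and must keep the object alive.
inline void set_transport(SketchTransportBase& transport) noexcept
{
    default_transport_slot() = &transport;
}

// Returns the current default transport, or nullptr if none has been set.
inline SketchTransportBase* get_transport() noexcept
{
    return default_transport_slot();
}

} // namespace sketch

// Example transport used only to exercise the registry above.
class LoopbackSketch final : public SketchTransportBase
{
public:
    const char* name() const noexcept override
    {
        return "loopback";
    }
};
```

With such a design, a transport declared with static storage duration (or otherwise living for the whole program) is the safest candidate for the default.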

Safe DDS Usage

The provided transport implementations use the transport::PreallocGUIDLocatorDatabase class to store and handle discovered locators.

ITransport interface

API Reference

For more information about transport::ITransport interface, check API Reference:

In general, transport::ITransport allows for sending messages to a certain destination and for listening to incoming messages:

  1. To send a message, the commit_message method is used.

  2. In order to listen to messages, the Safe DDS transport interface provides two different approaches:

    • listen_message method allows the transport to listen to incoming messages for a given period of time.

    • The transport::ITransport implements ISpinnable interface, allowing the incoming messages reception to be performed in a non-blocking way using the Safe DDS Execution module.

  3. Additionally, transport::ITransport provides methods for:

    • Notifying newly discovered locators in order to store the relationship between entity identifiers and transport locators via the on_locator_discovered method.

    • Retrieving the maximum size of a message that can be sent by the transport via the get_message_max_size method.

    • Listening on a certain locator via the listen_on_locator method.

    • Setting message observers for incoming messages to be called when a new message is received (and the transport implements ISpinnable interface) via the set_message_observer method.

get_message_max_size

This method is invoked by Safe DDS layers to retrieve the maximum size of a message that the transport can send. It has no parameters and returns the maximum size of a message as size_t.
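As an illustration of why a caller may need this value, the sketch below computes how many messages are required to carry a payload when each message has a size limit. The helper fragments_needed and its header_size parameter are hypothetical and are not part of Safe DDS; how the Safe DDS layers actually use the returned size is not described here.

```cpp
#include <cstddef>

// Hypothetical helper (not part of Safe DDS): number of messages needed to
// carry `payload_size` bytes when each message holds at most
// `max_message_size` bytes, of which `header_size` bytes are reserved.
// Returns 0 for an empty payload or when the header leaves no room for data.
inline std::size_t fragments_needed(
        std::size_t payload_size,
        std::size_t max_message_size,
        std::size_t header_size) noexcept
{
    if (payload_size == 0 || max_message_size <= header_size)
    {
        return 0;
    }

    const std::size_t room = max_message_size - header_size;

    // Ceiling division written so that it cannot overflow.
    return 1 + (payload_size - 1) / room;
}
```

For instance, with the 5000-byte limit used later in the example implementation and an assumed 100-byte header, a 12000-byte payload would need three messages.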

API Reference

For more information about transport::ITransport::get_message_max_size method, check API Reference:

commit_message

This method is invoked when an entity has a new message ready to be sent. The transport implementation must send the message (provided as memory::IInsertableList<MessageElement>) to the destination protocol::GUID.

This callback receives the following parameters:

  • Message buffer as memory::IInsertableList<MessageElement>

    This object contains one or more memory segments that should be sent to the destination as a single message.

  • Origin as protocol::GUID

    GUID of the entity that is sending the message.

  • Destination as protocol::GUID

    GUID of the entity that shall receive the message.

  • Timepoint as execution::TimePoint

    Time point (deadline) by which the message must be sent.
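Because the message buffer may consist of several memory segments, a transport that can only send one contiguous buffer has to gather them first. The sketch below shows one way to do that without dynamic memory, assuming a caller-provided output buffer. Segment and gather_segments are hypothetical stand-ins and do not reproduce the memory::IInsertableList API.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Stand-in for one memory segment of a message (hypothetical type).
struct Segment
{
    const std::uint8_t* data;
    std::size_t size;
};

// Copies all segments back to back into `out`, which holds `out_capacity` bytes.
// Returns the total number of bytes written, or 0 if the message does not fit
// (in that case `out` may have been partially written).
inline std::size_t gather_segments(
        const Segment* segments,
        std::size_t segment_count,
        std::uint8_t* out,
        std::size_t out_capacity) noexcept
{
    std::size_t offset = 0;

    for (std::size_t i = 0; i < segment_count; ++i)
    {
        // `offset` never exceeds `out_capacity`, so this subtraction is safe.
        if (segments[i].size > out_capacity - offset)
        {
            return 0;
        }

        if (segments[i].size > 0)
        {
            std::memcpy(out + offset, segments[i].data, segments[i].size);
            offset += segments[i].size;
        }
    }

    return offset;
}
```

Sizing the output buffer to the value returned by get_message_max_size would be a natural choice in this design, since no message larger than that can be sent anyway.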

API Reference

For more information about transport::ITransport::commit_message method, check API Reference:

set_message_observer

This method registers a transport::IMessageObserver with the transport. The observer is notified whenever a new message is received during the spin operation of the transport, that is, when reception is driven through the ISpinnable interface.
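The observer mechanism can be pictured with the simplified sketch below. All types are hypothetical stand-ins (MessageObserver, SketchTransport, CountingObserver), the observer signature is reduced to a pointer and a size, and the inject method only simulates the arrival of a message so that no real network is needed.

```cpp
#include <cstddef>
#include <cstdint>

// Stand-in observer interface (hypothetical, simplified).
class MessageObserver
{
public:
    virtual ~MessageObserver() = default;
    virtual void on_message_received(
            const std::uint8_t* data,
            std::size_t size) noexcept = 0;
};

// Stand-in transport that keeps one observer and notifies it from spin().
class SketchTransport
{
public:
    // Replaces the current observer; only its address is stored.
    void set_message_observer(MessageObserver& observer) noexcept
    {
        observer_ = &observer;
    }

    // Simulates the arrival of a message.
    void inject(const std::uint8_t* data, std::size_t size) noexcept
    {
        pending_data_ = data;
        pending_size_ = size;
    }

    // Delivers the pending message, if any, to the registered observer.
    // Returns true when a message was delivered.
    bool spin() noexcept
    {
        if (observer_ == nullptr || pending_data_ == nullptr)
        {
            return false;
        }
        observer_->on_message_received(pending_data_, pending_size_);
        pending_data_ = nullptr;
        pending_size_ = 0;
        return true;
    }

private:
    MessageObserver* observer_{nullptr};
    const std::uint8_t* pending_data_{nullptr};
    std::size_t pending_size_{0};
};

// Example observer that accumulates the number of received bytes.
class CountingObserver final : public MessageObserver
{
public:
    void on_message_received(const std::uint8_t*, std::size_t size) noexcept override
    {
        bytes += size;
    }

    std::size_t bytes{0};
};
```

In this sketch a message that arrives before any observer is registered simply stays pending until the first spin with an observer in place; a real transport may behave differently.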

API Reference

For more information about transport::ITransport::set_message_observer method, check API Reference:

listen_on_locator

This method is invoked by Safe DDS layers for the transport to start listening on a specific locator. This callback returns a safedds::ReturnCode and receives the following parameters:

  • Locator as transport::Locator

    Locator to listen on.

API Reference

For more information about transport::ITransport::listen_on_locator method, check API Reference:

listen_on_first_available_locator

This method is invoked by Safe DDS layers to get a listen locator when the DomainParticipantWireProtocolQosPolicy::announced_locator is not configured. This method shall return a valid transport::Locator which is already configured to listen to messages.
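One possible strategy for choosing such a locator is to probe a range of ports and keep the first one on which listening succeeds. The sketch below assumes a port-based transport and a caller-supplied probe function; TryListenFn and first_available_port are hypothetical names, not Safe DDS API.

```cpp
#include <cstdint>

// Hypothetical probe: returns true if the transport managed to start
// listening on `port`.
using TryListenFn = bool (*)(std::uint16_t port);

// Tries every port in [first, last] in ascending order.
// Returns the first port on which `try_listen` succeeded, or 0 if none did.
inline std::uint16_t first_available_port(
        std::uint16_t first,
        std::uint16_t last,
        TryListenFn try_listen) noexcept
{
    // A wider counter avoids wrapping around when `last` is 65535.
    for (std::uint32_t port = first; port <= last; ++port)
    {
        if (try_listen(static_cast<std::uint16_t>(port)))
        {
            return static_cast<std::uint16_t>(port);
        }
    }
    return 0;
}
```

A transport could then build the returned transport::Locator from its own address and the selected port.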

API Reference

For more information about transport::ITransport::listen_on_first_available_locator method, check API Reference:

listen_message

This method makes the transport listen, for a given period of time, for incoming messages on any of the locators previously registered with the listen_on_locator method. This callback returns a safedds::ReturnCode and receives the following parameters:

  • Incoming message buffer as memory::IMutableByteArrayView

    This object contains one memory segment that shall be filled with the received message.

  • Reception locator as transport::Locator

    Output parameter with the Locator where the message has been received.

  • Timepoint as execution::TimePoint

    Time point (deadline) until which the transport waits for a message.
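Since the last parameter is a point in time rather than a duration, an implementation usually has to convert it into a remaining wait time before calling a blocking receive function. The sketch below shows that conversion with a hypothetical SketchTimePoint type (seconds plus nanoseconds); it clamps the result to zero when the deadline has already passed, which avoids a negative duration turning into a very large unsigned wait.

```cpp
#include <cstdint>

// Stand-in for a time point split into seconds and nanoseconds (hypothetical).
struct SketchTimePoint
{
    std::int64_t seconds;
    std::int64_t nanoseconds; // expected in [0, 1000000000)
};

// Milliseconds left until `deadline`, measured from `now`.
// Returns 0 if the deadline has already passed.
// Assumes the two time points are close enough for the difference in
// nanoseconds to fit in 64 bits (roughly 292 years).
inline std::uint64_t remaining_milliseconds(
        const SketchTimePoint& deadline,
        const SketchTimePoint& now) noexcept
{
    const std::int64_t total_ns =
            (deadline.seconds - now.seconds) * 1000000000LL +
            (deadline.nanoseconds - now.nanoseconds);

    if (total_ns <= 0)
    {
        return 0;
    }
    return static_cast<std::uint64_t>(total_ns / 1000000);
}
```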

API Reference

For more information about transport::ITransport::listen_message method, check API Reference:

on_locator_discovered

This method is invoked when a new locator is identified, possibly by the Discovery module. The transport implementation may need to store the provided protocol::GUID and transport::Locator for future use.

The stored information can be useful when implementing the commit_message method, since that method only provides the origin and destination GUIDs of a message, not their locators.

This callback receives the following parameters:

  • Discovered entity GUID as protocol::GUID

    GUID of the entity that has been discovered.

  • Metatraffic flag as bool

    Indicates whether the locator belongs to metatraffic.

  • Discovered entity locator as transport::Locator

    Locator of the entity that has been discovered.

  • Discoverer entity GUIDPrefix as protocol::GUIDPrefix

    GUIDPrefix of the entity that discovered the locator.

API Reference

For more information about transport::ITransport::on_locator_discovered method, check API Reference:

on_locators_unregistered

This method is invoked when a previously discovered locator is removed, possibly by the Discovery module. The transport implementation may need to remove the stored protocol::GUID. This callback receives the following parameters:

  • Unregistered entity GUID as protocol::GUID

    GUID of the entity that has been unregistered.

  • Discoverer entity GUIDPrefix as protocol::GUIDPrefix

    GUIDPrefix of the entity that received the unregister event.
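The bookkeeping performed by on_locator_discovered and on_locators_unregistered can be sketched as a fixed-capacity table that never allocates memory at runtime. This is only an illustration under simplifying assumptions: SketchGuid, SketchLocator and LocatorTable are hypothetical stand-ins, and the sketch does not reproduce the class used by the provided transport implementations.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// Stand-ins for protocol::GUID and transport::Locator (hypothetical, simplified).
using SketchGuid = std::uint64_t;

struct SketchLocator
{
    std::uint32_t address;
    std::uint16_t port;
};

// Fixed-capacity GUID-to-locator table with linear search.
template <std::size_t Capacity>
class LocatorTable
{
    struct Entry
    {
        bool used;
        SketchGuid guid;
        SketchLocator locator;
    };

public:
    // Stores or updates the locator of `guid`. Returns false if the table is full.
    bool insert(SketchGuid guid, const SketchLocator& locator) noexcept
    {
        Entry* free_entry = nullptr;
        for (Entry& entry : entries_)
        {
            if (entry.used && entry.guid == guid)
            {
                entry.locator = locator;
                return true;
            }
            if (!entry.used && free_entry == nullptr)
            {
                free_entry = &entry;
            }
        }
        if (free_entry == nullptr)
        {
            return false;
        }
        *free_entry = Entry{true, guid, locator};
        return true;
    }

    // Returns the locator stored for `guid`, or nullptr if it is unknown.
    const SketchLocator* find(SketchGuid guid) const noexcept
    {
        for (const Entry& entry : entries_)
        {
            if (entry.used && entry.guid == guid)
            {
                return &entry.locator;
            }
        }
        return nullptr;
    }

    // Frees the slot of `guid`. Returns false if it was not stored.
    bool erase(SketchGuid guid) noexcept
    {
        for (Entry& entry : entries_)
        {
            if (entry.used && entry.guid == guid)
            {
                entry.used = false;
                return true;
            }
        }
        return false;
    }

private:
    // Value-initialized, so every slot starts with used == false.
    std::array<Entry, Capacity> entries_{};
};
```

A linear search keeps the sketch short and predictable; a table expected to hold many entries might prefer a sorted array or a hash with bounded probing.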

API Reference

For more information about transport::ITransport::on_locators_unregistered method, check API Reference:

Example implementation

Note

For a complete transport implementation, please refer to the POSIX UDPv4 transport implementation in the Safe DDS code base.

The following example shows a naive Safe DDS transport implementation that sends messages to a remote host using an abstract transport:

#include <cstring> // memcpy
#include <map>     // std::map

class CustomTransport : public transport::ITransport
{
public:

    void commit_message(
            memory::IInsertableList<protocol::MessageElement>& view_list,
            const protocol::GUID& /* origin_guid */,
            const protocol::GUID& receiver_guid,
            const execution::TimePoint& /* timepoint */) noexcept override
    {
        // Store the message in a continuous buffer.
        size_t buffer_size = 0;
        for (size_t i = 0; i < view_list.size(); i++)
        {
            buffer_size += view_list.at(i)->const_size();
        }

        uint8_t* buffer = new uint8_t[buffer_size];
        size_t buffer_offset = 0;

        for (size_t i = 0; i < view_list.size(); i++)
        {
            memory::IConstByteArrayView* view = view_list.at(i);
            memcpy(&buffer[buffer_offset], view->const_data(), view->const_size());
            buffer_offset += view->const_size();
        }

        memory::byte_array::ByteArrayView view = {buffer, buffer_offset};

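        // Retrieve the locator from the remote entities map.
        // find() is used instead of operator[] so that unknown GUIDs
        // are not silently inserted with an empty locator.
        auto it = remote_entities_.find(receiver_guid);
        if (it != remote_entities_.end())
        {
            // Transport specific function to send the message to the locator [NOT PROVIDED]
            custom_send_message(view, it->second);
        }

        delete [] buffer;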
    }

    size_t get_message_max_size() const noexcept override
    {
        // This transport allows sending messages of maximum 5000 bytes.
        return 5000;
    }

    ReturnCode set_message_observer(
            transport::IMessageObserver& observer) noexcept override
    {
        // Override the local message observer.
        observer_ = &observer;
        return ReturnCode::OK;
    }

    void on_locator_discovered(
            const protocol::GUID& guid,
            bool /* is_metatraffic */,
            const transport::Locator& locator,
            const protocol::GUIDPrefix& /* discoverer */) noexcept override
    {
        // Store the discovered locator.
        remote_entities_[guid] = locator;
    }

    void on_locators_unregistered(
            const protocol::GUID& guid,
            const protocol::GUIDPrefix& /* discoverer */) noexcept override
    {
        // Remove the locator.
        remote_entities_.erase(guid);
    }

    ReturnCode listen_on_locator(
            const transport::Locator& reception_locator) noexcept override
    {
        // Transport specific function to listen on a given locator [NOT PROVIDED]
        custom_start_listening(reception_locator);

        return ReturnCode::OK;
    }

    transport::Locator listen_on_first_available_locator() noexcept override
    {
        // Return hardcoded default locator
        transport::Locator default_listen_locator = transport::Locator::from_ipv4({127, 0, 0, 1}, 8000);
        listen_on_locator(default_listen_locator);
        return default_listen_locator;
    }

    ReturnCode spin(
            const execution::TimePoint& tm) noexcept override
    {
        // Listen message and notify the observer if exists.
        if (observer_ != nullptr)
        {
            transport::Locator locator = {};
            uint8_t* buffer = new uint8_t[10000];
            memory::byte_array::ByteArrayView view = {buffer, 10000};

            ReturnCode code = listen_message(view, locator, tm);

            if (code == ReturnCode::OK)
            {
                // Received message contained in view from locator.
                // Notify the observer.
                observer_->on_message_received(view, locator);
            }

            delete [] buffer;
        }

        return ReturnCode::OK;
    }

    bool has_pending_work() const noexcept override
    {
        // Transport specific function to check whether there is data in
        // the transport [NOT PROVIDED]
        return custom_transport_has_data();
    }

    execution::TimePoint get_next_work_timepoint() const noexcept override
    {
        return has_pending_work() ? execution::TIME_ZERO : execution::TIME_INFINITE;
    }

    ReturnCode listen_message(
            memory::IMutableByteArrayView& msg,
            transport::Locator& reception_locator,
            const execution::TimePoint& timepoint) noexcept override
    {
        execution::TimePeriod wait_time =
                timepoint - get_platform().get_current_timepoint();
        size_t milliseconds_to_wait =
                wait_time.seconds * 1000 + wait_time.nanoseconds / 1000000;

        // Transport specific function to listen for a message for a given
        // period of time [NOT PROVIDED]
        reception_locator = custom_listen_message(msg, milliseconds_to_wait);

        return msg.size() > 0 ? ReturnCode::OK : ReturnCode::TRANSPORT_TIMEOUT;
    }

private:

    // Local message observer
    transport::IMessageObserver* observer_{nullptr};

    // Locator storage
    std::map<protocol::GUID, transport::Locator> remote_entities_;
};