RFC: Zero-configuration Cyphal with named topics

I designed and implemented a simple transport-agnostic extension that works on top of the existing Cyphal transports, adding support for minimalist named topics. I tried not to compromise Cyphal's general simplicity and robustness while also avoiding the need for manual configuration. The system is entirely plug-and-play, and it also supports instant power-up-and-go, provided that the automatically established configuration is saved in and later recovered from non-volatile memory.

Requirements I defined for this work:

  • Discriminate data flows using descriptive string names instead of integer port-IDs.

  • Allow nodes to join the network with zero prior configuration of the protocol (at least above the physical layer).

  • Introduction of new topics and/or nodes must not disturb operation of the existing participants.

  • A fully converged network must offer a service quality at least as good as a statically configured network.

  • The autoconfiguration protocol must be stateless and must not require central coordinators or special nodes.

  • Retain backward compatibility with old Cyphal nodes that do not support named topics.

  • Preferably, the background service traffic should be exchanged at a constant rate to simplify latency and throughput analysis.

  • Once a stable configuration is found, it should be possible to store it in the non-volatile memory per node for instant recovery after power cycling, bypassing the autoconfiguration stage. Obsolete or incorrect per-node configuration should not affect the rest of the network. This ensures that a vehicular network with named topics will perform identically to a fully statically configured one until its configuration is changed.

  • Scalability beyond a thousand nodes and topics per network.

  • The solution should be implementable in under 1k lines of C without dynamic memory or undue computing costs for small nodes.

Stretch goals:

The PoC is implemented as a very compact C library of less than 1k SLoC, which I call cy. The library is transport-agnostic and thus requires glue logic to bind it to a specific transport library and the platform-specific code underneath in a user-friendly way, so I made another 500-SLoC library specifically for Cyphal/UDP on POSIX, which I call cy_udp (it should really be renamed to cy_udp_posix). The actual user-facing product is therefore cy_udp, while cy is its core. There should be a cy_can as well.

Please look at the PoC, read how it works, and perhaps test it locally here:

Distributed consensus protocols are very hard to get right. It is not impossible that there are major design flaws in my solution, so I would very much welcome an in-depth review and criticism.

Impact on the project

Compatibility with existing nodes is generally not affected. A named-topic-capable node can interact with an old node provided that the topic is pinned, meaning that its name is the subject-ID; e.g., /1234. The range of subject-IDs in [0, 6144) is now dedicated to named topic allocation, while the remaining range in [6144, 8192) is reserved for fixed subject-IDs and pinned topics. It is possible to pin topics anywhere, but it is not recommended, as there exist edge cases where this may cause allocation collisions; more on this in the README.

Libudpard/libcanard/libserard will require a slight extension of their APIs to add optional support for node-ID autoconfiguration and topic allocation collision detection. The overall impact is about 100 SLoC per library. The current PoC works with vanilla libudpard but there is a risk of data misinterpretation; more on this in the README.

The heartbeat message is replaced with a new one, tentatively called cyphal.node.Heartbeat. It is wire-compatible with the old heartbeat but adds more data for the distributed consensus algorithm, which piggybacks on the heartbeat message for simplicity. The heartbeat rate should also be increased to about 10 Hz; while not strictly required for the protocol to function, this speeds up the initial configuration stage.

One undesirable but hard-to-avoid side effect is that every node that wishes to participate in the named topic protocol must process heartbeat messages from all other nodes. In a network with many participants this may be burdensome for small MCUs. I attempted to simplify the heartbeat processing pipeline as much as possible: right now it amounts to just deserializing the message and searching two binary trees, each with as many elements as there are topics. If a reallocation is needed, a few more tree traversals are added. I am not yet certain how this will scale, but I am cautiously optimistic.

Next steps

If this first PoC does not uncover any major design flaws, the next step would be to add support for RPC endpoints, which is much easier because it does not require consensus.

Then we could focus on building compact wrapper libraries with a very simple API that combine Cy and one of the libxxxards into atomic packages for various systems and protocols: Cyphal/CAN for SocketCAN, Cyphal/CAN for baremetal environments, Cyphal/UDP for POSIX (which is my cy_udp), Cyphal/UDP for baremetal, etc. The goal here is to achieve a significant simplification of the API and a reduction of the entry barrier.

TL;DR

// SET UP LOCAL NODE:
struct cy_udp_t cy_udp;
cy_err_t res = cy_udp_new(&cy_udp,
                          local_unique_id,  // 64-bit composed of VID+PID+IID
                          "/my_namespace",  // topic name prefix (defaults to "/")
                          (uint32_t[3]){ udp_parse_iface_address("127.0.0.1") },
                          CY_NODE_ID_INVALID, // will self-allocate
                          1000);            // tx queue capacity per interface
if (res < 0) { ... }

// JOIN A TOPIC (to publish and/or subscribe).
// To interface with an old node that does not support named topics, put the subject-ID into the topic name;
// e.g., `/1234`. This will bypass the automatic subject-ID allocation and pin the topic as specified.
struct cy_udp_topic_t my_topic;
res = cy_udp_topic_new(&cy_udp,
                       &my_topic,
                       "my_topic",  // expands into "/my_namespace/my_topic"
                       NULL);
if (res < 0) { ... }

// SUBSCRIBE TO TOPIC (nothing needs to be done if we want to publish):
struct cy_subscription_t my_subscription;
res = cy_udp_subscribe(&my_topic,
                       &my_subscription,
                       1024 * 1024,                       // extent (max message size)
                       CY_TRANSFER_ID_TIMEOUT_DEFAULT_us, // going to remove this
                       on_message_received_callback);
if (res < 0) { ... }

// SPIN THE EVENT LOOP
while (true) {
    const cy_err_t err_spin = cy_udp_spin_once(&cy_udp);
    if (err_spin < 0) { ... }

    // PUBLISH MESSAGES (unlike subscribing, publishing requires no extra setup).
    // Optionally we can check if the local node has a node-ID. If not given explicitly
    // at startup, it will appear automatically within a few seconds. If a collision is
    // discovered, it will briefly disappear and then re-appear a few seconds later.
    if (cy_has_node_id(&cy_udp.base)) {
        const cy_us_t now = cy_udp_now();
        char          msg[256];
        snprintf(msg, sizeof(msg), "I am %016llx. time=%lld us", (unsigned long long)cy_udp.base.uid, (long long)now);
        const struct cy_payload_t payload = { .data = msg, .size = strlen(msg) };
        const cy_err_t            pub_res = cy_udp_publish(&my_topic, now + 100000, payload);
        if (pub_res < 0) { ... }
    }
}

Related posts

Now, building a ROS 2 middleware based on Cyphal is entirely within reach. Here is an interesting article on the subject: ROS 2 Over Email: rmw_email, an Actual Working RMW Implementation – Christophe Bédard. Do we have any volunteers?


UTF-8?

Yes, but this proposal seems worthy of discussing as part of a minor version bump of v1 (v1.1).

Why not separate this into a new message to allow 1 Hz heartbeats where dynamic configuration is not used? Furthermore, we'd want to stop sending these messages after an initial configuration phase, right?

The acceptable character set is currently very conservative and follows DDS: [0-9a-zA-Z_], plus the separator /, plus the local node replacement ~. We might extend it in the future. The encoding is UTF-8 for future compatibility even though we're only using ASCII for now.
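A validator for this character set is a few lines of C. This is only an illustrative sketch with hypothetical function names, not part of the cy API:

```c
#include <stdbool.h>
#include <stddef.h>

// Hypothetical name validator for the conservative DDS-like charset described above:
// [0-9a-zA-Z_], the separator '/', and the local-node replacement '~'.
static bool cy_name_char_valid(const char c)
{
    return ((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'z')) ||
           ((c >= 'A') && (c <= 'Z')) || (c == '_') || (c == '/') || (c == '~');
}

static bool cy_name_valid(const char* const name)
{
    if ((name == NULL) || (*name == '\0')) {
        return false;  // empty names rejected (an assumption made here)
    }
    for (const char* p = name; *p != '\0'; ++p) {
        if (!cy_name_char_valid(*p)) {
            return false;
        }
    }
    return true;
}
```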

Okay, but then we need to release v1.0-beta first?

  1. Every subscription requires memory, so from the standpoint of smaller nodes it is desirable to have one subscription rather than two.
  2. There is some intersection between the fields carried by the heartbeat and the CRDT gossip message; specifically, they both need the 64-bit UID of the sender.
  3. It is computationally cheaper to handle one large message than two smaller messages.

It’s possible, but I don’t want it to be the default behavior. We should be able to repair the network should it become partitioned and then de-partitioned (departitioning may cause conflicts that require CRDT traffic to resolve). Plus I want the network to be continuously introspectable. The gossip messages allow one to observe the topics and how they are used by each participant.

As a follow-up to the call, I want to emphasize that switching to named topics does not increase the network utilization beyond the fixed-rate background CRDT gossip traffic (that sits on top of heartbeats). We certainly do not transmit the topic name with each transfer. The CRDT finds a unique subject-ID for every topic, and each node that publishes or subscribes to a given topic will have a replica of the relevant part of the global mapping table (topic name → subject-ID) stored locally.

Instead of recommending 10 Hz heartbeats, we could keep them at 1 Hz but instead allow unscheduled heartbeats if the CRDT detects a collision (multiple topics on the same subject-ID) or a divergence (multiple subject-IDs on the same topic). This will increase the gossip traffic during the autoconfiguration stage, but then it will drop back down to 1 Hz/node until someone wants to join the network.

To run the example app locally, build it (should work on any POSIX):

git clone https://github.com/pavel-kirienko/cy --recursive
cd cy
mkdir build && cd build && cmake ..
make -j10

And run a few nodes like this, from the build/examples directory:

# Publish on /my_namespace/foo and subscribe to /my_namespace/bar
./udp_node iface=127.0.0.1 ns=/my_namespace pub=foo sub=bar

# Subscribe to /my_namespace/foo and publish to /my_namespace/bar and also /baz
./udp_node iface=127.0.0.1 ns=/my_namespace sub=foo pub=bar pub=/baz

The examples are built with debugging options by default that 1. maximize collisions by forcing each topic to prefer the same subject-ID of zero (which creates 6144 times more collisions than an ordinary network would see, so you can observe the scaling effects), and 2. move the heartbeat traffic to /8191 instead of the default /7509 so that you can subscribe to it using Yakut (PyCyphal does not really allow overriding the heartbeat type, so the subject-ID needs to be different for better observability). These options can be disabled here:

You can look at what’s happening in the network using Wireshark with @erik.rainey’s plugins.

I would like to also talk about named RPC endpoints. RPC might look similar to subjects in Cyphal v1.0, but in v1.1, once we add named resources, significant differences will emerge. I would like to share some thoughts on this, but before we do that, I want to ensure that we are all roughly on the same page about the current approach to implementing named topics. The key features of my proposed named topics architecture are that the solution is decentralized, eventually convergent, and under the CAP model it sacrifices consistency for availability and partition tolerance. And, well, per my design intention it has to be under 1k SLoC in C.

The latest commit adds RPC support to my PoC: https://github.com/pavel-kirienko/cy#rpc

Upon some contemplation, I realized that RPCs do not have to be a separate feature. Their use case can be addressed by enabling peer-to-peer responses to messages published on ordinary topics. Thus, an RPC endpoint is just an ordinary topic, where by convention the application will send a response to each message published on that topic. This can also be exploited to build anycast high-availability RPC endpoints. The details can be seen in the linked section of the README.

This design goes well with the overall theme of removing everything unnecessary from the design, making it as lean as possible. It would appear that at this point, there is nothing left to remove.

(side note: a very similar logic can be used to implement reliable transfers, although it’s not something I’ve given much thought to yet; right now I am focusing on named resources only).

I also made two demo apps: a file server and a file client; you can find them in the repo and run locally. The file server subscribes to file/read (where the namespace is given via command-line arguments), and the client publishes requests to that topic.

File server

#include "cy_udp.h"
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <err.h>

static uint64_t random_uid(void)
{
    const uint16_t vid = UINT16_MAX; // This is the reserved public VID.
    const uint16_t pid = (uint16_t)rand();
    const uint32_t iid = (uint32_t)rand();
    return (((uint64_t)vid) << 48U) | (((uint64_t)pid) << 32U) | iid;
}

/// Request schema:
///     uint64      read_offset
///     utf8[<=256] file_path
/// Response schema:
///     uint32      errno
///     byte[<=256] data
void on_file_read_msg(struct cy_subscription_t* const sub,
                      const cy_us_t                   ts,
                      const struct cy_transfer_meta_t transfer,
                      const struct cy_payload_t       payload)
{
    assert(sub != NULL);
    if ((payload.size < 10) || (payload.size > (256 + 2 + 8))) {
        CY_TRACE(sub->topic->cy, "Malformed request: Payload size %zu is invalid", payload.size);
        return;
    }
    assert(payload.data != NULL);

    // Deserialize the payload.
    uint64_t read_offset = 0;
    memcpy(&read_offset, payload.data, 8);
    uint16_t path_len = 0;
    memcpy(&path_len, ((const char*)payload.data) + 8, 2);
    char file_name[257];
    if (path_len > 256) {
        CY_TRACE(sub->topic->cy, "Malformed request: File path length %u is too long", path_len);
        return;
    }
    memcpy(file_name, ((const char*)payload.data) + 10, path_len);
    file_name[path_len] = '\0';

    // Prepare response buffer.
    struct response_t
    {
        uint32_t error;
        uint16_t data_len;
        uint8_t  data[256];
    } response;
    response.data_len = 0;

    // Read the file, 256 bytes max, at the specified offset.
    errno            = 0;
    FILE* const file = fopen(file_name, "rb");
    if ((file != NULL) && (fseek(file, (long)read_offset, SEEK_SET) == 0)) {
        response.data_len = (uint16_t)fread(response.data, 1, 256, file);
    }
    response.error = (uint32_t)errno;
    if (file != NULL) { // fclose(NULL) is undefined behavior, so guard it.
        (void)fclose(file);
    }

    // Send the response.
    CY_TRACE(sub->topic->cy,
             "Responding to file read request: %s, offset %llu, size %u, error %u",
             file_name,
             (unsigned long long)read_offset,
             response.data_len,
             response.error);
    (void)cy_respond(sub->topic, //
                     ts + 1000000,
                     transfer,
                     (struct cy_payload_t){ .data = &response, .size = response.data_len + 6 });
}

/// The only command line argument is the node namespace.
int main(const int argc, char* argv[])
{
    srand((unsigned)time(NULL));

    // SET UP THE NODE.
    struct cy_udp_t cy_udp;
    cy_err_t        res = cy_udp_new(&cy_udp,
                              random_uid(),
                              (argc > 1) ? argv[1] : "~",
                              (uint32_t[3]){ udp_parse_iface_address("127.0.0.1") },
                              CY_NODE_ID_INVALID,
                              1000);
    if (res < 0) {
        errx(res, "cy_udp_new");
    }

    // SET UP THE FILE READ TOPIC.
    struct cy_udp_topic_t topic_file_read;
    res = cy_udp_topic_new(&cy_udp, &topic_file_read, "file/read", NULL);
    if (res < 0) {
        errx(res, "cy_udp_topic_new");
    }
    struct cy_subscription_t sub_file_read;
    res = cy_udp_subscribe(&topic_file_read, &sub_file_read, 1024, CY_TRANSFER_ID_TIMEOUT_DEFAULT_us, on_file_read_msg);
    if (res < 0) {
        errx(res, "cy_udp_subscribe");
    }

    // SPIN THE EVENT LOOP.
    while (1) {
        res = cy_udp_spin_once(&cy_udp);
        if (res < 0) {
            errx(res, "cy_udp_spin_once");
        }
    }

    return 0;
}

File client

#include "cy_udp.h"
#include <time.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <err.h>

#define MEGA 1000000LL

#define RESPONSE_TIMEOUT (3 * MEGA)

static size_t smaller(const size_t a, const size_t b)
{
    return (a < b) ? a : b;
}

struct file_read_request_t
{
    uint64_t read_offset;
    uint16_t path_len;
    char     path[256];
};
struct file_read_response_t
{
    uint32_t error;
    uint16_t data_len;
    uint8_t  data[256];
};

static uint64_t random_uid(void)
{
    const uint16_t vid = UINT16_MAX; // This is the reserved public VID.
    const uint16_t pid = (uint16_t)rand();
    const uint32_t iid = (uint32_t)rand();
    return (((uint64_t)vid) << 48U) | (((uint64_t)pid) << 32U) | iid;
}

/// Command line arguments: namespace, file name.
/// The read file will be written into stdout as-is.
int main(const int argc, char* argv[])
{
    if (argc < 3) {
        fprintf(stderr, "Usage: %s <namespace> <file>\n", argv[0]);
        return 1;
    }
    srand((unsigned)time(NULL));

    // PREPARE THE FILE REQUEST OBJECT.
    struct file_read_request_t req;
    req.read_offset = 0;
    req.path_len    = (uint16_t)strlen(argv[2]);
    if (req.path_len > 256) {
        fprintf(stderr, "File path length %u is too long\n", req.path_len);
        return 1;
    }
    memcpy(req.path, argv[2], req.path_len);

    // SET UP THE NODE.
    struct cy_udp_t cy_udp;
    cy_err_t        res = cy_udp_new(&cy_udp, //
                              random_uid(),
                              argv[1],
                              (uint32_t[3]){ udp_parse_iface_address("127.0.0.1") },
                              CY_NODE_ID_INVALID,
                              1000);
    if (res < 0) {
        errx(res, "cy_udp_new");
    }

    // SET UP THE FILE READ TOPIC.
    struct cy_udp_topic_t topic_file_read;
    res = cy_udp_topic_new(&cy_udp, &topic_file_read, "file/read", NULL);
    if (res < 0) {
        errx(res, "cy_udp_topic_new");
    }

    // WAIT FOR THE NODE TO JOIN THE NETWORK.
    // We consider the node joined when it has a node-ID and there have been no topic conflicts/divergences
    // for some time. This stage can be skipped if we have a configuration hint recovered from nonvolatile storage.
    fprintf(stderr, "Waiting for the node to join the network...\n");
    while (!cy_ready(&cy_udp.base)) {
        res = cy_udp_spin_once(&cy_udp);
        if (res < 0) {
            errx(res, "cy_udp_spin_once");
        }
    }

    // READ THE FILE SEQUENTIALLY.
    while (true) {
        const cy_us_t now = cy_udp_now();

        // Send the request.
        fprintf(stderr, "\nRequesting offset %llu...\n", (unsigned long long)req.read_offset);
        struct cy_response_future_t future;
        res = cy_udp_publish(&topic_file_read,
                             now + MEGA,
                             (struct cy_payload_t){ .size = req.path_len + 10, .data = &req },
                             now + RESPONSE_TIMEOUT,
                             &future);
        if (res < 0) {
            errx(res, "cy_udp_publish");
        }

        // Wait for the response while spinning the event loop.
        // We could do it asynchronously as well, but in this simple application it is easier to do it synchronously.
        // We could also spin the loop in a background thread and use a condition variable to wake up the main thread.
        assert(future.state == cy_future_pending);
        while (future.state == cy_future_pending) {
            res = cy_udp_spin_once(&cy_udp);
            if (res < 0) {
                errx(res, "cy_udp_spin_once");
            }
        }
        if (future.state == cy_future_failure) {
            errx(0, "Request timed out");
        }
        assert(future.state == cy_future_success);

        // Process the next chunk.
        struct file_read_response_t resp = { 0 }; // zero-init in case the payload is shorter than the struct
        memcpy(&resp, future.response_payload.data, smaller(sizeof(resp), future.response_payload.size));
        if (resp.error != 0) {
            errx((int)resp.error, "Remote error");
        }
        if (resp.data_len > 0) {
            fwrite(resp.data, 1, resp.data_len, stdout);
            fflush(stdout);
            req.read_offset += resp.data_len;
        } else {
            fprintf(stderr, "\nFinished transferring %llu bytes\n", (unsigned long long)req.read_offset);
            break;
        }
    }

    return 0;
}

Notes about retained messages

Suppose there is a topic foo that is updated intermittently. We have a low-power node that wakes up briefly and infrequently, and when it's up, it needs to quickly check the latest message published on foo.

Any number of permanently-up nodes that subscribe to foo, or those that publish on it, can store the last foo message locally, and at the same time subscribe to a new topic, let’s call it foo?. The low-power node would wake up and publish an empty request message to foo?; all nodes that subscribe to it will respond with the latest message that they recorded from foo. The solution is decentralized (no single router) and fault-tolerant (as long as at least one node subscribing to foo? is up, the retained messages remain available).

The solution takes a dozen lines of code to implement.
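The core of such a responder can be sketched in plain C. The names here are illustrative, not part of the cy API; in a real node, `retain` would be driven by the subscription callback on foo, and `serve` would feed cy_respond from the subscription callback on foo?:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

// Hypothetical retained-message store held by any always-on participant of foo.
struct retained_t
{
    bool    valid;
    size_t  size;
    uint8_t data[256];
};

// Called from the subscription callback on `foo`: remember the latest message.
static void retain(struct retained_t* const r, const void* const data, const size_t size)
{
    r->valid = true;
    r->size  = (size < sizeof(r->data)) ? size : sizeof(r->data);
    memcpy(r->data, data, r->size);
}

// Called from the subscription callback on `foo?`: copy the retained message into the
// response buffer (the real code would hand it to cy_respond). Returns 0 if nothing retained.
static size_t serve(const struct retained_t* const r, uint8_t* const out, const size_t cap)
{
    if (!r->valid) {
        return 0;
    }
    const size_t n = (r->size < cap) ? r->size : cap;
    memcpy(out, r->data, n);
    return n;
}
```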

If anyone here remembers this old discussion:

Named topics actually resolve ~all dynamic routing issues we've been discussing there. I think it should be a no-brainer to build a very simple router node (not a bridge) that plugs into a Cyphal/anything network on one side and some other Cyphal network on the other side. One particular example is linking a local network (in-vehicle or whatever) with the cloud (via Cyphal/serial over TCP as an option). I'm not planning to explore that immediately since I don't have a use case at hand, but if anyone finds it immediately relevant, we could talk about it.

You talk about arbitration in your README. Can you detail how arbitration is performed, specifically?

Why are you using eviction count? This is normally done using vector clocks. Does that concept not work for this for some reason?

Even more, found this on the interwebs: https://arxiv.org/pdf/1905.13064


The eviction count is a Lamport clock; I mentioned this in the code. Vector clocks scale poorly with network size, which matters because we target systems where dynamic/large memory is undesirable. A Lamport clock requires constant memory.

See below

The consensus algorithm relies on two CRDTs.

Topic age counter

This is needed to support an essential condition that newcomers do not disturb an existing network configuration. Without this provision, the network would likely be useless as any new topic or node could introduce a disturbance, potentially causing data loss. The topic age counter is incremented at some rate by all nodes that are members of that topic (meaning that they subscribe and/or publish on it, or at least are interested in potentially doing so at some point in the future). Every time a node broadcasts the metadata of that topic, the local age counter value is included, which allows all other nodes that are interested in this topic to synchronize their counters by simply choosing max(local_counter, remote_counter). The max function trivially satisfies the CRDT requirements as it is commutative, associative, and idempotent.

The age counter may differ by a few counts between participants because the counting never stops, while CRDTs only guarantee eventual consistency. As elaborated in the README, nodes use \lfloor \log_2{\text{counter}} \rfloor for ranking topics to eliminate jitter.

Eviction counter (aka local collision Lamport clock)

When a node registers a new topic locally upon request from the application, it doesn’t yet know if this topic is already known to the network, and how it is allocated, so it starts from the default topic configuration: age zero, eviction count zero. The subject-ID of a topic is computed simply as (rapidhash(topic_name) + eviction_counter) % 6144, so the initial subject-ID is just the hash modulo subject-ID space. Subsequently, the topic metadata is gossiped to the network.
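The allocation rule can be sketched as follows. rapidhash is the hash named above; a generic 64-bit string mixer stands in here purely for illustration:

```c
#include <stdint.h>

// Stand-in 64-bit string hash (splitmix64-style mixing); the real code uses rapidhash.
static uint64_t hash64(const char* s)
{
    uint64_t h = 0x9E3779B97F4A7C15ULL;
    while (*s != '\0') {
        h ^= (uint64_t)(unsigned char)*s++;
        h *= 0xBF58476D1CE4E5B9ULL;
        h ^= h >> 31U;
    }
    return h;
}

// subject-ID = (hash(topic_name) + eviction_counter) % 6144
static uint16_t topic_subject_id(const char* const name, const uint32_t eviction_counter)
{
    return (uint16_t)((hash64(name) + eviction_counter) % 6144U);
}
```

Note that every node computes the same initial subject-ID for a given topic name deterministically, which is what lets simultaneous newcomers agree before they can communicate; each eviction simply shifts the allocation by one slot.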

In a sparsely populated network, it will likely cause no conflict, allowing the initial configuration to survive. If there were other nodes that attempted to pub/sub on this topic simultaneously with the local node, they will use the same initial subject-ID configuration since they use the same hash and topic name, ensuring that every participant starts out with identical subject-ID mapping even before they are able to communicate. With every passing second and with every received transfer, the topic will age, and with age, it becomes more likely to win arbitration against possible contenders for the same subject-ID.

If the initially proposed subject-ID happened to collide with another topic, the first published gossip message will allow the nodes that know the conflicting topic to notice the collision. When that happens, the collision arbitration will be performed to decide who can keep using this subject-ID and who has to move; naturally, the topic with the smaller age will need to move. This is decided locally by each node that observes the conflict, and since they all share the same metadata for all topics they are interested in, they all will come to the same conclusion independently. When a topic is very young, however, a brief divergence is possible, causing a few ping-pong migrations until the age is settled. Regardless of the outcome, each node observing the conflict will immediately publish the new state, allowing the network to cross-check and synchronize the new state.

If the initially proposed subject-ID happened to differ from the values used by other nodes on the same topic (which would happen if there was a collision with another topic), then each node that is also on this topic will notice the divergence upon receiving this gossip message. In this case, a divergence arbitration is performed to decide who is right and who has to update their records, ensuring that all participants share the same subject-ID. As before, the older topic wins. Given the same age (actually log-age, since we compare logarithms instead of raw ages), the topic with the greater eviction count wins, because a greater eviction count implies that the topic saw more collisions, so configurations with any lesser eviction count are non-viable. (Note that this only holds if the age is equal; otherwise, a configuration with a greater age but smaller eviction count could still be viable, because the greater age could allow the topic to evict the contender from the same subject-ID.) The local configuration is NOT altered if (log(local_age) > log(remote_age)) || ((log(local_age) == log(remote_age)) && (local_evictions > remote_evictions)); in this case the local metadata is gossiped ASAP to synchronize the age and thus help other nodes on this topic win arbitration against contenders. This is the one case where the eviction count may go backward, unlike the age counter, which only increments.
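The arbitration predicate described above can be sketched directly in C (with a local floor-log2 helper standing in for the log comparison; names are illustrative):

```c
#include <stdbool.h>
#include <stdint.h>

// floor(log2(x)), with the rank of 0 taken as 0.
static uint_fast8_t flog2(uint64_t x)
{
    uint_fast8_t r = 0;
    while (x > 1U) {
        x >>= 1U;
        r++;
    }
    return r;
}

// True iff the local topic configuration wins divergence arbitration and is kept:
// greater log-age wins; on a log-age tie, the greater eviction count wins.
static bool local_config_wins(const uint64_t local_age, const uint32_t local_evictions,
                              const uint64_t remote_age, const uint32_t remote_evictions)
{
    const uint_fast8_t la = flog2(local_age);
    const uint_fast8_t ra = flog2(remote_age);
    return (la > ra) || ((la == ra) && (local_evictions > remote_evictions));
}
```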