RFC: Zero-configuration Cyphal with named topics

The latest commit adds RPC support to my PoC: https://github.com/pavel-kirienko/cy#rpc

Upon some contemplation, I realized that RPCs do not have to be a separate feature. Their use case can be addressed by enabling peer-to-peer responses to messages published on ordinary topics. Thus, an RPC endpoint is just an ordinary topic, where by convention the application will send a response to each message published on that topic. This can also be exploited to build anycast high-availability RPC endpoints. The details can be seen in the linked section of the README.

This design goes well with the overall theme of removing everything unnecessary from the design, making it as lean as possible. It would appear that at this point, there is nothing left to remove.

(Side note: very similar logic can be used to implement reliable transfers, although it’s not something I’ve given much thought to yet; right now I am focusing on named resources only.)

I also made two demo apps: a file server and a file client; you can find them in the repo and run them locally. The file server subscribes to the topic file/read (where the namespace is given via command-line arguments), and the client publishes requests to that topic.

File server

#include "cy_udp.h"
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>
#include <err.h>

/// Builds a pseudo-random 64-bit UID from rand(): the reserved public vendor-ID occupies the top 16 bits,
/// followed by a 16-bit product-ID and a 32-bit instance-ID. The caller is expected to seed rand() beforehand.
static uint64_t random_uid(void)
{
    // The product-ID is drawn before the instance-ID; the draw order affects the result for a given seed.
    const uint64_t product_id  = (uint16_t)rand();
    const uint64_t instance_id = (uint32_t)rand();

    uint64_t uid = UINT16_MAX; // This is the reserved public VID.
    uid          = (uid << 16U) | product_id;
    uid          = (uid << 32U) | instance_id;
    return uid;
}

/// Subscription callback: serves one file read request and sends the result back to the requester.
///
/// Request schema:
///     uint64      read_offset
///     utf8[<=256] file_path       (uint16 length prefix followed by the path bytes)
/// Response schema:
///     uint32      errno
///     byte[<=256] data            (uint16 length prefix followed by the data bytes)
///
/// All integer fields are copied to/from the buffers with memcpy, i.e., in the host byte order.
/// Malformed requests are traced and dropped without a response.
/// An empty data field with a zero error code indicates that the offset is at or past the end of the file.
///
/// NOTE(review): the requested path is passed to fopen() as-is, so any file readable by this process can be
/// fetched by any peer. This is acceptable for a demo only; restrict the path before using it elsewhere.
void on_file_read_msg(struct cy_subscription_t* const sub,
                      const cy_us_t                   ts,
                      const struct cy_transfer_meta_t transfer,
                      const struct cy_payload_t       payload)
{
    enum
    {
        offset_size          = 8, // uint64 read_offset
        length_size          = 2, // uint16 length prefix
        request_header_size  = offset_size + length_size,
        path_capacity        = 256,
        response_header_size = 6, // uint32 error + uint16 data_len; data[] starts right after them
    };

    assert(sub != NULL);
    if ((payload.size < request_header_size) || (payload.size > (path_capacity + request_header_size))) {
        CY_TRACE(sub->topic->cy, "Malformed request: Payload size %zu is invalid", payload.size);
        return;
    }
    assert(payload.data != NULL);
    const char* const raw = (const char*)payload.data;

    // Deserialize the payload.
    uint64_t read_offset = 0;
    memcpy(&read_offset, raw, offset_size);
    uint16_t path_len = 0;
    memcpy(&path_len, raw + offset_size, length_size);
    if (path_len > path_capacity) {
        CY_TRACE(sub->topic->cy, "Malformed request: File path length %u is too long", (unsigned)path_len);
        return;
    }
    // The declared path length must not exceed what was actually received, otherwise we would read
    // past the end of the payload buffer.
    if (path_len > (payload.size - request_header_size)) {
        CY_TRACE(sub->topic->cy,
                 "Malformed request: File path length %u exceeds the payload size %zu",
                 (unsigned)path_len,
                 payload.size);
        return;
    }
    char file_name[path_capacity + 1];
    memcpy(file_name, raw + request_header_size, path_len);
    file_name[path_len] = '\0';

    // Prepare response buffer.
    struct response_t
    {
        uint32_t error;
        uint16_t data_len;
        uint8_t  data[256];
    } response;
    response.error    = 0;
    response.data_len = 0;

    // Read the file, 256 bytes max, at the specified offset.
    // The error code is taken from errno only when a call has actually failed, and immediately after that call.
    const long seek_offset = (long)read_offset;
    if ((seek_offset < 0) || (((uint64_t)seek_offset) != read_offset)) {
        response.error = (uint32_t)EINVAL; // The offset is not representable as a long; fseek() cannot reach it.
    } else {
        errno            = 0;
        FILE* const file = fopen(file_name, "rb");
        if (file == NULL) {
            response.error = (uint32_t)errno;
        } else {
            if (fseek(file, seek_offset, SEEK_SET) != 0) {
                response.error = (uint32_t)errno;
            } else {
                response.data_len = (uint16_t)fread(response.data, 1, sizeof(response.data), file);
                if (ferror(file) != 0) {
                    response.error = (uint32_t)errno;
                }
            }
            (void)fclose(file); // Only reached with a valid stream; fclose(NULL) is undefined behavior.
        }
    }

    // Send the response.
    CY_TRACE(sub->topic->cy,
             "Responding to file read request: %s, offset %llu, size %u, error %u",
             file_name,
             (unsigned long long)read_offset,
             (unsigned)response.data_len,
             (unsigned)response.error);
    (void)cy_respond(sub->topic, //
                     ts + 1000000,
                     transfer,
                     (struct cy_payload_t){ .data = &response,
                                            .size = ((size_t)response.data_len) + response_header_size });
}

/// The only command line argument is the node namespace.
int main(const int argc, char* argv[])
{
    srand((unsigned)time(NULL));

    // SET UP THE NODE.
    struct cy_udp_t cy_udp;
    cy_err_t        res = cy_udp_new(&cy_udp,
                              random_uid(),
                              (argc > 1) ? argv[1] : "~",
                              (uint32_t[3]){ udp_parse_iface_address("127.0.0.1") },
                              CY_NODE_ID_INVALID,
                              1000);
    if (res < 0) {
        errx(res, "cy_udp_new");
    }

    // SET UP THE FILE READ TOPIC.
    struct cy_udp_topic_t topic_file_read;
    res = cy_udp_topic_new(&cy_udp, &topic_file_read, "file/read", NULL);
    if (res < 0) {
        errx(res, "cy_udp_topic_new");
    }
    struct cy_subscription_t sub_file_read;
    res = cy_udp_subscribe(&topic_file_read, &sub_file_read, 1024, CY_TRANSFER_ID_TIMEOUT_DEFAULT_us, on_file_read_msg);
    if (res < 0) {
        errx(res, "cy_udp_subscribe");
    }

    // SPIN THE EVENT LOOP.
    while (1) {
        res = cy_udp_spin_once(&cy_udp);
        if (res < 0) {
            errx(res, "cy_udp_spin_once");
        }
    }

    return 0;
}

File client

#include "cy_udp.h"
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <err.h>

/// One million. Used to express the time intervals added to the cy_udp_now() timestamp
/// (presumably microseconds per second, judging by the cy_us_t type name -- confirm against the library).
#define MEGA 1000000LL

/// Interval added to the current time to form the response deadline passed to cy_udp_publish().
#define RESPONSE_TIMEOUT (3 * MEGA)

/// Returns the lesser of the two sizes.
static size_t smaller(const size_t a, const size_t b)
{
    if (b < a) {
        return b;
    }
    return a;
}

/// In-memory image of the file read request. The object is published as-is (raw bytes, host byte order),
/// so the field order and types define the wire layout: 8 bytes of offset, 2 bytes of path length, then the path.
/// Only the first (10 + path_len) bytes are transmitted.
struct file_read_request_t
{
    uint64_t read_offset; ///< Offset from the beginning of the file of the first byte to read.
    uint16_t path_len;    ///< Number of meaningful bytes in path; at most 256.
    char     path[256];   ///< File path bytes; not NUL-terminated.
};
/// In-memory image of the file read response. The received payload bytes are copied over this object as-is,
/// so the field order and types must match the layout used by the server.
struct file_read_response_t
{
    uint32_t error;     ///< Zero on success; otherwise the error code reported by the server (its errno value).
    uint16_t data_len;  ///< Number of valid bytes in data; zero means there is nothing more to read.
    uint8_t  data[256]; ///< The file contents starting at the requested offset.
};

/// Composes a pseudo-random 64-bit UID out of rand() outputs: bits 63..48 hold the reserved public vendor-ID,
/// bits 47..32 hold a random product-ID, and bits 31..0 hold a random instance-ID.
/// rand() should be seeded by the caller before the first use.
static uint64_t random_uid(void)
{
    // Draw the product-ID first and the instance-ID second, so that the result for a given seed is fixed.
    const uint64_t product_id  = (uint16_t)rand();
    const uint64_t instance_id = (uint32_t)rand();

    uint64_t uid = UINT16_MAX; // This is the reserved public VID.
    uid          = (uid << 16U) | product_id;
    uid          = (uid << 32U) | instance_id;
    return uid;
}

/// File client entry point.
/// Command line arguments: namespace, file name.
/// The read file will be written into stdout as-is; progress and diagnostics go to stderr.
///
/// The file is fetched sequentially in chunks: each iteration publishes a request carrying the current offset,
/// spins the event loop until the response future leaves the pending state, writes the received data to stdout,
/// and advances the offset. An empty chunk marks the end of the file.
/// The process exits with a non-zero status on a usage error, library error, timeout, remote error,
/// malformed response, or a failure to write to stdout.
int main(const int argc, char* argv[])
{
    enum
    {
        request_header_size  = 10, // uint64 read_offset + uint16 path_len
        response_header_size = 6,  // uint32 error + uint16 data_len
    };

    if (argc < 3) {
        fprintf(stderr, "Usage: %s <namespace> <file>\n", argv[0]);
        return 1;
    }
    srand((unsigned)time(NULL));

    // PREPARE THE FILE REQUEST OBJECT.
    // The length is validated as size_t before narrowing; otherwise an overlong path could wrap around
    // to a small uint16_t value and slip through the check.
    struct file_read_request_t req;
    const size_t               path_len = strlen(argv[2]);
    if (path_len > sizeof(req.path)) {
        fprintf(stderr, "File path length %zu is too long\n", path_len);
        return 1;
    }
    req.read_offset = 0;
    req.path_len    = (uint16_t)path_len;
    memcpy(req.path, argv[2], path_len);

    // SET UP THE NODE.
    struct cy_udp_t cy_udp;
    cy_err_t        res = cy_udp_new(&cy_udp, //
                              random_uid(),
                              argv[1],
                              (uint32_t[3]){ udp_parse_iface_address("127.0.0.1") },
                              CY_NODE_ID_INVALID,
                              1000);
    if (res < 0) {
        errx(res, "cy_udp_new");
    }

    // SET UP THE FILE READ TOPIC.
    struct cy_udp_topic_t topic_file_read;
    res = cy_udp_topic_new(&cy_udp, &topic_file_read, "file/read", NULL);
    if (res < 0) {
        errx(res, "cy_udp_topic_new");
    }

    // WAIT FOR THE NODE TO JOIN THE NETWORK.
    // We consider the node joined when it has a node-ID and there have been no topic conflicts/divergences
    // for some time. This stage can be skipped if we have a configuration hint recovered from nonvolatile storage.
    fprintf(stderr, "Waiting for the node to join the network...\n");
    while (!cy_ready(&cy_udp.base)) {
        res = cy_udp_spin_once(&cy_udp);
        if (res < 0) {
            errx(res, "cy_udp_spin_once");
        }
    }

    // READ THE FILE SEQUENTIALLY.
    for (;;) {
        const cy_us_t now = cy_udp_now();

        // Send the request. Only the header and the meaningful part of the path are transmitted.
        fprintf(stderr, "\nRequesting offset %llu...\n", (unsigned long long)req.read_offset);
        struct cy_response_future_t future;
        res = cy_udp_publish(&topic_file_read,
                             now + MEGA,
                             (struct cy_payload_t){ .size = ((size_t)req.path_len) + request_header_size,
                                                    .data = &req },
                             now + RESPONSE_TIMEOUT,
                             &future);
        if (res < 0) {
            errx(res, "cy_udp_publish");
        }

        // Wait for the response while spinning the event loop.
        // We could do it asynchronously as well, but in this simple application it is easier to do it synchronously.
        // We could also spin the loop in a background thread and use a condition variable to wake up the main thread.
        assert(future.state == cy_future_pending);
        while (future.state == cy_future_pending) {
            res = cy_udp_spin_once(&cy_udp);
            if (res < 0) {
                errx(res, "cy_udp_spin_once");
            }
        }
        if (future.state == cy_future_failure) {
            errx(EXIT_FAILURE, "Request timed out"); // A timeout is a failure, so the exit status must be non-zero.
        }
        assert(future.state == cy_future_success);

        // Decode the response. It comes from the network, so nothing about it is trusted:
        // it must contain at least the header, and the declared data length must fit both
        // the local buffer and the number of bytes actually received.
        const size_t resp_size = future.response_payload.size;
        if (resp_size < response_header_size) {
            errx(EXIT_FAILURE, "Malformed response: %zu bytes is too short", resp_size);
        }
        struct file_read_response_t resp;
        memset(&resp, 0, sizeof(resp));
        memcpy(&resp, future.response_payload.data, smaller(sizeof(resp), resp_size));
        if (resp.error != 0) {
            errx((int)resp.error, "Remote error");
        }
        if ((resp.data_len > sizeof(resp.data)) || (resp.data_len > (resp_size - response_header_size))) {
            errx(EXIT_FAILURE, "Malformed response: data length %u is invalid", (unsigned)resp.data_len);
        }

        // Process the next chunk.
        if (resp.data_len > 0) {
            if (fwrite(resp.data, 1, resp.data_len, stdout) != resp.data_len) {
                err(EXIT_FAILURE, "fwrite");
            }
            if (fflush(stdout) != 0) {
                err(EXIT_FAILURE, "fflush");
            }
            req.read_offset += resp.data_len;
        } else {
            fprintf(stderr, "\nFinished transferring %llu bytes\n", (unsigned long long)req.read_offset);
            break;
        }
    }

    return 0;
}

Notes about retained messages

Suppose there is a topic foo that is updated intermittently. We have a low-power node that wakes up briefly and infrequently, and when it’s up, it needs to quickly check the latest message published on foo.

Any number of permanently-up nodes that subscribe to foo, or those that publish on it, can store the last foo message locally, and at the same time subscribe to a new topic, let’s call it foo?. The low-power node would wake up and publish an empty request message to foo?; all nodes that subscribe to it will respond with the latest message that they recorded from foo. The solution is decentralized (no single router) and fault-tolerant (as long as at least one node subscribing to foo? is up, the retained messages remain available).

The solution takes a dozen lines of code to implement.