Python API to configure the subject-IDs/service-IDs/registers

Any suggestions for which Python API to use to configure the subject-IDs/service-IDs/registers of another node?
Something like this Yakut command:

y r 125 reg.rmap.service.module.TH.GetDataAndMetadata.1.0.id 150

I need to complete the configuration of a node after its node-ID has been assigned via plug-and-play:

    # Create the local node via make_node(), passing "master.db" as the second argument
    # (presumably the path of the node's register file — confirm), and attach a
    # CentralizedAllocator to it. NOTE(review): the allocator instance is not kept in a
    # variable, and the `with` block ends right after it is created — presumably the
    # rest of the application is meant to run inside this block; confirm.
    with make_node(NodeInfo(name="org.opencyphal.pycyphal.demo.master"), "master.db") as node:
        CentralizedAllocator(node)

Bare bones:

from uavcan.register import Access_1, Name_1
acc = local_node.make_client(Access_1, remote_node_id)              # Create a client
request = Access_1.Request(name=Name_1(reg_name), value=reg_value)  # Create request object
response = await self._acc(request)                                 # Send request, wait for response
if response is None:
    raise RuntimeError("Oops, request has timed out")
print("The register value is:", response)

If you need something sophisticated:

# License: CC0 public domain dedication
# No warranty

from __future__ import annotations
import re
import collections.abc
from typing import Iterator
from loguru import logger
import pycyphal
import pycyphal.transport.redundant
import pycyphal.transport.can
import pycyphal.transport.can.media.socketcan
import pycyphal.application
import pycyphal.application.node_tracker
from pycyphal.application.register import ValueProxyWithFlags
import uavcan.node
import uavcan.register


class RegisterProxy(collections.abc.Mapping[str, pycyphal.application.register.ValueProxyWithFlags]):
    """
    Read-only mapping view of the registers of one remote node, backed by a local cache.

    The mapping protocol (``[]``, ``len``, iteration) only touches the cache and never the network,
    so the values it returns may be obsolete. Use :meth:`reload`, :meth:`read_through` and
    :meth:`write_through` to exchange data with the remote node and refresh the cache.
    Call :meth:`close` when the proxy is no longer needed to release its service clients.
    """

    # Names reported by the remote node must match this pattern to be accepted by reload().
    _VALID_PAT = re.compile(r"^[a-z_]+(\.\w+)+[<=>]?$")

    def __init__(self, local_node: pycyphal.application.Node, remote_node_id: int):
        """
        Create the service clients for the given remote node. The cache starts empty;
        use :meth:`new` to obtain an instance that is already populated.
        """
        self._cache: dict[str, ValueProxyWithFlags] = {}
        self._acc = local_node.make_client(uavcan.register.Access_1, remote_node_id)
        self._list = local_node.make_client(uavcan.register.List_1, remote_node_id)

    @property
    def remote_node_id(self) -> int:
        """The node-ID the underlying register access client is addressed to."""
        return int(self._acc.output_transport_session.destination_node_id)

    @staticmethod
    async def new(local_node: pycyphal.application.Node, remote_node_id: int) -> RegisterProxy:
        """
        Construct a proxy and immediately populate its cache from the remote node.
        Raises the same exceptions as :meth:`reload`.
        """
        rp = RegisterProxy(local_node, remote_node_id)
        await rp.reload()
        return rp

    def close(self) -> None:
        """
        Close both service clients created by the constructor.
        The instance must not be used for network operations afterwards.
        """
        self._acc.close()
        self._list.close()

    async def reload(self) -> None:
        """
        Invalidate the local cache and fetch all registers from the remote node anew.
        This is basically equivalent to creating a new instance.

        Raises pycyphal.application.NetworkTimeoutError on timeout.
        Raises ValueError if the remote node reports a register name that does not match the expected pattern.
        """
        logger.debug("{!r}: Reloading", self)
        names: set[str] = set()
        # Walk the register list index by index; the loop is bounded by 2**16 iterations.
        for idx in range(2**16):
            req = uavcan.register.List_1.Request(idx)
            resp_meta = await self._list.call(req)
            if resp_meta is None:
                raise pycyphal.application.NetworkTimeoutError(f"Register list has timed out: {req}")
            resp, _ = resp_meta
            assert isinstance(resp, uavcan.register.List_1.Response)
            name = resp.name.name.tobytes().decode()
            if not name:
                break  # An empty name terminates the enumeration.
            # The name comes from the network, so validate it explicitly rather than with
            # an assert, which would be stripped when Python runs with -O.
            if not self._VALID_PAT.match(name):
                raise ValueError(f"Invalid register name: {name!r}")
            names.add(name)
        logger.debug("{!r}: Fetched names: {}", self, names)
        self._cache.clear()
        for n in names:
            await self.read_through(n)
        logger.trace("{!r}: Fetched registers: {}", self, self._cache)
        # Sanity check: every listed register is expected to have produced a cache entry.
        assert names == set(self._cache)

    async def read_through(self, name: str) -> ValueProxyWithFlags:
        """
        Fetch the register from the remote node and update the local cache.
        Returns empty if no such register, in which case the cache is not altered.
        Raises pycyphal.application.NetworkTimeoutError on timeout.
        """
        # A request that carries an empty value is a read; see write_through().
        return await self.write_through(name, pycyphal.application.register.Value())

    async def write_through(self, name: str, value: pycyphal.application.register.RelaxedValue) -> ValueProxyWithFlags:
        """
        Write the specified register on the remote node, update the cache with the response.
        Returns empty if no such register, in which case the cache is not altered.
        Raises pycyphal.application.NetworkTimeoutError on timeout.
        If the value is empty, this is equivalent to a read operation.
        """
        v = pycyphal.application.register.ValueProxy(value).value
        req = uavcan.register.Access_1.Request(name=uavcan.register.Name_1(name), value=v)
        resp = await self._acc(req)
        if resp is None:
            raise pycyphal.application.NetworkTimeoutError(f"Register access has timed out: {req}")
        assert isinstance(resp, uavcan.register.Access_1.Response)
        res = ValueProxyWithFlags(resp.value, resp.mutable, resp.persistent)
        # Only a non-empty response value is cached; an empty one leaves the cache untouched.
        if not res.value.empty:
            self._cache[name] = res
            logger.trace("{!r}: {!r} <- {!r}", self, name, res)
        return res

    def __getitem__(self, k: str) -> ValueProxyWithFlags:
        """Obtain item from the cache (does not access the network). The value may be obsolete."""
        return self._cache[k]

    def __len__(self) -> int:
        """Number of registers currently held in the cache (does not access the network)."""
        return len(self._cache)

    def __iter__(self) -> Iterator[str]:
        """Iterate the cached values, which may be obsolete (does not access the network)."""
        return iter(self._cache)

    def __repr__(self) -> str:
        """Short summary: the remote node-ID and the number of cached registers."""
        return str(pycyphal.util.repr_attributes(self, remote_node_id=self.remote_node_id, size=len(self._cache)))

    def __str__(self) -> str:
        """
        Render the cache as a tab-separated table sorted by name, one register per line:
        padded name, mutability, persistence, and the value converted to builtin types.
        Returns an empty string if the cache is empty.
        """
        if not self._cache:
            return ""
        max_name_len = max(map(len, self._cache))
        return "\n".join(
            "\t".join(
                [
                    k.ljust(max_name_len),
                    ("immutab", "mutable")[v.mutable],
                    ("volatil", "persist")[v.persistent],
                    str(pycyphal.dsdl.to_builtin(v.value)),
                ]
            )
            for k, v in sorted(self._cache.items())
        )

There is also a collection of useful entities for dealing with registers in Yakut:

This is slightly offtopic, but your example looks concerning because this is not how you configure subject-IDs. Ports are named simply like this_is_your_port and the name is embedded into:

uavcan\.(pub|sub|srv|cln)\.PORT_NAME\.(id|type)

This is documented here:

Based on what I see, your register should be renamed roughly into uavcan.srv.th_get_data_and_metadata.id, and there also needs to be a uavcan.srv.th_get_data_and_metadata.type that contains the name of the data type.

It is also a design mistake to expose the name of the data type in the register name because these are unrelated entities. More on this in the Guide, look for “syntax-semantic separation”.

Last but not least, please do not put your custom data types into the reg namespace. Use your own, vendor-specific namespace instead.

Thanks a lot !
Great support

1 Like