from uavcan.register import Access_1, Name_1
acc = local_node.make_client(Access_1, remote_node_id) # Create a client
request = Access_1.Request(name=Name_1(reg_name), value=reg_value) # Create request object
response = await self._acc(request) # Send request, wait for response
if response is None:
raise RuntimeError("Oops, request has timed out")
print("The register value is:", response)
If you need something sophisticated:
# License: CC0 public domain dedication
# No warranty
from __future__ import annotations
import re
import collections.abc
from typing import Iterator
from loguru import logger
import pycyphal
import pycyphal.transport.redundant
import pycyphal.transport.can
import pycyphal.transport.can.media.socketcan
import pycyphal.application
import pycyphal.application.node_tracker
from pycyphal.application.register import ValueProxyWithFlags
import uavcan.node
import uavcan.register
class RegisterProxy(collections.abc.Mapping[str, pycyphal.application.register.ValueProxyWithFlags]):
_VALID_PAT = re.compile(r"^[a-z_]+(\.\w+)+[<=>]?$")
def __init__(self, local_node: pycyphal.application.Node, remote_node_id: int):
self._cache: dict[str, ValueProxyWithFlags] = {}
self._acc = local_node.make_client(uavcan.register.Access_1, remote_node_id)
self._list = local_node.make_client(uavcan.register.List_1, remote_node_id)
@property
def remote_node_id(self) -> int:
return int(self._acc.output_transport_session.destination_node_id)
@staticmethod
async def new(local_node: pycyphal.application.Node, remote_node_id: int) -> RegisterProxy:
rp = RegisterProxy(local_node, remote_node_id)
await rp.reload()
return rp
async def reload(self) -> None:
"""
Invalidate the local cache and fetch all registers from the remote node anew.
This is basically equivalent to creating a new instance.
"""
logger.debug("{!r}: Reloading", self)
names: set[str] = set()
for idx in range(2**16):
req = uavcan.register.List_1.Request(idx)
resp_meta = await self._list.call(req)
if not resp_meta:
raise pycyphal.application.NetworkTimeoutError(f"Register list has timed out: {req}")
resp, _ = resp_meta
assert isinstance(resp, uavcan.register.List_1.Response)
if name := resp.name.name.tobytes().decode():
assert self._VALID_PAT.match(name), f"Invalid register name: {name!r}"
names.add(name)
else:
break
logger.debug("{!r}: Fetched names: {}", self, names)
self._cache.clear()
for n in names:
await self.read_through(n)
logger.trace("{!r}: Fetched registers: {}", self, self._cache)
assert names == set(self._cache.keys())
async def read_through(self, name: str) -> ValueProxyWithFlags:
"""
Fetch the register from the remote node and update the local cache.
Returns empty if no such register, in which case the cache is not altered.
Raises pycyphal.application.NetworkTimeoutError on timeout.
"""
return await self.write_through(name, pycyphal.application.register.Value())
async def write_through(self, name: str, value: pycyphal.application.register.RelaxedValue) -> ValueProxyWithFlags:
"""
Write the specified register on the remote node, update the cache with the response.
Returns empty if no such register, in which case the cache is not altered.
Raises pycyphal.application.NetworkTimeoutError on timeout.
If the value is empty, this is equivalent to a read operation.
"""
v = pycyphal.application.register.ValueProxy(value).value
req = uavcan.register.Access_1.Request(name=uavcan.register.Name_1(name), value=v)
resp = await self._acc(req)
if not resp:
raise pycyphal.application.NetworkTimeoutError(f"Register access has timed out: {req}")
assert isinstance(resp, uavcan.register.Access_1.Response)
res = ValueProxyWithFlags(resp.value, resp.mutable, resp.persistent)
if not res.value.empty:
self._cache[name] = res
logger.trace("{!r}: {!r} <- {!r}", self, name, res)
return res
def __getitem__(self, k: str) -> ValueProxyWithFlags:
"""Obtain item from the cache (does not access the network). The value may be obsolete."""
return self._cache[k]
def __len__(self) -> int:
return len(self._cache)
def __iter__(self) -> Iterator[str]:
"""Iterate the cached values, which may be obsolete (does not access the network)."""
return iter(self._cache)
def __repr__(self) -> str:
return str(
pycyphal.util.repr_attributes(
self, remote_node_id=self._acc.output_transport_session.destination_node_id, size=len(self._cache)
)
)
def __str__(self) -> str:
if len(self) == 0:
return ""
max_name_len = max(map(len, self.keys()))
return "\n".join(
"\t".join(
[
k.ljust(max_name_len),
("immutab", "mutable")[v.mutable],
("volatil", "persist")[v.persistent],
str(pycyphal.dsdl.to_builtin(v.value)),
]
)
for k, v in sorted(self.items())
)
There is also a collection of useful entities for dealing with registers in Yakut:
This is slightly offtopic, but your example looks concerning because this is not how you configure subject-IDs. Ports are named simply like this_is_your_port and the name is embedded into:
uavcan\.(pub|sub|srv|cln)\.PORT_NAME\.(id|type)
This is documented here:
Based on what I see, your register should be renamed roughly into uavcan.srv.th_get_data_and_metadata.id, and there also needs to be a uavcan.srv.th_get_data_and_metadata.type that contains the name of the data type.
It is also a design mistake to expose the name of the data type in the register name because these are unrelated entities. More on this in the Guide, look for “syntax-semantic separation”.