How to generate a sample Cyphal transfer using PyCyphal for testing

It is occasionally necessary to generate sample traffic for testing Cyphal applications and new Cyphal implementations. This note shows how to do that using PyCyphal.

These snippets rely on undocumented internals because the PyCyphal library was not originally designed for this use case, although it later turned out to be convenient for it. In the future these entities are likely to be promoted to the public API.

Cyphal/CAN

from pycyphal.transport import Priority
from pycyphal.transport.can._identifier import ServiceCANID
from pycyphal.transport.can._session._transfer_sender import serialize_transfer

# The transfer payload, given as a list of fragments; multiple fragments will be concatenated.
fragmented_payload = [b'Hello world!']

# Describe the service transfer first, then compile the description into a CAN ID.
# For a message transfer, use MessageCANID instead.
service_can_id = ServiceCANID(
    priority=Priority.NOMINAL,
    source_node_id=124,
    destination_node_id=123,
    service_id=456,
    request_not_response=False,
)
can_id = service_can_id.compile(fragmented_transfer_payload=fragmented_payload)

# Produce the Cyphal frames of the transfer...
cyphal_frames = serialize_transfer(
    compiled_identifier=can_id,
    transfer_id=23,
    fragmented_payload=fragmented_payload,
    max_frame_payload_bytes=7,  # 7 for Classic CAN, 63 for CAN FD.
)
# ...and compile each of them into a CAN frame.
frames = [frame.compile() for frame in cyphal_frames]

print(frames)

Cyphal/UDP

from pycyphal.transport.commons.high_overhead_transport import serialize_transfer
from pycyphal.transport import Priority, Timestamp, MessageDataSpecifier, ServiceDataSpecifier
from pycyphal.transport.udp._frame import UDPFrame

# What is being sent: the payload fragments and the MTU handed to the serializer.
fragmented_payload = [memoryview(b'Hello world!')]
mtu = 1408

# How the transfer is identified and addressed; construct_frame() reads these values.
priority = Priority.NOMINAL
data_specifier = MessageDataSpecifier(subject_id=1234)
transfer_id = 123456789
source_node_id = 2345       # None means anonymous
destination_node_id = None  # None means broadcast

def construct_frame(index: int, end_of_transfer: bool, payload: memoryview) -> UDPFrame:
    """Build a single UDPFrame; passed to serialize_transfer() as the frame factory.

    The per-frame values (index, end_of_transfer, payload) come from the arguments.
    Everything else is taken from the module-level transfer parameters, and
    user_data is always zero.
    """
    frame = UDPFrame(
        # Per-frame values supplied by the caller.
        index=index,
        end_of_transfer=end_of_transfer,
        payload=payload,
        # Values shared by all frames of the transfer.
        priority=priority,
        source_node_id=source_node_id,
        destination_node_id=destination_node_id,
        data_specifier=data_specifier,
        transfer_id=transfer_id,
        user_data=0,
    )
    return frame

# Serialize the transfer into frames; for each frame keep its compiled parts
# (header and payload) as a tuple of bytes objects.
datagram_headers_and_payloads = []
for frame in serialize_transfer(fragmented_payload, mtu, construct_frame):
    compiled_parts = frame.compile_header_and_payload()
    datagram_headers_and_payloads.append(tuple(bytes(part) for part in compiled_parts))
print(datagram_headers_and_payloads)

# To obtain the final datagram payload, concatenate the header with the frame payload:
#print(list(map(b''.join, datagram_headers_and_payloads)))
2 Likes

How to generate the serialized representation of a DSDL object

This does not require relying on any undocumented API entities.

# The import from PyCyphal has to precede any DSDL imports because it installs the auto-compilation import hook.
from pycyphal.dsdl import serialize

# Import the DSDL type of interest.
from uavcan.file import Read_1

# Create the DSDL object to serialize and fill in its data field.
obj = Read_1.Response()
obj.data.value = b'Hello world!'

# serialize() produces the representation as fragments; join them into one bytes object.
fragments = serialize(obj)
serialized = b''.join(fragments)
print(serialized)

To deserialize an existing serialized representation:

# The import from PyCyphal has to precede any DSDL imports because it installs the auto-compilation import hook.
from pycyphal.dsdl import deserialize

# Import the DSDL type of interest.
from uavcan.file import Read_1

# Fragments of the serialized representation to deserialize (a single fragment here):
data = [b'\x00\x00\x0c\x00Hello world!']

# Reconstruct the object of the given DSDL type from the fragments and show it.
obj = deserialize(Read_1.Response, data)
print(obj)

# The object can also be rendered as JSON, if you're so inclined:
from pycyphal.dsdl import to_builtin
import json
builtin_representation = to_builtin(obj)
print(json.dumps(builtin_representation))

This is incredibly useful for testing purposes. Thank you for creating this helpful example!

1 Like