It is occasionally necessary to generate sample traffic for testing Cyphal applications and new Cyphal implementations. This note shows how to do that using PyCyphal.
These snippets rely on undocumented internals because the PyCyphal library was not originally designed for this use case, although it later turned out to be convenient for it. In the future these entities are likely to be promoted to the public API.
Cyphal/CAN
from pycyphal.transport import Priority
from pycyphal.transport.can._identifier import ServiceCANID
from pycyphal.transport.can._session._transfer_sender import serialize_transfer
# The transfer payload is given as a list of fragments; they are concatenated when serialized.
fragmented_payload = [b'Hello world!']

# Build the CAN ID of a service transfer and compile it.
# To emit a message transfer instead, use MessageCANID here.
can_id = ServiceCANID(
    priority=Priority.NOMINAL,
    source_node_id=124,
    destination_node_id=123,
    service_id=456,
    request_not_response=False,
).compile(fragmented_transfer_payload=fragmented_payload)

# Split the transfer into Cyphal frames, then compile each one into a CAN frame.
frames = [
    cyphal_frame.compile()
    for cyphal_frame in serialize_transfer(
        compiled_identifier=can_id,
        transfer_id=23,
        fragmented_payload=fragmented_payload,
        max_frame_payload_bytes=7,  # 7 for Classic CAN, 63 for CAN FD.
    )
]
print(frames)
Cyphal/UDP
from pycyphal.transport.commons.high_overhead_transport import serialize_transfer
from pycyphal.transport import Priority, Timestamp, MessageDataSpecifier, ServiceDataSpecifier
from pycyphal.transport.udp._frame import UDPFrame
# Parameters of the sample transfer; they are read by construct_frame() and
# by the serialize_transfer() call further below.
fragmented_payload = [memoryview(b'Hello world!')]
mtu = 1408

data_specifier = MessageDataSpecifier(subject_id=1234)
transfer_id = 123456789
priority = Priority.NOMINAL

source_node_id = 2345  # None means anonymous
destination_node_id = None  # None means broadcast
def construct_frame(index: int, end_of_transfer: bool, payload: memoryview) -> UDPFrame:
    """Build one UDPFrame of the sample transfer.

    The transfer-wide fields (priority, node-IDs, data specifier, transfer-ID)
    are taken from the module-level variables; only the per-frame fields are
    passed in. This function is handed to serialize_transfer() as the frame
    factory.

    Args:
        index: Value stored in the frame's ``index`` field.
        end_of_transfer: Value stored in the frame's ``end_of_transfer`` field.
        payload: Payload fragment carried by this frame.

    Returns:
        The newly constructed UDPFrame (``user_data`` is always 0).
    """
    return UDPFrame(
        # Per-frame fields supplied by the caller.
        index=index,
        end_of_transfer=end_of_transfer,
        payload=payload,
        # Transfer-wide fields shared by all frames.
        priority=priority,
        source_node_id=source_node_id,
        destination_node_id=destination_node_id,
        data_specifier=data_specifier,
        transfer_id=transfer_id,
        user_data=0,
    )
# Serialize the transfer into frames via construct_frame(), then convert the
# parts returned by compile_header_and_payload() of each frame into bytes.
datagram_headers_and_payloads = [
    tuple(bytes(part) for part in frame.compile_header_and_payload())
    for frame in serialize_transfer(fragmented_payload, mtu, construct_frame)
]
print(datagram_headers_and_payloads)

# To obtain the final datagram payload, concatenate the header with the frame payload:
#print(list(map(b''.join, datagram_headers_and_payloads)))