Our first ROS2 UAVCAN (PyCyphal) publisher and service nodes

Hello everyone,
we are working in interfacing some UAVCAN nodes to our ROS2 system. Doing this we created two scripts, one that subscribes to a PORT-ID and republishes the information in a topic for each of the nodes that publish in the PORT-I. And the other script interfaces a ROS2 service with a UAVCAN command request. The scripts rely on a third node that republishes the heartbeat from UAVCAN to a ROS2 topic.

Neither of the scripts fully comply with the NODE-ID and the PORT-ID (and maybe other bits) but we guessed that would b good to share them so that someone could benefit from them and/or get some pointers in ways to make the scripts more efficient/optimal.

Script #1 ROS2 Heartbeat interface

#!/usr/bin/env python3
import sys
compiled_dsdl_dir = "/home/pi/.yakut"

sys.path.insert(0,str(compiled_dsdl_dir))
try:
    import pyuavcan.application
except:
    print("pyuavcan application not found")

import uavcan.node  # noqa

import rclpy
from rclpy.node import Node
from mk4_msgs.msg import Heartbeat

import getmac
import asyncio
import typing
from dataclasses import dataclass

import pyuavcan
from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application.node_tracker import NodeTracker, Entry

class UavcanNodeTracker(Node):
    HEALTH_NOMINAL          = 0
    HEALTH_ADVISORY         = 1
    HEALTH_CAUTION          = 2
    HEALTH_WARNING          = 3

    MODE_OPERATIONAL        = 0
    MODE_INITIALISATION     = 1
    MODE_MAINTENANCE        = 2
    MODE_SOFTWARE_UPDATE    = 3

    @dataclass
    class HeartbeatInfo():
        uptime: int = 0
        mode: int = -1
        health: int = -1
        vssc: int = -1

    def __init__(self):
        super().__init__('uavcan_heartbeat_to_ros')
        self.get_logger().info('uavcan_heartbeat_to_ros node initialising')

        ## Get parameters if on the server
        self.declare_parameter('uavcan.node_name', 'rossrobotics.mk4.nodetracker')
        uavcan_node_name = self.get_parameter('uavcan.node_name').value

        self.declare_parameter('uavcan.sub_name', 'node_tracker')
        uavcan_sub_name = self.get_parameter('uavcan.sub_name').value

        self.declare_parameter('uavcan.can_interface', 'can0')
        uavcan_can_interface = self.get_parameter('uavcan.can_interface').value

        self.declare_parameter('uavcan.local_node_id', 6)
        uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value

        self.declare_parameter('uavcan.mtu', 8)
        uavcan_mtu = self.get_parameter('uavcan.mtu').value

        self.declare_parameter('uavcan.version.major', 1)
        uavcan_v_major = self.get_parameter('uavcan.version.major').value

        self.declare_parameter('uavcan.version.minor', 0)
        uavcan_v_minor = self.get_parameter('uavcan.version.minor').value

        self.declare_parameter('uavcan.timer_freq', 50)
        uavcan_timer_period = 1.0/ self.get_parameter('uavcan.timer_freq').value

        self.declare_parameter('ros.pub_freq', 2)
        ros_pub_period = 1.0/ self.get_parameter('ros.pub_freq').value

        self.declare_parameter('ros.topic_name', 'uavcan_node_tracker')
        ros_topic_name = self.get_parameter('ros.topic_name').value

        ## Create empty dictionaries
        self.id_name_dict = {}
        self.id_uniqueid_dict = {}
        self.id_alive_dict = {}
        self.id_hbinfo_dict = {}

        mac_addr = getmac.get_mac_address().split(':')
        mac_addr_int  = []
        for x in range(len(mac_addr)):
            mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int

        unique_id_pad = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
        ## According to their documentation the unique id should be globally unique to each node, as we will have
        ## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
        ## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
        unique_id_pad[16-len(mac_addr_int)-2] = uavcan_local_node_id

        unique_id = unique_id_pad + mac_addr_int

        ## Setup uavcan node
        uavcan_node_info = uavcan.node.GetInfo_1.Response(
            software_version=uavcan.node.Version_1(major=uavcan_v_major, minor=uavcan_v_minor),
            name=uavcan_node_name,unique_id=unique_id)

        registry = make_registry()
        media = SocketCANMedia(uavcan_can_interface, mtu=uavcan_mtu)
        transport = CANTransport(media, local_node_id=uavcan_local_node_id)
        presentation = Presentation(transport)
        self.tracker_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)

        self.hb_sub = self.tracker_uavcan_node.make_subscriber(uavcan.node.Heartbeat_1_0, uavcan_sub_name)
        self.hb_sub.receive_in_background(self.hb_rec)
        self.tracker_uavcan_node.start()

        # Setup uavcan node tracker
        self.node_tracker = NodeTracker(self.tracker_uavcan_node)
        self.node_tracker.add_update_handler(self.uavcan_node_update_handler)

        self.uavcan_timer = self.create_timer(uavcan_timer_period, self.uavcan_timer_callback)
        self.hb_timer = self.create_timer(ros_pub_period, self.hb_to_ros_timer)

        # Create ROS publisher
        self.heartbeat_publisher = self.create_publisher(Heartbeat, ros_topic_name, 1)

        self.get_logger().info('uavcan_heartbeat_to_ros node initialised')

    def hb_to_ros_timer(self):
        for node_id in self.id_name_dict:
            if self.id_alive_dict[node_id] and self.id_name_dict[node_id] != None:
                self.id_hbinfo_dict[node_id].uptime = self.node_tracker.registry[node_id].heartbeat.uptime
                self.id_hbinfo_dict[node_id].mode = self.node_tracker.registry[node_id].heartbeat.mode.value
                self.id_hbinfo_dict[node_id].health = self.node_tracker.registry[node_id].heartbeat.health.value
                self.id_hbinfo_dict[node_id].vssc = self.node_tracker.registry[node_id].heartbeat.vendor_specific_status_code
                if self.id_hbinfo_dict[node_id].mode == self.MODE_OPERATIONAL:
                    if self.id_hbinfo_dict[node_id].health == self.HEALTH_ADVISORY:
                        msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                            + str(node_id) + ' has a minor failure dectected! This is not affecting functionality.'
                        self.get_logger().warn(msg, throttle_duration_sec=2)
                    elif self.id_hbinfo_dict[node_id].health == self.HEALTH_CAUTION:
                        msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                            + str(node_id) + ' has encountered a major failure! It has degradated performance.'
                        self.get_logger().error(msg, throttle_duration_sec=2)
                    elif self.id_hbinfo_dict[node_id].health == self.HEALTH_WARNING:
                        msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                            + str(node_id) + ' has suffered a fatal malfunction! It is unable to perform its intended functionality.'
                        self.get_logger().error(msg, throttle_duration_sec=2)
                else:
                    msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                            + str(node_id) + ' waiting for operational mode.'
                    self.get_logger().info(msg, throttle_duration_sec=5)

            if self.id_name_dict[node_id] != None:
                hb_msg = Heartbeat()
                hb_msg.header.stamp = self.get_clock().now().to_msg()
                hb_msg.header.frame_id = self.id_name_dict[node_id]
                hb_msg.alive = self.id_alive_dict[node_id]
                hb_msg.uptime = self.id_hbinfo_dict[node_id].uptime
                hb_msg.health = self.id_hbinfo_dict[node_id].health
                hb_msg.mode = self.id_hbinfo_dict[node_id].mode
                hb_msg.vssc = self.id_hbinfo_dict[node_id].vssc
                hb_msg.node_id = node_id
                self.heartbeat_publisher.publish(hb_msg)

    def uavcan_timer_callback(self):
        _loop = asyncio.get_event_loop()
        _loop.run_until_complete(self.run())

    def uavcan_node_update_handler(self, node_id: int, old: typing.Optional[Entry], new: typing.Optional[Entry]):
        if node_id not in self.id_name_dict and old == None and new != None:
            self.get_logger().info('New uavcan node has been found. ID: %d' % node_id)
            self.id_name_dict[node_id] = None
            self.id_uniqueid_dict[node_id] = None
            self.id_alive_dict[node_id] = None
            self.id_hbinfo_dict[node_id] = self.HeartbeatInfo()
        elif node_id in self.id_name_dict and new == None:
            msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                + str(node_id) + ' has gone offline.'
            self.get_logger().warn(msg)
            self.id_alive_dict[node_id] = False
        elif node_id in self.id_name_dict and old != None and new != None:
            if new.info == None:
                msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                    + str(node_id) +  ' has restarted.'
                self.get_logger().info(msg)
                self.id_alive_dict[node_id] = False
            else:
                self.id_name_dict[node_id] = new.info.name.tobytes().decode()
                self.id_uniqueid_dict[node_id] = new.info.unique_id
                self.id_alive_dict[node_id] = True
                msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
                    + str(node_id) + ' has respnded to the get info request.'
                self.get_logger().info(msg)

    async def hb_rec(self, msg, meta: pyuavcan.transport.TransferFrom):
        pass

    async def run(self):
        self.hb_sub.receive_in_background(self.hb_rec)

def main(args=None):
    rclpy.init(args=args)
    uavcan_node_tracker = UavcanNodeTracker()
    rclpy.spin(uavcan_node_tracker)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)

    uavcan_node_tracker.close() # closes the uavcan node
    uavcan_node_tracker.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main()

Environmental sensor data pubisher

#!/usr/bin/env python3
from re import T
import sys

compiled_dsdl_dir = "/home/pi/.yakut"

sys.path.insert(0,str(compiled_dsdl_dir))

try:
    import rr_data_types.sensors.Environment_1_0
except:
    print("range array data type not found")

try:
    import pyuavcan.application
except:
    print("pyuavcan application not found")

import uavcan.node  # noqa

import asyncio
import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from mk4_msgs.msg import Gas
from mk4_msgs.msg import Heartbeat
from sensor_msgs.msg import Temperature, FluidPressure, RelativeHumidity
from dataclasses import dataclass
import time
import getmac
from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application import register

class EnvironmentUavcan():
    @dataclass
    class EnvData():
        temperature: float = 0.0
        pressure: float = 0.0
        humidity: float = 0.0
        gas: float= 0.0

    def __init__(self, can_interface, mtu, local_node_id, local_node_name, uavcan_sub_name, \
            v_major, v_minor, register_file, reg_sub_key, reg_sub_value, reg_bit_key, reg_bit_value):
        mac_addr = getmac.get_mac_address().split(':')
        mac_addr_int  = []
        for x in range(len(mac_addr)):
            mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int

        unique_id_padding = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
        ## According to their documentation the unique id should be globally unique to each node, as we will have
        ## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
        ## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
        unique_id_padding[16-len(mac_addr_int)-2] = local_node_id

        unique_id = unique_id_padding + mac_addr_int
        uavcan_node_info = uavcan.node.GetInfo_1.Response(
            software_version=uavcan.node.Version_1(major=v_major, minor=v_minor),
            name=local_node_name, unique_id=unique_id)

        self.id_envdata_dict = {}

        registry = make_registry(register_file=register_file)
        registry.setdefault(reg_sub_key,register.Value(integer16=register.Integer16(reg_sub_value)))
        registry.setdefault(reg_bit_key, register.Value(string=register.String(reg_bit_value)))

        media = SocketCANMedia(can_interface, mtu=mtu)
        transport = CANTransport(media, local_node_id=local_node_id)
        presentation = Presentation(transport)
        self.env_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)

        self.env_uavcan_sub = self.env_uavcan_node.make_subscriber(rr_data_types.sensors.Environment_1_0, uavcan_sub_name)
        self.env_uavcan_node.start()

    async def read_env_msg(self):
        async for msg, meta in self.env_uavcan_sub:
            if meta.source_node_id not in self.id_envdata_dict:
                self.id_envdata_dict[meta.source_node_id] = self.EnvData()
                self.id_envdata_dict[meta.source_node_id].temperature = msg.celsius
                self.id_envdata_dict[meta.source_node_id].pressure = msg.pascal
                self.id_envdata_dict[meta.source_node_id].humidity = msg.r_humidity
                self.id_envdata_dict[meta.source_node_id].gas = msg.mOhm

    def close(self):
        self.env_uavcan_node.close()

class EnvironmentPublisher(Node):
    def __init__(self):
        super().__init__('environment_sensor_publisher')
        self.get_logger().info("Initialising environment_sensor_publisher node")

        self.declare_parameter('uavcan.local_node_name', 'rossrobotics.mk4.ccu.environment')
        self.uavcan_local_node_name = self.get_parameter('uavcan.local_node_name').value

        self.declare_parameter('uavcan.environment_node_name', 'rossrobotics.mk4.environment')
        self.uavcan_environment_node_name = self.get_parameter('uavcan.environment_node_name').value

        self.declare_parameter('uavcan.can_interface', 'can0')
        self.uavcan_can_interface = self.get_parameter('uavcan.can_interface').value

        self.declare_parameter('uavcan.mtu', 8)
        self.uavcan_mtu = self.get_parameter('uavcan.mtu').value

        self.declare_parameter('uavcan.local_node_id', 5)
        self.uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value

        self.declare_parameter('uavcan.subscriber_port_name', 'environment_readings')
        self.uavcan_sub_name = self.get_parameter('uavcan.subscriber_port_name').value

        self.declare_parameter('uavcan.version.major', 1)
        self.uavcan_v_major = self.get_parameter('uavcan.version.major').value

        self.declare_parameter('uavcan.version.minor', 0)
        self.uavcan_v_minor = self.get_parameter('uavcan.version.minor').value

        self.declare_parameter('uavcan.register.file_path', '/home/pi/environment_sensor_driver.db')
        self.uavcan_register_file_path = self.get_parameter('uavcan.register.file_path').value

        self.declare_parameter('uavcan.register.subscriber.key_name', 'uavcan.sub.environment_readings.id')
        self.uavcan_register_sub_key_name = self.get_parameter('uavcan.register.subscriber.key_name').value

        self.declare_parameter('uavcan.register.subscriber.value', 1640)
        self.uavcan_register_sub_value = self.get_parameter('uavcan.register.subscriber.value').value

        self.declare_parameter('uavcan.register.bitrate.key_name', 'uavcan.can.bitrate')
        self.uavcan_register_bitrate_key_name = self.get_parameter('uavcan.register.bitrate.key_name').value

        self.declare_parameter('uavcan.register.bitrate.value', '1000000 1000000')
        self.uavcan_register_bitrate_value = self.get_parameter('uavcan.register.bitrate.value').value

        self.declare_parameter('ros.publish_freq', 1)
        sec_in_ns = 1000000000.0
        self.publish_period = sec_in_ns / self.get_parameter('ros.publish_freq').value

        self.declare_parameter('ros.hb_topic_name', 'uavcan_node_tracker')
        hb_topic_name = self.get_parameter('ros.hb_topic_name').value

        self.id_name_dict = {}
        self.id_alive_dict = {}
        self.id_temperature_publisher_dict = {}
        self.id_pressure_publisher_dict = {}
        self.id_humidity_publisher_dict = {}
        self.id_gas_publisher_dict = {}
        self.id_envdata_dict = {}

        self.node_tracker = self.create_subscription(Heartbeat, hb_topic_name, self.node_tracker_callback, 25)

        self.get_logger().info("environment_sensor_publisher node initialised")

    def env_publisher_timer_callback(self):
        for key in self.id_alive_dict:
            if self.id_alive_dict[key]:
                temperature_msg = Temperature()
                temperature_msg.header.stamp = self.get_clock().now().to_msg()
                temperature_msg.header.frame_id = self.id_name_dict[key]
                temperature_msg.temperature = self.id_envdata_dict[key].temperature
                self.id_temperature_publisher_dict[key].publish(temperature_msg)

                pressure_msg = FluidPressure()
                pressure_msg.header.stamp = self.get_clock().now().to_msg()
                pressure_msg.header.frame_id = self.id_name_dict[key]
                pressure_msg.fluid_pressure = self.id_envdata_dict[key].pressure
                self.id_pressure_publisher_dict[key].publish(pressure_msg)

                humidity_msg = RelativeHumidity()
                humidity_msg.header.stamp = self.get_clock().now().to_msg()
                humidity_msg.header.frame_id = self.id_name_dict[key]
                humidity_msg.relative_humidity = self.id_envdata_dict[key].humidity
                self.id_humidity_publisher_dict[key].publish(humidity_msg)

                gas_msg = Gas()
                gas_msg.header.stamp = self.get_clock().now().to_msg()
                gas_msg.header.frame_id = self.id_name_dict[key]
                gas_msg.gas = self.id_envdata_dict[key].gas
                self.id_gas_publisher_dict[key].publish(gas_msg)

    def node_tracker_callback(self, msg):
        if self.uavcan_environment_node_name in msg.header.frame_id:
            if msg.node_id not in self.id_temperature_publisher_dict:
                topic_name = msg.header.frame_id.replace('.', '_') + '/temperature'
                self.id_temperature_publisher_dict[msg.node_id] = self.create_publisher(Temperature, topic_name, 1)
                topic_name = msg.header.frame_id.replace('.', '_') + '/pressure'
                self.id_pressure_publisher_dict[msg.node_id] = self.create_publisher(FluidPressure, topic_name, 1)
                topic_name = msg.header.frame_id.replace('.', '_') + '/humidity'
                self.id_humidity_publisher_dict[msg.node_id] = self.create_publisher(RelativeHumidity, topic_name, 1)
                topic_name = msg.header.frame_id.replace('.', '_') + '/gas'
                self.id_gas_publisher_dict[msg.node_id] = self.create_publisher(Gas, topic_name, 1)
                self.id_alive_dict[msg.node_id] = msg.alive
                self.id_envdata_dict[msg.node_id] = None
                self.id_name_dict[msg.node_id] = msg.header.frame_id
            else:
                self.id_alive_dict[msg.node_id] = msg.alive

class Environment():
    def __init__(self):
        self.env_publisher = EnvironmentPublisher()
        self.env_uavcan = EnvironmentUavcan(self.env_publisher.uavcan_can_interface,\
            self.env_publisher.uavcan_mtu,\
            self.env_publisher.uavcan_local_node_id,\
            self.env_publisher.uavcan_local_node_name,\
            self.env_publisher.uavcan_sub_name,\
            self.env_publisher.uavcan_v_major,\
            self.env_publisher.uavcan_v_minor,\
            self.env_publisher.uavcan_register_file_path,\
            self.env_publisher.uavcan_register_sub_key_name,\
            self.env_publisher.uavcan_register_sub_value,\
            self.env_publisher.uavcan_register_bitrate_key_name,\
            self.env_publisher.uavcan_register_bitrate_value)

        self.prev_time = 0
        self.executor = MultiThreadedExecutor()
        self.executor.add_node(self.env_publisher)

    async def loop(self):
        while 1:
            time_now = time.time_ns()
            await asyncio.sleep(0.0001)
            if (time_now - self.prev_time) > self.env_publisher.publish_period:
                self.prev_time = time_now
                for key in self.env_publisher.id_envdata_dict:
                    self.env_publisher.id_envdata_dict[key] = self.env_uavcan.id_envdata_dict[key]
                self.env_publisher.env_publisher_timer_callback()
                self.executor.spin_once(timeout_sec = 0.0)

async def main():
    env = Environment()

    try:
        task1 = asyncio.create_task(env.env_uavcan.read_env_msg())
        task2 = asyncio.create_task(env.loop())
        await task1
        await task2
    finally:
        env.env_uavcan.close()

if __name__ == '__main__':
    rclpy.init()
    asyncio.run(main())
    rclpy.shutdown()

Headlight interface

#!/usr/bin/env python3
import asyncio
import sys
import struct
import getmac

import rclpy
from rclpy.node import Node
from mk4_msgs.srv import Headlight as HeadlightSrv
from mk4_msgs.msg import Heartbeat
from rclpy.executors import MultiThreadedExecutor

compiled_dsdl_dir = "/home/pi/.yakut"

sys.path.insert(0,str(compiled_dsdl_dir))

import uavcan.node  # noqa
import uavcan.primitive.scalar

from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application import register

class HeadlightUavcanInterface():
    RESPONSE_SUCCESS = 0
    RESPONSE_FAILURE = 1
    RESPONSE_NOT_AUTHERIZED = 2
    RESPONSE_BAD_COMMAND = 3
    RESPONSE_BAD_PARAMETER = 4
    RESPONSE_BAD_STATE = 5
    RESPONSE_INTERNAL_ERROR = 6
    RESPONSE_NO_RESPONSE = -1

    def __init__(self, can_interface, mtu, local_node_id, local_node_name, \
            v_major, v_minor, register_file, reg_bit_key, reg_bit_value, \
            execute_command_identifier, publisher_port_name, reg_pub_key, reg_pub_value):

        mac_addr = getmac.get_mac_address().split(':')
        mac_addr_int  = []
        for x in range(len(mac_addr)):
            mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int

        unique_id_padding = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
        ## According to their documentation the unique id should be globally unique to each node, as we will have
        ## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
        ## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
        unique_id_padding[16-len(mac_addr_int)-2] = local_node_id

        unique_id = unique_id_padding + mac_addr_int
        uavcan_node_info = uavcan.node.GetInfo_1.Response(
            software_version=uavcan.node.Version_1(major=v_major, minor=v_minor),
            name=local_node_name, unique_id=unique_id,protocol_version=uavcan.node.Version_1(major=1, minor=0))

        registry = make_registry(register_file=register_file)
        registry.setdefault(reg_pub_key,register.Value(integer16=register.Integer16(reg_pub_value)))
        registry.setdefault(reg_bit_key, register.Value(string=register.String(reg_bit_value)))
        media = SocketCANMedia(can_interface, mtu=mtu)
        transport = CANTransport(media, local_node_id=local_node_id)
        presentation = Presentation(transport)
        self.headlight_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)

        self.headlight_activate_publisher = self.headlight_uavcan_node.make_publisher(\
            uavcan.primitive.scalar.Bit_1_0, publisher_port_name)

        self.id_headlightclient_dict = {}
        self.id_name_dict = {}
        self.id_command_dict = {}

        self.send_activate_headlight = False
        self.activate_headlight = False

        self.byte_array_to_send = bytearray()
        self.uavcan_response = None
        self.send_req = False
        self.uavcan_execute_command_identifier = execute_command_identifier
        self.headlight_uavcan_node.start()

    async def async_execute_command(self):
        while 1:
            await asyncio.sleep(0.0001)
            for key in self.id_headlightclient_dict:
                if self.id_headlightclient_dict[key] is None:
                    self.id_headlightclient_dict[key] = self.headlight_uavcan_node.make_client(\
                        uavcan.node.ExecuteCommand_1_1, key, self.id_name_dict[key] + '.headlight')

                if self.id_command_dict[key] is not None and self.send_req:
                    req = uavcan.node.ExecuteCommand_1.Request()
                    req.command= self.uavcan_execute_command_identifier
                    req.parameter = self.id_command_dict[key]
                    attempts = 0
                    while attempts < 3:
                        try:
                            response, metadata = await(self.id_headlightclient_dict[key].call(req))
                            self.uavcan_response = response.status
                            break
                        except:
                            attempts += 1
                            self.uavcan_response = -1
                    self.send_req = False

    async def async_activate_headlight_publisher(self):
        while 1:
            await asyncio.sleep(0.0001)
            if self.send_activate_headlight:
                await self.headlight_activate_publisher.publish(uavcan.primitive.scalar.Bit_1_0(self.activate_headlight))
                self.send_activate_headlight = False

    def close(self):
        self.headlight_uavcan_node.close()

class HeadlightRosInterface(Node):
    def __init__(self):
        super().__init__('headlight_uavcan_srv')

        self.get_logger().info("Initialising headlight_uavcan_srv node")

        self.declare_parameter('uavcan.local_node_name', 'rossrobotics.mk4.ccu.bumper')
        self.uavcan_local_node_name = self.get_parameter('uavcan.local_node_name').value

        self.declare_parameter('uavcan.bumper_node_name', 'rossrobotics.mk4.bumper')
        self.uavcan_bumper_node_name = self.get_parameter('uavcan.bumper_node_name').value

        self.declare_parameter('uavcan.publisher_port_name', 'activate_headlights')
        self.uavcan_pub_name = self.get_parameter('uavcan.publisher_port_name').value

        self.declare_parameter('uavcan.can_interface', 'can0')
        self.uavcan_can_interface = self.get_parameter('uavcan.can_interface').value

        self.declare_parameter('uavcan.mtu', 8)
        self.uavcan_mtu = self.get_parameter('uavcan.mtu').value

        self.declare_parameter('uavcan.local_node_id', 4)
        self.uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value

        self.declare_parameter('uavcan.version.major', 1)
        self.uavcan_v_major = self.get_parameter('uavcan.version.major').value

        self.declare_parameter('uavcan.version.minor', 0)
        self.uavcan_v_minor = self.get_parameter('uavcan.version.minor').value

        self.declare_parameter('uavcan.register.file_path', '/home/pi/headlight_interface.db')
        self.uavcan_register_file_path = self.get_parameter('uavcan.register.file_path').value

        self.declare_parameter('uavcan.register.publisher.key_name', 'uavcan.pub.activate_headlights.id')
        self.uavcan_register_pub_key_name = self.get_parameter('uavcan.register.publisher.key_name').value

        self.declare_parameter('uavcan.register.publisher.value', 1639)
        self.uavcan_register_pub_value = self.get_parameter('uavcan.register.publisher.value').value

        self.declare_parameter('uavcan.register.bitrate.key_name', 'uavcan.can.bitrate')
        self.uavcan_register_bitrate_key_name = self.get_parameter('uavcan.register.bitrate.key_name').value

        self.declare_parameter('uavcan.register.bitrate.value', '1000000 1000000')
        self.uavcan_register_bitrate_value = self.get_parameter('uavcan.register.bitrate.value').value

        self.declare_parameter('uavcan.execute_command_identifier', 18)
        self.uavcan_execute_command_identifier = self.get_parameter('uavcan.execute_command_identifier').value

        self.declare_parameter('ros.heartbeat_topic_name', 'uavcan_node_tracker')
        ros_heartbeat_topic_name = self.get_parameter('ros.heartbeat_topic_name').value

        self.declare_parameter('ros.service_name','rossrobotics_mk4_bumper/headlight')
        ros_service_name = self.get_parameter('ros.service_name').value

        self.ros_heartbeat_subscriber = self.create_subscription(Heartbeat, ros_heartbeat_topic_name, self.node_tracker_callback, 5)
        self.ros_headlight_service = self.create_service(HeadlightSrv, ros_service_name, self.headlight_srv_callback)

        self.id_name_dict = {}
        self.id_alive_dict = {}
        self.id_command_dict = {}

        self.headlight_uavcan_interface = HeadlightUavcanInterface.__new__(HeadlightUavcanInterface)
        self.uavcan_node_alive = False

        self.get_logger().info("headlight_uavcan_srv node initialised")

    def node_tracker_callback(self, msg):
        if self.uavcan_bumper_node_name in msg.header.frame_id:
            if msg.node_id not in self.id_name_dict:
                self.id_alive_dict[msg.node_id] = msg.alive
                self.id_name_dict[msg.node_id] = msg.header.frame_id
            else:
                self.id_alive_dict[msg.node_id] = msg.alive

    def headlight_srv_callback(self, request, response):
        headlight_to_call = request.headlight
        mode_byte = request.mode.to_bytes(1,byteorder='big')
        ints_byte_array = bytearray(struct.pack("f", request.intensity))

        headlight_to_call_ids = []
        response_message = ""

        if len(self.id_name_dict) != len(headlight_to_call):
            response_message = response_message + 'The requested number of headlights have not been found on the CAN bus. Requested: ' \
                + str(len(headlight_to_call)) + ', Found: ' \
                + str(len(self.id_name_dict)) + ' on the CAN bus. '

        for key in self.id_name_dict:
            for headlight in headlight_to_call:
                if headlight in self.id_name_dict[key]:
                    headlight_to_call_ids.append(key)

        self.headlight_uavcan_interface.activate_headlight = False
        self.headlight_uavcan_interface.send_activate_headlight = True

        for id in headlight_to_call_ids:
            if self.id_alive_dict[id]:
                self.headlight_uavcan_interface.id_command_dict[id] = mode_byte + ints_byte_array
                self.headlight_uavcan_interface.send_req = True

                while self.headlight_uavcan_interface.uavcan_response == None:
                    pass

                if self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_SUCCESS:
                    response.success = True
                    response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
                        + str(id) + \
                        '. Successfully set. '
                elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_FAILURE:
                    response.success = False
                    response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
                        + str(id) + \
                        '. Headlight is disconnected or malfunctioning. '
                elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_BAD_PARAMETER:
                    response.success = False
                    response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
                        + str(id) + \
                        '. Not successful. Mode selected not available. '
                elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_NO_RESPONSE:
                    response.success = False
                    response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
                        + str(id) + \
                        '. Not sucessful. No response. '

                self.headlight_uavcan_interface.uavcan_response = None
                self.headlight_uavcan_interface.id_command_dict[id] = None
            else:
                response.success = False
                response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
                        + str(id) + \
                        '. Not alive. '

        self.headlight_uavcan_interface.activate_headlight = True
        self.headlight_uavcan_interface.send_activate_headlight = True

        if len(self.id_name_dict) != len(headlight_to_call):
            response.success = False
        response.message = response_message

        return response

class Headlight():
    def __init__(self):
        self.ros_headlight_interface = HeadlightRosInterface()
        self.uavcan_headlight_interface = HeadlightUavcanInterface(\
            self.ros_headlight_interface.uavcan_can_interface,\
            self.ros_headlight_interface.uavcan_mtu, \
            self.ros_headlight_interface.uavcan_local_node_id, \
            self.ros_headlight_interface.uavcan_local_node_name, \
            self.ros_headlight_interface.uavcan_v_major, \
            self.ros_headlight_interface.uavcan_v_minor, \
            self.ros_headlight_interface.uavcan_register_file_path,\
            self.ros_headlight_interface.uavcan_register_bitrate_key_name,\
            self.ros_headlight_interface.uavcan_register_bitrate_value, \
            self.ros_headlight_interface.uavcan_execute_command_identifier, \
            self.ros_headlight_interface.uavcan_pub_name, \
            self.ros_headlight_interface.uavcan_register_pub_key_name, \
            self.ros_headlight_interface.uavcan_register_pub_value)

        self.ros_headlight_interface.headlight_uavcan_interface = self.uavcan_headlight_interface
        self.executor = MultiThreadedExecutor()
        self.executor.add_node(self.ros_headlight_interface)

    async def loop(self):
        while 1:
            await asyncio.sleep(0.0001)
            for key in self.ros_headlight_interface.id_name_dict:
                if key not in self.uavcan_headlight_interface.id_name_dict:
                    self.uavcan_headlight_interface.id_name_dict[key] = self.ros_headlight_interface.id_name_dict[key]
                    self.uavcan_headlight_interface.id_headlightclient_dict[key] = None
                    self.uavcan_headlight_interface.id_command_dict[key] = None

            self.executor.spin_once(timeout_sec = 0.0)

async def main():
    headlight = Headlight()

    try:
        task1 = asyncio.create_task(headlight.uavcan_headlight_interface.async_execute_command())
        task2 = asyncio.create_task(headlight.loop())
        task3 = asyncio.create_task(headlight.uavcan_headlight_interface.async_activate_headlight_publisher())
        await task1
        await task2
        await task3
    finally:
        headlight.uavcan_headlight_interface.close()
        pass

if __name__ == '__main__':
    rclpy.init()
    asyncio.run(main())
    rclpy.shutdown()
1 Like