Hello everyone,
we are working in interfacing some UAVCAN nodes to our ROS2 system. Doing this we created two scripts, one that subscribes to a PORT-ID and republishes the information in a topic for each of the nodes that publish in the PORT-I. And the other script interfaces a ROS2 service with a UAVCAN command request. The scripts rely on a third node that republishes the heartbeat from UAVCAN to a ROS2 topic.
Neither of the scripts fully comply with the NODE-ID and the PORT-ID (and maybe other bits) but we guessed that would b good to share them so that someone could benefit from them and/or get some pointers in ways to make the scripts more efficient/optimal.
Script #1 ROS2 Heartbeat interface
#!/usr/bin/env python3
import sys
compiled_dsdl_dir = "/home/pi/.yakut"
sys.path.insert(0,str(compiled_dsdl_dir))
try:
import pyuavcan.application
except:
print("pyuavcan application not found")
import uavcan.node # noqa
import rclpy
from rclpy.node import Node
from mk4_msgs.msg import Heartbeat
import getmac
import asyncio
import typing
from dataclasses import dataclass
import pyuavcan
from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application.node_tracker import NodeTracker, Entry
class UavcanNodeTracker(Node):
HEALTH_NOMINAL = 0
HEALTH_ADVISORY = 1
HEALTH_CAUTION = 2
HEALTH_WARNING = 3
MODE_OPERATIONAL = 0
MODE_INITIALISATION = 1
MODE_MAINTENANCE = 2
MODE_SOFTWARE_UPDATE = 3
@dataclass
class HeartbeatInfo():
uptime: int = 0
mode: int = -1
health: int = -1
vssc: int = -1
def __init__(self):
super().__init__('uavcan_heartbeat_to_ros')
self.get_logger().info('uavcan_heartbeat_to_ros node initialising')
## Get parameters if on the server
self.declare_parameter('uavcan.node_name', 'rossrobotics.mk4.nodetracker')
uavcan_node_name = self.get_parameter('uavcan.node_name').value
self.declare_parameter('uavcan.sub_name', 'node_tracker')
uavcan_sub_name = self.get_parameter('uavcan.sub_name').value
self.declare_parameter('uavcan.can_interface', 'can0')
uavcan_can_interface = self.get_parameter('uavcan.can_interface').value
self.declare_parameter('uavcan.local_node_id', 6)
uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value
self.declare_parameter('uavcan.mtu', 8)
uavcan_mtu = self.get_parameter('uavcan.mtu').value
self.declare_parameter('uavcan.version.major', 1)
uavcan_v_major = self.get_parameter('uavcan.version.major').value
self.declare_parameter('uavcan.version.minor', 0)
uavcan_v_minor = self.get_parameter('uavcan.version.minor').value
self.declare_parameter('uavcan.timer_freq', 50)
uavcan_timer_period = 1.0/ self.get_parameter('uavcan.timer_freq').value
self.declare_parameter('ros.pub_freq', 2)
ros_pub_period = 1.0/ self.get_parameter('ros.pub_freq').value
self.declare_parameter('ros.topic_name', 'uavcan_node_tracker')
ros_topic_name = self.get_parameter('ros.topic_name').value
## Create empty dictionaries
self.id_name_dict = {}
self.id_uniqueid_dict = {}
self.id_alive_dict = {}
self.id_hbinfo_dict = {}
mac_addr = getmac.get_mac_address().split(':')
mac_addr_int = []
for x in range(len(mac_addr)):
mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int
unique_id_pad = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
## According to their documentation the unique id should be globally unique to each node, as we will have
## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
unique_id_pad[16-len(mac_addr_int)-2] = uavcan_local_node_id
unique_id = unique_id_pad + mac_addr_int
## Setup uavcan node
uavcan_node_info = uavcan.node.GetInfo_1.Response(
software_version=uavcan.node.Version_1(major=uavcan_v_major, minor=uavcan_v_minor),
name=uavcan_node_name,unique_id=unique_id)
registry = make_registry()
media = SocketCANMedia(uavcan_can_interface, mtu=uavcan_mtu)
transport = CANTransport(media, local_node_id=uavcan_local_node_id)
presentation = Presentation(transport)
self.tracker_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)
self.hb_sub = self.tracker_uavcan_node.make_subscriber(uavcan.node.Heartbeat_1_0, uavcan_sub_name)
self.hb_sub.receive_in_background(self.hb_rec)
self.tracker_uavcan_node.start()
# Setup uavcan node tracker
self.node_tracker = NodeTracker(self.tracker_uavcan_node)
self.node_tracker.add_update_handler(self.uavcan_node_update_handler)
self.uavcan_timer = self.create_timer(uavcan_timer_period, self.uavcan_timer_callback)
self.hb_timer = self.create_timer(ros_pub_period, self.hb_to_ros_timer)
# Create ROS publisher
self.heartbeat_publisher = self.create_publisher(Heartbeat, ros_topic_name, 1)
self.get_logger().info('uavcan_heartbeat_to_ros node initialised')
def hb_to_ros_timer(self):
for node_id in self.id_name_dict:
if self.id_alive_dict[node_id] and self.id_name_dict[node_id] != None:
self.id_hbinfo_dict[node_id].uptime = self.node_tracker.registry[node_id].heartbeat.uptime
self.id_hbinfo_dict[node_id].mode = self.node_tracker.registry[node_id].heartbeat.mode.value
self.id_hbinfo_dict[node_id].health = self.node_tracker.registry[node_id].heartbeat.health.value
self.id_hbinfo_dict[node_id].vssc = self.node_tracker.registry[node_id].heartbeat.vendor_specific_status_code
if self.id_hbinfo_dict[node_id].mode == self.MODE_OPERATIONAL:
if self.id_hbinfo_dict[node_id].health == self.HEALTH_ADVISORY:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has a minor failure dectected! This is not affecting functionality.'
self.get_logger().warn(msg, throttle_duration_sec=2)
elif self.id_hbinfo_dict[node_id].health == self.HEALTH_CAUTION:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has encountered a major failure! It has degradated performance.'
self.get_logger().error(msg, throttle_duration_sec=2)
elif self.id_hbinfo_dict[node_id].health == self.HEALTH_WARNING:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has suffered a fatal malfunction! It is unable to perform its intended functionality.'
self.get_logger().error(msg, throttle_duration_sec=2)
else:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' waiting for operational mode.'
self.get_logger().info(msg, throttle_duration_sec=5)
if self.id_name_dict[node_id] != None:
hb_msg = Heartbeat()
hb_msg.header.stamp = self.get_clock().now().to_msg()
hb_msg.header.frame_id = self.id_name_dict[node_id]
hb_msg.alive = self.id_alive_dict[node_id]
hb_msg.uptime = self.id_hbinfo_dict[node_id].uptime
hb_msg.health = self.id_hbinfo_dict[node_id].health
hb_msg.mode = self.id_hbinfo_dict[node_id].mode
hb_msg.vssc = self.id_hbinfo_dict[node_id].vssc
hb_msg.node_id = node_id
self.heartbeat_publisher.publish(hb_msg)
def uavcan_timer_callback(self):
_loop = asyncio.get_event_loop()
_loop.run_until_complete(self.run())
def uavcan_node_update_handler(self, node_id: int, old: typing.Optional[Entry], new: typing.Optional[Entry]):
if node_id not in self.id_name_dict and old == None and new != None:
self.get_logger().info('New uavcan node has been found. ID: %d' % node_id)
self.id_name_dict[node_id] = None
self.id_uniqueid_dict[node_id] = None
self.id_alive_dict[node_id] = None
self.id_hbinfo_dict[node_id] = self.HeartbeatInfo()
elif node_id in self.id_name_dict and new == None:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has gone offline.'
self.get_logger().warn(msg)
self.id_alive_dict[node_id] = False
elif node_id in self.id_name_dict and old != None and new != None:
if new.info == None:
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has restarted.'
self.get_logger().info(msg)
self.id_alive_dict[node_id] = False
else:
self.id_name_dict[node_id] = new.info.name.tobytes().decode()
self.id_uniqueid_dict[node_id] = new.info.unique_id
self.id_alive_dict[node_id] = True
msg = 'Uavcan node: ' + self.id_name_dict[node_id] + ', ID: ' \
+ str(node_id) + ' has respnded to the get info request.'
self.get_logger().info(msg)
async def hb_rec(self, msg, meta: pyuavcan.transport.TransferFrom):
pass
async def run(self):
self.hb_sub.receive_in_background(self.hb_rec)
def main(args=None):
rclpy.init(args=args)
uavcan_node_tracker = UavcanNodeTracker()
rclpy.spin(uavcan_node_tracker)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
uavcan_node_tracker.close() # closes the uavcan node
uavcan_node_tracker.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
Environmental sensor data pubisher
#!/usr/bin/env python3
from re import T
import sys
compiled_dsdl_dir = "/home/pi/.yakut"
sys.path.insert(0,str(compiled_dsdl_dir))
try:
import rr_data_types.sensors.Environment_1_0
except:
print("range array data type not found")
try:
import pyuavcan.application
except:
print("pyuavcan application not found")
import uavcan.node # noqa
import asyncio
import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from mk4_msgs.msg import Gas
from mk4_msgs.msg import Heartbeat
from sensor_msgs.msg import Temperature, FluidPressure, RelativeHumidity
from dataclasses import dataclass
import time
import getmac
from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application import register
class EnvironmentUavcan():
@dataclass
class EnvData():
temperature: float = 0.0
pressure: float = 0.0
humidity: float = 0.0
gas: float= 0.0
def __init__(self, can_interface, mtu, local_node_id, local_node_name, uavcan_sub_name, \
v_major, v_minor, register_file, reg_sub_key, reg_sub_value, reg_bit_key, reg_bit_value):
mac_addr = getmac.get_mac_address().split(':')
mac_addr_int = []
for x in range(len(mac_addr)):
mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int
unique_id_padding = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
## According to their documentation the unique id should be globally unique to each node, as we will have
## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
unique_id_padding[16-len(mac_addr_int)-2] = local_node_id
unique_id = unique_id_padding + mac_addr_int
uavcan_node_info = uavcan.node.GetInfo_1.Response(
software_version=uavcan.node.Version_1(major=v_major, minor=v_minor),
name=local_node_name, unique_id=unique_id)
self.id_envdata_dict = {}
registry = make_registry(register_file=register_file)
registry.setdefault(reg_sub_key,register.Value(integer16=register.Integer16(reg_sub_value)))
registry.setdefault(reg_bit_key, register.Value(string=register.String(reg_bit_value)))
media = SocketCANMedia(can_interface, mtu=mtu)
transport = CANTransport(media, local_node_id=local_node_id)
presentation = Presentation(transport)
self.env_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)
self.env_uavcan_sub = self.env_uavcan_node.make_subscriber(rr_data_types.sensors.Environment_1_0, uavcan_sub_name)
self.env_uavcan_node.start()
async def read_env_msg(self):
async for msg, meta in self.env_uavcan_sub:
if meta.source_node_id not in self.id_envdata_dict:
self.id_envdata_dict[meta.source_node_id] = self.EnvData()
self.id_envdata_dict[meta.source_node_id].temperature = msg.celsius
self.id_envdata_dict[meta.source_node_id].pressure = msg.pascal
self.id_envdata_dict[meta.source_node_id].humidity = msg.r_humidity
self.id_envdata_dict[meta.source_node_id].gas = msg.mOhm
def close(self):
self.env_uavcan_node.close()
class EnvironmentPublisher(Node):
def __init__(self):
super().__init__('environment_sensor_publisher')
self.get_logger().info("Initialising environment_sensor_publisher node")
self.declare_parameter('uavcan.local_node_name', 'rossrobotics.mk4.ccu.environment')
self.uavcan_local_node_name = self.get_parameter('uavcan.local_node_name').value
self.declare_parameter('uavcan.environment_node_name', 'rossrobotics.mk4.environment')
self.uavcan_environment_node_name = self.get_parameter('uavcan.environment_node_name').value
self.declare_parameter('uavcan.can_interface', 'can0')
self.uavcan_can_interface = self.get_parameter('uavcan.can_interface').value
self.declare_parameter('uavcan.mtu', 8)
self.uavcan_mtu = self.get_parameter('uavcan.mtu').value
self.declare_parameter('uavcan.local_node_id', 5)
self.uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value
self.declare_parameter('uavcan.subscriber_port_name', 'environment_readings')
self.uavcan_sub_name = self.get_parameter('uavcan.subscriber_port_name').value
self.declare_parameter('uavcan.version.major', 1)
self.uavcan_v_major = self.get_parameter('uavcan.version.major').value
self.declare_parameter('uavcan.version.minor', 0)
self.uavcan_v_minor = self.get_parameter('uavcan.version.minor').value
self.declare_parameter('uavcan.register.file_path', '/home/pi/environment_sensor_driver.db')
self.uavcan_register_file_path = self.get_parameter('uavcan.register.file_path').value
self.declare_parameter('uavcan.register.subscriber.key_name', 'uavcan.sub.environment_readings.id')
self.uavcan_register_sub_key_name = self.get_parameter('uavcan.register.subscriber.key_name').value
self.declare_parameter('uavcan.register.subscriber.value', 1640)
self.uavcan_register_sub_value = self.get_parameter('uavcan.register.subscriber.value').value
self.declare_parameter('uavcan.register.bitrate.key_name', 'uavcan.can.bitrate')
self.uavcan_register_bitrate_key_name = self.get_parameter('uavcan.register.bitrate.key_name').value
self.declare_parameter('uavcan.register.bitrate.value', '1000000 1000000')
self.uavcan_register_bitrate_value = self.get_parameter('uavcan.register.bitrate.value').value
self.declare_parameter('ros.publish_freq', 1)
sec_in_ns = 1000000000.0
self.publish_period = sec_in_ns / self.get_parameter('ros.publish_freq').value
self.declare_parameter('ros.hb_topic_name', 'uavcan_node_tracker')
hb_topic_name = self.get_parameter('ros.hb_topic_name').value
self.id_name_dict = {}
self.id_alive_dict = {}
self.id_temperature_publisher_dict = {}
self.id_pressure_publisher_dict = {}
self.id_humidity_publisher_dict = {}
self.id_gas_publisher_dict = {}
self.id_envdata_dict = {}
self.node_tracker = self.create_subscription(Heartbeat, hb_topic_name, self.node_tracker_callback, 25)
self.get_logger().info("environment_sensor_publisher node initialised")
def env_publisher_timer_callback(self):
for key in self.id_alive_dict:
if self.id_alive_dict[key]:
temperature_msg = Temperature()
temperature_msg.header.stamp = self.get_clock().now().to_msg()
temperature_msg.header.frame_id = self.id_name_dict[key]
temperature_msg.temperature = self.id_envdata_dict[key].temperature
self.id_temperature_publisher_dict[key].publish(temperature_msg)
pressure_msg = FluidPressure()
pressure_msg.header.stamp = self.get_clock().now().to_msg()
pressure_msg.header.frame_id = self.id_name_dict[key]
pressure_msg.fluid_pressure = self.id_envdata_dict[key].pressure
self.id_pressure_publisher_dict[key].publish(pressure_msg)
humidity_msg = RelativeHumidity()
humidity_msg.header.stamp = self.get_clock().now().to_msg()
humidity_msg.header.frame_id = self.id_name_dict[key]
humidity_msg.relative_humidity = self.id_envdata_dict[key].humidity
self.id_humidity_publisher_dict[key].publish(humidity_msg)
gas_msg = Gas()
gas_msg.header.stamp = self.get_clock().now().to_msg()
gas_msg.header.frame_id = self.id_name_dict[key]
gas_msg.gas = self.id_envdata_dict[key].gas
self.id_gas_publisher_dict[key].publish(gas_msg)
def node_tracker_callback(self, msg):
if self.uavcan_environment_node_name in msg.header.frame_id:
if msg.node_id not in self.id_temperature_publisher_dict:
topic_name = msg.header.frame_id.replace('.', '_') + '/temperature'
self.id_temperature_publisher_dict[msg.node_id] = self.create_publisher(Temperature, topic_name, 1)
topic_name = msg.header.frame_id.replace('.', '_') + '/pressure'
self.id_pressure_publisher_dict[msg.node_id] = self.create_publisher(FluidPressure, topic_name, 1)
topic_name = msg.header.frame_id.replace('.', '_') + '/humidity'
self.id_humidity_publisher_dict[msg.node_id] = self.create_publisher(RelativeHumidity, topic_name, 1)
topic_name = msg.header.frame_id.replace('.', '_') + '/gas'
self.id_gas_publisher_dict[msg.node_id] = self.create_publisher(Gas, topic_name, 1)
self.id_alive_dict[msg.node_id] = msg.alive
self.id_envdata_dict[msg.node_id] = None
self.id_name_dict[msg.node_id] = msg.header.frame_id
else:
self.id_alive_dict[msg.node_id] = msg.alive
class Environment():
def __init__(self):
self.env_publisher = EnvironmentPublisher()
self.env_uavcan = EnvironmentUavcan(self.env_publisher.uavcan_can_interface,\
self.env_publisher.uavcan_mtu,\
self.env_publisher.uavcan_local_node_id,\
self.env_publisher.uavcan_local_node_name,\
self.env_publisher.uavcan_sub_name,\
self.env_publisher.uavcan_v_major,\
self.env_publisher.uavcan_v_minor,\
self.env_publisher.uavcan_register_file_path,\
self.env_publisher.uavcan_register_sub_key_name,\
self.env_publisher.uavcan_register_sub_value,\
self.env_publisher.uavcan_register_bitrate_key_name,\
self.env_publisher.uavcan_register_bitrate_value)
self.prev_time = 0
self.executor = MultiThreadedExecutor()
self.executor.add_node(self.env_publisher)
async def loop(self):
while 1:
time_now = time.time_ns()
await asyncio.sleep(0.0001)
if (time_now - self.prev_time) > self.env_publisher.publish_period:
self.prev_time = time_now
for key in self.env_publisher.id_envdata_dict:
self.env_publisher.id_envdata_dict[key] = self.env_uavcan.id_envdata_dict[key]
self.env_publisher.env_publisher_timer_callback()
self.executor.spin_once(timeout_sec = 0.0)
async def main():
env = Environment()
try:
task1 = asyncio.create_task(env.env_uavcan.read_env_msg())
task2 = asyncio.create_task(env.loop())
await task1
await task2
finally:
env.env_uavcan.close()
if __name__ == '__main__':
rclpy.init()
asyncio.run(main())
rclpy.shutdown()
Headlight interface
#!/usr/bin/env python3
import asyncio
import sys
import struct
import getmac
import rclpy
from rclpy.node import Node
from mk4_msgs.srv import Headlight as HeadlightSrv
from mk4_msgs.msg import Heartbeat
from rclpy.executors import MultiThreadedExecutor
compiled_dsdl_dir = "/home/pi/.yakut"
sys.path.insert(0,str(compiled_dsdl_dir))
import uavcan.node # noqa
import uavcan.primitive.scalar
from pyuavcan.transport.can.media.socketcan import SocketCANMedia
from pyuavcan.transport.can import CANTransport
from pyuavcan.presentation import Presentation
from pyuavcan.application._node_factory import SimpleNode
from pyuavcan.application._registry_factory import make_registry
from pyuavcan.application import register
class HeadlightUavcanInterface():
RESPONSE_SUCCESS = 0
RESPONSE_FAILURE = 1
RESPONSE_NOT_AUTHERIZED = 2
RESPONSE_BAD_COMMAND = 3
RESPONSE_BAD_PARAMETER = 4
RESPONSE_BAD_STATE = 5
RESPONSE_INTERNAL_ERROR = 6
RESPONSE_NO_RESPONSE = -1
def __init__(self, can_interface, mtu, local_node_id, local_node_name, \
v_major, v_minor, register_file, reg_bit_key, reg_bit_value, \
execute_command_identifier, publisher_port_name, reg_pub_key, reg_pub_value):
mac_addr = getmac.get_mac_address().split(':')
mac_addr_int = []
for x in range(len(mac_addr)):
mac_addr_int.append(int(mac_addr[x],16)) ##converts string to int
unique_id_padding = [0] * (16-len(mac_addr_int)) ## 16 is used here as the unique id is at uint8[16] array
## According to their documentation the unique id should be globally unique to each node, as we will have
## multiple nodes on the ccu, the local node id is added to the unique id for the node. This is added in
## with a byte gap from the mac address so if/when debugging it is easy to decipher the two
unique_id_padding[16-len(mac_addr_int)-2] = local_node_id
unique_id = unique_id_padding + mac_addr_int
uavcan_node_info = uavcan.node.GetInfo_1.Response(
software_version=uavcan.node.Version_1(major=v_major, minor=v_minor),
name=local_node_name, unique_id=unique_id,protocol_version=uavcan.node.Version_1(major=1, minor=0))
registry = make_registry(register_file=register_file)
registry.setdefault(reg_pub_key,register.Value(integer16=register.Integer16(reg_pub_value)))
registry.setdefault(reg_bit_key, register.Value(string=register.String(reg_bit_value)))
media = SocketCANMedia(can_interface, mtu=mtu)
transport = CANTransport(media, local_node_id=local_node_id)
presentation = Presentation(transport)
self.headlight_uavcan_node = SimpleNode(presentation,uavcan_node_info, registry)
self.headlight_activate_publisher = self.headlight_uavcan_node.make_publisher(\
uavcan.primitive.scalar.Bit_1_0, publisher_port_name)
self.id_headlightclient_dict = {}
self.id_name_dict = {}
self.id_command_dict = {}
self.send_activate_headlight = False
self.activate_headlight = False
self.byte_array_to_send = bytearray()
self.uavcan_response = None
self.send_req = False
self.uavcan_execute_command_identifier = execute_command_identifier
self.headlight_uavcan_node.start()
async def async_execute_command(self):
while 1:
await asyncio.sleep(0.0001)
for key in self.id_headlightclient_dict:
if self.id_headlightclient_dict[key] is None:
self.id_headlightclient_dict[key] = self.headlight_uavcan_node.make_client(\
uavcan.node.ExecuteCommand_1_1, key, self.id_name_dict[key] + '.headlight')
if self.id_command_dict[key] is not None and self.send_req:
req = uavcan.node.ExecuteCommand_1.Request()
req.command= self.uavcan_execute_command_identifier
req.parameter = self.id_command_dict[key]
attempts = 0
while attempts < 3:
try:
response, metadata = await(self.id_headlightclient_dict[key].call(req))
self.uavcan_response = response.status
break
except:
attempts += 1
self.uavcan_response = -1
self.send_req = False
async def async_activate_headlight_publisher(self):
while 1:
await asyncio.sleep(0.0001)
if self.send_activate_headlight:
await self.headlight_activate_publisher.publish(uavcan.primitive.scalar.Bit_1_0(self.activate_headlight))
self.send_activate_headlight = False
def close(self):
self.headlight_uavcan_node.close()
class HeadlightRosInterface(Node):
def __init__(self):
super().__init__('headlight_uavcan_srv')
self.get_logger().info("Initialising headlight_uavcan_srv node")
self.declare_parameter('uavcan.local_node_name', 'rossrobotics.mk4.ccu.bumper')
self.uavcan_local_node_name = self.get_parameter('uavcan.local_node_name').value
self.declare_parameter('uavcan.bumper_node_name', 'rossrobotics.mk4.bumper')
self.uavcan_bumper_node_name = self.get_parameter('uavcan.bumper_node_name').value
self.declare_parameter('uavcan.publisher_port_name', 'activate_headlights')
self.uavcan_pub_name = self.get_parameter('uavcan.publisher_port_name').value
self.declare_parameter('uavcan.can_interface', 'can0')
self.uavcan_can_interface = self.get_parameter('uavcan.can_interface').value
self.declare_parameter('uavcan.mtu', 8)
self.uavcan_mtu = self.get_parameter('uavcan.mtu').value
self.declare_parameter('uavcan.local_node_id', 4)
self.uavcan_local_node_id = self.get_parameter('uavcan.local_node_id').value
self.declare_parameter('uavcan.version.major', 1)
self.uavcan_v_major = self.get_parameter('uavcan.version.major').value
self.declare_parameter('uavcan.version.minor', 0)
self.uavcan_v_minor = self.get_parameter('uavcan.version.minor').value
self.declare_parameter('uavcan.register.file_path', '/home/pi/headlight_interface.db')
self.uavcan_register_file_path = self.get_parameter('uavcan.register.file_path').value
self.declare_parameter('uavcan.register.publisher.key_name', 'uavcan.pub.activate_headlights.id')
self.uavcan_register_pub_key_name = self.get_parameter('uavcan.register.publisher.key_name').value
self.declare_parameter('uavcan.register.publisher.value', 1639)
self.uavcan_register_pub_value = self.get_parameter('uavcan.register.publisher.value').value
self.declare_parameter('uavcan.register.bitrate.key_name', 'uavcan.can.bitrate')
self.uavcan_register_bitrate_key_name = self.get_parameter('uavcan.register.bitrate.key_name').value
self.declare_parameter('uavcan.register.bitrate.value', '1000000 1000000')
self.uavcan_register_bitrate_value = self.get_parameter('uavcan.register.bitrate.value').value
self.declare_parameter('uavcan.execute_command_identifier', 18)
self.uavcan_execute_command_identifier = self.get_parameter('uavcan.execute_command_identifier').value
self.declare_parameter('ros.heartbeat_topic_name', 'uavcan_node_tracker')
ros_heartbeat_topic_name = self.get_parameter('ros.heartbeat_topic_name').value
self.declare_parameter('ros.service_name','rossrobotics_mk4_bumper/headlight')
ros_service_name = self.get_parameter('ros.service_name').value
self.ros_heartbeat_subscriber = self.create_subscription(Heartbeat, ros_heartbeat_topic_name, self.node_tracker_callback, 5)
self.ros_headlight_service = self.create_service(HeadlightSrv, ros_service_name, self.headlight_srv_callback)
self.id_name_dict = {}
self.id_alive_dict = {}
self.id_command_dict = {}
self.headlight_uavcan_interface = HeadlightUavcanInterface.__new__(HeadlightUavcanInterface)
self.uavcan_node_alive = False
self.get_logger().info("headlight_uavcan_srv node initialised")
def node_tracker_callback(self, msg):
if self.uavcan_bumper_node_name in msg.header.frame_id:
if msg.node_id not in self.id_name_dict:
self.id_alive_dict[msg.node_id] = msg.alive
self.id_name_dict[msg.node_id] = msg.header.frame_id
else:
self.id_alive_dict[msg.node_id] = msg.alive
def headlight_srv_callback(self, request, response):
headlight_to_call = request.headlight
mode_byte = request.mode.to_bytes(1,byteorder='big')
ints_byte_array = bytearray(struct.pack("f", request.intensity))
headlight_to_call_ids = []
response_message = ""
if len(self.id_name_dict) != len(headlight_to_call):
response_message = response_message + 'The requested number of headlights have not been found on the CAN bus. Requested: ' \
+ str(len(headlight_to_call)) + ', Found: ' \
+ str(len(self.id_name_dict)) + ' on the CAN bus. '
for key in self.id_name_dict:
for headlight in headlight_to_call:
if headlight in self.id_name_dict[key]:
headlight_to_call_ids.append(key)
self.headlight_uavcan_interface.activate_headlight = False
self.headlight_uavcan_interface.send_activate_headlight = True
for id in headlight_to_call_ids:
if self.id_alive_dict[id]:
self.headlight_uavcan_interface.id_command_dict[id] = mode_byte + ints_byte_array
self.headlight_uavcan_interface.send_req = True
while self.headlight_uavcan_interface.uavcan_response == None:
pass
if self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_SUCCESS:
response.success = True
response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
+ str(id) + \
'. Successfully set. '
elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_FAILURE:
response.success = False
response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
+ str(id) + \
'. Headlight is disconnected or malfunctioning. '
elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_BAD_PARAMETER:
response.success = False
response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
+ str(id) + \
'. Not successful. Mode selected not available. '
elif self.headlight_uavcan_interface.uavcan_response == self.headlight_uavcan_interface.RESPONSE_NO_RESPONSE:
response.success = False
response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
+ str(id) + \
'. Not sucessful. No response. '
self.headlight_uavcan_interface.uavcan_response = None
self.headlight_uavcan_interface.id_command_dict[id] = None
else:
response.success = False
response_message = response_message + 'Uavcan node: ' + self.id_name_dict[id] + ', ID: ' \
+ str(id) + \
'. Not alive. '
self.headlight_uavcan_interface.activate_headlight = True
self.headlight_uavcan_interface.send_activate_headlight = True
if len(self.id_name_dict) != len(headlight_to_call):
response.success = False
response.message = response_message
return response
class Headlight():
def __init__(self):
self.ros_headlight_interface = HeadlightRosInterface()
self.uavcan_headlight_interface = HeadlightUavcanInterface(\
self.ros_headlight_interface.uavcan_can_interface,\
self.ros_headlight_interface.uavcan_mtu, \
self.ros_headlight_interface.uavcan_local_node_id, \
self.ros_headlight_interface.uavcan_local_node_name, \
self.ros_headlight_interface.uavcan_v_major, \
self.ros_headlight_interface.uavcan_v_minor, \
self.ros_headlight_interface.uavcan_register_file_path,\
self.ros_headlight_interface.uavcan_register_bitrate_key_name,\
self.ros_headlight_interface.uavcan_register_bitrate_value, \
self.ros_headlight_interface.uavcan_execute_command_identifier, \
self.ros_headlight_interface.uavcan_pub_name, \
self.ros_headlight_interface.uavcan_register_pub_key_name, \
self.ros_headlight_interface.uavcan_register_pub_value)
self.ros_headlight_interface.headlight_uavcan_interface = self.uavcan_headlight_interface
self.executor = MultiThreadedExecutor()
self.executor.add_node(self.ros_headlight_interface)
async def loop(self):
while 1:
await asyncio.sleep(0.0001)
for key in self.ros_headlight_interface.id_name_dict:
if key not in self.uavcan_headlight_interface.id_name_dict:
self.uavcan_headlight_interface.id_name_dict[key] = self.ros_headlight_interface.id_name_dict[key]
self.uavcan_headlight_interface.id_headlightclient_dict[key] = None
self.uavcan_headlight_interface.id_command_dict[key] = None
self.executor.spin_once(timeout_sec = 0.0)
async def main():
headlight = Headlight()
try:
task1 = asyncio.create_task(headlight.uavcan_headlight_interface.async_execute_command())
task2 = asyncio.create_task(headlight.loop())
task3 = asyncio.create_task(headlight.uavcan_headlight_interface.async_activate_headlight_publisher())
await task1
await task2
await task3
finally:
headlight.uavcan_headlight_interface.close()
pass
if __name__ == '__main__':
rclpy.init()
asyncio.run(main())
rclpy.shutdown()