Hello, as discussed, here are some logs from running pytest against the Python-CAN implementation.
Pytest was started from \tests\transport\can
WARNING asyncio:base_events.py:1854 Executing <Task pending name='Task-60' coro=<_unittest_can_pythoncan() running at E:\Work\pyuavcan-master\tests\transport\can\media\_pythoncan.py:63> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001D0841DB580>()] created at c:\users\alex\appdata\local\programs\python\python38\lib\asyncio\base_events.py:422> cb=[_run_until_complete_cb() at c:\users\alex\appdata\local\programs\python\python38\lib\asyncio\base_events.py:184] created at C:\Users\Alex\AppData\Local\Programs\Python\Python38\lib\site-packages\pytest_asyncio\plugin.py:157> took 0.562 seconds
If the assertion fails, the test result is displayed as expected; otherwise pytest hangs here.
Here is the implementation of the test script:
@pytest.mark.asyncio  # type: ignore
async def _unittest_can_pythoncan() -> None:
    """
    Exchange frames between two PythonCANMedia instances opened on 'virtual:0'.

    media_b transmits three frames (two of them flagged for loopback); every frame
    delivered to media_a's receive handler is collected in rx_a and counted at the end.
    """
    from pyuavcan.transport import Timestamp
    from pyuavcan.transport.can.media import TimestampedDataFrame, DataFrame, FrameFormat, FilterConfiguration
    from pyuavcan.transport.can.media.pythoncan import PythonCANMedia

    # The listing is still invoked, but the checks that used its result are commented out below.
    available = PythonCANMedia.list_available_interface_names()
    # print('Available SocketCAN ifaces:', available)
    # assert 'vcan0' in available, \
    #     'Either the interface listing method is not working or the environment is not configured correctly. ' \
    #     'Please ensure that the virtual SocketCAN interface "vcan0" is available, and its MTU is set to 64+8.'

    media_a = PythonCANMedia('virtual:0', 500000, 8)
    media_b = PythonCANMedia('virtual:0', 500000, 8)
    both_media = (media_a, media_b)

    # Each property is checked on A first and then on B, one property at a time.
    for media in both_media:
        assert media.mtu == 8
    for media in both_media:
        assert media.interface_name == 'virtual:0'
    assert media_a.number_of_acceptance_filters == media_b.number_of_acceptance_filters
    # No worker thread is expected to exist before start() has been called.
    for media in both_media:
        assert media._maybe_thread is None

    # Acceptance filter configuration is commented out in this version of the test:
    # media_a.configure_acceptance_filters([FilterConfiguration.new_promiscuous()])
    # media_b.configure_acceptance_filters([FilterConfiguration.new_promiscuous()])

    rx_a: typing.List[TimestampedDataFrame] = []

    def on_rx_a(frames: typing.Iterable[TimestampedDataFrame]) -> None:
        # Materialize once so the same items are both printed and stored.
        received = list(frames)
        print('RX A:', received)
        rx_a.extend(received)

    def on_rx_b(frames: typing.Iterable[TimestampedDataFrame]) -> None:
        received = list(frames)
        print('RX B:', received)
        # Echoing the frames back is commented out in this version of the test:
        # asyncio.ensure_future(media_b.send_until(frames, asyncio.get_event_loop().time() + 1.0))

    media_a.start(on_rx_a, False)
    media_b.start(on_rx_b, False)
    for media in both_media:
        assert media._maybe_thread is not None

    # This wait is needed to ensure that the RX thread handles select() timeout properly.
    await asyncio.sleep(2.0)

    ts_begin = Timestamp.now()
    outgoing = [
        DataFrame(
            identifier=0xbadc0fe,
            data=bytearray(range(8)),
            format=FrameFormat.EXTENDED,
            loopback=True,
        ),
        DataFrame(
            identifier=0x12345678,
            data=bytearray(range(0)),
            format=FrameFormat.EXTENDED,
            loopback=False,
        ),
        DataFrame(
            identifier=0x123,
            data=bytearray(range(6)),
            format=FrameFormat.BASE,
            loopback=True,
        ),
    ]
    # The transmission deadline is one second from now on the event loop clock.
    deadline = asyncio.get_event_loop().time() + 1.0
    await media_b.send_until(outgoing, deadline)
    await asyncio.sleep(0.1)
    ts_end = Timestamp.now()

    print('rx_a:', rx_a)
    # Three sent back from the other end, two loopback.
    # NOTE(review): with the echo in on_rx_b commented out, only the frames sent by media_b
    # appear to reach rx_a -- confirm whether 5 is still the intended count.
    assert len(rx_a) == 5

    media_a.close()
    media_b.close()
The correct result is 3 packets, and the test hangs with this value. If the expected value is wrong (3 <> 5), the test stops with a failed result but displays this message anyway.