Pytest results for Python-CAN

Hello, as discussed, here are some logs from a pytest run of the Python-CAN implementation.
Pytest was started from \tests\transport\can

WARNING Executing <Task pending name='Task-60' coro=<_unittest_can_pythoncan() running at E:\Work\pyuavcan-master\tests\transport\can\media\> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001D0841DB580>()] created at c:\users\alex\appdata\local\programs\python\python38\lib\asyncio\> cb=[_run_until_complete_cb() at c:\users\alex\appdata\local\programs\python\python38\lib\asyncio\] created at C:\Users\Alex\AppData\Local\Programs\Python\Python38\lib\site-packages\pytest_asyncio\> took 0.562 seconds

If the assertion fails, pytest reports the result as expected; otherwise it hangs at this point.

Here is the implementation of the test script:

@pytest.mark.asyncio    # type: ignore
async def _unittest_can_pythoncan() -> None:
    from pyuavcan.transport import Timestamp
    from import TimestampedDataFrame, DataFrame, FrameFormat, FilterConfiguration

    from import PythonCANMedia
    available = PythonCANMedia.list_available_interface_names()
#    print('Available SocketCAN ifaces:', available)
#    assert 'vcan0' in available, \
#        'Either the interface listing method is not working or the environment is not configured correctly. ' \
#        'Please ensure that the virtual SocketCAN interface "vcan0" is available, and its MTU is set to 64+8.'

    media_a = PythonCANMedia('virtual:0', 500000, 8)
    media_b = PythonCANMedia('virtual:0', 500000, 8)

    assert media_a.mtu == 8
    assert media_b.mtu == 8
    assert media_a.interface_name == 'virtual:0'
    assert media_b.interface_name == 'virtual:0'
    assert media_a.number_of_acceptance_filters == media_b.number_of_acceptance_filters
    assert media_a._maybe_thread is None
    assert media_b._maybe_thread is None


    rx_a: typing.List[TimestampedDataFrame] = []

    def on_rx_a(frames: typing.Iterable[TimestampedDataFrame]) -> None:
        nonlocal rx_a
        frames = list(frames)
        print('RX A:', frames)
        rx_a += frames

    def on_rx_b(frames: typing.Iterable[TimestampedDataFrame]) -> None:
        frames = list(frames)
        print('RX B:', frames)
        #asyncio.ensure_future(media_b.send_until(frames, asyncio.get_event_loop().time() + 1.0))

    media_a.start(on_rx_a, False)
    media_b.start(on_rx_b, False)

    assert media_a._maybe_thread is not None
    assert media_b._maybe_thread is not None

    await asyncio.sleep(2.0)    # This wait is needed to ensure that the RX thread handles select() timeout properly

    ts_begin =
    await media_b.send_until([
    ], asyncio.get_event_loop().time() + 1.0)
    await asyncio.sleep(0.1)
    ts_end =

    print('rx_a:', rx_a)
    # Three sent back from the other end, two loopback
    assert len(rx_a) == 5


The correct result is 3 packets, but with this value in the assertion pytest hangs. If the asserted value is wrong (3 <> 5), the test stops with a failed result but displays this message anyway.

To squelch the useless warning, put this near the beginning of your test function:

asyncio.get_running_loop().slow_callback_duration = 1.0

If you read the message, it is merely saying that your coroutine is taking too long to execute, which is to be expected considering that you execute complex initialization in it.
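For illustration, here is a minimal standalone sketch of that setting outside of pytest. It is not taken from the test above: `blocking_setup` and `slow_callback_warnings` are hypothetical names, and `time.sleep(0.2)` merely stands in for the slow initialization. The sketch assumes asyncio debug mode is enabled, since the "Executing ... took N seconds" message is only logged in debug mode, with a default threshold of 0.1 seconds.

```python
import asyncio
import logging
import time
from typing import List, Optional


class _Collector(logging.Handler):
    """Collects formatted log messages so they can be inspected afterwards."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_setup(threshold: Optional[float]) -> None:
    # Hypothetical stand-in for a coroutine that performs slow synchronous work.
    if threshold is not None:
        asyncio.get_running_loop().slow_callback_duration = threshold
    time.sleep(0.2)  # Blocks the event loop for longer than the 0.1 s default.


def slow_callback_warnings(threshold: Optional[float]) -> List[str]:
    # Run the coroutine in debug mode and return the slow-callback warnings.
    collector = _Collector()
    logger = logging.getLogger("asyncio")
    logger.addHandler(collector)
    try:
        asyncio.run(blocking_setup(threshold), debug=True)
    finally:
        logger.removeHandler(collector)
    return [m for m in collector.messages if m.startswith("Executing")]


default_run = slow_callback_warnings(None)  # default threshold: warning logged
raised_run = slow_callback_warnings(1.0)    # raised threshold: no warning
print(len(default_run), len(raised_run))
```

Raising the threshold only hides the report; the coroutine still blocks the event loop for the same amount of time.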

I can’t reproduce the issue locally. Please push your code to your branch and let me know when I can test it. Also, please rebase your branch onto the latest master.