# Source code for capture2go.usb

# SPDX-FileCopyrightText: 2025 SensorStim Neurotechnology GmbH <support@capture2go.com>
#
# SPDX-License-Identifier: MIT

import asyncio
import threading
import time
import serial

from .device import AbstractDevice


def _readSerial(ser: serial.Serial, loop: asyncio.AbstractEventLoop, device: 'UsbDevice'):
    """
    Continuously read from the serial port and forward the data to the event loop.

    This function is meant to run in its own thread. Without using threads, the receive buffer might fill up
    easily when data processing takes too long.

    Every non-empty chunk is handed to ``device._onData`` on ``loop`` together with a ``time.time_ns()``
    timestamp taken right after the read. When reading fails with one of the handled exceptions, ``None``
    is delivered once as a disconnect sentinel and the thread ends.

    Args:
        ser: The serial port to read from.
        loop: Event loop on which ``device._onData`` is scheduled.
        device: Device object that receives the data.
    """

    def post(data: bytes | None) -> bool:
        """Schedule ``device._onData`` on the loop; return False if the loop has already been closed."""
        try:
            loop.call_soon_threadsafe(device._onData, time.time_ns(), data)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError on a closed loop, e.g. when the application shuts
            # down while the port is still open. Nobody can receive data anymore, so stop quietly instead
            # of letting the thread die with an unhandled exception.
            if loop.is_closed():
                return False
            raise
        return True

    try:
        while True:
            waiting = ser.in_waiting
            # Fetch everything that is already buffered if it is a reasonable amount; otherwise request up
            # to 256 bytes and let the read timeout configured on the port bound the wait.
            data = ser.read(waiting if waiting >= 16 else 256)
            if data and not post(data):
                return
    except (serial.SerialException, OSError, TypeError):
        post(None)  # Disconnect sentinel.

class UsbDevice(AbstractDevice):
    """
    Represents a USB-connected IMU device.

    Args:
        device (str): Path to the serial device (e.g., ``/dev/tty.usbmodem...`` or ``COM1`` on Windows).
        baud (int, optional): Baud rate for the serial connection. Defaults to 2147483647.
    """

    def __init__(self, device: str, baud: int = 2147483647):
        super().__init__()
        self._device = device
        self._baud = baud
        # Open serial port and reader thread; both are None while disconnected.
        self._ser = None
        self._thread = None

    async def connect(self):
        """
        Open the serial port and start the background reader thread.

        Must be called from a running event loop and only while the device is disconnected. If setting up
        the connection fails after the port was opened, the port is closed again and the device stays
        disconnected, so that ``connect()`` can be retried.
        """
        assert self._ser is None, 'must be disconnected'
        ser = serial.Serial(self._device, self._baud, timeout=0.01)
        try:
            ser.reset_input_buffer()
            thread = threading.Thread(
                target=_readSerial,
                args=(ser, asyncio.get_running_loop(), self),
                daemon=True,
            )
            thread.start()
        except BaseException:
            # Do not leak the open port or leave the object half-connected.
            ser.close()
            raise

        # Only publish the port and thread once the setup succeeded. Callbacks scheduled by the reader
        # thread cannot run before this coroutine yields, so they always see a fully initialized object.
        self._ser = ser
        self._thread = thread

        self._deviceInfoReceived.clear()
        self._statusReceived.clear()

        self.state = 'connected'
        for listener in self._stateListeners:
            listener(self, 'connected')
        # The connect sentinel ensures that the queue is not empty if a disconnect sentinel is found.
        self._queue.put_nowait(self._connectSentinel)

    async def disconnect(self):
        """Close the connection. Does nothing if the device is already disconnected."""
        self._disconnect()

    async def send(self, package):
        """
        Write a package to the serial port and flush the output.

        Args:
            package: Object whose ``pack()`` method returns the bytes to transmit.
        """
        assert self._ser is not None
        self._ser.write(package.pack())
        self._ser.flush()

    def _disconnect(self):
        """
        Close the serial port, wait for the reader thread and notify listeners.

        Returns immediately if the state is already ``'disconnected'``. Otherwise the port is closed, the
        reader thread is joined, the state is set to ``'disconnected'``, all state listeners are called and
        the disconnect sentinel is put into the queue.
        """
        if self.state == 'disconnected':
            return
        assert self._ser is not None
        assert self._thread is not None

        self._ser.close()
        self._ser = None
        # The reader thread is expected to terminate once reading from the closed port fails.
        self._thread.join()
        self._thread = None

        self.state = 'disconnected'
        for listener in self._stateListeners:
            listener(self, 'disconnected')
        self._queue.put_nowait(self._disconnectSentinel)

    def _onData(self, timestamp: int, data: bytes | None):
        """
        Handle data delivered by the reader thread.

        Args:
            timestamp: Receive time in nanoseconds, as returned by ``time.time_ns()``.
            data: The received bytes, or ``None`` as disconnect sentinel when reading from the port failed.
        """
        if data is None:
            # The state check avoids a second teardown when the sentinel arrives after an explicit
            # disconnect() has already completed.
            if self.state != 'disconnected':
                self._disconnect()
            return
        self._feed(data, timestamp, extractRtPackages=False)