Main API Reference

This page documents the main classes and functions available in the top-level capture2go namespace.

Device Classes

class capture2go.AbstractDevice[source]

Bases: object

Base class that represents a single IMU device (BLE or USB).

This class can be used to send packages to the device and receive packages. Callbacks can be registered for state changes and for received packages.

Standard async usage for receiving packages:

await imu.connect()
await imu.init()
await imu.send(...)
async for package in imu:
    print(package)

# or:
package = await imu.apoll()  # waits until a package is available
print(package)

Non-blocking usage:

package = imu.poll()
if package is not None:
    print(package)

# or:
while (package := imu.poll()) is not None:
    print(package)
name: str

The device name (e.g., IMU_ab1234), might be an empty string until initialized.

state: Literal['disconnected', 'connecting', 'connected']

The current state of the device connection. (DeviceState)

status: DataStatus | None

Provides access to the last received status package from the device.

deviceInfo: DataDeviceInfo | None

Provides access to the last received device info package from the device.

async connect()[source]

Opens a connection to the device.

async disconnect()[source]

Closes the connection to the device.

async init(setTime=False, abortRecording=False, abortStreaming=False)[source]

Performs initial communication with the device to ensure a consistent state.

This function should be called immediately after AbstractDevice.connect(). It will send a pkg.CmdGetDeviceInfo package and wait for the response as well as the initial pkg.DataStatus package. Depending on the arguments, it will perform additional initialization steps.

Parameters:
  • setTime (bool, optional) – Set the sensor clock based on the current system time. Defaults to False.

  • abortRecording (bool, optional) – Aborts any ongoing recording. Defaults to False, in which case DeviceIsRecording is raised if the device is recording.

  • abortStreaming (bool, optional) – Aborts any ongoing streaming and clears the send buffer. Defaults to False, in which case DeviceIsStreaming is raised if the device is streaming.

async send(package)[source]

Sends a package to the device.

Parameters:

package (pkg.AbstractPackage) – The package to be sent.

async sendAndAwaitAck(package, ackCls, timeout=3.0)[source]

Sends a package to the device and waits for the acknowledgement (or an error or a timeout).

The return value is either the expected acknowledgement package or a SensorError package that refers to the sent package.

Raises a TimeoutError if the package was not received within timeout seconds.

Parameters:
  • package (pkg.AbstractPackage) – The package to be sent.

  • ackCls (type[T]) – The class of the expected acknowledgement package.

  • timeout (float, optional) – Maximum time in seconds to wait for the acknowledgement. Defaults to 3.

Return type:

T | SensorError

addStateListener(listener)[source]

Registers a callback function that is called when the connection state changes.

Parameters:

listener (StateListener) – A function that gets called with the device and the new state as parameters.

removeStateListener(listener)[source]

Unregisters a callback function that is called when the connection state changes.

Parameters:

listener (StateListener) – A callback function that was previously registered as a state listener.

addDataWithRtListener(listener)[source]

Registers a callback that is invoked when raw data (including RT packages) is received.

The callback is called before real-time (RT) packages are extracted from the incoming BLE chunk. This is useful if you need access to the unmodified BLE payload.

Parameters:

listener (DataListener) – A function that gets called with the device, the raw data chunk (including RT packages), and an optional receive timestamp.

removeDataWithRtListener(listener)[source]

Unregisters a previously registered raw data (with RT) listener.

Parameters:

listener (DataListener) – A callback function that was previously registered as a data (with RT) listener.

addDataListener(listener)[source]

Registers a callback that is invoked when data is received (after RT extraction).

The callback is called after real-time (RT) packages have been removed (if present) and before the data is fed to the unpacker. Use this to observe the stream that is parsed into packages.

Parameters:

listener (DataListener) – A function that gets called with the device, the data chunk after RT extraction, and an optional receive timestamp.

removeDataListener(listener)[source]

Unregisters a previously registered data listener.

Parameters:

listener (DataListener) – A callback function that was previously registered as a data listener.

addPackageListener(listener)[source]

Registers a callback that is invoked for each extracted package.

The callback is called for every package produced by the unpacker. It is invoked before the package is enqueued for iteration/polling.

Parameters:

listener (PackageListener) – A function that gets called with the device, the package, and an optional receive timestamp.

removePackageListener(listener)[source]

Unregisters a previously registered package listener.

Parameters:

listener (PackageListener) – A callback function that was previously registered as a package listener.

poll()[source]

Polls the device for received packages. Returns either a package or None.

async apoll()[source]

Asynchronous poll method that returns the next package. If no package is available, the method waits without blocking the event loop.

class capture2go.BleDevice(device, rssi=0)[source]

Bases: AbstractDevice

Represents a single BLE IMU device.

Usually created by capture2go.connect() or capture2go.BleScanner.

Parameters:
  • device (BLEDevice) – The BLE device instance.

  • rssi (int, optional) – Initial RSSI value. Defaults to 0.

device: BLEDevice

The underlying bleak BLEDevice instance.

rssi: int

The most recently observed RSSI (signal strength) for this device (updated only while scanning).

class capture2go.UsbDevice(device, baud=2147483647)[source]

Bases: AbstractDevice

Represents a USB-connected IMU device.

Parameters:
  • device (str) – Path to the serial device (e.g., /dev/tty.usbmodem... or COM1 on Windows).

  • baud (int, optional) – Baud rate for the serial connection. Defaults to 2147483647.

class capture2go.FilePlaybackDevice(filename)[source]

Bases: AbstractDevice

Device that replays data from a file as if it were a live IMU device.

This is useful for development and testing, allowing you to execute processing code without interacting with real hardware.

Limitations:
  • Stored packages are played back without any delay, so this will not work for timing-sensitive code.

  • Any packages sent to the device (e.g., via send) will be ignored.

Parameters:

filename (str) – Path to the binary file containing recorded IMU data to play back.

Scanning and Connecting

class capture2go.BleScanner[source]

Bases: object

BLE device scanner for IMU sensors.

Usage example (see also capture2go.connect()):

async def findDevice(name: str):
    scanner = BleScanner()
    async for found in scanner.scan():
        print(f'Discovered devices: {found}.')
        if name in found:
            return found[name]
async scan()[source]

Asynchronously scan for BLE IMU devices.

Yields:

dict[str, BleDevice] – A dictionary mapping device names to BleDevice objects.

Return type:

AsyncGenerator[dict[str, BleDevice], None]

This is an async generator that yields the current set of discovered devices every second or when a new device is found or updated.

async capture2go.connect(names)[source]

Connect to one or more IMU devices by name.

Pass device names like IMU_1234b to connect to a sensor over BLE.

To connect to a single device over USB, use usb as the device name. This will throw a RuntimeError if multiple USB devices are found.

To connect to multiple or specific USB devices, pass device names (like /dev/tty.usbmodem* or COM1 on Windows).

If BLE devices names are passed, a Bluetooth scan is started and connection is initiated once all devices are discovered.

Parameters:

names (list[str]) – List of device names (e.g., [‘IMU_1234ab’, ‘usb’, …]).

Returns:

List of connected AbstractDevice instances (e.g., BleDevice, UsbDevice).

Return type:

list[AbstractDevice]

Raises:

RuntimeError – If no matching USB device is found or multiple USB devices are found.

Parsing

class capture2go.Unpacker(f=None, ignoreInitialGarbage=False)[source]

Bases: object

Unpacks and parses the binary IMU protocol data stream into package objects.

This class can be used to feed raw bytes (from a file, serial port, or BLE) and iterate over parsed packages.

Parameters:
  • f (file-like, optional) – File-like object to read data from. If None, data must be fed manually.

  • ignoreInitialGarbage (bool, optional) – If True, ignore bytes until a valid frame is found. This can be useful if file parsing is started in the middle of a byte stream and initial invalid data is to be expected. Defaults to False.

Example:

unpacker = Unpacker()
unpacker.feed(receivedData)
for package in unpacker:
    print(package)
feed(data)[source]

Add new binary data to the internal buffer for parsing.

Parameters:

data (bytes | bytearray) – Raw bytes to add to the buffer.

clear()[source]

Clear the internal buffer.

extractRtPackages(data, receiveTimestamp=None)[source]

Extract and parse real-time (RT) packages from data received over BLE.

Every raw package sent over BLE contains a byte indicating the count of RT packages, followed by the RT packages and then by the data stream. When receiving this data, the RT packages first have to be extracted using this method. Then, the remaining data chunk can be passed to feed().

The parsed packages are stored in this class and returned first when iterating over the data.

Parameters:
  • data (bytes | bytearray) – Raw bytes containing RT packages.

  • receiveTimestamp (int, optional) – Timestamp (in nanoseconds, from time.time_ns()) to be used when DataClockRoundtrip packages are received. Defaults to None.

Raises:

RuntimeError – If a package cannot be parsed or CRC check fails.

Returns:

Remaining data after RT packages have been extracted.

Return type:

bytes | bytearray

capture2go.loadBinaryFile(filename)[source]

Load and parse a binary Capture2Go recording file into NumPy arrays.

This function reads a binary file (optionally gzip-compressed), unpacks all packages, and organizes them by package type. Each package type is converted to a dictionary of NumPy arrays, with one array per field.

Parameters:

filename (str | Path) – Path to the binary recording file. Can be a string or pathlib.Path object. Files with a .gz extension are automatically decompressed.

Returns:

Nested dictionary where the outer key is the package class name (e.g., DataFullPacked200Hz) and the value is a dictionary mapping field names to NumPy arrays containing all values for that field.

Return type:

dict[str, dict[str, ndarray]]

Exceptions

class capture2go.DeviceIsRecording[source]

Raised by AbstractDevice.init() if the device is currently recording and the parameter abortRecording was not set to True.

class capture2go.DeviceIsStreaming[source]

Raised by AbstractDevice.init() if the device is currently streaming and the parameter abortStreaming was not set to True.

Type Aliases

capture2go.DeviceState

alias of Literal[‘disconnected’, ‘connecting’, ‘connected’]

capture2go.StateListener

alias of Callable[[AbstractDevice, Literal[‘disconnected’, ‘connecting’, ‘connected’]], Any]

capture2go.DataListener

alias of Callable[[AbstractDevice, bytes | bytearray, int | None], Any]

capture2go.PackageListener

alias of Callable[[AbstractDevice, AbstractPackage, int | None], Any]