Main API Reference
This page documents the main classes and functions available in the top-level capture2go namespace.
Device Classes
- class capture2go.AbstractDevice[source]
Bases:
objectBase class that represents a single IMU device (BLE or USB).
This class can be used to send packages to the device and receive packages. Callbacks can be registered for state changes and for received packages.
Standard async usage for receiving packages:
await imu.connect() await imu.init() await imu.send(...) async for package in imu: print(package) # or: package = await imu.apoll() # waits until a package is available print(package)
Non-blocking usage:
package = imu.poll() if package is not None: print(package) # or: while (package := imu.poll()) is not None: print(package)
- state: Literal['disconnected', 'connecting', 'connected']
The current state of the device connection. (
DeviceState)
- status: DataStatus | None
Provides access to the last received status package from the device.
- deviceInfo: DataDeviceInfo | None
Provides access to the last received device info package from the device.
- async init(setTime=False, abortRecording=False, abortStreaming=False)[source]
Performs initial communication with the device to ensure a consistent state.
This function should be called immediately after
AbstractDevice.connect(). It will send apkg.CmdGetDeviceInfopackage and wait for the response as well as the initialpkg.DataStatuspackage. Depending on the arguments, it will perform additional initialization steps.- Parameters:
setTime (bool, optional) – Set the sensor clock based on the current system time. Defaults to False.
abortRecording (bool, optional) – Aborts any ongoing recording. Defaults to False, in which case
DeviceIsRecordingis raised if the device is recording.abortStreaming (bool, optional) – Aborts any ongoing streaming and clears the send buffer. Defaults to False, in which case
DeviceIsStreamingis raised if the device is streaming.
- async send(package)[source]
Sends a package to the device.
- Parameters:
package (pkg.AbstractPackage) – The package to be sent.
- async sendAndAwaitAck(package, ackCls, timeout=3.0)[source]
Sends a package to the device and waits for the acknowledgement (or an error or a timeout).
The return value is either the expected acknowledgement package or a
SensorErrorpackage that refers to the sent package.Raises a
TimeoutErrorif the package was not received withintimeoutseconds.- Parameters:
package (pkg.AbstractPackage) – The package to be sent.
ackCls (type[T]) – The class of the expected acknowledgement package.
timeout (float, optional) – Maximum time in seconds to wait for the acknowledgement. Defaults to 3.
- Return type:
T | SensorError
- addStateListener(listener)[source]
Registers a callback function that is called when the connection state changes.
- Parameters:
listener (
StateListener) – A function that gets called with the device and the new state as parameters.
- removeStateListener(listener)[source]
Unregisters a callback function that is called when the connection state changes.
- Parameters:
listener (
StateListener) – A callback function that was previously registered as a state listener.
- addDataWithRtListener(listener)[source]
Registers a callback that is invoked when raw data (including RT packages) is received.
The callback is called before real-time (RT) packages are extracted from the incoming BLE chunk. This is useful if you need access to the unmodified BLE payload.
- Parameters:
listener (
DataListener) – A function that gets called with the device, the raw data chunk (including RT packages), and an optional receive timestamp.
- removeDataWithRtListener(listener)[source]
Unregisters a previously registered raw data (with RT) listener.
- Parameters:
listener (
DataListener) – A callback function that was previously registered as a data (with RT) listener.
- addDataListener(listener)[source]
Registers a callback that is invoked when data is received (after RT extraction).
The callback is called after real-time (RT) packages have been removed (if present) and before the data is fed to the unpacker. Use this to observe the stream that is parsed into packages.
- Parameters:
listener (
DataListener) – A function that gets called with the device, the data chunk after RT extraction, and an optional receive timestamp.
- removeDataListener(listener)[source]
Unregisters a previously registered data listener.
- Parameters:
listener (
DataListener) – A callback function that was previously registered as a data listener.
- addPackageListener(listener)[source]
Registers a callback that is invoked for each extracted package.
The callback is called for every package produced by the unpacker. It is invoked before the package is enqueued for iteration/polling.
- Parameters:
listener (
PackageListener) – A function that gets called with the device, the package, and an optional receive timestamp.
- removePackageListener(listener)[source]
Unregisters a previously registered package listener.
- Parameters:
listener (
PackageListener) – A callback function that was previously registered as a package listener.
- class capture2go.BleDevice(device, rssi=0)[source]
Bases:
AbstractDeviceRepresents a single BLE IMU device.
Usually created by
capture2go.connect()orcapture2go.BleScanner.- Parameters:
- class capture2go.UsbDevice(device, baud=2147483647)[source]
Bases:
AbstractDeviceRepresents a USB-connected IMU device.
- class capture2go.FilePlaybackDevice(filename)[source]
Bases:
AbstractDeviceDevice that replays data from a file as if it were a live IMU device.
This is useful for development and testing, allowing you to execute processing code without interacting with real hardware.
- Limitations:
Stored packages are played back without any delay, so this will not work for timing-sensitive code.
Any packages sent to the device (e.g., via
send) will be ignored.
- Parameters:
filename (str) – Path to the binary file containing recorded IMU data to play back.
Scanning and Connecting
- class capture2go.BleScanner[source]
Bases:
objectBLE device scanner for IMU sensors.
Usage example (see also
capture2go.connect()):async def findDevice(name: str): scanner = BleScanner() async for found in scanner.scan(): print(f'Discovered devices: {found}.') if name in found: return found[name]
- async scan()[source]
Asynchronously scan for BLE IMU devices.
- Yields:
dict[str, BleDevice] – A dictionary mapping device names to
BleDeviceobjects.- Return type:
AsyncGenerator[dict[str, BleDevice], None]
This is an async generator that yields the current set of discovered devices every second or when a new device is found or updated.
- async capture2go.connect(names)[source]
Connect to one or more IMU devices by name.
Pass device names like
IMU_1234bto connect to a sensor over BLE.To connect to a single device over USB, use
usbas the device name. This will throw a RuntimeError if multiple USB devices are found.To connect to multiple or specific USB devices, pass device names (like
/dev/tty.usbmodem*orCOM1on Windows).If BLE devices names are passed, a Bluetooth scan is started and connection is initiated once all devices are discovered.
- Parameters:
names (list[str]) – List of device names (e.g., [‘IMU_1234ab’, ‘usb’, …]).
- Returns:
List of connected
AbstractDeviceinstances (e.g.,BleDevice,UsbDevice).- Return type:
- Raises:
RuntimeError – If no matching USB device is found or multiple USB devices are found.
Parsing
- class capture2go.Unpacker(f=None, ignoreInitialGarbage=False)[source]
Bases:
objectUnpacks and parses the binary IMU protocol data stream into package objects.
This class can be used to feed raw bytes (from a file, serial port, or BLE) and iterate over parsed packages.
- Parameters:
f (file-like, optional) – File-like object to read data from. If None, data must be fed manually.
ignoreInitialGarbage (bool, optional) – If True, ignore bytes until a valid frame is found. This can be useful if file parsing is started in the middle of a byte stream and initial invalid data is to be expected. Defaults to False.
Example:
unpacker = Unpacker() unpacker.feed(receivedData) for package in unpacker: print(package)
- extractRtPackages(data, receiveTimestamp=None)[source]
Extract and parse real-time (RT) packages from data received over BLE.
Every raw package sent over BLE contains a byte indicating the count of RT packages, followed by the RT packages and then by the data stream. When receiving this data, the RT packages first have to be extracted using this method. Then, the remaining data chunk can be passed to
feed().The parsed packages are stored in this class and returned first when iterating over the data.
- Parameters:
data (bytes | bytearray) – Raw bytes containing RT packages.
receiveTimestamp (int, optional) – Timestamp (in nanoseconds, from
time.time_ns()) to be used whenDataClockRoundtrippackages are received. Defaults to None.
- Raises:
RuntimeError – If a package cannot be parsed or CRC check fails.
- Returns:
Remaining data after RT packages have been extracted.
- Return type:
- capture2go.loadBinaryFile(filename)[source]
Load and parse a binary Capture2Go recording file into NumPy arrays.
This function reads a binary file (optionally gzip-compressed), unpacks all packages, and organizes them by package type. Each package type is converted to a dictionary of NumPy arrays, with one array per field.
- Parameters:
filename (str | Path) – Path to the binary recording file. Can be a string or pathlib.Path object. Files with a .gz extension are automatically decompressed.
- Returns:
Nested dictionary where the outer key is the package class name (e.g.,
DataFullPacked200Hz) and the value is a dictionary mapping field names to NumPy arrays containing all values for that field.- Return type:
Exceptions
- class capture2go.DeviceIsRecording[source]
Raised by
AbstractDevice.init()if the device is currently recording and the parameterabortRecordingwas not set toTrue.
- class capture2go.DeviceIsStreaming[source]
Raised by
AbstractDevice.init()if the device is currently streaming and the parameterabortStreamingwas not set toTrue.
Type Aliases
- capture2go.StateListener
alias of
Callable[[AbstractDevice,Literal[‘disconnected’, ‘connecting’, ‘connected’]],Any]
- capture2go.PackageListener
alias of
Callable[[AbstractDevice,AbstractPackage,int|None],Any]