Python API Reference
The luxai-robot SDK gives you a single Robot object that talks to every part of QTrobot V3 — face, speech, gestures, motors, media, microphones, and a growing set of plugins (camera, speech recognition, kinematics) — through one consistent, transport-independent API.
pip install luxai-robot
# extras for optional transports/plugins:
pip install "luxai-robot[mqtt]" # MQTT transport
pip install "luxai-robot[webrtc]" # WebRTC transport
pip install "luxai-robot[asr-azure]" # Azure Speech ASR plugin
pip install "luxai-robot[asr-groq]" # Groq Whisper ASR plugin
pip install "luxai-robot[asr-parakeet]" # Local Parakeet ASR plugin
pip install "luxai-robot[asr-riva]" # Local NVIDIA Riva ASR plugin
On this page: Core concepts · Connecting · Plugins · tts · face · gesture · motor · media · speaker · microphone · camera · asr · kinematics
Core concepts
Two simple patterns repeat across every namespace below. Understanding them here means you won't need them re-explained on every page.
The action pattern: blocking calls and ActionHandle
Anything that takes real time on the robot — speaking, playing a gesture, moving an arm, playing a media file — comes in two forms:
- The plain method (e.g.
say_text(...)) blocks until the action finishes and returns its result. - The
_asyncmethod (e.g.say_text_async(...)) returns immediately with anActionHandle, so your code keeps running while the robot acts.
# Blocking — waits for the gesture to finish
robot.gesture.play_file("QT/wave")
# Non-blocking — keep doing other things, cancel early if needed
handle = robot.gesture.play_file_async("QT/wave")
...
handle.cancel()
ActionHandle exposes:
| Member | Description |
|---|---|
.result() | Block until the action completes and return its result (re-raises any error) |
.wait(timeout=None) | Block until completion or timeout, without consuming the result |
.cancel() | Request early cancellation of the action |
.done | True once the action has finished, succeeded, failed, or been cancelled |
.add_done_callback(fn) | Register fn(handle) to run when the action completes |
To coordinate several actions at once, use the module-level helpers wait_all_actions(*handles) and wait_any_action(*handles).
Throughout this reference, a method documented with both a blocking and _async form is shown once, with both variants available as tabs on the example.
The stream pattern: readers, writers, and callbacks
Continuous data — joint positions, microphone audio, camera frames — doesn't fit a single request/response call. These use streams instead, opened from a namespace's .stream property:
- Reader —
open_*_reader()returns an object you pull from explicitly:frame = reader.read(timeout=3.0). - Callback —
on_*(callback)runs your function automatically every time a new frame arrives, and returns aStreamSubscriptionyou.cancel()when done. - Writer —
open_*_writer()returns an object you push frames to:writer.write(frame). Used for sending data to the robot (e.g. streamed audio/video, joint commands).
# Reader — pull frames yourself
reader = robot.motor.stream.open_joints_state_reader()
frame = reader.read(timeout=1.0)
# Callback — let the SDK push frames to you
def on_state(frame):
print(frame.position("HeadYaw"))
subscription = robot.motor.stream.on_joints_state(on_state)
...
subscription.cancel()
Both forms deliver the exact same data — pick whichever fits your control flow better.
Connecting
Every Robot uses exactly one transport, chosen with one of four class methods. All return a ready-to-use Robot; use as a context manager (with Robot.connect_zmq(...) as robot:) or call robot.close() yourself.
- Local Network (ZMQ)
- MQTT
- WebRTC (MQTT signaling)
- WebRTC (ZMQ signaling)
Direct connection to the robot's onboard service hub — the lowest-latency option, for code running on the robot itself or its local network.
from luxai.robot import Robot
robot = Robot.connect_zmq(robot_id="QTRD000320")
# or connect directly to a known endpoint, skipping discovery:
robot = Robot.connect_zmq(endpoint="tcp://192.168.3.10:50557")
| Parameter | Type | Description |
|---|---|---|
endpoint | str | None | Explicit ZMQ endpoint (e.g. tcp://192.168.3.10:50557). Skips discovery. Mutually exclusive with robot_id. |
robot_id | str | None | Robot serial number, used for automatic endpoint discovery. Mutually exclusive with endpoint. |
connect_timeout | float | Seconds to wait for discovery when using robot_id (default 5.0). |
default_rpc_timeout | float | None | Overrides the default timeout applied to RPC calls on this client. |
Exactly one of endpoint / robot_id must be given. Raises ValueError if neither or both are provided, TimeoutError if discovery doesn't resolve in time.
Connects through an MQTT broker and the qtrobot-service-hub-gateway-mqtt bridge — for remote, cloud, or browser-friendly control. Requires pip install "luxai-robot[mqtt]".
robot = Robot.connect_mqtt("mqtt://10.231.0.1:1883", "QTRD000320")
With mutual TLS:
from luxai.robot import MqttOptions, MqttTlsOptions, MqttAuthOptions
options = MqttOptions(
tls=MqttTlsOptions(
ca_file="/path/to/ca.crt",
cert_file="/path/to/client.crt",
key_file="/path/to/client.key",
),
auth=MqttAuthOptions(mode="mtls"),
)
robot = Robot.connect_mqtt("mqtts://10.231.0.1:8883", "QTRD000320", options=options)
| Parameter | Type | Description |
|---|---|---|
uri | str | Broker URI: mqtt://, mqtts://, ws://, or wss://. |
robot_id | str | Robot serial number — addresses the right robot on a shared broker. |
options | MqttOptions | None | TLS, authentication, session, and reconnect settings. |
connect_timeout | float | Seconds to wait for the broker connection (default 10.0). |
default_rpc_timeout | float | None | Overrides the default RPC call timeout. |
Raises RuntimeError if the broker connection fails, ImportError if paho-mqtt isn't installed.
A WebRTC peer connection, signaled over MQTT (so it works over the internet, behind NAT). Once connected, all traffic flows peer-to-peer — the broker is only used for the initial handshake. Requires pip install "luxai-robot[webrtc]".
robot = Robot.connect_webrtc_mqtt("mqtt://10.231.0.1:1883", "QTRD000320")
With STUN/TURN for NAT traversal and TLS signaling:
from luxai.robot import MqttOptions, MqttTlsOptions, WebRTCOptions, WebRTCTurnServer
robot = Robot.connect_webrtc_mqtt(
"mqtts://10.231.0.1:8883",
"QTRD000320",
mqtt_options=MqttOptions(tls=MqttTlsOptions(ca_file="/path/to/ca.crt")),
webrtc_options=WebRTCOptions(
stun_servers=["stun:stun.l.google.com:19302"],
turn_servers=[WebRTCTurnServer(url="turn:turn.example.com:3478",
username="user", credential="pass")],
),
)
| Parameter | Type | Description |
|---|---|---|
broker_url | str | MQTT broker URI used only for signaling. |
robot_id | str | Used as the WebRTC session ID so both peers rendezvous correctly. |
mqtt_options | MqttOptions | None | Settings for the signaling broker connection. |
webrtc_options | WebRTCOptions | None | STUN/TURN servers, codec preferences for the peer connection. |
reconnect | bool | Automatically re-establish the connection if it drops (default False). |
connect_timeout | float | End-to-end timeout covering signaling + handshake (default 15.0). |
default_rpc_timeout | float | None | Overrides the default RPC call timeout. |
Raises ImportError if aiortc isn't installed, RuntimeError if the handshake doesn't complete in time.
A WebRTC peer connection signaled over a direct ZMQ socket instead of a broker — for LAN/local use with no MQTT infrastructure available.
# Operator side (connects)
robot = Robot.connect_webrtc_zmq("tcp://192.168.1.10:5555", "QTRD000320")
| Parameter | Type | Description |
|---|---|---|
endpoint | str | ZMQ signaling endpoint, e.g. tcp://192.168.1.10:5555 (use tcp://*:5555 when binding). |
robot_id | str | Used as the WebRTC session ID. |
bind | bool | True to bind the socket (typically the robot side), False to connect (operator side, default). |
webrtc_options | WebRTCOptions | None | STUN/TURN servers, codec preferences. |
reconnect | bool | Automatically re-establish the connection if it drops (default False). |
connect_timeout | float | End-to-end timeout for the handshake (default 15.0). |
default_rpc_timeout | float | None | Overrides the default RPC call timeout. |
Robot properties
Once connected, the robot's self-reported identity is available without any extra calls:
| Property | Type | Description |
|---|---|---|
robot.robot_id | str | None | The robot's serial number (e.g. QTRD000320). |
robot.robot_type | str | None | Robot model/type string. |
robot.sdk_version | str | None | Robot-side SDK version. |
robot.min_sdk / robot.max_sdk | str | None | SDK version range supported by the robot. |
Plugins
Some capabilities aren't part of the core service hub and are enabled on demand as plugins — the camera driver, ASR engines, and kinematics helpers are all plugins. Once enabled, a plugin shows up as a regular namespace (e.g. robot.camera) regardless of where it actually runs.
- Local
- ZMQ
- MQTT
- WebRTC
robot.enable_plugin_local("kinematics")
robot.enable_plugin_zmq("realsense-driver", node_id="qtrobot-realsense-driver")
# or directly by endpoint:
robot.enable_plugin_zmq("realsense-driver", endpoint="tcp://192.168.3.152:50750")
robot.enable_plugin_mqtt("realsense-driver", node_id="qtrobot-realsense-driver")
Reuses the robot's existing broker connection — requires the robot to be connected via connect_mqtt.
robot.enable_plugin_webrtc_mqtt(
"realsense-driver",
node_id="qtrobot-realsense-driver",
broker_url="mqtt://10.231.0.1:1883",
)
Each plugin gets its own independent WebRTC peer connection (own data channel and media tracks), so plugin streams never conflict with the main robot peer. enable_plugin_webrtc_zmq(...) is also available for broker-less signaling.
Disable any plugin with robot.disable_plugin(name). You can also pass a custom Transport directly via robot.enable_plugin(name, transport).
tts
robot.tts controls text-to-speech: listing/selecting engines and voices, speaking plain text or SSML, and per-engine configuration.
say_text / say_text_async
Synthesize and play plain text using a selected TTS engine. Visemes may be scheduled to the face if connected, so the robot's lips move in sync.
say_text(text, engine=None, lang=None, voice=None, rate=None, pitch=None, volume=None, style=None) -> bool
say_text_async(...) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
text | str | Text to synthesize. |
engine | str | None | Engine id to use (default engine if omitted). |
lang | str | None | Language code, e.g. en-US. |
voice | str | None | Voice id/name. |
rate | float | None | Speaking rate multiplier. |
pitch | float | None | Pitch adjustment. |
volume | float | None | Volume level. |
style | str | None | Speaking style (engine-dependent). |
Returns: bool — True on success.
- Blocking
- Non-blocking
robot.tts.say_text("Hello world!")
robot.tts.say_text("Slower speech", engine="acapela", rate=0.8, pitch=1.1)
handle = robot.tts.say_text_async("This is a very long sentence...")
time.sleep(2)
handle.cancel()
say_ssml / say_ssml_async
Synthesize and play SSML markup using a selected TTS engine.
say_ssml(ssml, engine=None) -> bool
say_ssml_async(...) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
ssml | str | SSML markup string. |
engine | str | None | Engine id to use (default engine if omitted). |
Returns: bool — True on success.
robot.tts.say_ssml("<speak>Hello!</speak>")
robot.tts.say_ssml("<speak>Hello!</speak>", engine="azure")
Cancel an in-progress utterance the same way as say_text_async — via say_ssml_async(...) and .cancel().
Engines, voices, and languages
| Method | Returns | Description |
|---|---|---|
list_engines() | list[str] | Loaded/available TTS engine ids. |
get_default_engine() | str | Current default engine id. |
set_default_engine(engine) | — | Set the default engine id. |
get_languages(engine=None) | list[str] | Supported language codes for an engine. |
get_voices(engine=None) | list[dict] | Supported voices (id, lang, gender, display_name, ...). |
supports_ssml(engine=None) | bool | Whether the engine supports SSML. |
engines = robot.tts.list_engines()
robot.tts.set_default_engine("acapela")
voices = robot.tts.get_voices(engine="acapela")
for v in voices:
print(v["display_name"], v["lang"])
if robot.tts.supports_ssml(engine="azure"):
robot.tts.say_ssml("<speak>Hello!</speak>", engine="azure")
Engine configuration
| Method | Returns | Description |
|---|---|---|
get_config(engine=None) | dict | Current engine configuration map. |
set_config(config, engine=None) | bool | Set engine-specific configuration parameters. |
cfg = robot.tts.get_config(engine="acapela")
robot.tts.set_config(config={"pitch": 1.0, "rate": 0.8}, engine="acapela")
face
robot.face controls the face display: eye gaze and emotion playback.
look
Move (offset) the eyes on the face display.
look(l_eye, r_eye, duration=0.0) -> None
| Parameter | Type | Description |
|---|---|---|
l_eye | list | [dx, dy] offset for the left eye (pixels), relative to its configured center. |
r_eye | list | [dx, dy] offset for the right eye (pixels). |
duration | float | If > 0, eyes reset back to center after this many seconds (default 0.0). |
robot.face.look(l_eye=[30, 0], r_eye=[30, 0]) # look right
robot.face.look(l_eye=[0, 0], r_eye=[0, 0], duration=2) # center, auto-reset
show_emotion / show_emotion_async
Play an emotion video on the face background lane.
show_emotion(emotion, speed=None) -> bool
show_emotion_async(emotion, speed=None) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
emotion | str | Emotion name or relative path, with or without the .avi extension. |
speed | float | None | Optional playback speed factor. |
Returns: bool — True if playback completed, False otherwise.
# Blocking
robot.face.show_emotion("QT/kiss")
robot.face.show_emotion("QT/surprise", speed=2.0)
# Non-blocking — cancel after 3 seconds
handle = robot.face.show_emotion_async("QT/breathing_exercise")
time.sleep(3)
handle.cancel()
list_emotions
List available emotion video files under the emotions directory (recursive scan, .avi/.AVI files).
list_emotions() -> list[str]
for e in robot.face.list_emotions():
print(e)
gesture
robot.gesture plays predefined gesture files, plays raw keyframe trajectories, and records new gestures from the robot's own motors.
play_file / play_file_async
Load a gesture XML file and play it.
play_file(gesture, speed_factor=1.0) -> bool
play_file_async(gesture, speed_factor=1.0) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
gesture | str | Gesture name/path, with or without the .xml extension. |
speed_factor | float | Playback speed multiplier (default 1.0). |
Returns: bool — True if loaded and played successfully.
- Blocking
- Non-blocking
robot.gesture.play_file("QT/wave")
handle = robot.gesture.play_file_async("QT/bye")
input("Press Enter to cancel...")
handle.cancel()
play / play_async
Play a gesture trajectory (a keyframes dict, as returned by record()). Progress is published on the gesture.stream.on_progress stream below.
play(keyframes, resample=True, rate_hz=100.0, speed_factor=1.0) -> bool
play_async(...) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
keyframes | dict | Trajectory dict, as returned by gesture.record(). |
resample | bool | Resample for smooth playback (default True). |
rate_hz | float | Resample rate in Hz (default 100.0). |
speed_factor | float | Playback speed multiplier (default 1.0). |
# Blocking
robot.gesture.play(keyframes)
# Non-blocking — cancel on demand
handle = robot.gesture.play_async(keyframes)
handle.cancel()
list_files
List available gesture XML files under the configured gesture directory.
list_files() -> list[str]
for g in robot.gesture.list_files():
print(g)
record / record_async, stop_record, store_record
Capture a gesture by physically moving (or releasing torque on) selected motors and recording the resulting trajectory.
record(motors, release_motors=False, delay_start_ms=0, timeout_ms=60000,
refine_keyframe=False, keyframe_pos_eps=..., keyframe_max_gap_us=...) -> dict
record_async(...) -> ActionHandle
stop_record() -> bool
store_record(gesture) -> None
| Parameter | Type | Description |
|---|---|---|
motors | list[str] | Motor names to record. |
release_motors | bool | If True, torque is disabled during recording (lets a human move the arm freely). |
delay_start_ms | int | Delay before recording starts, in ms (default 0). |
timeout_ms | int | Maximum recording duration, in ms (default 60000). |
refine_keyframe | bool | Compress redundant keyframes if True. |
keyframe_pos_eps | float | Position epsilon for keyframe compression (degrees). |
keyframe_max_gap_us | int | Max gap for keyframe compression (µs). |
record() blocks until recording finishes (timeout or manual stop) and returns the trajectory. For interactive use, call record_async(), let the user signal when to stop, call stop_record() to end capture, then handle.result() to retrieve the trajectory — and store_record(gesture) to save it to an XML file for later playback with play_file.
handle = robot.gesture.record_async(
motors=["RightShoulderPitch", "RightElbowRoll"],
release_motors=True,
delay_start_ms=2000,
timeout_ms=20000,
)
input("Press Enter to stop...")
robot.gesture.stop_record()
keyframes = handle.result()
robot.gesture.store_record("my_wave")
Stream: on_progress / open_progress_reader
Outbound stream of gesture playback progress while play()/play_file() is running.
on_progress(callback, queue_size=None) -> StreamSubscription
open_progress_reader(queue_size=None) -> TypedStreamReader[DictFrame]
Frame payload (DictFrame): percentage (float), time_us (int).
def on_progress(frame):
print(f"{frame.value['percentage']:.1f}%")
subscription = robot.gesture.stream.on_progress(on_progress)
motor
robot.motor lists motors, controls torque/homing/velocity, and streams joint state, errors, and commands.
list
List configured motors and their parameters.
list() -> dict
Returns: dict — {motor_name: {part, position_min, position_max, position_home, velocity_max, overload_threshold, ...}}.
motors = robot.motor.list()
for name, info in motors.items():
print(name, info)
Torque and homing
| Method | Description |
|---|---|
on(motor) | Enable torque for one motor. |
off(motor) | Disable torque for one motor. |
on_all() | Enable torque for all motors. |
off_all() | Disable torque for all motors. |
home(motor) | Move one motor to its configured home position. |
home_all() | Move all motors to their configured home positions. |
All return bool (True on success).
robot.motor.on_all()
robot.motor.home("HeadYaw")
robot.motor.home_all()
robot.motor.off_all()
Velocity
set_velocity(motor, velocity) -> bool
| Parameter | Type | Description |
|---|---|---|
motor | str | Motor name. |
velocity | int | Velocity value, validated against the motor's max. |
robot.motor.set_velocity("HeadYaw", 50)
Stream: joint state, errors, and commands
stream.open_joints_state_reader(queue_size=None) -> TypedStreamReader[JointStateFrame]
stream.on_joints_state(callback, queue_size=None) -> StreamSubscription
stream.open_joints_error_reader(queue_size=None) -> TypedStreamReader[DictFrame]
stream.on_joints_error(callback, queue_size=None) -> StreamSubscription
stream.open_joints_command_writer(queue_size=None) -> TypedStreamWriter[JointCommandFrame]
joints_state— outboundJointStateFramemappingmotor_name → {position, velocity, effort, voltage, temperature}.joints_error— outboundDictFramemappingmotor_name → {overload_limit?, voltage_limit?, temperature_limit?, sensor_failure?}flags, when present.joints_command— inbound writer for sending{'position': float, 'velocity': float (optional)}commands per motor.
# Read joint state — callback-based
def on_state(frame):
print(frame.position("HeadYaw"))
robot.motor.stream.on_joints_state(on_state)
# Read joint state — reader-based
reader = robot.motor.stream.open_joints_state_reader()
frame = reader.read()
# Watch for motor errors
def on_error(frame):
print("Motor error:", frame.value)
robot.motor.stream.on_joints_error(on_error)
# Send a joint command
from luxai.robot import JointCommandFrame
writer = robot.motor.stream.open_joints_command_writer()
cmd = JointCommandFrame()
cmd.set_joint("HeadYaw", position=30, velocity=40)
writer.write(cmd)
media
robot.media plays audio and video on two independent lanes — foreground (FG) and background (BG) — either from files or as live streamed frames. FG and BG expose an identical API; examples below use FG, just substitute bg for the background lane.
File playback
play_fg_audio_file(uri) -> bool
play_fg_audio_file_async(uri) -> ActionHandle
pause_fg_audio_file() -> None
resume_fg_audio_file() -> None
play_bg_audio_file(uri) -> bool
play_bg_audio_file_async(uri) -> ActionHandle
pause_bg_audio_file() -> None
resume_bg_audio_file() -> None
| Parameter | Type | Description |
|---|---|---|
uri | str | Audio file URI/path supported by the engine. |
- Blocking
- Non-blocking
ok = robot.media.play_fg_audio_file("/data/audio/hello.wav")
handle = robot.media.play_fg_audio_file_async("/data/audio/hello.wav")
time.sleep(3)
handle.cancel()
Pause/resume work on the currently playing file:
handle = robot.media.play_bg_audio_file_async("/data/audio/music.wav")
time.sleep(2)
robot.media.pause_bg_audio_file()
time.sleep(3)
robot.media.resume_bg_audio_file()
handle.wait()
Video file playback
play_fg_video_file(uri, speed=1.0, with_audio=False) -> bool
play_fg_video_file_async(uri, speed=1.0, with_audio=False) -> ActionHandle
pause_fg_video_file() -> None
resume_fg_video_file() -> None
set_fg_video_alpha(value) -> None
| Parameter | Type | Description |
|---|---|---|
uri | str | Video file URI/path. |
speed | float | Playback speed factor (default 1.0). |
with_audio | bool | Play the embedded audio track if True (default False). |
value (alpha) | float | Foreground video transparency, 0.0 (fully transparent) to 1.0. |
ok = robot.media.play_fg_video_file("/data/video/intro.mp4")
robot.media.set_fg_video_alpha(0.8)
# Pause/resume on the background lane
handle = robot.media.play_bg_video_file_async("/data/emotions/QT/kiss.avi")
time.sleep(2)
robot.media.pause_bg_video_file()
time.sleep(3)
robot.media.resume_bg_video_file()
handle.wait()
play_bg_video_file / play_bg_video_file_async / pause_bg_video_file / resume_bg_video_file mirror the FG methods above (no alpha control on BG).
Volume
set_fg_audio_volume(value) -> None
get_fg_audio_volume() -> float
set_bg_audio_volume(value) -> None
get_bg_audio_volume() -> float
value is in range [0.0, 1.0]. (For the robot's overall speaker volume rather than a single media lane, see speaker.)
robot.media.set_fg_audio_volume(0.8)
vol = robot.media.get_fg_audio_volume()
Streamed audio/video
For audio/video you generate or receive yourself (rather than a file on disk), open a stream writer and push frames to it. Use the matching cancel_*_stream() / pause_*_stream() / resume_*_stream() calls to control the pipeline — these are separate from the file-playback controls above.
stream.open_fg_audio_stream_writer(queue_size=None) -> TypedStreamWriter[AudioFrameRaw]
stream.open_bg_audio_stream_writer(queue_size=None) -> TypedStreamWriter[AudioFrameRaw]
stream.open_fg_video_stream_writer(queue_size=None) -> TypedStreamWriter[ImageFrameRaw]
stream.open_bg_video_stream_writer(queue_size=None) -> TypedStreamWriter[ImageFrameRaw]
cancel_fg_audio_stream() / pause_fg_audio_stream() / resume_fg_audio_stream()
cancel_bg_audio_stream() / pause_bg_audio_stream() / resume_bg_audio_stream()
cancel_fg_video_stream() / pause_fg_video_stream() / resume_fg_video_stream()
cancel_bg_video_stream() / pause_bg_video_stream() / resume_bg_video_stream()
from luxai.robot import AudioFrameRaw
writer = robot.media.stream.open_fg_audio_stream_writer()
writer.write(AudioFrameRaw(...))
...
robot.media.cancel_fg_audio_stream()
speaker
robot.speaker controls the robot's overall hardware volume — separate from any individual media lane's volume (see media).
set_volume(value) -> bool
get_volume() -> float
mute() -> bool
unmute() -> bool
| Parameter | Type | Description |
|---|---|---|
value | float | Volume in range [0.0, 1.0]. |
All methods return bool/float — True/False indicate whether the underlying mixer control succeeded.
robot.speaker.set_volume(0.8)
vol = robot.speaker.get_volume()
robot.speaker.mute()
robot.speaker.unmute()
microphone
robot.microphone reads raw audio from the robot's internal 5-channel microphone array (and an optional external mic), tunes the array's DSP, and reports voice-activity/direction-of-arrival events.
DSP tuning
get_int_tuning() -> dict
set_int_tuning(name, value) -> bool
get_int_tuning() returns every readable parameter exposed by the internal mic array's Respeaker controller (an empty dict if the device isn't available). set_int_tuning(name, value) sets one parameter — persistence is handled via config (microphone.tunning.*) applied at startup.
params = robot.microphone.get_int_tuning()
print(params.get("AECNORM"))
robot.microphone.set_int_tuning(name="AGCONOFF", value=1.0)
Internal microphone audio (5 channels)
stream.open_int_audio_ch0_reader(queue_size=None) -> TypedStreamReader[AudioFrameRaw]
stream.on_int_audio_ch0(callback, queue_size=None) -> StreamSubscription
# ... ch1, ch2, ch3, ch4 follow the same pattern
Channel 0 is the processed/ASR-ready channel; channels 1–4 are the raw per-microphone signals (exact physical mapping depends on the ALSA layout).
def on_audio(frame):
process(frame.data)
robot.microphone.stream.on_int_audio_ch0(on_audio, queue_size=10)
# Or pull directly
reader = robot.microphone.stream.open_int_audio_ch0_reader(queue_size=10)
frame = reader.read(timeout=3.0)
External microphone audio
stream.open_ext_audio_ch0_reader(queue_size=None) -> TypedStreamReader[AudioFrameRaw]
stream.on_ext_audio_ch0(callback, queue_size=None) -> StreamSubscription
Only published if microphone.external.enabled is set in the robot's configuration.
Voice activity and direction of arrival
stream.open_int_event_reader(queue_size=None) -> TypedStreamReader[DictFrame]
stream.on_int_event(callback, queue_size=None) -> StreamSubscription
Frame payload (DictFrame): activity (bool, True when voice is detected), direction (int, estimated direction-of-arrival in degrees).
def on_event(frame):
evt = frame.value
if evt.get("activity"):
print("Voice detected — DOA:", evt.get("direction"))
robot.microphone.stream.on_int_event(on_event, queue_size=2)
This stream delivers only the latest event — events may be dropped if your callback/reader is too slow to keep up.
camera (plugin)
robot.camera is a plugin namespace exposing QTrobot's RealSense 3D camera: color/depth/IMU streams and intrinsics. Enable it first — see Plugins — then use it exactly like a core namespace.
robot.enable_plugin_zmq("realsense-driver", node_id="qtrobot-realsense-driver")
Intrinsics and depth scale
get_color_intrinsics() -> dict
get_depth_intrinsics() -> dict
get_depth_scale() -> dict
get_color_intrinsics() / get_depth_intrinsics() return {fx, fy, ppx, ppy, width, height, model, coeffs}. get_depth_scale() returns {'scale': float} — multiply a raw depth pixel value by scale to get metres.
intr = robot.camera.get_color_intrinsics()
info = robot.camera.get_depth_scale()
scale = info["scale"]
Image streams
stream.open_color_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_color(callback, queue_size=None) -> StreamSubscription
stream.open_depth_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth(callback, queue_size=None) -> StreamSubscription
stream.open_depth_aligned_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth_aligned(callback, queue_size=None) -> StreamSubscription
stream.open_depth_color_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth_color(callback, queue_size=None) -> StreamSubscription
| Stream | Frame | Description |
|---|---|---|
color | ImageFrameRaw (BGR, width × height × 3) | Color image. |
depth | ImageFrameRaw (16-bit, width × height) | Raw depth image. |
depth_aligned | ImageFrameRaw (16-bit, color resolution) | Depth aligned to the color frame. |
depth_color | ImageFrameRaw (BGR) | False-colour depth, colourised for visualisation. |
reader = robot.camera.stream.open_color_reader()
frame = reader.read(timeout=3.0)
IMU streams
stream.open_gyro_reader(queue_size=None) -> TypedStreamReader[ListFrame]
stream.on_gyro(callback, queue_size=None) -> StreamSubscription
stream.open_acceleration_reader(queue_size=None) -> TypedStreamReader[ListFrame]
stream.on_acceleration(callback, queue_size=None) -> StreamSubscription
Both deliver a ListFrame of [x, y, z] — angular velocity in rad/s for gyro, linear acceleration in m/s² for acceleration.
def on_gyro(frame):
print(frame.value) # [x, y, z]
robot.camera.stream.on_gyro(on_gyro)
asr (plugin)
robot.asr is a plugin namespace for continuous speech recognition, backed by your choice of four engines: Azure, Groq Whisper, or a local Parakeet or Riva server. Enable the plugin first — see Plugins — then configure_* the engine you want once before recognizing.
Each engine exposes the same three-part shape: configure_<engine>(...), recognize_<engine>() / recognize_<engine>_async(), and a pair of streams (<engine>_speech, <engine>_event) for continuous/background consumption.
- Azure
- Groq Whisper
- Parakeet (local)
- Riva (local)
robot.enable_plugin_local("asr-azure")
configure_azure(subscription, region, languages=["en-US"], silence_timeout=0.2,
use_vad=False, continuous_mode=False) -> bool
recognize_azure() -> dict
recognize_azure_async() -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
subscription | str | Azure Speech subscription key. |
region | str | Azure Speech region, e.g. westeurope. |
languages | list | Language codes to recognize (default ['en-US']). |
silence_timeout | float | Silence end-of-speech threshold in seconds (default 0.2). |
use_vad | bool | Enable voice-activity detection (default False). |
continuous_mode | bool | Enable continuous recognition mode (default False). |
recognize_azure() blocks until a complete utterance is recognized; result fields include text, confidence, etc.
robot.asr.configure_azure(subscription="<key>", region="westeurope",
continuous_mode=True, use_vad=True)
result = robot.asr.recognize_azure()
print(result.get("text"))
Streams — stream.on_azure_speech(callback) / open_azure_speech_reader() deliver recognized segments (DictFrame: text, confidence, language); stream.on_azure_event(callback) / open_azure_event_reader() deliver lifecycle events (StringFrame: recognizing, recognized, canceled, session_started, session_stopped).
robot.enable_plugin_local("asr-groq")
configure_groq(api_key, language="en", context_prompt=None, silence_timeout=0.5,
use_vad=True, continuous_mode=False) -> bool
recognize_groq() -> dict
recognize_groq_async() -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
api_key | str | Groq API key. |
language | str | ISO-639-1 language code, e.g. en, fr (default en). |
context_prompt | str | None | Optional domain hint for Whisper (max 224 chars). |
silence_timeout | float | Seconds of silence that end an utterance (default 0.5). |
use_vad | bool | Enable client-side voice-activity detection (default True). |
continuous_mode | bool | Enable continuous recognition mode. |
recognize_groq() blocks until voice activity is detected, one utterance is captured (ended by silence), and Groq transcribes it via the Whisper API; result fields: text, language.
robot.asr.configure_groq(api_key="<your-groq-api-key>", language="en", continuous_mode=True)
result = robot.asr.recognize_groq()
print(result.get("text"))
Streams — stream.on_groq_speech(callback) / open_groq_speech_reader() (DictFrame: text, language); stream.on_groq_event(callback) / open_groq_event_reader() (StringFrame: STARTED, RECOGNIZING, RECOGNIZED, STOPPED, CANCELED).
Runs against qtrobot-parakeet-asr-server (e.g. on a Jetson Orin) rather than a cloud API — no API key needed.
robot.enable_plugin_local("asr-parakeet")
configure_parakeet(endpoint, language="en", use_vad=True, silence_timeout=0.3,
max_buffer_seconds=20.0, continuous_mode=False) -> bool
recognize_parakeet() -> dict
recognize_parakeet_async() -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
endpoint | str | ZMQ base endpoint of qtrobot-parakeet-asr-server, e.g. tcp://10.231.0.1:50860. |
language | str | ISO-639-1 language code reported in results — the model auto-detects language from audio (default en). |
use_vad | bool | Enable client-side voice-activity detection (default True). |
silence_timeout | float | Silence duration (s) used by both the client VAD and server RMS detection (default 0.3). |
max_buffer_seconds | float | Maximum utterance duration before forced finalization (default 20.0). |
continuous_mode | bool | Enable continuous recognition mode. |
recognize_parakeet() waits for voice activity, streams audio to qtrobot-parakeet-asr-server, and blocks until the final transcription returns; interim results are published on the parakeet_speech stream. Result fields: text, language.
robot.asr.configure_parakeet(
endpoint="tcp://10.231.0.1:50860",
language="en",
use_vad=True,
silence_timeout=0.3,
max_buffer_seconds=20.0,
continuous_mode=True,
)
# Continuous mode — subscribe to streams
robot.asr.stream.on_parakeet_event(lambda e: print("event:", e.value))
robot.asr.stream.on_parakeet_speech(lambda s: print("speech:", s.value))
# Or perform a single recognition
result = robot.asr.recognize_parakeet()
print(result.get("text"))
Streams — stream.on_parakeet_speech(callback) / open_parakeet_speech_reader() (DictFrame: text, language, published for both interim and final results); stream.on_parakeet_event(callback) / open_parakeet_event_reader() (StringFrame: STARTED, RECOGNIZING, RECOGNIZED, STOPPED, CANCELED).
Runs against a self-hosted NVIDIA Riva ASR server (typically a Docker container on the same network) rather than a cloud API. Requires the asr-riva extra: pip install "luxai-robot[asr-riva]".
robot.enable_plugin_local("asr-riva")
configure_riva(server="localhost:50051", language="en-US", use_ssl=False, ssl_cert=None,
profanity_filter=False, automatic_punctuation=True, use_vad=False,
continuous_mode=False) -> bool
recognize_riva() -> dict
recognize_riva_async() -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
server | str | Address of the Riva ASR server (default localhost:50051). |
language | str | BCP-47 language code, e.g. en-US (default en-US). |
use_ssl | bool | Connect to the server over SSL (default False). |
ssl_cert | str | None | Path to an SSL certificate, used when use_ssl=True. |
profanity_filter | bool | Filter profanity from the transcript (default False). |
automatic_punctuation | bool | Let Riva add punctuation to the transcript (default True). |
use_vad | bool | Enable client-side voice-activity detection (default False). |
continuous_mode | bool | Enable continuous recognition mode. |
recognize_riva() streams microphone audio to the Riva server and blocks until a final transcript is returned; interim results are published on the riva_speech stream. Result fields: text, language.
robot.asr.configure_riva(
server="localhost:50051",
language="en-US",
use_vad=True,
continuous_mode=True,
)
# Continuous mode — subscribe to streams
robot.asr.stream.on_riva_event(lambda e: print("event:", e.value))
robot.asr.stream.on_riva_speech(lambda s: print("speech:", s.value))
# Or perform a single recognition
result = robot.asr.recognize_riva()
print(result.get("text"))
Streams — stream.on_riva_speech(callback) / open_riva_speech_reader() (DictFrame: text, language); stream.on_riva_event(callback) / open_riva_event_reader() (StringFrame: STARTED, RECOGNIZING, RECOGNIZED, STOPPED, CANCELED).
Every recognize_* method has a non-blocking _async counterpart returning an ActionHandle — call .cancel() on it to abort recognition early.
handle = robot.asr.recognize_azure_async()
result = handle.result()
print(result.get("text"))
kinematics (plugin)
robot.kinematics is a plugin namespace solving inverse kinematics for head gaze and arm reach/aim, working in the robot base frame (origin at the bottom of the robot: x forward, y left-positive, z up). Enable it first — see Plugins — typically with robot.enable_plugin_local("kinematics") since it computes locally.
configure
Optional — defaults already match the QTrobot hardware.
configure(fx=376.4, fy=376.1, ppx=314.6, ppy=255.6, img_cx=..., img_cy=...,
camera_height=0.6, motor_timeout=20.0) -> bool
| Parameter | Type | Description |
|---|---|---|
fx, fy | float | Camera focal length (default 376.4, 376.1). |
ppx, ppy | float | Camera principal point (default 314.6, 255.6). |
camera_height | float | Camera height above the robot base frame, in metres (default 0.6). |
motor_timeout | float | Hard cap on wait time for joint motion to complete, in seconds (default 20.0). |
robot.kinematics.configure(motor_timeout=15.0)
look_at_point / look_at_point_async
Move the robot head to look at a 3-D point. Sends an eye-gaze command and, unless only_gaze=True, a head joint command; blocks until the head motors stop.
look_at_point(x, y, z, only_gaze=False, velocity=0) -> bool
look_at_point_async(...) -> ActionHandle
| Parameter | Type | Description |
|---|---|---|
x | float | Forward distance from the robot base (metres). |
y | float | Lateral distance — positive = left, negative = right. |
z | float | Height above the base (metres). |
only_gaze | bool | Move the eyes only, not the head, if True. |
velocity | float | Joint velocity override (motor units); 0 = robot default. |
# Blocking
robot.kinematics.look_at_point(0.8, 0.0, 1.2)
# Non-blocking
handle = robot.kinematics.look_at_point_async(0.8, 0.0, 1.2)
handle.wait()
# Eyes only
robot.kinematics.look_at_point(0.8, 0.3, 1.0, only_gaze=True)
look_at_pixel / look_at_pixel_async
Move the head to look at a camera pixel — converts (u, v, depth) to a 3-D point using the current head joint angles and camera intrinsics, then calls look_at_point.
look_at_pixel(u, v, depth=1.0, only_gaze=False, velocity=0) -> bool
look_at_pixel_async(...) -> ActionHandle
robot.kinematics.look_at_pixel(320, 240, depth=1.5)
reach_left / reach_right (+ _async)
Move one arm to reach a 3-D point, solving inverse kinematics for that arm; blocks until the arm motors stop. Points outside the reachable workspace are clamped to the closest reachable configuration.
reach_right(x, y, z, velocity=0) -> bool
reach_right_async(...) -> ActionHandle
reach_left(x, y, z, velocity=0) -> bool
reach_left_async(...) -> ActionHandle
y is negative for the right side, positive for the left side.
robot.kinematics.reach_right(0.3, -0.2, 0.5)
robot.kinematics.reach_left(0.3, 0.2, 0.5)
aim_at_point / aim_at_pixel (+ _async)
Like reach_*, but auto-selects the arm based on y (left arm if y >= 0, right arm if y < 0).
aim_at_point(x, y, z, velocity=0) -> bool
aim_at_point_async(...) -> ActionHandle
aim_at_pixel(u, v, depth=1.0, velocity=0) -> bool
aim_at_pixel_async(...) -> ActionHandle
robot.kinematics.aim_at_point(0.5, 0.0, 1.0)
robot.kinematics.aim_at_pixel(320, 240, depth=1.0)
pixel_to_point
Pure computation, no motor movement — convert a camera pixel to a 3-D point using the current head joint angles and camera intrinsics.
pixel_to_point(u, v, depth=1.0) -> dict
Returns: {'x': float, 'y': float, 'z': float} in the robot base frame (metres).
pt = robot.kinematics.pixel_to_point(320, 240, depth=1.2)
print(pt["x"], pt["y"], pt["z"])
See the Python Tutorials for runnable, narrative walkthroughs of several of the namespaces above.