Skip to main content
Version: QTrobot V3

Python API Reference

The luxai-robot SDK gives you a single Robot object that talks to every part of QTrobot V3 — face, speech, gestures, motors, media, microphones, and a growing set of plugins (camera, speech recognition, kinematics) — through one consistent, transport-independent API.

pip install luxai-robot
# extras for optional transports/plugins:
pip install "luxai-robot[mqtt]" # MQTT transport
pip install "luxai-robot[webrtc]" # WebRTC transport
pip install "luxai-robot[asr-azure]" # Azure Speech ASR plugin
pip install "luxai-robot[asr-groq]" # Groq Whisper ASR plugin
pip install "luxai-robot[asr-parakeet]" # Local Parakeet ASR plugin
pip install "luxai-robot[asr-riva]" # Local NVIDIA Riva ASR plugin

On this page: Core concepts · Connecting · Plugins · tts · face · gesture · motor · media · speaker · microphone · camera · asr · kinematics

Core concepts

Two simple patterns repeat across every namespace below. Understanding them here means you won't need them re-explained on every page.

The action pattern: blocking calls and ActionHandle

Anything that takes real time on the robot — speaking, playing a gesture, moving an arm, playing a media file — comes in two forms:

  • The plain method (e.g. say_text(...)) blocks until the action finishes and returns its result.
  • The _async method (e.g. say_text_async(...)) returns immediately with an ActionHandle, so your code keeps running while the robot acts.
# Blocking — waits for the gesture to finish
robot.gesture.play_file("QT/wave")

# Non-blocking — keep doing other things, cancel early if needed
handle = robot.gesture.play_file_async("QT/wave")
...
handle.cancel()

ActionHandle exposes:

MemberDescription
.result()Block until the action completes and return its result (re-raises any error)
.wait(timeout=None)Block until completion or timeout, without consuming the result
.cancel()Request early cancellation of the action
.doneTrue once the action has finished, succeeded, failed, or been cancelled
.add_done_callback(fn)Register fn(handle) to run when the action completes

To coordinate several actions at once, use the module-level helpers wait_all_actions(*handles) and wait_any_action(*handles).

Throughout this reference, a method documented with both a blocking and _async form is shown once, with both variants available as tabs on the example.

The stream pattern: readers, writers, and callbacks

Continuous data — joint positions, microphone audio, camera frames — doesn't fit a single request/response call. These use streams instead, opened from a namespace's .stream property:

  • Readeropen_*_reader() returns an object you pull from explicitly: frame = reader.read(timeout=3.0).
  • Callbackon_*(callback) runs your function automatically every time a new frame arrives, and returns a StreamSubscription you .cancel() when done.
  • Writeropen_*_writer() returns an object you push frames to: writer.write(frame). Used for sending data to the robot (e.g. streamed audio/video, joint commands).
# Reader — pull frames yourself
reader = robot.motor.stream.open_joints_state_reader()
frame = reader.read(timeout=1.0)

# Callback — let the SDK push frames to you
def on_state(frame):
print(frame.position("HeadYaw"))

subscription = robot.motor.stream.on_joints_state(on_state)
...
subscription.cancel()

Both forms deliver the exact same data — pick whichever fits your control flow better.

Connecting

Every Robot uses exactly one transport, chosen with one of four class methods. All return a ready-to-use Robot; use as a context manager (with Robot.connect_zmq(...) as robot:) or call robot.close() yourself.

Direct connection to the robot's onboard service hub — the lowest-latency option, for code running on the robot itself or its local network.

from luxai.robot import Robot

robot = Robot.connect_zmq(robot_id="QTRD000320")
# or connect directly to a known endpoint, skipping discovery:
robot = Robot.connect_zmq(endpoint="tcp://192.168.3.10:50557")
ParameterTypeDescription
endpointstr | NoneExplicit ZMQ endpoint (e.g. tcp://192.168.3.10:50557). Skips discovery. Mutually exclusive with robot_id.
robot_idstr | NoneRobot serial number, used for automatic endpoint discovery. Mutually exclusive with endpoint.
connect_timeoutfloatSeconds to wait for discovery when using robot_id (default 5.0).
default_rpc_timeoutfloat | NoneOverrides the default timeout applied to RPC calls on this client.

Exactly one of endpoint / robot_id must be given. Raises ValueError if neither or both are provided, TimeoutError if discovery doesn't resolve in time.

Robot properties

Once connected, the robot's self-reported identity is available without any extra calls:

PropertyTypeDescription
robot.robot_idstr | NoneThe robot's serial number (e.g. QTRD000320).
robot.robot_typestr | NoneRobot model/type string.
robot.sdk_versionstr | NoneRobot-side SDK version.
robot.min_sdk / robot.max_sdkstr | NoneSDK version range supported by the robot.

Plugins

Some capabilities aren't part of the core service hub and are enabled on demand as plugins — the camera driver, ASR engines, and kinematics helpers are all plugins. Once enabled, a plugin shows up as a regular namespace (e.g. robot.camera) regardless of where it actually runs.

robot.enable_plugin_local("kinematics")

Disable any plugin with robot.disable_plugin(name). You can also pass a custom Transport directly via robot.enable_plugin(name, transport).


tts

robot.tts controls text-to-speech: listing/selecting engines and voices, speaking plain text or SSML, and per-engine configuration.

say_text / say_text_async

Synthesize and play plain text using a selected TTS engine. Visemes may be scheduled to the face if connected, so the robot's lips move in sync.

say_text(text, engine=None, lang=None, voice=None, rate=None, pitch=None, volume=None, style=None) -> bool
say_text_async(...) -> ActionHandle
ParameterTypeDescription
textstrText to synthesize.
enginestr | NoneEngine id to use (default engine if omitted).
langstr | NoneLanguage code, e.g. en-US.
voicestr | NoneVoice id/name.
ratefloat | NoneSpeaking rate multiplier.
pitchfloat | NonePitch adjustment.
volumefloat | NoneVolume level.
stylestr | NoneSpeaking style (engine-dependent).

Returns: boolTrue on success.

robot.tts.say_text("Hello world!")
robot.tts.say_text("Slower speech", engine="acapela", rate=0.8, pitch=1.1)

say_ssml / say_ssml_async

Synthesize and play SSML markup using a selected TTS engine.

say_ssml(ssml, engine=None) -> bool
say_ssml_async(...) -> ActionHandle
ParameterTypeDescription
ssmlstrSSML markup string.
enginestr | NoneEngine id to use (default engine if omitted).

Returns: boolTrue on success.

robot.tts.say_ssml("<speak>Hello!</speak>")
robot.tts.say_ssml("<speak>Hello!</speak>", engine="azure")

Cancel an in-progress utterance the same way as say_text_async — via say_ssml_async(...) and .cancel().

Engines, voices, and languages

MethodReturnsDescription
list_engines()list[str]Loaded/available TTS engine ids.
get_default_engine()strCurrent default engine id.
set_default_engine(engine)Set the default engine id.
get_languages(engine=None)list[str]Supported language codes for an engine.
get_voices(engine=None)list[dict]Supported voices (id, lang, gender, display_name, ...).
supports_ssml(engine=None)boolWhether the engine supports SSML.
engines = robot.tts.list_engines()
robot.tts.set_default_engine("acapela")

voices = robot.tts.get_voices(engine="acapela")
for v in voices:
print(v["display_name"], v["lang"])

if robot.tts.supports_ssml(engine="azure"):
robot.tts.say_ssml("<speak>Hello!</speak>", engine="azure")

Engine configuration

MethodReturnsDescription
get_config(engine=None)dictCurrent engine configuration map.
set_config(config, engine=None)boolSet engine-specific configuration parameters.
cfg = robot.tts.get_config(engine="acapela")
robot.tts.set_config(config={"pitch": 1.0, "rate": 0.8}, engine="acapela")

face

robot.face controls the face display: eye gaze and emotion playback.

look

Move (offset) the eyes on the face display.

look(l_eye, r_eye, duration=0.0) -> None
ParameterTypeDescription
l_eyelist[dx, dy] offset for the left eye (pixels), relative to its configured center.
r_eyelist[dx, dy] offset for the right eye (pixels).
durationfloatIf > 0, eyes reset back to center after this many seconds (default 0.0).
robot.face.look(l_eye=[30, 0], r_eye=[30, 0])           # look right
robot.face.look(l_eye=[0, 0], r_eye=[0, 0], duration=2) # center, auto-reset

show_emotion / show_emotion_async

Play an emotion video on the face background lane.

show_emotion(emotion, speed=None) -> bool
show_emotion_async(emotion, speed=None) -> ActionHandle
ParameterTypeDescription
emotionstrEmotion name or relative path, with or without the .avi extension.
speedfloat | NoneOptional playback speed factor.

Returns: boolTrue if playback completed, False otherwise.

# Blocking
robot.face.show_emotion("QT/kiss")
robot.face.show_emotion("QT/surprise", speed=2.0)

# Non-blocking — cancel after 3 seconds
handle = robot.face.show_emotion_async("QT/breathing_exercise")
time.sleep(3)
handle.cancel()

list_emotions

List available emotion video files under the emotions directory (recursive scan, .avi/.AVI files).

list_emotions() -> list[str]
for e in robot.face.list_emotions():
print(e)

gesture

robot.gesture plays predefined gesture files, plays raw keyframe trajectories, and records new gestures from the robot's own motors.

play_file / play_file_async

Load a gesture XML file and play it.

play_file(gesture, speed_factor=1.0) -> bool
play_file_async(gesture, speed_factor=1.0) -> ActionHandle
ParameterTypeDescription
gesturestrGesture name/path, with or without the .xml extension.
speed_factorfloatPlayback speed multiplier (default 1.0).

Returns: boolTrue if loaded and played successfully.

robot.gesture.play_file("QT/wave")

play / play_async

Play a gesture trajectory (a keyframes dict, as returned by record()). Progress is published on the gesture.stream.on_progress stream below.

play(keyframes, resample=True, rate_hz=100.0, speed_factor=1.0) -> bool
play_async(...) -> ActionHandle
ParameterTypeDescription
keyframesdictTrajectory dict, as returned by gesture.record().
resampleboolResample for smooth playback (default True).
rate_hzfloatResample rate in Hz (default 100.0).
speed_factorfloatPlayback speed multiplier (default 1.0).
# Blocking
robot.gesture.play(keyframes)

# Non-blocking — cancel on demand
handle = robot.gesture.play_async(keyframes)
handle.cancel()

list_files

List available gesture XML files under the configured gesture directory.

list_files() -> list[str]
for g in robot.gesture.list_files():
print(g)

record / record_async, stop_record, store_record

Capture a gesture by physically moving (or releasing torque on) selected motors and recording the resulting trajectory.

record(motors, release_motors=False, delay_start_ms=0, timeout_ms=60000,
refine_keyframe=False, keyframe_pos_eps=..., keyframe_max_gap_us=...) -> dict
record_async(...) -> ActionHandle
stop_record() -> bool
store_record(gesture) -> None
ParameterTypeDescription
motorslist[str]Motor names to record.
release_motorsboolIf True, torque is disabled during recording (lets a human move the arm freely).
delay_start_msintDelay before recording starts, in ms (default 0).
timeout_msintMaximum recording duration, in ms (default 60000).
refine_keyframeboolCompress redundant keyframes if True.
keyframe_pos_epsfloatPosition epsilon for keyframe compression (degrees).
keyframe_max_gap_usintMax gap for keyframe compression (µs).

record() blocks until recording finishes (timeout or manual stop) and returns the trajectory. For interactive use, call record_async(), let the user signal when to stop, call stop_record() to end capture, then handle.result() to retrieve the trajectory — and store_record(gesture) to save it to an XML file for later playback with play_file.

handle = robot.gesture.record_async(
motors=["RightShoulderPitch", "RightElbowRoll"],
release_motors=True,
delay_start_ms=2000,
timeout_ms=20000,
)
input("Press Enter to stop...")
robot.gesture.stop_record()
keyframes = handle.result()
robot.gesture.store_record("my_wave")

Stream: on_progress / open_progress_reader

Outbound stream of gesture playback progress while play()/play_file() is running.

on_progress(callback, queue_size=None) -> StreamSubscription
open_progress_reader(queue_size=None) -> TypedStreamReader[DictFrame]

Frame payload (DictFrame): percentage (float), time_us (int).

def on_progress(frame):
print(f"{frame.value['percentage']:.1f}%")

subscription = robot.gesture.stream.on_progress(on_progress)

motor

robot.motor lists motors, controls torque/homing/velocity, and streams joint state, errors, and commands.

list

List configured motors and their parameters.

list() -> dict

Returns: dict{motor_name: {part, position_min, position_max, position_home, velocity_max, overload_threshold, ...}}.

motors = robot.motor.list()
for name, info in motors.items():
print(name, info)

Torque and homing

MethodDescription
on(motor)Enable torque for one motor.
off(motor)Disable torque for one motor.
on_all()Enable torque for all motors.
off_all()Disable torque for all motors.
home(motor)Move one motor to its configured home position.
home_all()Move all motors to their configured home positions.

All return bool (True on success).

robot.motor.on_all()
robot.motor.home("HeadYaw")
robot.motor.home_all()
robot.motor.off_all()

Velocity

set_velocity(motor, velocity) -> bool
ParameterTypeDescription
motorstrMotor name.
velocityintVelocity value, validated against the motor's max.
robot.motor.set_velocity("HeadYaw", 50)

Stream: joint state, errors, and commands

stream.open_joints_state_reader(queue_size=None) -> TypedStreamReader[JointStateFrame]
stream.on_joints_state(callback, queue_size=None) -> StreamSubscription
stream.open_joints_error_reader(queue_size=None) -> TypedStreamReader[DictFrame]
stream.on_joints_error(callback, queue_size=None) -> StreamSubscription
stream.open_joints_command_writer(queue_size=None) -> TypedStreamWriter[JointCommandFrame]
  • joints_state — outbound JointStateFrame mapping motor_name → {position, velocity, effort, voltage, temperature}.
  • joints_error — outbound DictFrame mapping motor_name → {overload_limit?, voltage_limit?, temperature_limit?, sensor_failure?} flags, when present.
  • joints_command — inbound writer for sending {'position': float, 'velocity': float (optional)} commands per motor.
# Read joint state — callback-based
def on_state(frame):
print(frame.position("HeadYaw"))

robot.motor.stream.on_joints_state(on_state)

# Read joint state — reader-based
reader = robot.motor.stream.open_joints_state_reader()
frame = reader.read()

# Watch for motor errors
def on_error(frame):
print("Motor error:", frame.value)

robot.motor.stream.on_joints_error(on_error)

# Send a joint command
from luxai.robot import JointCommandFrame

writer = robot.motor.stream.open_joints_command_writer()
cmd = JointCommandFrame()
cmd.set_joint("HeadYaw", position=30, velocity=40)
writer.write(cmd)

media

robot.media plays audio and video on two independent lanes — foreground (FG) and background (BG) — either from files or as live streamed frames. FG and BG expose an identical API; examples below use FG, just substitute bg for the background lane.

File playback

play_fg_audio_file(uri) -> bool
play_fg_audio_file_async(uri) -> ActionHandle
pause_fg_audio_file() -> None
resume_fg_audio_file() -> None

play_bg_audio_file(uri) -> bool
play_bg_audio_file_async(uri) -> ActionHandle
pause_bg_audio_file() -> None
resume_bg_audio_file() -> None
ParameterTypeDescription
uristrAudio file URI/path supported by the engine.
ok = robot.media.play_fg_audio_file("/data/audio/hello.wav")

Pause/resume work on the currently playing file:

handle = robot.media.play_bg_audio_file_async("/data/audio/music.wav")
time.sleep(2)
robot.media.pause_bg_audio_file()
time.sleep(3)
robot.media.resume_bg_audio_file()
handle.wait()

Video file playback

play_fg_video_file(uri, speed=1.0, with_audio=False) -> bool
play_fg_video_file_async(uri, speed=1.0, with_audio=False) -> ActionHandle
pause_fg_video_file() -> None
resume_fg_video_file() -> None
set_fg_video_alpha(value) -> None
ParameterTypeDescription
uristrVideo file URI/path.
speedfloatPlayback speed factor (default 1.0).
with_audioboolPlay the embedded audio track if True (default False).
value (alpha)floatForeground video transparency, 0.0 (fully transparent) to 1.0.
ok = robot.media.play_fg_video_file("/data/video/intro.mp4")
robot.media.set_fg_video_alpha(0.8)

# Pause/resume on the background lane
handle = robot.media.play_bg_video_file_async("/data/emotions/QT/kiss.avi")
time.sleep(2)
robot.media.pause_bg_video_file()
time.sleep(3)
robot.media.resume_bg_video_file()
handle.wait()

play_bg_video_file / play_bg_video_file_async / pause_bg_video_file / resume_bg_video_file mirror the FG methods above (no alpha control on BG).

Volume

set_fg_audio_volume(value) -> None
get_fg_audio_volume() -> float
set_bg_audio_volume(value) -> None
get_bg_audio_volume() -> float

value is in range [0.0, 1.0]. (For the robot's overall speaker volume rather than a single media lane, see speaker.)

robot.media.set_fg_audio_volume(0.8)
vol = robot.media.get_fg_audio_volume()

Streamed audio/video

For audio/video you generate or receive yourself (rather than a file on disk), open a stream writer and push frames to it. Use the matching cancel_*_stream() / pause_*_stream() / resume_*_stream() calls to control the pipeline — these are separate from the file-playback controls above.

stream.open_fg_audio_stream_writer(queue_size=None) -> TypedStreamWriter[AudioFrameRaw]
stream.open_bg_audio_stream_writer(queue_size=None) -> TypedStreamWriter[AudioFrameRaw]
stream.open_fg_video_stream_writer(queue_size=None) -> TypedStreamWriter[ImageFrameRaw]
stream.open_bg_video_stream_writer(queue_size=None) -> TypedStreamWriter[ImageFrameRaw]

cancel_fg_audio_stream() / pause_fg_audio_stream() / resume_fg_audio_stream()
cancel_bg_audio_stream() / pause_bg_audio_stream() / resume_bg_audio_stream()
cancel_fg_video_stream() / pause_fg_video_stream() / resume_fg_video_stream()
cancel_bg_video_stream() / pause_bg_video_stream() / resume_bg_video_stream()
from luxai.robot import AudioFrameRaw

writer = robot.media.stream.open_fg_audio_stream_writer()
writer.write(AudioFrameRaw(...))
...
robot.media.cancel_fg_audio_stream()

speaker

robot.speaker controls the robot's overall hardware volume — separate from any individual media lane's volume (see media).

set_volume(value) -> bool
get_volume() -> float
mute() -> bool
unmute() -> bool
ParameterTypeDescription
valuefloatVolume in range [0.0, 1.0].

All methods return bool/floatTrue/False indicate whether the underlying mixer control succeeded.

robot.speaker.set_volume(0.8)
vol = robot.speaker.get_volume()

robot.speaker.mute()
robot.speaker.unmute()

microphone

robot.microphone reads raw audio from the robot's internal 5-channel microphone array (and an optional external mic), tunes the array's DSP, and reports voice-activity/direction-of-arrival events.

DSP tuning

get_int_tuning() -> dict
set_int_tuning(name, value) -> bool

get_int_tuning() returns every readable parameter exposed by the internal mic array's Respeaker controller (an empty dict if the device isn't available). set_int_tuning(name, value) sets one parameter — persistence is handled via config (microphone.tunning.*) applied at startup.

params = robot.microphone.get_int_tuning()
print(params.get("AECNORM"))

robot.microphone.set_int_tuning(name="AGCONOFF", value=1.0)

Internal microphone audio (5 channels)

stream.open_int_audio_ch0_reader(queue_size=None) -> TypedStreamReader[AudioFrameRaw]
stream.on_int_audio_ch0(callback, queue_size=None) -> StreamSubscription
# ... ch1, ch2, ch3, ch4 follow the same pattern

Channel 0 is the processed/ASR-ready channel; channels 1–4 are the raw per-microphone signals (exact physical mapping depends on the ALSA layout).

def on_audio(frame):
process(frame.data)

robot.microphone.stream.on_int_audio_ch0(on_audio, queue_size=10)

# Or pull directly
reader = robot.microphone.stream.open_int_audio_ch0_reader(queue_size=10)
frame = reader.read(timeout=3.0)

External microphone audio

stream.open_ext_audio_ch0_reader(queue_size=None) -> TypedStreamReader[AudioFrameRaw]
stream.on_ext_audio_ch0(callback, queue_size=None) -> StreamSubscription

Only published if microphone.external.enabled is set in the robot's configuration.

Voice activity and direction of arrival

stream.open_int_event_reader(queue_size=None) -> TypedStreamReader[DictFrame]
stream.on_int_event(callback, queue_size=None) -> StreamSubscription

Frame payload (DictFrame): activity (bool, True when voice is detected), direction (int, estimated direction-of-arrival in degrees).

def on_event(frame):
evt = frame.value
if evt.get("activity"):
print("Voice detected — DOA:", evt.get("direction"))

robot.microphone.stream.on_int_event(on_event, queue_size=2)
note

This stream delivers only the latest event — events may be dropped if your callback/reader is too slow to keep up.


camera (plugin)

robot.camera is a plugin namespace exposing QTrobot's RealSense 3D camera: color/depth/IMU streams and intrinsics. Enable it first — see Plugins — then use it exactly like a core namespace.

robot.enable_plugin_zmq("realsense-driver", node_id="qtrobot-realsense-driver")

Intrinsics and depth scale

get_color_intrinsics() -> dict
get_depth_intrinsics() -> dict
get_depth_scale() -> dict

get_color_intrinsics() / get_depth_intrinsics() return {fx, fy, ppx, ppy, width, height, model, coeffs}. get_depth_scale() returns {'scale': float} — multiply a raw depth pixel value by scale to get metres.

intr = robot.camera.get_color_intrinsics()
info = robot.camera.get_depth_scale()
scale = info["scale"]

Image streams

stream.open_color_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_color(callback, queue_size=None) -> StreamSubscription

stream.open_depth_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth(callback, queue_size=None) -> StreamSubscription

stream.open_depth_aligned_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth_aligned(callback, queue_size=None) -> StreamSubscription

stream.open_depth_color_reader(queue_size=None) -> TypedStreamReader[ImageFrameRaw]
stream.on_depth_color(callback, queue_size=None) -> StreamSubscription
StreamFrameDescription
colorImageFrameRaw (BGR, width × height × 3)Color image.
depthImageFrameRaw (16-bit, width × height)Raw depth image.
depth_alignedImageFrameRaw (16-bit, color resolution)Depth aligned to the color frame.
depth_colorImageFrameRaw (BGR)False-colour depth, colourised for visualisation.
reader = robot.camera.stream.open_color_reader()
frame = reader.read(timeout=3.0)

IMU streams

stream.open_gyro_reader(queue_size=None) -> TypedStreamReader[ListFrame]
stream.on_gyro(callback, queue_size=None) -> StreamSubscription

stream.open_acceleration_reader(queue_size=None) -> TypedStreamReader[ListFrame]
stream.on_acceleration(callback, queue_size=None) -> StreamSubscription

Both deliver a ListFrame of [x, y, z] — angular velocity in rad/s for gyro, linear acceleration in m/s² for acceleration.

def on_gyro(frame):
print(frame.value) # [x, y, z]

robot.camera.stream.on_gyro(on_gyro)

asr (plugin)

robot.asr is a plugin namespace for continuous speech recognition, backed by your choice of four engines: Azure, Groq Whisper, or a local Parakeet or Riva server. Enable the plugin first — see Plugins — then configure_* the engine you want once before recognizing.

Each engine exposes the same three-part shape: configure_<engine>(...), recognize_<engine>() / recognize_<engine>_async(), and a pair of streams (<engine>_speech, <engine>_event) for continuous/background consumption.

robot.enable_plugin_local("asr-azure")

configure_azure(subscription, region, languages=["en-US"], silence_timeout=0.2,
use_vad=False, continuous_mode=False) -> bool
recognize_azure() -> dict
recognize_azure_async() -> ActionHandle
ParameterTypeDescription
subscriptionstrAzure Speech subscription key.
regionstrAzure Speech region, e.g. westeurope.
languageslistLanguage codes to recognize (default ['en-US']).
silence_timeoutfloatSilence end-of-speech threshold in seconds (default 0.2).
use_vadboolEnable voice-activity detection (default False).
continuous_modeboolEnable continuous recognition mode (default False).

recognize_azure() blocks until a complete utterance is recognized; result fields include text, confidence, etc.

robot.asr.configure_azure(subscription="<key>", region="westeurope",
continuous_mode=True, use_vad=True)

result = robot.asr.recognize_azure()
print(result.get("text"))

Streamsstream.on_azure_speech(callback) / open_azure_speech_reader() deliver recognized segments (DictFrame: text, confidence, language); stream.on_azure_event(callback) / open_azure_event_reader() deliver lifecycle events (StringFrame: recognizing, recognized, canceled, session_started, session_stopped).

Every recognize_* method has a non-blocking _async counterpart returning an ActionHandle — call .cancel() on it to abort recognition early.

handle = robot.asr.recognize_azure_async()
result = handle.result()
print(result.get("text"))

kinematics (plugin)

robot.kinematics is a plugin namespace solving inverse kinematics for head gaze and arm reach/aim, working in the robot base frame (origin at the bottom of the robot: x forward, y left-positive, z up). Enable it first — see Plugins — typically with robot.enable_plugin_local("kinematics") since it computes locally.

configure

Optional — defaults already match the QTrobot hardware.

configure(fx=376.4, fy=376.1, ppx=314.6, ppy=255.6, img_cx=..., img_cy=...,
camera_height=0.6, motor_timeout=20.0) -> bool
ParameterTypeDescription
fx, fyfloatCamera focal length (default 376.4, 376.1).
ppx, ppyfloatCamera principal point (default 314.6, 255.6).
camera_heightfloatCamera height above the robot base frame, in metres (default 0.6).
motor_timeoutfloatHard cap on wait time for joint motion to complete, in seconds (default 20.0).
robot.kinematics.configure(motor_timeout=15.0)

look_at_point / look_at_point_async

Move the robot head to look at a 3-D point. Sends an eye-gaze command and, unless only_gaze=True, a head joint command; blocks until the head motors stop.

look_at_point(x, y, z, only_gaze=False, velocity=0) -> bool
look_at_point_async(...) -> ActionHandle
ParameterTypeDescription
xfloatForward distance from the robot base (metres).
yfloatLateral distance — positive = left, negative = right.
zfloatHeight above the base (metres).
only_gazeboolMove the eyes only, not the head, if True.
velocityfloatJoint velocity override (motor units); 0 = robot default.
# Blocking
robot.kinematics.look_at_point(0.8, 0.0, 1.2)

# Non-blocking
handle = robot.kinematics.look_at_point_async(0.8, 0.0, 1.2)
handle.wait()

# Eyes only
robot.kinematics.look_at_point(0.8, 0.3, 1.0, only_gaze=True)

look_at_pixel / look_at_pixel_async

Move the head to look at a camera pixel — converts (u, v, depth) to a 3-D point using the current head joint angles and camera intrinsics, then calls look_at_point.

look_at_pixel(u, v, depth=1.0, only_gaze=False, velocity=0) -> bool
look_at_pixel_async(...) -> ActionHandle
robot.kinematics.look_at_pixel(320, 240, depth=1.5)

reach_left / reach_right (+ _async)

Move one arm to reach a 3-D point, solving inverse kinematics for that arm; blocks until the arm motors stop. Points outside the reachable workspace are clamped to the closest reachable configuration.

reach_right(x, y, z, velocity=0) -> bool
reach_right_async(...) -> ActionHandle
reach_left(x, y, z, velocity=0) -> bool
reach_left_async(...) -> ActionHandle

y is negative for the right side, positive for the left side.

robot.kinematics.reach_right(0.3, -0.2, 0.5)
robot.kinematics.reach_left(0.3, 0.2, 0.5)

aim_at_point / aim_at_pixel (+ _async)

Like reach_*, but auto-selects the arm based on y (left arm if y >= 0, right arm if y < 0).

aim_at_point(x, y, z, velocity=0) -> bool
aim_at_point_async(...) -> ActionHandle
aim_at_pixel(u, v, depth=1.0, velocity=0) -> bool
aim_at_pixel_async(...) -> ActionHandle
robot.kinematics.aim_at_point(0.5, 0.0, 1.0)
robot.kinematics.aim_at_pixel(320, 240, depth=1.0)

pixel_to_point

Pure computation, no motor movement — convert a camera pixel to a 3-D point using the current head joint angles and camera intrinsics.

pixel_to_point(u, v, depth=1.0) -> dict

Returns: {'x': float, 'y': float, 'z': float} in the robot base frame (metres).

pt = robot.kinematics.pixel_to_point(320, 240, depth=1.2)
print(pt["x"], pt["y"], pt["z"])

See the Python Tutorials for runnable, narrative walkthroughs of several of the namespaces above.