From 76a3dd2efc793df51c157afeae1ecf79b047ebe6 Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Sun, 4 Jan 2026 22:25:32 -0700 Subject: [PATCH 1/7] remote_control: Basic force-feedback. --- .../remote_control/main.py | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 sets/mindstorms-ev3/education-expansion/remote_control/main.py diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py new file mode 100644 index 00000000..55a1ab02 --- /dev/null +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -0,0 +1,75 @@ +from pybricks.hubs import EV3Brick +from pybricks.parameters import Color, Port, Stop +from pybricks.ev3devices import Motor +from pybricks.tools import wait, StopWatch +from micropython import const +from umath import sin, pi + +hub = EV3Brick() +m = Motor(Port.A) + +# These variables are used to compute a duty cycle that resists the user's +# attempt to turn the motor in a direction with a low duty cycle, while still +# allowing us to tell when the user stops turning the motor in that direction. +OSCILLATE_PERIOD = 22 # ms +STATIC_FRICTION_DUTY = 20 +T_COEFF = 2 * pi / OSCILLATE_PERIOD +RESIST_MAX_DUTY = 30 + + +def resist_duty(base_duty, t) -> int: + added_duty = sin(t * T_COEFF) * STATIC_FRICTION_DUTY + return int(base_duty + added_duty) + + +w = StopWatch() +lw = StopWatch() + + +LOG_PERIOD = const(100) + + +def maybe_log(x): + if lw.time() > LOG_PERIOD: + print(x) + + +while True: + t = w.time() + + angle = m.angle() + speed = m.speed() + + maybe_log(f"Angle: {angle}, Speed: {speed}") + + if abs(angle) < 10: + hub.light.on(Color.BLUE) + maybe_log("Near zero angle") + m.brake() + if abs(angle) < 1: + m.stop() + elif (angle > 0) != (speed > 0) and speed != 0: + # If the motor is currently turned in one direction and turning back + # toward zero, encourage that motion by running the motor toward zero. + maybe_log("Turning toward zero") + m.dc(-30 if angle > 0 else 30) + else: + # If the user is currently turning the motor away from zero, resist + # that movement with a duty cycle that increases the further we get + # from zero. Duty cycle of resistance maxes out at 15%. + base_duty = 0 + if abs(angle) > 50: + base_duty = min(RESIST_MAX_DUTY, RESIST_MAX_DUTY * (abs(angle) - 50) / 130) + duty = resist_duty(base_duty, t) + duty = duty if angle < 0 else -duty + maybe_log(f"Resisting with base_duty: {base_duty} duty: {duty}") + m.dc(duty) + + if abs(duty) < 8: + hub.light.on(Color.ORANGE) + else: + hub.light.on(Color.RED) + + if lw.time() > LOG_PERIOD: + lw.reset() + wait(10) \ No newline at end of file From 7838de4485ad9e2e512abfb73217673efb453ea4 Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Sun, 4 Jan 2026 23:13:16 -0700 Subject: [PATCH 2/7] remote_control: Axis 2 controls. --- .../remote_control/main.py | 127 +++++++++++------- 1 file changed, 80 insertions(+), 47 deletions(-) diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py index 55a1ab02..482e6f4b 100644 --- a/sets/mindstorms-ev3/education-expansion/remote_control/main.py +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -1,75 +1,108 @@ from pybricks.hubs import EV3Brick -from pybricks.parameters import Color, Port, Stop +from pybricks.parameters import Color, Port, Stop, Direction from pybricks.ev3devices import Motor -from pybricks.tools import wait, StopWatch +from pybricks.tools import wait, StopWatch, run_task from micropython import const from umath import sin, pi hub = EV3Brick() -m = Motor(Port.A) +axis1_motor = Motor(Port.A, positive_direction=Direction.COUNTERCLOCKWISE) +axis2_motor = Motor(Port.D) +axis2_motor.stop() # These variables are used to compute a duty cycle that resists the user's # attempt to turn the motor in a direction with a low duty cycle, while still # allowing us to tell when the user stops turning the motor in that direction. -OSCILLATE_PERIOD = 22 # ms -STATIC_FRICTION_DUTY = 20 -T_COEFF = 2 * pi / OSCILLATE_PERIOD -RESIST_MAX_DUTY = 30 - -def resist_duty(base_duty, t) -> int: - added_duty = sin(t * T_COEFF) * STATIC_FRICTION_DUTY +OSCILLATE_PERIOD = const(22) # ms +AXIS1_STATIC_FRICTION_DUTY = const(20) +AXIS2_STATIC_FRICTION_DUTY = const(10) +PI = const(3.141592653589793) +T_COEFF = 2 * PI / OSCILLATE_PERIOD +RESIST_MAX_DUTY = const(30) +STEERING_DEAD_ZONE = const(10) # degrees +RESIST_START_ANGLE = const(50) # degrees - angle where resistance begins +RESIST_RANGE = const(130) # degrees - range over which resistance ramps to max +AXIS2_MAX_THROW = const(29) # degrees - maximum throw for throttle + +def resist_duty(base_duty, t, static_friction_duty) -> int: + added_duty = sin(t * T_COEFF) * static_friction_duty return int(base_duty + added_duty) -w = StopWatch() -lw = StopWatch() - - -LOG_PERIOD = const(100) - - -def maybe_log(x): - if lw.time() > LOG_PERIOD: - print(x) +def steering_wheel(t: int) -> int: + """Drives the steering wheel process. + 1. Provides force-feedback for the steering wheel depending on how + far it is from neutral. + 2. Returns an integer between -127 and 127 that expresses how far + the steering wheel is from neutral. -while True: - t = w.time() + Args: + t (int): The time in milliseconds since the process started. + Returns: + int: An integer between -127 and 127 representing the steering + wheel position. + """ + angle = axis1_motor.angle() + speed = axis1_motor.speed() - angle = m.angle() - speed = m.speed() - - maybe_log(f"Angle: {angle}, Speed: {speed}") - - if abs(angle) < 10: + if abs(angle) < STEERING_DEAD_ZONE: hub.light.on(Color.BLUE) - maybe_log("Near zero angle") - m.brake() + axis1_motor.brake() if abs(angle) < 1: - m.stop() + axis1_motor.stop() elif (angle > 0) != (speed > 0) and speed != 0: # If the motor is currently turned in one direction and turning back # toward zero, encourage that motion by running the motor toward zero. - maybe_log("Turning toward zero") - m.dc(-30 if angle > 0 else 30) + axis1_motor.dc(-30 if angle > 0 else 30) else: # If the user is currently turning the motor away from zero, resist # that movement with a duty cycle that increases the further we get # from zero. Duty cycle of resistance maxes out at 15%. base_duty = 0 - if abs(angle) > 50: - base_duty = min(RESIST_MAX_DUTY, RESIST_MAX_DUTY * (abs(angle) - 50) / 130) - duty = resist_duty(base_duty, t) + if abs(angle) > RESIST_START_ANGLE: + base_duty = min(RESIST_MAX_DUTY, RESIST_MAX_DUTY * (abs(angle) - RESIST_START_ANGLE) / RESIST_RANGE) + duty = resist_duty(base_duty, t, AXIS1_STATIC_FRICTION_DUTY) duty = duty if angle < 0 else -duty - maybe_log(f"Resisting with base_duty: {base_duty} duty: {duty}") - m.dc(duty) - - if abs(duty) < 8: - hub.light.on(Color.ORANGE) - else: - hub.light.on(Color.RED) - - if lw.time() > LOG_PERIOD: - lw.reset() - wait(10) \ No newline at end of file + axis1_motor.dc(duty) + + # Scale the return value: 0 in dead zone, ±127 at max resistance angle + if abs(angle) < STEERING_DEAD_ZONE: + return 0 + + max_angle = RESIST_START_ANGLE + RESIST_RANGE + scaled = int(127 * (abs(angle) - STEERING_DEAD_ZONE) / (max_angle - STEERING_DEAD_ZONE)) + scaled = min(127, scaled) # Cap at 127 + return scaled if angle > 0 else -scaled + + +def throttle(t: int) -> int: + """Returns an integer between 0 and 128 representing the throttle position.""" + angle = axis2_motor.angle() + + # This extra duty we apply helps keep the motor unstuck, which makes + # it easier for the user to push it. + axis2_motor.dc(resist_duty(0, t, AXIS2_STATIC_FRICTION_DUTY)) + + scaled = int(127 * angle / AXIS2_MAX_THROW) + return min(127, max(0, scaled)) + + +UPDATE_PERIOD = const(16) # ms + +async def main(): + """Main remote control process.""" + stopwatch = StopWatch() + update = StopWatch() + while True: + t = stopwatch.time() + wheel = steering_wheel(t) + throttle_value = throttle(t) + print(f"AXIS1:{wheel} AXIS2:{throttle_value}") + + await wait(update.time() - UPDATE_PERIOD) + update.reset() + + +run_task(main()) \ No newline at end of file From a8de44c22d1998b4250cb107a629a7b6fd1cddc5 Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Mon, 5 Jan 2026 16:17:20 -0700 Subject: [PATCH 3/7] remote_control: Add rfcomm communication. --- .../remote_control/main.py | 100 ++++++++++++------ 1 file changed, 67 insertions(+), 33 deletions(-) diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py index 482e6f4b..04201751 100644 --- a/sets/mindstorms-ev3/education-expansion/remote_control/main.py +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -2,33 +2,40 @@ from pybricks.parameters import Color, Port, Stop, Direction from pybricks.ev3devices import Motor from pybricks.tools import wait, StopWatch, run_task +from pybricks.messaging import rfcomm_connect from micropython import const from umath import sin, pi +import ustruct hub = EV3Brick() + +# The large wheel connected to port A is the steering wheel. axis1_motor = Motor(Port.A, positive_direction=Direction.COUNTERCLOCKWISE) + +# The small wheel connected to port D is the throttle trigger. axis2_motor = Motor(Port.D) -axis2_motor.stop() # These variables are used to compute a duty cycle that resists the user's # attempt to turn the motor in a direction with a low duty cycle, while still # allowing us to tell when the user stops turning the motor in that direction. OSCILLATE_PERIOD = const(22) # ms -AXIS1_STATIC_FRICTION_DUTY = const(20) -AXIS2_STATIC_FRICTION_DUTY = const(10) -PI = const(3.141592653589793) -T_COEFF = 2 * PI / OSCILLATE_PERIOD -RESIST_MAX_DUTY = const(30) -STEERING_DEAD_ZONE = const(10) # degrees -RESIST_START_ANGLE = const(50) # degrees - angle where resistance begins -RESIST_RANGE = const(130) # degrees - range over which resistance ramps to max -AXIS2_MAX_THROW = const(29) # degrees - maximum throw for throttle +ANGULAR_FREQ_COEFF = 2 * pi / OSCILLATE_PERIOD def resist_duty(base_duty, t, static_friction_duty) -> int: - added_duty = sin(t * T_COEFF) * static_friction_duty + """Computes a duty cycle that oscillates around a base duty cycle.""" + added_duty = sin(t * ANGULAR_FREQ_COEFF) * static_friction_duty return int(base_duty + added_duty) +# The fraction of the motor's power that is required to overcome the +# motor's static friction. For large motors, it's a greater fraction +# of the motor's maximum power than for small motors. +LARGE_MOTOR_STATIC_FRICTION_DUTY = const(20) +SMALL_MOTOR_STATIC_FRICTION_DUTY = const(10) + +# The bluetooth address of the remotely controlled device. Change this +# to the address printed by your remote controlled device program. +TARGET_BLUETOOTH_ADDRESS = 'F0:45:DA:13:1C:8A' def steering_wheel(t: int) -> int: """Drives the steering wheel process. @@ -44,13 +51,20 @@ def steering_wheel(t: int) -> int: int: An integer between -127 and 127 representing the steering wheel position. """ + STEERING_RESIST_MAX_DUTY = const(30) + STEERING_DEAD_ZONE = const(10) # degrees + RESIST_START_ANGLE = const(50) # degrees - angle where resistance begins + RESIST_RANGE = const(130) # degrees - range over which resistance ramps to max + MAX_ANGLE = const(180) # degrees - maximum steering wheel angle + angle = axis1_motor.angle() speed = axis1_motor.speed() + abs_angle = abs(angle) - if abs(angle) < STEERING_DEAD_ZONE: + if abs_angle < STEERING_DEAD_ZONE: hub.light.on(Color.BLUE) axis1_motor.brake() - if abs(angle) < 1: + if abs_angle < 1: axis1_motor.stop() elif (angle > 0) != (speed > 0) and speed != 0: # If the motor is currently turned in one direction and turning back @@ -60,49 +74,69 @@ def steering_wheel(t: int) -> int: # If the user is currently turning the motor away from zero, resist # that movement with a duty cycle that increases the further we get # from zero. Duty cycle of resistance maxes out at 15%. - base_duty = 0 - if abs(angle) > RESIST_START_ANGLE: - base_duty = min(RESIST_MAX_DUTY, RESIST_MAX_DUTY * (abs(angle) - RESIST_START_ANGLE) / RESIST_RANGE) - duty = resist_duty(base_duty, t, AXIS1_STATIC_FRICTION_DUTY) + base_duty = STEERING_RESIST_MAX_DUTY * max(0, min(1, (abs_angle - RESIST_START_ANGLE) / RESIST_RANGE)) + duty = resist_duty(base_duty, t, LARGE_MOTOR_STATIC_FRICTION_DUTY) duty = duty if angle < 0 else -duty axis1_motor.dc(duty) # Scale the return value: 0 in dead zone, ±127 at max resistance angle - if abs(angle) < STEERING_DEAD_ZONE: + if abs_angle < STEERING_DEAD_ZONE: return 0 - max_angle = RESIST_START_ANGLE + RESIST_RANGE - scaled = int(127 * (abs(angle) - STEERING_DEAD_ZONE) / (max_angle - STEERING_DEAD_ZONE)) + scaled = int(127 * (abs_angle - STEERING_DEAD_ZONE) / (MAX_ANGLE - STEERING_DEAD_ZONE)) scaled = min(127, scaled) # Cap at 127 return scaled if angle > 0 else -scaled +AXIS2_MAX_THROW = const(29) # degrees - maximum throw for throttle + + def throttle(t: int) -> int: - """Returns an integer between 0 and 128 representing the throttle position.""" + """Returns an integer between 0 and 127 representing the throttle position.""" angle = axis2_motor.angle() # This extra duty we apply helps keep the motor unstuck, which makes - # it easier for the user to push it. - axis2_motor.dc(resist_duty(0, t, AXIS2_STATIC_FRICTION_DUTY)) + # it easier for the user to adjust it with small increments. + axis2_motor.dc(resist_duty(0, t, SMALL_MOTOR_STATIC_FRICTION_DUTY)) scaled = int(127 * angle / AXIS2_MAX_THROW) return min(127, max(0, scaled)) +# How often we send update messages to the remote controlled device. UPDATE_PERIOD = const(16) # ms async def main(): """Main remote control process.""" - stopwatch = StopWatch() - update = StopWatch() while True: - t = stopwatch.time() - wheel = steering_wheel(t) - throttle_value = throttle(t) - print(f"AXIS1:{wheel} AXIS2:{throttle_value}") - - await wait(update.time() - UPDATE_PERIOD) - update.reset() - + hub.light.on(Color.RED) + conn = None + try: + conn = await rfcomm_connect(TARGET_BLUETOOTH_ADDRESS) + hub.light.on(Color.GREEN) + stopwatch = StopWatch() + update = StopWatch() + msg_buf = bytearray(2) + while True: + t = stopwatch.time() + wheel = steering_wheel(t) + throttle_value = throttle(t) + + if update.time() < UPDATE_PERIOD: + # We drive the steering wheel and throttle at a + # higher rate than we send out updates, because + # their motors need to be adjusted more frequently + # than our RC device needs to receive updates. + continue + + ustruct.pack_into('>bb', msg_buf, 0, wheel, throttle_value) + conn.write(msg_buf) + update.reset() + + print(f"AXIS1:{wheel} AXIS2:{throttle_value}") + await wait(1) + finally: + if conn: + conn.close() run_task(main()) \ No newline at end of file From 17ee32408a62864c4f92e6d9bcf69aaa5e44f048 Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Mon, 5 Jan 2026 16:17:34 -0700 Subject: [PATCH 4/7] tankbot_rc: Add RC version of tankbot. --- .../education-expansion/tank_bot_rc/main.py | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100755 sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py diff --git a/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py new file mode 100755 index 00000000..f5ad7099 --- /dev/null +++ b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py @@ -0,0 +1,87 @@ +#!/usr/bin/env pybricks-micropython + +""" +Example LEGO® MINDSTORMS® EV3 Tank Bot Program +---------------------------------------------- + +This program requires LEGO® EV3 MicroPython v2.0. +Download: https://education.lego.com/en-us/support/mindstorms-ev3/python-for-ev3 + +Building instructions can be found at: +https://education.lego.com/en-us/support/mindstorms-ev3/building-instructions#building-expansion +""" + +from pybricks.hubs import EV3Brick +from pybricks.ev3devices import Motor, GyroSensor +from pybricks.parameters import Port, Direction, Button, Color +from pybricks.tools import StopWatch, wait, run_task +from pybricks.robotics import DriveBase +from pybricks.messaging import rfcomm_listen, local_address + +from micropython import const + +import ustruct + +# Initialize the EV3 brick. +ev3 = EV3Brick() + +# Configure 2 motors on Ports B and C. Set the motor directions to +# counterclockwise, so that positive speed values make the robot move +# forward. These will be the left and right motors of the Tank Bot. +left_motor = Motor(Port.A, Direction.COUNTERCLOCKWISE) +right_motor = Motor(Port.D, Direction.COUNTERCLOCKWISE) + +# The wheel diameter of the Tank Bot is about 54 mm. +WHEEL_DIAMETER = 54 + +# The axle track is the distance between the centers of each of the +# wheels. This is about 200 mm for the Tank Bot. +AXLE_TRACK = 200 + +# The Driving Base is comprised of 2 motors. There is a wheel on each +# motor. The wheel diameter and axle track values are used to make the +# motors move at the correct speed when you give a drive command. +robot = DriveBase(left_motor, right_motor, WHEEL_DIAMETER, AXLE_TRACK) + +SPEED_SCALE = 6 # Scale factor for speed (768 // 127) +TURN_SCALE = 2 # Scale factor for turn rate (320 // 127) + +# Storage for incoming messages from remote control. +msg_buf = bytearray(2) +msg_buf_view = memoryview(msg_buf) + +# Tracks the next-to-be-filled index in msg_buf. +cur_idx = 0 + +async def main(): + while True: + print('Local address: ', local_address()) + ev3.light.on(Color.RED) + print('Waiting for connection...') + conn = await rfcomm_listen() + print('Connected!') + ev3.light.on(Color.GREEN) + + timeout = StopWatch() + cur_idx = 0 + while timeout.time() < 100: + cur_idx += conn.readinto(msg_buf_view[cur_idx:], len(msg_buf) - cur_idx) + + if cur_idx != len(msg_buf): + # We were not able to read the entire message. Loop again. + await wait(1) + continue + + timeout.reset() + axis1, axis2 = ustruct.unpack('>bb', msg_buf) + cur_idx = 0 + + speed = axis2 * SPEED_SCALE # -768 to +768 mm/s + turn_rate = axis1 * TURN_SCALE # -320 to +320 deg/s + robot.drive(speed, turn_rate) + + robot.stop() + print('Client disconnected or timed out.') + conn.close() + +run_task(main()) \ No newline at end of file From 4103da7bf5614284cb5d32bd93be720a0f65ab2a Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Sun, 18 Jan 2026 10:29:30 -0700 Subject: [PATCH 5/7] tankbot_rc && remote_control: Use RFCOMMSocket. This new API is what we intend to be using going forward. --- .../remote_control/main.py | 12 ++--- .../education-expansion/tank_bot_rc/main.py | 54 +++++++++---------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py index 04201751..ed7cd98c 100644 --- a/sets/mindstorms-ev3/education-expansion/remote_control/main.py +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -2,7 +2,7 @@ from pybricks.parameters import Color, Port, Stop, Direction from pybricks.ev3devices import Motor from pybricks.tools import wait, StopWatch, run_task -from pybricks.messaging import rfcomm_connect +from pybricks.messaging import RFCOMMSocket from micropython import const from umath import sin, pi import ustruct @@ -108,11 +108,11 @@ def throttle(t: int) -> int: async def main(): """Main remote control process.""" + sock = RFCOMMSocket() while True: hub.light.on(Color.RED) - conn = None try: - conn = await rfcomm_connect(TARGET_BLUETOOTH_ADDRESS) + await sock.connect(TARGET_BLUETOOTH_ADDRESS) hub.light.on(Color.GREEN) stopwatch = StopWatch() update = StopWatch() @@ -130,13 +130,13 @@ async def main(): continue ustruct.pack_into('>bb', msg_buf, 0, wheel, throttle_value) - conn.write(msg_buf) + sock.write(msg_buf) update.reset() print(f"AXIS1:{wheel} AXIS2:{throttle_value}") await wait(1) finally: - if conn: - conn.close() + if sock: + sock.close() run_task(main()) \ No newline at end of file diff --git a/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py index f5ad7099..2e26bf8f 100755 --- a/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py +++ b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py @@ -16,7 +16,7 @@ from pybricks.parameters import Port, Direction, Button, Color from pybricks.tools import StopWatch, wait, run_task from pybricks.robotics import DriveBase -from pybricks.messaging import rfcomm_listen, local_address +from pybricks.messaging import RFCOMMSocket, local_address from micropython import const @@ -54,34 +54,34 @@ cur_idx = 0 async def main(): - while True: - print('Local address: ', local_address()) - ev3.light.on(Color.RED) - print('Waiting for connection...') - conn = await rfcomm_listen() - print('Connected!') - ev3.light.on(Color.GREEN) - - timeout = StopWatch() + sock = RFCOMMSocket() + print('Local address: ', local_address()) + ev3.light.on(Color.RED) + print('Waiting for connection...') + await sock.listen() + print('Connected!') + ev3.light.on(Color.GREEN) + + timeout = StopWatch() + cur_idx = 0 + while timeout.time() < 100: + cur_idx += sock.readinto(msg_buf_view[cur_idx:], len(msg_buf) - cur_idx) + + if cur_idx != len(msg_buf): + # We were not able to read the entire message. Loop again. + await wait(1) + continue + + timeout.reset() + axis1, axis2 = ustruct.unpack('>bb', msg_buf) cur_idx = 0 - while timeout.time() < 100: - cur_idx += conn.readinto(msg_buf_view[cur_idx:], len(msg_buf) - cur_idx) - if cur_idx != len(msg_buf): - # We were not able to read the entire message. Loop again. - await wait(1) - continue + speed = axis2 * SPEED_SCALE # -768 to +768 mm/s + turn_rate = axis1 * TURN_SCALE # -320 to +320 deg/s + robot.drive(speed, turn_rate) - timeout.reset() - axis1, axis2 = ustruct.unpack('>bb', msg_buf) - cur_idx = 0 - - speed = axis2 * SPEED_SCALE # -768 to +768 mm/s - turn_rate = axis1 * TURN_SCALE # -320 to +320 deg/s - robot.drive(speed, turn_rate) - - robot.stop() - print('Client disconnected or timed out.') - conn.close() + robot.stop() + print('Client disconnected or timed out.') + sock.close() run_task(main()) \ No newline at end of file From 2e536fc52d9e0d81947e4cef50ec05155964a350 Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Sun, 18 Jan 2026 10:36:43 -0700 Subject: [PATCH 6/7] fixup! tankbot_rc && remote_control: Use RFCOMMSocket. Use context manager in remote_control. --- .../education-expansion/remote_control/main.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py index ed7cd98c..1778b358 100644 --- a/sets/mindstorms-ev3/education-expansion/remote_control/main.py +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -108,10 +108,9 @@ def throttle(t: int) -> int: async def main(): """Main remote control process.""" - sock = RFCOMMSocket() - while True: - hub.light.on(Color.RED) - try: + with RFCOMMSocket() as sock: + while True: + hub.light.on(Color.RED) await sock.connect(TARGET_BLUETOOTH_ADDRESS) hub.light.on(Color.GREEN) stopwatch = StopWatch() @@ -127,6 +126,7 @@ async def main(): # higher rate than we send out updates, because # their motors need to be adjusted more frequently # than our RC device needs to receive updates. + await wait(1) continue ustruct.pack_into('>bb', msg_buf, 0, wheel, throttle_value) @@ -135,8 +135,5 @@ async def main(): print(f"AXIS1:{wheel} AXIS2:{throttle_value}") await wait(1) - finally: - if sock: - sock.close() run_task(main()) \ No newline at end of file From 11acbe72597353b58a9574e785c6cc9c5fce655c Mon Sep 17 00:00:00 2001 From: James Aguilar Date: Tue, 20 Jan 2026 15:50:54 -0700 Subject: [PATCH 7/7] fixup! tankbot_rc && remote_control: Use RFCOMMSocket. Use new UARTDevice-inspired API. --- .../remote_control/main.py | 5 ++-- .../education-expansion/tank_bot_rc/main.py | 28 ++++++------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/sets/mindstorms-ev3/education-expansion/remote_control/main.py b/sets/mindstorms-ev3/education-expansion/remote_control/main.py index 1778b358..e6baf573 100644 --- a/sets/mindstorms-ev3/education-expansion/remote_control/main.py +++ b/sets/mindstorms-ev3/education-expansion/remote_control/main.py @@ -115,7 +115,7 @@ async def main(): hub.light.on(Color.GREEN) stopwatch = StopWatch() update = StopWatch() - msg_buf = bytearray(2) + while True: t = stopwatch.time() wheel = steering_wheel(t) @@ -129,8 +129,7 @@ async def main(): await wait(1) continue - ustruct.pack_into('>bb', msg_buf, 0, wheel, throttle_value) - sock.write(msg_buf) + await sock.write(ustruct.pack('>bb', wheel, throttle_value)) update.reset() print(f"AXIS1:{wheel} AXIS2:{throttle_value}") diff --git a/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py index 2e26bf8f..3f40c8ff 100755 --- a/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py +++ b/sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py @@ -14,9 +14,9 @@ from pybricks.hubs import EV3Brick from pybricks.ev3devices import Motor, GyroSensor from pybricks.parameters import Port, Direction, Button, Color -from pybricks.tools import StopWatch, wait, run_task +from pybricks.tools import StopWatch, wait, run_task, multitask from pybricks.robotics import DriveBase -from pybricks.messaging import RFCOMMSocket, local_address +from pybricks.messaging import RFCOMMSocket, local_address from micropython import const @@ -46,13 +46,6 @@ SPEED_SCALE = 6 # Scale factor for speed (768 // 127) TURN_SCALE = 2 # Scale factor for turn rate (320 // 127) -# Storage for incoming messages from remote control. -msg_buf = bytearray(2) -msg_buf_view = memoryview(msg_buf) - -# Tracks the next-to-be-filled index in msg_buf. -cur_idx = 0 - async def main(): sock = RFCOMMSocket() print('Local address: ', local_address()) @@ -62,19 +55,14 @@ async def main(): print('Connected!') ev3.light.on(Color.GREEN) - timeout = StopWatch() - cur_idx = 0 - while timeout.time() < 100: - cur_idx += sock.readinto(msg_buf_view[cur_idx:], len(msg_buf) - cur_idx) + while True: + (msg, _) = await multitask(sock.read(2), wait(100), race=True) - if cur_idx != len(msg_buf): - # We were not able to read the entire message. Loop again. - await wait(1) - continue + if msg is None: + # Connection timed out. + break - timeout.reset() - axis1, axis2 = ustruct.unpack('>bb', msg_buf) - cur_idx = 0 + axis1, axis2 = ustruct.unpack('>bb', msg) speed = axis2 * SPEED_SCALE # -768 to +768 mm/s turn_rate = axis1 * TURN_SCALE # -320 to +320 deg/s