From df56103cadee43f1c83475fa0879b57e3460592b Mon Sep 17 00:00:00 2001
From: Mohadeseh
Date: Mon, 27 Oct 2025 18:04:47 +0100
Subject: [PATCH 1/5] feat: add StackThree task to robosuite/maple

---
 robosuite/__init__.py                         |   1 +
 .../environments/manipulation/stack_three.py  | 382 ++++++++++++++++++
 robosuite/utils/placement_samplers.py         |   3 +-
 3 files changed, 385 insertions(+), 1 deletion(-)
 create mode 100644 robosuite/environments/manipulation/stack_three.py

diff --git a/robosuite/__init__.py b/robosuite/__init__.py
index 87b6ef07b0..325cb90dea 100644
--- a/robosuite/__init__.py
+++ b/robosuite/__init__.py
@@ -3,6 +3,7 @@
 # Manipulation environments
 from robosuite.environments.manipulation.lift import Lift
 from robosuite.environments.manipulation.stack import Stack
+from robosuite.environments.manipulation.stack_three import StackThree
 from robosuite.environments.manipulation.cleanup import Cleanup
 from robosuite.environments.manipulation.peg_in_hole import PegInHole
 from robosuite.environments.manipulation.nut_assembly import NutAssembly
diff --git a/robosuite/environments/manipulation/stack_three.py b/robosuite/environments/manipulation/stack_three.py
new file mode 100644
index 0000000000..9d61cb328a
--- /dev/null
+++ b/robosuite/environments/manipulation/stack_three.py
@@ -0,0 +1,382 @@
+from collections import OrderedDict
+import numpy as np
+from robosuite.utils.transform_utils import convert_quat
+from robosuite.utils.mjcf_utils import CustomMaterial
+from robosuite.environments.manipulation.single_arm_env import SingleArmEnv
+from robosuite.models.arenas import TableArena
+from robosuite.models.objects import BoxObject
+from robosuite.models.tasks import ManipulationTask
+from robosuite.utils.placement_samplers import UniformRandomSampler
+
+class StackThree(SingleArmEnv):
+    """
+    Minimal-change variant of the two-cube Stack task that stacks three identical cubes (A, B, C).
+    Reward shaping is updated to incentivize targeting the next unstacked cube.
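+
+    A hypothetical usage sketch (assumes the standard robosuite entry point and
+    the "StackThree" registration added in this patch):
+
+        import robosuite as suite
+
+        env = suite.make("StackThree", robots="Panda", use_camera_obs=False,
+                         reward_shaping=True)
+        obs = env.reset()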
+ """ + # [Rest of __init__ is unchanged] + def __init__( + self, + robots, + env_configuration="default", + controller_configs=None, + gripper_types="default", + initialization_noise="default", + table_full_size=(0.8, 0.8, 0.05), + table_friction=(1., 5e-3, 1e-4), + table_offset=(0, 0, 0.8), + use_camera_obs=True, + use_object_obs=True, + reward_scale=1.0, + reward_shaping=False, + placement_initializer=None, + has_renderer=False, + has_offscreen_renderer=True, + render_camera="frontview", + render_collision_mesh=False, + render_visual_mesh=True, + render_gpu_device_id=-1, + control_freq=20, + horizon=1000, + ignore_done=False, + hard_reset=True, + camera_names="agentview", + camera_heights=256, + camera_widths=256, + camera_depths=False, + full_stacking_bonus=0.0, + skill_config=None, + ): + # table/top settings + self.table_full_size = table_full_size + self.table_friction = table_friction + self.table_offset = np.array(table_offset) + # reward config + self.reward_scale = reward_scale + self.reward_shaping = reward_shaping + # whether to include object states in observations + self.use_object_obs = use_object_obs + # object placement initializer + self.placement_initializer = placement_initializer + # bonus for stacking (kept but not required for sparse success) + self.full_stacking_bonus = full_stacking_bonus + + # Cube identifiers for staged rewards + self.cubes = None + self.cube_body_ids = None + + super().__init__( + robots=robots, + env_configuration=env_configuration, + controller_configs=controller_configs, + mount_types="default", + gripper_types=gripper_types, + initialization_noise=initialization_noise, + use_camera_obs=use_camera_obs, + has_renderer=has_renderer, + has_offscreen_renderer=has_offscreen_renderer, + render_camera=render_camera, + render_collision_mesh=render_collision_mesh, + render_visual_mesh=render_visual_mesh, + render_gpu_device_id=render_gpu_device_id, + control_freq=control_freq, + horizon=horizon, + ignore_done=ignore_done, + hard_reset=hard_reset, + camera_names=camera_names, + camera_heights=256, + camera_widths=256, + camera_depths=False, + skill_config=skill_config, + ) + + # [Reward function is unchanged] + def reward(self, action): + r_reach, r_lift, r_stack = self.staged_rewards() + if self.reward_shaping: + reward = max(r_reach, r_lift, r_stack) + else: + # sparse: only give reward when the full 3-block stack is present + reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0 # Use 3.0 here for easier scaling + + if self.reward_scale is not None: + # Final reward is scaled by reward_scale / 3.0 so that a full success equals reward_scale. + reward *= self.reward_scale / 3.0 + return reward + + # ----------------------------------------------------------- + # CRITICAL CHANGE: Phased Reward Logic to Target Unstacked Cubes + # ----------------------------------------------------------- + def staged_rewards(self): + """ + Returns 3 components (r_reach, r_lift, r_stack) adapted to three-block stacking. + The reach/lift target is dynamically set to the next cube that needs to be picked. + """ + gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + + # Cube positions (must use copies as they are needed for comparisons) + cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() + cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() + cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() + + # 1. 
Determine Stacking Status and Target Cube + # Check contact (robust way to track stack progress) + a_on_b = self.check_contact(self.cubeA, self.cubeB) + a_on_c = self.check_contact(self.cubeA, self.cubeC) + b_on_c = self.check_contact(self.cubeB, self.cubeC) + + r_stack = 0.0 + r_stack_progress = 0 # 0, 1, or 2 for progress + + # Assume C is the fixed base for now (simplification) + # Check B on C (1-block stack achieved) + if b_on_c: + r_stack += 1.5 + r_stack_progress = 1 + + # Check A on B (2-block stack achieved, full tower) + if a_on_b: + r_stack += 1.5 + r_stack_progress = 2 + + # Identify the next cube to pick (target_cube) and its placement target (target_base_pos) + target_cube = None + target_pos = None + + if r_stack_progress == 0: + # Stage 1: Pick the first cube (e.g., B) and place it on the table (C). + # This is hard to define without a fixed sequence. A simpler approach is: + # Target the cube that is not yet part of a 2-block stack. + # We'll stick to a fixed sequence A->B->C (or B on C, then A on B) for simplicity, + # and let the policy figure out the order, but prioritize picking an unstacked cube. + + # We'll use the cube furthest from the gripper as a default if no clear stack target. + # Find the closest ungrasped, unstacked cube to the gripper. + + # Let's use a simpler heuristic for the target: the cube not currently being held. + # If nothing is held, target the cube closest to the gripper. + + cubes_list = [self.cubeA, self.cubeB, self.cubeC] + positions = [cubeA_pos, cubeB_pos, cubeC_pos] + + # Find the cube currently being held + grasping_cubes = [self._check_grasp(self.robots[0].gripper, cube) for cube in cubes_list] + held_cube_idx = np.where(grasping_cubes)[0] + + if len(held_cube_idx) == 0: + # Nothing held: Target the closest cube to pick up. + dists = [np.linalg.norm(gripper_site_pos - pos) for pos in positions] + target_cube = cubes_list[np.argmin(dists)] + target_cube_pos = positions[np.argmin(dists)] + else: + # Holding a cube: The target is the placement location. + held_cube = cubes_list[held_cube_idx[0]] + + # If holding the first cube (e.g., B), target C for placement. + if held_cube is self.cubeB and not b_on_c: + target_cube_pos = cubeC_pos + # If holding the second cube (e.g., A), target B for placement (if B is on C). + elif held_cube is self.cubeA and b_on_c and not a_on_b: + target_cube_pos = cubeB_pos + else: + # Default: keep aligning it over a potential base (e.g., the table or the lowest base) + target_cube_pos = np.array([0, 0, self.table_offset[2]]) # Center of table + + target_cube = held_cube # Redefine target_cube for the reach reward in the lift/align phase + + # 2. Reach and Grasp Reward (r_reach) + # We always incentivize reaching and grasping the current TARGET CUBE. + if target_cube is None: # Should only happen if all cubes are somehow placed or in a weird state + target_cube = self.cubeA + target_cube_pos = cubeA_pos + + dist = np.linalg.norm(gripper_site_pos - target_cube_pos) + r_reach = (1 - np.tanh(10.0 * dist)) * 0.25 + + # Grasping reward for the target cube + grasping_target = self._check_grasp(self.robots[0].gripper, object_geoms=target_cube) + if grasping_target: + r_reach += 0.25 # Reach + Grasp max 0.5 + + # 3. Lift and Align Reward (r_lift) + + # Cube must be lifted. We use the currently grasped cube's height. 
+ cube_to_check = target_cube if grasping_target else self.cubeA # Fallback to A if nothing is grasped + cube_to_check_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(cube_to_check.root_body)] + + cube_lifted = cube_to_check_pos[2] > self.table_offset[2] + 0.04 + r_lift = 1.0 if cube_lifted else 0.0 + + # Aligning: When the *grasped cube* is above its *placement target*. + if target_cube_pos is not None and target_cube_pos[2] > self.table_offset[2] + 0.01: + # The target_cube_pos here is the placement location (B or C), not the cube being held. + # We need the pos of the CUBE BEING HELD for alignment + held_cube_pos = cube_to_check_pos + + # Placement target is the base cube's center + placement_base_pos = None + if grasping_target and target_cube is self.cubeB and not b_on_c: + # Holding B, target is C + placement_base_pos = cubeC_pos + elif grasping_target and target_cube is self.cubeA and b_on_c and not a_on_b: + # Holding A, target is B + placement_base_pos = cubeB_pos + + if placement_base_pos is not None: + horiz_dist = np.linalg.norm(held_cube_pos[:2] - placement_base_pos[:2]) + r_lift += 0.5 * (1 - np.tanh(10.0 * horiz_dist)) # Max r_lift is 1.5 + + # 4. Stacking (r_stack) is calculated at the beginning (0.0, 1.5, or 3.0) + # Note: If r_stack is 1.5, the policy should shift its focus from 'picking first cube' to 'picking second cube' due to the dynamic target logic in step 1. + + return r_reach, r_lift, r_stack + + # [Rest of the class is mostly unchanged] + def _get_env_info(self, action): + info = super()._get_env_info(action) + r_reach, r_lift, r_stack = self.staged_rewards() + cubes_stacked = (r_stack >= 3.0 - 1e-6) + info.update({ + # Max possible r_reach is 0.5, r_lift is 1.5, r_stack is 3.0 + 'r_reach_grasp': r_reach / 0.50, + 'r_lift_align': r_lift / 1.50, + 'r_stack': r_stack / 3.0, + 'cubes_stacked': cubes_stacked, + 'success': self._check_success(), + }) + return info + + def _get_skill_info(self): + # This is now less relevant since the target is dynamic, but we'll stick to A as a default target + cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() + cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() + cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() + pos_info = {} + # Default targets: Grasp A, place on B, which is placed on C + pos_info['grasp'] = [cubeA_pos] + pos_info['reach'] = [cubeB_pos] + pos_info['base'] = [cubeC_pos] + info = {} + for k in pos_info: + info[k + '_pos'] = pos_info[k] + return info + + # ----------------------------------------------------------- + # CRITICAL CHANGE: All Cubes Same Size (Medium) + # ----------------------------------------------------------- + def _load_model(self): + """ Loads the model and adds three identical boxes (cubeA, cubeB, cubeC). 
""" + super()._load_model() + xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) + self.robots[0].robot_model.set_base_xpos(xpos) + + mujoco_arena = TableArena( + table_full_size=self.table_full_size, + table_friction=self.table_friction, + table_offset=self.table_offset, + ) + mujoco_arena.set_origin([0, 0, 0]) + + # materials + tex_attrib = {"type": "cube"} + mat_attrib = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} + redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + + # Three IDENTICAL boxes (using the medium size: 0.025) + cube_size = [0.025, 0.025, 0.025] + self.cubeA = BoxObject(name="cubeA", size_min=cube_size, size_max=cube_size, rgba=[1, 0, 0, 1], material=redwood) + self.cubeB = BoxObject(name="cubeB", size_min=cube_size, size_max=cube_size, rgba=[0, 1, 0, 1], material=greenwood) + self.cubeC = BoxObject(name="cubeC", size_min=cube_size, size_max=cube_size, rgba=[0, 0, 1, 1], material=bluewood) + cubes = [self.cubeA, self.cubeB, self.cubeC] + + if self.placement_initializer is not None: + self.placement_initializer.reset() + self.placement_initializer.add_objects(cubes) + else: + self.placement_initializer = UniformRandomSampler( + name="ObjectSampler", + mujoco_objects=cubes, + x_range=[-0.08, 0.08], + y_range=[-0.08, 0.08], + rotation=None, + ensure_object_boundary_in_range=False, + ensure_valid_placement=True, + reference_pos=self.table_offset, + z_offset=0.01, + ) + + self.model = ManipulationTask( + mujoco_arena=mujoco_arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=cubes, + ) + self.cubes = cubes + + def _get_reference(self): + super()._get_reference() + self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body) + self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body) + self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body) + self.cube_body_ids = [self.cubeA_body_id, self.cubeB_body_id, self.cubeC_body_id] + + def _reset_internal(self): + super()._reset_internal() + if not self.deterministic_reset: + object_placements = self.placement_initializer.sample() + for obj_pos, obj_quat, obj in object_placements.values(): + self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) + + def _get_observation(self): + di = super()._get_observation() + if self.use_object_obs: + pr = self.robots[0].robot_model.naming_prefix + + # Helper to get cube data + def get_cube_data(cube_body_id): + pos = np.array(self.sim.data.body_xpos[cube_body_id]) + quat = convert_quat(np.array(self.sim.data.body_xquat[cube_body_id]), to="xyzw") + return pos, quat + + cubeA_pos, cubeA_quat = get_cube_data(self.cubeA_body_id) + cubeB_pos, cubeB_quat = get_cube_data(self.cubeB_body_id) + cubeC_pos, cubeC_quat = get_cube_data(self.cubeC_body_id) + + di["cubeA_pos"], di["cubeA_quat"] = cubeA_pos, cubeA_quat + di["cubeB_pos"], di["cubeB_quat"] = cubeB_pos, cubeB_quat + di["cubeC_pos"], di["cubeC_quat"] = cubeC_pos, cubeC_quat + + gripper_site_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) + + di[pr + "gripper_to_cubeA"] = gripper_site_pos - cubeA_pos + di[pr + 
"gripper_to_cubeB"] = gripper_site_pos - cubeB_pos + di[pr + "gripper_to_cubeC"] = gripper_site_pos - cubeC_pos + di["cubeA_to_cubeB"] = cubeA_pos - cubeB_pos + di["cubeB_to_cubeC"] = cubeB_pos - cubeC_pos + di["cubeA_to_cubeC"] = cubeA_pos - cubeC_pos + + # object-state contains positions, quaternions, and relative vectors for all three cubes + di["object-state"] = np.concatenate( + [ + cubeA_pos, cubeA_quat, + cubeB_pos, cubeB_quat, + cubeC_pos, cubeC_quat, + di[pr + "gripper_to_cubeA"], + di[pr + "gripper_to_cubeB"], + di[pr + "gripper_to_cubeC"], + di["cubeA_to_cubeB"], + di["cubeB_to_cubeC"], + di["cubeA_to_cubeC"], + ] + ) + return di + + def _check_success(self): + _, _, r_stack = self.staged_rewards() + return r_stack >= 3.0 - 1e-6 + + def visualize(self, vis_settings): + super().visualize(vis_settings=vis_settings) + if vis_settings.get("grippers", False): + # Target visualization is less meaningful with dynamic targets, keep the original target (A) for simplicity + self._visualize_gripper_to_target(gripper=self.robots[0].gripper, target=self.cubeA) \ No newline at end of file diff --git a/robosuite/utils/placement_samplers.py b/robosuite/utils/placement_samplers.py index 4d97a5fc87..081413575f 100644 --- a/robosuite/utils/placement_samplers.py +++ b/robosuite/utils/placement_samplers.py @@ -1,4 +1,5 @@ import collections +from collections.abc import Iterable import numpy as np from copy import copy @@ -194,7 +195,7 @@ def _sample_quat(self): """ if self.rotation is None: rot_angle = np.random.uniform(high=2 * np.pi, low=0) - elif isinstance(self.rotation, collections.Iterable): + elif isinstance(self.rotation, Iterable): rot_angle = np.random.uniform( high=max(self.rotation), low=min(self.rotation) ) From ee72dedcd115b841910c5ce91680825e13c67b46 Mon Sep 17 00:00:00 2001 From: Mohadeseh Date: Tue, 28 Oct 2025 13:26:31 +0100 Subject: [PATCH 2/5] refactor(breaking!): stach_three smoother reward shaping + dynamic clarification, WIP --- .../environments/manipulation/stack_three.py | 323 +++++++----------- 1 file changed, 119 insertions(+), 204 deletions(-) diff --git a/robosuite/environments/manipulation/stack_three.py b/robosuite/environments/manipulation/stack_three.py index 9d61cb328a..667c473c2b 100644 --- a/robosuite/environments/manipulation/stack_three.py +++ b/robosuite/environments/manipulation/stack_three.py @@ -1,5 +1,6 @@ from collections import OrderedDict import numpy as np + from robosuite.utils.transform_utils import convert_quat from robosuite.utils.mjcf_utils import CustomMaterial from robosuite.environments.manipulation.single_arm_env import SingleArmEnv @@ -8,12 +9,14 @@ from robosuite.models.tasks import ManipulationTask from robosuite.utils.placement_samplers import UniformRandomSampler + class StackThree(SingleArmEnv): """ - Minimal change version of StackThree. Stacks three identical cubes (A, B, C). - Reward shaping is updated to incentivize targeting the next unstacked cube. + 3-cube stacking environment (Phase 1 refactor). + Fixes: deterministic target order, consistent held-cube logic, + smoother reward shaping. Goal: build tower C→B→A. 
""" - # [Rest of __init__ is unchanged] + def __init__( self, robots, @@ -22,7 +25,7 @@ def __init__( gripper_types="default", initialization_noise="default", table_full_size=(0.8, 0.8, 0.05), - table_friction=(1., 5e-3, 1e-4), + table_friction=(1.0, 5e-3, 1e-4), table_offset=(0, 0, 0.8), use_camera_obs=True, use_object_obs=True, @@ -46,24 +49,16 @@ def __init__( full_stacking_bonus=0.0, skill_config=None, ): - # table/top settings + # table + reward settings self.table_full_size = table_full_size self.table_friction = table_friction self.table_offset = np.array(table_offset) - # reward config self.reward_scale = reward_scale self.reward_shaping = reward_shaping - # whether to include object states in observations self.use_object_obs = use_object_obs - # object placement initializer self.placement_initializer = placement_initializer - # bonus for stacking (kept but not required for sparse success) self.full_stacking_bonus = full_stacking_bonus - # Cube identifiers for staged rewards - self.cubes = None - self.cube_body_ids = None - super().__init__( robots=robots, env_configuration=env_configuration, @@ -83,188 +78,103 @@ def __init__( ignore_done=ignore_done, hard_reset=hard_reset, camera_names=camera_names, - camera_heights=256, - camera_widths=256, - camera_depths=False, + camera_heights=camera_heights, + camera_widths=camera_widths, + camera_depths=camera_depths, skill_config=skill_config, ) - # [Reward function is unchanged] + # ------------------------------------------------------------------ + # Reward logic (Phase 1 refactor) + # ------------------------------------------------------------------ def reward(self, action): r_reach, r_lift, r_stack = self.staged_rewards() if self.reward_shaping: reward = max(r_reach, r_lift, r_stack) else: - # sparse: only give reward when the full 3-block stack is present - reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0 # Use 3.0 here for easier scaling - + reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0 if self.reward_scale is not None: - # Final reward is scaled by reward_scale / 3.0 so that a full success equals reward_scale. reward *= self.reward_scale / 3.0 return reward - # ----------------------------------------------------------- - # CRITICAL CHANGE: Phased Reward Logic to Target Unstacked Cubes - # ----------------------------------------------------------- def staged_rewards(self): - """ - Returns 3 components (r_reach, r_lift, r_stack) adapted to three-block stacking. - The reach/lift target is dynamically set to the next cube that needs to be picked. - """ - gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] - - # Cube positions (must use copies as they are needed for comparisons) + """Staged rewards with deterministic order and smoother shaping.""" + gripper_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + + # cube positions cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() - - # 1. 
Determine Stacking Status and Target Cube - # Check contact (robust way to track stack progress) - a_on_b = self.check_contact(self.cubeA, self.cubeB) - a_on_c = self.check_contact(self.cubeA, self.cubeC) + + # stacking progress b_on_c = self.check_contact(self.cubeB, self.cubeC) + a_on_b = self.check_contact(self.cubeA, self.cubeB) r_stack = 0.0 - r_stack_progress = 0 # 0, 1, or 2 for progress - - # Assume C is the fixed base for now (simplification) - # Check B on C (1-block stack achieved) if b_on_c: r_stack += 1.5 - r_stack_progress = 1 - - # Check A on B (2-block stack achieved, full tower) if a_on_b: r_stack += 1.5 - r_stack_progress = 2 - - # Identify the next cube to pick (target_cube) and its placement target (target_base_pos) - target_cube = None - target_pos = None - - if r_stack_progress == 0: - # Stage 1: Pick the first cube (e.g., B) and place it on the table (C). - # This is hard to define without a fixed sequence. A simpler approach is: - # Target the cube that is not yet part of a 2-block stack. - # We'll stick to a fixed sequence A->B->C (or B on C, then A on B) for simplicity, - # and let the policy figure out the order, but prioritize picking an unstacked cube. - - # We'll use the cube furthest from the gripper as a default if no clear stack target. - # Find the closest ungrasped, unstacked cube to the gripper. - - # Let's use a simpler heuristic for the target: the cube not currently being held. - # If nothing is held, target the cube closest to the gripper. - - cubes_list = [self.cubeA, self.cubeB, self.cubeC] - positions = [cubeA_pos, cubeB_pos, cubeC_pos] - - # Find the cube currently being held - grasping_cubes = [self._check_grasp(self.robots[0].gripper, cube) for cube in cubes_list] - held_cube_idx = np.where(grasping_cubes)[0] - - if len(held_cube_idx) == 0: - # Nothing held: Target the closest cube to pick up. - dists = [np.linalg.norm(gripper_site_pos - pos) for pos in positions] - target_cube = cubes_list[np.argmin(dists)] - target_cube_pos = positions[np.argmin(dists)] - else: - # Holding a cube: The target is the placement location. - held_cube = cubes_list[held_cube_idx[0]] - - # If holding the first cube (e.g., B), target C for placement. - if held_cube is self.cubeB and not b_on_c: - target_cube_pos = cubeC_pos - # If holding the second cube (e.g., A), target B for placement (if B is on C). - elif held_cube is self.cubeA and b_on_c and not a_on_b: - target_cube_pos = cubeB_pos - else: - # Default: keep aligning it over a potential base (e.g., the table or the lowest base) - target_cube_pos = np.array([0, 0, self.table_offset[2]]) # Center of table - - target_cube = held_cube # Redefine target_cube for the reach reward in the lift/align phase - - # 2. Reach and Grasp Reward (r_reach) - # We always incentivize reaching and grasping the current TARGET CUBE. - if target_cube is None: # Should only happen if all cubes are somehow placed or in a weird state - target_cube = self.cubeA - target_cube_pos = cubeA_pos - - dist = np.linalg.norm(gripper_site_pos - target_cube_pos) - r_reach = (1 - np.tanh(10.0 * dist)) * 0.25 - - # Grasping reward for the target cube - grasping_target = self._check_grasp(self.robots[0].gripper, object_geoms=target_cube) - if grasping_target: - r_reach += 0.25 # Reach + Grasp max 0.5 - - # 3. Lift and Align Reward (r_lift) - - # Cube must be lifted. We use the currently grasped cube's height. 
- cube_to_check = target_cube if grasping_target else self.cubeA # Fallback to A if nothing is grasped - cube_to_check_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(cube_to_check.root_body)] - - cube_lifted = cube_to_check_pos[2] > self.table_offset[2] + 0.04 - r_lift = 1.0 if cube_lifted else 0.0 - - # Aligning: When the *grasped cube* is above its *placement target*. - if target_cube_pos is not None and target_cube_pos[2] > self.table_offset[2] + 0.01: - # The target_cube_pos here is the placement location (B or C), not the cube being held. - # We need the pos of the CUBE BEING HELD for alignment - held_cube_pos = cube_to_check_pos - - # Placement target is the base cube's center - placement_base_pos = None - if grasping_target and target_cube is self.cubeB and not b_on_c: - # Holding B, target is C - placement_base_pos = cubeC_pos - elif grasping_target and target_cube is self.cubeA and b_on_c and not a_on_b: - # Holding A, target is B - placement_base_pos = cubeB_pos - - if placement_base_pos is not None: - horiz_dist = np.linalg.norm(held_cube_pos[:2] - placement_base_pos[:2]) - r_lift += 0.5 * (1 - np.tanh(10.0 * horiz_dist)) # Max r_lift is 1.5 - - # 4. Stacking (r_stack) is calculated at the beginning (0.0, 1.5, or 3.0) - # Note: If r_stack is 1.5, the policy should shift its focus from 'picking first cube' to 'picking second cube' due to the dynamic target logic in step 1. - + + # 1-2. deterministic order (always build C→B→A) + if not b_on_c: + target_pick = self.cubeB + placement_base_pos = cubeC_pos + else: + target_pick = self.cubeA + placement_base_pos = cubeB_pos + + target_pick_pos = self.sim.data.body_xpos[ + self.sim.model.body_name2id(target_pick.root_body) + ] + + # 3. detect held cube correctly + held_cube = None + for cube in [self.cubeA, self.cubeB, self.cubeC]: + if self._check_grasp(self.robots[0].gripper, cube): + held_cube = cube + break + + # reach + grasp + dist = np.linalg.norm(gripper_pos - target_pick_pos) + r_reach = (1 - np.tanh(5.0 * dist)) * 0.25 # smoother curve + if held_cube is target_pick: + r_reach += 0.25 + + # 3-4. 
lift + align (smoother tanh) + r_lift = 0.0 + if held_cube is not None: + held_pos = self.sim.data.body_xpos[ + self.sim.model.body_name2id(held_cube.root_body) + ] + lifted = held_pos[2] > self.table_offset[2] + 0.04 + if lifted: + r_lift += 1.0 + horiz_dist = np.linalg.norm(held_pos[:2] - placement_base_pos[:2]) + r_lift += 0.5 * (1 - np.tanh(3.0 * horiz_dist)) + return r_reach, r_lift, r_stack - # [Rest of the class is mostly unchanged] + # ------------------------------------------------------------------ + # Diagnostics + # ------------------------------------------------------------------ def _get_env_info(self, action): info = super()._get_env_info(action) r_reach, r_lift, r_stack = self.staged_rewards() cubes_stacked = (r_stack >= 3.0 - 1e-6) info.update({ - # Max possible r_reach is 0.5, r_lift is 1.5, r_stack is 3.0 - 'r_reach_grasp': r_reach / 0.50, - 'r_lift_align': r_lift / 1.50, - 'r_stack': r_stack / 3.0, - 'cubes_stacked': cubes_stacked, - 'success': self._check_success(), + "r_reach_grasp": r_reach / 0.50, + "r_lift_align": r_lift / 1.50, + "r_stack": r_stack / 3.0, + "cubes_stacked": cubes_stacked, + "success": self._check_success(), }) return info - def _get_skill_info(self): - # This is now less relevant since the target is dynamic, but we'll stick to A as a default target - cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() - cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() - cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() - pos_info = {} - # Default targets: Grasp A, place on B, which is placed on C - pos_info['grasp'] = [cubeA_pos] - pos_info['reach'] = [cubeB_pos] - pos_info['base'] = [cubeC_pos] - info = {} - for k in pos_info: - info[k + '_pos'] = pos_info[k] - return info - - # ----------------------------------------------------------- - # CRITICAL CHANGE: All Cubes Same Size (Medium) - # ----------------------------------------------------------- + # ------------------------------------------------------------------ + # Model + object setup + # ------------------------------------------------------------------ def _load_model(self): - """ Loads the model and adds three identical boxes (cubeA, cubeB, cubeC). 
""" super()._load_model() xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) self.robots[0].robot_model.set_base_xpos(xpos) @@ -276,18 +186,19 @@ def _load_model(self): ) mujoco_arena.set_origin([0, 0, 0]) - # materials tex_attrib = {"type": "cube"} mat_attrib = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} - redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) - greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) - bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", + tex_attrib=tex_attrib, mat_attrib=mat_attrib) + greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", + tex_attrib=tex_attrib, mat_attrib=mat_attrib) + bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", + tex_attrib=tex_attrib, mat_attrib=mat_attrib) - # Three IDENTICAL boxes (using the medium size: 0.025) cube_size = [0.025, 0.025, 0.025] - self.cubeA = BoxObject(name="cubeA", size_min=cube_size, size_max=cube_size, rgba=[1, 0, 0, 1], material=redwood) - self.cubeB = BoxObject(name="cubeB", size_min=cube_size, size_max=cube_size, rgba=[0, 1, 0, 1], material=greenwood) - self.cubeC = BoxObject(name="cubeC", size_min=cube_size, size_max=cube_size, rgba=[0, 0, 1, 1], material=bluewood) + self.cubeA = BoxObject("cubeA", cube_size, cube_size, [1, 0, 0, 1], redwood) + self.cubeB = BoxObject("cubeB", cube_size, cube_size, [0, 1, 0, 1], greenwood) + self.cubeC = BoxObject("cubeC", cube_size, cube_size, [0, 0, 1, 1], bluewood) cubes = [self.cubeA, self.cubeB, self.cubeC] if self.placement_initializer is not None: @@ -311,66 +222,69 @@ def _load_model(self): mujoco_robots=[robot.robot_model for robot in self.robots], mujoco_objects=cubes, ) - self.cubes = cubes def _get_reference(self): super()._get_reference() self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body) self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body) self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body) - self.cube_body_ids = [self.cubeA_body_id, self.cubeB_body_id, self.cubeC_body_id] def _reset_internal(self): super()._reset_internal() if not self.deterministic_reset: object_placements = self.placement_initializer.sample() for obj_pos, obj_quat, obj in object_placements.values(): - self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) + self.sim.data.set_joint_qpos( + obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)]) + ) + # ------------------------------------------------------------------ + # Observation + # ------------------------------------------------------------------ def _get_observation(self): di = super()._get_observation() if self.use_object_obs: pr = self.robots[0].robot_model.naming_prefix - - # Helper to get cube data - def get_cube_data(cube_body_id): - pos = np.array(self.sim.data.body_xpos[cube_body_id]) - quat = convert_quat(np.array(self.sim.data.body_xquat[cube_body_id]), to="xyzw") + + def get_data(bid): + pos = np.array(self.sim.data.body_xpos[bid]) + quat = convert_quat(np.array(self.sim.data.body_xquat[bid]), to="xyzw") return pos, quat - cubeA_pos, 
cubeA_quat = get_cube_data(self.cubeA_body_id) - cubeB_pos, cubeB_quat = get_cube_data(self.cubeB_body_id) - cubeC_pos, cubeC_quat = get_cube_data(self.cubeC_body_id) + cubeA_pos, cubeA_quat = get_data(self.cubeA_body_id) + cubeB_pos, cubeB_quat = get_data(self.cubeB_body_id) + cubeC_pos, cubeC_quat = get_data(self.cubeC_body_id) - di["cubeA_pos"], di["cubeA_quat"] = cubeA_pos, cubeA_quat - di["cubeB_pos"], di["cubeB_quat"] = cubeB_pos, cubeB_quat - di["cubeC_pos"], di["cubeC_quat"] = cubeC_pos, cubeC_quat + di.update({ + "cubeA_pos": cubeA_pos, "cubeA_quat": cubeA_quat, + "cubeB_pos": cubeB_pos, "cubeB_quat": cubeB_quat, + "cubeC_pos": cubeC_pos, "cubeC_quat": cubeC_quat, + }) - gripper_site_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) - - di[pr + "gripper_to_cubeA"] = gripper_site_pos - cubeA_pos - di[pr + "gripper_to_cubeB"] = gripper_site_pos - cubeB_pos - di[pr + "gripper_to_cubeC"] = gripper_site_pos - cubeC_pos + grip_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) + di[pr + "gripper_to_cubeA"] = grip_pos - cubeA_pos + di[pr + "gripper_to_cubeB"] = grip_pos - cubeB_pos + di[pr + "gripper_to_cubeC"] = grip_pos - cubeC_pos di["cubeA_to_cubeB"] = cubeA_pos - cubeB_pos di["cubeB_to_cubeC"] = cubeB_pos - cubeC_pos di["cubeA_to_cubeC"] = cubeA_pos - cubeC_pos - # object-state contains positions, quaternions, and relative vectors for all three cubes - di["object-state"] = np.concatenate( - [ - cubeA_pos, cubeA_quat, - cubeB_pos, cubeB_quat, - cubeC_pos, cubeC_quat, - di[pr + "gripper_to_cubeA"], - di[pr + "gripper_to_cubeB"], - di[pr + "gripper_to_cubeC"], - di["cubeA_to_cubeB"], - di["cubeB_to_cubeC"], - di["cubeA_to_cubeC"], - ] - ) + di["object-state"] = np.concatenate([ + cubeA_pos, cubeA_quat, + cubeB_pos, cubeB_quat, + cubeC_pos, cubeC_quat, + di[pr + "gripper_to_cubeA"], + di[pr + "gripper_to_cubeB"], + di[pr + "gripper_to_cubeC"], + di["cubeA_to_cubeB"], + di["cubeB_to_cubeC"], + di["cubeA_to_cubeC"], + ]) return di + # ------------------------------------------------------------------ + # Success + visualize + # ------------------------------------------------------------------ def _check_success(self): _, _, r_stack = self.staged_rewards() return r_stack >= 3.0 - 1e-6 @@ -378,5 +292,6 @@ def _check_success(self): def visualize(self, vis_settings): super().visualize(vis_settings=vis_settings) if vis_settings.get("grippers", False): - # Target visualization is less meaningful with dynamic targets, keep the original target (A) for simplicity - self._visualize_gripper_to_target(gripper=self.robots[0].gripper, target=self.cubeA) \ No newline at end of file + self._visualize_gripper_to_target( + gripper=self.robots[0].gripper, target=self.cubeA + ) From 9db8b628054d53cb7ed307db6a587a1810586d68 Mon Sep 17 00:00:00 2001 From: Mohadeseh Date: Wed, 5 Nov 2025 16:46:03 +0100 Subject: [PATCH 3/5] refactor: stack_three, WIP --- .../environments/manipulation/stack_three.py | 226 ++++++++++-------- 1 file changed, 127 insertions(+), 99 deletions(-) diff --git a/robosuite/environments/manipulation/stack_three.py b/robosuite/environments/manipulation/stack_three.py index 667c473c2b..defe273e5b 100644 --- a/robosuite/environments/manipulation/stack_three.py +++ b/robosuite/environments/manipulation/stack_three.py @@ -1,6 +1,5 @@ from collections import OrderedDict import numpy as np - from robosuite.utils.transform_utils import convert_quat from robosuite.utils.mjcf_utils import CustomMaterial from robosuite.environments.manipulation.single_arm_env import 
SingleArmEnv @@ -9,14 +8,10 @@ from robosuite.models.tasks import ManipulationTask from robosuite.utils.placement_samplers import UniformRandomSampler - class StackThree(SingleArmEnv): """ - 3-cube stacking environment (Phase 1 refactor). - Fixes: deterministic target order, consistent held-cube logic, - smoother reward shaping. Goal: build tower C→B→A. + Minimal change version of StackThree with corrected staged_rewards for a guaranteed C -> B -> A stack order. """ - def __init__( self, robots, @@ -25,12 +20,12 @@ def __init__( gripper_types="default", initialization_noise="default", table_full_size=(0.8, 0.8, 0.05), - table_friction=(1.0, 5e-3, 1e-4), + table_friction=(1., 5e-3, 1e-4), table_offset=(0, 0, 0.8), use_camera_obs=True, use_object_obs=True, reward_scale=1.0, - reward_shaping=False, + reward_shaping=False, # We recommend setting this to True for this task placement_initializer=None, has_renderer=False, has_offscreen_renderer=True, @@ -49,16 +44,24 @@ def __init__( full_stacking_bonus=0.0, skill_config=None, ): - # table + reward settings + # table/top settings self.table_full_size = table_full_size self.table_friction = table_friction self.table_offset = np.array(table_offset) + # reward config self.reward_scale = reward_scale self.reward_shaping = reward_shaping + # whether to include object states in observations self.use_object_obs = use_object_obs + # object placement initializer self.placement_initializer = placement_initializer + # bonus for stacking (kept but not required for sparse success) self.full_stacking_bonus = full_stacking_bonus + # Cube identifiers for staged rewards + self.cubes = None + self.cube_body_ids = None + super().__init__( robots=robots, env_configuration=env_configuration, @@ -92,89 +95,120 @@ def reward(self, action): if self.reward_shaping: reward = max(r_reach, r_lift, r_stack) else: - reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0 + # sparse: only give reward when the full 3-block stack is present + reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0 # Use 3.0 here for easier scaling + if self.reward_scale is not None: + # Final reward is scaled by reward_scale / 3.0 so that a full success equals reward_scale. reward *= self.reward_scale / 3.0 return reward + # ----------------------------------------------------------- + # CRITICAL FIX: Fixed-Stage Reward Logic for C -> B -> A Stacking + # ----------------------------------------------------------- def staged_rewards(self): - """Staged rewards with deterministic order and smoother shaping.""" - gripper_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] - - # cube positions + """ + Returns 3 components (r_reach, r_lift, r_stack) with logic to enforce B on C, then A on B. + """ + gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + + # Cube positions cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() - - # stacking progress + + # Determine stack progress (B on C, then A on B) b_on_c = self.check_contact(self.cubeB, self.cubeC) a_on_b = self.check_contact(self.cubeA, self.cubeB) r_stack = 0.0 - if b_on_c: - r_stack += 1.5 - if a_on_b: - r_stack += 1.5 + target_pick = None # The cube the robot should be aiming to GRASP + placement_base_pos = None # The position the HELD cube should be placed ON (e.g., C or B) - # 1-2. deterministic order (always build C→B→A) + # --- 1. 
Determine Stack Status and Next Target --- if not b_on_c: + # Stage 1: Get Cube B and put it on Cube C. target_pick = self.cubeB placement_base_pos = cubeC_pos - else: + elif b_on_c and not a_on_b: + # Stage 2: Get Cube A and put it on Cube B. target_pick = self.cubeA placement_base_pos = cubeB_pos - - target_pick_pos = self.sim.data.body_xpos[ - self.sim.model.body_name2id(target_pick.root_body) - ] - - # 3. detect held cube correctly + r_stack += 1.5 # Reward for B on C + elif b_on_c and a_on_b: + # Stage 3: Full Success! Tower is A on B on C. + target_pick = self.cubeA # Stay focused on the top block (A) + placement_base_pos = cubeB_pos + r_stack += 3.0 # Full reward + + # --- 2. Determine Held Cube --- held_cube = None for cube in [self.cubeA, self.cubeB, self.cubeC]: if self._check_grasp(self.robots[0].gripper, cube): held_cube = cube break - # reach + grasp - dist = np.linalg.norm(gripper_pos - target_pick_pos) - r_reach = (1 - np.tanh(5.0 * dist)) * 0.25 # smoother curve + # --- 3. Reach and Grasp Reward (r_reach) --- + target_pick_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(target_pick.root_body)] + + # Use a slightly softer tanh coefficient (5.0) than the original 10.0 for easier initial reaching + dist_reach = np.linalg.norm(gripper_site_pos - target_pick_pos) + r_reach = (1 - np.tanh(5.0 * dist_reach)) * 0.25 + + # Grasp reward: Only for successfully grasping the *correct* target cube. if held_cube is target_pick: - r_reach += 0.25 + r_reach += 0.25 # Max r_reach is 0.5 (0.25 reach + 0.25 grasp) - # 3-4. lift + align (smoother tanh) + # --- 4. Lift and Align Reward (r_lift) --- r_lift = 0.0 - if held_cube is not None: - held_pos = self.sim.data.body_xpos[ - self.sim.model.body_name2id(held_cube.root_body) - ] - lifted = held_pos[2] > self.table_offset[2] + 0.04 + + # Lift reward: Only given if the correct cube is held and lifted + if held_cube is target_pick: + held_cube_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(held_cube.root_body)] + + # Check if the held cube is lifted 4cm above the table plane + lifted = held_cube_pos[2] > self.table_offset[2] + 0.04 + if lifted: r_lift += 1.0 - horiz_dist = np.linalg.norm(held_pos[:2] - placement_base_pos[:2]) - r_lift += 0.5 * (1 - np.tanh(3.0 * horiz_dist)) + + # Alignment reward: Distance between held cube and its intended placement base + # Use a slightly softer tanh coefficient (5.0) than the original 10.0 for easier alignment + horiz_dist = np.linalg.norm(held_cube_pos[:2] - placement_base_pos[:2]) + r_lift += 0.5 * (1 - np.tanh(5.0 * horiz_dist)) # Max r_lift is 1.5 return r_reach, r_lift, r_stack - # ------------------------------------------------------------------ - # Diagnostics - # ------------------------------------------------------------------ + # [Rest of the class methods remain the same as your second version] + def _get_env_info(self, action): info = super()._get_env_info(action) r_reach, r_lift, r_stack = self.staged_rewards() cubes_stacked = (r_stack >= 3.0 - 1e-6) info.update({ - "r_reach_grasp": r_reach / 0.50, - "r_lift_align": r_lift / 1.50, - "r_stack": r_stack / 3.0, - "cubes_stacked": cubes_stacked, - "success": self._check_success(), + 'r_reach_grasp': r_reach / 0.50, + 'r_lift_align': r_lift / 1.50, + 'r_stack': r_stack / 3.0, + 'cubes_stacked': cubes_stacked, + 'success': self._check_success(), }) return info - # ------------------------------------------------------------------ - # Model + object setup - # ------------------------------------------------------------------ + def 
_get_skill_info(self): + cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() + cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() + cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() + pos_info = {} + pos_info['grasp'] = [cubeA_pos] + pos_info['reach'] = [cubeB_pos] + pos_info['base'] = [cubeC_pos] + info = {} + for k in pos_info: + info[k + '_pos'] = pos_info[k] + return info + def _load_model(self): + """ Loads the model and adds three identical boxes (cubeA, cubeB, cubeC). """ super()._load_model() xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) self.robots[0].robot_model.set_base_xpos(xpos) @@ -186,19 +220,18 @@ def _load_model(self): ) mujoco_arena.set_origin([0, 0, 0]) + # materials tex_attrib = {"type": "cube"} mat_attrib = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} - redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", - tex_attrib=tex_attrib, mat_attrib=mat_attrib) - greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", - tex_attrib=tex_attrib, mat_attrib=mat_attrib) - bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", - tex_attrib=tex_attrib, mat_attrib=mat_attrib) + redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib) + # Three IDENTICAL boxes (using the medium size: 0.025) cube_size = [0.025, 0.025, 0.025] - self.cubeA = BoxObject("cubeA", cube_size, cube_size, [1, 0, 0, 1], redwood) - self.cubeB = BoxObject("cubeB", cube_size, cube_size, [0, 1, 0, 1], greenwood) - self.cubeC = BoxObject("cubeC", cube_size, cube_size, [0, 0, 1, 1], bluewood) + self.cubeA = BoxObject(name="cubeA", size_min=cube_size, size_max=cube_size, rgba=[1, 0, 0, 1], material=redwood) + self.cubeB = BoxObject(name="cubeB", size_min=cube_size, size_max=cube_size, rgba=[0, 1, 0, 1], material=greenwood) + self.cubeC = BoxObject(name="cubeC", size_min=cube_size, size_max=cube_size, rgba=[0, 0, 1, 1], material=bluewood) cubes = [self.cubeA, self.cubeB, self.cubeC] if self.placement_initializer is not None: @@ -222,69 +255,65 @@ def _load_model(self): mujoco_robots=[robot.robot_model for robot in self.robots], mujoco_objects=cubes, ) + self.cubes = cubes def _get_reference(self): super()._get_reference() self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body) self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body) self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body) + self.cube_body_ids = [self.cubeA_body_id, self.cubeB_body_id, self.cubeC_body_id] def _reset_internal(self): super()._reset_internal() if not self.deterministic_reset: object_placements = self.placement_initializer.sample() for obj_pos, obj_quat, obj in object_placements.values(): - self.sim.data.set_joint_qpos( - obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)]) - ) + self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) - # ------------------------------------------------------------------ - # Observation - # 
------------------------------------------------------------------ def _get_observation(self): di = super()._get_observation() if self.use_object_obs: pr = self.robots[0].robot_model.naming_prefix - - def get_data(bid): - pos = np.array(self.sim.data.body_xpos[bid]) - quat = convert_quat(np.array(self.sim.data.body_xquat[bid]), to="xyzw") + + def get_cube_data(cube_body_id): + pos = np.array(self.sim.data.body_xpos[cube_body_id]) + quat = convert_quat(np.array(self.sim.data.body_xquat[cube_body_id]), to="xyzw") return pos, quat - cubeA_pos, cubeA_quat = get_data(self.cubeA_body_id) - cubeB_pos, cubeB_quat = get_data(self.cubeB_body_id) - cubeC_pos, cubeC_quat = get_data(self.cubeC_body_id) + cubeA_pos, cubeA_quat = get_cube_data(self.cubeA_body_id) + cubeB_pos, cubeB_quat = get_cube_data(self.cubeB_body_id) + cubeC_pos, cubeC_quat = get_cube_data(self.cubeC_body_id) - di.update({ - "cubeA_pos": cubeA_pos, "cubeA_quat": cubeA_quat, - "cubeB_pos": cubeB_pos, "cubeB_quat": cubeB_quat, - "cubeC_pos": cubeC_pos, "cubeC_quat": cubeC_quat, - }) + di["cubeA_pos"], di["cubeA_quat"] = cubeA_pos, cubeA_quat + di["cubeB_pos"], di["cubeB_quat"] = cubeB_pos, cubeB_quat + di["cubeC_pos"], di["cubeC_quat"] = cubeC_pos, cubeC_quat - grip_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) - di[pr + "gripper_to_cubeA"] = grip_pos - cubeA_pos - di[pr + "gripper_to_cubeB"] = grip_pos - cubeB_pos - di[pr + "gripper_to_cubeC"] = grip_pos - cubeC_pos + gripper_site_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) + + di[pr + "gripper_to_cubeA"] = gripper_site_pos - cubeA_pos + di[pr + "gripper_to_cubeB"] = gripper_site_pos - cubeB_pos + di[pr + "gripper_to_cubeC"] = gripper_site_pos - cubeC_pos di["cubeA_to_cubeB"] = cubeA_pos - cubeB_pos di["cubeB_to_cubeC"] = cubeB_pos - cubeC_pos di["cubeA_to_cubeC"] = cubeA_pos - cubeC_pos - di["object-state"] = np.concatenate([ - cubeA_pos, cubeA_quat, - cubeB_pos, cubeB_quat, - cubeC_pos, cubeC_quat, - di[pr + "gripper_to_cubeA"], - di[pr + "gripper_to_cubeB"], - di[pr + "gripper_to_cubeC"], - di["cubeA_to_cubeB"], - di["cubeB_to_cubeC"], - di["cubeA_to_cubeC"], - ]) + # object-state contains positions, quaternions, and relative vectors for all three cubes + di["object-state"] = np.concatenate( + [ + cubeA_pos, cubeA_quat, + cubeB_pos, cubeB_quat, + cubeC_pos, cubeC_quat, + di[pr + "gripper_to_cubeA"], + di[pr + "gripper_to_cubeB"], + di[pr + "gripper_to_cubeC"], + di["cubeA_to_cubeB"], + di["cubeB_to_cubeC"], + di["cubeA_to_cubeC"], + ] + ) return di - # ------------------------------------------------------------------ - # Success + visualize - # ------------------------------------------------------------------ def _check_success(self): _, _, r_stack = self.staged_rewards() return r_stack >= 3.0 - 1e-6 @@ -292,6 +321,5 @@ def _check_success(self): def visualize(self, vis_settings): super().visualize(vis_settings=vis_settings) if vis_settings.get("grippers", False): - self._visualize_gripper_to_target( - gripper=self.robots[0].gripper, target=self.cubeA - ) + self._visualize_gripper_to_target(gripper=self.robots[0].gripper, target=self.cubeA) + From 8025b44b5049b6ca5ef96f236f71c7d5e8f122a7 Mon Sep 17 00:00:00 2001 From: Mohadeseh Date: Wed, 5 Nov 2025 18:06:24 +0100 Subject: [PATCH 4/5] feat: add arrange_three as a new task --- robosuite/__init__.py | 1 + .../manipulation/arrange_three.py | 308 ++++++++++++++++++ 2 files changed, 309 insertions(+) create mode 100644 robosuite/environments/manipulation/arrange_three.py diff --git 
a/robosuite/__init__.py b/robosuite/__init__.py index 325cb90dea..e8bef8e66b 100644 --- a/robosuite/__init__.py +++ b/robosuite/__init__.py @@ -4,6 +4,7 @@ from robosuite.environments.manipulation.lift import Lift from robosuite.environments.manipulation.stack import Stack from robosuite.environments.manipulation.stack_three import StackThree +from robosuite.environments.manipulation.arrange_three import ArrangeThree from robosuite.environments.manipulation.cleanup import Cleanup from robosuite.environments.manipulation.peg_in_hole import PegInHole from robosuite.environments.manipulation.nut_assembly import NutAssembly diff --git a/robosuite/environments/manipulation/arrange_three.py b/robosuite/environments/manipulation/arrange_three.py new file mode 100644 index 0000000000..c60a8da969 --- /dev/null +++ b/robosuite/environments/manipulation/arrange_three.py @@ -0,0 +1,308 @@ +from collections import OrderedDict +import numpy as np + +from robosuite.utils.transform_utils import convert_quat +from robosuite.utils.mjcf_utils import CustomMaterial +from robosuite.environments.manipulation.single_arm_env import SingleArmEnv +from robosuite.models.arenas import TableArena +from robosuite.models.objects import BoxObject +from robosuite.models.tasks import ManipulationTask +from robosuite.utils.placement_samplers import UniformRandomSampler + + +class ArrangeThree(SingleArmEnv): + """ + A simple pre-stacking task: + Arrange three cubes (A, B, C) side-by-side on a straight line along X (or Y). + No balancing or precise stacking required. Pushing is enough. + + Shaping terms (if reward_shaping=True): + - Line alignment (all cubes close to target line) + - Spacing alignment (pairwise distances close to target spacing) + - Height regularization (stay near tabletop height) + + Success (sparse): + - All cubes within tolerances for line alignment + spacing. 
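+
+    Shaping detail (see staged_rewards below): each term maps an error d with
+    tolerance tol through a soft clamp,
+        r = clip(1 - tanh(5 * max(0, d - tol)), 0, 1),
+    so errors within tolerance earn full credit and larger errors decay
+    smoothly. For example, with tol_line = 0.02, a cube 0.04 m off the target
+    line scores about 1 - tanh(0.1) ~= 0.90.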
+ """ + + def __init__( + self, + robots, + env_configuration="default", + controller_configs=None, + gripper_types="default", + initialization_noise="default", + table_full_size=(0.8, 0.8, 0.05), + table_friction=(1.0, 5e-3, 1e-4), + table_offset=(0, 0, 0.8), + use_camera_obs=False, + use_object_obs=True, + reward_scale=1.0, + reward_shaping=True, + placement_initializer=None, + has_renderer=False, + has_offscreen_renderer=True, + render_camera="frontview", + render_collision_mesh=False, + render_visual_mesh=True, + render_gpu_device_id=-1, + control_freq=20, + horizon=400, + ignore_done=False, + hard_reset=True, + camera_names="agentview", + camera_heights=256, + camera_widths=256, + camera_depths=False, + # Task params + line_axis="x", # "x" (default) or "y" + target_spacing=0.06, # distance between adjacent cube centers on the line + tol_line=0.02, # max distance to line (per cube) + tol_spacing=0.02, # max spacing error (per pair) + tol_height=0.01, # max height error from tabletop + cube_size=0.025, # half-size per axis + skill_config=None, + ): + # Table & task settings + self.table_full_size = table_full_size + self.table_friction = table_friction + self.table_offset = np.array(table_offset) + + # Rewards + self.reward_scale = reward_scale + self.reward_shaping = reward_shaping + + # Obs / placement + self.use_object_obs = use_object_obs + self.placement_initializer = placement_initializer + + # Arrangement params + assert line_axis in ("x", "y") + self.line_axis = line_axis + self.target_spacing = float(target_spacing) + self.tol_line = float(tol_line) + self.tol_spacing = float(tol_spacing) + self.tol_height = float(tol_height) + self.cube_size = float(cube_size) + + super().__init__( + robots=robots, + env_configuration=env_configuration, + controller_configs=controller_configs, + mount_types="default", + gripper_types=gripper_types, + initialization_noise=initialization_noise, + use_camera_obs=use_camera_obs, + has_renderer=has_renderer, + has_offscreen_renderer=has_offscreen_renderer, + render_camera=render_camera, + render_collision_mesh=render_collision_mesh, + render_visual_mesh=render_visual_mesh, + render_gpu_device_id=render_gpu_device_id, + control_freq=control_freq, + horizon=horizon, + ignore_done=ignore_done, + hard_reset=hard_reset, + camera_names=camera_names, + camera_heights=camera_heights, + camera_widths=camera_widths, + camera_depths=camera_depths, + skill_config=skill_config, + ) + + # -------------------- Reward -------------------- + def reward(self, action): + r_align, r_space, r_height = self.staged_rewards() # each in [0, 1] + if self.reward_shaping: + # Weighted sum -> max 1.0 (before scaling) + # Put most weight on line alignment, then spacing, tiny on height + shaped = 0.5 * r_align + 0.45 * r_space + 0.05 * r_height + reward = shaped + else: + reward = 1.0 if self._check_success() else 0.0 + + if self.reward_scale is not None: + reward *= self.reward_scale # scale to your taste + return reward + + def staged_rewards(self): + """ + Returns (r_align, r_space, r_height), each in [0, 1], where: + - r_align: how close cubes are to the target line (perpendicular distance) + - r_space: how close pairwise spacings are to target spacing + - r_height: how close heights are to tabletop height (avoid accidental lifts) + """ + # Positions + A = self.sim.data.body_xpos[self.cubeA_body_id].copy() + B = self.sim.data.body_xpos[self.cubeB_body_id].copy() + C = self.sim.data.body_xpos[self.cubeC_body_id].copy() + pos = np.stack([A, B, C], axis=0) + + # Table height 
reference + table_z = self.table_offset[2] + z = pos[:, 2] + + # Target line is through the table center along chosen axis. + # We'll anchor at table_offset (x0,y0) and require all cubes be close to that line. + if self.line_axis == "x": + # x can vary freely; y should be near table_offset[1] + dist_to_line = np.abs(pos[:, 1] - self.table_offset[1]) + # spacing along x + coords = np.sort(pos[:, 0]) + else: + # y-axis line: y can vary; x should be near table_offset[0] + dist_to_line = np.abs(pos[:, 0] - self.table_offset[0]) + # spacing along y + coords = np.sort(pos[:, 1]) + + # r_align: map distance-to-line into [0,1] with tolerance + # 1.0 when within tol_line; decays with tanh outside + def smooth_clamp01(d, tol): + # <= tol -> ~1; increases beyond tol -> decreases smoothly + # Use scaled tanh for a soft boundary + return np.clip(1.0 - np.tanh(5.0 * np.maximum(0.0, d - tol)), 0.0, 1.0) + + r_align_per = smooth_clamp01(dist_to_line, self.tol_line) + r_align = float(np.mean(r_align_per)) + + # r_space: compare pairwise distances on the line axis to [0, s, 2s] + # After sorting coords: target deltas are [s, s] between neighbors + deltas = np.diff(coords) # shape (2,) + spacing_err = np.abs(deltas - self.target_spacing) + r_space_pair = smooth_clamp01(spacing_err, self.tol_spacing) + r_space = float(np.mean(r_space_pair)) if r_space_pair.size > 0 else 0.0 + + # r_height: keep near table height (+ half cube) + # We want them resting on the table, not lifted: target z ~= table_z + cube_size + target_z = table_z + self.cube_size + r_height_per = smooth_clamp01(np.abs(z - target_z), self.tol_height) + r_height = float(np.mean(r_height_per)) + + return r_align, r_space, r_height + + # -------------------- Info / Success -------------------- + def _get_env_info(self, action): + info = super()._get_env_info(action) + r_align, r_space, r_height = self.staged_rewards() + info.update({ + "r_align": r_align, + "r_space": r_space, + "r_height": r_height, + "success": self._check_success(), + }) + return info + + def _check_success(self): + r_align, r_space, r_height = self.staged_rewards() + # Require each term be high enough + return (r_align > 0.95) and (r_space > 0.95) and (r_height > 0.95) + + # -------------------- Model & Objects -------------------- + def _load_model(self): + super()._load_model() + + # Place robot base + xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) + self.robots[0].robot_model.set_base_xpos(xpos) + + # Arena + arena = TableArena( + table_full_size=self.table_full_size, + table_friction=self.table_friction, + table_offset=self.table_offset, + ) + arena.set_origin([0, 0, 0]) + + # Materials + tex = {"type": "cube"} + mat = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} + redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex, mat_attrib=mat) + greenwd = CustomMaterial(texture="WoodGreen", tex_name="greenwd", mat_name="greenwd_mat", tex_attrib=tex, mat_attrib=mat) + bluewd = CustomMaterial(texture="WoodBlue", tex_name="bluewd", mat_name="bluewd_mat", tex_attrib=tex, mat_attrib=mat) + + size = [self.cube_size, self.cube_size, self.cube_size] + # Use keyword args to avoid size/rgba mix-ups across robosuite versions + self.cubeA = BoxObject(name="cubeA", size_min=size, size_max=size, rgba=[1, 0, 0, 1], material=redwood) + self.cubeB = BoxObject(name="cubeB", size_min=size, size_max=size, rgba=[0, 1, 0, 1], material=greenwd) + self.cubeC = BoxObject(name="cubeC", size_min=size, 
size_max=size, rgba=[0, 0, 1, 1], material=bluewd) + cubes = [self.cubeA, self.cubeB, self.cubeC] + + # Placement + if self.placement_initializer is not None: + self.placement_initializer.reset() + self.placement_initializer.add_objects(cubes) + else: + self.placement_initializer = UniformRandomSampler( + name="ArrangeSampler", + mujoco_objects=cubes, + x_range=[-0.10, 0.10], + y_range=[-0.10, 0.10], + rotation=None, + ensure_object_boundary_in_range=False, + ensure_valid_placement=True, + reference_pos=self.table_offset, + z_offset=0.01, + ) + + # Task + self.model = ManipulationTask( + mujoco_arena=arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=cubes, + ) + + def _get_reference(self): + super()._get_reference() + self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body) + self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body) + self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body) + + def _reset_internal(self): + super()._reset_internal() + if not self.deterministic_reset: + placements = self.placement_initializer.sample() + for obj_pos, obj_quat, obj in placements.values(): + self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) + + # -------------------- Observations -------------------- + def _get_observation(self): + di = super()._get_observation() + + if self.use_object_obs: + pr = self.robots[0].robot_model.naming_prefix + + def get_data(bid): + pos = np.array(self.sim.data.body_xpos[bid]) + quat = convert_quat(np.array(self.sim.data.body_xquat[bid]), to="xyzw") + return pos, quat + + Apos, Aquat = get_data(self.cubeA_body_id) + Bpos, Bquat = get_data(self.cubeB_body_id) + Cpos, Cquat = get_data(self.cubeC_body_id) + + di["cubeA_pos"], di["cubeA_quat"] = Apos, Aquat + di["cubeB_pos"], di["cubeB_quat"] = Bpos, Bquat + di["cubeC_pos"], di["cubeC_quat"] = Cpos, Cquat + + grip = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) + di[pr + "gripper_to_cubeA"] = grip - Apos + di[pr + "gripper_to_cubeB"] = grip - Bpos + di[pr + "gripper_to_cubeC"] = grip - Cpos + + di["cubeA_to_cubeB"] = Apos - Bpos + di["cubeB_to_cubeC"] = Bpos - Cpos + di["cubeA_to_cubeC"] = Apos - Cpos + + di["object-state"] = np.concatenate([ + Apos, Aquat, Bpos, Bquat, Cpos, Cquat, + di[pr + "gripper_to_cubeA"], di[pr + "gripper_to_cubeB"], di[pr + "gripper_to_cubeC"], + di["cubeA_to_cubeB"], di["cubeB_to_cubeC"], di["cubeA_to_cubeC"], + ]) + + return di + + # -------------------- Viz -------------------- + def visualize(self, vis_settings): + super().visualize(vis_settings=vis_settings) + # Optional: you could draw a thin line on the table to show the target alignment. + # For a minimal version, we keep default visualization. 
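+
+# Usage sketch (illustrative, not part of the original patch; assumes this
+# class is still exported through robosuite/__init__.py as StackThree, as in
+# patch 1, and that a "Panda" robot model is available):
+#
+#   import numpy as np
+#   import robosuite as suite
+#
+#   env = suite.make("StackThree", robots="Panda", use_camera_obs=False,
+#                    reward_shaping=True)
+#   obs = env.reset()
+#   obs, rew, done, info = env.step(np.zeros(env.action_dim))
+#   print(info.get("r_align"), info.get("success"))
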
From 5a14e1657f0237aa15258111bbcf227c97cd3395 Mon Sep 17 00:00:00 2001
From: Mohadeseh
Date: Wed, 12 Nov 2025 16:45:41 +0100
Subject: [PATCH 5/5] refactor: mimic clean-up task for stack3, feat: create push task

---
 robosuite/environments/manipulation/push.py   | 269 +++++++++++++
 .../environments/manipulation/stack_three.py  | 356 ++++++++----------
 2 files changed, 435 insertions(+), 190 deletions(-)
 create mode 100644 robosuite/environments/manipulation/push.py

diff --git a/robosuite/environments/manipulation/push.py b/robosuite/environments/manipulation/push.py
new file mode 100644
index 0000000000..f830935838
--- /dev/null
+++ b/robosuite/environments/manipulation/push.py
@@ -0,0 +1,269 @@
+from collections import OrderedDict
+import numpy as np
+
+from robosuite.environments.manipulation.single_arm_env import SingleArmEnv
+from robosuite.models.arenas import TableArena
+from robosuite.models.objects import BoxObject
+from robosuite.models.tasks import ManipulationTask
+from robosuite.utils.placement_samplers import UniformRandomSampler
+from robosuite.utils import RandomizationError
+from robosuite.utils.transform_utils import convert_quat
+from robosuite.utils.mjcf_utils import CustomMaterial
+
+
+DEFAULT_PUSH_CONFIG = {
+    'use_push_rew': True,
+    'shaped_push_rew': True,
+    'push_scale_fac': 5.0,
+    'num_push_objs': 1,
+}
+
+
+class Push(SingleArmEnv):
+    """
+    Push-only environment: push a single cube to a fixed, flat gray target pad
+    on the table.
+    """
+
+    def __init__(
+        self,
+        robots,
+        env_configuration="default",
+        controller_configs=None,
+        gripper_types="default",
+        initialization_noise="default",
+        table_full_size=(0.8, 0.8, 0.05),
+        table_friction=(1., 5e-3, 1e-4),
+        table_offset=(0, 0, 0.8),
+        use_camera_obs=True,
+        use_object_obs=True,
+        reward_scale=1.0,
+        reward_shaping=True,
+        placement_initializer=None,
+        has_renderer=False,
+        has_offscreen_renderer=True,
+        render_camera="frontview",
+        render_collision_mesh=False,
+        render_visual_mesh=True,
+        render_gpu_device_id=-1,
+        control_freq=20,
+        horizon=1000,
+        ignore_done=False,
+        hard_reset=True,
+        camera_names="agentview",
+        camera_heights=256,
+        camera_widths=256,
+        camera_depths=False,
+        skill_config=None,
+    ):
+        # Store environment-specific parameters locally
+        self.table_full_size = table_full_size
+        self.table_friction = table_friction
+        self.table_offset = np.array(table_offset)
+        self.reward_scale = reward_scale
+        self.reward_shaping = reward_shaping
+
+        self.use_object_obs = use_object_obs
+        self.placement_initializer = placement_initializer
+        self.task_config = DEFAULT_PUSH_CONFIG.copy()
+
+        # Call parent constructor with standard, expected arguments only
+        super().__init__(
+            robots=robots,
+            env_configuration=env_configuration,
+            controller_configs=controller_configs,
+            mount_types="default",
+            gripper_types=gripper_types,
+            initialization_noise=initialization_noise,
+            use_camera_obs=use_camera_obs,
+            has_renderer=has_renderer,
+            has_offscreen_renderer=has_offscreen_renderer,
+            render_camera=render_camera,
+            render_collision_mesh=render_collision_mesh,
+            render_visual_mesh=render_visual_mesh,
+            render_gpu_device_id=render_gpu_device_id,
+            control_freq=control_freq,
+            horizon=horizon,
+            ignore_done=ignore_done,
+            hard_reset=hard_reset,
+            camera_names=camera_names,
+            camera_heights=camera_heights,
+            camera_widths=camera_widths,
+            camera_depths=camera_depths,
+            skill_config=skill_config,
+        )
+
+    # -------------------------
+    # Reward functions
+    # -------------------------
+    def reward(self, action):
+        _, reward = self.reward_infos()
+        return reward
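+
+    # Shaping intuition for reward_infos below (illustrative numbers only,
+    # assuming the default push_scale_fac=5.0):
+    #   d_push = 0.00 m -> r_push = 1 - tanh(0.00) = 1.000
+    #   d_push = 0.10 m -> r_push = 1 - tanh(0.50) ~= 0.538
+    #   d_push = 0.30 m -> r_push = 1 - tanh(1.50) ~= 0.095
+    # The gradient is steepest near the target and saturates with distance,
+    # so the shaped term mostly rewards the final approach.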
+
+    def reward_infos(self):
+        rew_push = 0.0
+        for i in range(self.task_config['num_push_objs']):
+            r_reach, r_push, _ = self.push_staged_rewards(obj_id=i)
+            if self.reward_shaping:
+                rew_push += r_reach + r_push
+            else:
+                rew_push += r_push
+
+        if self.reward_scale is not None:
+            rew_push *= self.reward_scale
+        return {}, rew_push
+
+    def push_staged_rewards(self, obj_id=0):
+        gripper_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id]
+        obj_pos = self.sim.data.body_xpos[self.push_obj_body_ids[obj_id]]
+        # Fixed target position (x_table + 0.15, y_table - 0.15)
+        target_xy = self.table_offset[:2] + np.array([0.15, -0.15])
+
+        d_push = np.linalg.norm(obj_pos[:2] - target_xy)
+        r_push = 1 - np.tanh(self.task_config['push_scale_fac'] * d_push)
+
+        # Reach reward: box-shaped tolerance around the object before the
+        # distance starts to count
+        th = [0.08, 0.08, 0.04]
+        d_reach = np.sum(np.clip(np.abs(gripper_pos - obj_pos) - th, 0, None))
+        r_reach = (1 - np.tanh(10.0 * d_reach)) * 0.25
+
+        return r_reach, r_push, d_push
+
+    def _check_success(self):
+        for i in range(self.task_config['num_push_objs']):
+            _, _, d_push = self.push_staged_rewards(obj_id=i)
+            # 0.10 m threshold for success
+            if d_push > 0.10:
+                return False
+        return True
+
+    # -------------------------
+    # Model / arena
+    # -------------------------
+    def _load_model(self):
+        super()._load_model()
+
+        # Set robot position
+        xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0])
+        self.robots[0].robot_model.set_base_xpos(xpos)
+
+        # Create table arena
+        arena = TableArena(
+            table_full_size=self.table_full_size,
+            table_offset=self.table_offset,
+            table_friction=self.table_friction,
+        )
+        arena.set_origin([0, 0, 0])
+
+        # 1. Define the custom push target object (a static, flat gray box)
+        target_size = [0.07, 0.07, 0.001]
+        target_pos = self.table_offset + np.array([0.15, -0.15, 0.0005])
+
+        self.target_obj = BoxObject(
+            name="push_target_obj",
+            size_min=target_size,
+            size_max=target_size,
+            rgba=[0.5, 0.5, 0.5, 1],  # gray
+            density=0,  # massless; acceptable because the body is static
+            joints=None,  # no joints, fixed to the world
+        )
+
+        # Fix the target's pose by setting the object's initial pose
+        # attributes directly; with no joints, it stays welded to the world.
+        self.target_obj.initial_pos = target_pos
+        self.target_obj.initial_quat = np.array([1, 0, 0, 0])
+
+        # 2. Define the movable push cube
+        mat = CustomMaterial(
+            texture="WoodLight",
+            tex_name="push_cube_tex",
+            mat_name="push_cube_mat",
+            tex_attrib={"type": "cube"},
+            mat_attrib={"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"},
+        )
+        self.push_objs = []
+        for i in range(self.task_config['num_push_objs']):
+            size = np.array([0.035, 0.0425, 0.0125])
+            obj = BoxObject(
+                name=f"obj_push_{i}",
+                size_min=size,
+                size_max=size,
+                rgba=[0, 1, 0, 1],
+                material=mat,
+            )
+            self.push_objs.append(obj)
+
+        # 3. Combine all MujocoObject instances (movable + static target)
+        all_objects = self.push_objs + [self.target_obj]
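+        # Because the target has joints=None, it is compiled as a static body
+        # at the pose set above, is kept out of the placement sampler below,
+        # and never needs a qpos reset; only the movable cubes are sampled.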
+        # 4. Placement sampler for the movable cube(s) ONLY
+        if self.placement_initializer is None:
+            self.placement_initializer = UniformRandomSampler(
+                name="PushSampler",
+                mujoco_objects=self.push_objs,  # only the movable cube(s)
+                x_range=[0.0, 0.16],
+                y_range=[-0.16, 0.16],
+                rotation=None,
+                ensure_object_boundary_in_range=False,
+                ensure_valid_placement=True,
+                reference_pos=self.table_offset,
+                z_offset=0.01,
+            )
+        else:
+            self.placement_initializer.reset()
+            self.placement_initializer.add_objects(self.push_objs)
+
+        # 5. Create ManipulationTask with ALL objects
+        self.model = ManipulationTask(
+            mujoco_arena=arena,
+            mujoco_robots=[robot.robot_model for robot in self.robots],
+            mujoco_objects=all_objects,
+        )
+
+    # -------------------------
+    # References and reset
+    # -------------------------
+    def _get_reference(self):
+        super()._get_reference()
+        self.table_body_id = self.sim.model.body_name2id("table")
+        self.push_obj_body_ids = []
+        for obj in self.push_objs:
+            bid = self.sim.model.body_name2id(obj.root_body)
+            self.push_obj_body_ids.append(bid)
+
+        self.target_body_id = self.sim.model.body_name2id(self.target_obj.root_body)
+
+    def _reset_internal(self):
+        super()._reset_internal()
+        # Only reset positions for the movable cubes; retry until the sampler
+        # finds a valid placement
+        while True:
+            try:
+                placements = self.placement_initializer.sample()
+                break
+            except RandomizationError:
+                continue
+        for obj_pos, obj_quat, obj in placements.values():
+            self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)]))
+
+    # -------------------------
+    # Observations
+    # -------------------------
+    def _get_observation(self):
+        di = super()._get_observation()
+
+        pos = np.array(self.obj_positions).flatten()
+        quat = np.array(self.obj_quats).flatten()
+        di["obj_pos"] = pos
+        di["obj_quat"] = quat
+        di["object-state"] = np.concatenate([pos, quat])
+
+        return di
+
+    @property
+    def obj_positions(self):
+        return [self.sim.data.body_xpos[bid].copy() for bid in self.push_obj_body_ids]
+
+    @property
+    def obj_quats(self):
+        return [convert_quat(self.sim.data.body_xquat[bid], to="xyzw") for bid in self.push_obj_body_ids]
\ No newline at end of file
diff --git a/robosuite/environments/manipulation/stack_three.py b/robosuite/environments/manipulation/stack_three.py
index defe273e5b..88b5903b42 100644
--- a/robosuite/environments/manipulation/stack_three.py
+++ b/robosuite/environments/manipulation/stack_three.py
@@ -1,17 +1,28 @@
 from collections import OrderedDict
 import numpy as np
+
 from robosuite.utils.transform_utils import convert_quat
 from robosuite.utils.mjcf_utils import CustomMaterial
+
 from robosuite.environments.manipulation.single_arm_env import SingleArmEnv
+
 from robosuite.models.arenas import TableArena
 from robosuite.models.objects import BoxObject
 from robosuite.models.tasks import ManipulationTask
 from robosuite.utils.placement_samplers import UniformRandomSampler
 
-class StackThree(SingleArmEnv):
+
+class StackSequential(SingleArmEnv):
     """
-    Minimal change version of StackThree with corrected staged_rewards for a guaranteed C -> B -> A stack order.
+    Multi-cube stacking environment with a deterministic sequential order.
+    Example: for 3 cubes, cube2 is stacked on cube1, which rests on cube0
+    (bottom to top).
+
+    - Modular reward for each cube (reach, grasp, lift, stack)
+    - Sequential gating: the next cube earns reward only after the previous one is stacked
+    - Additive shaping avoids reward plateaus
+    - Optional contact-based stack validation
     """
+
     def __init__(
         self,
         robots,
@@ -20,12 +31,12 @@ def __init__(
         gripper_types="default",
         initialization_noise="default",
         table_full_size=(0.8, 0.8, 0.05),
-        table_friction=(1., 5e-3, 1e-4),
+        table_friction=(1.0, 5e-3, 1e-4),
         table_offset=(0, 0, 0.8),
         use_camera_obs=True,
         use_object_obs=True,
         reward_scale=1.0,
-        reward_shaping=False,  # We recommend setting this to True for this task
+        reward_shaping=True,
         placement_initializer=None,
         has_renderer=False,
         has_offscreen_renderer=True,
@@ -43,24 +54,17 @@ def __init__(
         camera_depths=False,
         full_stacking_bonus=0.0,
         skill_config=None,
+        num_stack_objs=3,
     ):
-        # table/top settings
+        # Table and reward config
        self.table_full_size = table_full_size
         self.table_friction = table_friction
         self.table_offset = np.array(table_offset)
-        # reward config
         self.reward_scale = reward_scale
         self.reward_shaping = reward_shaping
-        # whether to include object states in observations
         self.use_object_obs = use_object_obs
-        # object placement initializer
         self.placement_initializer = placement_initializer
-        # bonus for stacking (kept but not required for sparse success)
         self.full_stacking_bonus = full_stacking_bonus
-
-        # Cube identifiers for staged rewards
-        self.cubes = None
-        self.cube_body_ids = None
+        self.num_stack_objs = num_stack_objs
 
         super().__init__(
             robots=robots,
@@ -87,129 +91,117 @@ def __init__(
             skill_config=skill_config,
         )
 
+        self.task_config = dict(num_stack_objs=num_stack_objs)
+
     # ------------------------------------------------------------------
-    # Reward logic (Phase 1 refactor)
+    # Reward logic
     # ------------------------------------------------------------------
-    def reward(self, action):
-        r_reach, r_lift, r_stack = self.staged_rewards()
-        if self.reward_shaping:
-            reward = max(r_reach, r_lift, r_stack)
-        else:
-            # sparse: only give reward when the full 3-block stack is present
-            reward = 3.0 if (r_stack >= 3.0 - 1e-6) else 0.0  # Use 3.0 here for easier scaling
-        if self.reward_scale is not None:
-            # Final reward is scaled by reward_scale / 3.0 so that a full success equals reward_scale.
-            reward *= self.reward_scale / 3.0
-        return reward
+
+    def reward(self, action):
+        """Top-level reward call."""
+        _, reward = self.reward_infos()
+        return reward
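+
+    # Scale reference (illustrative, derived from the multipliers used in
+    # staged_stack_reward below): a fully stacked cube contributes at most
+    # reach 0.15 + grasp 0.3 + lift 0.6 + stack 1.0 = 2.05, so a complete
+    # 3-cube tower yields about 3 * 2.05 = 6.15 before full_stacking_bonus.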
 
-    # -----------------------------------------------------------
-    # CRITICAL FIX: Fixed-Stage Reward Logic for C -> B -> A Stacking
-    # -----------------------------------------------------------
-    def staged_rewards(self):
-        """
-        Returns 3 components (r_reach, r_lift, r_stack) with logic to enforce B on C, then A on B.
-        """
-        gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id]
-
-        # Cube positions
-        cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy()
-        cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy()
-        cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy()
-
-        # Determine stack progress (B on C, then A on B)
-        b_on_c = self.check_contact(self.cubeB, self.cubeC)
-        a_on_b = self.check_contact(self.cubeA, self.cubeB)
-
-        r_stack = 0.0
-        target_pick = None  # The cube the robot should be aiming to GRASP
-        placement_base_pos = None  # The position the HELD cube should be placed ON (e.g., C or B)
-
-        # --- 1. Determine Stack Status and Next Target ---
-        if not b_on_c:
-            # Stage 1: Get Cube B and put it on Cube C.
-            target_pick = self.cubeB
-            placement_base_pos = cubeC_pos
-        elif b_on_c and not a_on_b:
-            # Stage 2: Get Cube A and put it on Cube B.
-            target_pick = self.cubeA
-            placement_base_pos = cubeB_pos
-            r_stack += 1.5  # Reward for B on C
-        elif b_on_c and a_on_b:
-            # Stage 3: Full Success! Tower is A on B on C.
-            target_pick = self.cubeA  # Stay focused on the top block (A)
-            placement_base_pos = cubeB_pos
-            r_stack += 3.0  # Full reward
-
-        # --- 2. Determine Held Cube ---
-        held_cube = None
-        for cube in [self.cubeA, self.cubeB, self.cubeC]:
-            if self._check_grasp(self.robots[0].gripper, cube):
-                held_cube = cube
-                break
-
-        # --- 3. Reach and Grasp Reward (r_reach) ---
-        target_pick_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(target_pick.root_body)]
-
-        # Use a slightly softer tanh coefficient (5.0) than the original 10.0 for easier initial reaching
-        dist_reach = np.linalg.norm(gripper_site_pos - target_pick_pos)
-        r_reach = (1 - np.tanh(5.0 * dist_reach)) * 0.25
-
-        # Grasp reward: Only for successfully grasping the *correct* target cube.
-        if held_cube is target_pick:
-            r_reach += 0.25  # Max r_reach is 0.5 (0.25 reach + 0.25 grasp)
-
-        # --- 4. Lift and Align Reward (r_lift) ---
+    def reward_infos(self):
+        """
+        Handles reward computation across all cubes sequentially.
+        Each cube contributes reward only after the previous one is successfully stacked.
+        """
+        total_reward = 0.0
+        num_cubes = self.task_config.get('num_stack_objs', 3)
+        stacked_success = [False] * num_cubes
+
+        for i in range(num_cubes):
+            r_reach, r_grasp, r_lift, r_stack, done = self.staged_stack_reward(obj_id=i)
+            total_reward += r_reach + r_grasp + r_lift + r_stack
+            stacked_success[i] = done
+            if not done:
+                break
+
+        # Optional full-stack bonus, applied before info is built so that
+        # r_total reports the same value that is returned
+        if all(stacked_success):
+            total_reward += self.full_stacking_bonus
+
+        info = dict(
+            r_total=total_reward,
+            stacked_success_count=stacked_success.count(True),
+            success_all=all(stacked_success),
+        )
+
+        return info, total_reward
+
+    def staged_stack_reward(self, obj_id=0):
+        """
+        Computes the staged reward for one cube in the stack.
+        Stage order: reach → grasp → lift → stack.
+ """ + reach_mult = 0.15 + grasp_mult = 0.3 + lift_mult = 0.6 + stack_mult = 1.0 + + gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + obj_pos = self.sim.data.body_xpos[self.obj_body_ids[obj_id]] + target_z = self.table_offset[2] + 0.05 + 0.05 * obj_id + + # --- Reach --- + reach_dist = np.linalg.norm(gripper_site_pos - obj_pos) + r_reach = (1 - np.tanh(8.0 * reach_dist)) * reach_mult + + # --- Grasp --- + grasp_success = int(self._check_grasp(self.robots[0].gripper, self.objs[obj_id])) + r_grasp = grasp_success * grasp_mult + + # --- Lift --- r_lift = 0.0 - - # Lift reward: Only given if the correct cube is held and lifted - if held_cube is target_pick: - held_cube_pos = self.sim.data.body_xpos[self.sim.model.body_name2id(held_cube.root_body)] - - # Check if the held cube is lifted 4cm above the table plane - lifted = held_cube_pos[2] > self.table_offset[2] + 0.04 - - if lifted: - r_lift += 1.0 - - # Alignment reward: Distance between held cube and its intended placement base - # Use a slightly softer tanh coefficient (5.0) than the original 10.0 for easier alignment - horiz_dist = np.linalg.norm(held_cube_pos[:2] - placement_base_pos[:2]) - r_lift += 0.5 * (1 - np.tanh(5.0 * horiz_dist)) # Max r_lift is 1.5 - - return r_reach, r_lift, r_stack - - # [Rest of the class methods remain the same as your second version] - - def _get_env_info(self, action): - info = super()._get_env_info(action) - r_reach, r_lift, r_stack = self.staged_rewards() - cubes_stacked = (r_stack >= 3.0 - 1e-6) - info.update({ - 'r_reach_grasp': r_reach / 0.50, - 'r_lift_align': r_lift / 1.50, - 'r_stack': r_stack / 3.0, - 'cubes_stacked': cubes_stacked, - 'success': self._check_success(), - }) - return info - - def _get_skill_info(self): - cubeA_pos = self.sim.data.body_xpos[self.cubeA_body_id].copy() - cubeB_pos = self.sim.data.body_xpos[self.cubeB_body_id].copy() - cubeC_pos = self.sim.data.body_xpos[self.cubeC_body_id].copy() - pos_info = {} - pos_info['grasp'] = [cubeA_pos] - pos_info['reach'] = [cubeB_pos] - pos_info['base'] = [cubeC_pos] - info = {} - for k in pos_info: - info[k + '_pos'] = pos_info[k] - return info + if grasp_success: + z_diff = max(target_z - obj_pos[2], 0.) 
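+            # r_lift interpolates from grasp_mult (cube far below target_z)
+            # up to lift_mult as z_diff -> 0, since tanh(10 * z_diff)
+            # vanishes once the cube reaches its target height.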
+            r_lift = grasp_mult + (1 - np.tanh(10.0 * z_diff)) * (lift_mult - grasp_mult)
+
+        # --- Stack alignment ---
+        r_stack = 0.0
+        done = False
+
+        if obj_id > 0:
+            below_obj_pos = self.sim.data.body_xpos[self.obj_body_ids[obj_id - 1]]
+            xy_dist = np.linalg.norm(obj_pos[:2] - below_obj_pos[:2])
+            z_expected = below_obj_pos[2] + 0.05  # full cube height
+            z_diff = abs(obj_pos[2] - z_expected)
+            in_contact = self.check_contact(self.objs[obj_id], self.objs[obj_id - 1])
+
+            if xy_dist < 0.03 and z_diff < 0.02 and in_contact:
+                r_stack = stack_mult
+                done = True
+            else:
+                align_xy = (1 - np.tanh(10.0 * xy_dist))
+                align_z = np.exp(-30.0 * z_diff)
+                r_stack = (align_xy * align_z) * (stack_mult - lift_mult)
+        else:
+            # first cube: target = table center
+            target_xy = self.table_offset[:2]
+            xy_dist = np.linalg.norm(obj_pos[:2] - target_xy)
+            z_expected = self.table_offset[2] + 0.05
+            z_diff = abs(obj_pos[2] - z_expected)
+
+            if xy_dist < 0.03 and z_diff < 0.02:
+                done = True
+                r_stack = stack_mult
+            else:
+                align_xy = (1 - np.tanh(10.0 * xy_dist))
+                align_z = np.exp(-30.0 * z_diff)
+                r_stack = (align_xy * align_z) * (stack_mult - lift_mult)
+
+        return r_reach, r_grasp, r_lift, r_stack, done
+
+    # ------------------------------------------------------------------
+    # Model and setup
+    # ------------------------------------------------------------------
     def _load_model(self):
-        """ Loads the model and adds three identical boxes (cubeA, cubeB, cubeC). """
         super()._load_model()
+
         xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0])
         self.robots[0].robot_model.set_base_xpos(xpos)
@@ -220,27 +212,44 @@ def _load_model(self):
         )
         mujoco_arena.set_origin([0, 0, 0])
 
-        # materials
+        # cube materials
         tex_attrib = {"type": "cube"}
         mat_attrib = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"}
-        redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib)
-        greenwood = CustomMaterial(texture="WoodGreen", tex_name="greenwood", mat_name="greenwood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib)
-        bluewood = CustomMaterial(texture="WoodBlue", tex_name="bluewood", mat_name="bluewood_mat", tex_attrib=tex_attrib, mat_attrib=mat_attrib)
-        # Three IDENTICAL boxes (using the medium size: 0.025)
+        materials = [
+            CustomMaterial(
+                texture=f"Wood{i}",
+                tex_name=f"wood{i}_tex",
+                mat_name=f"wood{i}_mat",
+                tex_attrib=tex_attrib,
+                mat_attrib=mat_attrib,
+            )
+            for i in ["Red", "Green", "Blue", "Yellow"]
+        ]
+
+        # cubes (identical size)
         cube_size = [0.025, 0.025, 0.025]
-        self.cubeA = BoxObject(name="cubeA", size_min=cube_size, size_max=cube_size, rgba=[1, 0, 0, 1], material=redwood)
-        self.cubeB = BoxObject(name="cubeB", size_min=cube_size, size_max=cube_size, rgba=[0, 1, 0, 1], material=greenwood)
-        self.cubeC = BoxObject(name="cubeC", size_min=cube_size, size_max=cube_size, rgba=[0, 0, 1, 1], material=bluewood)
-        cubes = [self.cubeA, self.cubeB, self.cubeC]
+        self.objs = []
+        for i in range(self.num_stack_objs):
+            color = np.random.choice(["Red", "Green", "Blue", "Yellow"])
+            mat = next(m for m in materials if color in m.mat_name)
+            cube = BoxObject(
+                name=f"cube{i}",
+                size_min=cube_size,
+                size_max=cube_size,
+                rgba=[0.8, 0.8, 0.8, 1],
+                material=mat,
+            )
+            self.objs.append(cube)
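+        # Colors are sampled once per model compilation (i.e., on hard reset),
+        # so each cube keeps its color within an episode; repeats across cubes
+        # are possible since the choice is independent per cube.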
 
+        # placement
         if self.placement_initializer is not None:
             self.placement_initializer.reset()
-            self.placement_initializer.add_objects(cubes)
+            self.placement_initializer.add_objects(self.objs)
         else:
             self.placement_initializer = UniformRandomSampler(
                 name="ObjectSampler",
-                mujoco_objects=cubes,
+                mujoco_objects=self.objs,
                 x_range=[-0.08, 0.08],
                 y_range=[-0.08, 0.08],
                 rotation=None,
@@ -253,73 +262,40 @@ def _load_model(self):
         self.model = ManipulationTask(
             mujoco_arena=mujoco_arena,
             mujoco_robots=[robot.robot_model for robot in self.robots],
-            mujoco_objects=cubes,
+            mujoco_objects=self.objs,
         )
-        self.cubes = cubes
 
     def _get_reference(self):
         super()._get_reference()
-        self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body)
-        self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body)
-        self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body)
-        self.cube_body_ids = [self.cubeA_body_id, self.cubeB_body_id, self.cubeC_body_id]
+        self.obj_body_ids = [self.sim.model.body_name2id(o.root_body) for o in self.objs]
 
     def _reset_internal(self):
         super()._reset_internal()
         if not self.deterministic_reset:
             object_placements = self.placement_initializer.sample()
             for obj_pos, obj_quat, obj in object_placements.values():
-                self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)]))
+                self.sim.data.set_joint_qpos(
+                    obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])
+                )
 
     def _get_observation(self):
         di = super()._get_observation()
 
         if self.use_object_obs:
-            pr = self.robots[0].robot_model.naming_prefix
-
-            def get_cube_data(cube_body_id):
-                pos = np.array(self.sim.data.body_xpos[cube_body_id])
-                quat = convert_quat(np.array(self.sim.data.body_xquat[cube_body_id]), to="xyzw")
-                return pos, quat
-
-            cubeA_pos, cubeA_quat = get_cube_data(self.cubeA_body_id)
-            cubeB_pos, cubeB_quat = get_cube_data(self.cubeB_body_id)
-            cubeC_pos, cubeC_quat = get_cube_data(self.cubeC_body_id)
-
-            di["cubeA_pos"], di["cubeA_quat"] = cubeA_pos, cubeA_quat
-            di["cubeB_pos"], di["cubeB_quat"] = cubeB_pos, cubeB_quat
-            di["cubeC_pos"], di["cubeC_quat"] = cubeC_pos, cubeC_quat
-
-            gripper_site_pos = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id])
-
-            di[pr + "gripper_to_cubeA"] = gripper_site_pos - cubeA_pos
-            di[pr + "gripper_to_cubeB"] = gripper_site_pos - cubeB_pos
-            di[pr + "gripper_to_cubeC"] = gripper_site_pos - cubeC_pos
-            di["cubeA_to_cubeB"] = cubeA_pos - cubeB_pos
-            di["cubeB_to_cubeC"] = cubeB_pos - cubeC_pos
-            di["cubeA_to_cubeC"] = cubeA_pos - cubeC_pos
-
-            # object-state contains positions, quaternions, and relative vectors for all three cubes
-            di["object-state"] = np.concatenate(
-                [
-                    cubeA_pos, cubeA_quat,
-                    cubeB_pos, cubeB_quat,
-                    cubeC_pos, cubeC_quat,
-                    di[pr + "gripper_to_cubeA"],
-                    di[pr + "gripper_to_cubeB"],
-                    di[pr + "gripper_to_cubeC"],
-                    di["cubeA_to_cubeB"],
-                    di["cubeB_to_cubeC"],
-                    di["cubeA_to_cubeC"],
-                ]
-            )
-            return di
-
-    def _check_success(self):
-        _, _, r_stack = self.staged_rewards()
-        return r_stack >= 3.0 - 1e-6
+            obs = []
+            for i, obj in enumerate(self.objs):
+                pos = np.array(self.sim.data.body_xpos[self.obj_body_ids[i]])
+                quat = convert_quat(np.array(self.sim.data.body_xquat[self.obj_body_ids[i]]), to="xyzw")
+                obs.extend([*pos, *quat])
+                di[f"{obj.name}_pos"] = pos
+                di[f"{obj.name}_quat"] = quat
 
-    def visualize(self, vis_settings):
-        super().visualize(vis_settings=vis_settings)
-        if vis_settings.get("grippers", False):
-            self._visualize_gripper_to_target(gripper=self.robots[0].gripper, target=self.cubeA)
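+            # object-state is a flat [pos(3), quat(4)] block per cube (length
+            # 7 * num_stack_objs); unlike the three-cube version, it no longer
+            # includes gripper-relative vectors.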
+            di["object-state"] = np.array(obs)
+
+        return di
+
+    def _check_success(self):
+        """Success when every consecutive pair of cubes is in contact, a
+        lightweight proxy for the tower being assembled in order."""
+        for i in range(1, self.num_stack_objs):
+            if not self.check_contact(self.objs[i], self.objs[i - 1]):
+                return False
+        return True
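+
+# Usage sketch (illustrative, not part of the original patch; assumes the
+# rename to StackSequential is also reflected in robosuite/__init__.py and
+# that a "Panda" robot model is available):
+#
+#   import numpy as np
+#   import robosuite as suite
+#
+#   env = suite.make("StackSequential", robots="Panda", use_camera_obs=False,
+#                    reward_shaping=True, num_stack_objs=3)
+#   obs = env.reset()
+#   for _ in range(200):
+#       obs, rew, done, info = env.step(np.random.uniform(-1, 1, env.action_dim))
+#       if env._check_success():
+#           break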