diff --git a/robosuite/__init__.py b/robosuite/__init__.py index 87b6ef07b0..e8bef8e66b 100644 --- a/robosuite/__init__.py +++ b/robosuite/__init__.py @@ -3,6 +3,8 @@ # Manipulation environments from robosuite.environments.manipulation.lift import Lift from robosuite.environments.manipulation.stack import Stack +from robosuite.environments.manipulation.stack_three import StackThree +from robosuite.environments.manipulation.arrange_three import ArrangeThree from robosuite.environments.manipulation.cleanup import Cleanup from robosuite.environments.manipulation.peg_in_hole import PegInHole from robosuite.environments.manipulation.nut_assembly import NutAssembly diff --git a/robosuite/environments/manipulation/arrange_three.py b/robosuite/environments/manipulation/arrange_three.py new file mode 100644 index 0000000000..c60a8da969 --- /dev/null +++ b/robosuite/environments/manipulation/arrange_three.py @@ -0,0 +1,308 @@ +from collections import OrderedDict +import numpy as np + +from robosuite.utils.transform_utils import convert_quat +from robosuite.utils.mjcf_utils import CustomMaterial +from robosuite.environments.manipulation.single_arm_env import SingleArmEnv +from robosuite.models.arenas import TableArena +from robosuite.models.objects import BoxObject +from robosuite.models.tasks import ManipulationTask +from robosuite.utils.placement_samplers import UniformRandomSampler + + +class ArrangeThree(SingleArmEnv): + """ + A simple pre-stacking task: + Arrange three cubes (A, B, C) side-by-side on a straight line along X (or Y). + No balancing or precise stacking required. Pushing is enough. + + Shaping terms (if reward_shaping=True): + - Line alignment (all cubes close to target line) + - Spacing alignment (pairwise distances close to target spacing) + - Height regularization (stay near tabletop height) + + Success (sparse): + - All cubes within tolerances for line alignment + spacing. + """ + + def __init__( + self, + robots, + env_configuration="default", + controller_configs=None, + gripper_types="default", + initialization_noise="default", + table_full_size=(0.8, 0.8, 0.05), + table_friction=(1.0, 5e-3, 1e-4), + table_offset=(0, 0, 0.8), + use_camera_obs=False, + use_object_obs=True, + reward_scale=1.0, + reward_shaping=True, + placement_initializer=None, + has_renderer=False, + has_offscreen_renderer=True, + render_camera="frontview", + render_collision_mesh=False, + render_visual_mesh=True, + render_gpu_device_id=-1, + control_freq=20, + horizon=400, + ignore_done=False, + hard_reset=True, + camera_names="agentview", + camera_heights=256, + camera_widths=256, + camera_depths=False, + # Task params + line_axis="x", # "x" (default) or "y" + target_spacing=0.06, # distance between adjacent cube centers on the line + tol_line=0.02, # max distance to line (per cube) + tol_spacing=0.02, # max spacing error (per pair) + tol_height=0.01, # max height error from tabletop + cube_size=0.025, # half-size per axis + skill_config=None, + ): + # Table & task settings + self.table_full_size = table_full_size + self.table_friction = table_friction + self.table_offset = np.array(table_offset) + + # Rewards + self.reward_scale = reward_scale + self.reward_shaping = reward_shaping + + # Obs / placement + self.use_object_obs = use_object_obs + self.placement_initializer = placement_initializer + + # Arrangement params + assert line_axis in ("x", "y") + self.line_axis = line_axis + self.target_spacing = float(target_spacing) + self.tol_line = float(tol_line) + self.tol_spacing = float(tol_spacing) + self.tol_height = float(tol_height) + self.cube_size = float(cube_size) + + super().__init__( + robots=robots, + env_configuration=env_configuration, + controller_configs=controller_configs, + mount_types="default", + gripper_types=gripper_types, + initialization_noise=initialization_noise, + use_camera_obs=use_camera_obs, + has_renderer=has_renderer, + has_offscreen_renderer=has_offscreen_renderer, + render_camera=render_camera, + render_collision_mesh=render_collision_mesh, + render_visual_mesh=render_visual_mesh, + render_gpu_device_id=render_gpu_device_id, + control_freq=control_freq, + horizon=horizon, + ignore_done=ignore_done, + hard_reset=hard_reset, + camera_names=camera_names, + camera_heights=camera_heights, + camera_widths=camera_widths, + camera_depths=camera_depths, + skill_config=skill_config, + ) + + # -------------------- Reward -------------------- + def reward(self, action): + r_align, r_space, r_height = self.staged_rewards() # each in [0, 1] + if self.reward_shaping: + # Weighted sum -> max 1.0 (before scaling) + # Put most weight on line alignment, then spacing, tiny on height + shaped = 0.5 * r_align + 0.45 * r_space + 0.05 * r_height + reward = shaped + else: + reward = 1.0 if self._check_success() else 0.0 + + if self.reward_scale is not None: + reward *= self.reward_scale # scale to your taste + return reward + + def staged_rewards(self): + """ + Returns (r_align, r_space, r_height), each in [0, 1], where: + - r_align: how close cubes are to the target line (perpendicular distance) + - r_space: how close pairwise spacings are to target spacing + - r_height: how close heights are to tabletop height (avoid accidental lifts) + """ + # Positions + A = self.sim.data.body_xpos[self.cubeA_body_id].copy() + B = self.sim.data.body_xpos[self.cubeB_body_id].copy() + C = self.sim.data.body_xpos[self.cubeC_body_id].copy() + pos = np.stack([A, B, C], axis=0) + + # Table height reference + table_z = self.table_offset[2] + z = pos[:, 2] + + # Target line is through the table center along chosen axis. + # We'll anchor at table_offset (x0,y0) and require all cubes be close to that line. + if self.line_axis == "x": + # x can vary freely; y should be near table_offset[1] + dist_to_line = np.abs(pos[:, 1] - self.table_offset[1]) + # spacing along x + coords = np.sort(pos[:, 0]) + else: + # y-axis line: y can vary; x should be near table_offset[0] + dist_to_line = np.abs(pos[:, 0] - self.table_offset[0]) + # spacing along y + coords = np.sort(pos[:, 1]) + + # r_align: map distance-to-line into [0,1] with tolerance + # 1.0 when within tol_line; decays with tanh outside + def smooth_clamp01(d, tol): + # <= tol -> ~1; increases beyond tol -> decreases smoothly + # Use scaled tanh for a soft boundary + return np.clip(1.0 - np.tanh(5.0 * np.maximum(0.0, d - tol)), 0.0, 1.0) + + r_align_per = smooth_clamp01(dist_to_line, self.tol_line) + r_align = float(np.mean(r_align_per)) + + # r_space: compare pairwise distances on the line axis to [0, s, 2s] + # After sorting coords: target deltas are [s, s] between neighbors + deltas = np.diff(coords) # shape (2,) + spacing_err = np.abs(deltas - self.target_spacing) + r_space_pair = smooth_clamp01(spacing_err, self.tol_spacing) + r_space = float(np.mean(r_space_pair)) if r_space_pair.size > 0 else 0.0 + + # r_height: keep near table height (+ half cube) + # We want them resting on the table, not lifted: target z ~= table_z + cube_size + target_z = table_z + self.cube_size + r_height_per = smooth_clamp01(np.abs(z - target_z), self.tol_height) + r_height = float(np.mean(r_height_per)) + + return r_align, r_space, r_height + + # -------------------- Info / Success -------------------- + def _get_env_info(self, action): + info = super()._get_env_info(action) + r_align, r_space, r_height = self.staged_rewards() + info.update({ + "r_align": r_align, + "r_space": r_space, + "r_height": r_height, + "success": self._check_success(), + }) + return info + + def _check_success(self): + r_align, r_space, r_height = self.staged_rewards() + # Require each term be high enough + return (r_align > 0.95) and (r_space > 0.95) and (r_height > 0.95) + + # -------------------- Model & Objects -------------------- + def _load_model(self): + super()._load_model() + + # Place robot base + xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) + self.robots[0].robot_model.set_base_xpos(xpos) + + # Arena + arena = TableArena( + table_full_size=self.table_full_size, + table_friction=self.table_friction, + table_offset=self.table_offset, + ) + arena.set_origin([0, 0, 0]) + + # Materials + tex = {"type": "cube"} + mat = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} + redwood = CustomMaterial(texture="WoodRed", tex_name="redwood", mat_name="redwood_mat", tex_attrib=tex, mat_attrib=mat) + greenwd = CustomMaterial(texture="WoodGreen", tex_name="greenwd", mat_name="greenwd_mat", tex_attrib=tex, mat_attrib=mat) + bluewd = CustomMaterial(texture="WoodBlue", tex_name="bluewd", mat_name="bluewd_mat", tex_attrib=tex, mat_attrib=mat) + + size = [self.cube_size, self.cube_size, self.cube_size] + # Use keyword args to avoid size/rgba mix-ups across robosuite versions + self.cubeA = BoxObject(name="cubeA", size_min=size, size_max=size, rgba=[1, 0, 0, 1], material=redwood) + self.cubeB = BoxObject(name="cubeB", size_min=size, size_max=size, rgba=[0, 1, 0, 1], material=greenwd) + self.cubeC = BoxObject(name="cubeC", size_min=size, size_max=size, rgba=[0, 0, 1, 1], material=bluewd) + cubes = [self.cubeA, self.cubeB, self.cubeC] + + # Placement + if self.placement_initializer is not None: + self.placement_initializer.reset() + self.placement_initializer.add_objects(cubes) + else: + self.placement_initializer = UniformRandomSampler( + name="ArrangeSampler", + mujoco_objects=cubes, + x_range=[-0.10, 0.10], + y_range=[-0.10, 0.10], + rotation=None, + ensure_object_boundary_in_range=False, + ensure_valid_placement=True, + reference_pos=self.table_offset, + z_offset=0.01, + ) + + # Task + self.model = ManipulationTask( + mujoco_arena=arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=cubes, + ) + + def _get_reference(self): + super()._get_reference() + self.cubeA_body_id = self.sim.model.body_name2id(self.cubeA.root_body) + self.cubeB_body_id = self.sim.model.body_name2id(self.cubeB.root_body) + self.cubeC_body_id = self.sim.model.body_name2id(self.cubeC.root_body) + + def _reset_internal(self): + super()._reset_internal() + if not self.deterministic_reset: + placements = self.placement_initializer.sample() + for obj_pos, obj_quat, obj in placements.values(): + self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) + + # -------------------- Observations -------------------- + def _get_observation(self): + di = super()._get_observation() + + if self.use_object_obs: + pr = self.robots[0].robot_model.naming_prefix + + def get_data(bid): + pos = np.array(self.sim.data.body_xpos[bid]) + quat = convert_quat(np.array(self.sim.data.body_xquat[bid]), to="xyzw") + return pos, quat + + Apos, Aquat = get_data(self.cubeA_body_id) + Bpos, Bquat = get_data(self.cubeB_body_id) + Cpos, Cquat = get_data(self.cubeC_body_id) + + di["cubeA_pos"], di["cubeA_quat"] = Apos, Aquat + di["cubeB_pos"], di["cubeB_quat"] = Bpos, Bquat + di["cubeC_pos"], di["cubeC_quat"] = Cpos, Cquat + + grip = np.array(self.sim.data.site_xpos[self.robots[0].eef_site_id]) + di[pr + "gripper_to_cubeA"] = grip - Apos + di[pr + "gripper_to_cubeB"] = grip - Bpos + di[pr + "gripper_to_cubeC"] = grip - Cpos + + di["cubeA_to_cubeB"] = Apos - Bpos + di["cubeB_to_cubeC"] = Bpos - Cpos + di["cubeA_to_cubeC"] = Apos - Cpos + + di["object-state"] = np.concatenate([ + Apos, Aquat, Bpos, Bquat, Cpos, Cquat, + di[pr + "gripper_to_cubeA"], di[pr + "gripper_to_cubeB"], di[pr + "gripper_to_cubeC"], + di["cubeA_to_cubeB"], di["cubeB_to_cubeC"], di["cubeA_to_cubeC"], + ]) + + return di + + # -------------------- Viz -------------------- + def visualize(self, vis_settings): + super().visualize(vis_settings=vis_settings) + # Optional: you could draw a thin line on the table to show the target alignment. + # For a minimal version, we keep default visualization. diff --git a/robosuite/environments/manipulation/push.py b/robosuite/environments/manipulation/push.py new file mode 100644 index 0000000000..f830935838 --- /dev/null +++ b/robosuite/environments/manipulation/push.py @@ -0,0 +1,269 @@ +from collections import OrderedDict +import numpy as np + +from robosuite.environments.manipulation.single_arm_env import SingleArmEnv +from robosuite.models.arenas import TableArena +from robosuite.models.objects import BoxObject +from robosuite.models.tasks import ManipulationTask +from robosuite.utils.placement_samplers import UniformRandomSampler +from robosuite.utils import RandomizationError +from robosuite.utils.transform_utils import convert_quat +from robosuite.utils.mjcf_utils import CustomMaterial + + +DEFAULT_PUSH_CONFIG = { + 'use_push_rew': True, + 'shaped_push_rew': True, + 'push_scale_fac': 5.0, + 'num_push_objs': 1, +} + + +class Push(SingleArmEnv): + """ + Push-only environment: push a single cube to a fixed gray circular target on the table. + """ + + def __init__( + self, + robots, + env_configuration="default", + controller_configs=None, + gripper_types="default", + initialization_noise="default", + table_full_size=(0.8, 0.8, 0.05), + table_friction=(1., 5e-3, 1e-4), + table_offset=(0, 0, 0.8), + use_camera_obs=True, + use_object_obs=True, + reward_scale=1.0, + reward_shaping=True, + placement_initializer=None, + has_renderer=False, + has_offscreen_renderer=True, + render_camera="frontview", + render_collision_mesh=False, + render_visual_mesh=True, + render_gpu_device_id=-1, + control_freq=20, + horizon=1000, + ignore_done=False, + hard_reset=True, + camera_names="agentview", + camera_heights=256, + camera_widths=256, + camera_depths=False, + skill_config=None, + ): + # Store environment-specific parameters locally + self.table_full_size = table_full_size + self.table_friction = table_friction + self.table_offset = np.array(table_offset) + self.reward_scale = reward_scale + self.reward_shaping = reward_shaping + + self.use_object_obs = use_object_obs + self.placement_initializer = placement_initializer + self.task_config = DEFAULT_PUSH_CONFIG.copy() + + # Call parent constructor with standard, expected arguments only + super().__init__( + robots=robots, + env_configuration=env_configuration, + controller_configs=controller_configs, + mount_types="default", + gripper_types=gripper_types, + initialization_noise=initialization_noise, + use_camera_obs=use_camera_obs, + has_renderer=has_renderer, + has_offscreen_renderer=has_offscreen_renderer, + render_camera=render_camera, + render_collision_mesh=render_collision_mesh, + render_visual_mesh=render_visual_mesh, + render_gpu_device_id=render_gpu_device_id, + control_freq=control_freq, + horizon=horizon, + ignore_done=ignore_done, + hard_reset=hard_reset, + camera_names=camera_names, + camera_heights=camera_heights, + camera_widths=camera_widths, + camera_depths=camera_depths, + skill_config=skill_config, + ) + + # ------------------------- + # Reward functions + # ------------------------- + def reward(self, action): + _, reward = self.reward_infos() + return reward + + def reward_infos(self): + rew_push = 0 + for i in range(self.task_config['num_push_objs']): + r_reach, r_push, _ = self.push_staged_rewards(obj_id=i) + if self.reward_shaping: + rew_push += r_reach + r_push + else: + rew_push += r_push + + if self.reward_scale is not None: + rew_push *= self.reward_scale + return {}, rew_push + + def push_staged_rewards(self, obj_id=0): + gripper_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + obj_pos = self.sim.data.body_xpos[self.push_obj_body_ids[obj_id]] + # Fixed target position (x_table+0.15, y_table-0.15) + target_xy = self.table_offset[:2] + np.array([0.15, -0.15]) + + d_push = np.linalg.norm(obj_pos[:2] - target_xy) + r_push = 1 - np.tanh(self.task_config['push_scale_fac'] * d_push) + + # reach reward + th = [0.08, 0.08, 0.04] + d_reach = np.sum(np.clip(np.abs(gripper_pos - obj_pos) - th, 0, None)) + r_reach = (1 - np.tanh(10.0 * d_reach)) * 0.25 + + return r_reach, r_push, d_push + + def _check_success(self): + for i in range(self.task_config['num_push_objs']): + _, _, d_push = self.push_staged_rewards(obj_id=i) + # 0.10m threshold for success + if d_push > 0.10: + return False + return True + + # ------------------------- + # Model / arena + # ------------------------- + def _load_model(self): + super()._load_model() + + # Set robot position + xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) + self.robots[0].robot_model.set_base_xpos(xpos) + + # Create table arena + arena = TableArena( + table_full_size=self.table_full_size, + table_offset=self.table_offset, + table_friction=(1., 5e-3, 1e-4) + ) + arena.set_origin([0, 0, 0]) + + # 1. Define the custom push target object (as a static, flat box) + target_size = [0.07, 0.07, 0.001] + target_pos = self.table_offset + np.array([0.15, -0.15, 0.0005]) + + self.target_obj = BoxObject( + name="push_target_obj", + size_min=target_size, + size_max=target_size, + rgba=[0.5, 0.5, 0.5, 1], # Gray color + density=0, # Make it massless + joints=None, # No joints, fixed to the world + ) + + # --- THE CORRECTED LINES: FINAL FIX FOR INITIAL POSITIONING --- + # Set its fixed position directly on the object's initial attribute properties. + self.target_obj.initial_pos = target_pos + self.target_obj.initial_quat = np.array([1, 0, 0, 0]) + # ---------------------------------------------------------------------- + + # 2. Define the movable push cube + mat = CustomMaterial( + texture="WoodLight", + tex_name="push_cube_tex", + mat_name="push_cube_mat", + tex_attrib={"type": "cube"}, + mat_attrib={"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"}, + ) + self.push_objs = [] + for i in range(self.task_config['num_push_objs']): + size = np.array([0.035, 0.0425, 0.0125]) + obj = BoxObject( + name=f"obj_push_{i}", + size_min=size, + size_max=size, + rgba=[0, 1, 0, 1], + material=mat, + ) + self.push_objs.append(obj) + + # 3. Combine all MujocoObject instances (movable + static target) + all_objects = self.push_objs + [self.target_obj] + + # 4. Placement sampler for the movable cube(s) ONLY + if self.placement_initializer is None: + self.placement_initializer = UniformRandomSampler( + name="PushSampler", + mujoco_objects=self.push_objs, # ONLY the movable cube(s) + x_range=[0.0, 0.16], + y_range=[-0.16, 0.16], + rotation=None, + ensure_object_boundary_in_range=False, + ensure_valid_placement=True, + reference_pos=self.table_offset, + z_offset=0.01, + ) + else: + self.placement_initializer.reset() + self.placement_initializer.add_objects(self.push_objs) + + # 5. Create ManipulationTask with ALL objects + self.model = ManipulationTask( + mujoco_arena=arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=all_objects, + ) + + # ------------------------- + # References and reset + # ------------------------- + def _get_reference(self): + super()._get_reference() + self.table_body_id = self.sim.model.body_name2id("table") + self.push_obj_body_ids = [] + for obj in self.push_objs: + bid = self.sim.model.body_name2id(obj.root_body) + self.push_obj_body_ids.append(bid) + + self.target_body_id = self.sim.model.body_name2id(self.target_obj.root_body) + + + def _reset_internal(self): + super()._reset_internal() + # Only reset position for the movable cubes using the initializer + while True: + try: + placements = self.placement_initializer.sample() + break + except RandomizationError: + continue + for obj_pos, obj_quat, obj in placements.values(): + self.sim.data.set_joint_qpos(obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)])) + + # ------------------------- + # Observations + # ------------------------- + def _get_observation(self): + di = super()._get_observation() + + pos = np.array(self.obj_positions).flatten() + quat = np.array(self.obj_quats).flatten() + di["obj_pos"] = pos + di["obj_quat"] = quat + di["object-state"] = np.concatenate([pos, quat]) + + return di + + @property + def obj_positions(self): + return [self.sim.data.body_xpos[bid].copy() for bid in self.push_obj_body_ids] + + @property + def obj_quats(self): + return [convert_quat(self.sim.data.body_xquat[bid], to="xyzw") for bid in self.push_obj_body_ids] \ No newline at end of file diff --git a/robosuite/environments/manipulation/stack_three.py b/robosuite/environments/manipulation/stack_three.py new file mode 100644 index 0000000000..88b5903b42 --- /dev/null +++ b/robosuite/environments/manipulation/stack_three.py @@ -0,0 +1,301 @@ +from collections import OrderedDict +import numpy as np + +from robosuite.utils.transform_utils import convert_quat +from robosuite.utils.mjcf_utils import CustomMaterial + +from robosuite.environments.manipulation.single_arm_env import SingleArmEnv + +from robosuite.models.arenas import TableArena +from robosuite.models.objects import BoxObject +from robosuite.models.tasks import ManipulationTask +from robosuite.utils.placement_samplers import UniformRandomSampler + + +class StackSequential(SingleArmEnv): + """ + Multi-cube stacking environment with sequential deterministic order. + Example: for 3 cubes, build C <- B <- A (bottom to top). + + - Modular reward for each cube (reach, grasp, lift, stack) + - Sequential gating: next cube reward only after previous stacked + - Additive shaping avoids plateaus + - Optional contact-based stack validation + """ + + def __init__( + self, + robots, + env_configuration="default", + controller_configs=None, + gripper_types="default", + initialization_noise="default", + table_full_size=(0.8, 0.8, 0.05), + table_friction=(1.0, 5e-3, 1e-4), + table_offset=(0, 0, 0.8), + use_camera_obs=True, + use_object_obs=True, + reward_scale=1.0, + reward_shaping=True, + placement_initializer=None, + has_renderer=False, + has_offscreen_renderer=True, + render_camera="frontview", + render_collision_mesh=False, + render_visual_mesh=True, + render_gpu_device_id=-1, + control_freq=20, + horizon=1000, + ignore_done=False, + hard_reset=True, + camera_names="agentview", + camera_heights=256, + camera_widths=256, + camera_depths=False, + full_stacking_bonus=0.0, + skill_config=None, + num_stack_objs=3, + ): + # Table and reward config + self.table_full_size = table_full_size + self.table_friction = table_friction + self.table_offset = np.array(table_offset) + self.reward_scale = reward_scale + self.reward_shaping = reward_shaping + self.placement_initializer = placement_initializer + self.full_stacking_bonus = full_stacking_bonus + self.num_stack_objs = num_stack_objs + + super().__init__( + robots=robots, + env_configuration=env_configuration, + controller_configs=controller_configs, + mount_types="default", + gripper_types=gripper_types, + initialization_noise=initialization_noise, + use_camera_obs=use_camera_obs, + has_renderer=has_renderer, + has_offscreen_renderer=has_offscreen_renderer, + render_camera=render_camera, + render_collision_mesh=render_collision_mesh, + render_visual_mesh=render_visual_mesh, + render_gpu_device_id=render_gpu_device_id, + control_freq=control_freq, + horizon=horizon, + ignore_done=ignore_done, + hard_reset=hard_reset, + camera_names=camera_names, + camera_heights=camera_heights, + camera_widths=camera_widths, + camera_depths=camera_depths, + skill_config=skill_config, + ) + + self.task_config = dict(num_stack_objs=num_stack_objs) + + # ------------------------------------------------------------------ + # Reward logic + # ------------------------------------------------------------------ + + def reward(self, action): + """Top-level reward call.""" + _, _, reward = self.reward_infos() + return reward + + def reward_infos(self): + """ + Handles reward computation across all cubes sequentially. + Each cube contributes reward only after the previous one is successfully stacked. + """ + total_reward = 0.0 + num_cubes = self.task_config.get('num_stack_objs', 3) + stacked_success = [False] * num_cubes + + for i in range(num_cubes): + r_reach, r_grasp, r_lift, r_stack, done = self.staged_stack_reward(obj_id=i) + reward_i = r_reach + r_grasp + r_lift + r_stack + total_reward += reward_i + stacked_success[i] = done + if not done: + break + + info = dict( + r_total=total_reward, + stacked_success_count=stacked_success.count(True), + success_all=all(stacked_success), + ) + + # Optional full-stack bonus + if info["success_all"]: + total_reward += self.full_stacking_bonus + + return info['stacked_success_count'], total_reward, total_reward + + def staged_stack_reward(self, obj_id=0): + """ + Computes staged reward for one cube in stacking. + Stage order: reach → grasp → lift → stack. + """ + reach_mult = 0.15 + grasp_mult = 0.3 + lift_mult = 0.6 + stack_mult = 1.0 + + gripper_site_pos = self.sim.data.site_xpos[self.robots[0].eef_site_id] + obj_pos = self.sim.data.body_xpos[self.obj_body_ids[obj_id]] + target_z = self.table_offset[2] + 0.05 + 0.05 * obj_id + + # --- Reach --- + reach_dist = np.linalg.norm(gripper_site_pos - obj_pos) + r_reach = (1 - np.tanh(8.0 * reach_dist)) * reach_mult + + # --- Grasp --- + grasp_success = int(self._check_grasp(self.robots[0].gripper, self.objs[obj_id])) + r_grasp = grasp_success * grasp_mult + + # --- Lift --- + r_lift = 0.0 + if grasp_success: + z_diff = max(target_z - obj_pos[2], 0.) + r_lift = grasp_mult + (1 - np.tanh(10.0 * z_diff)) * (lift_mult - grasp_mult) + + # --- Stack alignment --- + r_stack = 0.0 + done = False + + if obj_id > 0: + below_obj_pos = self.sim.data.body_xpos[self.obj_body_ids[obj_id - 1]] + xy_dist = np.linalg.norm(obj_pos[:2] - below_obj_pos[:2]) + z_expected = below_obj_pos[2] + 0.05 # cube height + z_diff = abs(obj_pos[2] - z_expected) + in_contact = self.check_contact(self.objs[obj_id], self.objs[obj_id - 1]) + + if xy_dist < 0.03 and z_diff < 0.02 and in_contact: + r_stack = stack_mult + done = True + else: + align_xy = (1 - np.tanh(10.0 * xy_dist)) + align_z = np.exp(-30.0 * z_diff) + r_stack = (align_xy * align_z) * (stack_mult - lift_mult) + else: + # first cube: target = table center + target_xy = self.table_offset[:2] + xy_dist = np.linalg.norm(obj_pos[:2] - target_xy) + z_expected = self.table_offset[2] + 0.05 + z_diff = abs(obj_pos[2] - z_expected) + + if xy_dist < 0.03 and z_diff < 0.02: + done = True + r_stack = stack_mult + else: + align_xy = (1 - np.tanh(10.0 * xy_dist)) + align_z = np.exp(-30.0 * z_diff) + r_stack = (align_xy * align_z) * (stack_mult - lift_mult) + + reward = r_reach + r_grasp + r_lift + r_stack + return r_reach, r_grasp, r_lift, r_stack, done + + # ------------------------------------------------------------------ + # Model and setup + # ------------------------------------------------------------------ + + def _load_model(self): + super()._load_model() + + xpos = self.robots[0].robot_model.base_xpos_offset["table"](self.table_full_size[0]) + self.robots[0].robot_model.set_base_xpos(xpos) + + mujoco_arena = TableArena( + table_full_size=self.table_full_size, + table_friction=self.table_friction, + table_offset=self.table_offset, + ) + mujoco_arena.set_origin([0, 0, 0]) + + # cube materials + tex_attrib = {"type": "cube"} + mat_attrib = {"texrepeat": "1 1", "specular": "0.4", "shininess": "0.1"} + + materials = [ + CustomMaterial( + texture=f"Wood{i}", + tex_name=f"wood{i}_tex", + mat_name=f"wood{i}_mat", + tex_attrib=tex_attrib, + mat_attrib=mat_attrib, + ) + for i in ["Red", "Green", "Blue", "Yellow"] + ] + + # cubes (identical size) + cube_size = [0.025, 0.025, 0.025] + self.objs = [] + for i in range(self.num_stack_objs): + color = np.random.choice(["Red", "Green", "Blue", "Yellow"]) + mat = next(m for m in materials if color in m.mat_name) + cube = BoxObject( + name=f"cube{i}", + size_min=cube_size, + size_max=cube_size, + rgba=[0.8, 0.8, 0.8, 1], + material=mat, + ) + self.objs.append(cube) + + # placement + if self.placement_initializer is not None: + self.placement_initializer.reset() + self.placement_initializer.add_objects(self.objs) + else: + self.placement_initializer = UniformRandomSampler( + name="ObjectSampler", + mujoco_objects=self.objs, + x_range=[-0.08, 0.08], + y_range=[-0.08, 0.08], + rotation=None, + ensure_object_boundary_in_range=False, + ensure_valid_placement=True, + reference_pos=self.table_offset, + z_offset=0.01, + ) + + self.model = ManipulationTask( + mujoco_arena=mujoco_arena, + mujoco_robots=[robot.robot_model for robot in self.robots], + mujoco_objects=self.objs, + ) + + def _get_reference(self): + super()._get_reference() + self.obj_body_ids = [self.sim.model.body_name2id(o.root_body) for o in self.objs] + + def _reset_internal(self): + super()._reset_internal() + object_placements = self.placement_initializer.sample() + for obj_pos, obj_quat, obj in object_placements.values(): + self.sim.data.set_joint_qpos( + obj.joints[0], np.concatenate([np.array(obj_pos), np.array(obj_quat)]) + ) + + def _get_observation(self): + di = super()._get_observation() + if self.use_object_obs: + pr = self.robots[0].robot_model.naming_prefix + + obs = [] + for i, obj in enumerate(self.objs): + pos = np.array(self.sim.data.body_xpos[self.obj_body_ids[i]]) + quat = convert_quat(np.array(self.sim.data.body_xquat[self.obj_body_ids[i]]), to="xyzw") + obs.extend([*pos, *quat]) + di[f"{obj.name}_pos"] = pos + di[f"{obj.name}_quat"] = quat + + di["object-state"] = np.array(obs) + return di + + def _check_success(self): + """Success if all cubes stacked sequentially.""" + for i in range(1, self.num_stack_objs): + if not self.check_contact(self.objs[i], self.objs[i - 1]): + return False + return True diff --git a/robosuite/utils/placement_samplers.py b/robosuite/utils/placement_samplers.py index 4d97a5fc87..081413575f 100644 --- a/robosuite/utils/placement_samplers.py +++ b/robosuite/utils/placement_samplers.py @@ -1,4 +1,5 @@ import collections +from collections.abc import Iterable import numpy as np from copy import copy @@ -194,7 +195,7 @@ def _sample_quat(self): """ if self.rotation is None: rot_angle = np.random.uniform(high=2 * np.pi, low=0) - elif isinstance(self.rotation, collections.Iterable): + elif isinstance(self.rotation, Iterable): rot_angle = np.random.uniform( high=max(self.rotation), low=min(self.rotation) )