diff --git a/tests/unit/vertexai/genai/replays/test_skills_update.py b/tests/unit/vertexai/genai/replays/test_skills_update.py new file mode 100644 index 0000000000..f001be7d08 --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_skills_update.py @@ -0,0 +1,103 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests the skills.update() method against the Vertex AI endpoint using replays.""" + +import io +import os +import zipfile + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai._genai import types + +# MANDATORY: Initialize the replay test framework for this module +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), +) + +PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "srbai-testing") +REGION = "us-central1" + + +def test_update_skill(client, tmp_path): + # Target the autopush sandbox endpoint for the Skill Registry API + client._api_client._http_options.base_url = ( + "https://us-central1-autopush-aiplatform.sandbox.googleapis.com" + ) + + # 1. Create a fresh unique skill first + with open(tmp_path / "SKILL.md", "w") as f: + f.write("# Test Skill\nInitial content.") + + created_skill = client.skills.create( + display_name="Original Skill", + description="Original Description", + config=types.CreateSkillConfig( + local_path=str(tmp_path), wait_for_completion=True + ), + ) + + # 2. Perform the metadata-only update on the new skill + updated_skill = client.skills.update( + name=created_skill.name, + config=types.UpdateSkillConfig( + display_name="My Updated Replay Skill", + description="My Updated Replay Skill Description", + wait_for_completion=True, + ), + ) + + assert updated_skill.name == created_skill.name + assert updated_skill.display_name == "My Updated Replay Skill" + assert updated_skill.description == "My Updated Replay Skill Description" + + +def test_update_skill_with_zipped_bytes(client, tmp_path): + # Target the autopush sandbox endpoint for the Skill Registry API + client._api_client._http_options.base_url = ( + "https://us-central1-autopush-aiplatform.sandbox.googleapis.com" + ) + + # 1. Create a fresh unique skill first + with open(tmp_path / "SKILL.md", "w") as f: + f.write("# Test Skill\nInitial content.") + + created_skill = client.skills.create( + display_name="Original Skill", + description="Original Description", + config=types.CreateSkillConfig( + local_path=str(tmp_path), wait_for_completion=True + ), + ) + + # 2. Prepare zipped bytes for update + zip_buffer = io.BytesIO() + zinfo = zipfile.ZipInfo("SKILL.md", date_time=(1980, 1, 1, 0, 0, 0)) + with zipfile.ZipFile(zip_buffer, "w") as zip_file: + zip_file.writestr(zinfo, "# My Updated Zipped Replay Skill\nThis is updated.") + zipped_bytes = zip_buffer.getvalue() + + # 3. Update the skill with new zipped bytes + updated_skill = client.skills.update( + name=created_skill.name, + config=types.UpdateSkillConfig( + zipped_filesystem=zipped_bytes, wait_for_completion=True + ), + ) + + assert updated_skill.name == created_skill.name + assert ( + updated_skill.display_name == "Original Skill" + ) # Display name remains unchanged diff --git a/vertexai/_genai/skills.py b/vertexai/_genai/skills.py index 665a07ad26..50fd0b56eb 100644 --- a/vertexai/_genai/skills.py +++ b/vertexai/_genai/skills.py @@ -126,6 +126,47 @@ def _RetrieveSkillsRequestParameters_to_vertex( return to_object +def _UpdateSkillConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["display_name"]) is not None: + setv(parent_object, ["displayName"], getv(from_object, ["display_name"])) + + if getv(from_object, ["description"]) is not None: + setv(parent_object, ["description"], getv(from_object, ["description"])) + + if getv(from_object, ["zipped_filesystem"]) is not None: + setv( + parent_object, + ["zippedFilesystem"], + getv(from_object, ["zipped_filesystem"]), + ) + + if getv(from_object, ["update_mask"]) is not None: + setv( + parent_object, ["_query", "updateMask"], getv(from_object, ["update_mask"]) + ) + + return to_object + + +def _UpdateSkillRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["config"]) is not None: + _UpdateSkillConfig_to_vertex(getv(from_object, ["config"]), to_object) + + return to_object + + class Skills(_api_module.BaseModule): """Class for managing Skills in the Skill Registry.""" @@ -341,6 +382,75 @@ def _create( self._api_client._verify_response(return_value) return return_value + def _update( + self, *, name: str, config: Optional[types.UpdateSkillConfigOrDict] = None + ) -> types.SkillOperation: + """ + Updates a Skill. + """ + + parameter_model = types._UpdateSkillRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _UpdateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("patch", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + def _get_skill_operation( self, *, @@ -492,6 +602,100 @@ def create( return operation + def update( + self, + *, + name: str, + config: Optional[types.UpdateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Updates an existing Skill. + + Args: + name (str): + Required. The resource name of the Skill to update. + Format: projects/{project}/locations/{location}/skills/{skill} + config (UpdateSkillConfigOrDict): + Optional. The configuration for updating the Skill. + + Returns: + Skill: The updated Skill if wait_for_completion is True. + SkillOperation: The operation for updating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.UpdateSkillConfig() + elif isinstance(config, dict): + config = types.UpdateSkillConfig.model_validate(config) + elif not isinstance(config, types.UpdateSkillConfig): + raise TypeError( + f"config must be a dict or UpdateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() + + display_name = config.display_name + description = config.description + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem + + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in config." + ) + + # Construct update_mask and prepare payload + update_mask_paths = [] + zipped_filesystem_payload = None + + if display_name is not None: + update_mask_paths.append("displayName") + + if description is not None: + update_mask_paths.append("description") + + if local_path: + zipped_filesystem_payload = _skills_utils.get_zipped_filesystem_payload( + local_path + ) + update_mask_paths.append("zippedFilesystem") + elif zipped_filesystem is not None: + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" + ) + else: + zipped_filesystem_payload = zipped_filesystem + update_mask_paths.append("zippedFilesystem") + + if not update_mask_paths: + raise ValueError( + "At least one of `display_name`, `description`, `local_path`, or " + "`zipped_filesystem` must be provided for update in config." + ) + + update_mask = ",".join(update_mask_paths) + + # Mutate config in place to populate the generated update_mask and zipped_filesystem + config.update_mask = update_mask + config.zipped_filesystem = zipped_filesystem_payload + + operation = self._update( + name=name, + config=config, + ) + + if config.wait_for_completion: + operation = _skills_utils.await_operation( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to update Skill: {operation.error}") + # Fetch the fully populated Skill resource from the server + return self.get(name=name) + + return operation + class AsyncSkills(_api_module.BaseModule): """Class for managing Skills in the Skill Registry.""" @@ -714,6 +918,77 @@ async def _create( self._api_client._verify_response(return_value) return return_value + async def _update( + self, *, name: str, config: Optional[types.UpdateSkillConfigOrDict] = None + ) -> types.SkillOperation: + """ + Updates a Skill. + """ + + parameter_model = types._UpdateSkillRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _UpdateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "patch", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + async def _get_skill_operation( self, *, @@ -867,3 +1142,98 @@ async def create( return await self.get(name=operation.response.name) return operation + + async def update( + self, + *, + name: str, + config: Optional[types.UpdateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Updates an existing Skill asynchronously. + + Args: + name (str): + Required. The resource name of the Skill to update. + Format: projects/{project}/locations/{location}/skills/{skill} + config (UpdateSkillConfigOrDict): + Optional. The configuration for updating the Skill. + + Returns: + Skill: The updated Skill if wait_for_completion is True. + SkillOperation: The operation for updating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.UpdateSkillConfig() + elif isinstance(config, dict): + config = types.UpdateSkillConfig.model_validate(config) + elif not isinstance(config, types.UpdateSkillConfig): + raise TypeError( + f"config must be a dict or UpdateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() + + display_name = config.display_name + description = config.description + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem + + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in config." + ) + + # Construct update_mask and prepare payload + update_mask_paths = [] + zipped_filesystem_payload = None + + if display_name is not None: + update_mask_paths.append("displayName") + + if description is not None: + update_mask_paths.append("description") + + if local_path: + loop = asyncio.get_running_loop() + zipped_filesystem_payload = await loop.run_in_executor( + None, _skills_utils.get_zipped_filesystem_payload, local_path + ) + update_mask_paths.append("zippedFilesystem") + elif zipped_filesystem is not None: + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" + ) + else: + zipped_filesystem_payload = zipped_filesystem + update_mask_paths.append("zippedFilesystem") + + if not update_mask_paths: + raise ValueError( + "At least one of `display_name`, `description`, `local_path`, or " + "`zipped_filesystem` must be provided for update in config." + ) + + update_mask = ",".join(update_mask_paths) + + # Mutate config in place to populate the generated update_mask and zipped_filesystem + config.update_mask = update_mask + config.zipped_filesystem = zipped_filesystem_payload + + operation = await self._update( + name=name, + config=config, + ) + + if config.wait_for_completion: + operation = await _skills_utils.await_operation_async( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to update Skill: {operation.error}") + # Fetch the fully populated Skill resource asynchronously + return await self.get(name=name) + + return operation diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index 2e4acdba09..4182fbbd8c 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -128,6 +128,7 @@ from .common import _UpdateAgentEngineSessionRequestParameters from .common import _UpdateDatasetParameters from .common import _UpdateMultimodalDatasetParameters +from .common import _UpdateSkillRequestParameters from .common import A2aTask from .common import A2aTaskDict from .common import A2aTaskOrDict @@ -1404,6 +1405,9 @@ from .common import UpdatePromptConfig from .common import UpdatePromptConfigDict from .common import UpdatePromptConfigOrDict +from .common import UpdateSkillConfig +from .common import UpdateSkillConfigDict +from .common import UpdateSkillConfigOrDict from .common import VertexBaseConfig from .common import VertexBaseConfigDict from .common import VertexBaseConfigOrDict @@ -2528,6 +2532,9 @@ "SkillOperation", "SkillOperationDict", "SkillOperationOrDict", + "UpdateSkillConfig", + "UpdateSkillConfigDict", + "UpdateSkillConfigOrDict", "GetSkillOperationConfig", "GetSkillOperationConfigDict", "GetSkillOperationConfigOrDict", @@ -2771,6 +2778,7 @@ "_GetSkillRequestParameters", "_RetrieveSkillsRequestParameters", "_CreateSkillRequestParameters", + "_UpdateSkillRequestParameters", "_GetSkillOperationParameters", "evals", "agent_engines", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index 57a1829ea6..6ce66a054c 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -18193,6 +18193,91 @@ class SkillOperationDict(TypedDict, total=False): SkillOperationOrDict = Union[SkillOperation, SkillOperationDict] +class UpdateSkillConfig(_common.BaseModel): + """Config for updating a skill.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=True, + description="""Whether to wait for the long running operation to complete.""", + ) + local_path: Optional[str] = Field( + default=None, + description="""Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """, + ) + display_name: Optional[str] = Field( + default=None, description="""Optional. The display name of the Skill.""" + ) + description: Optional[str] = Field( + default=None, description="""Optional. The description of the Skill.""" + ) + zipped_filesystem: Optional[Any] = Field( + default=None, description="""Optional. The zipped filesystem of the Skill.""" + ) + update_mask: Optional[str] = Field( + default=None, description="""Optional. The update mask to apply.""" + ) + + +class UpdateSkillConfigDict(TypedDict, total=False): + """Config for updating a skill.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Whether to wait for the long running operation to complete.""" + + local_path: Optional[str] + """Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """ + + display_name: Optional[str] + """Optional. The display name of the Skill.""" + + description: Optional[str] + """Optional. The description of the Skill.""" + + zipped_filesystem: Optional[Any] + """Optional. The zipped filesystem of the Skill.""" + + update_mask: Optional[str] + """Optional. The update mask to apply.""" + + +UpdateSkillConfigOrDict = Union[UpdateSkillConfig, UpdateSkillConfigDict] + + +class _UpdateSkillRequestParameters(_common.BaseModel): + """Parameters for updating a skill.""" + + name: Optional[str] = Field( + default=None, + description="""Required. The resource name of the Skill to update.""", + ) + config: Optional[UpdateSkillConfig] = Field(default=None, description="""""") + + +class _UpdateSkillRequestParametersDict(TypedDict, total=False): + """Parameters for updating a skill.""" + + name: Optional[str] + """Required. The resource name of the Skill to update.""" + + config: Optional[UpdateSkillConfigDict] + """""" + + +_UpdateSkillRequestParametersOrDict = Union[ + _UpdateSkillRequestParameters, _UpdateSkillRequestParametersDict +] + + class GetSkillOperationConfig(_common.BaseModel): http_options: Optional[genai_types.HttpOptions] = Field(