From cf7162cd940e63d1037962c015ae510137792d3d Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Tue, 5 May 2026 11:58:56 -0700 Subject: [PATCH] feat: Add Update Skill method for Vertex AI Skill Registry SDK PiperOrigin-RevId: 910827738 --- .../genai/replays/test_skills_create.py | 78 ++ .../genai/replays/test_skills_update.py | 113 ++ vertexai/_genai/_skills_utils.py | 116 ++ vertexai/_genai/skills.py | 1078 +++++++++++++++-- vertexai/_genai/types/__init__.py | 30 + vertexai/_genai/types/common.py | 262 ++++ 6 files changed, 1579 insertions(+), 98 deletions(-) create mode 100644 tests/unit/vertexai/genai/replays/test_skills_create.py create mode 100644 tests/unit/vertexai/genai/replays/test_skills_update.py create mode 100644 vertexai/_genai/_skills_utils.py diff --git a/tests/unit/vertexai/genai/replays/test_skills_create.py b/tests/unit/vertexai/genai/replays/test_skills_create.py new file mode 100644 index 0000000000..a873f091ef --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_skills_create.py @@ -0,0 +1,78 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests the skills.create() method against the Vertex AI endpoint using replays.""" + +import io +import os +import tempfile +import zipfile + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai._genai import types + +# MANDATORY: Initialize the replay test framework for this module +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), +) + + +def test_create_skill(client): + """Tests the creation of a skill using `client.skills.create()`.""" + client._api_client._http_options.base_url = ( + "https://us-central1-aiplatform.googleapis.com" + ) + + with tempfile.TemporaryDirectory() as tmpdir: + # Create a dummy skill structure (SKILL.md is required by the spec) + with open(os.path.join(tmpdir, "SKILL.md"), "w") as f: + f.write("# My Replay Skill\nThis is a test skill for replay tests.") + + skill = client.skills.create( + display_name="My Replay Skill", + description="My Replay Skill Description", + config=types.CreateSkillConfig( + local_path=tmpdir, wait_for_completion=True + ), + ) + + assert skill.name is not None + assert skill.display_name == "My Replay Skill" + assert skill.description == "My Replay Skill Description" + + +def test_create_skill_with_prezipped_bytes(client): + """Tests the creation of a skill with pre-zipped bytes.""" + client._api_client._http_options.base_url = ( + "https://us-central1-aiplatform.googleapis.com" + ) + + zip_buffer = io.BytesIO() + zinfo = zipfile.ZipInfo("SKILL.md", date_time=(1980, 1, 1, 0, 0, 0)) + with zipfile.ZipFile(zip_buffer, "w") as zip_file: + zip_file.writestr(zinfo, "# My Zipped Replay Skill\nThis is a test.") + zipped_bytes = zip_buffer.getvalue() + + skill = client.skills.create( + display_name="My Zipped Replay Skill", + description="My Zipped Replay Skill Description", + config=types.CreateSkillConfig( + zipped_filesystem=zipped_bytes, wait_for_completion=True + ), + ) + + assert skill.name is not None + assert skill.display_name == "My Zipped Replay Skill" + assert skill.description == "My Zipped Replay Skill Description" diff --git a/tests/unit/vertexai/genai/replays/test_skills_update.py b/tests/unit/vertexai/genai/replays/test_skills_update.py new file mode 100644 index 0000000000..869403726a --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_skills_update.py @@ -0,0 +1,113 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests the skills.update() method against the Vertex AI endpoint using replays.""" + +import io +import os +import random +import tempfile +import zipfile + +from tests.unit.vertexai.genai.replays import pytest_helper +from vertexai._genai import types + +# MANDATORY: Initialize the replay test framework for this module +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), +) + +PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT", "srbai-testing") +REGION = "us-central1" + + +def test_update_skill(client): + """Tests the update of a skill using `client.skills.update()` (metadata only).""" + # Target the autopush sandbox endpoint for the Skill Registry API + client._api_client._http_options.base_url = ( + "https://us-central1-autopush-aiplatform.sandbox.googleapis.com" + ) + + # 1. Create a fresh unique skill first + skill_id = f"update_meta_test_skill_{random.randint(10000, 99999)}" + with tempfile.TemporaryDirectory() as tmpdir: + with open(os.path.join(tmpdir, "SKILL.md"), "w") as f: + f.write("# Test Skill\nInitial content.") + + created_skill = client.skills.create( + display_name="Original Skill", + description="Original Description", + config=types.CreateSkillConfig( + local_path=tmpdir, skill_id=skill_id, wait_for_completion=True + ), + ) + + # 2. Perform the metadata-only update on the new skill + updated_skill = client.skills.update( + name=created_skill.name, + config=types.UpdateSkillConfig( + display_name="My Updated Replay Skill", + description="My Updated Replay Skill Description", + wait_for_completion=True, + ), + ) + + assert updated_skill.name == created_skill.name + assert updated_skill.display_name == "My Updated Replay Skill" + assert updated_skill.description == "My Updated Replay Skill Description" + + +def test_update_skill_with_zipped_bytes(client): + """Tests the update of a skill with zipped bytes filesystem.""" + # Target the autopush sandbox endpoint for the Skill Registry API + client._api_client._http_options.base_url = ( + "https://us-central1-autopush-aiplatform.sandbox.googleapis.com" + ) + + # 1. Create a fresh unique skill first + skill_id = f"update_zip_test_skill_{random.randint(10000, 99999)}" + with tempfile.TemporaryDirectory() as tmpdir: + with open(os.path.join(tmpdir, "SKILL.md"), "w") as f: + f.write("# Test Skill\nInitial content.") + + created_skill = client.skills.create( + display_name="Original Skill", + description="Original Description", + config=types.CreateSkillConfig( + local_path=tmpdir, skill_id=skill_id, wait_for_completion=True + ), + ) + + # 2. Prepare zipped bytes for update + zip_buffer = io.BytesIO() + zinfo = zipfile.ZipInfo("SKILL.md", date_time=(1980, 1, 1, 0, 0, 0)) + with zipfile.ZipFile(zip_buffer, "w") as zip_file: + zip_file.writestr( + zinfo, "# My Updated Zipped Replay Skill\nThis is updated." + ) + zipped_bytes = zip_buffer.getvalue() + + # 3. Update the skill with new zipped bytes + updated_skill = client.skills.update( + name=created_skill.name, + config=types.UpdateSkillConfig( + zipped_filesystem=zipped_bytes, wait_for_completion=True + ), + ) + + assert updated_skill.name == created_skill.name + assert ( + updated_skill.display_name == "Original Skill" + ) # Display name remains unchanged diff --git a/vertexai/_genai/_skills_utils.py b/vertexai/_genai/_skills_utils.py new file mode 100644 index 0000000000..bf686b5af2 --- /dev/null +++ b/vertexai/_genai/_skills_utils.py @@ -0,0 +1,116 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Utility functions for Skills.""" + +import asyncio +import base64 +import io +import os +import time +from typing import Any, Awaitable, Callable +import zipfile + + +def zip_directory(directory_path: str) -> bytes: + """Zips a directory into memory and returns the bytes. + + Args: + directory_path (str): Required. The local path to the directory. + + Returns: + bytes: The zipped directory content. + """ + if not os.path.isdir(directory_path): + raise ValueError(f"Path is not a directory: {directory_path}") + + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: + for root, _, files in os.walk(directory_path): + for file in files: + file_path = os.path.join(root, file) + arcname = os.path.relpath(file_path, directory_path) + + # Read actual file data + with open(file_path, "rb") as f: + file_data = f.read() + + # Use deterministic ZipInfo (mtime: 1980-01-01 00:00:00) + zinfo = zipfile.ZipInfo(arcname, date_time=(1980, 1, 1, 0, 0, 0)) + zinfo.compress_type = zipfile.ZIP_DEFLATED + zinfo.external_attr = 0o644 << 16 # Constant file permissions + + zip_file.writestr(zinfo, file_data) + return zip_buffer.getvalue() + + +def get_zipped_filesystem_payload(directory_path: str) -> str: + """Zips a directory and base64-encodes the result to a UTF-8 string. + + Args: + directory_path (str): Required. The local path to the directory. + + Returns: + str: The base64-encoded zipped directory. + """ + zip_bytes = zip_directory(directory_path) + return base64.b64encode(zip_bytes).decode("utf-8") + + +def await_operation( + *, + operation_name: str, + get_operation_fn: Callable[..., Any], + poll_interval_seconds: float = 10.0, +) -> Any: + """Waits for a long running operation to complete. + + Args: + operation_name (str): Required. The name of the operation. + get_operation_fn (Callable): Required. Function to get the operation + status. + poll_interval_seconds (float): The interval between polls in seconds. + + Returns: + Any: The completed operation. + """ + operation = get_operation_fn(operation_name=operation_name) + while not operation.done: + time.sleep(poll_interval_seconds) + operation = get_operation_fn(operation_name=operation.name) + return operation + + +async def await_operation_async( + *, + operation_name: str, + get_operation_fn: Callable[..., Awaitable[Any]], + poll_interval_seconds: float = 10.0, +) -> Any: + """Waits for a long running operation to complete asynchronously. + + Args: + operation_name (str): Required. The name of the operation. + get_operation_fn (Callable): Required. Async function to get the operation + status. + poll_interval_seconds (float): The interval between polls in seconds. + + Returns: + Any: The completed operation. + """ + operation = await get_operation_fn(operation_name=operation_name) + while not operation.done: + await asyncio.sleep(poll_interval_seconds) + operation = await get_operation_fn(operation_name=operation.name) + return operation diff --git a/vertexai/_genai/skills.py b/vertexai/_genai/skills.py index 1eb145afe7..f307d2b8c1 100644 --- a/vertexai/_genai/skills.py +++ b/vertexai/_genai/skills.py @@ -15,6 +15,8 @@ # Code generated by the Google Gen AI SDK generator DO NOT EDIT. +import asyncio +import base64 import json import logging from typing import Any, Optional, Union @@ -25,74 +27,169 @@ from google.genai._common import get_value_by_path as getv from google.genai._common import set_value_by_path as setv +from . import _skills_utils from . import types logger = logging.getLogger("vertexai_genai.skills") +def _CreateSkillConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["zipped_filesystem"]) is not None: + setv( + parent_object, + ["zippedFilesystem"], + getv(from_object, ["zipped_filesystem"]), + ) + + if getv(from_object, ["skill_id"]) is not None: + setv(parent_object, ["_query", "skillId"], getv(from_object, ["skill_id"])) + + return to_object + + +def _CreateSkillRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["display_name"]) is not None: + setv(to_object, ["displayName"], getv(from_object, ["display_name"])) + + if getv(from_object, ["description"]) is not None: + setv(to_object, ["description"], getv(from_object, ["description"])) + + if getv(from_object, ["config"]) is not None: + _CreateSkillConfig_to_vertex(getv(from_object, ["config"]), to_object) + + return to_object + + +def _GetSkillOperationParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["operation_name"]) is not None: + setv( + to_object, + ["_url", "operationName"], + getv(from_object, ["operation_name"]), + ) + + return to_object + + def _GetSkillRequestParameters_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: - to_object: dict[str, Any] = {} - if getv(from_object, ["name"]) is not None: - setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) - if getv(from_object, ["config"]) is not None: - setv(to_object, ["config"], getv(from_object, ["config"])) + if getv(from_object, ["config"]) is not None: + setv(to_object, ["config"], getv(from_object, ["config"])) - return to_object + return to_object + + +def _UpdateSkillConfig_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + + if getv(from_object, ["display_name"]) is not None: + setv(parent_object, ["displayName"], getv(from_object, ["display_name"])) + + if getv(from_object, ["description"]) is not None: + setv(parent_object, ["description"], getv(from_object, ["description"])) + + if getv(from_object, ["zipped_filesystem"]) is not None: + setv( + parent_object, + ["zippedFilesystem"], + getv(from_object, ["zipped_filesystem"]), + ) + + if getv(from_object, ["update_mask"]) is not None: + setv( + parent_object, + ["_query", "updateMask"], + getv(from_object, ["update_mask"]), + ) + + return to_object + + +def _UpdateSkillRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["config"]) is not None: + _UpdateSkillConfig_to_vertex(getv(from_object, ["config"]), to_object) + + return to_object class Skills(_api_module.BaseModule): - """Class for managing Skills in the Skill Registry.""" + """Class for managing Skills in the Skill Registry.""" - def get( + def get( self, *, name: str, config: Optional[types.GetSkillConfigOrDict] = None ) -> types.Skill: - """ + """ Gets a Skill. """ - parameter_model = types._GetSkillRequestParameters( + parameter_model = types._GetSkillRequestParameters( name=name, config=config, ) - request_url_dict: Optional[dict[str, str]] - if not self._api_client.vertexai: - raise ValueError( + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( "This method is only supported in the Gemini Enterprise Agent Platform (previously known as Vertex AI) client." ) - else: - request_dict = _GetSkillRequestParameters_to_vertex(parameter_model) - request_url_dict = request_dict.get("_url") - if request_url_dict: - path = "{name}".format_map(request_url_dict) - else: - path = "{name}" - - query_params = request_dict.get("_query") - if query_params: - path = f"{path}?{urlencode(query_params)}" - # TODO: remove the hack that pops config. - request_dict.pop("config", None) - - http_options: Optional[types.HttpOptions] = None - if ( + else: + request_dict = _GetSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( parameter_model.config is not None and parameter_model.config.http_options is not None ): - http_options = parameter_model.config.http_options + http_options = parameter_model.config.http_options - request_dict = _common.convert_to_dict(request_dict) - request_dict = _common.encode_unserializable_types(request_dict) + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) - response = self._api_client.request("get", path, request_dict, http_options) + response = self._api_client.request("get", path, request_dict, http_options) - response_dict = {} if not response.body else json.loads(response.body) + response_dict = {} if not response.body else json.loads(response.body) - return_value = types.Skill._from_response( + return_value = types.Skill._from_response( response=response_dict, kwargs=( { @@ -113,80 +210,865 @@ def get( ), ) - self._api_client._verify_response(return_value) - return return_value + self._api_client._verify_response(return_value) + return return_value + def _create( + self, + *, + display_name: str, + description: str, + config: Optional[types.CreateSkillConfigOrDict] = None, + ) -> types.SkillOperation: + """Creates a new Skill.""" -class AsyncSkills(_api_module.BaseModule): - """Class for managing Skills in the Skill Registry.""" + parameter_model = types._CreateSkillRequestParameters( + display_name=display_name, + description=description, + config=config, + ) - async def get( - self, *, name: str, config: Optional[types.GetSkillConfigOrDict] = None - ) -> types.Skill: - """ - Gets a Skill. - """ + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _CreateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "skills".format_map(request_url_dict) + else: + path = "skills" - parameter_model = types._GetSkillRequestParameters( - name=name, - config=config, + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + def _update( + self, *, name: str, config: Optional[types.UpdateSkillConfigOrDict] = None + ) -> types.SkillOperation: + """Updates a Skill.""" + + parameter_model = types._UpdateSkillRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _UpdateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request( + "patch", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + def _get_skill_operation( + self, + *, + operation_name: str, + config: Optional[types.GetSkillOperationConfigOrDict] = None, + ) -> types.SkillOperation: + parameter_model = types._GetSkillOperationParameters( + operation_name=operation_name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _GetSkillOperationParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{operationName}".format_map(request_url_dict) + else: + path = "{operationName}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("get", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + def create( + self, + *, + display_name: str, + description: str, + config: Optional[types.CreateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Creates a new Skill. + + Args: + display_name (str): Required. The display name of the Skill. + description (str): Required. The description of the Skill. + config (CreateSkillConfigOrDict): Optional. The configuration for + creating the Skill. + + Returns: + Skill: The created Skill if wait_for_completion is True. + SkillOperation: The operation for creating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.CreateSkillConfig() + elif isinstance(config, dict): + config = types.CreateSkillConfig.model_validate(config) + elif not isinstance(config, types.CreateSkillConfig): + raise TypeError( + f"config must be a dict or CreateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() + + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem + + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in" + " config." + ) + if not local_path and not zipped_filesystem: + raise ValueError( + "Either `local_path` or `zipped_filesystem` must be provided in" + " config." + ) + + if local_path: + zipped_filesystem_payload = _skills_utils.get_zipped_filesystem_payload( + local_path + ) + else: + # Narrow type for mypy + if zipped_filesystem is None: + raise ValueError( + "zipped_filesystem is required if local_path is not provided." + ) + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" ) + else: + zipped_filesystem_payload = zipped_filesystem - request_url_dict: Optional[dict[str, str]] - if not self._api_client.vertexai: - raise ValueError( - "This method is only supported in the Gemini Enterprise Agent Platform (previously known as Vertex AI) client." - ) - else: - request_dict = _GetSkillRequestParameters_to_vertex(parameter_model) - request_url_dict = request_dict.get("_url") - if request_url_dict: - path = "{name}".format_map(request_url_dict) - else: - path = "{name}" - - query_params = request_dict.get("_query") - if query_params: - path = f"{path}?{urlencode(query_params)}" - # TODO: remove the hack that pops config. - request_dict.pop("config", None) - - http_options: Optional[types.HttpOptions] = None - if ( - parameter_model.config is not None - and parameter_model.config.http_options is not None - ): - http_options = parameter_model.config.http_options + # Mutate the config object to populate the zipped_filesystem payload + config.zipped_filesystem = zipped_filesystem_payload + + operation = self._create( + display_name=display_name, + description=description, + config=config, + ) + + if config.wait_for_completion: + operation = _skills_utils.await_operation( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to create Skill: {operation.error}") + # Fetch the fully populated Skill resource from the server + return self.get(name=operation.response.name) + + return operation + + def update( + self, + *, + name: str, + config: Optional[types.UpdateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Updates an existing Skill. + + Args: + name (str): Required. The resource name of the Skill to update. + Format: projects/{project}/locations/{location}/skills/{skill} + config (UpdateSkillConfigOrDict): Optional. The configuration for + updating the Skill. + + Returns: + Skill: The updated Skill if wait_for_completion is True. + SkillOperation: The operation for updating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.UpdateSkillConfig() + elif isinstance(config, dict): + config = types.UpdateSkillConfig.model_validate(config) + elif not isinstance(config, types.UpdateSkillConfig): + raise TypeError( + f"config must be a dict or UpdateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() - request_dict = _common.convert_to_dict(request_dict) - request_dict = _common.encode_unserializable_types(request_dict) + display_name = config.display_name + description = config.description + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem - response = await self._api_client.async_request( - "get", path, request_dict, http_options + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in" + " config." + ) + + # Construct update_mask and prepare payload + update_mask_paths = [] + zipped_filesystem_payload = None + + if display_name is not None: + update_mask_paths.append("displayName") + + if description is not None: + update_mask_paths.append("description") + + if local_path: + zipped_filesystem_payload = _skills_utils.get_zipped_filesystem_payload( + local_path + ) + update_mask_paths.append("zippedFilesystem") + elif zipped_filesystem is not None: + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" ) + else: + zipped_filesystem_payload = zipped_filesystem + update_mask_paths.append("zippedFilesystem") - response_dict = {} if not response.body else json.loads(response.body) + if not update_mask_paths: + raise ValueError( + "At least one of `display_name`, `description`, `local_path`, or " + "`zipped_filesystem` must be provided for update in config." + ) - return_value = types.Skill._from_response( - response=response_dict, - kwargs=( - { - "config": { - "response_schema": getattr( - parameter_model.config, "response_schema", None - ), - "response_json_schema": getattr( - parameter_model.config, "response_json_schema", None - ), - "include_all_fields": getattr( - parameter_model.config, "include_all_fields", None - ), - } + update_mask = ",".join(update_mask_paths) + + # Mutate config in place to populate the generated update_mask and zipped_filesystem + config.update_mask = update_mask + config.zipped_filesystem = zipped_filesystem_payload + + operation = self._update( + name=name, + config=config, + ) + + if config.wait_for_completion: + operation = _skills_utils.await_operation( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to update Skill: {operation.error}") + # Fetch the fully populated Skill resource from the server + return self.get(name=name) + + return operation + + +class AsyncSkills(_api_module.BaseModule): + """Class for managing Skills in the Skill Registry.""" + + async def get( + self, *, name: str, config: Optional[types.GetSkillConfigOrDict] = None + ) -> types.Skill: + """Gets a Skill.""" + + parameter_model = types._GetSkillRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _GetSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "get", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.Skill._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), } - if getattr(parameter_model, "config", None) - else {} - ), + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + async def _create( + self, + *, + display_name: str, + description: str, + config: Optional[types.CreateSkillConfigOrDict] = None, + ) -> types.SkillOperation: + """Creates a new Skill.""" + + parameter_model = types._CreateSkillRequestParameters( + display_name=display_name, + description=description, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _CreateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "skills".format_map(request_url_dict) + else: + path = "skills" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + async def _update( + self, *, name: str, config: Optional[types.UpdateSkillConfigOrDict] = None + ) -> types.SkillOperation: + """Updates a Skill.""" + + parameter_model = types._UpdateSkillRequestParameters( + name=name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _UpdateSkillRequestParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}".format_map(request_url_dict) + else: + path = "{name}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "patch", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + async def _get_skill_operation( + self, + *, + operation_name: str, + config: Optional[types.GetSkillOperationConfigOrDict] = None, + ) -> types.SkillOperation: + parameter_model = types._GetSkillOperationParameters( + operation_name=operation_name, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError( + "This method is only supported in the Gemini Enterprise Agent" + " Platform (previously known as Vertex AI) client." + ) + else: + request_dict = _GetSkillOperationParameters_to_vertex(parameter_model) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{operationName}".format_map(request_url_dict) + else: + path = "{operationName}" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "get", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.SkillOperation._from_response( + response=response_dict, + kwargs=( + { + "config": { + "response_schema": getattr( + parameter_model.config, "response_schema", None + ), + "response_json_schema": getattr( + parameter_model.config, "response_json_schema", None + ), + "include_all_fields": getattr( + parameter_model.config, "include_all_fields", None + ), + } + } + if getattr(parameter_model, "config", None) + else {} + ), + ) + + self._api_client._verify_response(return_value) + return return_value + + async def create( + self, + *, + display_name: str, + description: str, + config: Optional[types.CreateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Creates a new Skill asynchronously. + + Args: + display_name (str): Required. The display name of the Skill. + description (str): Required. The description of the Skill. + config (CreateSkillConfigOrDict): Optional. The configuration for + creating the Skill. + + Returns: + Skill: The created Skill if wait_for_completion is True. + SkillOperation: The operation for creating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.CreateSkillConfig() + elif isinstance(config, dict): + config = types.CreateSkillConfig.model_validate(config) + elif not isinstance(config, types.CreateSkillConfig): + raise TypeError( + f"config must be a dict or CreateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() + + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem + + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in" + " config." + ) + if not local_path and not zipped_filesystem: + raise ValueError( + "Either `local_path` or `zipped_filesystem` must be provided in" + " config." + ) + + if local_path: + loop = asyncio.get_running_loop() + zipped_filesystem_payload = await loop.run_in_executor( + None, _skills_utils.get_zipped_filesystem_payload, local_path + ) + else: + # Narrow type for mypy + if zipped_filesystem is None: + raise ValueError( + "zipped_filesystem is required if local_path is not provided." + ) + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" + ) + else: + zipped_filesystem_payload = zipped_filesystem + + # Mutate the config object to populate the zipped_filesystem payload + config.zipped_filesystem = zipped_filesystem_payload + + operation = await self._create( + display_name=display_name, + description=description, + config=config, + ) + + if config.wait_for_completion: + operation = await _skills_utils.await_operation_async( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to create Skill: {operation.error}") + # Fetch the fully populated Skill resource asynchronously + return await self.get(name=operation.response.name) + + return operation + + async def update( + self, + *, + name: str, + config: Optional[types.UpdateSkillConfigOrDict] = None, + ) -> Union[types.Skill, types.SkillOperation]: + """Updates an existing Skill asynchronously. + + Args: + name (str): Required. The resource name of the Skill to update. + Format: projects/{project}/locations/{location}/skills/{skill} + config (UpdateSkillConfigOrDict): Optional. The configuration for + updating the Skill. + + Returns: + Skill: The updated Skill if wait_for_completion is True. + SkillOperation: The operation for updating the Skill if + wait_for_completion is False. + """ + if config is None: + config = types.UpdateSkillConfig() + elif isinstance(config, dict): + config = types.UpdateSkillConfig.model_validate(config) + elif not isinstance(config, types.UpdateSkillConfig): + raise TypeError( + f"config must be a dict or UpdateSkillConfig, but got {type(config)}." + ) + + config = config.model_copy() + + display_name = config.display_name + description = config.description + local_path = config.local_path + zipped_filesystem = config.zipped_filesystem + + if local_path and zipped_filesystem: + raise ValueError( + "Only one of `local_path` or `zipped_filesystem` can be provided in" + " config." + ) + + # Construct update_mask and prepare payload + update_mask_paths = [] + zipped_filesystem_payload = None + + if display_name is not None: + update_mask_paths.append("displayName") + + if description is not None: + update_mask_paths.append("description") + + if local_path: + loop = asyncio.get_running_loop() + zipped_filesystem_payload = await loop.run_in_executor( + None, _skills_utils.get_zipped_filesystem_payload, local_path + ) + update_mask_paths.append("zippedFilesystem") + elif zipped_filesystem is not None: + if isinstance(zipped_filesystem, bytes): + zipped_filesystem_payload = base64.b64encode(zipped_filesystem).decode( + "utf-8" ) + else: + zipped_filesystem_payload = zipped_filesystem + update_mask_paths.append("zippedFilesystem") + + if not update_mask_paths: + raise ValueError( + "At least one of `display_name`, `description`, `local_path`, or " + "`zipped_filesystem` must be provided for update in config." + ) + + update_mask = ",".join(update_mask_paths) + + # Mutate config in place to populate the generated update_mask and zipped_filesystem + config.update_mask = update_mask + config.zipped_filesystem = zipped_filesystem_payload + + operation = await self._update( + name=name, + config=config, + ) + + if config.wait_for_completion: + operation = await _skills_utils.await_operation_async( + operation_name=operation.name, + get_operation_fn=self._get_skill_operation, + ) + if operation.error: + raise RuntimeError(f"Failed to update Skill: {operation.error}") + # Fetch the fully populated Skill resource asynchronously + return await self.get(name=name) - self._api_client._verify_response(return_value) - return return_value + return operation diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index baeb22b20b..d4545f98b4 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -42,6 +42,7 @@ from .common import _CreateMultimodalDatasetParameters from .common import _CreateSandboxEnvironmentSnapshotRequestParameters from .common import _CreateSandboxEnvironmentTemplateRequestParameters +from .common import _CreateSkillRequestParameters from .common import _CustomJobParameters from .common import _CustomJobParameters from .common import _DeleteAgentEngineMemoryRequestParameters @@ -90,6 +91,7 @@ from .common import _GetSandboxEnvironmentSnapshotRequestParameters from .common import _GetSandboxEnvironmentTemplateOperationParameters from .common import _GetSandboxEnvironmentTemplateRequestParameters +from .common import _GetSkillOperationParameters from .common import _GetSkillRequestParameters from .common import _IngestEventsRequestParameters from .common import _ListAgentEngineMemoryRequestParameters @@ -125,6 +127,7 @@ from .common import _UpdateAgentEngineSessionRequestParameters from .common import _UpdateDatasetParameters from .common import _UpdateMultimodalDatasetParameters +from .common import _UpdateSkillRequestParameters from .common import A2aTask from .common import A2aTaskDict from .common import A2aTaskOrDict @@ -299,6 +302,9 @@ from .common import CreateSandboxEnvironmentTemplateConfig from .common import CreateSandboxEnvironmentTemplateConfigDict from .common import CreateSandboxEnvironmentTemplateConfigOrDict +from .common import CreateSkillConfig +from .common import CreateSkillConfigDict +from .common import CreateSkillConfigOrDict from .common import CustomCodeExecutionSpec from .common import CustomCodeExecutionSpecDict from .common import CustomCodeExecutionSpecOrDict @@ -612,6 +618,9 @@ from .common import GetSkillConfig from .common import GetSkillConfigDict from .common import GetSkillConfigOrDict +from .common import GetSkillOperationConfig +from .common import GetSkillOperationConfigDict +from .common import GetSkillOperationConfigOrDict from .common import IdentityType from .common import Importance from .common import IngestEventsConfig @@ -1249,6 +1258,9 @@ from .common import SessionOrDict from .common import Skill from .common import SkillDict +from .common import SkillOperation +from .common import SkillOperationDict +from .common import SkillOperationOrDict from .common import SkillOrDict from .common import SkillState from .common import State @@ -1383,6 +1395,9 @@ from .common import UpdatePromptConfig from .common import UpdatePromptConfigDict from .common import UpdatePromptConfigOrDict +from .common import UpdateSkillConfig +from .common import UpdateSkillConfigDict +from .common import UpdateSkillConfigOrDict from .common import VertexBaseConfig from .common import VertexBaseConfigDict from .common import VertexBaseConfigOrDict @@ -2492,6 +2507,18 @@ "Skill", "SkillDict", "SkillOrDict", + "CreateSkillConfig", + "CreateSkillConfigDict", + "CreateSkillConfigOrDict", + "SkillOperation", + "SkillOperationDict", + "SkillOperationOrDict", + "UpdateSkillConfig", + "UpdateSkillConfigDict", + "UpdateSkillConfigOrDict", + "GetSkillOperationConfig", + "GetSkillOperationConfigDict", + "GetSkillOperationConfigOrDict", "PromptOptimizerConfig", "PromptOptimizerConfigDict", "PromptOptimizerConfigOrDict", @@ -2730,6 +2757,9 @@ "_GetCustomJobParameters", "_OptimizeRequestParameters", "_GetSkillRequestParameters", + "_CreateSkillRequestParameters", + "_UpdateSkillRequestParameters", + "_GetSkillOperationParameters", "evals", "agent_engines", "prompts", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index 8f5532d721..1f6b786add 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -17933,6 +17933,268 @@ class SkillDict(TypedDict, total=False): SkillOrDict = Union[Skill, SkillDict] +class CreateSkillConfig(_common.BaseModel): + """Config for creating a skill.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=True, + description="""Whether to wait for the long running operation to complete.""", + ) + local_path: Optional[str] = Field( + default=None, + description="""Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """, + ) + zipped_filesystem: Optional[Any] = Field( + default=None, + description="""Optional. The zipped filesystem of the Skill.""", + ) + skill_id: Optional[str] = Field( + default=None, + description="""Optional. The ID to use for the Skill, which will become the final + component of the Skill's resource name. + """, + ) + + +class CreateSkillConfigDict(TypedDict, total=False): + """Config for creating a skill.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Whether to wait for the long running operation to complete.""" + + local_path: Optional[str] + """Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """ + + zipped_filesystem: Optional[Any] + """Optional. The zipped filesystem of the Skill.""" + + skill_id: Optional[str] + """Optional. The ID to use for the Skill, which will become the final + component of the Skill's resource name. + """ + + +CreateSkillConfigOrDict = Union[CreateSkillConfig, CreateSkillConfigDict] + + +class _CreateSkillRequestParameters(_common.BaseModel): + """Parameters for creating a skill.""" + + display_name: Optional[str] = Field( + default=None, description="""Required. The display name of the Skill.""" + ) + description: Optional[str] = Field( + default=None, description="""Required. The description of the Skill.""" + ) + config: Optional[CreateSkillConfig] = Field(default=None, description="""""") + + +class _CreateSkillRequestParametersDict(TypedDict, total=False): + """Parameters for creating a skill.""" + + display_name: Optional[str] + """Required. The display name of the Skill.""" + + description: Optional[str] + """Required. The description of the Skill.""" + + config: Optional[CreateSkillConfigDict] + """""" + + +_CreateSkillRequestParametersOrDict = Union[ + _CreateSkillRequestParameters, _CreateSkillRequestParametersDict +] + + +class SkillOperation(_common.BaseModel): + """Operation that has a skill as a response.""" + + name: Optional[str] = Field( + default=None, + description="""The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""", + ) + metadata: Optional[dict[str, Any]] = Field( + default=None, + description="""Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""", + ) + done: Optional[bool] = Field( + default=None, + description="""If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""", + ) + error: Optional[dict[str, Any]] = Field( + default=None, + description="""The error result of the operation in case of failure or cancellation.""", + ) + response: Optional[Skill] = Field( + default=None, description="""The created Skill.""" + ) + + +class SkillOperationDict(TypedDict, total=False): + """Operation that has a skill as a response.""" + + name: Optional[str] + """The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""" + + metadata: Optional[dict[str, Any]] + """Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""" + + done: Optional[bool] + """If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""" + + error: Optional[dict[str, Any]] + """The error result of the operation in case of failure or cancellation.""" + + response: Optional[SkillDict] + """The created Skill.""" + + +SkillOperationOrDict = Union[SkillOperation, SkillOperationDict] + + +class UpdateSkillConfig(_common.BaseModel): + """Config for updating a skill.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=True, + description="""Whether to wait for the long running operation to complete.""", + ) + local_path: Optional[str] = Field( + default=None, + description="""Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """, + ) + display_name: Optional[str] = Field( + default=None, description="""Optional. The display name of the Skill.""" + ) + description: Optional[str] = Field( + default=None, description="""Optional. The description of the Skill.""" + ) + zipped_filesystem: Optional[Any] = Field( + default=None, + description="""Optional. The zipped filesystem of the Skill.""", + ) + update_mask: Optional[str] = Field( + default=None, description="""Optional. The update mask to apply.""" + ) + + +class UpdateSkillConfigDict(TypedDict, total=False): + """Config for updating a skill.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Whether to wait for the long running operation to complete.""" + + local_path: Optional[str] + """Optional. The local path to the directory containing the Skill to + be zipped and uploaded. + """ + + display_name: Optional[str] + """Optional. The display name of the Skill.""" + + description: Optional[str] + """Optional. The description of the Skill.""" + + zipped_filesystem: Optional[Any] + """Optional. The zipped filesystem of the Skill.""" + + update_mask: Optional[str] + """Optional. The update mask to apply.""" + + +UpdateSkillConfigOrDict = Union[UpdateSkillConfig, UpdateSkillConfigDict] + + +class _UpdateSkillRequestParameters(_common.BaseModel): + """Parameters for updating a skill.""" + + name: Optional[str] = Field( + default=None, + description="""Required. The resource name of the Skill to update.""", + ) + config: Optional[UpdateSkillConfig] = Field(default=None, description="""""") + + +class _UpdateSkillRequestParametersDict(TypedDict, total=False): + """Parameters for updating a skill.""" + + name: Optional[str] + """Required. The resource name of the Skill to update.""" + + config: Optional[UpdateSkillConfigDict] + """""" + + +_UpdateSkillRequestParametersOrDict = Union[ + _UpdateSkillRequestParameters, _UpdateSkillRequestParametersDict +] + + +class GetSkillOperationConfig(_common.BaseModel): + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + + +class GetSkillOperationConfigDict(TypedDict, total=False): + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + +GetSkillOperationConfigOrDict = Union[ + GetSkillOperationConfig, GetSkillOperationConfigDict +] + + +class _GetSkillOperationParameters(_common.BaseModel): + """Parameters for getting an operation.""" + + operation_name: Optional[str] = Field( + default=None, + description="""The server-assigned name for the operation.""", + ) + config: Optional[GetSkillOperationConfig] = Field( + default=None, + description="""Used to override the default configuration.""", + ) + + +class _GetSkillOperationParametersDict(TypedDict, total=False): + """Parameters for getting an operation.""" + + operation_name: Optional[str] + """The server-assigned name for the operation.""" + + config: Optional[GetSkillOperationConfigDict] + """Used to override the default configuration.""" + + +_GetSkillOperationParametersOrDict = Union[ + _GetSkillOperationParameters, _GetSkillOperationParametersDict +] + + class PromptOptimizerConfig(_common.BaseModel): """VAPO Prompt Optimizer Config."""