Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions src/uipath/_cli/_utils/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from urllib.parse import urlparse

import click
from pydantic import TypeAdapter

from uipath.platform.common import UiPathConfig

from ..._utils._bindings import ResourceOverwrite, ResourceOverwriteParser
from ..._utils.constants import ENV_UIPATH_ACCESS_TOKEN
from ..models.runtime_schema import Bindings
from ..spinner import Spinner
from ._console import ConsoleLogger
from ._studio_project import (
Expand Down Expand Up @@ -180,28 +182,53 @@ async def may_override_files(
)


def extract_binding_keys(bindings_file_content: str) -> list[str]:
bindings_list: list[str] = []
bindings = TypeAdapter(Bindings).validate_python(json.loads(bindings_file_content))
for resource in bindings.resources:
bindings_list.append(f"{resource.resource}.{resource.key}")
return bindings_list


def fill_missing_binding_keys(
overwrites: dict[str, ResourceOverwrite | None], binding_keys: list[str]
):
"""Ensure all binding keys exist in the overwrites dict, defaulting to None."""
for binding_key in binding_keys:
if binding_key not in overwrites:
overwrites[binding_key] = None


async def read_resource_overwrites_from_file(
directory_path: str | None = None,
) -> dict[str, ResourceOverwrite]:
) -> dict[str, ResourceOverwrite | None]:
"""Read resource overwrites from a JSON file."""
config_file_name = UiPathConfig.config_file_name
if directory_path is not None:
file_path = Path(f"{directory_path}/{config_file_name}")
file_path = Path(directory_path) / config_file_name
else:
file_path = Path(f"{config_file_name}")
file_path = Path(config_file_name)

overwrites_dict = {}
overwrites_dict: dict[str, ResourceOverwrite | None] = {}

try:
# read configured bindings
with open(UiPathConfig.bindings_file_path, "r") as f:
bindings_list = extract_binding_keys(f.read())

# read overwrites
with open(file_path, "r") as f:
data = json.load(f)
resource_overwrites = (
data.get("runtime", {})
.get("internalArguments", {})
.get("resourceOverwrites", {})
)
for key, value in resource_overwrites.items():
overwrites_dict[key] = ResourceOverwriteParser.parse(key, value)
for key, value in resource_overwrites.items():
overwrites_dict[key] = ResourceOverwriteParser.parse(key, value)

fill_missing_binding_keys(overwrites_dict, bindings_list)
return overwrites_dict

# Return empty dict if file doesn't exist or invalid json
except FileNotFoundError:
Expand Down
12 changes: 10 additions & 2 deletions src/uipath/_cli/_utils/_studio_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,18 +538,25 @@ async def _get_existing_resources(self) -> List[dict[str, Any]]:
]
return self._resources_cache

async def get_resource_overwrites(self) -> dict[str, ResourceOverwrite]:
async def get_resource_overwrites(self) -> dict[str, ResourceOverwrite | None]:
"""Get resource overwrites from the solution.

Returns:
dict[str, ResourceOverwrite]: Dict of resource overwrites
"""
from uipath._cli._utils._common import (
extract_binding_keys,
fill_missing_binding_keys,
)

if not os.path.exists(UiPathConfig.bindings_file_path):
return {}

with open(UiPathConfig.bindings_file_path, "rb") as f:
file_content = f.read()

bindings_list = extract_binding_keys(file_content.decode("utf-8"))

solution_id = await self._get_solution_id()
tenant_id = os.getenv(ENV_TENANT_ID, None)

Expand All @@ -572,11 +579,12 @@ async def get_resource_overwrites(self) -> dict[str, ResourceOverwrite]:
files=files,
)
data = response.json()
overwrites = {}
overwrites: dict[str, ResourceOverwrite | None] = {}

for key, value in data.items():
overwrites[key] = ResourceOverwriteParser.parse(key, value)

fill_missing_binding_keys(overwrites, bindings_list)
return overwrites

async def create_virtual_resource(
Expand Down
1 change: 0 additions & 1 deletion src/uipath/_cli/cli_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ async def execute_debug_runtime():

# Load simulation config and set up execution context for tool mocking
mocking_ctx = load_simulation_config()
span_collector: ExecutionSpanCollector | None = None
execution_id = str(uuid.uuid4())

if mocking_ctx:
Expand Down
157 changes: 134 additions & 23 deletions src/uipath/_utils/_bindings.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import functools
import inspect
from abc import ABC, abstractmethod
Expand All @@ -8,7 +10,6 @@
Callable,
Coroutine,
Literal,
Optional,
TypeVar,
Union,
)
Expand All @@ -35,11 +36,25 @@ def resource_identifier(self) -> str:

@property
@abstractmethod
def folder_identifier(self) -> str:
def folder_identifier(self) -> str | None:
"""The folder location identifier for this resource."""
pass


class SystemResourceOverwrite(ResourceOverwrite):
resource_type: Literal["index"]
name: str = Field(alias="name")
folder_key: str = Field(alias="folderKey")

@property
def resource_identifier(self) -> str:
return self.name

@property
def folder_identifier(self) -> str:
return self.folder_key


class GenericResourceOverwrite(ResourceOverwrite):
resource_type: Literal["process", "index", "app", "asset", "bucket", "mcpServer"]
name: str = Field(alias="name")
Expand Down Expand Up @@ -111,37 +126,49 @@ def parse(cls, key: str, value: dict[str, Any]) -> ResourceOverwrite:
return cls._adapter.validate_python(value_with_type)


_resource_overwrites: ContextVar[Optional[dict[str, ResourceOverwrite]]] = ContextVar(
"resource_overwrites", default=None
# this context var holds a dictionary in the following format:
# {"binding_key: applied_overwrite | None"}
# for system resources (e.g. system indexes) we need to make sure that a binding is set with no corresponding overwrite
_binding_overwrites: ContextVar[dict[str, ResourceOverwrite | None] | None] = (
ContextVar("binding_overwrites", default=None)
)


class ResourceOverwritesContext:
def __init__(
self,
get_overwrites_callable: Callable[
[], Coroutine[Any, Any, dict[str, ResourceOverwrite]]
[], Coroutine[Any, Any, dict[str, ResourceOverwrite | None]]
],
):
self.get_overwrites_callable = get_overwrites_callable
self._token: Optional[Token[Optional[dict[str, ResourceOverwrite]]]] = None
self._token: Token[dict[str, ResourceOverwrite | None] | None] | None = None
self.overwrites_count = 0

async def __aenter__(self) -> "ResourceOverwritesContext":
async def __aenter__(self) -> ResourceOverwritesContext:
overwrites = await self.get_overwrites_callable()
self._token = _resource_overwrites.set(overwrites)
self._token = _binding_overwrites.set(overwrites)
self.overwrites_count = len(overwrites)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._token:
_resource_overwrites.reset(self._token)
_binding_overwrites.reset(self._token)


class ArgumentsProcessingResponse(BaseModel):
arguments: dict[str, Any]
can_apply_resolution: bool = False


def resource_override(
resource_type: str,
resource_identifier: str = "name",
folder_identifier: str = "folder_path",
resolution_func_name: str | None = None,
resolution_coroutine_name: str | None = None,
resolution_resource_identifier: str = "name",
resolution_folder_identifier: str = "folder_path",
) -> Callable[..., Any]:
"""Decorator for applying resource overrides for an overridable resource.

Expand All @@ -152,6 +179,11 @@ def resource_override(
resource_type: Type of resource to check for overrides (e.g., "asset", "bucket")
resource_identifier: Key name for the resource ID in override data (default: "name")
folder_identifier: Key name for the folder path in override data (default: "folder_path")
resolution_func_name: Optional sync callable for resolving the overwrite when binding is set but overwrite not present.
resolution_coroutine_name: Optional async callable for resolving the overwrite when binding is set but overwrite not present.
Note: those are passed string reference since decorators are evaluated at class definition time.
resolution_resource_identifier: Key name for the resource ID in resolution data (default: "name")
resolution_folder_identifier: Key name for the folder identifier in resolution data (default: "folder_path")

Returns:
Decorated function that receives overridden resource identifiers when applicable
Expand All @@ -163,12 +195,34 @@ def resource_override(
def decorator(func: Callable[..., Any]):
sig = inspect.signature(func)

def process_args(args, kwargs) -> dict[str, Any]:
def apply_overwrite(
all_args: dict[str, Any],
matched_overwrite: ResourceOverwrite,
is_resolution_overwrite=False,
) -> None:
resource_id = (
resolution_resource_identifier
if is_resolution_overwrite
else resource_identifier
)
folder_id = (
resolution_folder_identifier
if is_resolution_overwrite
else folder_identifier
)

if resource_id in sig.parameters:
all_args[resource_id] = matched_overwrite.resource_identifier
if folder_id in sig.parameters:
all_args[folder_id] = matched_overwrite.folder_identifier

def process_args(args, kwargs) -> ArgumentsProcessingResponse:
"""Process arguments and apply resource overrides if applicable."""
# convert both args and kwargs to single dict
bound = sig.bind_partial(*args, **kwargs)
bound.apply_defaults()
all_args = dict(bound.arguments)

if (
"kwargs" in sig.parameters
and sig.parameters["kwargs"].kind == inspect.Parameter.VAR_KEYWORD
Expand All @@ -177,7 +231,8 @@ def process_args(args, kwargs) -> dict[str, Any]:
all_args.update(extra_kwargs)

# Get overwrites from context variable
context_overwrites = _resource_overwrites.get()

context_overwrites = _binding_overwrites.get()

if context_overwrites is not None:
resource_identifier_value = all_args.get(resource_identifier)
Expand All @@ -192,34 +247,90 @@ def process_args(args, kwargs) -> dict[str, Any]:
else key
)

matched_overwrite = context_overwrites.get(key)
try:
matched_overwrite = context_overwrites[key]
except KeyError:
# binding not set, default to original parameters
return ArgumentsProcessingResponse(arguments=all_args)

# Apply the matched overwrite
if matched_overwrite is not None:
if resource_identifier in sig.parameters:
all_args[resource_identifier] = (
matched_overwrite.resource_identifier
)
if folder_identifier in sig.parameters:
all_args[folder_identifier] = (
matched_overwrite.folder_identifier
)
apply_overwrite(all_args, matched_overwrite)
return ArgumentsProcessingResponse(arguments=all_args)

# binding is set but no corresponding overwrite exists
# we can try to apply the resolution
return ArgumentsProcessingResponse(
arguments=all_args, can_apply_resolution=True
)

return ArgumentsProcessingResponse(arguments=all_args)

return all_args
def filter_function_args(func: Callable[..., Any], all_args: dict[str, Any]):
callable_sig = inspect.signature(func)
filtered_args = {}
for param_name in callable_sig.parameters:
if param_name != "self" and param_name in all_args:
filtered_args[param_name] = all_args[param_name]
return filtered_args

if inspect.iscoroutinefunction(func):

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
all_args = process_args(args, kwargs)
process_args_response = process_args(args, kwargs)
all_args = process_args_response.arguments
if not (
process_args_response.can_apply_resolution
and resolution_coroutine_name
):
return await func(**all_args)

# apply resolution coroutine
invoked_class_instance = args[0] # self
resolution_coroutine: (
Callable[..., Coroutine[Any, Any, ResourceOverwrite | None]] | None
) = getattr(invoked_class_instance, resolution_coroutine_name)
if resolution_coroutine and (
resource_overwrite := await resolution_coroutine(
**filter_function_args(
resolution_coroutine, process_args_response.arguments
)
)
):
apply_overwrite(
all_args, resource_overwrite, is_resolution_overwrite=True
)
return await func(**all_args)

return async_wrapper

else:

@functools.wraps(func)
def wrapper(*args, **kwargs):
all_args = process_args(args, kwargs)
process_args_response = process_args(args, kwargs)
all_args = process_args_response.arguments
if not (
process_args_response.can_apply_resolution and resolution_func_name
):
return func(**all_args)

# apply resolution function
invoked_class_instance = args[0] # self
resolution_func: Callable[..., ResourceOverwrite | None] | None = (
getattr(invoked_class_instance, resolution_func_name)
)
if resolution_func and (
resource_overwrite := resolution_func(
**filter_function_args(
resolution_func, process_args_response.arguments
)
)
):
apply_overwrite(
all_args, resource_overwrite, is_resolution_overwrite=True
)
return func(**all_args)

return wrapper
Expand Down
Loading
Loading