diff --git a/lithops/serverless/backends/one/config.py b/lithops/serverless/backends/one/config.py
index 3d9bbe559..52d9b8ccf 100644
--- a/lithops/serverless/backends/one/config.py
+++ b/lithops/serverless/backends/one/config.py
@@ -1,197 +1,40 @@
-import os
-import json
+import enum
 
-
-from lithops.serverless.backends.k8s.config import (
-    DEFAULT_CONFIG_KEYS,
-    load_config as original_load_config
-)
+from lithops.serverless.backends.k8s.config import DEFAULT_CONFIG_KEYS
+from lithops.serverless.backends.k8s.config import load_config as load_k8
 
 
 class OneConfigError(Exception):
     pass
 
 
-MANDATORY_CONFIG_KEYS = {
-    "public_network_id",
-    "private_network_id"
-}
-
-OPTIONAL_CONFIG_KEYS = {
-    "ONEAPP_VROUTER_ETH0_VIP0": "",
-    "ONEAPP_VROUTER_ETH1_VIP0": "",
-    "ONEAPP_RKE2_SUPERVISOR_EP": "ep0.eth0.vr:9345",
-    "ONEAPP_K8S_CONTROL_PLANE_EP": "ep0.eth0.vr:6443",
-    "ONEAPP_K8S_EXTRA_SANS": "localhost,127.0.0.1,ep0.eth0.vr,${vnf.TEMPLATE.CONTEXT.ETH0_IP},k8s.yourdomain.it",
-    "ONEAPP_K8S_MULTUS_ENABLED": "NO",
-    "ONEAPP_K8S_MULTUS_CONFIG": "",
-    "ONEAPP_K8S_CNI_PLUGIN": "cilium",
-    "ONEAPP_K8S_CNI_CONFIG": "",
-    "ONEAPP_K8S_CILIUM_RANGE": "",
-    "ONEAPP_K8S_METALLB_ENABLED": "NO",
-    "ONEAPP_K8S_METALLB_CONFIG": "",
-    "ONEAPP_K8S_METALLB_RANGE": "",
-    "ONEAPP_K8S_LONGHORN_ENABLED": "YES",
-    "ONEAPP_STORAGE_DEVICE": "/dev/vdb",
-    "ONEAPP_STORAGE_FILESYSTEM": "xfs",
-    "ONEAPP_K8S_TRAEFIK_ENABLED": "YES",
-    "ONEAPP_VNF_HAPROXY_INTERFACES": "eth0",
-    "ONEAPP_VNF_HAPROXY_REFRESH_RATE": "30",
-    "ONEAPP_VNF_HAPROXY_LB0_PORT": "9345",
-    "ONEAPP_VNF_HAPROXY_LB1_PORT": "6443",
-    "ONEAPP_VNF_HAPROXY_LB2_PORT": "443",
-    "ONEAPP_VNF_HAPROXY_LB3_PORT": "80",
-    "ONEAPP_VNF_DNS_ENABLED": "YES",
-    "ONEAPP_VNF_DNS_INTERFACES": "eth1",
-    "ONEAPP_VNF_DNS_NAMESERVERS": "1.1.1.1,8.8.8.8",
-    "ONEAPP_VNF_NAT4_ENABLED": "YES",
-    "ONEAPP_VNF_NAT4_INTERFACES_OUT": "eth0",
-    "ONEAPP_VNF_ROUTER4_ENABLED": "YES",
-    "ONEAPP_VNF_ROUTER4_INTERFACES": "eth0,eth1"
-}
-
-DEFAULT_PRIVATE_VNET = """
-NAME = "private-oneke"
-VN_MAD = "bridge"
-AUTOMATIC_VLAN_ID = "YES"
-AR = [TYPE = "IP4", IP = "192.168.150.0", SIZE = "51"]
-"""
-
-STATE = {
-    0: "INIT",
-    1: "PENDING",
-    2: "HOLD",
-    3: "ACTIVE",
-    4: "STOPPED",
-    5: "SUSPENDED",
-    6: "DONE",
-    8: "POWEROFF",
-    9: "UNDEPLOYED",
-    10: "CLONING",
-    11: "CLONING_FAILURE"
-}
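+# OneFlow service state codes, as reported by OneGate, that the backend
+# checks or waits on when inspecting and scaling the OneKE service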
+@enum.unique
+class ServiceState(enum.Enum):
+    RUNNING = 2
+    SCALING = 9
+    COOLDOWN = 10
 
-LCM_STATE = {
-    0: "LCM_INIT",
-    1: "PROLOG",
-    2: "BOOT",
-    3: "RUNNING",
-    4: "MIGRATE",
-    5: "SAVE_STOP",
-    6: "SAVE_SUSPEND",
-    7: "SAVE_MIGRATE",
-    8: "PROLOG_MIGRATE",
-    9: "PROLOG_RESUME",
-    10: "EPILOG_STOP",
-    11: "EPILOG",
-    12: "SHUTDOWN",
-    15: "CLEANUP_RESUBMIT",
-    16: "UNKNOWN",
-    17: "HOTPLUG",
-    18: "SHUTDOWN_POWEROFF",
-    19: "BOOT_UNKNOWN",
-    20: "BOOT_POWEROFF",
-    21: "BOOT_SUSPENDED",
-    22: "BOOT_STOPPED",
-    23: "CLEANUP_DELETE",
-    24: "HOTPLUG_SNAPSHOT",
-    25: "HOTPLUG_NIC",
-    26: "HOTPLUG_SAVEAS",
-    27: "HOTPLUG_SAVEAS_POWEROFF",
-    28: "HOTPLUG_SAVEAS_SUSPENDED",
-    29: "SHUTDOWN_UNDEPLOY",
-    30: "EPILOG_UNDEPLOY",
-    31: "PROLOG_UNDEPLOY",
-    32: "BOOT_UNDEPLOY",
-    33: "HOTPLUG_PROLOG_POWEROFF",
-    34: "HOTPLUG_EPILOG_POWEROFF",
-    35: "BOOT_MIGRATE",
-    36: "BOOT_FAILURE",
-    37: "BOOT_MIGRATE_FAILURE",
-    38: "PROLOG_MIGRATE_FAILURE",
-    39: "PROLOG_FAILURE",
-    40: "EPILOG_FAILURE",
-    41: "EPILOG_STOP_FAILURE",
-    42: "EPILOG_UNDEPLOY_FAILURE",
-    43: "PROLOG_MIGRATE_POWEROFF",
-    44: "PROLOG_MIGRATE_POWEROFF_FAILURE",
-    45: "PROLOG_MIGRATE_SUSPEND",
-    46: "PROLOG_MIGRATE_SUSPEND_FAILURE",
-    47: "BOOT_UNDEPLOY_FAILURE",
-    48: "BOOT_STOPPED_FAILURE",
-    49: "PROLOG_RESUME_FAILURE",
-    50: "PROLOG_UNDEPLOY_FAILURE",
-    51: "DISK_SNAPSHOT_POWEROFF",
-    52: "DISK_SNAPSHOT_REVERT_POWEROFF",
-    53: "DISK_SNAPSHOT_DELETE_POWEROFF",
-    54: "DISK_SNAPSHOT_SUSPENDED",
-    55: "DISK_SNAPSHOT_REVERT_SUSPENDED",
-    56: "DISK_SNAPSHOT_DELETE_SUSPENDED",
-    57: "DISK_SNAPSHOT",
-    59: "DISK_SNAPSHOT_DELETE",
-    60: "PROLOG_MIGRATE_UNKNOWN",
-    61: "PROLOG_MIGRATE_UNKNOWN_FAILURE",
-    62: "DISK_RESIZE",
-    63: "DISK_RESIZE_POWEROFF",
-    64: "DISK_RESIZE_UNDEPLOYED",
-    65: "HOTPLUG_NIC_POWEROFF",
-    66: "HOTPLUG_RESIZE",
-    67: "HOTPLUG_SAVEAS_UNDEPLOYED",
-    68: "HOTPLUG_SAVEAS_STOPPED",
-    69: "BACKUP",
-    70: "BACKUP_POWEROFF"
-}
 
 # Add OpenNebula defaults
-DEFAULT_CONFIG_KEYS.update({
-    'timeout': 600,
-    'kubecfg_path': '/tmp/kube_config',
-    'oneke_config_path': None,
-    'delete': False,
-    'minimum_nodes': 0,
-    'maximum_nodes': -1,
-    'average_job_execution': 1,
-    'auto_scale': 'all',
-})
+DEFAULT_CONFIG_KEYS.update(
+    {
+        "minimum_nodes": 1,
+        "maximum_nodes": 3,
+        # Still read unconditionally by the backend, so keep their defaults
+        "kubecfg_path": "/tmp/kube_config",
+        "auto_scale": "all",
+    }
+)
 
 
 def load_config(config_data):
-    if 'oneke_config' in config_data['one']:
-        oneke_config = config_data['one']['oneke_config']
-
-        # Validate mandatory params
-        for key in MANDATORY_CONFIG_KEYS:
-            if key not in oneke_config:
-                raise OneConfigError(f"'{key}' is missing in 'oneke_config'")
-        public_network_id = oneke_config['public_network_id']
-        private_network_id = oneke_config['private_network_id']
-
-        # Optional params
-        name = oneke_config.get('name', 'OneKE for lithops')
-        custom_attrs_values = {key: oneke_config.get(key, default_value)
-                               for key, default_value in OPTIONAL_CONFIG_KEYS.items()}
-
-        oneke_update = {
-            "name": name,
-            "networks_values": [
-                {"Public": {"id": str(public_network_id)}},
-                {"Private": {"id": str(private_network_id)}}
-            ],
-            "custom_attrs_values": custom_attrs_values
-        }
-        # Override oneke_config with a valid JSON to update the service
-        config_data['one']['oneke_config'] = json.dumps(oneke_update)
-
     # Load default config
     for key in DEFAULT_CONFIG_KEYS:
-        if key not in config_data['one']:
-            config_data['one'][key] = DEFAULT_CONFIG_KEYS[key]
+        if key not in config_data["one"]:
+            config_data["one"][key] = DEFAULT_CONFIG_KEYS[key]
 
     # Ensure 'k8s' key exists and is a dictionary
-    if 'k8s' not in config_data or config_data['k8s'] is None:
-        config_data['k8s'] = {}
-    config_data['k8s']['docker_user'] = config_data['one']['docker_user']
-    config_data['k8s']['docker_password'] = config_data['one']['docker_password']
+    if "k8s" not in config_data or config_data["k8s"] is None:
+        config_data["k8s"] = {}
+    config_data["k8s"]["docker_user"] = config_data["one"]["docker_user"]
+    config_data["k8s"]["docker_password"] = config_data["one"]["docker_password"]
 
     # Load k8s default config
-    original_load_config(config_data)
\ No newline at end of file
+    load_k8(config_data)
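+
+
+# Illustrative call with placeholder credentials: load_config() fills in the
+# defaults above and mirrors the Docker credentials into the 'k8s' section.
+#   cfg = {"one": {"docker_user": "jdoe", "docker_password": "s3cret"}}
+#   load_config(cfg)
+#   cfg["one"]["minimum_nodes"]   # -> 1
+#   cfg["k8s"]["docker_user"]     # -> "jdoe"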
diff --git a/lithops/serverless/backends/one/gate.py b/lithops/serverless/backends/one/gate.py
new file mode 100644
index 000000000..c0b859951
--- /dev/null
+++ b/lithops/serverless/backends/one/gate.py
@@ -0,0 +1,68 @@
+import os
+
+import requests
+
+
+class OneGateError(Exception):
+    """General exception for OneGate-related errors."""
+
+    def __init__(self, message, status_code=None):
+        super().__init__(message)
+        self.status_code = status_code
+
+
+class OneGateClient:
+    """
+    Minimal client for the OneGate API. It is meant to run inside a OneKE
+    VM, where the token and VM id are provided by the contextualization
+    files mounted under /mnt/context.
+    """
+
+    def __init__(self):
+        self.endpoint = os.getenv(
+            "ONEGATE_ENDPOINT", self.get_config("ONEGATE_ENDPOINT")
+        )
+        self.token = self.read_file("/mnt/context/token.txt")
+        self.vm_id = self.get_config("VMID")
+
+    @staticmethod
+    def read_file(filepath):
+        with open(filepath, "r") as file:
+            return file.read().strip()
+
+    @staticmethod
+    def get_config(param, filepath="/mnt/context/context.sh"):
+        # Context lines look like PARAM='value'; strip the surrounding quotes
+        with open(filepath, "r") as file:
+            for line in file:
+                if line.startswith(f"{param}="):
+                    return line.split("=", 1)[1].strip().strip("'\"")
+        return None
+
+    def get(self, path):
+        """
+        Make a GET request to the OneGate API and return the JSON response.
+        """
+        url = f"{self.endpoint}/{path}"
+        headers = {"X-ONEGATE-TOKEN": self.token, "X-ONEGATE-VMID": self.vm_id}
+
+        try:
+            response = requests.get(url, headers=headers)
+            response.raise_for_status()
+            return response.json()
+        except requests.exceptions.RequestException as e:
+            # A 4xx/5xx Response object is falsy, so compare against None
+            status_code = e.response.status_code if e.response is not None else None
+            raise OneGateError(f"GET request to {url} failed: {e}", status_code)
+        except ValueError as e:
+            raise OneGateError(f"Failed to parse JSON response: {e}")
+
+    def scale(self, cardinality, role="worker"):
+        """
+        Make a PUT request to the OneGate API to change a role's cardinality.
+        """
+        url = f"{self.endpoint}/service/role/{role}"
+        headers = {
+            "X-ONEGATE-TOKEN": self.token,
+            "X-ONEGATE-VMID": self.vm_id,
+            "Content-Type": "application/json",
+        }
+        data = {"cardinality": cardinality}
+        try:
+            response = requests.put(url, headers=headers, json=data)
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            status_code = e.response.status_code if e.response is not None else None
+            raise OneGateError(f"PUT request to {url} failed: {e}", status_code)
diff --git a/lithops/serverless/backends/one/one.py b/lithops/serverless/backends/one/one.py
index b289e64ab..7f74cca93 100644
--- a/lithops/serverless/backends/one/one.py
+++ b/lithops/serverless/backends/one/one.py
@@ -1,16 +1,14 @@
-from ..k8s.k8s import KubernetesBackend
-from .config import STATE, LCM_STATE
-
-import oneflow
-import pyone
-
-import os
-import json
-import time
 import base64
 import logging
+import math
+import time
+
 import urllib3
 
+from ..k8s.k8s import KubernetesBackend
+from .config import ServiceState
+from .gate import OneGateClient
+
 logger = logging.getLogger(__name__)
 
 urllib3.disable_warnings()
@@ -18,352 +16,170 @@
 
 class OneError(Exception):
     pass
 
 
-def _config_one():
-    env = os.environ
-
-    # Reading the `one_auth` file or environment variable.
-    # `ONE_AUTH` can be a file path or the credentials, otherwise
-    # the default credentials in `$HOME/.one/one_auth` are used.
-    one_auth = env.get('ONE_AUTH') or os.path.expanduser('~/.one/one_auth')
-    if os.path.isfile(one_auth):
-        with open(one_auth, mode='r') as auth_file:
-            credentials = auth_file.readlines()[0].strip()
-    else:
-        credentials = one_auth.strip()
-
-    # Reading environment variables.
-    # Environment variable `ONESERVER_URL` superseeds the default URL.
-    url = env.get('ONESERVER_URL', 'http://localhost:2633/RPC2')
-
-    return pyone.OneServer(url, session=credentials)
-
 
 class OpenNebula(KubernetesBackend):
     """
-    A wrap-up around OpenNebula backend.
+    A wrapper around the OpenNebula backend.
     """
+
     def __init__(self, one_config, internal_storage):
         logger.info("Initializing OpenNebula backend")
-        # Overwrite config values
-        self.name = 'one'
-        self.timeout = one_config['timeout']
-        self.auto_scale = one_config['auto_scale']
-        self.minimum_nodes = one_config['minimum_nodes']
-        self.maximum_nodes = one_config['maximum_nodes']
-        self.runtime_cpu = float(one_config['runtime_cpu'])
-        self.runtime_memory = float(one_config['runtime_memory'])
-        self.average_job_execution = float(one_config['average_job_execution'])
-        self.job_clean = set()
-
-        # Export environment variables
-        os.environ['ONEFLOW_URL'] = one_config['oneflow_url']
-        os.environ['ONESERVER_URL'] = one_config['oneserver_url']
-        os.environ['ONE_AUTH'] = one_config['one_auth']
-
-        logger.debug("Initializing Oneflow python client")
-        self.client = oneflow.OneFlowClient()
-
-        logger.debug("Initializing OpenNebula python client")
-        self.pyone = _config_one()
-
-        # service_template_id: instantiate master node
-        if 'service_template_id' in one_config:
-            self.service_id = self._instantiate_oneke(
-                one_config['service_template_id'],
-                one_config['oneke_config'],
-                one_config['oneke_config_path']
-            )
-            self._wait_for_oneke('RUNNING')
-        elif 'service_id' in one_config:
-            self.service_id = one_config['service_id']
-        else:
-            raise OneError(
-                "OpenNebula backend must contain 'service_template_id' or 'service_id'"
-            )
-        self._check_oneke()
+        # Backend configuration
+        self.name = "one"
+        self.auto_scale = one_config["auto_scale"]
+        self.runtime_cpu = float(one_config["runtime_cpu"])
+        self.runtime_memory = float(one_config["runtime_memory"])
+        self.job_clean = set()
+        # TODO: get these values from the OneKE service
+        self.minimum_nodes = one_config["minimum_nodes"]
+        self.maximum_nodes = one_config["maximum_nodes"]
+
+        # Check OneKE service
+        logger.debug("Initializing OneGate python client")
+        self.client = OneGateClient()
+        logger.info("Checking OpenNebula OneKE service status")
+        # OneGate wraps the service document as
+        # {"SERVICE": {"id": ..., "state": ..., "roles": [...]}}
+        self.service_id = self.client.get("service").get("SERVICE", {}).get("id")
+        self._check_service_status()
 
         # Get and Save kubeconfig from OneKE
         kubecfg = self._get_kube_config()
-        with open(one_config['kubecfg_path'], 'w') as file:
+        with open(one_config["kubecfg_path"], "w") as file:
             file.write(kubecfg)
-        self.kubecfg_path = one_config['kubecfg_path']
+        self.kubecfg_path = one_config["kubecfg_path"]
 
         super().__init__(one_config, internal_storage)
 
-
     def invoke(self, docker_image_name, runtime_memory, job_payload):
         # Wait for nodes to become available in Kubernetes
-        vm_workers = len(self._get_vm_workers())
-        self._wait_kubernetes_nodes(vm_workers)
+        oneke_workers = 0
+        for role in self.client.get("service").get("SERVICE", {}).get("roles", []):
+            if role.get("name", "").lower() == "worker":
+                oneke_workers = int(role.get("cardinality", 0))
+                break
+        self._wait_kubernetes_nodes(oneke_workers)
 
         # Scale nodes
         scale_nodes, pods, chunksize, worker_processes = self._granularity(
-            job_payload['total_calls']
+            job_payload["total_calls"]
         )
         if scale_nodes == 0 and len(self.nodes) == 0:
             raise OneError(
-                f"No nodes available and can't scale. Ensure nodes are active, detected by "
-                f"Kubernetes, and have enough resources to scale."
+                "No nodes available and cannot scale. Ensure nodes are active, "
+                "detected by Kubernetes, and have enough resources to scale."
             )
-        if scale_nodes > len(self.nodes) and self.auto_scale in {'all', 'up'}:
+        if scale_nodes > len(self.nodes) and self.auto_scale in {"all", "up"}:
             self._scale_oneke(self.nodes, scale_nodes)
             self._wait_kubernetes_nodes(scale_nodes)
 
         # Setup granularity
-        job_payload['max_workers'] = pods
-        job_payload['chunksize'] = chunksize
-        job_payload['worker_processes'] = worker_processes
+        job_payload["max_workers"] = pods
+        job_payload["chunksize"] = chunksize
+        job_payload["worker_processes"] = worker_processes
         return super().invoke(docker_image_name, runtime_memory, job_payload)
 
-
     def clear(self, job_keys=None):
         if not job_keys:
             return
 
-        new_keys = [key for key in job_keys if key not in self.job_clean]
-        if not new_keys:
-            return
-
-        self.job_clean.update(new_keys)
-        super().clear(job_keys)
-        super()._get_nodes()
-
-        if self.auto_scale in {'all', 'down'}:
-            self._scale_oneke(self.nodes, self.minimum_nodes)
-
-
-    def _check_oneke(self):
-        logger.info("Checking OpenNebula OneKE service status")
-
-        # Check service status
-        state = self._get_latest_state()
-        if state != 'RUNNING':
-            raise OneError(f"OpenNebula OneKE service is not 'RUNNING': {state}")
-
-        # Check VMs status
-        self._check_vms_status()
-
-
-    def _instantiate_oneke(self, service_template_id, oneke_config, oneke_config_path):
-        # TODO: create private network if not passed
-
-        # Instantiate OneKE (with JSON or oneke_config parameters)
-        logger.info("Instantiating OpenNebula OneKE service")
-        if oneke_config_path is not None:
-            _json = self.client.templatepool[service_template_id].instantiate(path=oneke_config_path)
-        else:
-            oneke_json = json.loads(oneke_config)
-            _json = self.client.templatepool[service_template_id].instantiate(json_str=oneke_json)
-
-        # Get service_id from JSON
-        service_id = list(_json.keys())[0]
-        logger.info(f"OneKE service ID: {service_id}")
-        return service_id
-
-
-    def _wait_for_oneke(self, state):
-        start_time = time.time()
-        minutes_timeout = int(self.timeout/60)
-        logger.info(
-            f"Waiting for OneKE service to become {state}. "
-            f"Wait time: {minutes_timeout} minutes"
-        )
-        while True:
-            # Check timeout
-            elapsed_time = time.time() - start_time
-            if elapsed_time > self.timeout:
-                raise OneError(
-                    f"Can't reach {state} state. OneKE timed out after {self.timeout} seconds. "
-                    f"You can try again once OneKE is in `'RUNNING'` state with the `service_id` option"
-                )
-
-            # Check OneKE deployment status
-            current_state = self._get_latest_state()
-            if current_state == 'FAILED_DEPLOYING':
-                raise OneError("OneKE deployment has failed")
-            elif current_state == 'FAILED_SCALING':
-                raise OneError("OneKE scaling has failed")
-            elif current_state == state:
-                break
-
-            time.sleep(5)
-        logger.info(f"OneKE service is {state} after {int(elapsed_time)} seconds")
-
+        new_keys = set(job_keys) - self.job_clean
+        if new_keys:
+            self.job_clean.update(new_keys)
+            super().clear(job_keys)
+            super()._get_nodes()
+
+            if (
+                self.auto_scale in {"all", "down"}
+                and len(self.nodes) > self.minimum_nodes
+            ):
+                self._scale_oneke(self.nodes, self.minimum_nodes)
+
+    def _check_service_status(self):
+        roles = self.client.get("service").get("SERVICE", {}).get("roles", [])
+        failed_roles = []
+        for role in roles:
+            for node in role.get("nodes", []):
+                vm_id = node.get("vm_info", {}).get("VM", {}).get("ID")
+                vm = self.client.get(f"vms/{vm_id}")["VM"]
+                # OpenNebula VM codes: STATE 3 = ACTIVE, LCM_STATE 3 = RUNNING
+                if vm.get("STATE") != "3" or vm.get("LCM_STATE") != "3":
+                    failed_roles.append(role.get("name"))
+                    break
+        if failed_roles:
+            raise OneError(f"Non-running roles detected: {', '.join(failed_roles)}")
 
     def _get_kube_config(self):
-        # Get master VM ID
-        master_vm_id = None
-        _service_json = self.client.servicepool[self.service_id].info()
-        roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
-        for role in roles:
-            if role['name'] == 'master':
-                master_vm_id = role['nodes'][0]['vm_info']['VM']['ID']
-                break
-        if master_vm_id is None:
+        roles = self.client.get("service").get("SERVICE", {}).get("roles", [])
+        master_role = next(
+            (role for role in roles if role.get("name", "").lower() == "master"), None
+        )
+        if not master_role:
             raise OneError(
-                "Master VM ID not found. "
-                "Please change the name of the master node role to 'master' and try again"
+                "Master VM not found. Ensure OneKE master node role is named 'master'"
             )
-
-        # Get kubeconfig
-        vm = self.pyone.vm.info(int(master_vm_id))
-        encoded_kubeconfig = vm.USER_TEMPLATE.get('ONEKE_KUBECONFIG')
-        decoded_kubeconfig = base64.b64decode(encoded_kubeconfig).decode('utf-8')
+        master_id = (
+            master_role.get("nodes", [])[0].get("vm_info", {}).get("VM", {}).get("ID")
+        )
+        master_vm = self.client.get(f"vms/{master_id}")["VM"]
+        encoded_kubeconfig = master_vm.get("USER_TEMPLATE", {}).get("ONEKE_KUBECONFIG")
+        if not encoded_kubeconfig:
+            raise OneError("Kubernetes configuration missing in the OneKE master VM.")
+        decoded_kubeconfig = base64.b64decode(encoded_kubeconfig).decode("utf-8")
         logger.debug(f"OpenNebula OneKE Kubeconfig: {decoded_kubeconfig}")
         return decoded_kubeconfig
 
-
-    def _get_latest_state(self):
-        _service_json = self.client.servicepool[self.service_id].info()
-        logs = _service_json[str(self.service_id)]['TEMPLATE']['BODY'].get('log', [])
-        for log in reversed(logs):
-            if 'New state:' in log['message']:
-                return log['message'].split(':')[-1].strip()
-        raise OneError("No state found in logs")
-
-
-    def _check_vms_status(self):
-        _service_json = self.client.servicepool[self.service_id].info()
-        vm_ids = {
-            node['vm_info']['VM']['ID']
-            for role in _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
-            for node in role['nodes']
-        }
-        if len(vm_ids) == 0:
-            raise OneError("No VMs found in OneKE service")
-        for vm_id in vm_ids:
-            vm = self.pyone.vm.info(int(vm_id))
-            state = vm.STATE
-            lcm_state = vm.LCM_STATE
-            if state != 3 or lcm_state != 3:
-                state_desc = STATE.get(state, "UNKNOWN_STATE")
-                lcm_state_desc = LCM_STATE.get(lcm_state, "UNKNOWN_LCM_STATE")
-                raise OneError(
-                    f"VM {vm_id} fails validation: "
-                    f"STATE={state_desc} (code {state}), "
-                    f"LCM_STATE={lcm_state_desc} (code {lcm_state})"
-                )
-
-
-    def _get_vm_workers(self):
-        workers_ids = set()
-        _service_json = self.client.servicepool[self.service_id].info()
-        roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
-        for role in roles:
-            if role['name'] == 'worker':
-                for node in role['nodes']:
-                    workers_ids.add(node['vm_info']['VM']['ID'])
-        return workers_ids
-
+    def _wait_for_state(self, state, timeout=300):
+        start_time = time.time()
+        minutes_timeout = int(timeout / 60)
+        logger.info(
+            f"Waiting for OneKE service to become {state}. "
+            f"Wait time: {minutes_timeout} minutes"
+        )
+        while (elapsed_time := time.time() - start_time) <= timeout:
+            service_data = self.client.get("service")
+            # The service 'state' is an integer code, unlike the VM
+            # 'STATE'/'LCM_STATE' fields, which OneGate returns as strings
+            service_state = service_data.get("SERVICE", {}).get("state")
+            if service_state == ServiceState[state].value:
+                logger.info(
+                    f"OneKE service is {state} after {int(elapsed_time)} seconds."
+                )
                 return
+            time.sleep(1)
+        raise OneError(
+            f"Unable to reach {state} state. OneKE timed out after {timeout} seconds. "
+            f"Please retry when OneKE is in the 'RUNNING' state."
+        )
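+
+    # Typical flow: _scale_oneke() below requests a new worker cardinality and
+    # waits for the service to reach COOLDOWN; invoke() then waits for the new
+    # Kubernetes nodes to register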
 
     def _granularity(self, total_functions):
-        _host_cpu, _host_mem = self._one_resources()
-        _node_cpu, _node_mem = self._node_resources()
-        # OpenNebula available resources
-        max_nodes_cpu = int(_host_cpu / _node_cpu)
-        max_nodes_mem = int(_host_mem / _node_mem)
-        # OneKE current available resources
-        current_nodes = len(self.nodes)
-        total_pods_cpu = sum(int((float(node['cpu'])-1) // self.runtime_cpu) for node in self.nodes)
-        total_pods_mem = sum(
-            int(self._parse_unit(node['memory']) // self.runtime_memory)
-            for node in self.nodes
-        )
-        current_pods = min(total_pods_cpu, total_pods_mem)
-        # Set by the user, otherwise calculated based on OpenNebula available Resources
-        max_nodes = min(max_nodes_cpu, max_nodes_mem) + current_nodes
-        total_nodes = max_nodes if self.maximum_nodes == -1 else min(self.maximum_nodes, max_nodes)
-        # Calculate the best time with scaling
-        best_time = (total_functions / current_pods) * self.average_job_execution if current_pods > 0 else float('inf')
-        for additional_nodes in range(1, total_nodes - current_nodes + 1):
-            new_pods = min(int(_node_cpu // self.runtime_cpu), int(_node_mem // self.runtime_memory))
-            if new_pods > 0 and (current_pods <= total_functions):
-                estimated_time_with_scaling = (total_functions / (current_pods+new_pods)) * self.average_job_execution
-                total_creation_time = self._get_total_creation_time(additional_nodes)
-                total_estimated_time_with_scaling = estimated_time_with_scaling + total_creation_time
-                if total_estimated_time_with_scaling < best_time:
-                    best_time = total_estimated_time_with_scaling
-                    current_nodes += 1
-                    new_pods_cpu = int(_node_cpu // self.runtime_cpu)
-                    new_pods_mem = int(_node_mem // self.runtime_memory)
-                    current_pods += min(new_pods_cpu, new_pods_mem)
-
-        nodes = current_nodes
-        pods = min(total_functions, current_pods)
+        oneke_nodes = len(self.nodes)
+        oneke_workers = oneke_nodes * int(self.runtime_cpu)
+        # Keep the current node count when the job fits on the available
+        # workers; otherwise scale out to the configured maximum
+        nodes = oneke_nodes if total_functions <= oneke_workers else self.maximum_nodes
+        workers = (
+            math.ceil(total_functions / nodes)
+            if total_functions <= oneke_workers
+            else int(self.runtime_cpu)
+        )
         logger.info(
-            f"Nodes: {nodes}, Pods: {pods}, Chunksize: 1, Worker Processes: 1"
+            f"Nodes: {nodes}, Pods: {nodes}, Chunksize: {workers}, Worker Processes: {workers}"
         )
-        return nodes, pods, 1, 1
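+        # Worked example: with runtime_cpu=2, maximum_nodes=3 and 2 current
+        # nodes, oneke_workers = 4; a job of 10 calls does not fit, so it
+        # scales out and returns (3, 3, 2, 2), while a job of 3 calls stays
+        # on 2 nodes and returns (2, 2, 2, 2)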
+        return nodes, nodes, workers, workers
 
     def _scale_oneke(self, nodes, scale_nodes):
         logger.info(f"Scaling workers from {len(nodes)} to {scale_nodes} nodes")
         # Ensure the service can be scaled
-        state = self._get_latest_state()
-        if state == 'COOLDOWN':
+        service = self.client.get("service").get("SERVICE", {})
+        if service.get("state") == ServiceState.COOLDOWN.value:
             if len(self.nodes) == 0:
-                self._wait_for_oneke('RUNNING')
+                self._wait_for_state("RUNNING")
             else:
-                logger.info("OneKE service is in 'COOLDOWN' state and does not need to be scaled")
+                logger.info(
+                    "OneKE service is in 'COOLDOWN' state and does not need to be scaled"
+                )
             return
-        self.client.servicepool[self.service_id].role["worker"].scale(int(scale_nodes))
-        self._wait_for_oneke('COOLDOWN')
-
-
-    def _one_resources(self):
-        hostpool = self.pyone.hostpool.info()
-        host = hostpool.HOST[0]
-
-        total_cpu = host.HOST_SHARE.TOTAL_CPU
-        used_cpu = host.HOST_SHARE.CPU_USAGE
-
-        total_memory = host.HOST_SHARE.TOTAL_MEM
-        used_memory = host.HOST_SHARE.MEM_USAGE
-
-        one_cpu = (total_cpu - used_cpu)/100
-        one_memory = (total_memory - used_memory)/1000
-        logger.info(
-            f"Available CPU: {one_cpu}, Available Memory: {one_memory}"
-        )
-        return one_cpu, one_memory
-
-
-    def _node_resources(self):
-        _service_json = self.client.servicepool[self.service_id].info()
-        _service_roles = _service_json[str(self.service_id)]['TEMPLATE']['BODY']['roles']
-
-        for role in _service_roles:
-            if role['name'] == 'worker':
-                vm_template_id = role['vm_template']
-                break
-
-        vm_template = self.pyone.template.info(int(vm_template_id)).TEMPLATE
-        template_cpu = float(vm_template['CPU'])
-        template_memory = float(vm_template['MEMORY'])
-        logger.info(f"Template CPU: {template_cpu}, Template Memory: {template_memory}")
-        return template_cpu, template_memory
-
-
-    def _get_total_creation_time(self, additional_nodes):
-        # First node creation time is 90 seconds
-        # Additional nodes take 30 seconds in total, regardless of the number
-        return 90 if additional_nodes == 1 else 120
-
-
-    def _parse_unit(self, unit_str):
-        unit = unit_str[-2:]
-        value = float(unit_str[:-2])
-        if unit == 'Ki':
-            return value
-        elif unit == 'Mi':
-            return value * 1024
-        elif unit == 'Gi':
-            return value * 1024 * 1024
-        else:
-            raise ValueError(f"Unsupported unit: {unit_str}")
-
+        self.client.scale(int(scale_nodes))
+        self._wait_for_state("COOLDOWN")
 
     def deploy_runtime(self, docker_image_name, memory, timeout):
         super()._get_nodes()
@@ -371,9 +187,10 @@ def deploy_runtime(self, docker_image_name, memory, timeout):
         self._scale_oneke(self.nodes, 1)
         return super().deploy_runtime(docker_image_name, memory, timeout)
 
-
-    def _wait_kubernetes_nodes(self, total):
+    def _wait_kubernetes_nodes(self, total, timeout=60):
+        logger.info(f"Waiting for {total} Kubernetes nodes to become available")
         super()._get_nodes()
-        while len(self.nodes) < total:
+        start_time = time.time()
+        # Stop waiting after `timeout` seconds and proceed with however many
+        # nodes have registered
+        while len(self.nodes) < total and time.time() - start_time < timeout:
             time.sleep(1)
-            super()._get_nodes()
\ No newline at end of file
+            super()._get_nodes()
diff --git a/lithops/serverless/backends/one/one_config.yaml b/lithops/serverless/backends/one/one_config.yaml
index 304b6dd93..eff520e60 100644
--- a/lithops/serverless/backends/one/one_config.yaml
+++ b/lithops/serverless/backends/one/one_config.yaml
@@ -1,25 +1,12 @@
 lithops:
     backend: one
 
 one:
-    service_id: # oneke service id (means OneKE is already deployed)
-    service_template_id: # oneke_tempalte_id (client has only downloaded the template)
-
-    oneke_config:
-        public_network_id: # ID for Public vnet
-        private_network_id: # ID for Private vnet
-
-    oneflow_url: # OneFlow server URL
-    oneserver_url: #Oned server URL
-    one_auth: # OpenNebula credentials
-
+    docker_user: # Docker hub username
+    docker_password: # Docker hub password
     runtime_cpu: # number of vCPU x POD (default: 1)
     runtime_memory: # amount of memory x POD (default: 512MiB)
-    minimum_nodes: # minimum number of nodes in OneKE (default: 0)
-    maximum_nodes: # maximum number of nodes in OneKE
-    oneke_config_path: # PATH to OneKE JSON config
-    timeout: # time to wait for OneKE to be ready (default: 600s)
-    kubecfg_path': # PATH were kubeconfig will be stored (default: `/tmp/kubeconfig`)
-
-    average_job_execution: # average job execution time (default: 1s)
-    auto_scale: # options: `'all'` (default), `'up'`, `'down'`, `'none'`
\ No newline at end of file
+    kubecfg_path: # PATH where kubeconfig will be stored (default: `/tmp/kube_config`)
+    auto_scale: # options: `'all'` (default), `'up'`, `'down'`, `'none'`
+    minimum_nodes: # minimum number of nodes (default: 1)
+    maximum_nodes: # maximum number of nodes (default: 3)
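+
+    # Example (illustrative values):
+    #   docker_user: jdoe
+    #   docker_password: s3cret
+    #   runtime_cpu: 2
+    #   runtime_memory: 1024
+    #   auto_scale: all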