Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
214e6f0
add pyink modification
kiryl-filatau Feb 24, 2026
e0b33ec
Merge branch 'azure-5k' into aws-5k
kiryl-filatau Feb 24, 2026
3519bf3
Merge branch 'azure-5k' into aws-5k
kiryl-filatau Feb 24, 2026
5577dee
extend timeout for deletion
kiryl-filatau Feb 25, 2026
5869a7e
decrease the timeout, as 1 hour is enough
kiryl-filatau Mar 2, 2026
ed94853
pyink reformat
kiryl-filatau Mar 4, 2026
684215e
reformatted by pyink
kiryl-filatau Mar 11, 2026
568ba1a
Merge branch 'master' into aws-5k
kiryl-filatau Mar 12, 2026
d14b242
Fix pytype in Prepare() and GetNodeNames mocks in kubernetes_tracker_…
kiryl-filatau Mar 12, 2026
f76985c
EKS cleanup uses RunRetryableKubectlCommand, kubectl retry list exten…
kiryl-filatau Mar 16, 2026
c72880e
Fix pytype error in kubernetes_tracker by using isinstance for type n…
kiryl-filatau Mar 18, 2026
cea09dd
pyink adjustment
kiryl-filatau Mar 18, 2026
4e51710
Make EKS Karpenter CPU limit configurable via eks_karpenter_limits_vc…
kiryl-filatau Mar 19, 2026
6c7542f
Retry deleting orphaned ENIs on AWS rate limits (vm_util.Retry)
kiryl-filatau Mar 19, 2026
c834c95
Add eks_karpenter_nodepool_instance_types for default Karpenter NodePool
kiryl-filatau Mar 19, 2026
711893a
added karpenter NodePool instance-type key, improved cleanup IAM step…
kiryl-filatau Mar 20, 2026
7e930f9
Merge branch 'master' into aws-5k
kiryl-filatau Mar 21, 2026
8afdf4a
Merge branch 'master' into aws-5k
kiryl-filatau Mar 27, 2026
3e29c77
fix lint: wrap long lines, shorten docstring, drop .keys(), rename _D…
kiryl-filatau Mar 27, 2026
e663e9a
conflict resolved
kiryl-filatau Apr 7, 2026
3561a38
pyink reformatted
kiryl-filatau Apr 7, 2026
b4e6dc7
empty line removed
kiryl-filatau Apr 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@ spec:
- key: karpenter.sh/capacity-type
operator: In
values: ["on-demand"]
{%- if KARPENTER_INSTANCE_TYPES %}
- key: node.kubernetes.io/instance-type
operator: In
values: [{% for instance_type in KARPENTER_INSTANCE_TYPES %}"{{ instance_type }}"{% if not loop.last %}, {% endif %}{% endfor %}]
{%- else %}
- key: karpenter.k8s.aws/instance-category
operator: In
values: ["c", "m", "r"]
values: ["c", "m", "r", "t"]
- key: karpenter.k8s.aws/instance-generation
operator: Gt
values: ["2"]
{%- endif %}
nodeClassRef:
group: karpenter.k8s.aws
kind: EC2NodeClass
name: default
expireAfter: 720h # 30 * 24h = 720h
limits:
cpu: 1000
cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
Expand Down
44 changes: 22 additions & 22 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
"""Sets additional spec attributes."""
bm_spec.always_call_cleanup = True
assert bm_spec.container_cluster
_EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster)
cluster = bm_spec.container_cluster
assert isinstance(cluster, kubernetes_cluster.KubernetesCluster)
_EnsureEksKarpenterGpuNodepool(cluster)


def _GetScaleTimeout() -> int:
Expand Down Expand Up @@ -377,35 +379,33 @@ def GetStatusConditionsForResourceType(
lastTransitionTime.
"""

jsonpath = (
r'{range .items[*]}'
# e.g. '"pod-name-1234": [<condition1>, ...],\n'
r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
r'{end}'
)
# Use full JSON output to avoid invalid JSON when manually building from
# jsonpath with many resources or on connection reset (truncated output).
# Avoid logging huge JSON: kubernetes_scale uses num_replicas;
# kubernetes_node_scale uses kubernetes_scale_num_nodes for the
# same code path (get pod/node -o json).
stdout, _, _ = kubectl.RunKubectlCommand(
[
'get',
resource_type,
'-o',
'jsonpath=' + jsonpath,
],
timeout=60 * 2, # 2 minutes; should be a pretty fast call.
# Output can be quite large, so we'll conditionally suppress it.
suppress_logging=NUM_PODS.value > 20,
['get', resource_type, '-o', 'json'],
timeout=60 * 5, # 5 minutes for large clusters (e.g. 1000 pods)
suppress_logging=(
NUM_PODS.value > 20
or getattr(FLAGS, 'kubernetes_scale_num_nodes', 5) > 20
),
)

# Convert output to valid json and parse it
stdout = stdout.rstrip('\t\n\r ,')
stdout = '{' + stdout + '}'
name_to_conditions = json.loads(stdout)
data = json.loads(stdout)
name_to_conditions = {}
for item in data.get('items', []):
name = item.get('metadata', {}).get('name')
conditions = item.get('status', {}).get('conditions')
if name is not None and conditions is not None:
name_to_conditions[name] = conditions

for key in resources_to_ignore:
name_to_conditions.pop(key, None)

results = []
failures = []
for name in name_to_conditions.keys():
for name in name_to_conditions:
for conditions in name_to_conditions[name]:
if not KubernetesResourceStatusCondition.IsValid(conditions):
failures.append(conditions)
Expand Down
136 changes: 100 additions & 36 deletions perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from collections import abc
import json
import logging
import math
import re
from typing import Any
from urllib import parse
Expand Down Expand Up @@ -704,9 +705,7 @@ def _Create(self):
}],
},
'iamIdentityMappings': [{
'arn': (
f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}'
),
'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}',
'username': 'system:node:{{EC2PrivateDNSName}}',
'groups': ['system:bootstrappers', 'system:nodes'],
}],
Expand Down Expand Up @@ -992,9 +991,31 @@ def _PostIngressNetworkingFixups(
'[PKB][EKS] Allowed ALB SG %s -> node SGs on port %s', alb_sg, port
)

@staticmethod
def _DefaultNodepoolInstanceTypes() -> list[str]:
  """EC2 instance types for the default Karpenter NodePool manifest.

  Controlled by --eks_karpenter_nodepool_instance_types.
  Empty list: Jinja template keeps instance-category/generation rules.
  """
  # Normalize the flag value: trim surrounding whitespace from each entry
  # and drop entries that are empty after trimming.
  instance_types: list[str] = []
  for raw_type in FLAGS.eks_karpenter_nodepool_instance_types:
    cleaned = raw_type.strip()
    if cleaned:
      instance_types.append(cleaned)
  return instance_types

def _PostCreate(self):
"""Performs post-creation steps for the cluster."""
super()._PostCreate()
# Karpenter controller resources: default 1/1Gi; scale up when
# node_scale target exceeds 1000 nodes.
num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
if num_nodes is not None and num_nodes > 1000:
controller_cpu, controller_memory = 4, '16Gi'
elif num_nodes is not None and num_nodes >= 500:
controller_cpu, controller_memory = 2, '8Gi'
else:
controller_cpu, controller_memory = 1, '1Gi'
vm_util.IssueCommand([
'helm',
'upgrade',
Expand All @@ -1013,13 +1034,13 @@ def _PostCreate(self):
'--set',
f'settings.interruptionQueue={self.name}',
'--set',
'controller.resources.requests.cpu=1',
f'controller.resources.requests.cpu={controller_cpu}',
'--set',
'controller.resources.requests.memory=1Gi',
f'controller.resources.requests.memory={controller_memory}',
'--set',
'controller.resources.limits.cpu=1',
f'controller.resources.limits.cpu={controller_cpu}',
'--set',
'controller.resources.limits.memory=1Gi',
f'controller.resources.limits.memory={controller_memory}',
'--set',
'logLevel=debug',
'--wait',
Expand Down Expand Up @@ -1057,10 +1078,16 @@ def _PostCreate(self):
'v'
+ full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
)
# NodePool CPU limit: benchmark target nodes * vCPU + 5%, min 1000.
num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
vcpu_per_node = FLAGS.eks_karpenter_limits_vcpu_per_node
cpu_limit = max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))
kubernetes_commands.ApplyManifest(
'container/karpenter/nodepool.yaml.j2',
CLUSTER_NAME=self.name,
ALIAS_VERSION=alias_version,
KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
KARPENTER_INSTANCE_TYPES=self._DefaultNodepoolInstanceTypes(),
)

def _Delete(self):
Expand Down Expand Up @@ -1093,16 +1120,33 @@ def _DeleteDependencies(self):
# Start deleting the stack but likely to fail to delete this role.
vm_util.IssueCommand(delete_stack_cmd)
node_role = f'KarpenterNodeRole-{self.name}'
out, _, _ = vm_util.IssueCommand([
'aws',
'iam',
'list-instance-profiles-for-role',
'--role-name',
node_role,
'--region',
f'{self.region}',
])
profiles_json = json.loads(out)
out, _, retcode = vm_util.IssueCommand(
[
'aws',
'iam',
'list-instance-profiles-for-role',
'--role-name',
node_role,
'--region',
f'{self.region}',
],
suppress_failure=lambda stdout, stderr, rc: (
rc != 0
and (
'nosuchentity' in (stderr or '').lower()
or 'cannot be found' in (stderr or '').lower()
)
),
)
if retcode == 0 and out.strip():
profiles_json = json.loads(out)
else:
logging.info(
'Karpenter node role %s not found or empty response; skipping'
' instance profile cleanup',
node_role,
)
profiles_json = {'InstanceProfiles': []}
for profile in profiles_json.get('InstanceProfiles', []):
profile_name = profile['InstanceProfileName']
vm_util.IssueCommand([
Expand Down Expand Up @@ -1149,21 +1193,21 @@ def _CleanupKarpenter(self):
"""Cleanup Karpenter managed nodes before cluster deletion."""
logging.info('Cleaning up Karpenter nodes...')
# Delete NodePool resources - this will trigger node termination
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'delete',
'nodepool,ec2nodeclass',
'--all',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no resources found' in stderr.lower()
or 'not found' in stderr.lower()
or 'timed out waiting for the condition' in stderr.lower()
),
)
# Wait for all Karpenter nodes to be deleted
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'wait',
'--for=delete',
Expand All @@ -1172,9 +1216,10 @@ def _CleanupKarpenter(self):
'karpenter.sh/nodepool',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no matching resources found' in stderr.lower()
or 'timed out' in stderr.lower()
or 'no resources found' in stderr.lower()
),
)
# Force terminate remaining EC2 instances
Expand Down Expand Up @@ -1246,21 +1291,40 @@ def _CleanupKarpenter(self):
if eni_ids:
logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
for eni_id in eni_ids:
vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
suppress_failure=lambda stdout, stderr, retcode: (
'not found' in stderr.lower()
or 'does not exist' in stderr.lower()
),
)
# Bind eni_id by default to avoid loop closure issues if
# this is refactored.
def _DeleteOneEni(eni_id=eni_id) -> None:
  """Deletes a single orphaned ENI, classifying failures for retry."""
  delete_cmd = [
      'aws',
      'ec2',
      'delete-network-interface',
      '--region',
      self.region,
      '--network-interface-id',
      eni_id,
  ]
  _, stderr, retcode = vm_util.IssueCommand(
      delete_cmd, raise_on_failure=False
  )
  if retcode != 0:
    stderr_lower = (stderr or '').lower()
    # ENI already deleted (e.g. by another process or previous attempt).
    if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
      return
    # RequestLimitExceeded (throttle): retry via vm_util.Retry.
    if 'requestlimitexceeded' in stderr_lower:
      raise errors.Resource.RetryableDeletionError(stderr or '')
    raise errors.VmUtil.IssueCommandError(
        f'DeleteNetworkInterface failed: {stderr}'
    )

# max_retries=5 yields 6 CLI attempts (tries > 5 on 6th failure).
vm_util.Retry(
poll_interval=10,
max_retries=5,
retryable_exceptions=(errors.Resource.RetryableDeletionError,),
)(_DeleteOneEni)()

def _IsReady(self):
"""Returns True if cluster is running. Autopilot defaults to 0 nodes."""
Expand Down
13 changes: 13 additions & 0 deletions perfkitbenchmarker/providers/aws/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,19 @@
'Whether to install AWS Load Balancer Controller in EKS Karpenter clusters'
'Default value - do not install unless explicitly requested',
)
flags.DEFINE_integer(
'eks_karpenter_limits_vcpu_per_node',
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use flagholder: https://absl.readthedocs.io/en/latest/absl.flags.html#absl.flags.FlagHolder

Also not sure if this is generally the right spot for these. Ideally probably both should go in config_overrides, with this one maybe being set cpu size from vm_spec & the other coming in a follow up cl.

2,
'Assumed vCPUs per provisioned node when computing Karpenter NodePool '
'limits.cpu on EKS (uses kubernetes_scale_num_nodes, this value, and 5% '
'headroom; minimum limit 1000). Raise for larger EC2 instance shapes.',
)
flags.DEFINE_list(
'eks_karpenter_nodepool_instance_types',
[],
'Comma-separated EC2 types for the Karpenter default NodePool (worker '
'nodes only). Empty keeps instance-category/generation in the template.',
)
AWS_CAPACITY_BLOCK_RESERVATION_ID = flags.DEFINE_string(
'aws_capacity_block_reservation_id',
None,
Expand Down
7 changes: 6 additions & 1 deletion perfkitbenchmarker/resources/container_service/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
'error sending request:',
'(abnormal closure): unexpected EOF',
'deadline exceeded',
# kubectl wait/delete timeouts and connection errors
# (retried in EKS cleanup)
'timed out',
'unable to connect to the server',
]


Expand All @@ -38,8 +42,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
# Check for kubectl timeout. If found, treat it the same as a regular
# timeout.
if retcode != 0:
stderr_lower = stderr.lower()
for error_substring in RETRYABLE_KUBECTL_ERRORS:
if error_substring in stderr:
if error_substring.lower() in stderr_lower:
# Raise timeout error regardless of raise_on_failure - as the intended
# semantics is to ignore expected errors caused by invoking the
# command not errors from PKB infrastructure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def _DeleteAllFromDefaultNamespace():
run_cmd = ['delete', 'job', '--all', '-n', 'default']
kubectl.RunRetryableKubectlCommand(run_cmd)

timeout = 60 * 20
timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown)
run_cmd = [
'delete',
'all',
Expand Down
Loading