Skip to content

Commit 5a4e753

Browse files
committed
added triggerer test
1 parent 91ed762 commit 5a4e753

18 files changed

+617
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{% if test_scenario['values']['openshift'] == 'true' %}
2+
# see https://github.com/stackabletech/issues/issues/566
3+
---
4+
apiVersion: kuttl.dev/v1beta1
5+
kind: TestStep
6+
commands:
7+
- script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}'
8+
timeout: 120
9+
{% endif %}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestAssert
4+
metadata:
5+
name: test-airflow-postgresql
6+
timeout: 480
7+
---
8+
apiVersion: apps/v1
9+
kind: StatefulSet
10+
metadata:
11+
name: airflow-postgresql
12+
status:
13+
readyReplicas: 1
14+
replicas: 1
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestStep
4+
commands:
5+
- script: >-
6+
helm install airflow-postgresql
7+
--namespace $NAMESPACE
8+
--version 16.4.2
9+
-f helm-bitnami-postgresql-values.yaml
10+
oci://registry-1.docker.io/bitnamicharts/postgresql
11+
timeout: 600
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{% if test_scenario['values']['executor'] == 'celery' %}
2+
---
3+
apiVersion: kuttl.dev/v1beta1
4+
kind: TestAssert
5+
metadata:
6+
name: test-airflow-redis
7+
timeout: 360
8+
---
9+
apiVersion: apps/v1
10+
kind: StatefulSet
11+
metadata:
12+
name: airflow-redis-master
13+
status:
14+
readyReplicas: 1
15+
replicas: 1
16+
---
17+
apiVersion: apps/v1
18+
kind: StatefulSet
19+
metadata:
20+
name: airflow-redis-replicas
21+
status:
22+
readyReplicas: 1
23+
replicas: 1
24+
{% endif %}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{% if test_scenario['values']['executor'] == 'celery' %}
2+
---
3+
apiVersion: kuttl.dev/v1beta1
4+
kind: TestStep
5+
commands:
6+
- script: >-
7+
helm install airflow-redis
8+
--namespace $NAMESPACE
9+
--version 17.11.3
10+
-f helm-bitnami-redis-values.yaml
11+
--repo https://charts.bitnami.com/bitnami redis
12+
timeout: 600
13+
{% endif %}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
---
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestAssert
4+
metadata:
5+
name: test-airflow-cluster
6+
timeout: 1200
7+
---
8+
apiVersion: apps/v1
9+
kind: StatefulSet
10+
metadata:
11+
name: airflow-webserver-default
12+
status:
13+
readyReplicas: 1
14+
replicas: 1
15+
{% if test_scenario['values']['executor'] == 'celery' %}
16+
---
17+
apiVersion: apps/v1
18+
kind: StatefulSet
19+
metadata:
20+
name: airflow-worker-default
21+
status:
22+
readyReplicas: 1
23+
replicas: 1
24+
{% endif %}
25+
---
26+
apiVersion: apps/v1
27+
kind: StatefulSet
28+
metadata:
29+
name: airflow-scheduler-default
30+
status:
31+
readyReplicas: 1
32+
replicas: 1
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
apiVersion: kuttl.dev/v1beta1
2+
kind: TestStep
3+
metadata:
4+
name: install-airflow-db
5+
timeout: 480
6+
---
7+
apiVersion: v1
8+
kind: Secret
9+
metadata:
10+
name: test-airflow-credentials
11+
type: Opaque
12+
stringData:
13+
adminUser.username: airflow
14+
adminUser.firstname: Airflow
15+
adminUser.lastname: Admin
16+
adminUser.email: airflow@airflow.com
17+
adminUser.password: airflow
18+
connections.secretKey: thisISaSECRET_1234
19+
connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow
20+
{% if test_scenario['values']['executor'] == 'celery' %}
21+
connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow
22+
connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0
23+
{% endif %}
24+
---
25+
apiVersion: v1
26+
kind: ConfigMap
27+
metadata:
28+
name: triggerer-dag
29+
data:
30+
triggerer_dag.py: |
31+
from datetime import datetime, timedelta
32+
33+
from airflow import DAG
34+
from airflow.models.baseoperator import BaseOperator
35+
from airflow.triggers.temporal import TimeDeltaTrigger
36+
from airflow.utils.context import Context
37+
from airflow.operators.empty import EmptyOperator
38+
39+
# ------------------------------------------------------
40+
# Custom deferrable operator - does a simple async sleep
41+
# ------------------------------------------------------
42+
class CoreDeferrableSleepOperator(BaseOperator):
43+
"""
44+
Sleeps for ``duration`` seconds without occupying a worker.
45+
The async hand-off happens via ``self.defer`` + ``TimeDeltaTrigger``.
46+
"""
47+
ui_color = "#ffefeb"
48+
49+
def __init__(self, *, duration: int, **kwargs):
50+
super().__init__(**kwargs)
51+
self.duration = duration
52+
53+
def execute(self, context: Context):
54+
"""Run on a worker, then hand control to the Triggerer."""
55+
# Build the trigger that will fire after `duration` seconds.
56+
trigger = TimeDeltaTrigger(timedelta(seconds=self.duration))
57+
58+
# *** Asynchronous hand-off ***
59+
# This tells the scheduler: “pause this task, let the Triggerer watch the timer”.
60+
self.defer(trigger=trigger, method_name="execute_complete")
61+
62+
def execute_complete(self, context: Context, event=None):
63+
"""Resumes here once the Triggerer fires."""
64+
self.log.info("Deferrable sleep of %s seconds finished.", self.duration)
65+
return "DONE"
66+
67+
default_args = {"owner": "stackable", "retries": 0}
68+
69+
with DAG(
70+
dag_id="core_deferrable_sleep_demo",
71+
schedule=None,
72+
# N.B. this be earlier than the current timestamp!
73+
start_date=datetime(2025, 8, 1),
74+
catchup=False,
75+
default_args=default_args,
76+
tags=["example", "triggerer"],
77+
) as dag:
78+
79+
start = EmptyOperator(task_id="start")
80+
81+
sleep = CoreDeferrableSleepOperator(
82+
task_id="deferrable_sleep",
83+
duration=10,
84+
)
85+
86+
finish = EmptyOperator(task_id="finish")
87+
88+
start >> sleep >> finish
89+
---
90+
apiVersion: airflow.stackable.tech/v1alpha1
91+
kind: AirflowCluster
92+
metadata:
93+
name: airflow
94+
spec:
95+
image:
96+
{% if test_scenario['values']['airflow-latest'].find(",") > 0 %}
97+
custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}"
98+
productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}"
99+
{% else %}
100+
productVersion: "{{ test_scenario['values']['airflow-latest'] }}"
101+
{% endif %}
102+
pullPolicy: IfNotPresent
103+
clusterConfig:
104+
credentialsSecret: test-airflow-credentials
105+
volumes:
106+
- name: triggerer-dag
107+
configMap:
108+
name: triggerer-dag
109+
volumeMounts:
110+
- name: triggerer-dag
111+
mountPath: /dags/triggerer_dag.py
112+
subPath: triggerer_dag.py
113+
webservers:
114+
roleConfig:
115+
listenerClass: external-unstable
116+
roleGroups:
117+
default:
118+
envOverrides: &envOverrides
119+
AIRFLOW__CORE__DAGS_FOLDER: "/dags"
120+
replicas: 1
121+
{% if test_scenario['values']['executor'] == 'celery' %}
122+
celeryExecutors:
123+
roleGroups:
124+
default:
125+
envOverrides: *envOverrides
126+
replicas: 1
127+
{% elif test_scenario['values']['executor'] == 'kubernetes' %}
128+
kubernetesExecutors:
129+
envOverrides: *envOverrides
130+
{% endif %}
131+
schedulers:
132+
config:
133+
gracefulShutdownTimeout: 10s
134+
roleGroups:
135+
default:
136+
envOverrides: *envOverrides
137+
replicas: 1
138+
dagProcessors:
139+
roleGroups:
140+
default:
141+
envOverrides: *envOverrides
142+
replicas: 1
143+
triggerers:
144+
roleGroups:
145+
default:
146+
envOverrides: *envOverrides
147+
replicas: 1
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestAssert
4+
metadata:
5+
name: test-airflow-python
6+
timeout: 240
7+
---
8+
apiVersion: apps/v1
9+
kind: StatefulSet
10+
metadata:
11+
name: test-airflow-python
12+
status:
13+
readyReplicas: 1
14+
replicas: 1
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
---
2+
apiVersion: apps/v1
3+
kind: StatefulSet
4+
metadata:
5+
name: test-airflow-python
6+
labels:
7+
app: test-airflow-python
8+
spec:
9+
replicas: 1
10+
selector:
11+
matchLabels:
12+
app: test-airflow-python
13+
template:
14+
metadata:
15+
labels:
16+
app: test-airflow-python
17+
spec:
18+
containers:
19+
- name: test-airflow-python
20+
image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
21+
imagePullPolicy: IfNotPresent
22+
stdin: true
23+
tty: true
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
apiVersion: kuttl.dev/v1beta1
3+
kind: TestAssert
4+
metadata:
5+
name: test-airflow-webserver-health-check
6+
timeout: 480
7+
commands:
8+
{% if test_scenario['values']['airflow-latest'].find(",") > 0 %}
9+
- script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}"
10+
{% else %}
11+
- script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}"
12+
{% endif %}

0 commit comments

Comments
 (0)