diff --git a/.ci/helm.sh b/.ci/helm.sh index a1b8003eb..0a16f40fc 100644 --- a/.ci/helm.sh +++ b/.ci/helm.sh @@ -337,10 +337,22 @@ function ci::verify_crypto_function() { function ci::send_test_data() { inputtopic=$1 inputmessage=$2 - kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}" + count=$3 + kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}" return 0 } +function ci::verify_backlog() { + topic=$1 + sub=$2 + expected=$3 + BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats $topic | grep msgBacklog) + if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then + return 0 + fi + return 1 +} + function ci::verify_exclamation_function() { inputtopic=$1 outputtopic=$2 diff --git a/.ci/tests/integration/cases/logging-window-function/manifests.yaml b/.ci/tests/integration/cases/logging-window-function/manifests.yaml index 3c1492b9d..0d6b77f8a 100644 --- a/.ci/tests/integration/cases/logging-window-function/manifests.yaml +++ b/.ci/tests/integration/cases/logging-window-function/manifests.yaml @@ -36,6 +36,10 @@ spec: windowConfig: windowLengthCount: 10 slidingIntervalCount: 5 + processingGuarantee: ATLEAST_ONCE + # the processingGuarantee should be manual for window function + # see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319 + processingGuarantee: manual subscriptionPosition: earliest --- apiVersion: v1 diff --git a/.ci/tests/integration/cases/logging-window-function/verify.sh b/.ci/tests/integration/cases/logging-window-function/verify.sh index edd74f2e4..8828be915 100644 --- a/.ci/tests/integration/cases/logging-window-function/verify.sh +++ b/.ci/tests/integration/cases/logging-window-function/verify.sh @@ -43,17 +43,46 @@ if [ $? -ne 0 ]; then exit 1 fi -verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1) +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1) if [ $? -ne 0 ]; then echo "$verify_java_result" kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true exit 1 fi +sleep 3 + +# the 3 messages will not be processed, so backlog should be 3 +verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 3 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_backlog_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message +verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1) +if [ $? -ne 0 ]; then + echo "$verify_java_result" + kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true + exit 1 +fi + +# there is a bug in upstream that messages don't get ack if the function return null +# should be fixed by: https://github.com/apache/pulsar/pull/23618 +#sleep 3 +# +#verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 0 2>&1) +#if [ $? -ne 0 ]; then +# echo "$verify_backlog_result" +# kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true +# exit 1 +#fi + verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l) if [ $verify_log_result -ne 0 ]; then sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;) - verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l) + verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l) if [ $verify_log_topic_result -ne 0 ]; then echo "e2e-test: ok" | yq eval - else diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 45a3cf71a..178fdfeb1 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -407,6 +407,10 @@ const ( Manual ProcessGuarantee = "manual" ) +// WindowProcessGuarantee enum type +// +kubebuilder:validation:Enum=ATLEAST_ONCE;ATMOST_ONCE +type WindowProcessGuarantee string + // LogTopicAgent enum type // +kubebuilder:validation:Enum=runtime;sidecar type LogTopicAgent string @@ -533,15 +537,16 @@ type LogConfig struct { } type WindowConfig struct { - ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"` - WindowLengthCount *int32 `json:"windowLengthCount,omitempty"` - WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"` - SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"` - SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"` - LateDataTopic string `json:"lateDataTopic,omitempty"` - MaxLagMs *int64 `json:"maxLagMs,omitempty"` - WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"` - TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"` + ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"` + WindowLengthCount *int32 `json:"windowLengthCount,omitempty"` + WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"` + SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"` + SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"` + LateDataTopic string `json:"lateDataTopic,omitempty"` + MaxLagMs *int64 `json:"maxLagMs,omitempty"` + WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"` + TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"` + ProcessingGuarantee WindowProcessGuarantee `json:"processingGuarantee,omitempty"` } type VPASpec struct { diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml index 109c1a0d2..aad5b9d85 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functionmeshes.yaml @@ -3786,6 +3786,11 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - ATLEAST_ONCE + - ATMOST_ONCE + type: string slidingIntervalCount: format: int32 type: integer diff --git a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml index 8e3568ecf..179f9ebd4 100644 --- a/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml +++ b/charts/function-mesh-operator/charts/admission-webhook/templates/crd-compute.functionmesh.io-functions.yaml @@ -3805,6 +3805,11 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - ATLEAST_ONCE + - ATMOST_ONCE + type: string slidingIntervalCount: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml index 72502a887..890541cec 100644 --- a/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml +++ b/config/crd/bases/compute.functionmesh.io_functionmeshes.yaml @@ -3786,6 +3786,11 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - ATLEAST_ONCE + - ATMOST_ONCE + type: string slidingIntervalCount: format: int32 type: integer diff --git a/config/crd/bases/compute.functionmesh.io_functions.yaml b/config/crd/bases/compute.functionmesh.io_functions.yaml index 94bfefa84..f1546efc4 100644 --- a/config/crd/bases/compute.functionmesh.io_functions.yaml +++ b/config/crd/bases/compute.functionmesh.io_functions.yaml @@ -3783,6 +3783,11 @@ spec: maxLagMs: format: int64 type: integer + processingGuarantee: + enum: + - ATLEAST_ONCE + - ATMOST_ONCE + type: string slidingIntervalCount: format: int32 type: integer