Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion .ci/helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,22 @@ function ci::verify_crypto_function() {
function ci::send_test_data() {
inputtopic=$1
inputmessage=$2
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n 100 "${inputtopic}"
count=$3
kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-client produce -m "${inputmessage}" -n $count "${inputtopic}"
return 0
}

function ci::verify_backlog() {
topic=$1
sub=$2
expected=$3
BACKLOG=$(kubectl exec -n ${NAMESPACE} ${CLUSTER}-pulsar-broker-0 -- bin/pulsar-admin topics stats $topic | grep msgBacklog)
if [[ "$BACKLOG" == *"\"msgBacklog\" : $expected"* ]]; then
return 0
fi
return 1
}

function ci::verify_exclamation_function() {
inputtopic=$1
outputtopic=$2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
windowConfig:
windowLengthCount: 10
slidingIntervalCount: 5
processingGuarantee: ATLEAST_ONCE
# the processingGuarantee should be manual for window function
# see: https://github.com/apache/pulsar/pull/16279/files#diff-c77c024ccb31c94a7aa80cb8e96d7e370709157bdc104a1be7867fb6c7aa0586R318-R319
processingGuarantee: manual
subscriptionPosition: earliest
---
apiVersion: v1
Expand Down
33 changes: 31 additions & 2 deletions .ci/tests/integration/cases/logging-window-function/verify.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,46 @@ if [ $? -ne 0 ]; then
exit 1
fi

verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 2>&1)
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

sleep 3

# the 3 messages will not be processed, so backlog should be 3
verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 3 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_backlog_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

# it will fire the window with first 5 messages when get the 5th message, and then fire again with 10 messages when get 10th message
verify_java_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::send_test_data "persistent://public/default/window-function-input-topic" "test-message" 7 2>&1)
if [ $? -ne 0 ]; then
echo "$verify_java_result"
kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
exit 1
fi

# there is a bug in upstream that messages don't get ack if the function return null
# should be fixed by: https://github.com/apache/pulsar/pull/23618
#sleep 3
#
#verify_backlog_result=$(NAMESPACE=${PULSAR_NAMESPACE} CLUSTER=${PULSAR_RELEASE_NAME} ci::verify_backlog "persistent://public/default/window-function-input-topic" "public/default/window-function-sample" 0 2>&1)
#if [ $? -ne 0 ]; then
# echo "$verify_backlog_result"
# kubectl delete -f "${manifests_file}" > /dev/null 2>&1 || true
# exit 1
#fi

verify_log_result=$(kubectl logs -l compute.functionmesh.io/name=window-function-sample --tail=-1 | grep -e "-window-log" | wc -l)
if [ $verify_log_result -ne 0 ]; then
sub_name=$(echo $RANDOM | md5sum | head -c 20; echo;)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 10 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
verify_log_topic_result=$(kubectl exec -n ${PULSAR_NAMESPACE} ${PULSAR_RELEASE_NAME}-pulsar-broker-0 -- bin/pulsar-client consume -n 15 -s $sub_name --subscription-position Earliest "persistent://public/default/window-function-logs" | grep -e "-window-log" | wc -l)
if [ $verify_log_topic_result -ne 0 ]; then
echo "e2e-test: ok" | yq eval -
else
Expand Down
23 changes: 14 additions & 9 deletions api/compute/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ const (
Manual ProcessGuarantee = "manual"
)

// WindowProcessGuarantee enum type
// +kubebuilder:validation:Enum=ATLEAST_ONCE;ATMOST_ONCE
type WindowProcessGuarantee string

// LogTopicAgent enum type
// +kubebuilder:validation:Enum=runtime;sidecar
type LogTopicAgent string
Expand Down Expand Up @@ -533,15 +537,16 @@ type LogConfig struct {
}

type WindowConfig struct {
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ActualWindowFunctionClassName string `json:"actualWindowFunctionClassName"`
WindowLengthCount *int32 `json:"windowLengthCount,omitempty"`
WindowLengthDurationMs *int64 `json:"windowLengthDurationMs,omitempty"`
SlidingIntervalCount *int32 `json:"slidingIntervalCount,omitempty"`
SlidingIntervalDurationMs *int64 `json:"slidingIntervalDurationMs,omitempty"`
LateDataTopic string `json:"lateDataTopic,omitempty"`
MaxLagMs *int64 `json:"maxLagMs,omitempty"`
WatermarkEmitIntervalMs *int64 `json:"watermarkEmitIntervalMs,omitempty"`
TimestampExtractorClassName *string `json:"timestampExtractorClassName,omitempty"`
ProcessingGuarantee WindowProcessGuarantee `json:"processingGuarantee,omitempty"`
}

type VPASpec struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3805,6 +3805,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3786,6 +3786,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3783,6 +3783,11 @@ spec:
maxLagMs:
format: int64
type: integer
processingGuarantee:
enum:
- ATLEAST_ONCE
- ATMOST_ONCE
type: string
slidingIntervalCount:
format: int32
type: integer
Expand Down
Loading