From 25a2844f4872f7e88596603c626676fe19387546 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Mon, 15 Dec 2025 21:22:16 -0500 Subject: [PATCH 1/2] Flink 2 support prerequisites * Honor getUseDataStreamForBatch pipeline option for Flink portable runner * Refactor gradle scripts in preparation for Flink 2 support --- runners/flink/flink_runner.gradle | 29 +++++++++++++------ .../flink_job_server_container.gradle | 10 +++++-- .../flink/job-server/flink_job_server.gradle | 27 ++++++++++++----- .../runners/flink/FlinkPipelineRunner.java | 4 ++- .../wrappers/streaming/DoFnOperator.java | 0 .../streaming/MemoryStateBackendWrapper.java | 0 .../flink/streaming/StreamSources.java | 0 7 files changed, 49 insertions(+), 21 deletions(-) rename runners/flink/{1.17 => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java (100%) rename runners/flink/{1.17 => }/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java (100%) rename runners/flink/{1.17 => }/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java (100%) diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 52f9631f455f..449b0621d96d 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -28,7 +28,8 @@ import groovy.json.JsonOutput def base_path = ".." def overrides(versions, type, base_path) { - versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"] + // order is important + ["${base_path}/src/${type}/java"] + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"] } def all_versions = flink_versions.split(",") @@ -49,7 +50,8 @@ applyJavaNature( automaticModuleName: 'org.apache.beam.runners.flink', archivesBaseName: archivesBaseName, // flink runner jars are in same package name. Publish javadoc once. - exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()) + exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()), + requireJavaVersion: project.ext.flink_major.startsWith('2') ? JavaVersion.VERSION_11 : null ) description = "Apache Beam :: Runners :: Flink $flink_version" @@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java") */ def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get() -def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { - it.from main_source_overrides - it.into "${sourceOverridesBase}/main/java" - it.duplicatesStrategy DuplicatesStrategy.INCLUDE +def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask -> + copyTask.from main_source_overrides + copyTask.into "${sourceOverridesBase}/main/java" + copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE + + if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) { + project.ext.excluded_files.main.each { file -> + copyTask.exclude "**/${file}" + } + } } def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) { @@ -119,7 +127,7 @@ def sourceBase = "${project.projectDir}/../src" sourceSets { main { java { - srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"] + srcDirs = ["${sourceOverridesBase}/main/java"] } resources { srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"] @@ -127,7 +135,7 @@ sourceSets { } test { java { - srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"] + srcDirs = ["${sourceOverridesBase}/test/java"] } resources { srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"] @@ -196,7 +204,10 @@ dependencies { implementation "org.apache.flink:flink-core:$flink_version" implementation "org.apache.flink:flink-metrics-core:$flink_version" - implementation "org.apache.flink:flink-java:$flink_version" + if (project.ext.flink_major.startsWith('1')) { + // FLINK-36336: dataset API removed in Flink 2 + implementation "org.apache.flink:flink-java:$flink_version" + } implementation "org.apache.flink:flink-runtime:$flink_version" implementation "org.apache.flink:flink-metrics-core:$flink_version" diff --git a/runners/flink/job-server-container/flink_job_server_container.gradle b/runners/flink/job-server-container/flink_job_server_container.gradle index 3f30a1aac1fb..0133a520477b 100644 --- a/runners/flink/job-server-container/flink_job_server_container.gradle +++ b/runners/flink/job-server-container/flink_job_server_container.gradle @@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) { } def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers") +def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server" +def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version +if (project.parent.name.startsWith("2")) { + containerTag += "-flink" + ${project.parent.name} +} docker { name containerImageName( - name: project.docker_image_default_repo_prefix + "flink${project.parent.name}_job_server", + name: project.docker_image_default_repo_prefix + containerName, root: project.rootProject.hasProperty(["docker-repository-root"]) ? project.rootProject["docker-repository-root"] : project.docker_image_default_repo_root, - tag: project.rootProject.hasProperty(["docker-tag"]) ? - project.rootProject["docker-tag"] : project.sdk_version) + tag: containerTag) // tags used by dockerTag task tags containerImageTags() files "./build/" diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index d8a818ff84c4..88808cc7ed60 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -29,6 +29,11 @@ apply plugin: 'application' // we need to set mainClassName before applying shadow plugin mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver" +// Resolve the Flink project name (and version) the job-server is based on +def flinkRunnerProject = "${project.path.replace(":job-server", "")}" +evaluationDependsOn(flinkRunnerProject) +boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2') + applyJavaNature( automaticModuleName: 'org.apache.beam.runners.flink.jobserver', archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName, @@ -37,11 +42,9 @@ applyJavaNature( shadowClosure: { append "reference.conf" }, + requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null ) -// Resolve the Flink project name (and version) the job-server is based on -def flinkRunnerProject = "${project.path.replace(":job-server", "")}" - description = project(flinkRunnerProject).description + " :: Job Server" /* @@ -126,11 +129,12 @@ runShadow { jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"] } -def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) { +def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, boolean docker) { def pipelineOptions = [ // Limit resource consumption via parallelism "--parallelism=2", ] + boolean streaming = (mode == "streaming") if (streaming) { pipelineOptions += "--streaming" if (checkpointing) { @@ -138,6 +142,9 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi pipelineOptions += "--shutdownSourcesAfterIdleMs=60000" } } + if (mode == "batch") { + pipelineOptions += "--useDataStreamForBatch=true" + } createPortableValidatesRunnerTask( name: "validatesPortableRunner${name}", jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver", @@ -214,13 +221,17 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi ) } -project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true) -project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false) -project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false) -project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false) +project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", "batch", false, true) +project.ext.validatesPortableRunnerBatchDataSet = portableValidatesRunnerTask("BatchDataSet", "batch-dataset", false, false) +project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", "batch", false, false) +project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", "streaming", false, false) +project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false) tasks.register("validatesPortableRunner") { dependsOn validatesPortableRunnerDocker + if (!isFlink2) { + dependsOn validatesPortableRunnerBatchDataSet + } dependsOn validatesPortableRunnerBatch dependsOn validatesPortableRunnerStreaming dependsOn validatesPortableRunnerStreamingCheckpoint diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index c9559a392704..11175129d7ef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -84,7 +84,9 @@ public PortablePipelineResult run(final Pipeline pipeline, JobInfo jobInfo) thro SdkHarnessOptions.getConfiguredLoggerFromOptions(pipelineOptions.as(SdkHarnessOptions.class)); FlinkPortablePipelineTranslator translator; - if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) { + if (!pipelineOptions.getUseDataStreamForBatch() + && !pipelineOptions.isStreaming() + && !hasUnboundedPCollections(pipeline)) { // TODO: Do we need to inspect for unbounded sources before fusing? translator = FlinkBatchPortablePipelineTranslator.createTranslator(); } else { diff --git a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java similarity index 100% rename from runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java diff --git a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java similarity index 100% rename from runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java diff --git a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java similarity index 100% rename from runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java From 63e5f2cd597f83c6c3149178515618c344d63d44 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 17 Dec 2025 17:06:15 -0500 Subject: [PATCH 2/2] Create a PostCommit run validate runner tests on legacy DataSet --- .github/workflows/README.md | 1 + .../beam_PostCommit_Java_PVR_Flink_Batch.yml | 106 ++++++++++++++++++ runners/flink/flink_runner.gradle | 100 +++++++++++------ .../flink/job-server/flink_job_server.gradle | 4 +- 4 files changed, 175 insertions(+), 36 deletions(-) create mode 100644 .github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml diff --git a/.github/workflows/README.md b/.github/workflows/README.md index f01d2a1257bd..283be9c2b1fc 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -344,6 +344,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get | [ PostCommit Java Nexmark Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml) | N/A |`beam_PostCommit_Java_Nexmark_Direct.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml?query=event%3Aschedule) | | [ PostCommit Java Nexmark Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml) | N/A |`beam_PostCommit_Java_Nexmark_Flink.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml?query=event%3Aschedule) | | [ PostCommit Java Nexmark Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml) | N/A |`beam_PostCommit_Java_Nexmark_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml?query=event%3Aschedule) | +| [ PostCommit Java PVR Flink Batch ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Batch.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml?query=event%3Aschedule) | | [ PostCommit Java PVR Flink Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Aschedule) | | [ PostCommit Java PVR Samza ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml) | N/A |`beam_PostCommit_Java_PVR_Samza.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Samza.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule) | | [ PostCommit Java SingleStoreIO IT ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml) | N/A |`beam_PostCommit_Java_SingleStoreIO_IT.json`| [![.github/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml?query=event%3Aschedule) | diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml new file mode 100644 index 000000000000..0a808f2f8617 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: PostCommit Java PVR Flink Batch + +on: + push: + tags: ['v*'] + branches: ['master', 'release-*'] + paths: + - 'runners/flink/**' + - 'runners/java-fn-execution/**' + - 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**' + - '.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml' + pull_request_target: + branches: ['master', 'release-*'] + paths: + - 'release/trigger_all_tests.json' + - '.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json' + schedule: + - cron: '15 2/6 * * *' + workflow_dispatch: + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Java_PVR_Flink_Batch: + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"] + job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"] + timeout-minutes: 240 + runs-on: [self-hosted, ubuntu-20.04, highmem] + if: | + github.event_name == 'push' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event_name == 'workflow_dispatch' || + github.event.comment.body == 'Run Java_PVR_Flink_Batch PostCommit' + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + - name: run validatesPortableRunnerBatch script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet + env: + CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }} + - name: Archive JUnit Test Results + uses: actions/upload-artifact@v4 + if: ${{ !success() }} + with: + name: JUnit Test Results + path: "**/build/reports/tests/" + - name: Upload test report + uses: actions/upload-artifact@v4 + with: + name: java-code-coverage-report + path: "**/build/test-results/**/*.xml" +# TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2. diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle index 449b0621d96d..af90c22cfb08 100644 --- a/runners/flink/flink_runner.gradle +++ b/runners/flink/flink_runner.gradle @@ -88,10 +88,16 @@ def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) { it.duplicatesStrategy DuplicatesStrategy.INCLUDE } -def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { - it.from test_source_overrides - it.into "${sourceOverridesBase}/test/java" - it.duplicatesStrategy DuplicatesStrategy.INCLUDE +def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask -> + copyTask.from test_source_overrides + copyTask.into "${sourceOverridesBase}/test/java" + copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE + + if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) { + project.ext.excluded_files.test.each { file -> + copyTask.exclude "**/${file}" + } + } } def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) { @@ -100,45 +106,69 @@ def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Co it.duplicatesStrategy DuplicatesStrategy.INCLUDE } -// add dependency to gradle Java plugin defined tasks -compileJava.dependsOn copySourceOverrides -processResources.dependsOn copyResourcesOverrides -compileTestJava.dependsOn copyTestSourceOverrides -processTestResources.dependsOn copyTestResourcesOverrides - -// add dependency BeamModulePlugin defined custom tasks -// they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc) -def sourcesJar = project.tasks.findByName('sourcesJar') -if (sourcesJar != null) { - sourcesJar.dependsOn copySourceOverrides - sourcesJar.dependsOn copyResourcesOverrides -} -def testSourcesJar = project.tasks.findByName('testSourcesJar') -if (testSourcesJar != null) { - testSourcesJar.dependsOn copyTestSourceOverrides - testSourcesJar.dependsOn copyTestResourcesOverrides -} +def use_override = (flink_major != all_versions.first()) +def sourceBase = "${project.projectDir}/../src" -/* +if (use_override) { + // Copy original+version specific sources to a tmp dir and use it as sourceSet + // add dependency to gradle Java plugin defined tasks + compileJava.dependsOn copySourceOverrides + processResources.dependsOn copyResourcesOverrides + compileTestJava.dependsOn copyTestSourceOverrides + processTestResources.dependsOn copyTestResourcesOverrides + + // add dependency BeamModulePlugin defined custom tasks + // they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc) + def sourcesJar = project.tasks.findByName('sourcesJar') + if (sourcesJar != null) { + sourcesJar.dependsOn copySourceOverrides + sourcesJar.dependsOn copyResourcesOverrides + } + def testSourcesJar = project.tasks.findByName('testSourcesJar') + if (testSourcesJar != null) { + testSourcesJar.dependsOn copyTestSourceOverrides + testSourcesJar.dependsOn copyTestResourcesOverrides + } + /* * We have to explicitly set all directories here to make sure each * version of Flink has the correct overrides set. */ -def sourceBase = "${project.projectDir}/../src" -sourceSets { - main { - java { - srcDirs = ["${sourceOverridesBase}/main/java"] + sourceSets { + main { + java { + srcDirs = ["${sourceOverridesBase}/main/java"] + } + resources { + srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"] + } } - resources { - srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"] + test { + java { + srcDirs = ["${sourceOverridesBase}/test/java"] + } + resources { + srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"] + } } } - test { - java { - srcDirs = ["${sourceOverridesBase}/test/java"] +} else { + // Use the original sources directly for the lowest supported Flink version. + sourceSets { + main { + java { + srcDirs = ["${sourceBase}/main/java"] + } + resources { + srcDirs = ["${sourceBase}/main/resources"] + } } - resources { - srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"] + test { + java { + srcDirs = ["${sourceBase}/test/java"] + } + resources { + srcDirs = ["${sourceBase}/test/resources"] + } } } } diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle index 88808cc7ed60..b85f8fc98aaa 100644 --- a/runners/flink/job-server/flink_job_server.gradle +++ b/runners/flink/job-server/flink_job_server.gradle @@ -193,7 +193,9 @@ def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' return } - + if (mode == "batch") { + excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs' + } excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo' excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections' excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'