1 change: 1 addition & 0 deletions .github/workflows/README.md
@@ -344,6 +344,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get
| [ PostCommit Java Nexmark Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml) | N/A |`beam_PostCommit_Java_Nexmark_Direct.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml?query=event%3Aschedule) |
| [ PostCommit Java Nexmark Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml) | N/A |`beam_PostCommit_Java_Nexmark_Flink.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml?query=event%3Aschedule) |
| [ PostCommit Java Nexmark Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml) | N/A |`beam_PostCommit_Java_Nexmark_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml?query=event%3Aschedule) |
| [ PostCommit Java PVR Flink Batch ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Batch.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml?query=event%3Aschedule) |
| [ PostCommit Java PVR Flink Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Aschedule) |
| [ PostCommit Java PVR Samza ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml) | N/A |`beam_PostCommit_Java_PVR_Samza.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Samza.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule) |
| [ PostCommit Java SingleStoreIO IT ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml) | N/A |`beam_PostCommit_Java_SingleStoreIO_IT.json`| [![.github/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml?query=event%3Aschedule) |
106 changes: 106 additions & 0 deletions .github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
@@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PostCommit Java PVR Flink Batch

on:
  push:
    tags: ['v*']
    branches: ['master', 'release-*']
    paths:
      - 'runners/flink/**'
      - 'runners/java-fn-execution/**'
      - 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
      - '.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml'
  pull_request_target:
    branches: ['master', 'release-*']
    paths:
      - 'release/trigger_all_tests.json'
      - '.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json'
  schedule:
    # runs at 02:15, 08:15, 14:15, and 20:15 UTC
    - cron: '15 2/6 * * *'
  workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
  cancel-in-progress: true

# Setting explicit permissions for the action to avoid the default permissions, which are `write-all` in case of the pull_request_target event
permissions:
  actions: write
  pull-requests: write
  checks: write
  contents: read
  deployments: read
  id-token: none
  issues: write
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read

env:
  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
  beam_PostCommit_Java_PVR_Flink_Batch:
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    strategy:
      matrix:
        job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
        job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
    timeout-minutes: 240
    runs-on: [self-hosted, ubuntu-20.04, highmem]
    if: |
      github.event_name == 'push' ||
      github.event_name == 'pull_request_target' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event_name == 'workflow_dispatch' ||
      github.event.comment.body == 'Run Java_PVR_Flink_Batch PostCommit'
    steps:
      - uses: actions/checkout@v4
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
      - name: run validatesPortableRunnerBatchDataSet script
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
        env:
          CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
      - name: Archive JUnit Test Results
        uses: actions/upload-artifact@v4
        # runs on failure or cancellation, not on success
        if: ${{ !success() }}
        with:
          name: JUnit Test Results
          path: "**/build/reports/tests/"
      - name: Upload test report
        uses: actions/upload-artifact@v4
        with:
          name: java-code-coverage-report
          path: "**/build/test-results/**/*.xml"
      # TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2.
125 changes: 83 additions & 42 deletions runners/flink/flink_runner.gradle
@@ -28,7 +28,8 @@ import groovy.json.JsonOutput
def base_path = ".."

def overrides(versions, type, base_path) {
versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
// order is important: with DuplicatesStrategy.INCLUDE the last duplicate copied wins, so later entries override earlier ones
["${base_path}/src/${type}/java"] + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
}

def all_versions = flink_versions.split(",")
@@ -49,7 +50,8 @@ applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.flink',
archivesBaseName: archivesBaseName,
// flink runner jars are in same package name. Publish javadoc once.
exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()),
requireJavaVersion: project.ext.flink_major.startsWith('2') ? JavaVersion.VERSION_11 : null
)

description = "Apache Beam :: Runners :: Flink $flink_version"
@@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java")
*/
def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()

def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
it.from main_source_overrides
it.into "${sourceOverridesBase}/main/java"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
Contributor Author: Currently, if I override a source file in, say, the Flink 1.20 module, I have to move it out of the common src/ and branch it into both the 1.17 and 1.20 paths. After this change, I only need to create a file in the 1.20 path to override it.

copyTask.from main_source_overrides
copyTask.into "${sourceOverridesBase}/main/java"
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE

if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
project.ext.excluded_files.main.each { file ->
copyTask.exclude "**/${file}"
}
}
}
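
A minimal standalone sketch (the task name and paths are illustrative, not the module's real layout) of the precedence the copy above relies on: Gradle's Copy visits `from` entries in order, and DuplicatesStrategy.INCLUDE lets a file copied later overwrite one copied earlier, which is what makes a version-specific source replace the common one.

// Hypothetical demo of why the order returned by `overrides` matters:
// the last duplicate copied into the destination wins.
tasks.register('overridePrecedenceDemo', Copy) {
    from '../src/main/java'        // 1. common sources (lowest precedence)
    from '../1.20/src/main/java'   // 2. Flink-1.20 overrides replace common files
    from './src/main/java'         // 3. runner-local sources win over both
    into layout.buildDirectory.dir('source-overrides/src/main/java')
    duplicatesStrategy = DuplicatesStrategy.INCLUDE
}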

def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
@@ -80,10 +88,16 @@ def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}

def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
it.from test_source_overrides
it.into "${sourceOverridesBase}/test/java"
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
copyTask.from test_source_overrides
copyTask.into "${sourceOverridesBase}/test/java"
copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE

if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
project.ext.excluded_files.test.each { file ->
copyTask.exclude "**/${file}"
}
}
}

def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
@@ -92,45 +106,69 @@ def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
it.duplicatesStrategy DuplicatesStrategy.INCLUDE
}

// add dependencies to tasks defined by the Gradle Java plugin
compileJava.dependsOn copySourceOverrides
processResources.dependsOn copyResourcesOverrides
compileTestJava.dependsOn copyTestSourceOverrides
processTestResources.dependsOn copyTestResourcesOverrides

// add dependencies to custom tasks defined by BeamModulePlugin
// they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
def sourcesJar = project.tasks.findByName('sourcesJar')
if (sourcesJar != null) {
sourcesJar.dependsOn copySourceOverrides
sourcesJar.dependsOn copyResourcesOverrides
}
def testSourcesJar = project.tasks.findByName('testSourcesJar')
if (testSourcesJar != null) {
testSourcesJar.dependsOn copyTestSourceOverrides
testSourcesJar.dependsOn copyTestResourcesOverrides
}
def use_override = (flink_major != all_versions.first())
def sourceBase = "${project.projectDir}/../src"

/*
if (use_override) {
// Copy the original plus version-specific sources to a tmp dir and use it as the sourceSet
// add dependencies to tasks defined by the Gradle Java plugin
compileJava.dependsOn copySourceOverrides
processResources.dependsOn copyResourcesOverrides
compileTestJava.dependsOn copyTestSourceOverrides
processTestResources.dependsOn copyTestResourcesOverrides

// add dependencies to custom tasks defined by BeamModulePlugin
// they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
def sourcesJar = project.tasks.findByName('sourcesJar')
if (sourcesJar != null) {
sourcesJar.dependsOn copySourceOverrides
sourcesJar.dependsOn copyResourcesOverrides
}
def testSourcesJar = project.tasks.findByName('testSourcesJar')
if (testSourcesJar != null) {
testSourcesJar.dependsOn copyTestSourceOverrides
testSourcesJar.dependsOn copyTestResourcesOverrides
}
/*
* We have to explicitly set all directories here to make sure each
* version of Flink has the correct overrides set.
*/
def sourceBase = "${project.projectDir}/../src"
sourceSets {
main {
java {
srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
sourceSets {
main {
java {
srcDirs = ["${sourceOverridesBase}/main/java"]
}
resources {
srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
}
}
resources {
srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
test {
java {
srcDirs = ["${sourceOverridesBase}/test/java"]
}
resources {
srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
}
}
}
test {
java {
srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
} else {
// Use the original sources directly for the lowest supported Flink version.
sourceSets {
main {
java {
srcDirs = ["${sourceBase}/main/java"]
}
resources {
srcDirs = ["${sourceBase}/main/resources"]
}
}
resources {
srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
test {
java {
srcDirs = ["${sourceBase}/test/java"]
}
resources {
srcDirs = ["${sourceBase}/test/resources"]
}
}
}
}
@@ -196,7 +234,10 @@ dependencies {

implementation "org.apache.flink:flink-core:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
implementation "org.apache.flink:flink-java:$flink_version"
if (project.ext.flink_major.startsWith('1')) {
// FLINK-36336: dataset API removed in Flink 2
implementation "org.apache.flink:flink-java:$flink_version"
}

implementation "org.apache.flink:flink-runtime:$flink_version"
implementation "org.apache.flink:flink-metrics-core:$flink_version"
Expand Down
@@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) {
}

def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers")
def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server"
def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version
if (project.parent.name.startsWith("2")) {
containerTag += "-flink" + project.parent.name
}

docker {
name containerImageName(
name: project.docker_image_default_repo_prefix + "flink${project.parent.name}_job_server",
name: project.docker_image_default_repo_prefix + containerName,
root: project.rootProject.hasProperty(["docker-repository-root"]) ?
project.rootProject["docker-repository-root"] :
project.docker_image_default_repo_root,
tag: project.rootProject.hasProperty(["docker-tag"]) ?
project.rootProject["docker-tag"] : project.sdk_version)
tag: containerTag)
// tags used by dockerTag task
tags containerImageTags()
files "./build/"
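For reference, a standalone Groovy sketch (not part of the build; the helper name is hypothetical and "2.61.0" is an illustrative SDK version) mirroring the naming logic above — for Flink 2.x the major version moves out of the image name and into the tag suffix:

def imageNameAndTag(String flinkModuleName, String sdkVersion) {
    def name = (flinkModuleName.startsWith("2")
            ? "flink_job_server"                     // version lives in the tag
            : "flink${flinkModuleName}_job_server")  // version lives in the name
            .toString()
    def tag = (flinkModuleName.startsWith("2")
            ? "${sdkVersion}-flink${flinkModuleName}"
            : sdkVersion).toString()
    return [name, tag]
}

assert imageNameAndTag("1.20", "2.61.0") == ["flink1.20_job_server", "2.61.0"]
assert imageNameAndTag("2.0", "2.61.0") == ["flink_job_server", "2.61.0-flink2.0"]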
31 changes: 22 additions & 9 deletions runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
@@ -29,6 +29,11 @@ apply plugin: 'application'
// we need to set mainClassName before applying shadow plugin
mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"

// Resolve the Flink project name (and version) the job-server is based on,
// e.g. project.path ":runners:flink:1.20:job-server" -> ":runners:flink:1.20"
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
evaluationDependsOn(flinkRunnerProject)
boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2')

applyJavaNature(
automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
@@ -37,11 +42,9 @@ applyJavaNature(
shadowClosure: {
append "reference.conf"
},
requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null
)

// Resolve the Flink project name (and version) the job-server is based on
def flinkRunnerProject = "${project.path.replace(":job-server", "")}"

description = project(flinkRunnerProject).description + " :: Job Server"

/*
@@ -126,18 +129,22 @@ runShadow {
jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
}

def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, boolean docker) {
def pipelineOptions = [
// Limit resource consumption via parallelism
"--parallelism=2",
]
boolean streaming = (mode == "streaming")
if (streaming) {
pipelineOptions += "--streaming"
if (checkpointing) {
pipelineOptions += "--checkpointingInterval=3000"
pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
}
}
if (mode == "batch") {
pipelineOptions += "--useDataStreamForBatch=true"
}
createPortableValidatesRunnerTask(
name: "validatesPortableRunner${name}",
jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
@@ -186,7 +193,9 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
return
}

if (mode == "batch") {
excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
}
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
@@ -214,13 +223,17 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
)
}

project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", "batch", false, true)
project.ext.validatesPortableRunnerBatchDataSet = portableValidatesRunnerTask("BatchDataSet", "batch-dataset", false, false)
project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", "batch", false, false)
project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", "streaming", false, false)
project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false)

tasks.register("validatesPortableRunner") {
dependsOn validatesPortableRunnerDocker
if (!isFlink2) {
dependsOn validatesPortableRunnerBatchDataSet
}
dependsOn validatesPortableRunnerBatch
dependsOn validatesPortableRunnerStreaming
dependsOn validatesPortableRunnerStreamingCheckpoint
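For reference, a standalone Groovy sketch (the helper name is hypothetical; the option strings come from the diff above) of what each `mode` value now yields:

def optionsFor(String mode, boolean checkpointing) {
    def opts = ["--parallelism=2"]               // always limit resource consumption
    if (mode == "streaming") {
        opts << "--streaming"
        if (checkpointing) {
            opts << "--checkpointingInterval=3000"
            opts << "--shutdownSourcesAfterIdleMs=60000"
        }
    }
    if (mode == "batch") {
        opts << "--useDataStreamForBatch=true"   // batch on the DataStream API, not DataSet
    }
    return opts
}

assert optionsFor("batch", false) == ["--parallelism=2", "--useDataStreamForBatch=true"]
assert optionsFor("batch-dataset", false) == ["--parallelism=2"]   // legacy DataSet path (Flink 1.x only)
assert optionsFor("streaming", true).contains("--checkpointingInterval=3000")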