diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FilterableStagingFilesPipelineOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FilterableStagingFilesPipelineOptions.java new file mode 100644 index 000000000000..1d99c962f7b0 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FilterableStagingFilesPipelineOptions.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Allows to configure how the staging files are selected. + * TODO: allow to select in parent classloaders as well as current one. + */ +public interface FilterableStagingFilesPipelineOptions extends PipelineOptions { + /** + * The filter to use to select the files from the classloader to keep + * when staging the classloader. + * The regex is applied on the file name. + */ + @Description("Include regex for file staging.") + String getClassLoaderIncludeFilter(); + void setClassLoaderIncludeFilter(String value); + + /** + * The filter to use to not select the files from the classloader + * when staging the classloader. + * The regex is applied on the file name. + */ + @Description("Exclude regex for file staging.") + String getClassLoaderExcludeFilter(); + void setClassLoaderExcludeFilter(String value); +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java index ae6b076acf25..32b388aaffb3 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java @@ -17,41 +17,85 @@ */ package org.apache.beam.runners.core.construction; +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + import java.io.File; -import java.net.URISyntaxException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; +import java.io.IOException; import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import org.apache.beam.runners.core.construction.classloader.Classloaders; +import org.apache.beam.sdk.options.PipelineOptions; -/** Utilities for working with classpath resources for pipelines. */ +/** + * Utilities for working with classpath resources for pipelines. + */ public class PipelineResources { - /** - * Attempts to detect all the resources the class loader has access to. This does not recurse - * to class loader parents stopping it from pulling in resources from the system class loader. - * - * @param classLoader The URLClassLoader to use to detect resources to stage. - * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one - * of the resources the class loader exposes is not a file resource. - * @return A list of absolute paths to the resources the class loader uses. - */ - public static List detectClassPathResourcesToStage(ClassLoader classLoader) { - if (!(classLoader instanceof URLClassLoader)) { - String message = String.format("Unable to use ClassLoader to detect classpath elements. " - + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); - throw new IllegalArgumentException(message); + /** + * Attempts to detect all the resources the class loader has access to. This does not recurse + * to class loader parents stopping it from pulling in resources from the system class loader. + * + * @param options The pipelineOptions to use to filter the files found. + * @param classLoader The URLClassLoader to use to detect resources to stage. + * @return A list of absolute paths to the resources the class loader uses. + * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one + * of the resources the class loader exposes is not a file resource. + */ + public static List detectClassPathResourcesToStage(final PipelineOptions options, + final ClassLoader classLoader) { + try { + final Set parentFiles = classLoader == null + ? emptySet() : Classloaders.toFiles(classLoader.getParent()).collect(toSet()); + final Predicate filter = toFilter(options); + return Classloaders.toFiles(classLoader) + .filter(s -> !parentFiles.contains(s)) + .filter(filter) + .map(f -> { + try { + return f.getCanonicalPath(); + } catch (IOException e) { + return f.getAbsolutePath(); + } + }) + .collect(toList()); + } catch (final IOException e) { + throw new IllegalStateException(e); + } + } + + private static Predicate toFilter(final PipelineOptions options) { + if (options == null) { + return f -> true; + } + final FilterableStagingFilesPipelineOptions opts = options + .as(FilterableStagingFilesPipelineOptions.class); + final String include = opts.getClassLoaderIncludeFilter(); + final String exclude = opts.getClassLoaderExcludeFilter(); + if (include == null && exclude == null) { + return f -> true; + } + final Predicate includeFilter = include == null + ? v -> true : new PatternFilter(include); + final Predicate excludeFilter = exclude == null + ? v -> false : new PatternFilter(exclude); + final Predicate predicate = includeFilter.and(excludeFilter.negate()); + return f -> predicate.test(f.getName()); } - List files = new ArrayList<>(); - for (URL url : ((URLClassLoader) classLoader).getURLs()) { - try { - files.add(new File(url.toURI()).getAbsolutePath()); - } catch (IllegalArgumentException | URISyntaxException e) { - String message = String.format("Unable to convert url (%s) to file.", url); - throw new IllegalArgumentException(message, e); - } + private static class PatternFilter implements Predicate { + private final Pattern pattern; + + private PatternFilter(final String include) { + this.pattern = Pattern.compile(include); + } + + @Override + public boolean test(final String value) { + return pattern.matcher(value).matches(); + } } - return files; - } } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/Classloaders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/Classloaders.java new file mode 100644 index 000000000000..3b717bb4f755 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/Classloaders.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction.classloader; + +import static java.util.Collections.list; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.stream.Stream; + +/** + * inspired from xbean, ensures the portability of the "scanning" and the impl independent logic. + */ +public final class Classloaders { + private static final ClassLoader SYSTEM = ClassLoader.getSystemClassLoader(); + + public static Stream toFiles(final ClassLoader classLoader) throws IOException { + if (classLoader == null) { + return Stream.empty(); + } + return Stream.concat( + list(classLoader.getResources("")).stream(), + list(classLoader.getResources("META-INF")).stream() + .map(url -> { + final String externalForm = url.toExternalForm(); + try { + return new URL(externalForm.substring(0, externalForm.lastIndexOf("META-INF"))); + } catch (final MalformedURLException e) { + throw new IllegalArgumentException(e); + } + })) + .map(Classloaders::toFile); + } + + private static boolean isSystemParent(final ClassLoader classLoader) { + ClassLoader current = SYSTEM.getParent(); + while (current != null) { + if (current == classLoader) { + return true; + } + current = current.getParent(); + } + return false; + } + + private static File toFile(final URL url) { + if ("jar".equals(url.getProtocol())) { + try { + final String spec = url.getFile(); + final int separator = spec.indexOf('!'); + if (separator == -1) { + return null; + } + return toFile(new URL(spec.substring(0, separator + 1))); + } catch (final MalformedURLException e) { + // let it fail + } + } else if ("file".equals(url.getProtocol())) { + String path = decode(url.getFile()); + if (path.endsWith("!")) { + path = path.substring(0, path.length() - 1); + } + return new File(path); + } + throw new IllegalArgumentException("Unsupported entry: " + url.toExternalForm()); + } + + private static String decode(String fileName) { + if (fileName.indexOf('%') == -1) { + return fileName; + } + + final StringBuilder result = new StringBuilder(fileName.length()); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + for (int i = 0; i < fileName.length(); ) { + final char c = fileName.charAt(i); + if (c == '%') { + out.reset(); + do { + if (i + 2 >= fileName.length()) { + throw new IllegalArgumentException("Incomplete % sequence at: " + i); + } + final int d1 = Character.digit(fileName.charAt(i + 1), 16); + final int d2 = Character.digit(fileName.charAt(i + 2), 16); + if (d1 == -1 || d2 == -1) { + throw new IllegalArgumentException("Invalid % sequence (" + + fileName.substring(i, i + 3) + ") at: " + String.valueOf(i)); + } + out.write((byte) ((d1 << 4) + d2)); + i += 3; + } while (i < fileName.length() && fileName.charAt(i) == '%'); + result.append(out.toString()); + } else { + result.append(c); + i++; + } + } + return result.toString(); + } + + private Classloaders() { + // no-op + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/package-info.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/package-info.java new file mode 100644 index 000000000000..c757ebed2dfd --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/classloader/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for runners to implement metrics. + */ +package org.apache.beam.runners.core.construction.classloader; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java index 633df01246f7..4539f4b056bb 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java @@ -17,19 +17,23 @@ */ package org.apache.beam.runners.core.construction; +import static java.util.stream.Collectors.toList; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableList; import java.io.File; +import java.io.FileOutputStream; import java.net.URL; import java.net.URLClassLoader; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; /** * Tests for PipelineResources. @@ -42,35 +46,58 @@ public class PipelineResourcesTest { @Test public void detectClassPathResourceWithFileResources() throws Exception { - File file = tmpFolder.newFile("file"); - File file2 = tmpFolder.newFile("file2"); - URLClassLoader classLoader = new URLClassLoader(new URL[] { + File file = tmpFolder.newFile("file.jar"); + try (final JarOutputStream jar = new JarOutputStream(new FileOutputStream(file))) { + // if there is anything in a valid jar this entry should be matched + // otherwise jar has nothing inside and we don't care about it anyway + jar.putNextEntry(new JarEntry("")); + jar.closeEntry(); + } + File file2 = tmpFolder.newFolder("file2"); + try (final URLClassLoader classLoader = new URLClassLoader(new URL[] { file.toURI().toURL(), file2.toURI().toURL() - }); - - assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()), - PipelineResources.detectClassPathResourcesToStage(classLoader)); + })) { + assertEquals(ImmutableList.of(file.getAbsolutePath(), file2.getAbsolutePath()), + PipelineResources.detectClassPathResourcesToStage(null, classLoader).stream() + .sorted().collect(toList())); + } } @Test - public void detectClassPathResourcesWithUnsupportedClassLoader() { - ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unable to use ClassLoader to detect classpath elements."); - - PipelineResources.detectClassPathResourcesToStage(mockClassLoader); + public void detectClassPathResourceWithFilter() throws Exception { + File file = tmpFolder.newFile("file.jar"); + try (final JarOutputStream jar = new JarOutputStream(new FileOutputStream(file))) { + // if there is anything in a valid jar this entry should be matched + // otherwise jar has nothing inside and we don't care about it anyway + jar.putNextEntry(new JarEntry("")); + jar.closeEntry(); + } + File file2 = tmpFolder.newFolder("file2"); + try (final URLClassLoader classLoader = new URLClassLoader(new URL[] { + file.toURI().toURL(), + file2.toURI().toURL() + })) { + assertEquals(ImmutableList.of(file.getAbsolutePath()), + PipelineResources.detectClassPathResourcesToStage( + PipelineOptionsFactory.fromArgs("--classLoaderIncludeFilter=file\\.jar") + .as(FilterableStagingFilesPipelineOptions.class), classLoader) + .stream().sorted().collect(toList())); + assertEquals(ImmutableList.of(file2.getAbsolutePath()), + PipelineResources.detectClassPathResourcesToStage( + PipelineOptionsFactory.fromArgs("--classLoaderExcludeFilter=file\\.jar") + .as(FilterableStagingFilesPipelineOptions.class), classLoader) + .stream().sorted().collect(toList())); + } } @Test public void detectClassPathResourceWithNonFileResources() throws Exception { - String url = "http://www.google.com/all-the-secrets.jar"; - URLClassLoader classLoader = new URLClassLoader(new URL[] { + final String url = "http://www.google.com/all-the-secrets.jar"; + try (final URLClassLoader classLoader = new URLClassLoader(new URL[] { new URL(url) - }); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Unable to convert url (" + url + ") to file."); - - PipelineResources.detectClassPathResourcesToStage(classLoader); + })) { + assertEquals(0, PipelineResources.detectClassPathResourcesToStage(null, classLoader).size()); + } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 5fdcdcec1218..c1b6bb19f999 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; +import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader; import com.google.common.base.Joiner; import java.util.ArrayList; @@ -75,7 +76,7 @@ public static FlinkRunner fromOptions(PipelineOptions options) { if (flinkOptions.getFilesToStage() == null) { flinkOptions.setFilesToStage(detectClassPathResourcesToStage( - FlinkRunner.class.getClassLoader())); + options, findClassLoader())); LOG.info("PipelineOptions.filesToStage was not specified. " + "Defaulting to files from the classpath: will stage {} files. " + "Enable logging at DEBUG level to see which files will be staged.", diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 895c7a1a661a..7b5205008920 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -25,6 +25,7 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; +import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; @@ -260,7 +261,7 @@ public static DataflowRunner fromOptions(PipelineOptions options) { if (dataflowOptions.getFilesToStage() == null) { dataflowOptions.setFilesToStage(detectClassPathResourcesToStage( - DataflowRunner.class.getClassLoader())); + options, findClassLoader())); if (dataflowOptions.getFilesToStage().isEmpty()) { throw new IllegalArgumentException("No files to stage has been found."); } else { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 32972e7e9a07..b1f4aa15e1b9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.spark; import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage; +import static org.apache.beam.sdk.util.common.ReflectHelpers.findClassLoader; import com.google.common.collect.Iterables; import java.util.Arrays; @@ -126,7 +127,7 @@ public static SparkRunner fromOptions(PipelineOptions options) { if (sparkOptions.getFilesToStage() == null) { sparkOptions.setFilesToStage(detectClassPathResourcesToStage( - SparkRunner.class.getClassLoader())); + options, findClassLoader())); LOG.info("PipelineOptions.filesToStage was not specified. " + "Defaulting to files from the classpath: will stage {} files. " + "Enable logging at DEBUG level to see which files will be staged.",