diff --git a/.gitignore b/.gitignore index 87c11c1..8758e8d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ /classpath/ build/ .idea +bin/ +.vscode/ diff --git a/README.md b/README.md index 32a9397..5466e82 100644 --- a/README.md +++ b/README.md @@ -104,9 +104,13 @@ $ ./gradlew gem Build with integrationTest ``` -$ ./gradlew -DenableIntegrationTest=true gem +$ ./gradlew -DenableIntegrationTest=true clean all ``` +## Versions + +This plugin version 0.4.0 or later can use with Embulk 0.11. + ## Note diff --git a/build.gradle b/build.gradle index 2b084bb..a3bd5a1 100644 --- a/build.gradle +++ b/build.gradle @@ -1,77 +1,68 @@ plugins { - id "com.jfrog.bintray" version "1.1" - id "com.github.jruby-gradle.base" version "0.1.5" id "java" + id "maven-publish" + id "org.embulk.embulk-plugins" version "0.6.2" } -import com.github.jrubygradle.JRubyExec -apply from: 'https://raw.githubusercontent.com/hata/gradle-plugins/master/embulk-integration-test.gradle' +// apply from: 'https://raw.githubusercontent.com/hata/gradle-plugins/master/embulk-integration-test.gradle' +apply from: 'embulk-integration-test.gradle' -sourceCompatibility = '1.7' -targetCompatibility = '1.7' +sourceCompatibility = '1.8' +targetCompatibility = '1.8' repositories { mavenCentral() - jcenter() -} -configurations { - provided } -version = "0.3.6" +version = "0.4.0" dependencies { - compile "org.embulk:embulk-core:0.8.0+" - compile "org.msgpack:msgpack-core:0.7.1" - provided "org.embulk:embulk-core:0.8.0+" - // compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION" - testCompile "org.jmockit:jmockit:1.15+" - testCompile "junit:junit:4.+" -} + // https://mvnrepository.com/artifact/org.embulk/embulk-api + compileOnly "org.embulk:embulk-api:0.10.43" + // https://dev.embulk.org/embulk-spi/ + compileOnly "org.embulk:embulk-spi:0.10.43" + // https://dev.embulk.org/embulk-util-file/ + compile "org.embulk:embulk-util-file:0.1.5" + // https://dev.embulk.org/embulk-util-config/ + compile "org.embulk:embulk-util-config:0.3.4" + // https://dev.embulk.org/embulk-util-timestamp/ + compile "org.embulk:embulk-util-timestamp:0.2.2" -task classpath(type: Copy, dependsOn: ["jar"]) { - doFirst { file("classpath").deleteDir() } - from (configurations.runtime - configurations.provided + files(jar.archivePath)) - into "classpath" + testImplementation "org.embulk:embulk-api:0.10.43" + testImplementation "org.embulk:embulk-spi:0.10.43" + testImplementation "org.embulk:embulk-util-file:0.1.5" + testImplementation "org.embulk:embulk-util-timestamp:0.2.2" + testImplementation "org.jmockit:jmockit:1.15" + testImplementation "junit:junit:4.+" } -clean { delete 'classpath' } -task gem(type: JRubyExec, dependsOn: ["build", "gemspec", "classpath"]) { - jrubyArgs "-rrubygems/gem_runner", "-eGem::GemRunner.new.run(ARGV)", "build" - script "build/gemspec" - doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") } +embulkPlugin { + mainClass = "org.embulk.filter.SpeedometerFilterPlugin" + category = "filter" + type = "speedometer" } -task gemPush(type: JRubyExec, dependsOn: ["gem"]) { - jrubyArgs "-rrubygems/gem_runner", "-eGem::GemRunner.new.run(ARGV)", "push" - script "pkg/${project.name}-${project.version}.gem" +gem { + authors = [ "hata" ] + email = ["hiroki.ata@gmail.com"] + summary = "Speedometer filter plugin for Embulk" + description = "Write log message of processed bytes and throughput periodically." + homepage = "https://github.com/hata/embulk-filter-speedometer" + licenses = ["MIT"] } -task "package"(dependsOn: ["gemspec", "classpath"]) << { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." +gemPush { + host = "https://rubygems.org" } -task gemspec << { file("build/gemspec").write($/ -Gem::Specification.new do |spec| - spec.name = "${project.name}" - spec.version = "${project.version}" - spec.authors = ["hata"] - spec.summary = %[Speedometer filter plugin for Embulk] - spec.description = %[Write log message of processed bytes and throughput periodically.] - spec.email = ["hiroki.ata@gmail.com"] - spec.licenses = ["MIT"] - spec.homepage = "https://github.com/hata/embulk-filter-speedometer" +// To use integrationTest, run like this: +// ./gradlew -DenableIntegrationTest=true clean all - spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"] - spec.test_files = spec.files.grep(%r"^(test|spec)/") - spec.require_paths = ["lib"] +gem.dependsOn(check) - #spec.add_dependency 'YOUR_GEM_DEPENDENCY', ['~> YOUR_GEM_DEPENDENCY_VERSION'] - spec.add_development_dependency 'bundler', ['~> 1.0'] - spec.add_development_dependency 'rake', ['>= 10.0'] -end -/$) -} +// integrationTest reference build/gemContents to run the tests. +// So, it is required to depend on gem target to generate the contents. +integrationTest.dependsOn(gem) + +task all(dependsOn: ["gem", "check", "integrationTest"]) -project.tasks.integrationTest.dependsOn(classpath) diff --git a/embulk-integration-test.gradle b/embulk-integration-test.gradle new file mode 100644 index 0000000..e795a88 --- /dev/null +++ b/embulk-integration-test.gradle @@ -0,0 +1,113 @@ +// This part is made based on 'https://raw.github.com/brunodecarvalho/gradle-plugins/master/integration-test.gradle' +// Assumes 'java', 'groovy' or 'scala' plugins have been applied before + +// Add integration test source sets +sourceSets { + integrationTest { sourceSet -> + ["java", "groovy", "scala", "resources"].each { + if (!sourceSet.hasProperty(it)) return + sourceSet."$it".srcDir file("src/integration-test/${it}") + } + } +} + +// Setup dependencies for integration testing +dependencies { + integrationTestCompile sourceSets.main.output + integrationTestCompile sourceSets.test.output + integrationTestCompile configurations.testCompile + integrationTestRuntime configurations.testRuntime + integrationTestCompile configurations.testImplementation +} + +// Define integration test task +task integrationTest(type: Test) { + testClassesDirs = sourceSets.integrationTest.output.classesDirs + classpath = sourceSets.integrationTest.runtimeClasspath +} + +///// This part is made based on https://raw.githubusercontent.com/hata/gradle-plugins/master/embulk-integration-test.gradle. +// +// If you would like to override embulkDownloadURL, then define +// embulkDownloadURL in gradle.properties or other config. +// And if you use embulk v0.9.x, then change embulkLogPathOption to --log +// because the option is changed. +// e.g. +// def embulkDownloadURL = 'https://dl.embulk.org/embulk-0.9.25.jar' +// def embulkLogPathOption = '--log' + +def embulkDownloadURL = 'https://dl.embulk.org/embulk-0.11.4.jar' +def embulkLogPathOption = '--log-path' +def embulkJarPath = '' +def enableIntegrationTestFlag = Boolean.getBoolean('enableIntegrationTest') +def embulkIntegrationTestDir = 'embulk.integrationtest.dir' + +task initIntegrationTestProperties { + if (project.hasProperty('embulkDownloadURL')) { + embulkDownloadURL = project.findProperty('embulkDownloadURL') + } + if (project.hasProperty('embulkLogPathOption')) { + embulkLogPathOption = project.findProperty('embulkLogPathOption') + } + if (project.hasProperty('embulkJarPath')) { + embulkJarPath = project.findProperty('embulkJarPath') + } else { + embulkJarPath = "$temporaryDir/embulk/bin/embulk.jar" + } + + println("Set download url ${embulkDownloadURL} and jar path ${embulkJarPath}") +} + +task downloadEmbulk { + doFirst { + if (!enableIntegrationTestFlag || new File(embulkJarPath).exists()) { + throw new StopExecutionException('Skip downloadEmbulk task.') + } + } + + doLast { + println("Try to download embulk from ${embulkDownloadURL} to ${embulkJarPath}") + + exec { + executable "sh" + args "-c", "curl --create-dirs -o $embulkJarPath -L $embulkDownloadURL" + } + } +} + +integrationTest.doFirst { + // gemContents is generated by gem task. So, this task should be run after gem. + String libDir = file('build/gemContents/lib').absolutePath + String baseDir = "$temporaryDir/embulk" + systemProperty "$embulkIntegrationTestDir", "$baseDir" + + if (!enableIntegrationTestFlag) { + throw new StopExecutionException('Skip integrationTest task.') + } + + copy { + from 'src/integration-test/resources' + into baseDir + } + + fileTree(dir: "$baseDir", include: "*.yml").visit { config -> + ['preview', 'run'].each { embulkCommand -> + String logFile = "${config.file.absolutePath}.${embulkCommand}.log" + delete logFile + + println("Run integrationTest for ${config.file.absolutePath}") + + exec { + workingDir baseDir + executable "sh" + args "-c", "java -jar $embulkJarPath $embulkCommand -I $libDir $config.file.absolutePath $embulkLogPathOption $logFile" + } + } + } +} + +project.tasks.downloadEmbulk.dependsOn(initIntegrationTestProperties) +project.tasks.integrationTest.dependsOn(downloadEmbulk) + +// Make sure 'check' task calls integration test +// check.dependsOn integrationTest diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 0087cd3..e708b1c 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index e39ed80..3ab0b72 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,5 @@ -#Wed Feb 04 13:46:12 PST 2015 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-bin.zip diff --git a/gradlew b/gradlew index 91a7e26..4f906e0 100755 --- a/gradlew +++ b/gradlew @@ -1,4 +1,20 @@ -#!/usr/bin/env bash +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ############################################################################## ## @@ -6,20 +22,38 @@ ## ############################################################################## -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -30,6 +64,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,33 +75,14 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- -APP_HOME="`pwd -P`" -cd "$SAVED" >&- - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + # Determine the Java command to use to start the JVM. if [ -n "$JAVA_HOME" ] ; then if [ -x "$JAVA_HOME/jre/sh/java" ] ; then @@ -90,7 +106,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? -eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then @@ -110,11 +126,13 @@ if $darwin; then GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" fi -# For Cygwin, switch paths to Windows format before running java -if $cygwin ; then +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` SEP="" @@ -138,27 +156,30 @@ if $cygwin ; then else eval `echo args$i`="\"$arg\"" fi - i=$((i+1)) + i=`expr $i + 1` done case $i in - (0) set -- ;; - (1) set -- "$args0" ;; - (2) set -- "$args0" "$args1" ;; - (3) set -- "$args0" "$args1" "$args2" ;; - (4) set -- "$args0" "$args1" "$args2" "$args3" ;; - (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; - (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; - (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; - (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; - (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; esac fi -# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules -function splitJvmOpts() { - JVM_OPTS=("$@") +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " } -eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS -JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" -exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat index aec9973..ac1b06f 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,3 +1,19 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + @if "%DEBUG%" == "" @echo off @rem ########################################################################## @rem @@ -8,20 +24,23 @@ @rem Set local scope for the variables with windows NT shell if "%OS%"=="Windows_NT" setlocal -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS= - set DIRNAME=%~dp0 if "%DIRNAME%" == "" set DIRNAME=. set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + @rem Find java.exe if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto init +if "%ERRORLEVEL%" == "0" goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -35,7 +54,7 @@ goto fail set JAVA_HOME=%JAVA_HOME:"=% set JAVA_EXE=%JAVA_HOME%/bin/java.exe -if exist "%JAVA_EXE%" goto init +if exist "%JAVA_EXE%" goto execute echo. echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% @@ -45,34 +64,14 @@ echo location of your Java installation. goto fail -:init -@rem Get command-line arguments, handling Windowz variants - -if not "%OS%" == "Windows_NT" goto win9xME_args -if "%@eval[2+2]" == "4" goto 4NT_args - -:win9xME_args -@rem Slurp the command line arguments. -set CMD_LINE_ARGS= -set _SKIP=2 - -:win9xME_args_slurp -if "x%~1" == "x" goto execute - -set CMD_LINE_ARGS=%* -goto execute - -:4NT_args -@rem Get arguments from the 4NT Shell from JP Software -set CMD_LINE_ARGS=%$ - :execute @rem Setup the command line set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* :end @rem End local scope for the variables with windows NT shell diff --git a/src/main/java/org/embulk/compatibility/CompatibilityUtil.java b/src/main/java/org/embulk/compatibility/CompatibilityUtil.java new file mode 100644 index 0000000..fefc3d9 --- /dev/null +++ b/src/main/java/org/embulk/compatibility/CompatibilityUtil.java @@ -0,0 +1,61 @@ +package org.embulk.compatibility; + +import org.embulk.spi.BufferAllocator; +import org.embulk.spi.Column; +import org.embulk.spi.Exec; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.PageOutput; +import org.embulk.spi.PageReader; +import org.embulk.spi.Schema; + +public class CompatibilityUtil { + @SuppressWarnings("deprecation") + public static PageBuilder getPageBuilder(final BufferAllocator bufferAllocator, final Schema schema, final PageOutput output) { + if (HAS_EXEC_GET_PAGE_BUILDER) { + return Exec.getPageBuilder(bufferAllocator, schema, output); + } else { + return new PageBuilder(bufferAllocator, schema, output); + } + } + + @SuppressWarnings("deprecation") + public static PageReader getPageReader(final Schema schema) { + if (HAS_EXEC_GET_PAGE_READER) { + return Exec.getPageReader(schema); + } else { + return new PageReader(schema); + } + } + + private static boolean hasExecGetPageReader() { + try { + Exec.class.getMethod("getPageReader", Schema.class); + } catch (final NoSuchMethodException ex) { + return false; + } + return true; + } + + private static boolean hasExecGetPageBuilder() { + try { + Exec.class.getMethod("getPageBuilder", BufferAllocator.class, Schema.class, PageOutput.class); + } catch (final NoSuchMethodException ex) { + return false; + } + return true; + } + + private static boolean hasPageBuilderTimestampInstant() { + try { + PageReader.class.getMethod("getTimestampInstant", Column.class); + } catch (NoSuchMethodException e) { + return false; + } + return true; + } + + public static final boolean HAS_EXEC_GET_PAGE_READER = hasExecGetPageReader(); + public static final boolean HAS_EXEC_GET_PAGE_BUILDER = hasExecGetPageBuilder(); + public static final boolean HAS_PAGE_BUILDER_TIMESTAMP_INSTANT = hasPageBuilderTimestampInstant(); + +} diff --git a/src/main/java/org/embulk/filter/SpeedometerFilterPlugin.java b/src/main/java/org/embulk/filter/SpeedometerFilterPlugin.java index 620652e..0e1e1b4 100644 --- a/src/main/java/org/embulk/filter/SpeedometerFilterPlugin.java +++ b/src/main/java/org/embulk/filter/SpeedometerFilterPlugin.java @@ -1,14 +1,19 @@ package org.embulk.filter; +import java.time.Instant; import java.util.Map; +import java.util.Optional; import javax.validation.constraints.Min; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; -import org.embulk.config.ConfigInject; +import org.embulk.compatibility.CompatibilityUtil; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.ConfigMapper; +import org.embulk.util.config.ConfigMapperFactory; import org.embulk.config.ConfigSource; -import org.embulk.config.Task; +import org.embulk.util.config.Task; +import org.embulk.util.config.TaskMapper; import org.embulk.config.TaskSource; import org.embulk.spi.BufferAllocator; import org.embulk.spi.Column; @@ -19,12 +24,10 @@ import org.embulk.spi.PageOutput; import org.embulk.spi.PageReader; import org.embulk.spi.Schema; -import org.embulk.spi.time.Timestamp; -import org.embulk.spi.time.TimestampFormatter; -import org.embulk.spi.util.Timestamps; -import org.msgpack.value.Value; - -import com.google.common.base.Optional; +import org.embulk.spi.json.JsonValue; +import org.embulk.spi.type.TimestampType; +import org.embulk.spi.Exec; +import org.embulk.util.timestamp.TimestampFormatter; public class SpeedometerFilterPlugin implements FilterPlugin @@ -32,8 +35,7 @@ public class SpeedometerFilterPlugin private static final int TRUE_LENGTH = Boolean.toString(true).length(); private static final int FALSE_LENGTH = Boolean.toString(false).length(); - public interface PluginTask - extends Task, TimestampFormatter.Task + public interface PluginTask extends Task { @Config("speed_limit") @ConfigDefault("0") @@ -66,32 +68,64 @@ public interface PluginTask @ConfigDefault("null") public Optional getLabel(); - @ConfigInject - public BufferAllocator getBufferAllocator(); + // copy from org.embulk.spi.time.TimestampParser.Task + @Config("default_timezone") + @ConfigDefault("\"UTC\"") + public String getDefaultTimeZoneId(); + + // copy from org.embulk.spi.time.TimestampParser.Task + @Config("default_timestamp_format") + @ConfigDefault("\"%Y-%m-%d %H:%M:%S.%N %z\"") + public String getDefaultTimestampFormat(); + + // copy from org.embulk.spi.time.TimestampParser.Task + @Config("default_date") + @ConfigDefault("\"1970-01-01\"") + String getDefaultDate(); } - public interface TimestampColumnOption extends Task, - TimestampFormatter.TimestampColumnOption - { } + private interface TimestampColumnOption extends org.embulk.util.config.Task { + @Config("timezone") + @ConfigDefault("null") + java.util.Optional getTimeZoneId(); + + @Config("format") + @ConfigDefault("null") + java.util.Optional getFormat(); + + @Config("date") + @ConfigDefault("null") + java.util.Optional getDate(); + } + + private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); @Override public void transaction(ConfigSource config, Schema inputSchema, FilterPlugin.Control control) { - PluginTask task = config.loadConfig(PluginTask.class); Schema outputSchema = inputSchema; - control.run(task.dump(), outputSchema); + control.run(getTask(config).toTaskSource(), outputSchema); } @Override public PageOutput open(TaskSource taskSource, Schema inputSchema, Schema outputSchema, PageOutput output) { - PluginTask task = taskSource.loadTask(PluginTask.class); - + final PluginTask task = getTask(taskSource); return new SpeedControlPageOutput(task, inputSchema, output); } + PluginTask getTask(ConfigSource config) { + final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + return configMapper.map(config, PluginTask.class); + } + + PluginTask getTask(TaskSource taskSource) { + final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper(); + return taskMapper.map(taskSource, PluginTask.class); + } + static class SpeedControlPageOutput implements PageOutput { private final SpeedometerSpeedController controller; private final Schema schema; @@ -105,15 +139,19 @@ static class SpeedControlPageOutput implements PageOutput { SpeedControlPageOutput(PluginTask task, Schema schema, PageOutput pageOutput) { this.controller = new SpeedometerSpeedController(task, SpeedometerSpeedAggregator.getInstance(task)); this.schema = schema; - this.allocator = task.getBufferAllocator(); + this.allocator = getBufferAllocator(); this.delimiterLength = task.getDelimiter().length(); this.recordPaddingSize = task.getRecordPaddingSize(); - this.pageReader = new PageReader(schema); - this.timestampFormatters = Timestamps.newTimestampColumnFormatters(task, schema, task.getColumnOptions()); - this.pageBuilder = new PageBuilder(allocator, schema, pageOutput); + this.pageReader = CompatibilityUtil.getPageReader(schema); + this.timestampFormatters = newTimestampColumnFormatters(task, schema, task.getColumnOptions()); + this.pageBuilder = CompatibilityUtil.getPageBuilder(allocator, schema, pageOutput); this.controller.start(System.currentTimeMillis()); } + BufferAllocator getBufferAllocator() { + return Exec.getBufferAllocator(); + } + @Override public void add(Page page) { ColumnVisitorImpl visitor = new ColumnVisitorImpl(pageBuilder); @@ -137,6 +175,26 @@ public void close() { pageBuilder.close(); } + private static TimestampFormatter[] newTimestampColumnFormatters( + PluginTask task, Schema schema, + Map columnOptions) { + TimestampFormatter[] formatters = new TimestampFormatter[schema.getColumnCount()]; + int i = 0; + for (Column column : schema.getColumns()) { + if (column.getType() instanceof TimestampType) { + final TimestampColumnOption columnOption = columnOptions.get(column.getName()); + + formatters[i] = TimestampFormatter + .builder(columnOption.getFormat().orElse(task.getDefaultTimestampFormat()), true) + .setDefaultZoneFromString(columnOption.getTimeZoneId().orElse(task.getDefaultTimeZoneId())) + .setDefaultDateFromString(columnOption.getDate().orElse(task.getDefaultDate())) + .build(); + } + i++; + } + return formatters; + } + class ColumnVisitorImpl implements ColumnVisitor { private final PageBuilder pageBuilder; private long startRecordTime; @@ -185,13 +243,17 @@ public void stringColumn(Column column) { } } + @SuppressWarnings("deprecation") // For the use of org.embulk.spi.time.Timestamp, pageReader.getTimestamp and pageBuilder.setTimestamp @Override public void timestampColumn(Column column) { if (pageReader.isNull(column)) { speedMonitor(column); pageBuilder.setNull(column); + } else if (CompatibilityUtil.HAS_PAGE_BUILDER_TIMESTAMP_INSTANT) { + pageBuilder.setTimestamp(column, speedMonitor(column, pageReader.getTimestampInstant(column))); } else { - pageBuilder.setTimestamp(column, speedMonitor(column, pageReader.getTimestamp(column))); + Instant instant = speedMonitor(column, pageReader.getTimestamp(column).getInstant()); + pageBuilder.setTimestamp(column, org.embulk.spi.time.Timestamp.ofInstant(instant)); } } @@ -201,7 +263,7 @@ public void jsonColumn(Column column) { speedMonitor(column); pageBuilder.setNull(column); } else { - pageBuilder.setJson(column, speedMonitor(column, pageReader.getJson(column))); + pageBuilder.setJson(column, speedMonitor(column, pageReader.getJsonValue(column))); } } @@ -242,14 +304,14 @@ private String speedMonitor(Column column, String s) { return s; } - private Timestamp speedMonitor(Column column, Timestamp t) { + private Instant speedMonitor(Column column, Instant t) { speedMonitorForDelimiter(column); TimestampFormatter formatter = timestampFormatters[column.getIndex()]; controller.checkSpeedLimit(startRecordTime, formatter.format(t).length()); return t; } - private Value speedMonitor(Column column, Value v) { + private JsonValue speedMonitor(Column column, JsonValue v) { speedMonitorForDelimiter(column); // NOTE: This may not be good for performance. But, I have no other idea. String s = v.toJson(); diff --git a/src/main/java/org/embulk/filter/SpeedometerSpeedAggregator.java b/src/main/java/org/embulk/filter/SpeedometerSpeedAggregator.java index 972b0ef..7dfdf85 100644 --- a/src/main/java/org/embulk/filter/SpeedometerSpeedAggregator.java +++ b/src/main/java/org/embulk/filter/SpeedometerSpeedAggregator.java @@ -6,8 +6,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.embulk.filter.SpeedometerFilterPlugin.PluginTask; -import org.embulk.spi.Exec; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class SpeedometerSpeedAggregator { private static final Object INSTANCE_LOCK = new Object(); @@ -90,7 +90,7 @@ public void checkProgress(long nowTime, int logIntervalMillisec) { } Logger getLogger() { - return Exec.getLogger(SpeedometerFilterPlugin.class); + return LoggerFactory.getLogger(SpeedometerFilterPlugin.class); } long getGlobalStartTime() { diff --git a/src/test/java/org/embulk/filter/TestSpeedometerFilterPlugin.java b/src/test/java/org/embulk/filter/TestSpeedometerFilterPlugin.java index d3700c3..04fefe8 100644 --- a/src/test/java/org/embulk/filter/TestSpeedometerFilterPlugin.java +++ b/src/test/java/org/embulk/filter/TestSpeedometerFilterPlugin.java @@ -1,7 +1,7 @@ package org.embulk.filter; +import mockit.Expectations; import mockit.Mocked; -import mockit.NonStrictExpectations; import mockit.Verifications; import org.embulk.config.ConfigSource; @@ -40,35 +40,43 @@ public class TestSpeedometerFilterPlugin @Mocked FilterPlugin.Control control; + @Mocked + PageReader reader; + + @Mocked + PageBuilder builder; + + @Mocked + Page page; + @Test public void testTransaction() { - new NonStrictExpectations() {{ - config.loadConfig(PluginTask.class); result = task; + SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); + new Expectations(plugin) {{ + plugin.getTask(config); result = task; }}; - SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); plugin.transaction(config, schema, control); new Verifications() {{ - config.loadConfig(PluginTask.class); times = 1; control.run((TaskSource)any, schema); times = 1; }}; } @Test - public void testOpen(final @Mocked PageReader reader, final @Mocked PageBuilder builder, final @Mocked Page page) throws Exception { - new NonStrictExpectations() {{ - taskSource.loadTask(PluginTask.class); result = task; + public void testOpen() throws Exception { + SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); + new Expectations(plugin) {{ + plugin.getTask(taskSource); result = task; task.getDelimiter(); result = ""; reader.nextRecord(); result = true; result = false; + Exec.getPageReader(schema); result = reader; }}; - SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); PageOutput output = plugin.open(taskSource, schema, schema, inPageOutput); output.add(page); new Verifications() {{ - taskSource.loadTask(PluginTask.class); times = 1; builder.addRecord(); times = 1; builder.finish(); times = 0; reader.nextRecord(); times = 2; @@ -78,35 +86,33 @@ public void testOpen(final @Mocked PageReader reader, final @Mocked PageBuilder } @Test - public void testFinish(final @Mocked PageReader reader, final @Mocked PageBuilder builder, final @Mocked Page page) throws Exception { - new NonStrictExpectations() {{ - taskSource.loadTask(PluginTask.class); result = task; + public void testFinish() throws Exception { + SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); + new Expectations(plugin) {{ + plugin.getTask(taskSource); result = task; task.getDelimiter(); result = ""; }}; - SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); PageOutput output = plugin.open(taskSource, schema, schema, inPageOutput); output.finish(); new Verifications() {{ - taskSource.loadTask(PluginTask.class); times = 1; builder.finish(); times = 1; }}; } @Test - public void testClose(final @Mocked PageReader reader, final @Mocked PageBuilder builder, final @Mocked Page page) throws Exception { - new NonStrictExpectations() {{ - taskSource.loadTask(PluginTask.class); result = task; + public void testClose() throws Exception { + SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); + new Expectations(plugin) {{ + plugin.getTask(taskSource); result = task; task.getDelimiter(); result = ""; }}; - SpeedometerFilterPlugin plugin = new SpeedometerFilterPlugin(); PageOutput output = plugin.open(taskSource, schema, schema, inPageOutput); output.close(); new Verifications() {{ - taskSource.loadTask(PluginTask.class); times = 1; builder.close(); times = 1; }}; } diff --git a/src/test/java/org/embulk/filter/TestSpeedometerSpeedAggregator.java b/src/test/java/org/embulk/filter/TestSpeedometerSpeedAggregator.java index e4ad2f2..4ba0cfa 100644 --- a/src/test/java/org/embulk/filter/TestSpeedometerSpeedAggregator.java +++ b/src/test/java/org/embulk/filter/TestSpeedometerSpeedAggregator.java @@ -6,17 +6,15 @@ import static org.junit.Assert.assertTrue; import org.embulk.filter.SpeedometerFilterPlugin.PluginTask; -import org.embulk.spi.Exec; import org.junit.Test; import org.slf4j.Logger; +import mockit.Expectations; import mockit.Mocked; -import mockit.NonStrictExpectations; import mockit.Verifications; public class TestSpeedometerSpeedAggregator { @Mocked SpeedometerSpeedController controller; - @Mocked Exec exec; @Mocked PluginTask task; @Test @@ -52,7 +50,7 @@ public void testStartController() { @Test public void testStopController() { - new NonStrictExpectations() {{ + new Expectations() {{ controller.getTotalBytes(); result = 11; controller.getTotalRecords(); result = 5; }}; @@ -69,11 +67,11 @@ public void testStopController() { @Test public void testStopControllerShowOverallMessage(@Mocked final Logger logger) { - new NonStrictExpectations() {{ - Exec.getLogger(SpeedometerFilterPlugin.class); result = logger; + SpeedometerSpeedAggregator aggregator = new SpeedometerSpeedAggregator(); + new Expectations(aggregator) {{ + aggregator.getLogger(); result = logger; }}; - SpeedometerSpeedAggregator aggregator = new SpeedometerSpeedAggregator(); long nowTime = System.currentTimeMillis(); aggregator.startController(controller, nowTime); aggregator.startController(controller, nowTime); @@ -81,13 +79,13 @@ public void testStopControllerShowOverallMessage(@Mocked final Logger logger) { aggregator.stopController(controller); new Verifications() {{ - logger.info(withAny("Overall message.")); times = 2; + logger.info(withAny("Overall message.")); times = 1; }}; } @Test public void testGetSpeedLimitForController() { - new NonStrictExpectations() {{ + new Expectations() {{ controller.getSpeedLimit(); result = 10; }}; diff --git a/src/test/java/org/embulk/filter/TestSpeedometerSpeedController.java b/src/test/java/org/embulk/filter/TestSpeedometerSpeedController.java index 02f4f72..a238e5a 100644 --- a/src/test/java/org/embulk/filter/TestSpeedometerSpeedController.java +++ b/src/test/java/org/embulk/filter/TestSpeedometerSpeedController.java @@ -3,7 +3,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import mockit.Mocked; -import mockit.NonStrictExpectations; +import mockit.Expectations; import mockit.Verifications; import org.embulk.filter.SpeedometerFilterPlugin.PluginTask; @@ -20,7 +20,7 @@ public class TestSpeedometerSpeedController { @Test public void testSpeedometerSpeedController() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3; @@ -51,7 +51,7 @@ public void testStop() { @Test public void testGetTotalBytes() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3; @@ -69,7 +69,7 @@ public void testGetTotalBytes() { @Test public void testGetPeriodBytesPerSec() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3; @@ -85,7 +85,7 @@ public void testGetPeriodBytesPerSec() { @Test public void testCheckSpeedLimit() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3; @@ -113,7 +113,7 @@ public void testRenewPeriod() { @Test public void testGetTotalRecords() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3; @@ -132,7 +132,7 @@ public void testGetTotalRecords() { @Test public void testGetPeriodRecordsPerSec() { - new NonStrictExpectations() {{ + new Expectations() {{ task.getSpeedLimit(); result = 1L; task.getMaxSleepMillisec(); result = 2; task.getLogIntervalSeconds(); result = 3;