From c7f0503ffc0be03a8d7cb789b9c766cb1ac262a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 12 Jun 2025 10:00:38 +0100 Subject: [PATCH 01/31] Add "International Sales" to data generator --- .../streamshub/kafka/data/generator/Main.java | 2 + .../examples/InternationalSalesData.java | 59 +++++++++++++++++++ .../main/resources/internationalSales.avsc | 12 ++++ .../recommendation-app/data-generator.yaml | 2 +- 4 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java create mode 100644 tutorials/data-generator/src/main/resources/internationalSales.avsc diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java index e789018..03cc566 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Main.java @@ -2,6 +2,7 @@ import com.github.streamshub.kafka.data.generator.examples.ClickStreamData; import com.github.streamshub.kafka.data.generator.examples.SalesData; +import com.github.streamshub.kafka.data.generator.examples.InternationalSalesData; import java.util.Arrays; import java.util.List; @@ -21,6 +22,7 @@ private static Data getDataClass(String dataType) { switch(dataType) { case "clickStream" -> data = new ClickStreamData(); case "sales" -> data = new SalesData(); + case "internationalSales" -> data = new InternationalSalesData(); default -> throw new RuntimeException("Unknown data type " + dataType); } return data; diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java new file mode 100644 index 0000000..98473c6 --- /dev/null +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java @@ -0,0 +1,59 @@ +package com.github.streamshub.kafka.data.generator.examples; + +import com.github.streamshub.kafka.data.generator.Data; +import com.github.streamshub.kafka.data.generator.schema.InternationalSales; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificRecord; + +import java.util.Random; + +public class InternationalSalesData implements Data { + private static final char[] CURRENCY_SYMBOLS = {'€', '₹', '₺', '฿', '₴', '₮'}; + private final Random random = new Random(); + + public String topic() { + return "flink.international.sales.records"; + } + public Schema schema() { + return InternationalSales.SCHEMA$; + } + + public SpecificRecord generate() { + return InternationalSales.newBuilder() + .setInvoiceId(generateInvoiceId()) + .setUserId(generateUserId()) + .setProductId(generateProductId()) + .setQuantity(generateQuantity()) + .setUnitCost(generateUnitCost()) + .build(); + } + public String generateCsv() { + return String.join(",", + generateInvoiceId(), + generateUserId(), + generateProductId(), + generateQuantity(), + generateUnitCost()); + } + + private String generateInvoiceId() { + return String.valueOf(Math.abs(random.nextLong())); + } + + private String generateUserId() { + return "user-" + Math.abs(random.nextInt(100)); + } + + private String generateProductId() { + return String.valueOf(Math.abs(random.nextInt(200))); + } + + private String generateQuantity() { + return String.valueOf(Math.abs(random.nextInt(3) + 1)); + } + + private String generateUnitCost() { + char randomCurrencySymbol = CURRENCY_SYMBOLS[random.nextInt(CURRENCY_SYMBOLS.length)]; + return randomCurrencySymbol + String.valueOf(Math.abs(random.nextInt(1000) + 1)); + } +} diff --git a/tutorials/data-generator/src/main/resources/internationalSales.avsc b/tutorials/data-generator/src/main/resources/internationalSales.avsc new file mode 100644 index 0000000..7674cd2 --- /dev/null +++ b/tutorials/data-generator/src/main/resources/internationalSales.avsc @@ -0,0 +1,12 @@ +{ + "namespace": "com.github.streamshub.kafka.data.generator.schema", + "type": "record", + "name": "InternationalSales", + "fields": [ + {"name": "user_id", "type": "string"}, + {"name": "product_id", "type": "string"}, + {"name": "invoice_id", "type": "string"}, + {"name": "quantity", "type": "string"}, + {"name": "unit_cost", "type": "string"} + ] +} \ No newline at end of file diff --git a/tutorials/recommendation-app/data-generator.yaml b/tutorials/recommendation-app/data-generator.yaml index 8572cd8..b59b09b 100644 --- a/tutorials/recommendation-app/data-generator.yaml +++ b/tutorials/recommendation-app/data-generator.yaml @@ -21,7 +21,7 @@ spec: - name: KAFKA_BOOTSTRAP_SERVERS value: "my-cluster-kafka-bootstrap.flink.svc:9092" - name: DATA - value: "clickStream,sales" + value: "clickStream,sales,internationalSales" - name: USE_APICURIO_REGISTRY value: "true" - name: REGISTRY_URL From 560916cadfef7285dc7c15a0dfb0d0417efdc748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 12 Jun 2025 10:03:26 +0100 Subject: [PATCH 02/31] Add "CurrencyConverter" UDF code --- .gitignore | 3 + tutorials/currency-converter/pom.xml | 72 +++++++++++++++++++ .../flink/functions/CurrencyConverter.java | 31 ++++++++ tutorials/pom.xml | 1 + .../flink-session-udf.yaml | 18 +++++ 5 files changed, 125 insertions(+) create mode 100644 tutorials/currency-converter/pom.xml create mode 100644 tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java create mode 100644 tutorials/user-defined-functions/flink-session-udf.yaml diff --git a/.gitignore b/.gitignore index 172e1c3..30a3f02 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ dependency-reduced-pom.xml # Rendered Docs *.html + +# Maven Wrapper files +.mvn diff --git a/tutorials/currency-converter/pom.xml b/tutorials/currency-converter/pom.xml new file mode 100644 index 0000000..13ab073 --- /dev/null +++ b/tutorials/currency-converter/pom.xml @@ -0,0 +1,72 @@ + + + 4.0.0 + + + com.github.streamshub + flink-sql-tutorials + 0.1.0-SNAPSHOT + + + currency-converter + + + UTF-8 + 17 + + + + + org.apache.flink + flink-table-api-java + 2.0.0 + provided + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.1.1 + + + + package + + shade + + + + + com.google.code.findbugs:jsr305 + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.github.streamshub.flink.functions.currency-converter + + + + + + + + + + + diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java new file mode 100644 index 0000000..400f57f --- /dev/null +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -0,0 +1,31 @@ +package com.github.streamshub.flink.functions; + +import java.util.Map; + +import org.apache.flink.table.functions.ScalarFunction; + +public class CurrencyConverter extends ScalarFunction { + // https://www.unicode.org/charts/nameslist/n_20A0.html + // https://www.iso.org/iso-4217-currency-codes.html + private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( + '€', "EUR", + '₹', "INR", + '₺', "TRY", + '฿', "THB", + '₴', "UAH", + '₮', "MNT" + ); + + // e.g. currencyAmount = "€100" + public String eval(String currencyAmount) { + char currencySymbol = currencyAmount.charAt(0); // e.g. '€' + String amount = currencyAmount.substring(1); // e.g. "100" + + String currencyIsoCode = CURRENCY_SYMBOL_ISO_MAP.get(currencySymbol); // e.g. '€' => "EUR" + if (currencyIsoCode == null) { + currencyIsoCode = "???"; + } + + return amount + " " + currencyIsoCode; // e.g. "100 EUR" + } +} diff --git a/tutorials/pom.xml b/tutorials/pom.xml index bfdd425..671e1aa 100644 --- a/tutorials/pom.xml +++ b/tutorials/pom.xml @@ -22,6 +22,7 @@ data-generator + currency-converter diff --git a/tutorials/user-defined-functions/flink-session-udf.yaml b/tutorials/user-defined-functions/flink-session-udf.yaml new file mode 100644 index 0000000..52efb1f --- /dev/null +++ b/tutorials/user-defined-functions/flink-session-udf.yaml @@ -0,0 +1,18 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: session-cluster-udf +spec: + image: quay.io/streamshub/flink-sql-runner:0.2.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 2 From 981bff57f8ba6c696329eabc120bc11489e4b470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 12 Jun 2025 15:39:18 +0100 Subject: [PATCH 03/31] Add User Defined Function tutorial --- docs/user-defined-functions/index.md | 495 +++++++++++++++++++++++++++ 1 file changed, 495 insertions(+) create mode 100644 docs/user-defined-functions/index.md diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md new file mode 100644 index 0000000..f1795ba --- /dev/null +++ b/docs/user-defined-functions/index.md @@ -0,0 +1,495 @@ ++++ +title = 'Simple User Defined Functions' ++++ + +> Note: This tutorial is mainly focused on creating a simple [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/). For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). + +[Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection. +It allows you to access the power of Flink's distributed stream processing abilities with a familiar interface. +In this tutorial we show how to write a simple [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/) in Java and use it to manipulate data in a Flink SQL query running on a [Flink session cluster](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments). + +The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the `tutorials/user-defined-functions` and `tutorials/currency-converter` directories. + +## Scenario + +### Source Data Table + +The data generator application creates a topic (`flink.international.sales.records`) containing international sales records. +The schema for this topic can be seen in the `data-generator/src/main/resources/internationalSales.avsc`: + +```avroschema +{ + "namespace": "com.github.streamshub.kafka.data.generator.schema", + "type": "record", + "name": "InternationalSales", + "fields": [ + {"name": "user_id", "type": "string"}, + {"name": "product_id", "type": "string"}, + {"name": "invoice_id", "type": "string"}, + {"name": "quantity", "type": "string"}, + {"name": "unit_cost", "type": "string"} + ] +} +``` + +However, it looks like the person who owns this schema repeated the same mistake he did for the Sales schema we looked at in the [Interactive ETL example](../interactive-etl/index.md), and decided to once again include the currency symbol at the start of the `unit_cost` field! (at least he's consistent...). + +```shell +$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server \ + localhost:9092 \ + --topic flink.international.sales.records + +user-82130&53972644729678620433 + ₹192 +user-1619&74772726277194883031 + ฿638 +... +``` + +Instead of having to deal with [Unicode currency symbols](https://www.unicode.org/charts/nameslist/n_20A0.html) in our Flink SQL queries, we can create a simple UDF to strip the currency symbol from the `unit_cost` field and add an equivalent [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency code to the end of the field instead e.g. "€192" will become "192 EUR". + +We will basically be replacing a cumbersome to maintain query e.g.: + +```sql +// Think about how complex this query would become if the unit_cost field +// required more complex parsing logic +SELECT + invoice_id, + CASE + WHEN LEFT(unit_cost, 1) = '€' THEN CONCAT(SUBSTRING(unit_cost, 2), ' EUR') + WHEN LEFT(unit_cost, 1) = '₹' THEN CONCAT(SUBSTRING(unit_cost, 2), ' INR') + WHEN LEFT(unit_cost, 1) = '₺' THEN CONCAT(SUBSTRING(unit_cost, 2), ' TRY') + WHEN LEFT(unit_cost, 1) = '฿' THEN CONCAT(SUBSTRING(unit_cost, 2), ' THB') + WHEN LEFT(unit_cost, 1) = '₴' THEN CONCAT(SUBSTRING(unit_cost, 2), ' UAH') + WHEN LEFT(unit_cost, 1) = '₮' THEN CONCAT(SUBSTRING(unit_cost, 2), ' MNT') + // ... imagine many more currency symbols here ... + ELSE CONCAT(SUBSTRING(unit_cost, 2), ' ???') + END AS iso_unit_cost, + unit_cost, + quantity +FROM InternationalSalesRecordTable; +``` + +with this: + +```sql +SELECT + invoice_id, + currency_convert(unit_cost) AS iso_unit_cost, + unit_cost, + quantity +FROM InternationalSalesRecordTable; +``` + +## Creating the User Defined Function + +> Note: You can find the full completed code for the UDF in the `tutorials/currency-converter` directory. + +### Creating the Maven project + +First, we will create a Maven project in our home directory, and `cd` into it: + +```shell +cd ~ + +mvn archetype:generate \ + -DgroupId=com.github.example \ + -DartifactId=currency-converter \ + -DarchetypeArtifactId=maven-archetype-quickstart \ + -DarchetypeVersion=1.5 \ + -DinteractiveMode=false + +cd ~/currency-converter +``` + +We can remove the provided tests and their dependencies, as we don't need them for this tutorial: + +```shell +rm -r src/test + +sed -i -e '//,/<\/dependencyManagement>/d' pom.xml + +sed -i -e '//,/<\/dependencies>/d' pom.xml +``` + +Your `pom.xml` should now look something like this: + +```xml + + + 4.0.0 + + com.github.example + currency-converter + 1.0-SNAPSHOT + + currency-converter + + + UTF-8 + 17 + + + + + + + + maven-clean-plugin + 3.4.0 + + + + maven-resources-plugin + 3.3.1 + + + maven-compiler-plugin + 3.13.0 + + + maven-surefire-plugin + 3.3.0 + + + maven-jar-plugin + 3.4.2 + + + maven-install-plugin + 3.1.2 + + + maven-deploy-plugin + 3.1.2 + + + + maven-site-plugin + 3.12.1 + + + maven-project-info-reports-plugin + 3.6.1 + + + + + +``` + +### Renaming the `App` class + +Next, we will rename the `App` class to `CurrencyConverter` and rename the file accordingly: + +```shell +sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/example/App.java + +mv src/main/java/com/github/example/App.java src/main/java/com/github/example/CurrencyConverter.java +``` + +The project should still build and run successfully at this point, we can run the following commands to verify: + +```shell +mvn clean package + +java -cp target/currency-converter-1.0-SNAPSHOT.jar com.github.example.CurrencyConverter +# Should print "Hello World!" +``` + +### Adding the core Flink API dependency + +If we look back at the [Scenario](#scenario) section, we can see that all we want to do is map one string (e.g. "€100") into a new string (e.g. "100 EUR"). We can do this by writing a [Scalar Function](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#scalar-functions). + +To make our UDF, we will need to extend the [`ScalarFunction`](https://nightlies.apache.org/flink/flink-docs-release-2.0/api/java/org/apache/flink/table/functions/ScalarFunction.html) base class. Let's add a dependency to our `pom.xml` so we can do that: + +```xml +currency-converter + + + + + org.apache.flink + flink-table-api-java + 2.0.0 + provided + + +``` + +> Note: Notice how we should specify the `provided` scope, in order to exclude the dependency from our JAR. We should to do this for any core Flink API dependencies we add. Otherwise, the core Flink API dependencies in our JAR [could clash with some of our other dependency versions](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#adding-dependencies-to-the-project). + +We don't need any external dependencies in our JAR (apart from Flink). But, if we did want to add some, we would need to either [shade them into an uber/fat JAR or add them to the classpath of the distribution](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#packaging-the-application). If you want to do the former, the [Flink docs provide a template](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#template-for-creating-an-uberfat-jar-with-dependencies) on how to use the [Maven Shade Plugin](https://maven.apache.org/plugins/maven-shade-plugin/index.html) to do so. + +### Extending the `ScalarFunction` base class + +Now that we have added the only dependency we need, we can implement our `CurrencyConverter` UDF. + +Let's start by making our `CurrencyConverter` class extend the `ScalarFunction` base class. We can also remove the `main` method since we won't need it: + +```java +// ~/currency-converter/src/main/java/com/github/example/CurrencyConverter.java +package com.github.example; + +import org.apache.flink.table.functions.ScalarFunction; + +public class CurrencyConverter extends ScalarFunction {} +``` + +This function doesn't do anything yet. For that, we need it to declare a public `eval` method. + +Since we'll only be passing it one argument (the `unit_cost` field), we can declare that the method takes in a single `String` argument and also returns a `String`: + +```java +package com.github.example; + +import org.apache.flink.table.functions.ScalarFunction; + +public class CurrencyConverter extends ScalarFunction { + // (You can name the parameter whatever you like) + // e.g. currencyAmount = "€100" + public String eval(String currencyAmount) { + // logic will go here + } +} +``` + +Flink's [Automatic Type Inference](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference) will use reflection to derive SQL data types for the argument and result of our UDF. If you want to override this behavior, you can [explicitly specify the types]((https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference)), but in this case we will keep it simple and let Flink decide for us. + +If we look at the [Data Generator](https://github.com/streamshub/flink-sql-examples/blob/main/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java) in the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository, we can see the possible currency symbols that can appear in the `unit_cost` field: + +```java +public class InternationalSalesData implements Data { + private static final char[] CURRENCY_SYMBOLS = {'€', '₹', '₺', '฿', '₴', '₮'}; + // ... other fields and methods +} +``` + +In our UDF, we can create a `Map` of these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. + +```java +package com.github.example; + +import java.util.Map; + +import org.apache.flink.table.functions.ScalarFunction; + +public class CurrencyConverter extends ScalarFunction { + // https://www.unicode.org/charts/nameslist/n_20A0.html + // https://www.iso.org/iso-4217-currency-codes.html + private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( + '€', "EUR", + '₹', "INR", + '₺', "TRY", + '฿', "THB", + '₴', "UAH", + '₮', "MNT" + ); + + // e.g. currencyAmount = "€100" + public String eval(String currencyAmount) { + // logic will go here + } +} +``` + +### Implementing the function logic + +Now, we can begin implementing our function logic in the `eval` method. + +As a reminder, we want to convert a string like "€100" into "100 EUR". To do this, we can use the following steps: + +1. Get the first character of the string, which is the currency symbol (e.g. '€'). +2. Get the rest of the string, which is the amount (e.g. "100"). +3. Look up the currency symbol in our `Map` to get the corresponding currency code (e.g. '€' => "EUR"). +4. If the lookup returned `null` (currency symbol was not found in the `Map`), we can return "???" as the currency code. +5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). + +A possible implementation could look like this: + +```java +package com.github.example; + +import java.util.Map; + +import org.apache.flink.table.functions.ScalarFunction; + +public class CurrencyConverter extends ScalarFunction { + // https://www.unicode.org/charts/nameslist/n_20A0.html + // https://www.iso.org/iso-4217-currency-codes.html + private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( + '€', "EUR", + '₹', "INR", + '₺', "TRY", + '฿', "THB", + '₴', "UAH", + '₮', "MNT" + ); + + // Value of passed field (e.g. "unit_cost") is passed in e.g. "€100" + public String eval(String currencyAmount) { + // 1. Get the first character of the string, which is the currency symbol (e.g. '€'). + char currencySymbol = currencyAmount.charAt(0); + + // 2. Get the rest of the string, which is the amount (e.g. "100"). + String amount = currencyAmount.substring(1); + + // 3. Look up the currency symbol in our Map to get the corresponding currency code (e.g. '€' => "EUR"). + String currencyIsoCode = CURRENCY_SYMBOL_ISO_MAP.get(currencySymbol); + + // 4. If the currency symbol is not found in the Map, we can return "???" as the currency code. + if (currencyIsoCode == null) { + currencyIsoCode = "???"; + } + + // 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). + return amount + " " + currencyIsoCode; + } +} +``` + +### Building the JAR + +After implementing the logic, we can build our JAR: + +```shell +mvn clean package +``` + +There should be no compilation errors, and there should be a JAR in the `target` directory. + +```shell +$ ls -lh ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar +-rw-r--r--. 1 3.6K Jun 11 16:23 /home//currency-converter/target/currency-converter-1.0-SNAPSHOT.jar +``` + +If you want be extra sure everything is there, you can check if the `CurrencyConverter` class is in the JAR and if running the JAR fails as expected: + +```shell +$ jar tf ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar | grep "CurrencyConverter" +com/github/example/CurrencyConverter.class + +$ java -cp ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar com.github.example.CurrencyConverter +Error: Could not find or load main class com.github.example.CurrencyConverter +Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/functions/ScalarFunction +``` + +We can now try out our new UDF! + +## Using the User Defined Function + +### Dependencies + +In order to try the UDF you will need: + +- [Minikube](https://minikube.sigs.k8s.io/docs/) +- [kubectl](https://kubernetes.io/docs/tasks/tools/#kubectl) +- [Helm](https://helm.sh/) +- [Docker](https://www.docker.com/) or [Podman](https://podman.io/) +- [Maven](https://maven.apache.org/install.html) + +### Setup + +> Note: If you want more information on what the steps below are doing, look at the [Interactive ETL example](../interactive-etl/index.md) setup which is almost identical. + +1. Spin up a [minikube](https://minikube.sigs.k8s.io/docs/) cluster: + + ```shell + minikube start --cpus 4 --memory 16G + ``` + +2. From the main `tutorials` directory, run the data generator setup script: + + ```shell + ./scripts/data-gen-setup.sh + ``` + +3. (Optional) Verify that the test data is flowing correctly (wait a few seconds for messages to start flowing): + + ```shell + kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.international.sales.records + ``` + +4. Deploy a [Flink session cluster](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments): + + ```shell + kubectl -n flink apply -f user-defined-functions/flink-session-udf.yaml + ``` + +### Running the UDF + +Since we want to try the UDF in different scenarios, we will create a container containing the Flink SQL CLI to run our queries (see the [Interactive ETL example](../interactive-etl/index.md) for details on the CLI). + +First, we need to port forward the Flink Job Manager pod so the Flink SQL CLI can access it: + +```shell +kubectl -n flink port-forward 8081:8081 +``` + +The job manager pod will have the name format `session-cluster-udf-`, your `kubectl` should tab-complete the name. +If it doesn't then you can find the job manager name by running `kubectl -n flink get pods`. + +Next, we will create a container with our JAR mounted into it: + +```shell +podman run -it --rm --net=host \ + -v ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar:/opt/flink/opt/currency-converter-1.0-SNAPSHOT.jar:Z \ + quay.io/streamshub/flink-sql-runner:0.2.0 \ + /opt/flink/bin/sql-client.sh embedded +``` + +> Note: Don't forget the `:Z` at the end of the volume mount if using a system with SELinux! Otherwise, you will get a permission error when trying to use the JAR later. + +Once we're in the Flink SQL CLI, we will first create a table for the generated international sales records: + +```sql +CREATE TABLE InternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.international.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.group.id' = 'international-sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +We can do a simple query to verify that the table was created correctly and that the data is flowing (give it a few seconds to start receiving data): + +```sql +SELECT * FROM InternationalSalesRecordTable; +``` + +If that worked, we can now register our UDF: + +```sql +CREATE FUNCTION currency_convert +AS 'com.github.example.CurrencyConverter' +USING JAR '/opt/flink/opt/currency-converter-1.0-SNAPSHOT.jar'; +``` + +> Note: This statement may succeed even if the JAR was not found or has insufficient permissions. You will likely only find this out when you try to use the UDF in a query. + +Now, we can use our UDF in a query: + +```sql +SELECT + invoice_id, + currency_convert(unit_cost) AS iso_unit_cost, + unit_cost, + quantity +FROM InternationalSalesRecordTable; +``` + +You should start seeing results with both a `unit_cost` field and an `iso_unit_cost` field containing the output of our UDF. From ad2ae448a42ae0d7a33aff5e79f5646e2dd76eab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 13 Jun 2025 12:17:08 +0100 Subject: [PATCH 04/31] Make UDF `TEMPORARY` --- docs/user-defined-functions/index.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index f1795ba..9f18230 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -471,14 +471,16 @@ We can do a simple query to verify that the table was created correctly and that SELECT * FROM InternationalSalesRecordTable; ``` -If that worked, we can now register our UDF: +If that worked, we can now register our UDF as a [temporary catalog function](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/create/#create-function): ```sql -CREATE FUNCTION currency_convert +CREATE TEMPORARY FUNCTION currency_convert AS 'com.github.example.CurrencyConverter' USING JAR '/opt/flink/opt/currency-converter-1.0-SNAPSHOT.jar'; ``` +> Note: Temporary catalog functions [only live as long as the current session](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/overview/#types-of-functions). You can omit the `TEMPORARY` keyword to create a catalog function that persists across sessions. + > Note: This statement may succeed even if the JAR was not found or has insufficient permissions. You will likely only find this out when you try to use the UDF in a query. Now, we can use our UDF in a query: From c9fa11e75d43196dc882de693b9850005fada7c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 13 Jun 2025 13:31:02 +0100 Subject: [PATCH 05/31] Persist query with UDF to Kafka --- docs/user-defined-functions/index.md | 82 +++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 9f18230..b7a57d9 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -36,8 +36,7 @@ However, it looks like the person who owns this schema repeated the same mistake ```shell $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ - ./bin/kafka-console-consumer.sh --bootstrap-server \ - localhost:9092 \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic flink.international.sales.records user-82130&53972644729678620433 @@ -494,4 +493,81 @@ SELECT FROM InternationalSalesRecordTable; ``` -You should start seeing results with both a `unit_cost` field and an `iso_unit_cost` field containing the output of our UDF. +You should start seeing results with both a `unit_cost` field and an `iso_unit_cost` field containing the output of our UDF! + +We can also use the UDF in more complex queries e.g. to filter for records with a specific currency and quantity: + +```sql +SELECT + DISTINCT(product_id), + iso_unit_cost, + quantity + FROM ( + SELECT + invoice_id, + user_id, + product_id, + CAST(quantity AS INT) AS quantity, + currency_convert(unit_cost) AS iso_unit_cost, + unit_cost, + purchase_time + FROM InternationalSalesRecordTable + ) +WHERE + RIGHT(iso_unit_cost, 3) = 'EUR' AND quantity > 1; +``` + +> Note: This query might take a while to return results, since there are many currencies used in the data! + +### Persisting back to Kafka + +Just like in the [Interactive ETL example](../interactive-etl/index.md), we can create a new table to persist the output of our query back to Kafka (look at that example for an explanation of the steps below). This way we don't have to run the query every time we want to access the formatted cost. + +First, let's define the table, and specify `csv` as the format so we don't have to provide a schema: + +```sql +CREATE TABLE IsoInternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity INT, + iso_unit_cost STRING, + purchase_time TIMESTAMP(3), + PRIMARY KEY (`user_id`) NOT ENFORCED +) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.iso.international.sales.records.interactive', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.client.id' = 'sql-cleaning-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' +); +``` + +Next, let's insert the results of the formatting query into it: + +```sql +INSERT INTO IsoInternationalSalesRecordTable +SELECT + invoice_id, + user_id, + product_id, + CAST(quantity AS INT), + currency_convert(unit_cost), + purchase_time +FROM InternationalSalesRecordTable; +``` + +Finally, we can verify the data is being written to the new topic by running the following command in a new terminal: + +```shell +$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic flink.iso.international.sales.records.interactive + +5688844959819606179,user-96,0,1,"448 INR","2025-06-13 11:28:29.722" +7208742491425008088,user-87,106,3,"587 UAH","2025-06-13 11:28:32.725" +8796404564173987612,user-70,105,1,"399 EUR","2025-06-13 11:28:35.728" +``` From b80b7d2ba1fd9a5e8fdbe842ffd930acc0cab64c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 13 Jun 2025 16:26:10 +0100 Subject: [PATCH 06/31] Add note for Flink Archetype --- docs/user-defined-functions/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index b7a57d9..a6466c3 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -102,6 +102,8 @@ mvn archetype:generate \ cd ~/currency-converter ``` +> Note: Flink provides a [Maven Archetype and quickstart script](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/overview/#getting-started) for getting started. However, it includes a lot of dependencies and boilerplate we don't need for this tutorial, so we will start with a minimal Maven project instead. + We can remove the provided tests and their dependencies, as we don't need them for this tutorial: ```shell From 073eadcb650e3565541632045dcb3ce253f3d97c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 13 Jun 2025 16:50:22 +0100 Subject: [PATCH 07/31] Change where UDF is mounted --- docs/user-defined-functions/index.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index a6466c3..0c25390 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -437,11 +437,13 @@ Next, we will create a container with our JAR mounted into it: ```shell podman run -it --rm --net=host \ - -v ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar:/opt/flink/opt/currency-converter-1.0-SNAPSHOT.jar:Z \ + -v ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar:/opt/currency-converter-1.0-SNAPSHOT.jar:Z \ quay.io/streamshub/flink-sql-runner:0.2.0 \ /opt/flink/bin/sql-client.sh embedded ``` +> Note: Flink [ships optional dependencies in `/opt`](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/advanced/#anatomy-of-the-flink-distribution), so that's a good place to mount our JAR. + > Note: Don't forget the `:Z` at the end of the volume mount if using a system with SELinux! Otherwise, you will get a permission error when trying to use the JAR later. Once we're in the Flink SQL CLI, we will first create a table for the generated international sales records: @@ -477,7 +479,7 @@ If that worked, we can now register our UDF as a [temporary catalog function](ht ```sql CREATE TEMPORARY FUNCTION currency_convert AS 'com.github.example.CurrencyConverter' -USING JAR '/opt/flink/opt/currency-converter-1.0-SNAPSHOT.jar'; +USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; ``` > Note: Temporary catalog functions [only live as long as the current session](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/overview/#types-of-functions). You can omit the `TEMPORARY` keyword to create a catalog function that persists across sessions. From 8f72f9b551038f30ede59963f2f0750ef7f3fdf4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 16:01:13 +0100 Subject: [PATCH 08/31] Apply suggestions from code review Co-authored-by: Thomas Cooper Update docs/user-defined-functions/index.md Co-authored-by: Thomas Cooper Update docs/user-defined-functions/index.md Co-authored-by: Thomas Cooper Apply suggestions from code review Co-authored-by: Thomas Cooper --- docs/user-defined-functions/index.md | 30 ++++++++++++++++------------ 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 0c25390..80df25c 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -5,7 +5,9 @@ title = 'Simple User Defined Functions' > Note: This tutorial is mainly focused on creating a simple [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/). For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection. -It allows you to access the power of Flink's distributed stream processing abilities with a familiar interface. +Flink SQL has many [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/systemfunctions/#system-built-in-functions), that allow you to extract and manipulate data from the many sources that Flink supports. +However, sometimes you need to be able to do operations not covered by these built-in functions. +In that situation Flink gives you the option of creating your own functions. In this tutorial we show how to write a simple [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/) in Java and use it to manipulate data in a Flink SQL query running on a [Flink session cluster](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments). The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the `tutorials/user-defined-functions` and `tutorials/currency-converter` directories. @@ -32,7 +34,7 @@ The schema for this topic can be seen in the `data-generator/src/main/resources/ } ``` -However, it looks like the person who owns this schema repeated the same mistake he did for the Sales schema we looked at in the [Interactive ETL example](../interactive-etl/index.md), and decided to once again include the currency symbol at the start of the `unit_cost` field! (at least he's consistent...). +However, it looks like the person who owns this schema repeated the same mistake they did for the Sales schema we looked at in the [Interactive ETL example](../interactive-etl/index.md), and decided to once again include the currency symbol at the start of the `unit_cost` field (at least they're consistent...)! ```shell $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ @@ -87,19 +89,18 @@ FROM InternationalSalesRecordTable; ### Creating the Maven project -First, we will create a Maven project in our home directory, and `cd` into it: +First, we will create a blank Maven project: ```shell -cd ~ mvn archetype:generate \ - -DgroupId=com.github.example \ - -DartifactId=currency-converter \ + -DgroupId=com.github.streamshub \ + -DartifactId=flink-udf-currency-converter \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DarchetypeVersion=1.5 \ -DinteractiveMode=false -cd ~/currency-converter +cd ~/flink-udf-currency-converter ``` > Note: Flink provides a [Maven Archetype and quickstart script](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/overview/#getting-started) for getting started. However, it includes a lot of dependencies and boilerplate we don't need for this tutorial, so we will start with a minimal Maven project instead. @@ -188,7 +189,7 @@ Next, we will rename the `App` class to `CurrencyConverter` and rename the file ```shell sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/example/App.java -mv src/main/java/com/github/example/App.java src/main/java/com/github/example/CurrencyConverter.java +mv src/main/java/com/github/example/App.java src/main/java/com/github/streamshub/CurrencyConverter.java ``` The project should still build and run successfully at this point, we can run the following commands to verify: @@ -222,7 +223,9 @@ To make our UDF, we will need to extend the [`ScalarFunction`](https://nightlies > Note: Notice how we should specify the `provided` scope, in order to exclude the dependency from our JAR. We should to do this for any core Flink API dependencies we add. Otherwise, the core Flink API dependencies in our JAR [could clash with some of our other dependency versions](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#adding-dependencies-to-the-project). -We don't need any external dependencies in our JAR (apart from Flink). But, if we did want to add some, we would need to either [shade them into an uber/fat JAR or add them to the classpath of the distribution](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#packaging-the-application). If you want to do the former, the [Flink docs provide a template](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#template-for-creating-an-uberfat-jar-with-dependencies) on how to use the [Maven Shade Plugin](https://maven.apache.org/plugins/maven-shade-plugin/index.html) to do so. +We don't need any external dependencies in our JAR (apart from Flink). +But, if we did want to add some, we would need to either [shade them into an uber/fat JAR or add them to the classpath of the distribution](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#packaging-the-application). +If you want to do the former, the [Flink docs provide a template](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/maven/#template-for-creating-an-uberfat-jar-with-dependencies) on how to use the [Maven Shade Plugin](https://maven.apache.org/plugins/maven-shade-plugin/index.html) to do so. ### Extending the `ScalarFunction` base class @@ -232,7 +235,7 @@ Let's start by making our `CurrencyConverter` class extend the `ScalarFunction` ```java // ~/currency-converter/src/main/java/com/github/example/CurrencyConverter.java -package com.github.example; +package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -257,7 +260,8 @@ public class CurrencyConverter extends ScalarFunction { } ``` -Flink's [Automatic Type Inference](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference) will use reflection to derive SQL data types for the argument and result of our UDF. If you want to override this behavior, you can [explicitly specify the types]((https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference)), but in this case we will keep it simple and let Flink decide for us. +Flink's [Automatic Type Inference](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference) will use reflection to derive SQL data types for the argument and result of our UDF. +If you want to override this behaviour, you can [explicitly specify the types]((https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference)), but in this case we will keep it simple and let Flink decide for us. If we look at the [Data Generator](https://github.com/streamshub/flink-sql-examples/blob/main/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java) in the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository, we can see the possible currency symbols that can appear in the `unit_cost` field: @@ -422,7 +426,7 @@ In order to try the UDF you will need: ### Running the UDF -Since we want to try the UDF in different scenarios, we will create a container containing the Flink SQL CLI to run our queries (see the [Interactive ETL example](../interactive-etl/index.md) for details on the CLI). +In order to use our UDF we need to create a container containing it and the Flink runtime. First, we need to port forward the Flink Job Manager pod so the Flink SQL CLI can access it: @@ -482,7 +486,7 @@ AS 'com.github.example.CurrencyConverter' USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; ``` -> Note: Temporary catalog functions [only live as long as the current session](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/overview/#types-of-functions). You can omit the `TEMPORARY` keyword to create a catalog function that persists across sessions. +> Note: Temporary catalog functions [only live as long as the current session](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/overview/#types-of-functions). Provided you have a [Flink catalog](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/catalogs/#catalogs) deployed and configured, you can omit the `TEMPORARY` keyword to create a function that persists across sessions. > Note: This statement may succeed even if the JAR was not found or has insufficient permissions. You will likely only find this out when you try to use the UDF in a query. From ab382c52ec1715da72f9f554b39b36613e8762c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 16:35:39 +0100 Subject: [PATCH 09/31] Create `flink.version` pom property https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2151760327 --- tutorials/currency-converter/pom.xml | 2 +- tutorials/pom.xml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tutorials/currency-converter/pom.xml b/tutorials/currency-converter/pom.xml index 13ab073..f65ea6e 100644 --- a/tutorials/currency-converter/pom.xml +++ b/tutorials/currency-converter/pom.xml @@ -20,7 +20,7 @@ org.apache.flink flink-table-api-java - 2.0.0 + ${flink.version} provided diff --git a/tutorials/pom.xml b/tutorials/pom.xml index 671e1aa..cac8f5c 100644 --- a/tutorials/pom.xml +++ b/tutorials/pom.xml @@ -18,6 +18,7 @@ 3.9.1 2.6.8.Final 1.12.0 + 2.0.0 From 0c7191c713dafd0d4ba5e187d65c77d8ef1a7d49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 16:50:52 +0100 Subject: [PATCH 10/31] Explain what first kafka-consumer command is doing --- docs/user-defined-functions/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 80df25c..424cdd7 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -36,6 +36,8 @@ The schema for this topic can be seen in the `data-generator/src/main/resources/ However, it looks like the person who owns this schema repeated the same mistake they did for the Sales schema we looked at in the [Interactive ETL example](../interactive-etl/index.md), and decided to once again include the currency symbol at the start of the `unit_cost` field (at least they're consistent...)! +*(Assuming you have the data generator up and running as per the instructions in the [Setup](#setup) section, you can verify this by running the following command):* + ```shell $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ From 1ce56b3abf42cc883fbc93782894ff2ec86e60fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 17:01:51 +0100 Subject: [PATCH 11/31] Move solution after problem https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152218818 --- docs/user-defined-functions/index.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 424cdd7..113c3d1 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -50,9 +50,7 @@ user-1619&74772726277194883031 ... ``` -Instead of having to deal with [Unicode currency symbols](https://www.unicode.org/charts/nameslist/n_20A0.html) in our Flink SQL queries, we can create a simple UDF to strip the currency symbol from the `unit_cost` field and add an equivalent [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency code to the end of the field instead e.g. "€192" will become "192 EUR". - -We will basically be replacing a cumbersome to maintain query e.g.: +Because of that mistake, we currently have to deal with [Unicode currency symbols](https://www.unicode.org/charts/nameslist/n_20A0.html) in our Flink SQL queries, and are forced to make long and complex queries to convert the `unit_cost` field into a more usable format. ```sql // Think about how complex this query would become if the unit_cost field @@ -74,7 +72,9 @@ SELECT FROM InternationalSalesRecordTable; ``` -with this: +Instead, we can create a simple UDF to strip the currency symbol from the `unit_cost` field and add an equivalent [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency code to the end of the field instead e.g. "€192" will become "192 EUR". + +We will be replacing that cumbersome to maintain query above with something simple like this: ```sql SELECT From cf0e74bc0cc2dd501c7437a561147908fbf4ed66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 17:20:42 +0100 Subject: [PATCH 12/31] Ask for currency symbols https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152248298 --- docs/user-defined-functions/index.md | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 113c3d1..d31b984 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -265,14 +265,13 @@ public class CurrencyConverter extends ScalarFunction { Flink's [Automatic Type Inference](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference) will use reflection to derive SQL data types for the argument and result of our UDF. If you want to override this behaviour, you can [explicitly specify the types]((https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference)), but in this case we will keep it simple and let Flink decide for us. -If we look at the [Data Generator](https://github.com/streamshub/flink-sql-examples/blob/main/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/InternationalSalesData.java) in the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository, we can see the possible currency symbols that can appear in the `unit_cost` field: +By speaking to authors of the upstream services, we should be able to obtain a list of currency symbols that can potentially appear in the `unit_cost` field: -```java -public class InternationalSalesData implements Data { - private static final char[] CURRENCY_SYMBOLS = {'€', '₹', '₺', '฿', '₴', '₮'}; - // ... other fields and methods -} -``` +> Here is list of currency symbols that can potentially appear in the `unit_cost` field: +> +> `'€', '₹', '₺', '฿', '₴', '₮'` +> +> — authors of the upstream services In our UDF, we can create a `Map` of these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. From d43dc83e69bc4bfb1eee6e4c8e110997c3e0670c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 18:08:28 +0100 Subject: [PATCH 13/31] Change currency `Map` to `enum` https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152267288 --- docs/user-defined-functions/index.md | 83 ++++++++++++------- .../flink/functions/CurrencyConverter.java | 41 +++++---- 2 files changed, 77 insertions(+), 47 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index d31b984..51a221b 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -273,26 +273,35 @@ By speaking to authors of the upstream services, we should be able to obtain a l > > — authors of the upstream services -In our UDF, we can create a `Map` of these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. +In our UDF, we can create an `enum` that maps these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. ```java package com.github.example; -import java.util.Map; - import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html - private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( - '€', "EUR", - '₹', "INR", - '₺', "TRY", - '฿', "THB", - '₴', "UAH", - '₮', "MNT" - ); + enum Currency { + €("EUR"), + ₹("INR"), + ₺("TRY"), + ฿("THB"), + ₴("UAH"), + ₮("MNT"), + ERR("ERR"); + + private final String isoCode; + + Currency(String isoCode) { + this.isoCode = isoCode; + } + + public String getIsoCode() { + return isoCode; + } + } // e.g. currencyAmount = "€100" public String eval(String currencyAmount) { @@ -307,10 +316,10 @@ Now, we can begin implementing our function logic in the `eval` method. As a reminder, we want to convert a string like "€100" into "100 EUR". To do this, we can use the following steps: -1. Get the first character of the string, which is the currency symbol (e.g. '€'). +1. Get the first character of the string, which is the currency symbol (e.g. "€"). 2. Get the rest of the string, which is the amount (e.g. "100"). -3. Look up the currency symbol in our `Map` to get the corresponding currency code (e.g. '€' => "EUR"). -4. If the lookup returned `null` (currency symbol was not found in the `Map`), we can return "???" as the currency code. +3. Look up the currency symbol in our `enum` to get the corresponding currency code (e.g. "€" => "EUR"). +4. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code. 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). A possible implementation could look like this: @@ -318,40 +327,50 @@ A possible implementation could look like this: ```java package com.github.example; -import java.util.Map; - import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html - private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( - '€', "EUR", - '₹', "INR", - '₺', "TRY", - '฿', "THB", - '₴', "UAH", - '₮', "MNT" - ); + enum Currency { + €("EUR"), + ₹("INR"), + ₺("TRY"), + ฿("THB"), + ₴("UAH"), + ₮("MNT"), + ERR("ERR"); + + private final String isoCode; + + Currency(String isoCode) { + this.isoCode = isoCode; + } + + public String getIsoCode() { + return isoCode; + } + } // Value of passed field (e.g. "unit_cost") is passed in e.g. "€100" public String eval(String currencyAmount) { // 1. Get the first character of the string, which is the currency symbol (e.g. '€'). - char currencySymbol = currencyAmount.charAt(0); + String currencySymbol = currencyAmount.substring(0, 1); // 2. Get the rest of the string, which is the amount (e.g. "100"). String amount = currencyAmount.substring(1); - // 3. Look up the currency symbol in our Map to get the corresponding currency code (e.g. '€' => "EUR"). - String currencyIsoCode = CURRENCY_SYMBOL_ISO_MAP.get(currencySymbol); - - // 4. If the currency symbol is not found in the Map, we can return "???" as the currency code. - if (currencyIsoCode == null) { - currencyIsoCode = "???"; + Currency currency; + try { + // 3. Look up the currency symbol in our enum to get the corresponding currency code (e.g. "€" => "EUR"). + currency = Currency.valueOf(currencySymbol); + } catch (Exception e) { + // 4. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code. + currency = Currency.ERR; // e.g. ">" => "ERR" } // 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). - return amount + " " + currencyIsoCode; + return amount + " " + currency.getIsoCode(); // e.g. "100 EUR" } } ``` diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index 400f57f..b021278 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -1,31 +1,42 @@ package com.github.streamshub.flink.functions; -import java.util.Map; - import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html - private static final Map CURRENCY_SYMBOL_ISO_MAP = Map.of( - '€', "EUR", - '₹', "INR", - '₺', "TRY", - '฿', "THB", - '₴', "UAH", - '₮', "MNT" - ); + enum Currency { + €("EUR"), + ₹("INR"), + ₺("TRY"), + ฿("THB"), + ₴("UAH"), + ₮("MNT"), + ERR("ERR"); + + private final String isoCode; + + Currency(String isoCode) { + this.isoCode = isoCode; + } + + public String getIsoCode() { + return isoCode; + } + } // e.g. currencyAmount = "€100" public String eval(String currencyAmount) { - char currencySymbol = currencyAmount.charAt(0); // e.g. '€' + String currencySymbol = currencyAmount.substring(0, 1); // e.g. "€" String amount = currencyAmount.substring(1); // e.g. "100" - String currencyIsoCode = CURRENCY_SYMBOL_ISO_MAP.get(currencySymbol); // e.g. '€' => "EUR" - if (currencyIsoCode == null) { - currencyIsoCode = "???"; + Currency currency; + try { + currency = Currency.valueOf(currencySymbol); // e.g. "€" => "EUR" + } catch (Exception e) { + currency = Currency.ERR; // e.g. ">" => "ERR" } - return amount + " " + currencyIsoCode; // e.g. "100 EUR" + return amount + " " + currency.getIsoCode(); // e.g. "100 EUR" } } From 96ece59a94fa8ea7ac1e74c6b49a867a754f3cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 18:15:21 +0100 Subject: [PATCH 14/31] Don't show `pom.xml` https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152431248 --- docs/user-defined-functions/index.md | 67 ---------------------------- 1 file changed, 67 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 51a221b..4784b46 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -117,73 +117,6 @@ sed -i -e '//,/<\/dependencyManagement>/d' pom.xml sed -i -e '//,/<\/dependencies>/d' pom.xml ``` -Your `pom.xml` should now look something like this: - -```xml - - - 4.0.0 - - com.github.example - currency-converter - 1.0-SNAPSHOT - - currency-converter - - - UTF-8 - 17 - - - - - - - - maven-clean-plugin - 3.4.0 - - - - maven-resources-plugin - 3.3.1 - - - maven-compiler-plugin - 3.13.0 - - - maven-surefire-plugin - 3.3.0 - - - maven-jar-plugin - 3.4.2 - - - maven-install-plugin - 3.1.2 - - - maven-deploy-plugin - 3.1.2 - - - - maven-site-plugin - 3.12.1 - - - maven-project-info-reports-plugin - 3.6.1 - - - - - -``` - ### Renaming the `App` class Next, we will rename the `App` class to `CurrencyConverter` and rename the file accordingly: From adb7c8014587037536f56e127d65b0c299b2301e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 18:29:33 +0100 Subject: [PATCH 15/31] Don't check Maven artefacts https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152298730 --- docs/user-defined-functions/index.md | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 4784b46..32a85b8 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -316,25 +316,7 @@ After implementing the logic, we can build our JAR: mvn clean package ``` -There should be no compilation errors, and there should be a JAR in the `target` directory. - -```shell -$ ls -lh ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar --rw-r--r--. 1 3.6K Jun 11 16:23 /home//currency-converter/target/currency-converter-1.0-SNAPSHOT.jar -``` - -If you want be extra sure everything is there, you can check if the `CurrencyConverter` class is in the JAR and if running the JAR fails as expected: - -```shell -$ jar tf ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar | grep "CurrencyConverter" -com/github/example/CurrencyConverter.class - -$ java -cp ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar com.github.example.CurrencyConverter -Error: Could not find or load main class com.github.example.CurrencyConverter -Caused by: java.lang.NoClassDefFoundError: org/apache/flink/table/functions/ScalarFunction -``` - -We can now try out our new UDF! +Assuming there are no compilation errors, we can now try out our new UDF! ## Using the User Defined Function From 4c9829d5a5f85fe7883227c4b6b0db74386ebf77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 17 Jun 2025 18:52:57 +0100 Subject: [PATCH 16/31] Replace all `example` with `streamshub` --- docs/user-defined-functions/index.md | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 32a85b8..c332837 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -122,9 +122,9 @@ sed -i -e '//,/<\/dependencies>/d' pom.xml Next, we will rename the `App` class to `CurrencyConverter` and rename the file accordingly: ```shell -sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/example/App.java +sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/streamshub/App.java -mv src/main/java/com/github/example/App.java src/main/java/com/github/streamshub/CurrencyConverter.java +mv src/main/java/com/github/streamshub/App.java src/main/java/com/github/streamshub/CurrencyConverter.java ``` The project should still build and run successfully at this point, we can run the following commands to verify: @@ -132,7 +132,7 @@ The project should still build and run successfully at this point, we can run th ```shell mvn clean package -java -cp target/currency-converter-1.0-SNAPSHOT.jar com.github.example.CurrencyConverter +java -cp target/currency-converter-1.0-SNAPSHOT.jar com.github.streamshub.CurrencyConverter # Should print "Hello World!" ``` @@ -169,7 +169,7 @@ Now that we have added the only dependency we need, we can implement our `Curren Let's start by making our `CurrencyConverter` class extend the `ScalarFunction` base class. We can also remove the `main` method since we won't need it: ```java -// ~/currency-converter/src/main/java/com/github/example/CurrencyConverter.java +// ~/currency-converter/src/main/java/com/github/streamshub/CurrencyConverter.java package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -182,7 +182,7 @@ This function doesn't do anything yet. For that, we need it to declare a public Since we'll only be passing it one argument (the `unit_cost` field), we can declare that the method takes in a single `String` argument and also returns a `String`: ```java -package com.github.example; +package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -209,7 +209,7 @@ By speaking to authors of the upstream services, we should be able to obtain a l In our UDF, we can create an `enum` that maps these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. ```java -package com.github.example; +package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -258,7 +258,7 @@ As a reminder, we want to convert a string like "€100" into "100 EUR". To do t A possible implementation could look like this: ```java -package com.github.example; +package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -417,7 +417,7 @@ If that worked, we can now register our UDF as a [temporary catalog function](ht ```sql CREATE TEMPORARY FUNCTION currency_convert -AS 'com.github.example.CurrencyConverter' +AS 'com.github.streamshub.CurrencyConverter' USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; ``` From 94563f7179c661e0ca3d46186cc9f814298ee541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 09:38:29 +0100 Subject: [PATCH 17/31] Move separator to enum https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152294592 --- docs/user-defined-functions/index.md | 10 ++++++++-- .../streamshub/flink/functions/CurrencyConverter.java | 8 +++++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index c332837..a988d0f 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -265,7 +265,7 @@ import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html - enum Currency { + public enum Currency { €("EUR"), ₹("INR"), ₺("TRY"), @@ -274,6 +274,8 @@ public class CurrencyConverter extends ScalarFunction { ₮("MNT"), ERR("ERR"); + public static final String SEPARATOR = " "; + private final String isoCode; Currency(String isoCode) { @@ -283,6 +285,10 @@ public class CurrencyConverter extends ScalarFunction { public String getIsoCode() { return isoCode; } + + public String concatToAmount(String amount) { + return amount + SEPARATOR + isoCode; + } } // Value of passed field (e.g. "unit_cost") is passed in e.g. "€100" @@ -303,7 +309,7 @@ public class CurrencyConverter extends ScalarFunction { } // 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). - return amount + " " + currency.getIsoCode(); // e.g. "100 EUR" + return currency.concatToAmount(amount); // e.g. "100 EUR" } } ``` diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index b021278..61a18c7 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -14,6 +14,8 @@ enum Currency { ₮("MNT"), ERR("ERR"); + public static final String SEPARATOR = " "; + private final String isoCode; Currency(String isoCode) { @@ -23,6 +25,10 @@ enum Currency { public String getIsoCode() { return isoCode; } + + public String concatToAmount(String amount) { + return amount + SEPARATOR + isoCode; + } } // e.g. currencyAmount = "€100" @@ -37,6 +43,6 @@ public String eval(String currencyAmount) { currency = Currency.ERR; // e.g. ">" => "ERR" } - return amount + " " + currency.getIsoCode(); // e.g. "100 EUR" + return currency.concatToAmount(amount); // e.g. "100 EUR" } } From 676072cfdad1c2d0aaf6d97200beea8d28a82e65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 10:03:43 +0100 Subject: [PATCH 18/31] Move parsing to `Currency.fromCurrencyAmount()` https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2152294592 --- docs/user-defined-functions/index.md | 35 ++++++++++--------- .../flink/functions/CurrencyConverter.java | 21 ++++++----- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index a988d0f..80ab03f 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -250,9 +250,9 @@ Now, we can begin implementing our function logic in the `eval` method. As a reminder, we want to convert a string like "€100" into "100 EUR". To do this, we can use the following steps: 1. Get the first character of the string, which is the currency symbol (e.g. "€"). -2. Get the rest of the string, which is the amount (e.g. "100"). -3. Look up the currency symbol in our `enum` to get the corresponding currency code (e.g. "€" => "EUR"). -4. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code. +2. Look up the currency symbol in our `enum` to get the corresponding currency code (e.g. "€" => "EUR"). +3. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code. +4. Get the rest of the string, which is the amount (e.g. "100"). 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). A possible implementation could look like this: @@ -286,6 +286,19 @@ public class CurrencyConverter extends ScalarFunction { return isoCode; } + public static Currency fromCurrencyAmount(String currencyAmount) { + // 1. Get the first character of the string, which is the currency symbol (e.g. '€'). + String currencySymbol = currencyAmount.substring(0, 1); + + // 2. Look up the currency symbol in our enum to get the corresponding currency code (e.g. "€" => "EUR"). + try { + return Currency.valueOf(currencySymbol); + } catch (Exception e) { + // 3. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code (e.g. ">" => "ERR"). + return Currency.ERR; + } + } + public String concatToAmount(String amount) { return amount + SEPARATOR + isoCode; } @@ -293,23 +306,13 @@ public class CurrencyConverter extends ScalarFunction { // Value of passed field (e.g. "unit_cost") is passed in e.g. "€100" public String eval(String currencyAmount) { - // 1. Get the first character of the string, which is the currency symbol (e.g. '€'). - String currencySymbol = currencyAmount.substring(0, 1); + String currencySymbol = Currency.fromCurrencyAmount(currencyAmount); - // 2. Get the rest of the string, which is the amount (e.g. "100"). + // 4. Get the rest of the string, which is the amount (e.g. "100"). String amount = currencyAmount.substring(1); - Currency currency; - try { - // 3. Look up the currency symbol in our enum to get the corresponding currency code (e.g. "€" => "EUR"). - currency = Currency.valueOf(currencySymbol); - } catch (Exception e) { - // 4. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code. - currency = Currency.ERR; // e.g. ">" => "ERR" - } - // 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). - return currency.concatToAmount(amount); // e.g. "100 EUR" + return currency.concatToAmount(amount); } } ``` diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index 61a18c7..072c557 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -5,7 +5,7 @@ public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html - enum Currency { + public enum Currency { €("EUR"), ₹("INR"), ₺("TRY"), @@ -26,6 +26,15 @@ public String getIsoCode() { return isoCode; } + public static Currency fromCurrencyAmount(String currencyAmount) { + String currencySymbol = currencyAmount.substring(0, 1); // e.g. '€' + try { + return Currency.valueOf(currencySymbol); // e.g. "€" => "EUR" + } catch (Exception e) { + return Currency.ERR; // e.g. ">" => "ERR" + } + } + public String concatToAmount(String amount) { return amount + SEPARATOR + isoCode; } @@ -33,15 +42,9 @@ public String concatToAmount(String amount) { // e.g. currencyAmount = "€100" public String eval(String currencyAmount) { - String currencySymbol = currencyAmount.substring(0, 1); // e.g. "€" - String amount = currencyAmount.substring(1); // e.g. "100" + Currency currency = Currency.fromCurrencyAmount(currencyAmount); - Currency currency; - try { - currency = Currency.valueOf(currencySymbol); // e.g. "€" => "EUR" - } catch (Exception e) { - currency = Currency.ERR; // e.g. ">" => "ERR" - } + String amount = currencyAmount.substring(1); // e.g. "100" return currency.concatToAmount(amount); // e.g. "100 EUR" } From 72d5457c8c2a09bce670093814f038f99ca805b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 11:37:00 +0100 Subject: [PATCH 19/31] Combine `enum` and `Map` approaches --- .../flink/functions/CurrencyConverter.java | 34 +++++++++++++------ 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index 072c557..2087418 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -1,27 +1,39 @@ package com.github.streamshub.flink.functions; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // https://www.unicode.org/charts/nameslist/n_20A0.html // https://www.iso.org/iso-4217-currency-codes.html public enum Currency { - €("EUR"), - ₹("INR"), - ₺("TRY"), - ฿("THB"), - ₴("UAH"), - ₮("MNT"), - ERR("ERR"); + EUR("€", "EUR"), + INR("₹", "INR"), + TRY("₺", "TRY"), + THB("฿", "THB"), + UAH("₴", "UAH"), + MNT("₮", "MNT"), + ERR("?", "ERR"); public static final String SEPARATOR = " "; + private static final Map SYMBOL_TO_CURRENCY = Stream.of(Currency.values()) + .collect(Collectors.toMap(Currency::getSymbol, c -> c)); + private final String symbol; private final String isoCode; - Currency(String isoCode) { + Currency(String symbol, String isoCode) { + this.symbol = symbol; this.isoCode = isoCode; } + public String getSymbol() { + return symbol; + } + public String getIsoCode() { return isoCode; } @@ -29,9 +41,9 @@ public String getIsoCode() { public static Currency fromCurrencyAmount(String currencyAmount) { String currencySymbol = currencyAmount.substring(0, 1); // e.g. '€' try { - return Currency.valueOf(currencySymbol); // e.g. "€" => "EUR" + return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); } catch (Exception e) { - return Currency.ERR; // e.g. ">" => "ERR" + return ERR; } } @@ -46,6 +58,6 @@ public String eval(String currencyAmount) { String amount = currencyAmount.substring(1); // e.g. "100" - return currency.concatToAmount(amount); // e.g. "100 EUR" + return currency.concatToAmount(amount); // e.g. "100 EUR" } } From 3b1d36ae82e3dac21100836857afc545c0cee12f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 12:02:48 +0100 Subject: [PATCH 20/31] Move all parsing logic to `enum` --- .../flink/functions/CurrencyConverter.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index 2087418..ee456e7 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -38,26 +38,31 @@ public String getIsoCode() { return isoCode; } - public static Currency fromCurrencyAmount(String currencyAmount) { - String currencySymbol = currencyAmount.substring(0, 1); // e.g. '€' + public static Currency fromUnicodeAmount(String unicodeAmount) { + String currencySymbol = unicodeAmount.substring(0, 1); // "€100" -> "€" try { - return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); + return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); // "€100" -> EUR } catch (Exception e) { - return ERR; + return ERR; // "]100" -> ERR } } - public String concatToAmount(String amount) { - return amount + SEPARATOR + isoCode; + public String concatIsoCodeToAmount(String amount) { + return amount + SEPARATOR + isoCode; // "100" + EUR -> "100 EUR" } - } - // e.g. currencyAmount = "€100" - public String eval(String currencyAmount) { - Currency currency = Currency.fromCurrencyAmount(currencyAmount); + public static String unicodeAmountToIsoAmount(String unicodeAmount) { + String trimmedUnicodeAmount = unicodeAmount.trim(); + + Currency currency = fromUnicodeAmount(trimmedUnicodeAmount); // "€100" -> EUR + String amount = trimmedUnicodeAmount.substring(1); // "€100" -> "100" - String amount = currencyAmount.substring(1); // e.g. "100" + return currency.concatIsoCodeToAmount(amount); // "100" + EUR -> "100 EUR" + } + } - return currency.concatToAmount(amount); // e.g. "100 EUR" + // e.g. unicodeAmount = "€100" + public String eval(String unicodeAmount) { + return Currency.unicodeAmountToIsoAmount(unicodeAmount); // "€100" -> "100 EUR" } } From 8c3c961d2796945e0ca2b05a0d0c1c58f115ed63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 12:06:49 +0100 Subject: [PATCH 21/31] Extract `enum` to separate file --- .../streamshub/flink/enums/Currency.java | 59 ++++++++++++++++++ .../flink/functions/CurrencyConverter.java | 60 +------------------ 2 files changed, 61 insertions(+), 58 deletions(-) create mode 100644 tutorials/currency-converter/src/main/java/com/github/streamshub/flink/enums/Currency.java diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/enums/Currency.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/enums/Currency.java new file mode 100644 index 0000000..afb4ea6 --- /dev/null +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/enums/Currency.java @@ -0,0 +1,59 @@ +package com.github.streamshub.flink.enums; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +// https://www.unicode.org/charts/nameslist/n_20A0.html +// https://www.iso.org/iso-4217-currency-codes.html +public enum Currency { + EUR("€", "EUR"), + INR("₹", "INR"), + TRY("₺", "TRY"), + THB("฿", "THB"), + UAH("₴", "UAH"), + MNT("₮", "MNT"), + ERR("?", "ERR"); + + public static final String SEPARATOR = " "; + private static final Map SYMBOL_TO_CURRENCY = Stream.of(Currency.values()) + .collect(Collectors.toMap(Currency::getSymbol, c -> c)); + + private final String symbol; + private final String isoCode; + + Currency(String symbol, String isoCode) { + this.symbol = symbol; + this.isoCode = isoCode; + } + + public String getSymbol() { + return symbol; + } + + public String getIsoCode() { + return isoCode; + } + + public static Currency fromUnicodeAmount(String unicodeAmount) { + String currencySymbol = unicodeAmount.substring(0, 1); // "€100" -> "€" + try { + return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); // "€100" -> EUR + } catch (Exception e) { + return ERR; // "]100" -> ERR + } + } + + public String concatIsoCodeToAmount(String amount) { + return amount + SEPARATOR + isoCode; // "100" + EUR -> "100 EUR" + } + + public static String unicodeAmountToIsoAmount(String unicodeAmount) { + String trimmedUnicodeAmount = unicodeAmount.trim(); + + Currency currency = fromUnicodeAmount(trimmedUnicodeAmount); // "€100" -> EUR + String amount = trimmedUnicodeAmount.substring(1); // "€100" -> "100" + + return currency.concatIsoCodeToAmount(amount); // "100" + EUR -> "100 EUR" + } +} diff --git a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java index ee456e7..7d815a1 100644 --- a/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +++ b/tutorials/currency-converter/src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java @@ -1,66 +1,10 @@ package com.github.streamshub.flink.functions; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import org.apache.flink.table.functions.ScalarFunction; -public class CurrencyConverter extends ScalarFunction { - // https://www.unicode.org/charts/nameslist/n_20A0.html - // https://www.iso.org/iso-4217-currency-codes.html - public enum Currency { - EUR("€", "EUR"), - INR("₹", "INR"), - TRY("₺", "TRY"), - THB("฿", "THB"), - UAH("₴", "UAH"), - MNT("₮", "MNT"), - ERR("?", "ERR"); - - public static final String SEPARATOR = " "; - private static final Map SYMBOL_TO_CURRENCY = Stream.of(Currency.values()) - .collect(Collectors.toMap(Currency::getSymbol, c -> c)); - - private final String symbol; - private final String isoCode; - - Currency(String symbol, String isoCode) { - this.symbol = symbol; - this.isoCode = isoCode; - } - - public String getSymbol() { - return symbol; - } - - public String getIsoCode() { - return isoCode; - } - - public static Currency fromUnicodeAmount(String unicodeAmount) { - String currencySymbol = unicodeAmount.substring(0, 1); // "€100" -> "€" - try { - return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); // "€100" -> EUR - } catch (Exception e) { - return ERR; // "]100" -> ERR - } - } - - public String concatIsoCodeToAmount(String amount) { - return amount + SEPARATOR + isoCode; // "100" + EUR -> "100 EUR" - } - - public static String unicodeAmountToIsoAmount(String unicodeAmount) { - String trimmedUnicodeAmount = unicodeAmount.trim(); - - Currency currency = fromUnicodeAmount(trimmedUnicodeAmount); // "€100" -> EUR - String amount = trimmedUnicodeAmount.substring(1); // "€100" -> "100" - - return currency.concatIsoCodeToAmount(amount); // "100" + EUR -> "100 EUR" - } - } +import com.github.streamshub.flink.enums.Currency; +public class CurrencyConverter extends ScalarFunction { // e.g. unicodeAmount = "€100" public String eval(String unicodeAmount) { return Currency.unicodeAmountToIsoAmount(unicodeAmount); // "€100" -> "100 EUR" From 54d5e90e2bc9cb726a41ac1adcb70c6e9e717d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 12:24:28 +0100 Subject: [PATCH 22/31] Update tutorial with new code --- docs/user-defined-functions/index.md | 163 ++++++++++++--------------- 1 file changed, 71 insertions(+), 92 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 80ab03f..65ac17d 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -169,7 +169,6 @@ Now that we have added the only dependency we need, we can implement our `Curren Let's start by making our `CurrencyConverter` class extend the `ScalarFunction` base class. We can also remove the `main` method since we won't need it: ```java -// ~/currency-converter/src/main/java/com/github/streamshub/CurrencyConverter.java package com.github.streamshub; import org.apache.flink.table.functions.ScalarFunction; @@ -188,8 +187,8 @@ import org.apache.flink.table.functions.ScalarFunction; public class CurrencyConverter extends ScalarFunction { // (You can name the parameter whatever you like) - // e.g. currencyAmount = "€100" - public String eval(String currencyAmount) { + // e.g. unicodeAmount = "€100" + public String eval(String unicodeAmount) { // logic will go here } } @@ -198,6 +197,8 @@ public class CurrencyConverter extends ScalarFunction { Flink's [Automatic Type Inference](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference) will use reflection to derive SQL data types for the argument and result of our UDF. If you want to override this behaviour, you can [explicitly specify the types]((https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/#automatic-type-inference)), but in this case we will keep it simple and let Flink decide for us. +### Implementing the logic + By speaking to authors of the upstream services, we should be able to obtain a list of currency symbols that can potentially appear in the `unit_cost` field: > Here is list of currency symbols that can potentially appear in the `unit_cost` field: @@ -206,46 +207,7 @@ By speaking to authors of the upstream services, we should be able to obtain a l > > — authors of the upstream services -In our UDF, we can create an `enum` that maps these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. We will use these when converting the `unit_cost` field into our desired format. - -```java -package com.github.streamshub; - -import org.apache.flink.table.functions.ScalarFunction; - -public class CurrencyConverter extends ScalarFunction { - // https://www.unicode.org/charts/nameslist/n_20A0.html - // https://www.iso.org/iso-4217-currency-codes.html - enum Currency { - €("EUR"), - ₹("INR"), - ₺("TRY"), - ฿("THB"), - ₴("UAH"), - ₮("MNT"), - ERR("ERR"); - - private final String isoCode; - - Currency(String isoCode) { - this.isoCode = isoCode; - } - - public String getIsoCode() { - return isoCode; - } - } - - // e.g. currencyAmount = "€100" - public String eval(String currencyAmount) { - // logic will go here - } -} -``` - -### Implementing the function logic - -Now, we can begin implementing our function logic in the `eval` method. +We can create an `enum` that maps these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. As a reminder, we want to convert a string like "€100" into "100 EUR". To do this, we can use the following steps: @@ -260,59 +222,76 @@ A possible implementation could look like this: ```java package com.github.streamshub; -import org.apache.flink.table.functions.ScalarFunction; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +// https://www.unicode.org/charts/nameslist/n_20A0.html +// https://www.iso.org/iso-4217-currency-codes.html +public enum Currency { + EUR("€", "EUR"), + INR("₹", "INR"), + TRY("₺", "TRY"), + THB("฿", "THB"), + UAH("₴", "UAH"), + MNT("₮", "MNT"), + ERR("?", "ERR"); + + public static final String SEPARATOR = " "; + private static final Map SYMBOL_TO_CURRENCY = Stream.of(Currency.values()) + .collect(Collectors.toMap(Currency::getSymbol, c -> c)); + + private final String symbol; + private final String isoCode; + + Currency(String symbol, String isoCode) { + this.symbol = symbol; + this.isoCode = isoCode; + } + + public String getSymbol() { + return symbol; + } + + public String getIsoCode() { + return isoCode; + } + + public static Currency fromUnicodeAmount(String unicodeAmount) { + String currencySymbol = unicodeAmount.substring(0, 1); // "€100" -> "€" + try { + return SYMBOL_TO_CURRENCY.getOrDefault(currencySymbol, ERR); // "€100" -> EUR + } catch (Exception e) { + return ERR; // "]100" -> ERR + } + } + + public String concatIsoCodeToAmount(String amount) { + return amount + SEPARATOR + isoCode; // "100" + EUR -> "100 EUR" + } + + public static String unicodeAmountToIsoAmount(String unicodeAmount) { + String trimmedUnicodeAmount = unicodeAmount.trim(); + + Currency currency = fromUnicodeAmount(trimmedUnicodeAmount); // "€100" -> EUR + String amount = trimmedUnicodeAmount.substring(1); // "€100" -> "100" + + return currency.concatIsoCodeToAmount(amount); // "100" + EUR -> "100 EUR" + } +} +``` -public class CurrencyConverter extends ScalarFunction { - // https://www.unicode.org/charts/nameslist/n_20A0.html - // https://www.iso.org/iso-4217-currency-codes.html - public enum Currency { - €("EUR"), - ₹("INR"), - ₺("TRY"), - ฿("THB"), - ₴("UAH"), - ₮("MNT"), - ERR("ERR"); - - public static final String SEPARATOR = " "; - - private final String isoCode; - - Currency(String isoCode) { - this.isoCode = isoCode; - } - - public String getIsoCode() { - return isoCode; - } - - public static Currency fromCurrencyAmount(String currencyAmount) { - // 1. Get the first character of the string, which is the currency symbol (e.g. '€'). - String currencySymbol = currencyAmount.substring(0, 1); - - // 2. Look up the currency symbol in our enum to get the corresponding currency code (e.g. "€" => "EUR"). - try { - return Currency.valueOf(currencySymbol); - } catch (Exception e) { - // 3. If the lookup failed (e.g. currency symbol was not found), we can return "ERR" as the currency code (e.g. ">" => "ERR"). - return Currency.ERR; - } - } - - public String concatToAmount(String amount) { - return amount + SEPARATOR + isoCode; - } - } +We can then use this `enum` in the `eval` method of our UDF: - // Value of passed field (e.g. "unit_cost") is passed in e.g. "€100" - public String eval(String currencyAmount) { - String currencySymbol = Currency.fromCurrencyAmount(currencyAmount); +```java +package com.github.streamshub; - // 4. Get the rest of the string, which is the amount (e.g. "100"). - String amount = currencyAmount.substring(1); +import org.apache.flink.table.functions.ScalarFunction; - // 5. Concatenate the currency code to the amount, and return the result (e.g. "100 EUR"). - return currency.concatToAmount(amount); +public class CurrencyConverter extends ScalarFunction { + // e.g. unicodeAmount = "€100" + public String eval(String unicodeAmount) { + return Currency.unicodeAmountToIsoAmount(unicodeAmount); // "€100" -> "100 EUR" } } ``` From 52ff05d53bd8b4017754a89c22241c08842513a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 12:32:09 +0100 Subject: [PATCH 23/31] Use latest flink docs everywhere https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2154156007 --- docs/user-defined-functions/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 65ac17d..1f406a8 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -2,9 +2,9 @@ title = 'Simple User Defined Functions' +++ -> Note: This tutorial is mainly focused on creating a simple [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/). For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). +> Note: This tutorial is mainly focused on creating a simple [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/overview/) [User Defined Function (UDF)](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/udfs/). For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). -[Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection. +[Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection. Flink SQL has many [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/systemfunctions/#system-built-in-functions), that allow you to extract and manipulate data from the many sources that Flink supports. However, sometimes you need to be able to do operations not covered by these built-in functions. In that situation Flink gives you the option of creating your own functions. From 4f31fbb2df996cf2a3d47137dbcc682589fc2e14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 14:05:42 +0100 Subject: [PATCH 24/31] Add UDF tests https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2154159659 --- docs/user-defined-functions/index.md | 55 ++++++++++++++----- tutorials/currency-converter/pom.xml | 23 ++++++++ .../functions/CurrencyConverterTest.java | 29 ++++++++++ 3 files changed, 93 insertions(+), 14 deletions(-) create mode 100644 tutorials/currency-converter/src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 1f406a8..cc2ac3f 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -107,24 +107,16 @@ cd ~/flink-udf-currency-converter > Note: Flink provides a [Maven Archetype and quickstart script](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/configuration/overview/#getting-started) for getting started. However, it includes a lot of dependencies and boilerplate we don't need for this tutorial, so we will start with a minimal Maven project instead. -We can remove the provided tests and their dependencies, as we don't need them for this tutorial: +### Renaming `App` and `AppTest` -```shell -rm -r src/test - -sed -i -e '//,/<\/dependencyManagement>/d' pom.xml - -sed -i -e '//,/<\/dependencies>/d' pom.xml -``` - -### Renaming the `App` class - -Next, we will rename the `App` class to `CurrencyConverter` and rename the file accordingly: +Next, we will rename the `App` and `AppTest` classes to `CurrencyConverter` and `CurrencyConverterTest` respectively. We will also rename the files accordingly: ```shell sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/streamshub/App.java +sed -i -e 's/AppTest/CurrencyConverterTest/g' src/test/java/com/github/streamshub/AppTest.java mv src/main/java/com/github/streamshub/App.java src/main/java/com/github/streamshub/CurrencyConverter.java +mv src/test/java/com/github/streamshub/AppTest.java src/test/java/com/github/streamshub/CurrencyConverterTest.java ``` The project should still build and run successfully at this point, we can run the following commands to verify: @@ -145,7 +137,6 @@ To make our UDF, we will need to extend the [`ScalarFunction`](https://nightlies ```xml currency-converter - org.apache.flink @@ -296,6 +287,42 @@ public class CurrencyConverter extends ScalarFunction { } ``` +### Testing (optional) + +If we want to, we can modify `CurrencyConverterTest` to verify the UDF works as expected: + +```java +package com.github.streamshub; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +public class CurrencyConverterTest { + public static final String VALID_UNICODE_AMOUNT = " €100 "; + public static final String VALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.EUR.getIsoCode(); + + public static final String INVALID_UNICODE_AMOUNT = " ]100 "; + public static final String INVALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.ERR.getIsoCode(); + + @Test + public void shouldConvertValidUnicodeAmount() throws Exception { + CurrencyConverter currencyConverter = new CurrencyConverter(); + + assertEquals(VALID_ISO_AMOUNT, currencyConverter.eval(VALID_UNICODE_AMOUNT)); + } + + @Test + public void shouldConvertInvalidUnicodeAmount() throws Exception { + CurrencyConverter currencyConverter = new CurrencyConverter(); + + assertEquals(INVALID_ISO_AMOUNT, currencyConverter.eval(INVALID_UNICODE_AMOUNT)); + } +} +``` + +> Note: Since our UDF is simple and stateless, we can test its methods directly. If we would've made use of managed state or timers (e.g. for watermarks) we probably would've had to use [test harnesses](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators). + ### Building the JAR After implementing the logic, we can build our JAR: @@ -304,7 +331,7 @@ After implementing the logic, we can build our JAR: mvn clean package ``` -Assuming there are no compilation errors, we can now try out our new UDF! +Assuming there are no errors, we can now try out our new UDF! ## Using the User Defined Function diff --git a/tutorials/currency-converter/pom.xml b/tutorials/currency-converter/pom.xml index f65ea6e..bcaef69 100644 --- a/tutorials/currency-converter/pom.xml +++ b/tutorials/currency-converter/pom.xml @@ -16,6 +16,18 @@ 17 + + + + org.junit + junit-bom + 5.11.0 + pom + import + + + + org.apache.flink @@ -23,6 +35,17 @@ ${flink.version} provided + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-params + test + diff --git a/tutorials/currency-converter/src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java b/tutorials/currency-converter/src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java new file mode 100644 index 0000000..acc57af --- /dev/null +++ b/tutorials/currency-converter/src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java @@ -0,0 +1,29 @@ +package com.github.streamshub.flink.functions; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +import com.github.streamshub.flink.enums.Currency; + +public class CurrencyConverterTest { + public static final String VALID_UNICODE_AMOUNT = " €100 "; + public static final String VALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.EUR.getIsoCode(); + + public static final String INVALID_UNICODE_AMOUNT = " ]100 "; + public static final String INVALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.ERR.getIsoCode(); + + @Test + public void shouldConvertValidUnicodeAmount() throws Exception { + CurrencyConverter currencyConverter = new CurrencyConverter(); + + assertEquals(VALID_ISO_AMOUNT, currencyConverter.eval(VALID_UNICODE_AMOUNT)); + } + + @Test + public void shouldConvertInvalidUnicodeAmount() throws Exception { + CurrencyConverter currencyConverter = new CurrencyConverter(); + + assertEquals(INVALID_ISO_AMOUNT, currencyConverter.eval(INVALID_UNICODE_AMOUNT)); + } +} From fa5b023e6cb6c5420e62b4f62a837a8b38c342f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 16:03:44 +0100 Subject: [PATCH 25/31] Add `docker-maven-plugin` --- docs/user-defined-functions/index.md | 47 ++++++++++++++++++++++++++++ tutorials/currency-converter/pom.xml | 19 +++++++++++ 2 files changed, 66 insertions(+) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index cc2ac3f..169f6c4 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -529,3 +529,50 @@ $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ 7208742491425008088,user-87,106,3,"587 UAH","2025-06-13 11:28:32.725" 8796404564173987612,user-70,105,1,"399 EUR","2025-06-13 11:28:35.728" ``` + +## Layering the UDF on top of the `flink-sql-runner` image + +### Using [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) + +To make it easier for others to use our UDF, we can create a new container image that layers our JAR on top of the `flink-sql-runner` image. This way, mounting our local build of the JAR into the container will no longer be necessary. + +Instead of writing the Dockerfile ourselves, we can automate this by adding the [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) to our `pom.xml`: + +```xml + + io.fabric8 + docker-maven-plugin + 0.46.0 + + + + flink-sql-runner-with-${project.artifactId} + + quay.io/streamshub/flink-sql-runner:0.2.0 + + artifact + /opt + + + + + + +``` + +We can then build the image like this: + +```shell +mvn clean package docker:build +``` + +> Note: docker-maven-plugin uses Docker by default, if you're using Podman [you will likely need to set `DOCKER_HOST` to use podman](https://github.com/fabric8io/docker-maven-plugin/issues/1330#issuecomment-872905283). + +Finally, we can create a new container using the image we just built: + +```shell +# We don't need to mount the JAR anymore! +podman run -it --rm --net=host + currency-converter:latest + /opt/flink/bin/sql-client.sh embedded +``` diff --git a/tutorials/currency-converter/pom.xml b/tutorials/currency-converter/pom.xml index bcaef69..ff1ee9b 100644 --- a/tutorials/currency-converter/pom.xml +++ b/tutorials/currency-converter/pom.xml @@ -50,6 +50,25 @@ + + io.fabric8 + docker-maven-plugin + 0.46.0 + + + + flink-sql-runner-with-${project.artifactId} + + quay.io/streamshub/flink-sql-runner:0.2.0 + + artifact + /opt + + + + + + org.apache.maven.plugins maven-shade-plugin From b71320b97c335f498a0409f31edfc78c6f54efb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 18 Jun 2025 17:07:06 +0100 Subject: [PATCH 26/31] Add `FlinkDeployment` --- docs/user-defined-functions/index.md | 109 +++++++++++++++++- .../standalone-etl-udf-deployment.yaml | 71 ++++++++++++ 2 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 169f6c4..1ac9c24 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -532,7 +532,7 @@ $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ ## Layering the UDF on top of the `flink-sql-runner` image -### Using [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) +### Adding [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) To make it easier for others to use our UDF, we can create a new container image that layers our JAR on top of the `flink-sql-runner` image. This way, mounting our local build of the JAR into the container will no longer be necessary. @@ -576,3 +576,110 @@ podman run -it --rm --net=host currency-converter:latest /opt/flink/bin/sql-client.sh embedded ``` + +You can run the same Flink SQL queries as before to verify that everything works the same way. + +### Using the new UDF image in a `FlinkDeployment` + +So far, we've been using the UDF in ETL queries that would have to compete for resources with other queries running in the same Flink session cluster. + +Instead, like in the [Interactive ETL example](../interactive-etl/index.md), we can create a FlinkDeployment CR for deploying our queries as a stand-alone Flink Job: + +```yaml +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-udf +spec: + # Change the two lines below depending on your image + image: docker.io/library/flink-sql-runner-with-currency-converter:latest + imagePullPolicy: Never + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + args: [" + CREATE TABLE InternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.international.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.group.id' = 'international-sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + CREATE FUNCTION currency_convert + AS 'com.github.streamshub.flink.functions.CurrencyConverter' + USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; + CREATE TABLE IsoInternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity INT, + iso_unit_cost STRING, + purchase_time TIMESTAMP(3), + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.iso.international.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.client.id' = 'sql-cleaning-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + INSERT INTO IsoInternationalSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + CAST(quantity AS INT), + currency_convert(unit_cost), + purchase_time + FROM InternationalSalesRecordTable; + "] + parallelism: 1 + upgradeMode: stateless +``` + +Then use it: + +```shell +# If using minikube and a local image, load the image first: +minikube image load flink-sql-runner-with-currency-converter + +kubectl apply -n flink -f .yaml +``` + +> Note: We can also just use the provided example FlinkDeployment CR instead: +> +> ```shell +> kubectl apply -n flink -f tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml +> ``` + +Finally, we can verify that data is being written to the new topic, just like before: + +```shell +kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic flink.iso.international.sales.records +``` diff --git a/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml b/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml new file mode 100644 index 0000000..b433913 --- /dev/null +++ b/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml @@ -0,0 +1,71 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-udf +spec: + image: quay.io/streamshub/flink-sql-runner-with-currency-converter:latest + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + args: [" + CREATE TABLE InternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.international.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.group.id' = 'international-sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + CREATE FUNCTION currency_convert + AS 'com.github.streamshub.flink.functions.CurrencyConverter' + USING JAR '/opt/currency-converter-0.1.0-SNAPSHOT.jar'; + CREATE TABLE IsoInternationalSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity INT, + iso_unit_cost STRING, + purchase_time TIMESTAMP(3), + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.iso.international.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.client.id' = 'sql-cleaning-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + INSERT INTO IsoInternationalSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + CAST(quantity AS INT), + currency_convert(unit_cost), + purchase_time + FROM InternationalSalesRecordTable; + "] + parallelism: 1 + upgradeMode: stateless From 144f4774aa88a03659622ff7fad57eb92d4e46fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 20 Jun 2025 15:50:38 +0100 Subject: [PATCH 27/31] Apply suggestions from code review Co-authored-by: Thomas Cooper --- docs/user-defined-functions/index.md | 16 ++++++++-------- tutorials/currency-converter/pom.xml | 3 ++- .../standalone-etl-udf-deployment.yaml | 4 ++-- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 1ac9c24..f69a4f4 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -124,7 +124,7 @@ The project should still build and run successfully at this point, we can run th ```shell mvn clean package -java -cp target/currency-converter-1.0-SNAPSHOT.jar com.github.streamshub.CurrencyConverter +java -cp target/flink-udf-currency-converter-1.0-SNAPSHOT.jar com.github.streamshub.CurrencyConverter # Should print "Hello World!" ``` @@ -321,7 +321,7 @@ public class CurrencyConverterTest { } ``` -> Note: Since our UDF is simple and stateless, we can test its methods directly. If we would've made use of managed state or timers (e.g. for watermarks) we probably would've had to use [test harnesses](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators). +> Note: Since our UDF is simple and stateless, we can test its methods directly. If we had made use of managed state or timers (e.g. for watermarks) we would need to use the Flink [test harnesses](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/datastream/testing/#unit-testing-stateful-or-timely-udfs--custom-operators). ### Building the JAR @@ -391,7 +391,7 @@ Next, we will create a container with our JAR mounted into it: ```shell podman run -it --rm --net=host \ - -v ~/currency-converter/target/currency-converter-1.0-SNAPSHOT.jar:/opt/currency-converter-1.0-SNAPSHOT.jar:Z \ + -v ~/currency-converter/target/flink-udf-currency-converter-1.0-SNAPSHOT.jar:/opt/flink-udf-currency-converter-1.0-SNAPSHOT.jar:Z \ quay.io/streamshub/flink-sql-runner:0.2.0 \ /opt/flink/bin/sql-client.sh embedded ``` @@ -432,8 +432,8 @@ If that worked, we can now register our UDF as a [temporary catalog function](ht ```sql CREATE TEMPORARY FUNCTION currency_convert -AS 'com.github.streamshub.CurrencyConverter' -USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; +AS 'com.github.streamshub.flink.functions.CurrencyConverter' +USING JAR '/opt/flink-udf-currency-converter-1.0-SNAPSHOT.jar'; ``` > Note: Temporary catalog functions [only live as long as the current session](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/functions/overview/#types-of-functions). Provided you have a [Flink catalog](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/catalogs/#catalogs) deployed and configured, you can omit the `TEMPORARY` keyword to create a function that persists across sessions. @@ -592,7 +592,7 @@ metadata: name: standalone-etl-udf spec: # Change the two lines below depending on your image - image: docker.io/library/flink-sql-runner-with-currency-converter:latest + image: docker.io/library/flink-sql-runner-with-flink-udf-currency-converter:latest imagePullPolicy: Never flinkVersion: v2_0 flinkConfiguration: @@ -628,7 +628,7 @@ spec: ); CREATE FUNCTION currency_convert AS 'com.github.streamshub.flink.functions.CurrencyConverter' - USING JAR '/opt/currency-converter-1.0-SNAPSHOT.jar'; + USING JAR '/opt/flink-udf-currency-converter-1.0-SNAPSHOT.jar'; CREATE TABLE IsoInternationalSalesRecordTable ( invoice_id STRING, user_id STRING, @@ -665,7 +665,7 @@ Then use it: ```shell # If using minikube and a local image, load the image first: -minikube image load flink-sql-runner-with-currency-converter +minikube image load flink-sql-runner-with-flink-udf-currency-converter kubectl apply -n flink -f .yaml ``` diff --git a/tutorials/currency-converter/pom.xml b/tutorials/currency-converter/pom.xml index ff1ee9b..0e545b2 100644 --- a/tutorials/currency-converter/pom.xml +++ b/tutorials/currency-converter/pom.xml @@ -9,7 +9,8 @@ 0.1.0-SNAPSHOT - currency-converter + flink-udf-currency-converter + 1.0-SNAPSHOT UTF-8 diff --git a/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml b/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml index b433913..48e6fa1 100644 --- a/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml +++ b/tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml @@ -3,7 +3,7 @@ kind: FlinkDeployment metadata: name: standalone-etl-udf spec: - image: quay.io/streamshub/flink-sql-runner-with-currency-converter:latest + image: quay.io/streamshub/flink-sql-runner-with-flink-udf-currency-converter:latest flinkVersion: v2_0 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" @@ -38,7 +38,7 @@ spec: ); CREATE FUNCTION currency_convert AS 'com.github.streamshub.flink.functions.CurrencyConverter' - USING JAR '/opt/currency-converter-0.1.0-SNAPSHOT.jar'; + USING JAR '/opt/flink-udf-currency-converter-1.0-SNAPSHOT.jar'; CREATE TABLE IsoInternationalSalesRecordTable ( invoice_id STRING, user_id STRING, From b76ef8b550ef098500d4571e0d505d8a1a109aad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Mon, 23 Jun 2025 10:39:58 +0100 Subject: [PATCH 28/31] Make tutorial code match example code better https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2157100402 --- docs/user-defined-functions/index.md | 43 ++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index f69a4f4..c281f09 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -94,7 +94,6 @@ FROM InternationalSalesRecordTable; First, we will create a blank Maven project: ```shell - mvn archetype:generate \ -DgroupId=com.github.streamshub \ -DartifactId=flink-udf-currency-converter \ @@ -109,14 +108,24 @@ cd ~/flink-udf-currency-converter ### Renaming `App` and `AppTest` -Next, we will rename the `App` and `AppTest` classes to `CurrencyConverter` and `CurrencyConverterTest` respectively. We will also rename the files accordingly: +Next, we will rename the `App` and `AppTest` classes to `CurrencyConverter` and `CurrencyConverterTest` respectively. We will also rename the files accordingly and move them to a new package called `com.github.streamshub.flink.functions`: ```shell +# Rename the classes sed -i -e 's/App/CurrencyConverter/g' src/main/java/com/github/streamshub/App.java sed -i -e 's/AppTest/CurrencyConverterTest/g' src/test/java/com/github/streamshub/AppTest.java -mv src/main/java/com/github/streamshub/App.java src/main/java/com/github/streamshub/CurrencyConverter.java -mv src/test/java/com/github/streamshub/AppTest.java src/test/java/com/github/streamshub/CurrencyConverterTest.java +# Create new package directories +mkdir -p src/main/java/com/github/streamshub/flink/functions +mkdir -p src/test/java/com/github/streamshub/flink/functions + +# Move classes to the new package +mv src/main/java/com/github/streamshub/App.java src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +mv src/test/java/com/github/streamshub/AppTest.java src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java + +# Update package declarations +sed -i -e 's/com.github.streamshub;/com.github.streamshub.flink.functions;/' src/main/java/com/github/streamshub/flink/functions/CurrencyConverter.java +sed -i -e 's/com.github.streamshub;/com.github.streamshub.flink.functions;/' src/test/java/com/github/streamshub/flink/functions/CurrencyConverterTest.java ``` The project should still build and run successfully at this point, we can run the following commands to verify: @@ -124,7 +133,7 @@ The project should still build and run successfully at this point, we can run th ```shell mvn clean package -java -cp target/flink-udf-currency-converter-1.0-SNAPSHOT.jar com.github.streamshub.CurrencyConverter +java -cp target/flink-udf-currency-converter-1.0-SNAPSHOT.jar com.github.streamshub.flink.functions.CurrencyConverter # Should print "Hello World!" ``` @@ -160,7 +169,7 @@ Now that we have added the only dependency we need, we can implement our `Curren Let's start by making our `CurrencyConverter` class extend the `ScalarFunction` base class. We can also remove the `main` method since we won't need it: ```java -package com.github.streamshub; +package com.github.streamshub.flink.functions; import org.apache.flink.table.functions.ScalarFunction; @@ -172,7 +181,7 @@ This function doesn't do anything yet. For that, we need it to declare a public Since we'll only be passing it one argument (the `unit_cost` field), we can declare that the method takes in a single `String` argument and also returns a `String`: ```java -package com.github.streamshub; +package com.github.streamshub.flink.functions; import org.apache.flink.table.functions.ScalarFunction; @@ -200,6 +209,12 @@ By speaking to authors of the upstream services, we should be able to obtain a l We can create an `enum` that maps these symbols to their corresponding [ISO 4217](https://www.iso.org/iso-4217-currency-codes.html) currency codes. +```shell +mkdir -p src/main/java/com/github/streamshub/flink/enums + +touch src/main/java/com/github/streamshub/flink/enums/Currency.java +``` + As a reminder, we want to convert a string like "€100" into "100 EUR". To do this, we can use the following steps: 1. Get the first character of the string, which is the currency symbol (e.g. "€"). @@ -211,7 +226,7 @@ As a reminder, we want to convert a string like "€100" into "100 EUR". To do t A possible implementation could look like this: ```java -package com.github.streamshub; +package com.github.streamshub.flink.enums; import java.util.Map; import java.util.stream.Collectors; @@ -275,10 +290,12 @@ public enum Currency { We can then use this `enum` in the `eval` method of our UDF: ```java -package com.github.streamshub; +package com.github.streamshub.flink.functions; import org.apache.flink.table.functions.ScalarFunction; +import com.github.streamshub.flink.enums.Currency; + public class CurrencyConverter extends ScalarFunction { // e.g. unicodeAmount = "€100" public String eval(String unicodeAmount) { @@ -292,12 +309,14 @@ public class CurrencyConverter extends ScalarFunction { If we want to, we can modify `CurrencyConverterTest` to verify the UDF works as expected: ```java -package com.github.streamshub; +package com.github.streamshub.flink.functions; import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; +import com.github.streamshub.flink.enums.Currency; + public class CurrencyConverterTest { public static final String VALID_UNICODE_AMOUNT = " €100 "; public static final String VALID_ISO_AMOUNT = "100" + Currency.SEPARATOR + Currency.EUR.getIsoCode(); @@ -572,8 +591,8 @@ Finally, we can create a new container using the image we just built: ```shell # We don't need to mount the JAR anymore! -podman run -it --rm --net=host - currency-converter:latest +podman run -it --rm --net=host \ + flink-sql-runner-with-flink-udf-currency-converter:latest \ /opt/flink/bin/sql-client.sh embedded ``` From 6210b13180ce326021b80f71cb0a87f722da6e7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Mon, 23 Jun 2025 10:53:01 +0100 Subject: [PATCH 29/31] Change query for faster results https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2157236222 --- docs/user-defined-functions/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index c281f09..78b34cd 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -472,7 +472,7 @@ FROM InternationalSalesRecordTable; You should start seeing results with both a `unit_cost` field and an `iso_unit_cost` field containing the output of our UDF! -We can also use the UDF in more complex queries e.g. to filter for records with a specific currency and quantity: +We can also use the UDF in more complex queries e.g. to filter for records with specific currencies and quantities: ```sql SELECT @@ -491,7 +491,7 @@ SELECT FROM InternationalSalesRecordTable ) WHERE - RIGHT(iso_unit_cost, 3) = 'EUR' AND quantity > 1; + quantity > 1 AND RIGHT(iso_unit_cost, 3) NOT IN ('MNT', 'ERR'); ``` > Note: This query might take a while to return results, since there are many currencies used in the data! From 05a02e7f4821b3d082e42919da92c13f6614b8cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Mon, 23 Jun 2025 17:08:53 +0100 Subject: [PATCH 30/31] Build and push image in CI https://github.com/streamshub/flink-sql-examples/pull/55#discussion_r2157264452 --- .github/workflows/integration.yaml | 51 ++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/.github/workflows/integration.yaml b/.github/workflows/integration.yaml index 02cf173..326b863 100644 --- a/.github/workflows/integration.yaml +++ b/.github/workflows/integration.yaml @@ -41,7 +41,11 @@ jobs: username: "${{ secrets.IMAGE_REPO_USERNAME }}" password: "${{ secrets.IMAGE_REPO_PASSWORD }}" - - name: Build Image + - name: Generate Currency Converter Dockerfile + working-directory: tutorials/currency-converter + run: mvn -B docker:build -Ddocker.buildArchiveOnly # Skip building image, just create Dockerfile + + - name: Build Data Generator Image if: github.event_name != 'push' uses: docker/build-push-action@v6 with: @@ -49,10 +53,19 @@ jobs: platforms: linux/amd64,linux/arm64 push: false file: tutorials/data-generator/Dockerfile + + - name: Build Currency Converter Image + if: github.event_name != 'push' + uses: docker/build-push-action@v6 + with: + context: tutorials/currency-converter/target/docker/flink-sql-runner-with-flink-udf-currency-converter/build/ + platforms: linux/amd64,linux/arm64 + push: false + file: tutorials/currency-converter/target/docker/flink-sql-runner-with-flink-udf-currency-converter/build/Dockerfile - - name: Image metadata + - name: Data Generator Image metadata if: github.event_name == 'push' - id: meta + id: data_generator_meta uses: docker/metadata-action@v5 with: images: | @@ -65,7 +78,22 @@ jobs: prefix= suffix= - - name: Build and Push Image + - name: Currency Converter Image metadata + if: github.event_name == 'push' + id: currency_converter_meta + uses: docker/metadata-action@v5 + with: + images: | + ${{ secrets.IMAGE_REPO_HOSTNAME }}/${{ secrets.IMAGE_REPO_NAMESPACE }}/flink-sql-runner-with-flink-udf-currency-converter + tags: | + type=ref,event=branch + type=ref,event=tag + flavor: | + latest=false + prefix= + suffix= + + - name: Build and Push Data Generator Image if: github.event_name == 'push' uses: docker/build-push-action@v6 with: @@ -73,5 +101,16 @@ jobs: platforms: linux/amd64,linux/arm64 push: true file: tutorials/data-generator/Dockerfile - tags: ${{ steps.meta.outputs.tags }} - labels: ${{ steps.meta.outputs.labels }} \ No newline at end of file + tags: ${{ steps.data_generator_meta.outputs.tags }} + labels: ${{ steps.data_generator_meta.outputs.labels }} + + - name: Build and Push Currency Converter Image + if: github.event_name == 'push' + uses: docker/build-push-action@v6 + with: + context: tutorials/currency-converter/target/docker/flink-sql-runner-with-flink-udf-currency-converter/build/ + platforms: linux/amd64,linux/arm64 + push: true + file: tutorials/currency-converter/target/docker/flink-sql-runner-with-flink-udf-currency-converter/build/Dockerfile + tags: ${{ steps.currency_converter_meta.outputs.tags }} + labels: ${{ steps.currency_converter_meta.outputs.labels }} \ No newline at end of file From b400a3b56c0c756c1360a67f053dfee72a099ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 24 Jun 2025 16:06:39 +0100 Subject: [PATCH 31/31] Apply suggestions from code review Co-authored-by: Thomas Cooper --- docs/user-defined-functions/index.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/user-defined-functions/index.md b/docs/user-defined-functions/index.md index 78b34cd..98abc64 100644 --- a/docs/user-defined-functions/index.md +++ b/docs/user-defined-functions/index.md @@ -553,7 +553,9 @@ $ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ ### Adding [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) -To make it easier for others to use our UDF, we can create a new container image that layers our JAR on top of the `flink-sql-runner` image. This way, mounting our local build of the JAR into the container will no longer be necessary. +To make it easier for others to use our UDF, we can create a new container image that layers our JAR on top of the Streamhub [`flink-sql-runner`](https://github.com/streamshub/flink-sql) image. +This way, the function will be available to all users of the image without needing to have the jar file locally. +It also allows us to deploy a standalone Flink job, which we will discuss later. Instead of writing the Dockerfile ourselves, we can automate this by adding the [docker-maven-plugin](https://github.com/fabric8io/docker-maven-plugin) to our `pom.xml`: @@ -612,6 +614,7 @@ metadata: spec: # Change the two lines below depending on your image image: docker.io/library/flink-sql-runner-with-flink-udf-currency-converter:latest + # This is set to Never when you have pushed/built the image directly in your Kubernetes cluster imagePullPolicy: Never flinkVersion: v2_0 flinkConfiguration: @@ -689,7 +692,7 @@ minikube image load flink-sql-runner-with-flink-udf-currency-converter kubectl apply -n flink -f .yaml ``` -> Note: We can also just use the provided example FlinkDeployment CR instead: +> Note: We can also just use the provided example FlinkDeployment CR instead (which uses the image built from the example code in this repository): > > ```shell > kubectl apply -n flink -f tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml