From 83ab98b382450773ae67a2fb4a2c5e9353569d12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Mon, 30 Jun 2025 16:48:42 +0100 Subject: [PATCH 01/15] Create topics with retention --- .../kafka/data/generator/DataGenerator.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java index d05065e..1d30ef7 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java @@ -1,23 +1,38 @@ package com.github.streamshub.kafka.data.generator; +import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import java.util.List; +import java.util.*; import java.util.function.Supplier; +import static org.apache.kafka.clients.CommonClientConfigs.*; +import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; + public class DataGenerator implements Runnable { + final static Map RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour + final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000; + final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client"; final String bootstrapServers; final List dataTypes; + final Properties kafkaAdminProps; public DataGenerator(String bootstrapServers, List dataTypes) { this.bootstrapServers = bootstrapServers; this.dataTypes = dataTypes; + + kafkaAdminProps = new Properties(); + kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG); + kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG)); } @Override public void run() { + createTopics(dataTypes.stream().map(Data::topic).toList()); + if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) { String registryUrl = System.getenv("REGISTRY_URL"); Producer producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl)); @@ -28,6 +43,28 @@ public void run() { } } + private void createTopics(List topicNames) { + try (AdminClient adminClient = AdminClient.create(kafkaAdminProps)) { + Set existingTopicNames = adminClient.listTopics().names().get(); + List newTopics = topicNames.stream() + .filter(topicName -> !existingTopicNames.contains(topicName)) // createTopics() will throw if topic already exists + .map( + topicName -> new NewTopic(topicName, Optional.empty(), Optional.empty()).configs(RETENTION_CONFIG) + ) + .toList(); + + if (newTopics.isEmpty()) { + return; + } + + adminClient.createTopics(newTopics).all().get(); + System.out.println("Kafka Data Generator : Successfully created topics: " + newTopics); + } catch (Exception e) { + System.out.println("Kafka Data Generator : Failed to create topics."); + throw new RuntimeException(e); + } + } + private ProducerRecord generateAvroRecord(Data data) { return getKafkaProducerRecord(data, data.generate()); } From 9f6d7ab8f0f12522001c243643849506db78831b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Mon, 30 Jun 2025 16:49:26 +0100 Subject: [PATCH 02/15] Increase emission rate to 1 event per second --- .../github/streamshub/kafka/data/generator/DataGenerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java index 1d30ef7..59448d1 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java @@ -98,7 +98,7 @@ private void send(Producer producer, Supplier Date: Wed, 9 Jul 2025 17:27:47 +0100 Subject: [PATCH 03/15] Add `batchSize()` to `Data` class --- .../java/com/github/streamshub/kafka/data/generator/Data.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Data.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Data.java index 01a68d0..559578c 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Data.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/Data.java @@ -5,6 +5,9 @@ public interface Data { String topic(); + default int batchSize() { + return 1; + } SpecificRecord generate(); String generateCsv(); Schema schema(); From 2770cd1367d1fcebd6d6bc65df5b1e0ab68554d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 9 Jul 2025 17:40:30 +0100 Subject: [PATCH 04/15] Add `batchSize()` and quantity multiplication to `SalesData` --- .../kafka/data/generator/examples/SalesData.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/SalesData.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/SalesData.java index 00a9d80..ce7e3d3 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/SalesData.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/examples/SalesData.java @@ -9,6 +9,7 @@ public class SalesData implements Data { private final Random random = new Random(); + private final static int USER_COUNT = 100; public String topic() { return "flink.sales.records"; @@ -16,6 +17,9 @@ public String topic() { public Schema schema() { return Sales.SCHEMA$; } + public int batchSize() { + return USER_COUNT; + } public SpecificRecord generate() { return Sales.newBuilder() @@ -40,7 +44,7 @@ private String generateInvoiceId() { } private String generateUserId() { - return "user-" + Math.abs(random.nextInt(100)); + return "user-" + Math.abs(random.nextInt(USER_COUNT)); } private String generateProductId() { @@ -48,7 +52,13 @@ private String generateProductId() { } private String generateQuantity() { - return String.valueOf(Math.abs(random.nextInt(3) + 1)); + int multiplier = 1; + + if (Math.abs(random.nextInt(batchSize() * 10)) < 2) { + multiplier = 10; + } + + return String.valueOf(Math.abs((1 + random.nextInt(3)) * multiplier )); } private String generateUnitCost() { From 646b82953cddccda48e982f3651c718d1616d0c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 9 Jul 2025 17:41:16 +0100 Subject: [PATCH 05/15] Use `batchSize()` in `DataGenerator` --- .../kafka/data/generator/DataGenerator.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java index 59448d1..7d85fbc 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import java.util.*; +import java.util.function.Function; import java.util.function.Supplier; import static org.apache.kafka.clients.CommonClientConfigs.*; @@ -36,13 +37,25 @@ public void run() { if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) { String registryUrl = System.getenv("REGISTRY_URL"); Producer producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl)); - send(producer, () -> dataTypes.stream().map(this::generateAvroRecord).toList()); + send(producer, () -> generateTopicRecords(this::generateAvroRecord)); } else { Producer producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers)); - send(producer, () -> dataTypes.stream().map(this::generateCsvRecord).toList()); + send(producer, () -> generateTopicRecords(this::generateCsvRecord)); } } + private List> generateTopicRecords(Function> recordsGenerator) { + List> records = new ArrayList<>(); + + for (Data dataType : this.dataTypes) { + for (int i = 0; i < dataType.batchSize(); i++) { + records.add(recordsGenerator.apply(dataType)); + } + } + + return records; + } + private void createTopics(List topicNames) { try (AdminClient adminClient = AdminClient.create(kafkaAdminProps)) { Set existingTopicNames = adminClient.listTopics().names().get(); From bef1a966189c413ce322fbfc00d9b147528c5a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 9 Jul 2025 17:41:36 +0100 Subject: [PATCH 06/15] Add `flink-session-anomaly.yaml` --- .../flink-session-anomaly.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 tutorials/anomaly-detection/flink-session-anomaly.yaml diff --git a/tutorials/anomaly-detection/flink-session-anomaly.yaml b/tutorials/anomaly-detection/flink-session-anomaly.yaml new file mode 100644 index 0000000..72d5ec3 --- /dev/null +++ b/tutorials/anomaly-detection/flink-session-anomaly.yaml @@ -0,0 +1,18 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: session-cluster-anomaly +spec: + image: quay.io/streamshub/flink-sql-runner:0.2.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "2" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 2 From b4b4abc4374575893cdfdd0f6d14a591f753b664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Wed, 9 Jul 2025 18:05:34 +0100 Subject: [PATCH 07/15] Add anomaly detection tutorial unfinished draft --- .../assets/avg_range.excalidraw.svg | 5 + .../assets/first_last.excalidraw.svg | 5 + .../assets/scenario.excalidraw.svg | 4 + .../assets/useful_match.excalidraw.svg | 5 + .../assets/useful_query.excalidraw.svg | 5 + docs/anomaly-detection/index.md | 322 ++++++++++++++++++ 6 files changed, 346 insertions(+) create mode 100644 docs/anomaly-detection/assets/avg_range.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/first_last.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/scenario.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/useful_match.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/useful_query.excalidraw.svg create mode 100644 docs/anomaly-detection/index.md diff --git a/docs/anomaly-detection/assets/avg_range.excalidraw.svg b/docs/anomaly-detection/assets/avg_range.excalidraw.svg new file mode 100644 index 0000000..967347b --- /dev/null +++ b/docs/anomaly-detection/assets/avg_range.excalidraw.svg @@ -0,0 +1,5 @@ + + 2230USER-350123purchase_time (seconds)TYPICAL_SALE31FIRST()LAST()UNUSUAL_SALE \ No newline at end of file diff --git a/docs/anomaly-detection/assets/first_last.excalidraw.svg b/docs/anomaly-detection/assets/first_last.excalidraw.svg new file mode 100644 index 0000000..37107be --- /dev/null +++ b/docs/anomaly-detection/assets/first_last.excalidraw.svg @@ -0,0 +1,5 @@ + + +eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1dWVPqSFx1MDAxYr4/v8JybuarXHUwMDFhmN6XuXNBRVx1MDAxMVx1MDAxNVxc+WqKXG5cdTAwMTAghy1AXHUwMDAwcer89+mgkFx1MDAxMDpcdTAwMTggKk5cdTAwMDGWJZ1OeNP9Pnnerdt/fuzt7Ttj29z/a2/ffC5cdTAwMWJNq9IzRvt/uO1Ds9e3Om11XGJNPvc7g1550rPuOHb/rz//9M5Iljut17PMptky205f9fu/+ry398/ktzpiVSbndlx1MDAwNz3wclxuK6KWfipcdTAwMTVKjZ/9enNy6qTTVJieWXaMdq1peoeeVTuEXHUwMDAwzFx1MDAxYcaqQVx1MDAwMJAkWEBcdTAwMDJcdJaAMTE7OrIqTn3SI1x0/C8+61E3rVrdUV24TEr/i826vIrw1573pX2n12mYR51mp+fK+Vx1MDAxYjTdtydlySg3ar3OoF2Z9XF6RrtvXHUwMDFiPTUsXr+q1WzmnfHk6mpo1TDuXHUwMDA3vuPh7Vx1MDAwNmCgPews9aW1etvs9+fk7dhG2XLcofJcdTAwMGadK6Gdrkxm6W9Ppp7RMtPuNLVcdTAwMDfN5qzZaldMd/D3XHI0923tytu3TafYmz/81vLLk9003Vx1MDAwYkOCJEaAcTk74ulcdTAwMTlcdTAwMDRcdTAwMDRcdTAwMDebs532ROmgkJghgXw9rP6x0jZnct2q0eyb3iS4wqWCmujXRp9Gti5cdTAwMWVOX06rJ4fO4VV7PDwqXFyeP2RmNzqnlY757OzPXHUwMDBl/Hr7y1x1MDAxYsCBXTFe5YGcXCLAIWdcZlHvVptWu1x1MDAxMVx1MDAxY91mp9zwbuGHb9RcdTAwMDLIWS7nvIxcdTAwMDHQIJmEXHUwMDEzxZZEjSBC81x1MDAxMII8KZdCXGLBJHOhwyDAXHUwMDE0UnVLi1x1MDAxOFwidKtAg9ZcdTAwMDBccvxcdTAwMTjQYD1o5nq/oUPNXHUwMDBlXHUwMDE2lHGoXHUwMDAzXHUwMDA3ZiRcdTAwMWNcdTAwMWOYSOQ+XHUwMDA311x1MDAwMUec+uupo6uG6vahb/Y6bSdvvUyeXHUwMDBmbK71xGhZTXe42dz5XHUwMDA3Tavm3vp+WYlq9vb9t+9YintmXHUwMDFkWlal4ueKsrqoYbXNXjpcbul0elbNalx1MDAxYs3bRZmNgdPJmf1XqZ3ewPSPhXk21X6YRHRcdHSH92c3T09O/yUxcMY3xssoNVx1MDAxYVdcdTAwMTeha/R6nVFcdTAwMDC7gMxcdTAwMTOeZN7nKT69llx1MDAxOVx1MDAxY5HwXHUwMDFhvytcdTAwMWU/iMSyUUlcZoWSmFQ4ZZiwXHUwMDA1tnJVmFx1MDAwN1x1MDAxYqcopYwySFx0XovBwkFcbpjkSqB1QGp3rCBBen/teVx1MDAxMzD5MPv77z/e751QXG7onbBAlE2j71x1MDAxY3VaLctRd3LtSrHwRHSMnnOoZs1q14LHzHYl5MjkrFx1MDAwM1x1MDAxN0Z101jQXHUwMDAxdZ7/mNJpK2Bjms1SZ1x1MDAxNImN87Iqz1x1MDAxZYsjRK5SuZvGbauFXlrxQVogXHKod5BcdTAwMGWB9NXGkGaQXHUwMDBiyORcInjduaDBxlx1MDAxOe9yxClcdTAwMTbSZ81uXHUwMDBlaSg5XHUwMDExUEDqXHLLV0Hap4WB/v9BRMvzYXvUOFx1MDAxOF1WuonzpkTG4+XPqyj2tYA4KXwvT1x1MDAxOcZcdTAwMDEkT9GtvKAk8r88SvBcZurtQnuoXHUwMDE3Wum4M/6pcM9Ft6iVT0NcdTAwMTBWINVZ1D6EXHUwMDA1gK1McI6VTb199vRdPpVLQKa3qtFcXOvGVrXTscNM6jmRg/bzgoxzVvT8fa1iRjeer8f3w9t6ySaVq0xcblVPjMFT9NhcdTAwMTGD3nRGi1x1MDAxZFx1MDAwNVx1MDAwM0PezIXGjrwuW43az+fo281jR1xicuBcdTAwMTKjzj3mWFx1MDAwNFtndjdcdTAwMTFcdTAwMDJyQtZcIumZeJrIXHUwMDExsVx1MDAxMsfX8tKw8vzgyjxcdTAwMWGwg1T2Pp7I0epG/Vx1MDAxMtwsl3NZ5IhcdTAwMTGwi1x1MDAxY31Z5OguOs9cdDU/TI2xLqwqcWhUXHUwMDE1S1x1MDAwNVxmJEGc9ut/KG70XHUwMDBl4Xx43OiibpvWXHUwMDExKuDusTxcdTAwMWRcXGRqnU6+slxiXFz3kjGFjbbL6NxcIlx1MDAxN/M+Kn2FR40wJVx1MDAwMFwiqnMxcbiLXHQl5VDZo3FGjZSLiShQXHUwMDAyUb9afn3UXGJtvYc5d8zuNMe1yfy9x8Csmk33TouZStZOOT9T7PDpfHBcdTAwMTOJgVx1MDAwMVxuXHUwMDAwWXiKMFx1MDAwNTLRIFx1MDAxOX9/Vv0gJD9EZ1VcdTAwMDQhXHUwMDEygPldelx1MDAxZq2iYONcZrOEK9RKXCI9qMeEWcgh8LhuXHJaXHUwMDA1WlpFYq71y1x1MDAxY0dcdTAwMTBCoOu7jHfmVaXE60b6yjhJWOls71x1MDAxMV7akVx1MDAxONSfQdkxaFxmuHvcnEFcdTAwMDWUXHUwMDE4clx1MDAwZXTRXHUwMDFjXHUwMDEymndRKMZcYuFcdTAwMGaA445CfWd9MIXelprWsH5e4pmz0m3msTK6XHUwMDE4ZpxIXHUwMDE0itiOQuOF8tNcblx1MDAxNCrd6Fx1MDAwMFx1MDAwMjqrXHUwMDE3wtBcdTAwMDAsxFC5pS6Lblx1MDAxZIXqPdNtodAwXHUwMDFmdH1cbi1cdTAwMTazd1x1MDAwZYJZ01CTczaoJmpOvlx1MDAxMIlCadB23VHoW+uauCtsTqGSMiqF/1x1MDAxMejhkYZ7oVx1MDAwMFxiLlx1MDAwMaA7XG79xlx1MDAxNHo9oF3DXHUwMDFjXlx1MDAxYuIp3bY7optpsUhcdTAwMTlOSMGOQuOFsrFCXHUwMDBlU5mvXHUwMDEyXHUwMDAzgLRJTFx1MDAxOJr34Fxur4hxXHUwMDE2Z1x1MDAxMjNcdTAwMWVcbkVbTaEodlxuZbDby5xcdTAwMTZrN1x1MDAxZIpcdTAwMDG7fjg/hEBTLKShUFx1MDAxZbRdd1x1MDAxNPrWuibuSjF4oZQxRIg2kMtcdTAwMTZykzNcbqVcYnIkON9R6Dem0GK7OLZlO11cdTAwMWPcmf3x/UE2I9E4XHUwMDEyhfJgRmZHoVx1MDAxYkK5vFx1MDAwMoUyTtVcdTAwMWJIrVx1MDAxYopCXHUwMDBi61x1MDAwNZGcXG6BYk++bEyheKspXHUwMDE0x06hY5BFjSZ4ubDvu8Pzy2dUR4OHSMBcdTAwMGKmQiHwra+Y1dvCQFx1MDAxZM9cIlx1MDAxMHdcdTAwMDV5oUisroBEzLhwS/J0SEQgfImLgrBUrCtiLlXYoHp+XG5Fe9Ar142+WXSslrn3e99UQKn0/7fVJXrvylx1MDAxY5fl22u9pHo/63dnZ8YxOck7ucdcdTAwMDSJXFyyXHUwMDE3tH5cdTAwMTlYtH4h1LEmZd9cdTAwMDStK9LmxlCtr0Kaglx1MDAwMqKviVembChSXHUwMDExXHUwMDEyXHUwMDA0Sya/vKhomVx1MDAxZDdcdTAwMWHbsJk4bdG6nc6c30nDXHUwMDFlXFxELyVcdTAwMDWB5CCflYFcblx1MDAwMFxiYXJBTZlOSyGaLk6GUlKKIflPKi3dWGtcdTAwMWIraK3AXHUwMDE0Yom5NlqyJGnPJEFIOXexLzBcdTAwMWXeXHUwMDE3ioeVWreTPc7d0Juj49tL0teXib4uXHUwMDE3mlx1MDAxZPm0XHUwMDE1xvpcYoRv3IhP4Vx1MDAwM1x1MDAwMychXHUwMDA2SEpcdTAwMTTFr31cdTAwMWSQ5+IxPeuL5nHGXHUwMDFjpU6GLW7XWndboqBT3OEkJJBKxCim7lx1MDAwMlx1MDAxZk8rpnFcdTAwMTlvW1x1MDAwMcJcdTAwMDOF5XDuacBcdTAwMDNSelx1MDAxMH49fXVcYs+4XHUwMDBmaFx1MDAxZSq+mZpCXHUwMDAyIC5cdTAwMTBcIj6bS4/Dz7Gb1vPuI0Zcctb0/z8qXHUwMDEwQt7tzlbuP7+fXHUwMDA1/GPZwfeu5tOgycfgalx1MDAwNrya+ImFXHUwMDBisNUugJOUXCI+cVx1MDAxMlx1MDAxNHYogGZcdTAwMDLSlUZ8WeTJI1x1MDAxNN/q71x1MDAxMPJcdOWZ6MFcInCZRz3nslR4qCVOXHUwMDFi7Xaims1fRjcyXHUwMDEwXGaU2+/Wq0R9oG/u05ZcdTAwMDDU21x1MDAxY6ssWFGTpZ6Wi5uauPMgWLB1Vq9cdTAwMDQ4Jlx1MDAxMHHuRf5iskQu+oWWZWVHNXztnD31ReGwUijrLZFcdTAwMTVcdTAwMTesKDYgkJGYXHUwMDE2rCyXc2mtXHUwMDBmgbtcdTAwMDUrX7VgpVx1MDAwNEI2XGLS2+lcdTAwMTJhjLhcdTAwMTZcdTAwMWRShOdRMFx1MDAwNFxcKJMzzjxKPGtW9DHZT16z8lx1MDAwZelEjdSuvWZcdTAwMDU9XHUwMDFk3bZcdTAwMWGZRDd7cCjYRTbRKLCT6KSHfeiaeNaahCfS+tLfJeJTMfr1z43Pllx1MDAwMF2hYFx1MDAwZlCBXHUwMDA0W1x1MDAwNKCryqFcdTAwMGLJiMCQYyDWoqxPivn0riBcdTAwMTdcdTAwMDOc+4mG193roZGp1kxNIZtuy1x1MDAwZeIrOZ5opW+PuGXJO//c7ZQyoJR881Q8RIBIyoQ2XHUwMDE1XHUwMDFmXHUwMDFh6kFAOVx1MDAxOIJgXHUwMDFje4RS/cC1KCRWXHUwMDBmNEFcdTAwMDJOXHUwMDE52Ip0vG53jiVOefSdO5ZcdTAwMDfYllx1MDAwMptcIlx1MDAwZsivwPZUaVx1MDAwN+w1gX24MbCZRJIpO1670j+8aFx1MDAxY1x1MDAxMiqVY/X1LPQxuP5YWPum521cdTAwMTPXdIRUycSqLVx1MDAwZibqlMSQIa5cdTAwMWMrKVxi44xcdTAwMTPm61Uz7MmAXHUwMDA2XHUwMDEyXHUwMDFliyqwXHJPXHUwMDE02S3gRFx1MDAxMeds26Hj0e3904to1VwiOaA4UCnLNParcmCSbmRYQoxVf+6ztba17Gerni9HK1izakKIxFx1MDAxOOpWcKLQXHUwMDEwXGZVRrCb84h5q75cdTAwMTh8zJN0Ln/7u764IKz6R8xd5eOLXHUwMDBiXHUwMDE2ZIzHs0zlnluUPybyxZvDo2zq5mxET0+jla/7RiZcZpRcdTAwMTIlicIkUiSCqIKkZz3uMFx1MDAxOVx1MDAwMZPH0TEp1ENPKDzoILmkfpZcdTAwMTDCXHUwMDE5pGD79uTKXHUwMDFjbD0kg1wixlx1MDAxNOtcdTAwMTlcdTAwMTHy5LSrve7h0DZcdTAwMGVLqYfuw3kkRFx1MDAxMlx1MDAxY7C7NbvksUksXHUwMDE2XHSgpp1DwTWh2Fx1MDAxZFwiw1x1MDAxMZlaIVx1MDAxNsuxuzFcdNauXG5joavCKFx1MDAwNdK1uLdcdTAwMGaR+YNMaqvxOC9gPGjMZrKJoVlr1Fx1MDAxMoXK3dnltXPXXHUwMDFlXHUwMDFhi2g0m03L7i/EXZlMulx0V8wwZdy/S81r6lx1MDAxMb2mXHUwMDFlXHUwMDAxo+pHXHUwMDEz/eI8SSVQbyhcdTAwMDRHXHUwMDEyakxa7lxy8jaAlVx1MDAwNNq/Mtd4slxuVlx1MDAxMVx1MDAxMJhqNvVyjZjFLalnYHVcclx1MDAxYugvTds4qah9vvtcdTAwMTSyL1x1MDAxZnPZetXA/W72ppzI5ZDTuV6pukkwiFxi8aNh/fCvXpooZIWl8lpcdTAwMTmXVKk2I8S3XFzgNakokoJcdTAwMDGpWEoojJDFZVx1MDAxZN8wqbhN4DiNXHUwMDBlXHUwMDBlXHUwMDA0XFzPXHIhXTyYslx1MDAwNYPT28dcdTAwMTlwTihcdTAwMDHrXHUwMDA0jubk2Fxcg1x1MDAxN7hMv1jyk7OK73BL1CWUa3NbrZu7KqWh9VxcXHUwMDFj3nXNWq1gnV9p/vmJntuIUFdcdTAwMDdqfqmycqTkXHUwMDAx/M64TUJcdJlYrN3dcdveXHUwMDA28D1bgdtcYiaAz1x1MDAxNSr6/ztcdTAwMTBcYlx1MDAwNbBiNkDcXHUwMDA37OeRW/m49LNvwfuXPOpcXMGH9HnCejxYjdw4WW3h5lx1MDAxMnzopYlCbsqGT1wiN1arqI1yXHUwMDE4xm2EXCIhfe7WjttiXHUwMDAxRzo6OCjCUkJK9GGT0FAm54yqh1asO3espb3bSWzvXHUwMDEwS5zE9uPtabBv2HbeUaO5P0267Fx1MDAwZi1zdLio6L9VJy/3qTKBvavk5iSV9uvHr39cdTAwMDFoZqBcdTAwMGYifQ==1USER-1610123purchase_time (seconds)3FIRST()LAST()SALE22 \ No newline at end of file diff --git a/docs/anomaly-detection/assets/scenario.excalidraw.svg b/docs/anomaly-detection/assets/scenario.excalidraw.svg new file mode 100644 index 0000000..6dbca04 --- /dev/null +++ b/docs/anomaly-detection/assets/scenario.excalidraw.svg @@ -0,0 +1,4 @@ + + +eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1dWW/iyFx1MDAxNn7vX1x1MDAxMWVe7pVcdTAwMDam9mXeQlx1MDAxNlx1MDAwMiFkhSxXo8iAXHUwMDAxJ8ZsXHUwMDA2XHUwMDAyo/7vt0zS2HghXHUwMDA2nI67XHUwMDA1kVwiKC9cdTAwMWNXna++s1Xx77e9vX172tP3/97b11/rmmk0XHUwMDA22mT/T6d9rFx1MDAwZoZG11KH0PzzsDtcdTAwMWHU52e2bbs3/Puvv9wrsvVu5+0q3dQ7umVcdTAwMGbVef9Tn/f2/p3/V0eMhnOtfYBsfJ7XKqBRyedzXHUwMDBmXHUwMDA3/Vq1NL90ftJcdTAwMGZhXHUwMDA2et3WrJapu4deVTvEhC9cdTAwMWGmqkFcdTAwMDCQXHUwMDE1nCGBXHUwMDA0XHUwMDE3XHUwMDEwY7Y4OjFcdTAwMWF2W53BZVYuvVx1MDAxNme0daPVtlee8ibC33tg0TK0XHUwMDA33Vx1MDAxN/2wa3ZcdTAwMDeOnH9A3flzpaxp9ZfWoDuyXHUwMDFhi3PsgWZccnvaQHWLe17TMM1cdTAwMWJ7Or+76lrVjfu+77h7f1x1MDAwMOhrj7pKfWmrbenD4ZK83Z5WN2ynqyBwW1x1MDAxZFx0e4XGfJT+cWVcdTAwMWFoXHUwMDFkveBcZpM1Ms1Fs2E1dKfz9zWw9G1W4/3bflxmsTt++L3luyu7rjfmQmBOoKDMXHUwMDFkRlfPOKP+1nLXmutcdTAwMWNcdTAwMTaQQ4mI21x1MDAxOcbwSOmaPb9rUzOHujtcdTAwMDSOaMd+PfTqokdcdTAwMWZH7OjopDPWuVnMNauvxs0znvVcdTAwMTePuaSTtv5q7y9cdTAwMGV8f3/ndt+o19De5IGcXCLAXHUwMDAxRVx1MDAxOFx1MDAxMbI4blx1MDAxYdaLv2/Nbv3FfYRvnj7z4Wa1nMsy+iDDWVx1MDAxNjpazSRcdTAwMTGIILRcZiAosnglgFx1MDAxMMwyoF5MjVx1MDAxZIWUIVx1MDAxYURcdTAwMTChqYJcZtpcdTAwMDAy8HMgXHUwMDAzwyGzdPY7NiSVXHUwMDFjSUJkXGI0JFxyXHUwMDAw5lx1MDAwNzSoukyNK99cYlx1MDAxYUlqr6uMjlx1MDAxMqqHR56x61r2jTGbQ5kttZ5oXHUwMDFkw3Q6my1df2BcdTAwMWEt58H360pUfbDvfXrbULyzOKFjNFx1MDAxYV6eqKubaoalXHUwMDBmXG5xXGKnOzBahqWZt0GZtZHdvdaHb1Lbg5Hu7Vx1MDAwYv30h+7DLKIrgNvuj1x1MDAwNmCWh1xy0So81Fx1MDAxZWsvz8O2XHUwMDE5n/AgcVUwXHUwMDFl4Vx0kFx1MDAwNd6XqziRhOfeJFxy6E1cdTAwMTHhoe1cdI8giVx1MDAxMWA8XGbVXHUwMDEweEbXXHUwMDA3a8hcdTAwMTDAUlDgTq1cdFFe5+wuP8s3T3J27sKajlx1MDAwZlx1MDAxZs+Ld6VcdTAwMTRS3mo5V1FcdTAwMWVkckd5X0Z5OD7lqdHBjjFcYsPAgVkkOFxiVaDCnKaP82BcdTAwMWE471x1MDAwM87xc1x1MDAxZUya86p0dtSuktZcdTAwMDW+rvHMtCTKnd7FXHUwMDFhTp5w+8XpJepRhJ1b9zaESVx1MDAwMvZka5ZDXHUwMDEwccS5+lx1MDAxZurWMX/rguSwVIOLKHPHJiGSw9p9oYK0q0mxcVjUarPjXvFcdTAwMDGkkORWy7mK5FxipCtIjnKepVx1MDAwMdTsiG3euiFO8vGJTVlcdTAwMTZcdTAwMTTLUDhIKlwiXTmm/DxA8EZo+P1p7Vx1MDAwM1r5dFrTnvlcdTAwMDTX+vTAyF2Mx5PB66PeKqzhyqFlV07NfDtXblx1MDAwMafPJrnT7V05SCTFVIS6cpIgf6vryVx1MDAxMaFGXHUwMDA2cZw0yTVcdTAwMWaeXHUwMDBlO3f5USd33jy9p6O7sbBFXG5JbrWcKz05ssqTo3znyX0m4Vx1MDAxNeJcdTAwMTNcdTAwMWVETDllXHUwMDAwkTDKU1x1MDAwN3FcdTAwMTQ6XHUwMDEwpFx1MDAwMFx1MDAwMvU/dZyH08B5XHUwMDFmcI6f83DSnFfOVepcdTAwMTmreSqGefR6c09Pr1x1MDAxYvIsXGJd3TSN3tDPeFT48Fx1MDAxYeLI+TnO7cdcdTAwMDVAhXtVXHUwMDFhXHUwMDAwSnztX0hqxbikhqJIjVxihlx1MDAxOFx1MDAwNTgsXHUwMDAyg6JcdTAwMWQ3XHUwMDA0IYdAmTSJO25WXHUwMDE5ty2Syc3IWbX+UqqePdTrT7E47c9Vt61DZJ/Dx/rJ+cFL+6hQOc9cdTAwMWZ79Hjpttpg0J18XHUwMDE5V65+/lVcXMlcdTAwMDDNKqeQMcKYYjqyXGY9gbJcdTAwMWNSrpiSXCKKXHUwMDA1XGZcdTAwMDBcdTAwMTFjhyohXHUwMDEykGNFqDCMKvGvT5WfhMSz+FRJXHUwMDA1I8qUkYFktyNcdFlcdTAwMTErYVxcOYg4fUSJQChTXCKx1PrZTPlcdTAwMDFTXHUwMDA1XHUwMDEyfSBpqlx1MDAxY1dPr1x1MDAxZVx1MDAxZezhLDOyp1fabHI8mTaDyH2bXpahXHUwMDBifFk+XHUwMDE5QpRuy1x1MDAwMo5cdTAwMTSki1x1MDAxOVOEx/LWzFxipUCYYVx1MDAxMjRdXHUwMDFkXHUwMDFkjmZG5ehRgjfy9T5cdTAwMDWlva7hp1333Z7b//NcdTAwMGaL9//8+fHZXHUwMDE5pX/uXHUwMDA1XHUwMDAxmjS1oX3Y7XRcZls9yaUjRWBCtLWBnVODZlgt/zHdakRcdTAwMWOZX3XgoKita1x1MDAwMVx1MDAxNVDXeY8plTZ8wVx1MDAxON2sdSexuPhGNuXp/dNcdTAwMDSRi+Prq5fbTlx1MDAwN806ySFaoFx1MDAxMEzvXHUwMDEwXHUwMDFkgeiLrVx1MDAxMc0gXHUwMDE3kMnQ2jNcdTAwMTFZelx1MDAwNrlcdTAwMDI0XHUwMDE2kidcdTAwMTmVhZJcdTAwMTNcdTAwMDGF8n2/XHUwMDFj0lx1MDAxZS30nf9cdTAwMWJcItqoTO+Nl3zlVnHzXf2uXnyYTWNFolx1MDAwNPTb00E8Q4KyyPtyScBccjalXHUwMDBi35FcdTAwMDHaRtdcdTAwMTnjn1xu8Ks1ok1cdTAwMThcbqqm2PDCgehgXHUwMDEzVD6roJKmL8FSuTm+zsCIijm01Lq1IW13e1FW9JLIfpM5IOOS3bz8XFzrXHUwMDE4zrI4tiYvXHUwMDA3k/NGP1M0JdLuz59Dylx1MDAwNcJAibPC83KHdeqj11x1MDAxZERcdTAwMTOB6PVcdTAwMWFcdTAwMTAlmDglOmHWM/RgJZAsUVxmrY5vVun9+Vx1MDAxMMVcdTAwMTGZ0DRBXHUwMDE0R2U+N4foy+vltDq+bdd6pHFROkbNXHUwMDEzbfRcdTAwMTA/9cmgO5y7ZVx1MDAxYj/bcr7dPvWJIFx1MDAwN465XHUwMDFhxrdcdTAwMWNH1qZDiSHiinFcdTAwMTNcdTAwMGZcdTAwMTNcdTAwMTMjc3QpzzXjhlx1MDAxZlxc6IcjdnBcXK6mMPW5Ws6V4VxcXHUwMDAydkWsX5b6rKxR66PGh6k+XHUwMDBlLVx1MDAwYsCRNaxqzFx1MDAxOJBcdTAwMDCkj+hSUezzXHUwMDAx43x6sc9Zu6dcdTAwMWKH6Fx1MDAxMfePZH50Vmp1uzeNIHCdWyZcdTAwMTPMRekyO1NcdTAwMTT5qcblr+hYLqZcdTAwMDRAXHUwMDE0XFxEtb+U01xueIuScsg4TDKYXHUwMDBiJUdcdTAwMTQogahXLb8+mItSXHUwMDFm+Fk61uua09Z8/D5iYNYsXHUwMDE3XHUwMDA2+adSo9w7tp+PWe6hOLqKxcDAV7AnhatcYj+ATEKQvMuSRiH5Lj6rXCJcYpFcdTAwMDDM69R7aDW62I5whVpJpFx1MDAwYvWEMOtcdTAwMTQ8uFxct1x1MDAwMa3+nCzppp5jVDp0c5+xol80arytXHUwMDE1LrSTjFEoXHUwMDBm7uF5L1x1MDAxNoNcIrFj0ERxd789g1xux5fjXHUwMDFjhMVzgkV/njohjFx1MDAxMP5cdTAwMDQ47ijUc9UnU+htzTTG7WKNl05rt6X7xuRsXFyyY1EoYjtcbk1cdTAwMTbKXHUwMDBma1CoJJBgXHUwMDA0QktyYfTKY1xmkVx1MDAwNDDZLEkyXHUwMDE0XHUwMDFh7pmmhUKTXHUwMDBmuz49lSs2gmVdU4NzOmpmWvbNYyxcbqV+23VHoe+tXHUwMDFi4u5xe1xulZRRKbxToItHXHUwMDFh7YVcdTAwMDIguFx1MDAwNIDuKPRcdTAwMTem0MtcdTAwMTHta/r4Ulx1MDAxM1x1MDAwZlx1MDAwNavXXHUwMDE1/VKHxcpxQm+R345Ck4CytkZcdTAwMTZTma9cdTAwMTJcdTAwMDOAQtOYMHIlJ1d4RYyzJIO7yVBoeIlBWig0+eJcdTAwMDJcdTAwMDb7g1L+qXXVpVx1MDAxOLDLu2JcdTAwMGWCkFx1MDAxYb5cdTAwMTBcbuV+23VHoe+tXHUwMDFi4q6WgFx1MDAxN0pcdTAwMTlDhIRcdTAwMDZyWSA5uaBQiqCz8oHvKPRcdTAwMTem0CfradqTVuFpVNGH0+pBuSTRNFx1MDAxNoVyf0ZmR6FbQrm+XHUwMDA2hTJO1Vx1MDAxZpDhK0MjK4FcdTAwMDSRnFxugVx1MDAxMk++bE2h4Vx1MDAwYkPTQqFRS0A3p9ApKKNcdTAwMTdcdTAwMTPMznrV/rh4/oraaHRcdTAwMTdcdTAwMGJ4/lQo9FZsLsrgoa+QJ1xixF1JXiRcdTAwMTKbayBcdTAwMTEzLpyivDAkomBkd7EtiYRcdTAwMDI4i0RcdTAwMTNEYjKVXG620dH3/jPUXHUwMDE1PFx1MDAxYcP/proyL0rUpMzcQWd2PHhuV05PtSNycmNf32dI7Fx1MDAwMj2/qYtDTN2lrSg9a7N/XHUwMDExaK7JkVvjsr1cdTAwMGVDOsXs4etSYLB1YdVcIkZcdTAwMTBcdTAwMDJcZiZp1SZd/6a320eWTawqXHUwMDE592mtXCLL13f6SWy9hMCXXHSkJKiYXGZ/yFx1MDAxZjBlTllSWkq3VlNjLTVlXHUwMDAwXHUwMDAzXHUwMDE5XFwqNe/hyFhcYqSAci5l4nt333ZcdTAwMWKHh3TSfLYz+UrvcnBjWa9umHZJv75qTX94eGEpXHJcdTAwMTNtXHUwMDAwM1xmKFTixOi3t1x1MDAwZSnoI5S5grmjSlx1MDAwNtHH06ldu1x1MDAwN9WU6OdcdTAwMGbY4ayzc51EzNlKSXLpXHUwMDFhXHUwMDE1oUFcdTAwMTfvOis/Tt82XGZZXHUwMDFmp1x1MDAwYkZcdTAwMDOhhFx1MDAxNtB7XHUwMDAxXHUwMDAxZVx1MDAxMnpcdTAwMWOYcLCtNcdiwNBGXshmXHUwMDBlekzHf0NcdTAwMTf+s2JcdTAwMTnkw9PZ2ucvbzdcdTAwMDP/XFx18KO7eTRo/tG/XCJcdTAwMDGvJ34mcFx1MDAwM7beXHJwllLE53a+MswogHpcdTAwMDbStXp8VfDIZY1nz6RcdTAwMTLOMJFkska8ZzLtQTOT79B2r1AqVqTWXHUwMDFihWw9XHUwMDE013Tgi/VcIqprXGLxbDm6MCTCXGZcXGU6vCtcdTAwMDWUklLsWVx1MDAwN7qzJLw68bKGJSEwhVhiXHUwMDFlmlVcdTAwMTGRllx1MDAwNFOaTSkkX+6Ibsb1JLhz4GIxgIKtcrBjPdlcdTAwMWI6Xp+O6OlQmEclfXJ8Mu7wXqtTSYlcdTAwMDbF53p3vvX+gMXbXHUwMDAylyW48kiM/SwrXHUwMDAwIC5cdTAwMTBcIjxZK4BcdM9WITsrIPTsnVx1MDAxNZBOK8D8XHUwMDE5VkD/Tr/nonGUXHUwMDAz08PrXHUwMDAzWOTlfqFcdTAwMTK0XHUwMDAygvlb4aub4GHLwUPB/4vEXG7WXHI2b0/xnTUoXHUwMDFlXG5cdJ1NXHUwMDAyw4JcdTAwMDU0cvckiVx1MDAwMJPCu4voV4eaXHUwMDEzndagb6r4XVO1qyMzKzdOgv6In2e79FW52t81xJdAimhcdTAwMTJcdTAwMGXbdeoulO0jXHUwMDExpjjMYMeRVq2gTrSQJlp2kVwiNFx1MDAxM1x1MDAxZm0nW3nhXHUwMDE5nvdcdTAwMWbVLMSIqDv60K2PnEfMgCxz8ufUqVx1MDAwM1fGNnHdLUeXtN7c2VxuXGb5XHUwMDFhM0fYVkwr5o/42zSdTvpNu4lzV3f1We2RnbzWyqOYXHUwMDBiXHUwMDBlwPJcdTAwMGaEYk/UbsH6jM8tJIBcdTAwMDXlXHUwMDA0eVx1MDAxZM60ln6kaDaZrmFcdTAwMDQgJqmz31LIrFx1MDAxMb1cdTAwMDWMXHUwMDFhXHUwMDE0ypw8dWpmjVx1MDAxZunmXHUwMDFizdT3+iPNsp3edLssfUUgXHUwMDExkiazNH71Xsor2Z0p34ZcdTAwMTLmbKdcdTAwMDdcYobMt6lcdTAwMWFFWcU0XHUwMDE4K0ebirAqLf/lYZtacN9N3Fx1MDAwMoBcdTAwMWSSl5A8294uXHUwMDAwXHUwMDEwYcfKXHUwMDBmXHUwMDAxuIysJ+FcdTAwMTJQwrlIz07GiZpcdTAwMDWZaCWdXHUwMDFmXHUwMDBl6GeSVkO0XHUwMDEzXHUwMDEwZU/kL2XbalxicPZU1Eon1WthWGfPYfZcdTAwMDTIOtvnOT9mKZXskkomPGe92Vx1MDAxMzSoKT/NaFj9ICt/SsuXdCQhVVx1MDAwNlx1MDAxOKAsJFx1MDAxMkkopTJcdTAwMTl4+ndHT89MU1x1MDAwM1x1MDAxMb9cdTAwMWNcdTAwMWVqNFBcdImzYVGY1Vx1MDAxML2dXHUwMDBlXHUwMDE0kipLbkOzYaHHqf/pgIBRUrFGw5FmmtO9ttLDX8M8+VDmtVxylW/vvbyv9Xo3turjxYy3Pzb0SS6Irj+a85czWvP5xEGWPue/79++/1x1MDAxZolNuaoifQ==211320USER-12USER-3110123time (seconds)Sale quantityUnusually high quantity \ No newline at end of file diff --git a/docs/anomaly-detection/assets/useful_match.excalidraw.svg b/docs/anomaly-detection/assets/useful_match.excalidraw.svg new file mode 100644 index 0000000..b376ba9 --- /dev/null +++ b/docs/anomaly-detection/assets/useful_match.excalidraw.svg @@ -0,0 +1,5 @@ + +  (seconds)AVG() = 2AVG() = 1.520 > (AVG() * 3)1 < (AVG() * 3) \ No newline at end of file diff --git a/docs/anomaly-detection/assets/useful_query.excalidraw.svg b/docs/anomaly-detection/assets/useful_query.excalidraw.svg new file mode 100644 index 0000000..5622fa2 --- /dev/null +++ b/docs/anomaly-detection/assets/useful_query.excalidraw.svg @@ -0,0 +1,5 @@ + + +eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1da0/jyFx1MDAxMv2+v1x1MDAwMrFfdq823n4/VrpXguFcdTAwMTVIeFxmMFxmXFytkJM4iSEvXHUwMDEyh1x1MDAxMFbz3291ILHjR3BcdTAwMTJcdTAwMDOZq1x1MDAwNGlcdTAwMDTttlPurlOnqrq6559fNjY2vWHH2fxrY9N5KttccrfStVx1MDAwN5t/mPZHp9tz2y24REZ/99r9bnnUs+55nd5ff/7p32GV282Xu5yG03RaXlx1MDAwZvr9XHUwMDE3/t7Y+Gf0L1xccSuje1x1MDAxZvpd9LyPK6qWvy7dlO7vevXG6NZRp7EwXafs2a1aw/EvPUE7xlxiTVx1MDAxYYbQoFx1MDAwMn9cdTAwMGbcildcdTAwMWa1WSj4kZNcdTAwMWV1x63VPehcIrWlg1x1MDAxZjHp8vKlf234j+153fa986XdaHeNZL9ix/z4cpXs8n2t2+63KpM+Xtdu9Tp2XHUwMDE3XHUwMDA2wu9XdVx1MDAxYo1zbzh6OlxmJlxm3GboO65eX1x1MDAwMIfak+6CL63VW06vNyVvu2OXXc9cZk5wsIyEnXxlNC9/+zJ17aaTN1x1MDAxM9PqN1x1MDAxYZNmt1VxzHBv2mTq21qV129cdTAwMWJPqj9j9LXlhy+745hcdTAwMDdjRjQlSEg9ueJrXHUwMDE2RoyEm4/brZGaYUE0wYxcdTAwMTLqy9XbXHUwMDAx/fJGz63ajZ7jT4JcdTAwMTFuN6x7Qf1cdTAwMGLoYPPoav95v7q37W2ftIaPX26Kh1eFyYtO6aHnPHmbk1x1MDAwYj9ef/NcdTAwMDew36nYL/JgyVx0kjDkVGo2ud5wW/fh0W20y/f+K/xcdTAwMTJcdTAwMTi1XHUwMDEwVmbLOS1jXGImRFt4pNiaKcJcYplcdTAwMDZccpZcdTAwMTaPwIZgS1x1MDAxOLhcYowox1xcXHUwMDEwXHUwMDFlxVxy4ytcdTAwMDVcdTAwMTSyXHUwMDAwUPD7XHUwMDAwhcZcdTAwMDNlqvcrXCJgRqjiQuI4QFBBXHUwMDEzXHUwMDAxQaSmVFMuXHUwMDE3XHUwMDAxxEydlTSAw7d11ldBo3rw+lx1MDAwMTNRbbe8c/d5ZFx1MDAxM8RU657ddFx1MDAxYma4xdT9W1xyt2ZefbNcZqI63c3g63suMMykQ9OtVIKMUIaH2m7L6ebTUEu769bclt24iMps9732V6f3XCK11+07wbFwXHUwMDBlxtqPLcJnwPXx28HZ9bXXe871veGZ/TzYXHUwMDFkXGarUbja3W57XHUwMDEwwiti07SmRZTW/JZcdFx1MDAxY4nyXHUwMDFif1Y8vlx1MDAxM3FcdTAwMWSnJS6SSFxcXHUwMDFhcCooi1x1MDAwMtKosFxmN45Rylx1MDAwNVx1MDAxN5gzulx1MDAxMGslg1x1MDAxNFx0LUGgRUDaabthUvR/2/AnYPTH5Pe//3i7d1x1MDAwZVx1MDAxNNC/IUKOXHK7531pN5uuXHUwMDA3b3JqpIhYRM/uetswa26rXHUwMDE2vua0Klx0V0Z3bVx1MDAxOVx1MDAxONVcdTAwMWQ7olx1MDAwM3Bf8Fx1MDAxYei0XHUwMDFi8iSdRqk9SMXA57qqXHUwMDBmvt9cdTAwMGVcYjvZ/Xp2f9FskudmdpBWJFx1MDAwNtRrSCdA+mRpSFx1MDAwYixcdTAwMTVcdTAwMTY6XG5eM1x1MDAxNzzcOOFdSSSnSlx1MDAwNzzY5SGNtWRcbivM/WH5LEhcdTAwMDe0MNT//1x1MDAxMNH68LE1uN9cdTAwMWFcdTAwMTQrXHUwMDBmucOGJvb34t1JXHUwMDFhn1phaqnAx1eGYVxiyWN0Q+RjkeDHp1x1MDAwNN+hXi20J0aelbaZ8Vx1MDAwZoX71/RcdTAwMWU1RImMUFx1MDAwMGmcR81cIoHnXHUwMDA02Fx1MDAxY8hcdTAwMWFxXHUwMDE1gH5WXHUwMDBlNVx1MDAwNF9cdTAwMGJcdTAwMDF77FBfnu9+zZFcdTAwMDS3mky1Lu1We+1Okk89JXLYgY7IOOVGT7/XPH70/dPp8NvjRb3UYZWTwi6p7tn96/QpXCKBfS1+TVx1MDAxMVmMKsxASzRcdTAwMTJCRYBcdTAwMWHOXHUwMDA2+Vxuk5gw8rusNGw/nqQvlk9cdTAwMThcdTAwMTEskWHGuPhYUlx1MDAxNW6dON5MKSxcdTAwMTlbiKUn4sWki5ib2znVRds9l1snzpe+2No9/pZNumh+r35cdTAwMDZuZss5K10kXHUwMDE4eiNdpGdcdTAwMDJonTpcdTAwMWG1Llximcv0RKdgflx1MDAwNIxxXFwuVdPEVCpBWiEtXHUwMDExwYtgY1x1MDAxNtExglx1MDAxMZ5DfSNEt1x1MDAxMpmjN1x1MDAxOOfdM0dH9Y7jfiE39GFH7/ePXG61dvu8XHUwMDEyRa55ZEaJo9VyO1coyPyWlr+S80aUM4RcdI9cdTAwMGIyaXKQiTWXWEicZd5cYoJMwlx1MDAxMVxixINq+fl5I7LyMebUtU67MayN5u8tXG5cdTAwMTbV43x3/7ZQOe7sene7Yvv6sH+WioJcdTAwMTFcdFx1MDAwMVn5ijBcdTAwMDYyi0Ey/flp9Z2QfJWeVlx0xkQhXHUwMDExXGbqXHUwMDAzvJq8QskkoFYzneWCzFxis1x1MDAxOKjV57pcdTAwMDVoXHUwMDE1xdIqUVOtn1x1MDAxNjmiXHUwMDA0XHUwMDAyXTxmvHROKiVZt/Mn9l7OzVx1MDAxZne/42InXHUwMDE1g1x1MDAwNtdQ1lxmmlx1MDAwMe6+L8+gXG5riqVEcflcdTAwMWOWuPJcdTAwMDIopoTQd4DjmkJcdTAwMDN3vTOFXpRcdTAwMWHuY/2wJFx1MDAwYlx1MDAwN6WLwvfK4Oix4KWiUFwi1lx1MDAxNJotlK/noFBt0lx1MDAwM1x1MDAwNMV5vVx1MDAxOKNw64RCKSZcdTAwMWFcdTAwMTlcdTAwMTZdOVxuxStNoThzXG69vT2+9FxiPnZsmJyDfjVX885vUlEoXHUwMDBm+65rXG59bV1cdTAwMTB3N8tTqOaCa1x1MDAxNTSBPlx1MDAxZXlyXHUwMDE0ipCSXHUwMDFhoUxLjNZcdTAwMTT6wVx1MDAxNHra51x1MDAwZrbzeGqr63yr01ZcdTAwMGaFpki1xok5WlNotlC251jFXHUwMDA091VThEjsMiZOXFz4kIBXXCKkyDK5m1xyhcYnd1eFQrNfuVx1MDAxNPihW9i/rZ21OUXi9OpwXHUwMDFio5hyoVx1MDAxOFxulWHfdU2hr61cdTAwMGLirpRBXHUwMDE0yoUgjMUmckVkcdIvKiBYXHUwMDEyJeWaQn9iXG69bd1cdTAwMGU7upW/7V86veG3reOCJsNUXHUwMDE0KsMrMmtcbl1cdTAwMTLK5TkoVEhcdTAwMGU/SMeGoYSFW8egVUxLrtRiW03elULpSlMozZxCh+iY3DfQ81Hn28PjYfGJ1En/Klx1MDAxNfDCS6FcdTAwMThcdTAwMDV2WEwqbnGokCdcbsR1SV5cIlx1MDAxMqtzIJFcbqlMUV5cdTAwMWNcdTAwMTJcdEpEXCJcdTAwMDZcYmtgXZVlQmi5+vkxXHUwMDE0O/1uuW73nFvPbTpcdTAwMWK/9Vx1MDAxY1x1MDAwMEql9/tK1+i9KXNWnm+3+bzbvatfXHUwMDFlXHUwMDFj2Dts79z7+j3HUtfsSUEspqjAhGhcdTAwMDV+l2/dRmNcdTAwMTazx1x1MDAxM+M4XHUwMDBl5eInwe6cJLo0cOvzUKjiiMXXyOPonjW/xFxiXHUwMDAxyJDiWZdcdTAwMThJJLnIqELudjDs4EZuv8nrnXzh8FLbnf5R+spSXHUwMDE0WiqUk6pQhVx1MDAxMGNCR9RUxGkpJuNccspYa84pZv+XSsuX1tr7ObRWUY6ppjI2d6JcdTAwMTPVllx1MDAxMsqZwHQhrX29XHUwMDEwWzS69/Bl//TwXHUwMDE2i8smpi3Jhzl6uVx1MDAxNV80+rJ7aHIlxSZjKsFYLo2J+HREsHI+oO+hcdOYXCKiNUlcdTAwMTPkvlxmyNPtXHUwMDBlP+ipxk7BXHUwMDE57O49NmWn1rxcXFx1MDAxMf1cdTAwMWPDjlqYYa6J4JSb/T6+Mo2TNP7JXHUwMDAyTIbKzPGUMZAhKX1cdTAwMDS/3D4/gifUh2JsSmCmxohARCpCWMBcdTAwMDGLh+HHOFGLhfopU1xiXHUwMDBiJlx1MDAwM94rK8Le7C7m7j99pFx1MDAwNf5j1sW3nlx1MDAxNtCg0Z/hvVxydD7xc5FcdTAwMDeI+Vx1MDAxZUAtzolcdTAwMWNFXGaAXHUwMDFkjrCTw3yuXHUwMDExn5WG8vkksFx1MDAxOTyBe1x1MDAxMmkmfeZcYlx1MDAxNc9J1yuWbq5quf37VitXPT4vpvcxXGJcdTAwMGVcdTAwMTXfr3evpDXoy1x1MDAwN7glhONdjnm2r8BkgbVkcY6IVFwi3Or7z5IyTKRcXGgv2ixP5Kh303Td40GNnnpcdTAwMDfXPXWzXbkpx3tcInNuX1x1MDAwMTZgWMx1csRcZuDMlnNm4Vx1MDAwZsPr7SuftX2lhFx1MDAxMs5cYop30zWhlMhYdOhkd1x1MDAxMzNcdTAwMDKQ0lx1MDAxNGWcXHUwMDE1wshcdTAwMTRez3NcXI+vjzPLhD54XHUwMDAzy1x1MDAxYqSTtnho4VxyLK3a8N6uN/H2IJ/fuTugj832Q8xJRUmkR1x1MDAwM9mGNel9OOklnF00XHUwMDBm6UFsoJHULFJZZCZCJpYuXHUwMDEwXHTzyznKPPy+O3i4u9u57HRFe49fX1x1MDAxZLhcZnVcdTAwMWKZkFx1MDAxZdhcZlwiZOAsiKVIb7acs0iParEmvc8jPTZcdTAwMDfpSYUpzFx1MDAwMY0lveSUKlx1MDAwM3ZiXHUwMDFhZX86XHUwMDAxpWixQoKV2rT5XHUwMDA257z7ps3ZXHUwMDE5vplnXHUwMDAzXHUwMDExriz4XHUwMDA3McRcdMXK9z9esspcdTAwMTSQSSBcItZcdTAwMThmSvnUOIYu5ZYkimCBXHUwMDAx4lx1MDAxMjwq/1x1MDAwMX45XHUwMDAxtjQzi8/GvGopfeysXHUwMDAykpNcdTAwMTc17V79Y1x1MDAxNzVLaH/pUiFcYoQkNmVcdTAwMDcxXHUwMDAwZzhxw1xu5VxuK7TQjs+pd4kgnIFTXHUwMDFiqLn/rFKhXFyyor5cXFx1MDAwZeuo/8BcYlx1MDAwYs9dTFx1MDAxNJi+1zNp8ynWgEa2q9w3I5BDXHUwMDE2lkwoRJlcdTAwMDRcdFx1MDAxNVx1MDAwMJZcdTAwMDS61eyOmXmLQlx1MDAxMCyxUtJcdTAwMDT+XG5HVGQquZkkVPnk6pJcdTAwMTWc4e1Nv7vbe6KFh25LxVx0ZfxiI5M5XHUwMDE5lXNNuK89XHUwMDEzmZRFXHUwMDE0XHUwMDE4XHUwMDE2RYDfXHUwMDA1xTrGa1x1MDAwYidN445UmpE6TX/c0uxcdTAwMTebfYQpsjhcdTAwMTVcdTAwMDSGXHUwMDFm9Fx1MDAxOdNp/0ZoZVHJlKZcXIOXI6P+XHJcdTAwMTbaYlxigVYhUDJcIrhPQ6tac7VSVvEgvYOjXHUwMDA0ZUqAXHUwMDEzXHUwMDE5Y/5w8n49XGLqlX5cdTAwMDff/52X3jhfzLSOnaetb/u//b7x71xyabFYNyqptktNPen9S0di5czGdVKdut3Yblx1MDAxNYroZoiva/Vujlbz6dNcdTAwMDVMgExcdTAwMDBoaeqEXHUwMDE0nj6JTUqwXHUwMDFiSENESlx1MDAwNVx1MDAwYuzd9itHpGWMXHUwMDBillx1MDAwMu6WMsYsYMYsXCKUyTlRylx1MDAxOVx1MDAwYtSjrM3ExrSZyM9jJlx1MDAxNFx1MDAwM1wi1XFmQidcdTAwMWbSplx1MDAwNaJKg6uctZ2gxWqluHPq3jXdIzbc/YpK5cv7zOxcdTAwMDSWwc2QSyVcdGZcdTAwMGI6M9LgWFtcdTAwMTBIcHDBmKJcbk3n2aTgXHUwMDE2ReDkXHUwMDAwiUqhXHUwMDAzXHUwMDAx3CTUkFx1MDAxNlx1MDAwMedGUVxyvlxyXHUwMDBlVlx1MDAwNVx1MDAwNZJcdTAwMDSWxEJcdTAwMTBpjpFcIkz8JGfpf1x1MDAwNlhcdTAwMGWXjjRcYlx1MDAxOC5cctNcdTAwMTZ3Nlx1MDAxYYlcdTAwMWWx71dVmkBcdTAwMTLs3TvEXHUwMDFhlJJcdTAwMTU4lzhRUc0nXHUwMDE30VH/cVx1MDAxMShnXHUwMDE2acxmuelIg5tcYpIgZmZXMVx1MDAxYVjff3XquVx1MDAwNTMvXHUwMDE4+Fx1MDAxZpJcdTAwMTDB+IKBxl7+ol53Lr7e7Fx1MDAxNo8wq+3Rc7dXTJDJXHUwMDFjiFwiMNKcgWBcdTAwMTT5dntcIlx1MDAxNLVM4IPAP+cm2YhcdTAwMTiP6u2HhVx1MDAxYbNfbVaowVx1MDAxOLKElJxcdTAwMDE/wVx1MDAxMPtcdTAwMDZsXHUwMDE0aShcZnFcdTAwMDSBgYdIXlx1MDAwMoxcIjZcdTAwMTL0XHR6gE9BXHUwMDExhGYyJo+6jjOSbeJRelx1MDAwN1x1MDAwMlxcN61cdTAwMDWmsXmWYOFEZFezNodcZqDs19bf1YVY3LJONo+gjf9s/PbiyP9rg8aXqq9KvJEsbDZBx8mgr1x1MDAxYlx1MDAwNbTPz65b9fsnsi++92O2dDmNhtvpRUJcdTAwMGUlLWGWU8DSXHShXHUwMDAzJ9O8rFfil/VKXHUwMDA0Pq5cZlx1MDAxZUAzWa9cdTAwMTRwOzJcdTAwMDfDUZP4jcvYytWKMlio/TNcdTAwMTcoXHUwMDBic9hcYiGASKdcdTAwMGVrXHUwMDBmXHUwMDFjv4eSN1GDeYe5yfL02FjGXHQoZJWcXdj8/KxW4/KiYp+f7Fxc5EpzWVxiaf5vJVx1MDAxNUTD4kFGvDRp6JMjZlHzP3NBpEaUJKG1fIi3lUDgtFx1MDAxYc8rWPkx2VxySS2ze1x1MDAwMFx1MDAxM1NhauBcdTAwMTFcdTAwMTdjrFx1MDAxNDZWikCL6cHBJYJZkCp2T1byQiRcdTAwMTaUgGMkXHUwMDE3OlhrdvRg9neQpVx1MDAwZZCl8UfdffBi5Fx1MDAxYuRcdTAwMTLluiXY7ZdXm7BpdzrnXHUwMDFljOfE3998dJ3BdlTdf62OPsa2jMBvVN1cdTAwMTlFnj9++fE/gPiHXHUwMDE5In0=2USER-2220123purchase_time (seconds)12AVG() = 7.430 > (AVG() * 3)30 \ No newline at end of file diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md new file mode 100644 index 0000000..43344c9 --- /dev/null +++ b/docs/anomaly-detection/index.md @@ -0,0 +1,322 @@ ++++ +title = 'Anomaly Detection' ++++ + +> Note: This tutorial is mainly focused on anomaly detection. For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). + +[FlinkCEP](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/libs/cep/) (Complex Event Processing) is a Flink library made for finding patterns in data streams e.g. for detecting suspicious bank transactions. It can be accessed in [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/overview/) using the [`MATCH_RECOGNIZE` clause](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/). In this tutorial, we will use Flink SQL and its `MATCH_RECOGNIZE` clause to detect and report suspicious sales across many users in real-time. + +The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the `tutorials/anomaly-detection` directory. + +## Setup + +> Note: If you want more information on what the steps below are doing, look at the [Interactive ETL example](../interactive-etl/index.md) setup which is almost identical. + +1. Spin up a [minikube](https://minikube.sigs.k8s.io/docs/) cluster: + + ```shell + minikube start --cpus 4 --memory 16G + ``` + +2. From the main `tutorials` directory, run the data generator setup script: + + ```shell + ./scripts/data-gen-setup.sh + ``` + +3. (Optional) Verify that the test data is flowing correctly (wait a few seconds for messages to start flowing): + + ```shell + kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.sales.records + ``` + +4. Deploy a [Flink session cluster](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments): + + ```shell + kubectl -n flink apply -f anomaly-detection/flink-session-anomaly.yaml + ``` + +## Scenario + +### Source Data Table + +The data generator application creates a topic (`flink.sales.records`) containing sales records. +The schema for this topic can be seen in the `data-generator/src/main/resources/sales.avsc`: + +```json +{ + "namespace": "com.github.streamshub.kafka.data.generator.schema", + "type": "record", + "name": "Sales", + "fields": [ + {"name": "user_id", "type": "string"}, + {"name": "product_id", "type": "string"}, + {"name": "invoice_id", "type": "string"}, + {"name": "quantity", "type": "string"}, + {"name": "unit_cost", "type": "string"} + ] +} +``` + +> Note: All of the fields are of type `string` for simplicity. We'll `CAST()` `quantity` to an `INT` later on. + +*(Assuming you have the data generator up and running as per the instructions in the [Setup](#setup) section, you can verify this by running the following command):* + +```shell +$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic flink.sales.records + +user-11188&30299767430689348882 +£838 +user-9467&63188787247555258843 +£971 +... +``` + +| user_id | product_id | invoice_id | quantity | unit_cost | +|---------|------------|---------------------|----------|-----------| +| user-11 | 188 | 3029976743068934888 | 2 | £838 | +| user-94 | 67 | 6318878724755525884 | 3 | £971 | + +### High Sale Quantities + +We want to detect if a user ordered a higher quantity than they usually do. This could be a sign that they made a mistake or their account has been hijacked, which could cause troubles for all parties involved if not dealt with promptly. + +![](assets/scenario.excalidraw.svg) + +## Data analysis + +### Setting up the Flink SQL CLI + +We're going to use the Flink SQL CLI to interactively try various queries against our data. + +First, let's port forward the Flink Job Manager pod so the Flink SQL CLI can access it: + +```shell +kubectl -n flink port-forward 8081:8081 +``` + +The job manager pod will have the name format `session-cluster-anomaly-`, your `kubectl` should tab-complete the name. +If it doesn’t, then you can find the job manager name by running `kubectl -n flink get pods`. + +Next, let's get the Flink SQL CLI up and running: + +```shell +podman run -it --rm --net=host \ + quay.io/streamshub/flink-sql-runner:0.2.0 \ + /opt/flink/bin/sql-client.sh embedded +``` + +Once we're in, we can create a table for the sales records: + +```sql +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +We can do a simple query to verify that the table was created correctly and that the data is flowing (give it a few seconds to start receiving data): + +```sql +SELECT * FROM SalesRecordTable; +``` + +### Classifying "unusual" sales + +There are many arbitrary ways we could use to define an "unusual" or "suspicious" sale quantity. + +By looking at the data in the `SalesRecordTable`, we can observe that users typically order quantities between `1` and `3` (inclusive): + +1. Fetch all the sales and group them by user: + + ```sql + SELECT + user_id, + COUNT(user_id) AS total_sales_count + FROM SalesRecordTable + GROUP BY user_id; + ``` + +2. For each user, take all of their sales and calculate the average quantity: + + ```sql + SELECT + user_id, + COUNT(user_id) AS total_sales_count, + AVG(CAST(quantity AS INT)) AS avg_quantity + FROM SalesRecordTable + GROUP BY user_id; + ``` + +However, we can occasionally see sales with much higher quantities. Usually, between `10` and `30`. + +If we wanted to, we could simply classify any quantity above `3` as "unusual": + +```sql +SELECT * +FROM SalesRecordTable +WHERE quantity > 3; +``` + +> Note: This query might take a couple of seconds to return results, since the sales with high quantities are unusual. + +Of course, this wouldn't be a particularly good measure. Specific users might always order higher quantities than the average user. + +A more useful measure would involve calculating the average sale quantity of each user, and considering a quantity "unusual" if it is, for example, 3 times higher than that user's average. + +The following query will return sales that match that condition: + +```sql +SELECT sales.*, user_average.avg_quantity +FROM SalesRecordTable sales +JOIN ( + SELECT + user_id, + AVG(CAST(quantity AS INT)) AS avg_quantity + FROM SalesRecordTable + GROUP BY user_id + ) user_average +ON sales.user_id = user_average.user_id +WHERE sales.quantity > (user_average.avg_quantity * 3); +``` + +![](assets/useful_query.excalidraw.svg) + +While useful, the query above has several flaws: + +- The "unusual" quantities are included in the `AVG()` calculation and skew it. + - Ideally, we would not only exclude these sales from the `AVG()`, but maybe even reset the `AVG()` after an "unusual" sale. + +- The `AVG()` calculation uses all of a user's sales, which might not be ideal. + - This calculation could use a lot of computing resources, depending on the volume and frequency of sales. + + - Limiting the `AVG()` to sales made, for example, in the past week, might be more useful. + +- The query would become much more complex if, for example, we only wanted to return a match if two "unusual" sales occurred one after another. + - We would likely have to use some elaborate combination of [`WITH_TIES`](https://learn.microsoft.com/en-us/sql/t-sql/queries/top-transact-sql?view=sql-server-ver17#with-ties), [`OVER`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17), and [`PARTITION BY`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17). Assuming they're even supported. + +`MATCH_RECOGNIZE` lets us easily and concisely solve these problems. + +## Using `MATCH_RECOGNIZE` + +### Simple pattern + +We can use `MATCH_RECOGNIZE` to easily look for both simple and complex patterns. + +For example, you can match any sale with a quantity higher than `3` like this: + +```sql +SELECT * +FROM SalesRecordTable +MATCH_RECOGNIZE ( + ORDER BY purchase_time + MEASURES + SALE.quantity AS unusual_quantity, + SALE.purchase_time AS unusual_tstamp + PATTERN (SALE) + DEFINE + SALE AS + CAST(SALE.quantity AS INT) > 3 +); +``` + +> Note: [The `ORDER BY` clause is required](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#order-of-events). With streaming, this ensures the output of the query will be correct even if some events arrive late. + +We `DEFINE` a single `SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) for our condition, then include it in our pattern. We then output both the quantity and timestamp of the unusual sale. + +### Pattern navigation + +Maybe instead, we want to look for two sales with the same quantity occurring one after another. This could be a sign that the user accidentally made the same order twice. + +For now, let's simply match two sales occurring one after another, that both have a quantity of `2`. The [`PATTERN` syntax](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) is similar to [regular expression syntax](https://en.wikipedia.org/wiki/Regular_expression), so we can easily extend our pattern from above using ["quantifiers"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) to accomplish that: + +```sql +SELECT * +FROM SalesRecordTable +MATCH_RECOGNIZE ( + ORDER BY purchase_time + MEASURES + FIRST(SALE.quantity) AS first_unusual_quantity, + FIRST(SALE.purchase_time) AS first_unusual_tstamp, + LAST(SALE.quantity) AS last_unusual_quantity, + LAST(SALE.purchase_time) AS last_unusual_tstamp + PATTERN (SALE{2}) + DEFINE + SALE AS + CAST(SALE.quantity AS INT) = 2 +); +``` + +![](assets/first_last.excalidraw.svg) + +Notice how the `SALE` pattern variable doesn't simply hold one value, it maps to multiple rows/sales/events. We're able to pass it to the `FIRST()` and `LAST()` functions to output the quantities from both the first and second matching sale respectively. These functions are specifically referred to as ["offset functions"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#logical-offsets), since we use "logical offsets" to navigate the events mapped to a particular pattern variable. + +### Useful pattern + +Finally, let's use some techniques from the useful measure/query in the ["Classifying "unusual" sales"](#classifying-unusual-sales) section in a `MATCH_RECOGNIZE`: + +```sql +SELECT * +FROM SalesRecordTable +MATCH_RECOGNIZE ( + PARTITION BY user_id + ORDER BY purchase_time + MEASURES + UNUSUAL_SALE.quantity AS unusual_quantity, + UNUSUAL_SALE.purchase_time AS unusual_tstamp, + AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity, + FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp, + LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp + ONE ROW PER MATCH + AFTER MATCH SKIP PAST LAST ROW + PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND + DEFINE + UNUSUAL_SALE AS + UNUSUAL_SALE.quantity > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3 +); +``` + +This query might look intimidating at first, but becomes easier to understand once we break it down. + +`PARTITION BY user_id` is similar to `GROUP BY user_id` in the original query, it lets us calculate `AVG()` values and find matches for each user separately. Unlike with a global user average, we won't receive false positives because of users who order in large quantities often. + +![](assets/useful_match.excalidraw.svg) + +We use two "pattern variables" in `PATTERN (TYPICAL_SALE+? UNUSUAL_SALE)`: + +- `TYPICAL_SALE`: We use this to match all the sales made before an "unusual" sale. + - By not specifying a condition for this variable in `DEFINE`, the [default condition](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#define--measures) is used, which evaluates to `true` for every row/sale. + + - Notice how we use `TYPICAL_SALE+?` instead of just `TYPICAL_SALE`. + - `+` and `?` are both "quantifiers", just like the [`{ n }` in `SALE{2}` from one of our previous queries](#classifying-unusual-sales). + - We use them to match "one or more" typical sales, since an `AVG()` of zero sales isn't very useful. + + - By combining both of those "quantifiers" into `+?`, we make a ["reluctant quantifier"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#greedy--reluctant-quantifiers). + - This [reduces memory consumption](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#controlling-memory-consumption) by specifying to match as few typical sales as possible. + + - Since `TYPICAL_SALE` matches every row/sale by default, we need to use a "reluctant quantifier", or the query might match all rows and never finish. + +- `UNUSUAL_SALE`: We use this to match an "unusual" sale. + - Unlike in our original query, we're able to easily calculate an `AVG()` of just typical sales by using `AVG(TYPICAL_SALE.quantity)`. + + - This prevents "unusual" sales from skewing our `AVG()` and creating false positives. + +Unlike in our original query, we can easily use `FIRST()` and `LAST()` to output the timestamps for the first and last sales that we use to calculate our `AVG()` respectively. This information lets us know exactly which previous sales were used to determine an "unusual" sale. + +![](assets/avg_range.excalidraw.svg) From 4ac19bc43e9446e27a92ff7ce66e576075b695b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 10 Jul 2025 13:50:08 +0100 Subject: [PATCH 08/15] Add to "Useful pattern" section --- .../assets/moving_average.excalidraw.svg | 5 ++ .../reluctant_quantifier.excalidraw.svg | 5 ++ .../assets/skip_past_last_row.excalidraw.svg | 5 ++ docs/anomaly-detection/index.md | 68 +++++++++++++++++-- 4 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 docs/anomaly-detection/assets/moving_average.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/reluctant_quantifier.excalidraw.svg create mode 100644 docs/anomaly-detection/assets/skip_past_last_row.excalidraw.svg diff --git a/docs/anomaly-detection/assets/moving_average.excalidraw.svg b/docs/anomaly-detection/assets/moving_average.excalidraw.svg new file mode 100644 index 0000000..6bd6523 --- /dev/null +++ b/docs/anomaly-detection/assets/moving_average.excalidraw.svg @@ -0,0 +1,5 @@ + + 3USER-35051015purchase_time (seconds)1513WITHIN '10' SECONDTYPICAL_SALE \ No newline at end of file diff --git a/docs/anomaly-detection/assets/reluctant_quantifier.excalidraw.svg b/docs/anomaly-detection/assets/reluctant_quantifier.excalidraw.svg new file mode 100644 index 0000000..542cb62 --- /dev/null +++ b/docs/anomaly-detection/assets/reluctant_quantifier.excalidraw.svg @@ -0,0 +1,5 @@ + + 120RELUCTANTGREEDY0123purchase_time (seconds)22201202220TYPICAL_SALEUNUSUAL_SALETYPICAL_SALEUNUSUAL_SALE \ No newline at end of file diff --git a/docs/anomaly-detection/assets/skip_past_last_row.excalidraw.svg b/docs/anomaly-detection/assets/skip_past_last_row.excalidraw.svg new file mode 100644 index 0000000..9b96ff5 --- /dev/null +++ b/docs/anomaly-detection/assets/skip_past_last_row.excalidraw.svg @@ -0,0 +1,5 @@ + + 2USER-530123purchase_time (seconds)220MATCH2AFTER MATCH SKIP PAST LAST ROW3TYPICAL_SALE \ No newline at end of file diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index 43344c9..3e4629e 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -236,7 +236,7 @@ MATCH_RECOGNIZE ( ); ``` -> Note: [The `ORDER BY` clause is required](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#order-of-events). With streaming, this ensures the output of the query will be correct even if some events arrive late. +[The `ORDER BY` clause is required](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#order-of-events), it allows us to search for patterns based on [different notions of time](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/). With streaming, this ensures the output of the query will be correct, even if some sales arrive late. We `DEFINE` a single `SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) for our condition, then include it in our pattern. We then output both the quantity and timestamp of the unusual sale. @@ -294,11 +294,27 @@ MATCH_RECOGNIZE ( This query might look intimidating at first, but becomes easier to understand once we break it down. -`PARTITION BY user_id` is similar to `GROUP BY user_id` in the original query, it lets us calculate `AVG()` values and find matches for each user separately. Unlike with a global user average, we won't receive false positives because of users who order in large quantities often. +--- + +#### `ORDER BY purchase_time` + +Like previously mentioned, this clause allows us to look for pattens based on time. In our case, the purchase time of the sale. + +--- + +#### `PARTITION BY user_id` + +This is similar to `GROUP BY user_id` in the original query, it lets us calculate `AVG()` values and find matches for each user separately. + +Unlike with a global user average, we won't receive false positives because of a minority of users who order in large quantities often. ![](assets/useful_match.excalidraw.svg) -We use two "pattern variables" in `PATTERN (TYPICAL_SALE+? UNUSUAL_SALE)`: +--- + +#### `PATTERN (TYPICAL_SALE+? UNUSUAL_SALE)` + +We use two "pattern variables" in our pattern: - `TYPICAL_SALE`: We use this to match all the sales made before an "unusual" sale. - By not specifying a condition for this variable in `DEFINE`, the [default condition](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#define--measures) is used, which evaluates to `true` for every row/sale. @@ -317,6 +333,50 @@ We use two "pattern variables" in `PATTERN (TYPICAL_SALE+? UNUSUAL_SALE)`: - This prevents "unusual" sales from skewing our `AVG()` and creating false positives. -Unlike in our original query, we can easily use `FIRST()` and `LAST()` to output the timestamps for the first and last sales that we use to calculate our `AVG()` respectively. This information lets us know exactly which previous sales were used to determine an "unusual" sale. +![](assets/reluctant_quantifier.excalidraw.svg) + +We append `WITHIN INTERVAL '10' SECOND` after the pattern to set a ["time constraint"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#time-constraint). + +> Note: We use an `INTERVAL` of `10 SECOND`s for quick user feedback in this tutorial. In a real situation, you would probably use something like `WITHIN INTERVAL '1' HOUR`. + +This provides several benefits: + +- Only the sales made within the last `10 SECOND`s are used. + - Memory use becomes more efficient, since we can prune sales older than this. + +- Our `AVG()` calculation changes from a typical arithmetic mean to a [simple moving average](https://en.wikipedia.org/wiki/Moving_average). + - There are benefits and downsides to both approaches. + + - In our case, sales are frequent, so only using recent sales can be beneficial. + +![](assets/moving_average.excalidraw.svg) + +--- + +#### `FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp`, `LAST(...)` + +We use `FIRST()` and `LAST()` to output timestamps for the first and last sales that were used to calculate our `AVG()`. This information lets us know exactly which previous sales were used to determine an "unusual" sale. ![](assets/avg_range.excalidraw.svg) + +--- + +#### `ONE ROW PER MATCH` + +Currently, this is the only supported ["output mode"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#output-mode). + +As the name suggests, it indicates to only output one row when a match is found. + +Once released, `ALL ROWS PER MATCH` will be able to output multiple rows instead. + +--- + +#### `AFTER MATCH SKIP PAST LAST ROW` + +This is pretty self-explanatory, we skip past the last row/sale of a match before looking for the next match. + +Other ["After Match Strategies"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#after-match-strategy) are available for skipping to different rows and pattern variable values inside the current match. However, they aren't particularly useful in our scenario. + +Our strategy skips past the "unusual" sale of the current match. This prevents the "unusual" sale from being wrongly used as the first "typical" sale of the next match and skewing the `AVG()`. + +![](assets/skip_past_last_row.excalidraw.svg) From ee48a9876b4d6aa42a06e2fbf44eebdb4354af6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 10 Jul 2025 16:18:12 +0100 Subject: [PATCH 09/15] Add to "Persisting back to Kafka" section --- docs/anomaly-detection/index.md | 85 +++++++++++++++++++ .../standalone-etl-anomaly-deployment.yaml | 80 +++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index 3e4629e..579502e 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -380,3 +380,88 @@ Other ["After Match Strategies"](https://nightlies.apache.org/flink/flink-docs-r Our strategy skips past the "unusual" sale of the current match. This prevents the "unusual" sale from being wrongly used as the first "typical" sale of the next match and skewing the `AVG()`. ![](assets/skip_past_last_row.excalidraw.svg) + +## Persisting back to Kafka + +Just like in the [Interactive ETL tutorial](../interactive-etl/index.md), we can create a new table to persist the output of our query back to Kafka (look at that tutorial for an explanation of the steps below). This way, we don't have to run the query every time we want to find "unusual" sales. + +First, let's define the table, and specify `csv` as the format so we don't have to provide a schema: + +```sql +CREATE TABLE UnusualSalesRecordTable ( + user_id STRING, + unusual_invoice_id STRING, + unusual_quantity INT, + unusual_tstamp TIMESTAMP(3), + avg_quantity INT, + avg_first_sale_tstamp TIMESTAMP(3), + avg_last_sale_tstamp TIMESTAMP(3), + PRIMARY KEY (`user_id`) NOT ENFORCED +) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.unusual.sales.records.interactive', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.client.id' = 'sql-cleaning-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' +); +``` + +Next, let's insert the results of our "unusual" sales pattern matching query into it: + +```sql +INSERT INTO UnusualSalesRecordTable +SELECT * +FROM SalesRecordTable +MATCH_RECOGNIZE ( + PARTITION BY user_id + ORDER BY purchase_time + MEASURES + UNUSUAL_SALE.invoice_id AS unusual_invoice_id, + CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity, + UNUSUAL_SALE.purchase_time AS unusual_tstamp, + AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity, + FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp, + LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp + ONE ROW PER MATCH + AFTER MATCH SKIP PAST LAST ROW + PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND + DEFINE + UNUSUAL_SALE AS + UNUSUAL_SALE.quantity > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3 +); +``` + +Finally, we can verify the data is being written to the new topic by running the following command in a new terminal: + +```shell +$ kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic flink.unusual.sales.records.interactive + +user-67,7850595442358871117,30,"2025-07-10 14:07:02.697",1,"2025-07-10 14:06:54.566","2025-07-10 14:07:01.679" +user-77,787429984061010435,10,"2025-07-10 14:07:04.729",1,"2025-07-10 14:06:58.641","2025-07-10 14:07:01.672" +user-98,3476938040725302112,20,"2025-07-10 14:07:05.751",1,"2025-07-10 14:06:56.594","2025-07-10 14:07:05.749" +``` + +## Converting to a stand alone Flink job + +The ETL query (deployed above) will have to compete for resources with other queries running in the same Flink session cluster. + +Instead, like in the [Interactive ETL example](../interactive-etl/index.md), we can use a `FlinkDeployment` CR for deploying our queries as a stand-alone Flink Job. + +There is an example `FlinkDeployment` CR (`standalone-etl-anomaly-deployment.yaml`) that we can use: + +```shell +kubectl apply -n flink -f anomaly-detection/standalone-etl-anomaly-deployment.yaml +``` + +Finally, we can verify that data is being written to the new topic: + +```shell +kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \ + ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ + --topic flink.unusual.sales.records +``` diff --git a/tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml b/tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml new file mode 100644 index 0000000..afbe16d --- /dev/null +++ b/tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml @@ -0,0 +1,80 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-anomaly +spec: + image: quay.io/streamshub/flink-sql-runner:0.2.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + args: [" + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + CREATE TABLE UnusualSalesRecordTable ( + user_id STRING, + unusual_invoice_id STRING, + unusual_quantity INT, + unusual_tstamp TIMESTAMP(3), + avg_quantity INT, + avg_first_sale_tstamp TIMESTAMP(3), + avg_last_sale_tstamp TIMESTAMP(3), + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.unusual.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + 'properties.client.id' = 'sql-cleaning-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + INSERT INTO UnusualSalesRecordTable + SELECT * + FROM SalesRecordTable + MATCH_RECOGNIZE ( + PARTITION BY user_id + ORDER BY purchase_time + MEASURES + UNUSUAL_SALE.invoice_id AS unusual_invoice_id, + CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity, + UNUSUAL_SALE.purchase_time AS unusual_tstamp, + AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity, + FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp, + LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp + ONE ROW PER MATCH + AFTER MATCH SKIP PAST LAST ROW + PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND + DEFINE + UNUSUAL_SALE AS + UNUSUAL_SALE.quantity > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3 + ); + "] + parallelism: 1 + upgradeMode: stateless From fc8176b107561b0d7a1412e290dd7aef3f0c60af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Thu, 10 Jul 2025 16:20:19 +0100 Subject: [PATCH 10/15] Minor changes for clarity --- docs/anomaly-detection/index.md | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index 579502e..93e8263 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -75,11 +75,6 @@ user-9467&63188787247555258843 ... ``` -| user_id | product_id | invoice_id | quantity | unit_cost | -|---------|------------|---------------------|----------|-----------| -| user-11 | 188 | 3029976743068934888 | 2 | £838 | -| user-94 | 67 | 6318878724755525884 | 3 | £971 | - ### High Sale Quantities We want to detect if a user ordered a higher quantity than they usually do. This could be a sign that they made a mistake or their account has been hijacked, which could cause troubles for all parties involved if not dealt with promptly. @@ -209,7 +204,7 @@ While useful, the query above has several flaws: - Limiting the `AVG()` to sales made, for example, in the past week, might be more useful. - The query would become much more complex if, for example, we only wanted to return a match if two "unusual" sales occurred one after another. - - We would likely have to use some elaborate combination of [`WITH_TIES`](https://learn.microsoft.com/en-us/sql/t-sql/queries/top-transact-sql?view=sql-server-ver17#with-ties), [`OVER`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17), and [`PARTITION BY`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17). Assuming they're even supported. + - In a typical database, we would likely have to use some combination of [`WITH_TIES`](https://learn.microsoft.com/en-us/sql/t-sql/queries/top-transact-sql?view=sql-server-ver17#with-ties), [`OVER`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17), and [`PARTITION BY`](https://learn.microsoft.com/en-us/sql/t-sql/queries/select-over-clause-transact-sql?view=sql-server-ver17). Assuming those are even supported. `MATCH_RECOGNIZE` lets us easily and concisely solve these problems. @@ -236,9 +231,18 @@ MATCH_RECOGNIZE ( ); ``` -[The `ORDER BY` clause is required](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#order-of-events), it allows us to search for patterns based on [different notions of time](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/). With streaming, this ensures the output of the query will be correct, even if some sales arrive late. +[The `ORDER BY` clause is required](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#order-of-events), it allows us to search for patterns based on [different notions of time](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/). + +- We pass it the `purchase_time` field from our `SalesRecordTable`, which contains [copies of the timestamp embedded in our source Kafka `ConsumerRecord`s, as the event time](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/connectors/datastream/kafka/#event-time-and-watermarks). + +- With streaming, this ensures the output of the query will be correct, even if some sales arrive late. + +The `DEFINE` and `MEASURES` clauses are similar to the `WHERE` and `SELECT` SQL clauses respectively. + +- We `DEFINE` a single `SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) with our condition, then include it in our pattern. + - > Note: The value of `SALE` is the row/sale which matches our `DEFINE`d condition. -We `DEFINE` a single `SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) for our condition, then include it in our pattern. We then output both the quantity and timestamp of the unusual sale. +- In `MEASURES`, we use the value of `SALE` to output both the quantity and timestamp of the "unusual" sale. ### Pattern navigation @@ -265,7 +269,9 @@ MATCH_RECOGNIZE ( ![](assets/first_last.excalidraw.svg) -Notice how the `SALE` pattern variable doesn't simply hold one value, it maps to multiple rows/sales/events. We're able to pass it to the `FIRST()` and `LAST()` functions to output the quantities from both the first and second matching sale respectively. These functions are specifically referred to as ["offset functions"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#logical-offsets), since we use "logical offsets" to navigate the events mapped to a particular pattern variable. +Notice how the `SALE` pattern variable doesn't simply hold one value, it maps to multiple rows/sales/events. We're able to pass it to the `FIRST()` and `LAST()` functions to output the quantities from both the first and second matching sale respectively. + +These functions are specifically referred to as ["offset functions"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#logical-offsets), since we use "logical offsets" to navigate the events mapped to a particular pattern variable. ### Useful pattern @@ -278,7 +284,8 @@ MATCH_RECOGNIZE ( PARTITION BY user_id ORDER BY purchase_time MEASURES - UNUSUAL_SALE.quantity AS unusual_quantity, + UNUSUAL_SALE.invoice_id AS unusual_invoice_id, + CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity, UNUSUAL_SALE.purchase_time AS unusual_tstamp, AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity, FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp, @@ -353,7 +360,9 @@ This provides several benefits: --- -#### `FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp`, `LAST(...)` +#### `MEASURES` + +Like in a typical SQL `SELECT`, we use this clause to specify what to output, and use values from the "pattern variables" to output information on the "unusual" sale and the `AVG()` of the typical sales. We use `FIRST()` and `LAST()` to output timestamps for the first and last sales that were used to calculate our `AVG()`. This information lets us know exactly which previous sales were used to determine an "unusual" sale. @@ -367,7 +376,7 @@ Currently, this is the only supported ["output mode"](https://nightlies.apache.o As the name suggests, it indicates to only output one row when a match is found. -Once released, `ALL ROWS PER MATCH` will be able to output multiple rows instead. +Once released, `ALL ROWS PER MATCH` will allow you to output multiple rows instead. --- From f937301daec17601e18517e4f70fb914f7bcd5bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 11 Jul 2025 15:58:18 +0100 Subject: [PATCH 11/15] Apply suggestions from code review Co-authored-by: Thomas Cooper --- docs/anomaly-detection/index.md | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index 93e8263..f8b559c 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -4,9 +4,11 @@ title = 'Anomaly Detection' > Note: This tutorial is mainly focused on anomaly detection. For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/index.md). -[FlinkCEP](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/libs/cep/) (Complex Event Processing) is a Flink library made for finding patterns in data streams e.g. for detecting suspicious bank transactions. It can be accessed in [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/overview/) using the [`MATCH_RECOGNIZE` clause](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/). In this tutorial, we will use Flink SQL and its `MATCH_RECOGNIZE` clause to detect and report suspicious sales across many users in real-time. +[Flink CEP](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/libs/cep/) (Complex Event Processing) is a Flink library made for finding patterns in data streams e.g. for detecting suspicious bank transactions. +It can be accessed in [Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/overview/) using the [`MATCH_RECOGNIZE` clause](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/). +In this tutorial, we will use Flink SQL and its `MATCH_RECOGNIZE` clause to detect and report suspicious sales across many users in real-time. -The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the `tutorials/anomaly-detection` directory. +The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the [`tutorials/anomaly-detection`](https://github.com/streamshub/flink-sql-examples/tree/main/tutorials/anomaly-detection) directory. ## Setup @@ -77,7 +79,10 @@ user-9467&63188787247555258843 ### High Sale Quantities -We want to detect if a user ordered a higher quantity than they usually do. This could be a sign that they made a mistake or their account has been hijacked, which could cause troubles for all parties involved if not dealt with promptly. +We want to detect if a user ordered a higher quantity than they usually do. +This could be a sign that they made a mistake or their account has been hijacked, which could cause troubles for all parties involved if not dealt with promptly. +It could also be a positive sign of increased interest in a particular product and our sales team might want to be notified. +Regardless, it is a situation we would like to be able to spot and take action against. ![](assets/scenario.excalidraw.svg) @@ -171,7 +176,8 @@ WHERE quantity > 3; > Note: This query might take a couple of seconds to return results, since the sales with high quantities are unusual. -Of course, this wouldn't be a particularly good measure. Specific users might always order higher quantities than the average user. +Of course, this wouldn't be a particularly good measure. +Specific users might always order higher quantities than the average user. A more useful measure would involve calculating the average sale quantity of each user, and considering a quantity "unusual" if it is, for example, 3 times higher than that user's average. @@ -305,7 +311,7 @@ This query might look intimidating at first, but becomes easier to understand on #### `ORDER BY purchase_time` -Like previously mentioned, this clause allows us to look for pattens based on time. In our case, the purchase time of the sale. +As mentioned previously, this clause allows us to look for pattens based on time. In our case, the purchase time of the sale. --- @@ -344,11 +350,11 @@ We use two "pattern variables" in our pattern: We append `WITHIN INTERVAL '10' SECOND` after the pattern to set a ["time constraint"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#time-constraint). -> Note: We use an `INTERVAL` of `10 SECOND`s for quick user feedback in this tutorial. In a real situation, you would probably use something like `WITHIN INTERVAL '1' HOUR`. +> Note: We use an `INTERVAL` of `10 SECOND`s for quick user feedback in this tutorial. In a real situation, you would probably use a much longer interval, for example: `WITHIN INTERVAL '1' HOUR`. -This provides several benefits: +Setting a time interval provides several benefits: -- Only the sales made within the last `10 SECOND`s are used. +- Only the sales made within the specified period are used. - Memory use becomes more efficient, since we can prune sales older than this. - Our `AVG()` calculation changes from a typical arithmetic mean to a [simple moving average](https://en.wikipedia.org/wiki/Moving_average). From 4e39966a54be663b7eea35db8766e13cc49171a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 11 Jul 2025 16:05:49 +0100 Subject: [PATCH 12/15] Don't use wildcard imports https://github.com/streamshub/flink-sql-examples/pull/61/files#r2200868697 --- .../kafka/data/generator/DataGenerator.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java index 7d85fbc..008eb9f 100644 --- a/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java +++ b/tutorials/data-generator/src/main/java/com/github/streamshub/kafka/data/generator/DataGenerator.java @@ -1,15 +1,24 @@ package com.github.streamshub.kafka.data.generator; -import org.apache.kafka.clients.admin.*; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.kafka.clients.CommonClientConfigs.*; +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG; public class DataGenerator implements Runnable { From c6a16feabc3f49ca45af93ec82c6788852507bc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 11 Jul 2025 16:29:14 +0100 Subject: [PATCH 13/15] Add more information on output in classifying section https://github.com/streamshub/flink-sql-examples/pull/61#discussion_r2200754025 --- docs/anomaly-detection/index.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index f8b559c..d7db7d8 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -143,6 +143,8 @@ There are many arbitrary ways we could use to define an "unusual" or "suspicious By looking at the data in the `SalesRecordTable`, we can observe that users typically order quantities between `1` and `3` (inclusive): +> Note: The output from the following queries will automatically update as new sales are generated. This might feel a bit unusual if you're used to typical static SQL, but it lets us do cool stuff like calculating `COUNT()` and `AVG()` values that update in real time! + 1. Fetch all the sales and group them by user: ```sql From 92caf953bc38653d4fe714352bde2fc28fe452be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 11 Jul 2025 17:01:37 +0100 Subject: [PATCH 14/15] Change `SALE` to `UNUSUAL_SALE` https://github.com/streamshub/flink-sql-examples/pull/61#discussion_r2200786460 --- .../assets/first_last.excalidraw.svg | 4 +-- docs/anomaly-detection/index.md | 32 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/anomaly-detection/assets/first_last.excalidraw.svg b/docs/anomaly-detection/assets/first_last.excalidraw.svg index 37107be..dd9b9c3 100644 --- a/docs/anomaly-detection/assets/first_last.excalidraw.svg +++ b/docs/anomaly-detection/assets/first_last.excalidraw.svg @@ -1,5 +1,5 @@ -eyJ2ZXJzaW9uIjoiMSIsImVuY29kaW5nIjoiYnN0cmluZyIsImNvbXByZXNzZWQiOnRydWUsImVuY29kZWQiOiJ4nO1dWVPqSFx1MDAxYr4/v8JybuarXHUwMDFhmN6XuXNBRVx1MDAxMVx1MDAxNVxc+WqKXG5cdTAwMTAghy1AXHUwMDAwcer89+mgkFx1MDAxMDpcdTAwMTggKk5cdTAwMDGWJZ1OeNP9Pnnerdt/fuzt7Ttj29z/a2/ffC5cdTAwMWJNq9IzRvt/uO1Ds9e3Om11XGJNPvc7g1550rPuOHb/rz//9M5Iljut17PMptky205f9fu/+ry398/ktzpiVSbndlx1MDAwNz3wclxuK6KWfipcdTAwMTVKjZ/9enNy6qTTVJieWXaMdq1peoeeVTuEXHUwMDAwzFx1MDAxYcaqQVx1MDAwMJAkWEBcdTAwMDJcdJaAMTE7OrIqTn3SI1x0/C8+61E3rVrdUV24TEr/i826vIrw1573pX2n12mYR51mp+fK+Vx1MDAxYjTdtydlySg3ar3OoF2Z9XF6RrtvXHUwMDFiPTUsXr+q1WzmnfHk6mpo1TDuXHUwMDA3vuPh7Vx1MDAwNmCgPews9aW1etvs9+fk7dhG2XLcofJcdTAwMGadK6Gdrkxm6W9Ppp7RMtPuNLVcdTAwMDfN5qzZaldMd/D3XHI0923tytu3TafYmz/81vLLk9003Vx1MDAwYkOCJEaAcTk74ulcdTAwMTlcdTAwMDRcdTAwMDRcdTAwMDebs532ROmgkJghgXw9rP6x0jZnct2q0eyb3iS4wqWCmujXRp9Gti5cdTAwMWVOX06rJ4fO4VV7PDwqXFyeP2RmNzqnlY757OzPXHUwMDBl/Hr7y1x1MDAxYsCBXTFe5YGcXCLAIWdcZlHvVptWu1x1MDAxMVx1MDAxY91mp9zwbuGHb9RcdTAwMDLIWS7nvIxcdTAwMDHQIJmEXHUwMDEzxZZEjSBC81x1MDAxMII8KZdCXGLBJHOhwyDAXHUwMDE0UnVLi1x1MDAxOFwidKtAg9ZcdTAwMDBccvxcdTAwMTjQYD1o5nq/oUPNXHUwMDBlXHUwMDE2lHGoXHUwMDAzXHUwMDA3ZiRcdTAwMWNcdTAwMWOYSOQ+XHUwMDA311x1MDAwMUec+uupo6uG6vahb/Y6bSdvvUyeXHUwMDBmbK71xGhZTXe42dz5XHUwMDA3Tavm3vp+WYlq9vb9t+9YintmXHUwMDFkWlal4ueKsrqoYbXNXjpcbul0elbNalx1MDAxYs3bRZmNgdPJmf1XqZ3ewPSPhXk21X6YRHRcdHSH92c3T09O/yUxcMY3xssoNVx1MDAxYVdcdTAwMTeha/R6nVFcdTAwMDC7gMxcdTAwMTOeZN7nKT69llx1MDAxOVx1MDAxY5HwXHUwMDFhvytcdTAwMWU/iMSyUUlcZoWSmFQ4ZZiwXHUwMDA1tnJVmFx1MDAwN1x1MDAxYqcopYwySFx0XovBwkFcbpjkSqB1QGp3rCBBen/teVx1MDAxMzD5MPv77z/e751QXG7onbBAlE2j71x1MDAxY3VaLctRd3LtSrHwRHSMnnOoZs1q14LHzHYl5MjkrFx1MDAwM1x1MDAxN0Z101jQXHUwMDAxdZ7/mNJpK2Bjms1SZ1x1MDAxNImN87Iqz1x1MDAxZYsjRK5SuZvGbauFXlrxQVogXHKod5BcdTAwMGWB9NXGkGaQXHUwMDBiyORcInjduaDBxlx1MDAxOe9yxClcdTAwMTbSZ81uXHUwMDBlaSg5XHUwMDExUEDqXHLLV0Hap4WB/v9BRMvzYXvUOFx1MDAxOF1WuonzpkTG4+XPqyj2tYA4KXwvT1x1MDAxOcZcdTAwMDEkT9GtvKAk8r88SvBcZurtQnuoXHUwMDE3Wum4M/6pcM9Ft6iVT0NcdTAwMTBWINVZ1D6EXHUwMDA1gK1McI6VTb199vRdPpVLQKa3qtFcXOvGVrXTscNM6jmRg/bzgoxzVvT8fa1iRjeer8f3w9t6ySaVq0xcblVPjMFT9NhcdTAwMTGD3nRGi1x1MDAxZFx1MDAwNVx1MDAwM0PezIXGjrwuW43az+fo281jR1xicuBcdTAwMTKjzj3mWFx1MDAwNFtndjdcdTAwMTFcdTAwMDJyQtZcIumZeJrIXHUwMDExsVx1MDAxMsfX8tKw8vzgyjxcdTAwMWGwg1T2Pp7I0epG/Vx1MDAxMtwsl3NZ5IhcdTAwMTGwi1x1MDAxY31Z5OguOs9cdDU/TI2xLqwqcWhUXHUwMDE1S1x1MDAwNVxmJEGc9ut/KG70XHUwMDBl4Xx43OiibpvWXHUwMDExKuDusTxcdTAwMWRcXGRqnU6+slxiXFz3kjGFjbbL6NxcIlx1MDAxN/M+Kn2FR40wJVx1MDAwMFwiqnMxcbiLXHQl5VDZo3FGjZSLiShQXHUwMDAyUb9afn3UXGJtvYc5d8zuNMe1yfy9x8Csmk33TouZStZOOT9T7PDpfHBcdTAwMTOJgVx1MDAwMVxuXHUwMDAwWXiKMFx1MDAwNTLRIFx1MDAxOX9/Vv0gJD9EZ1VcdTAwMDQhXHUwMDEygPldelx1MDAxZq2iYONcZrOEK9RKXCI9qMeEWcgh8LhuXHJaXHUwMDA1WlpFYq71y1x1MDAxY0dcdTAwMTBCoOu7jHfmVaXE60b6yjhJWOls71x1MDAxMV7akVx1MDAxONSfQdkxaFxmuHvcnEFcdTAwMDWUXHUwMDE4clx1MDAwZXTRXHUwMDFjXHUwMDEymndRKMZcYuFcdTAwMGaA445CfWd9MIXelprWsH5e4pmz0m3msTK6XHUwMDE4ZpxIXHUwMDE0itiOQuOF8tNcblx1MDAxNCrd6Fx1MDAwMFx1MDAwMjqrXHUwMDE3wtBcdTAwMDAsxFC5pS6Lblx1MDAxZIXqPdNtodAwXHUwMDFmdH1cbi1cdTAwMTazd1x1MDAwZYJZ01CTczaoJmpOvlx1MDAxMIlCadB23VHoW+uauCtsTqGSMiqF/1x1MDAxMejhkYZ7oVx1MDAwMFxiLlx1MDAwMaA7XG79xlx1MDAxNHo9oF3DXHUwMDFjXlx1MDAxYuIp3bY7optpsUhcdTAwMTlOSMGOQuOFsrFCXHUwMDBlU5mvXHUwMDEyXHUwMDAzgLRJTFx1MDAxOJr34Fxur4hxXHUwMDE2Z1x1MDAxMjNcdTAwMWVcbkVbTaEodlxuZbDby5xcdTAwMTZrN1x1MDAxZIpcdTAwMDG7fjg/hEBTLKShUFx1MDAxZbRdd1x1MDAxNPrWuibuSjF4oZQxRIg2kMtcdTAwMTZykzNcbqVcYnIkON9R6Dem0GK7OLZlO11cdTAwMWPcmf3x/UE2I9E4XHUwMDEyhfJgRmZHoVx1MDAxYkK5vFx1MDAwMoUyTtVcdTAwMWJIrVx1MDAxYopCXHUwMDBi61x1MDAwNZGcXG6BYk++bEyheKspXHUwMDE0x06hY5BFjSZ4ubDvu8Pzy2dUR4OHSMBcdTAwMGKmQiHwra+Y1dvCQFx1MDAxZM9cIlx1MDAxMHdcdTAwMDV5oUisroBEzLhwS/J0SEQgfImLgrBUrCtiLlXYoHp+XG5Fe9Ar142+WXSslrn3e99UQKn0/7fVJXrvylx1MDAxY5fl22u9pHo/63dnZ8YxOck7ucdcdTAwMDSJXFyyXHUwMDE3tH5cdTAwMTlYtH4h1LEmZd9cdTAwMDStK9LmxlCtr0Kaglx1MDAwMqKviVembChSXHUwMDExXHUwMDEyXHUwMDA0Sya/vKhomVx1MDAxZDdcdTAwMWHbsJk4bdG6nc6c30nDXHUwMDFlXFxELyVcdTAwMDWB5CCflYFcblx1MDAwMFxiYXJBTZlOSyGaLk6GUlKKIflPKi3dWGtcdTAwMWIraK3AXHUwMDE0Yom5NlqyJGnPJEFIOXexLzBcdTAwMWXeXHUwMDE3ioeVWreTPc7d0Juj49tL0teXib4uXHUwMDE3mlx1MDAxZPm0XHUwMDE1xvpcYoRv3IhP4Vx1MDAwM1x1MDAwMychXHUwMDA2SEpcdTAwMTTFr31cdTAwMWSQ5+IxPeuL5nHGXHUwMDFjpU6GLW7XWndboqBT3OEkJJBKxCim7lx1MDAwMlx1MDAxZk8rpnFcdTAwMTlvW1x1MDAwMcJcdTAwMDOF5XDuacBcdTAwMDNSelx1MDAxMH49fXVcYs+4XHUwMDBmaFx1MDAxZSq+mZpCXHUwMDAyIC5cdTAwMTBcIj6bS4/Dz7Gb1vPuI0Zcctb0/z8qXHUwMDEwQt7tzlbuP7+fXHUwMDA1/GPZwfeu5tOgycfgalx1MDAwNrya+ImFXHUwMDBisNUugJOUXCI+cVx1MDAxMlx1MDAxNHYogGZcdTAwMDLSlUZ8WeTJI1x1MDAxNN/q71x1MDAxMPJcdOWZ6MFcInCZRz3nslR4qCVOXHUwMDFi7Xaims1fRjcyXHUwMDEwXGaU2+/Wq0R9oG/u05ZcdTAwMDDU21x1MDAxY6ssWFGTpZ6Wi5uauPMgWLB1Vq9cdTAwMDQ4Jlx1MDAxMHHuRf5iskQu+oWWZWVHNXztnD31ReGwUijrLZFcdTAwMTVcdTAwMTesKDYgkJGYXHUwMDE2rCyXc2mtXHUwMDBmgbtcdTAwMDUrX7VgpVx1MDAwNEI2XGLS2+lcdTAwMTJhjLhcdTAwMTZcdTAwMWRShOdRMFx1MDAwNFxcKJMzzjxKPGtW9DHZT16z8lx1MDAwZelEjdSuvWZcdTAwMDU9XHUwMDFk3bZcdTAwMWGZRDd7cCjYRTbRKLCT6KSHfeiaeNaahCfS+tLfJeJTMfr1z43Pllx1MDAwMF2hYFx1MDAwZlCBXHUwMDA0W1x1MDAwNKCryqFcdTAwMGLJiMCQYyDWoqxPivn0riBcdTAwMTdcdTAwMDOc+4mG193roZGp1kxNIZtuy1x1MDAwZeIrOZ5opW+PuGXJO//c7ZQyoJR881Q8RIBIyoQ2XHUwMDE1XHUwMDFmXHUwMDFh6kFAOVx1MDAxOIJgXHUwMDFje4RS/cC1KCRWXHUwMDBmNEFcdTAwMDJOXHUwMDE52Ip0vG53jiVOefSdO5ZcdTAwMDfYllx1MDAwMptcIlx1MDAwZsivwPZUaVx1MDAwN+w1gX24MbCZRJIpO1670j+8aFx1MDAxY1x1MDAxMiqVY/X1LPQxuP5YWPum521cdTAwMTPXdIRUycSqLVx1MDAwZibqlMSQIa5cdTAwMWMrKVxi44xcdTAwMTPm61Uz7MmAXHUwMDA2XHUwMDEyXHUwMDFliyqwXHJPXHUwMDE02S3gRFx1MDAxMeds26Hj0e3904to1VwiOaA4UCnLNParcmCSbmRYQoxVf+6ztba17Gerni9HK1izakKIxFx1MDAxOOpWcKLQXHUwMDEwXGZVRrCb84h5q75cdTAwMTh8zJN0Ln/7u764IKz6R8xd5eOLXHUwMDBiXHUwMDE2ZIzHs0zlnluUPybyxZvDo2zq5mxET0+jla/7RiZcZpRcdTAwMTIlicIkUiSCqIKkZz3uMFx1MDAxOVx1MDAwMZPH0TEp1ENPKDzoILmkfpZcdTAwMTDCXHUwMDE5pGD79uTKXHUwMDFjbD0kg1wixlx1MDAxNOtcdTAwMTlcdTAwMTHy5LSrve7h0DZcdTAwMGVLqYfuw3kkRFx1MDAxMlx1MDAxY7C7NbvksUksXHUwMDE2XHSgpp1DwTWh2Fx1MDAxZFwiw1x1MDAxMZlaIVx1MDAxNsuxuzFcdNauXG5joavCKFx1MDAwNdK1uLdcdTAwMGaR+YNMaqvxOC9gPGjMZrKJoVlr1Fx1MDAxMoXK3dnltXPXXHUwMDFlXHUwMDFhi2g0m03L7i/EXZlMulx0V8wwZdy/S81r6lx1MDAxMb2mXHUwMDFlXHUwMDAxo+pHXHUwMDEz/eI8SSVQbyhcdTAwMDRHXHUwMDEyakxa7lxy8jaAlVx1MDAwNNq/Mtd4slxuVlx1MDAxMVx1MDAxMJhqNvVyjZjFLalnYHVcclx1MDAxYugvTds4qah9vvtcdTAwMTSyL1x1MDAxZnPZetXA/W72ppzI5ZDTuV6pukkwiFxi8aNh/fCvXpooZIWl8lpcdTAwMTmXVKk2I8S3XFzgNakokoJcdTAwMDGpWEoojJDFZVx1MDAxZN8wqbhN4DiNXHUwMDBlXHUwMDBlXHUwMDA0XFzPXHIhXTyYslx1MDAwNYPT28dcdTAwMTlwTihcdTAwMDHrXHUwMDA0jubk2Fxcg1x1MDAxN7hMv1jyk7OK73BL1CWUa3NbrZu7KqWh9VxcXHUwMDFj3nXNWq1gnV9p/vmJntuIUFdcdTAwMDdqfqmycqTkXHUwMDAx/M64TUJcdJlYrN3dcdveXHUwMDA28D1bgdtcYiaAz1x1MDAxNSr6/ztcdTAwMTBcYlx1MDAwNbBiNkDcXHUwMDA37OeRW/m49LNvwfuXPOpcXMGH9HnCejxYjdw4WW3h5lx1MDAxMnzopYlCbsqGT1wiN1arqI1yXHUwMDE4xm2EXCIhfe7WjttiXHUwMDAxRzo6OCjCUkJK9GGT0FAm54yqh1asO3espb3bSWzvXHUwMDEwS5zE9uPtabBv2HbeUaO5P0267Fx1MDAwZi1zdLio6L9VJy/3qTKBvavk5iSV9uvHr39cdTAwMDFoZqBcdTAwMGYifQ==1USER-1610123purchase_time (seconds)3FIRST()LAST()UNUSUAL_SALE22 \ No newline at end of file diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index d7db7d8..3dac204 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -230,12 +230,12 @@ FROM SalesRecordTable MATCH_RECOGNIZE ( ORDER BY purchase_time MEASURES - SALE.quantity AS unusual_quantity, - SALE.purchase_time AS unusual_tstamp - PATTERN (SALE) + UNUSUAL_SALE.quantity AS unusual_quantity, + UNUSUAL_SALE.purchase_time AS unusual_tstamp + PATTERN (UNUSUAL_SALE) DEFINE - SALE AS - CAST(SALE.quantity AS INT) > 3 + UNUSUAL_SALE AS + CAST(UNUSUAL_SALE.quantity AS INT) > 3 ); ``` @@ -247,10 +247,10 @@ MATCH_RECOGNIZE ( The `DEFINE` and `MEASURES` clauses are similar to the `WHERE` and `SELECT` SQL clauses respectively. -- We `DEFINE` a single `SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) with our condition, then include it in our pattern. - - > Note: The value of `SALE` is the row/sale which matches our `DEFINE`d condition. +- We `DEFINE` a single `UNUSUAL_SALE` ["pattern variable"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#defining-a-pattern) with our condition, then include it in our pattern. + - > Note: The value of `UNUSUAL_SALE` is the row/sale which matches our `DEFINE`d condition. -- In `MEASURES`, we use the value of `SALE` to output both the quantity and timestamp of the "unusual" sale. +- In `MEASURES`, we use the value of `UNUSUAL_SALE` to output both the quantity and timestamp of the "unusual" sale. ### Pattern navigation @@ -264,20 +264,20 @@ FROM SalesRecordTable MATCH_RECOGNIZE ( ORDER BY purchase_time MEASURES - FIRST(SALE.quantity) AS first_unusual_quantity, - FIRST(SALE.purchase_time) AS first_unusual_tstamp, - LAST(SALE.quantity) AS last_unusual_quantity, - LAST(SALE.purchase_time) AS last_unusual_tstamp - PATTERN (SALE{2}) + FIRST(UNUSUAL_SALE.quantity) AS first_unusual_quantity, + FIRST(UNUSUAL_SALE.purchase_time) AS first_unusual_tstamp, + LAST(UNUSUAL_SALE.quantity) AS last_unusual_quantity, + LAST(UNUSUAL_SALE.purchase_time) AS last_unusual_tstamp + PATTERN (UNUSUAL_SALE{2}) DEFINE - SALE AS - CAST(SALE.quantity AS INT) = 2 + UNUSUAL_SALE AS + CAST(UNUSUAL_SALE.quantity AS INT) = 2 ); ``` ![](assets/first_last.excalidraw.svg) -Notice how the `SALE` pattern variable doesn't simply hold one value, it maps to multiple rows/sales/events. We're able to pass it to the `FIRST()` and `LAST()` functions to output the quantities from both the first and second matching sale respectively. +Notice how the `UNUSUAL_SALE` pattern variable doesn't simply hold one value, it maps to multiple rows/sales/events. We're able to pass it to the `FIRST()` and `LAST()` functions to output the quantities from both the first and second matching sale respectively. These functions are specifically referred to as ["offset functions"](https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/sql/queries/match_recognize/#logical-offsets), since we use "logical offsets" to navigate the events mapped to a particular pattern variable. From dcb0827945b68220e14d2691000630c9293e1eda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Fri, 11 Jul 2025 17:23:19 +0100 Subject: [PATCH 15/15] Even more detail in classifying section --- docs/anomaly-detection/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/anomaly-detection/index.md b/docs/anomaly-detection/index.md index 3dac204..e0d9b8f 100644 --- a/docs/anomaly-detection/index.md +++ b/docs/anomaly-detection/index.md @@ -143,7 +143,7 @@ There are many arbitrary ways we could use to define an "unusual" or "suspicious By looking at the data in the `SalesRecordTable`, we can observe that users typically order quantities between `1` and `3` (inclusive): -> Note: The output from the following queries will automatically update as new sales are generated. This might feel a bit unusual if you're used to typical static SQL, but it lets us do cool stuff like calculating `COUNT()` and `AVG()` values that update in real time! +> Note: Don't worry if the constantly changing output in the following queries is confusing. The main thing to take away is that the values in the `avg_quantity` column stay as `2` most of the time, which means that the average quantity for each user stays as `2` even as new sales are generated and cause the calculation to update. 1. Fetch all the sales and group them by user: