-
Notifications
You must be signed in to change notification settings - Fork 10
Anomaly Detection tutorial #61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
tomncooper
merged 15 commits into
streamshub:main
from
piotrpdev:docs/anomaly-detection-tutorial
Jul 14, 2025
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
83ab98b
Create topics with retention
piotrpdev 9f6d7ab
Increase emission rate to 1 event per second
piotrpdev 577a4c0
Add `batchSize()` to `Data` class
piotrpdev 2770cd1
Add `batchSize()` and quantity multiplication to `SalesData`
piotrpdev 646b829
Use `batchSize()` in `DataGenerator`
piotrpdev bef1a96
Add `flink-session-anomaly.yaml`
piotrpdev b4b4abc
Add anomaly detection tutorial unfinished draft
piotrpdev 4ac19bc
Add to "Useful pattern" section
piotrpdev ee48a98
Add to "Persisting back to Kafka" section
piotrpdev fc8176b
Minor changes for clarity
piotrpdev f937301
Apply suggestions from code review
piotrpdev 4e39966
Don't use wildcard imports
piotrpdev c6a16fe
Add more information on output in classifying section
piotrpdev 92caf95
Change `SALE` to `UNUSUAL_SALE`
piotrpdev dcb0827
Even more detail in classifying section
piotrpdev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions
5
docs/anomaly-detection/assets/reluctant_quantifier.excalidraw.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
5 changes: 5 additions & 0 deletions
5
docs/anomaly-detection/assets/skip_past_last_row.excalidraw.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| apiVersion: flink.apache.org/v1beta1 | ||
| kind: FlinkDeployment | ||
| metadata: | ||
| name: session-cluster-anomaly | ||
| spec: | ||
| image: quay.io/streamshub/flink-sql-runner:0.2.0 | ||
| flinkVersion: v2_0 | ||
| flinkConfiguration: | ||
| taskmanager.numberOfTaskSlots: "2" | ||
| serviceAccount: flink | ||
| jobManager: | ||
| resource: | ||
| memory: "2048m" | ||
| cpu: 1 | ||
| taskManager: | ||
| resource: | ||
| memory: "2048m" | ||
| cpu: 2 |
80 changes: 80 additions & 0 deletions
80
tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| apiVersion: flink.apache.org/v1beta1 | ||
| kind: FlinkDeployment | ||
| metadata: | ||
| name: standalone-etl-anomaly | ||
| spec: | ||
| image: quay.io/streamshub/flink-sql-runner:0.2.0 | ||
| flinkVersion: v2_0 | ||
| flinkConfiguration: | ||
| taskmanager.numberOfTaskSlots: "1" | ||
| serviceAccount: flink | ||
| jobManager: | ||
| resource: | ||
| memory: "2048m" | ||
| cpu: 1 | ||
| taskManager: | ||
| resource: | ||
| memory: "2048m" | ||
| cpu: 1 | ||
| job: | ||
| jarURI: local:///opt/streamshub/flink-sql-runner.jar | ||
| args: [" | ||
| CREATE TABLE SalesRecordTable ( | ||
| invoice_id STRING, | ||
| user_id STRING, | ||
| product_id STRING, | ||
| quantity STRING, | ||
| unit_cost STRING, | ||
| `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', | ||
| WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND | ||
| ) WITH ( | ||
| 'connector' = 'kafka', | ||
| 'topic' = 'flink.sales.records', | ||
| 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', | ||
| 'properties.group.id' = 'sales-record-group', | ||
| 'value.format' = 'avro-confluent', | ||
| 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', | ||
| 'scan.startup.mode' = 'latest-offset' | ||
| ); | ||
| CREATE TABLE UnusualSalesRecordTable ( | ||
| user_id STRING, | ||
| unusual_invoice_id STRING, | ||
| unusual_quantity INT, | ||
| unusual_tstamp TIMESTAMP(3), | ||
| avg_quantity INT, | ||
| avg_first_sale_tstamp TIMESTAMP(3), | ||
| avg_last_sale_tstamp TIMESTAMP(3), | ||
| PRIMARY KEY (`user_id`) NOT ENFORCED | ||
| ) WITH ( | ||
| 'connector' = 'upsert-kafka', | ||
| 'topic' = 'flink.unusual.sales.records', | ||
| 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', | ||
| 'properties.client.id' = 'sql-cleaning-client', | ||
| 'properties.transaction.timeout.ms' = '800000', | ||
| 'key.format' = 'csv', | ||
| 'value.format' = 'csv', | ||
| 'value.fields-include' = 'ALL' | ||
| ); | ||
| INSERT INTO UnusualSalesRecordTable | ||
| SELECT * | ||
| FROM SalesRecordTable | ||
| MATCH_RECOGNIZE ( | ||
| PARTITION BY user_id | ||
| ORDER BY purchase_time | ||
| MEASURES | ||
| UNUSUAL_SALE.invoice_id AS unusual_invoice_id, | ||
| CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity, | ||
| UNUSUAL_SALE.purchase_time AS unusual_tstamp, | ||
| AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity, | ||
| FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp, | ||
| LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp | ||
| ONE ROW PER MATCH | ||
| AFTER MATCH SKIP PAST LAST ROW | ||
| PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND | ||
| DEFINE | ||
| UNUSUAL_SALE AS | ||
| UNUSUAL_SALE.quantity > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3 | ||
| ); | ||
| "] | ||
| parallelism: 1 | ||
| upgradeMode: stateless |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.