5 changes: 5 additions & 0 deletions docs/anomaly-detection/assets/avg_range.excalidraw.svg
5 changes: 5 additions & 0 deletions docs/anomaly-detection/assets/first_last.excalidraw.svg
5 changes: 5 additions & 0 deletions docs/anomaly-detection/assets/moving_average.excalidraw.svg
4 changes: 4 additions & 0 deletions docs/anomaly-detection/assets/scenario.excalidraw.svg
5 changes: 5 additions & 0 deletions docs/anomaly-detection/assets/useful_match.excalidraw.svg
5 changes: 5 additions & 0 deletions docs/anomaly-detection/assets/useful_query.excalidraw.svg
484 changes: 484 additions & 0 deletions docs/anomaly-detection/index.md

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions tutorials/anomaly-detection/flink-session-anomaly.yaml
@@ -0,0 +1,18 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster-anomaly
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
80 changes: 80 additions & 0 deletions tutorials/anomaly-detection/standalone-etl-anomaly-deployment.yaml
@@ -0,0 +1,80 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: standalone-etl-anomaly
spec:
  image: quay.io/streamshub/flink-sql-runner:0.2.0
  flinkVersion: v2_0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/streamshub/flink-sql-runner.jar
    args: ["
        CREATE TABLE SalesRecordTable (
          invoice_id STRING,
          user_id STRING,
          product_id STRING,
          quantity STRING,
          unit_cost STRING,
          `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
          WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'flink.sales.records',
          'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
          'properties.group.id' = 'sales-record-group',
          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
          'scan.startup.mode' = 'latest-offset'
        );
        CREATE TABLE UnusualSalesRecordTable (
          user_id STRING,
          unusual_invoice_id STRING,
          unusual_quantity INT,
          unusual_tstamp TIMESTAMP(3),
          avg_quantity INT,
          avg_first_sale_tstamp TIMESTAMP(3),
          avg_last_sale_tstamp TIMESTAMP(3),
          PRIMARY KEY (`user_id`) NOT ENFORCED
        ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'flink.unusual.sales.records',
          'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
          'properties.client.id' = 'sql-cleaning-client',
          'properties.transaction.timeout.ms' = '800000',
          'key.format' = 'csv',
          'value.format' = 'csv',
          'value.fields-include' = 'ALL'
        );
        INSERT INTO UnusualSalesRecordTable
          SELECT *
          FROM SalesRecordTable
            MATCH_RECOGNIZE (
              PARTITION BY user_id
              ORDER BY purchase_time
              MEASURES
                UNUSUAL_SALE.invoice_id AS unusual_invoice_id,
                CAST(UNUSUAL_SALE.quantity AS INT) AS unusual_quantity,
                UNUSUAL_SALE.purchase_time AS unusual_tstamp,
                AVG(CAST(TYPICAL_SALE.quantity AS INT)) AS avg_quantity,
                FIRST(TYPICAL_SALE.purchase_time) AS avg_first_sale_tstamp,
                LAST(TYPICAL_SALE.purchase_time) AS avg_last_sale_tstamp
              ONE ROW PER MATCH
              AFTER MATCH SKIP PAST LAST ROW
              PATTERN (TYPICAL_SALE+? UNUSUAL_SALE) WITHIN INTERVAL '10' SECOND
              DEFINE
                UNUSUAL_SALE AS
                  UNUSUAL_SALE.quantity > AVG(CAST(TYPICAL_SALE.quantity AS INT)) * 3
        );
    "]
    parallelism: 1
    upgradeMode: stateless
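To make the MATCH_RECOGNIZE pattern concrete: per user, the query treats preceding sales as typical and flags a sale whose quantity exceeds three times the average quantity of those typical sales, as long as the whole match fits within ten seconds. Below is a minimal Java sketch of that same rule for a single user; the `Sale` record and the in-memory windowing are simplified assumptions for illustration only, and assume records arrive ordered by purchase time. The Flink job above is the actual implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Sketch of the rule expressed by PATTERN (TYPICAL_SALE+? UNUSUAL_SALE):
// a sale is "unusual" when its quantity exceeds 3x the average quantity of
// the preceding typical sales seen within the last 10 seconds.
public class UnusualSaleRule {
    record Sale(String invoiceId, String userId, int quantity, Instant purchaseTime) {}

    private static final Duration WINDOW = Duration.ofSeconds(10);
    private final List<Sale> typicalSales = new ArrayList<>(); // one user's recent typical sales

    /** Returns true if the sale is unusual; resets state like AFTER MATCH SKIP PAST LAST ROW. */
    public boolean isUnusual(Sale sale) {
        // Evict typical sales that fall outside the 10-second match window.
        typicalSales.removeIf(s ->
                Duration.between(s.purchaseTime(), sale.purchaseTime()).compareTo(WINDOW) > 0);

        double avg = typicalSales.stream().mapToInt(Sale::quantity).average().orElse(Double.MAX_VALUE);
        if (!typicalSales.isEmpty() && sale.quantity() > avg * 3) {
            typicalSales.clear(); // skip past last row: the next match starts fresh
            return true;
        }
        typicalSales.add(sale); // otherwise this sale counts as a TYPICAL_SALE
        return false;
    }
}
```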
@@ -5,6 +5,9 @@

public interface Data {
    String topic();

    /** Number of records to generate per send cycle; implementations may override (defaults to 1). */
    default int batchSize() {
        return 1;
    }

    SpecificRecord generate();
    String generateCsv();
    Schema schema();
@@ -1,30 +1,89 @@
package com.github.streamshub.kafka.data.generator;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;

public class DataGenerator implements Runnable {
    final static Map<String, String> RETENTION_CONFIG = Collections.singletonMap(RETENTION_MS_CONFIG, String.valueOf(60 * 60 * 1000)); // 1 hour
    final static int KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG = 5000;
    final static String KAFKA_ADMIN_CLIENT_ID_CONFIG = "data-generator-admin-client";
    final String bootstrapServers;
    final List<Data> dataTypes;
    final Properties kafkaAdminProps;

    public DataGenerator(String bootstrapServers, List<Data> dataTypes) {
        this.bootstrapServers = bootstrapServers;
        this.dataTypes = dataTypes;

        kafkaAdminProps = new Properties();
        kafkaAdminProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        kafkaAdminProps.put(CLIENT_ID_CONFIG, KAFKA_ADMIN_CLIENT_ID_CONFIG);
        kafkaAdminProps.put(REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(KAFKA_ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG));
    }

    @Override
    public void run() {
        createTopics(dataTypes.stream().map(Data::topic).toList());

        if (Boolean.parseBoolean(System.getenv("USE_APICURIO_REGISTRY"))) {
            String registryUrl = System.getenv("REGISTRY_URL");
            Producer<String, Object> producer = new KafkaProducer<>(KafkaClientProps.avro(bootstrapServers, registryUrl));
-           send(producer, () -> dataTypes.stream().map(this::generateAvroRecord).toList());
+           send(producer, () -> generateTopicRecords(this::generateAvroRecord));
        } else {
            Producer<String, String> producer = new KafkaProducer<>(KafkaClientProps.csv(bootstrapServers));
-           send(producer, () -> dataTypes.stream().map(this::generateCsvRecord).toList());
+           send(producer, () -> generateTopicRecords(this::generateCsvRecord));
        }
    }

    // Builds one batch per send cycle: batchSize() records for each data type.
    private <V> List<ProducerRecord<String, V>> generateTopicRecords(Function<Data, ProducerRecord<String, V>> recordsGenerator) {
        List<ProducerRecord<String, V>> records = new ArrayList<>();

        for (Data dataType : this.dataTypes) {
            for (int i = 0; i < dataType.batchSize(); i++) {
                records.add(recordsGenerator.apply(dataType));
            }
        }

        return records;
    }

    private void createTopics(List<String> topicNames) {
        try (AdminClient adminClient = AdminClient.create(kafkaAdminProps)) {
            Set<String> existingTopicNames = adminClient.listTopics().names().get();
            List<NewTopic> newTopics = topicNames.stream()
                    .filter(topicName -> !existingTopicNames.contains(topicName)) // createTopics() will throw if topic already exists
                    .map(
                            topicName -> new NewTopic(topicName, Optional.empty(), Optional.empty()).configs(RETENTION_CONFIG)
                    )
                    .toList();

            if (newTopics.isEmpty()) {
                return;
            }

            adminClient.createTopics(newTopics).all().get();
            System.out.println("Kafka Data Generator : Successfully created topics: " + newTopics);
        } catch (Exception e) {
            System.out.println("Kafka Data Generator : Failed to create topics.");
            throw new RuntimeException(e);
        }
    }

@@ -61,7 +120,7 @@ private <V> void send(Producer<String, V> producer, Supplier<List<ProducerRecord
                }
            }

-            Thread.sleep(3000);
+            Thread.sleep(1000);

        }

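A minimal sketch of how the generator might be wired up, using the constructor and `run()` shown above. The `Main` class, the `BOOTSTRAP_SERVERS` fallback, and the assumption that `SalesData` has a no-argument constructor are illustrative; the PR's actual entry point is not part of this diff.

```java
import java.util.List;

// Hypothetical entry point, for illustration only.
public class Main {
    public static void main(String[] args) {
        String bootstrapServers = System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
        // With the changes above, each send cycle produces batchSize() records per
        // data type (100 for SalesData) and sleeps one second between cycles, so
        // the generator emits roughly 100 sales records per second.
        new DataGenerator(bootstrapServers, List.of(new SalesData())).run();
    }
}
```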
@@ -9,13 +9,17 @@

public class SalesData implements Data {
    private final Random random = new Random();
    private final static int USER_COUNT = 100;

    public String topic() {
        return "flink.sales.records";
    }
    public Schema schema() {
        return Sales.SCHEMA$;
    }
    public int batchSize() {
        return USER_COUNT;
    }

    public SpecificRecord generate() {
        return Sales.newBuilder()
@@ -40,15 +44,21 @@ private String generateInvoiceId() {
    }

    private String generateUserId() {
-        return "user-" + Math.abs(random.nextInt(100));
+        return "user-" + Math.abs(random.nextInt(USER_COUNT));
    }

    private String generateProductId() {
        return String.valueOf(Math.abs(random.nextInt(200)));
    }

    private String generateQuantity() {
-        return String.valueOf(Math.abs(random.nextInt(3) + 1));
+        int multiplier = 1;
+
+        if (Math.abs(random.nextInt(batchSize() * 10)) < 2) {
+            multiplier = 10;
+        }
+
+        return String.valueOf(Math.abs((1 + random.nextInt(3)) * multiplier));
    }

    private String generateUnitCost() {
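Worth spelling out the arithmetic in `generateQuantity()`: with `batchSize()` = `USER_COUNT` = 100, the test `random.nextInt(1000) < 2` fires with probability 2/1000, so about 0.2% of generated sales get the 10x multiplier, producing quantities of 10, 20, or 30 against a typical 1 to 3. That separation is what the query's 3x-average threshold detects. A standalone sketch (not part of the PR) that empirically checks the rate:

```java
import java.util.Random;

// Standalone check, for illustration only: confirms that roughly 0.2% of
// quantities generated by the logic above are anomalous (>= 10).
public class QuantityRateCheck {
    public static void main(String[] args) {
        Random random = new Random();
        int batchSize = 100; // SalesData.USER_COUNT
        int trials = 1_000_000;
        int anomalies = 0;
        for (int i = 0; i < trials; i++) {
            int multiplier = random.nextInt(batchSize * 10) < 2 ? 10 : 1;
            int quantity = (1 + random.nextInt(3)) * multiplier;
            if (quantity >= 10) anomalies++; // only the 10x path can reach 10
        }
        // Expected: ~2000 of 1,000,000, i.e. ~0.200%
        System.out.printf("anomalous: %d of %d (%.3f%%)%n", anomalies, trials, 100.0 * anomalies / trials);
    }
}
```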