1+ import sys
2+ from awsglue .transforms import *
3+ from awsglue .utils import getResolvedOptions
4+ from awsglue .context import GlueContext
5+ from awsglue .job import Job
6+ from pyspark .sql import SparkSession
7+ from pyspark .sql .functions import col , when
8+ from pyspark .sql .types import (
9+ StructType ,
10+ StructField ,
11+ StringType ,
12+ TimestampType ,
13+ DoubleType ,
14+ )
15+
## @params: [JOB_NAME, QUANTILE]
# Resolve the required Glue job arguments from the command line.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'QUANTILE'])

# Schema of the headerless k-means inference CSV read below.
# NOTE(review): network_name is read as a DOUBLE (a 0/1 flag) and is mapped
# to a string label ("ethereum"/"bitcoin") later in this job — confirm the
# 0 == ethereum convention against the upstream inference pipeline.
input_schema = StructType(
    [
        StructField("transaction_hash", StringType(), True),
        StructField("sender_address", StringType(), True),
        StructField("receiver_address", StringType(), True),
        StructField("block_timestamp_unscaled", TimestampType(), True),
        StructField("network_name", DoubleType(), True),
        StructField("cluster_id", DoubleType(), True),
        StructField("distance", DoubleType(), True),
    ]
)

# Intermediate model columns that are dropped before the Iceberg write.
columns_to_drop = ["cluster_id", "distance"]
32+
def validate_params(quantile):
    """Exit the job (status 1) unless *quantile* is a usable probability.

    The previous check only rejected falsy values, so out-of-range inputs
    such as 1.5 slipped through even though ``DataFrame.approxQuantile``
    requires probabilities in [0, 1] (and 0 is useless as an anomaly
    cutoff).  The exit-on-invalid contract is preserved for callers.

    :param quantile: anomaly-threshold quantile; must satisfy 0 < q <= 1.
    """
    try:
        valid = 0.0 < float(quantile) <= 1.0
    except (TypeError, ValueError):
        valid = False
    if not valid:
        # Error goes to stderr so it is not mixed into normal job output.
        print("ERROR: parameter quantile required", file=sys.stderr)
        sys.exit(1)
37+
def setup_blockchain_db(spark):
    """Ensure the ``bdp`` database exists in the attached catalog.

    :param spark: active SparkSession used to run the DDL statement.
    """
    ddl = """
    CREATE DATABASE IF NOT EXISTS bdp
    """
    spark.sql(ddl)
42+
def setup_iceberg_table(spark):
    """Create, if absent, the Iceberg table holding per-transaction
    anomaly flags, partitioned by network and by day of the block
    timestamp.

    :param spark: active SparkSession used to run the DDL statement.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS glue_catalog.bdp.anomaly_detection (
    transaction_hash STRING,
    sender_address STRING,
    receiver_address STRING,
    block_timestamp_unscaled TIMESTAMP,
    network_name STRING,
    is_anomaly BOOLEAN
    )
    PARTITIONED BY (network_name, day(block_timestamp_unscaled))
    LOCATION 's3://bdp-anomaly-detection'
    TBLPROPERTIES (
    'table_type' = 'ICEBERG',
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'zstd'
    )
    """
    spark.sql(ddl)
61+
# Spark session configured for Iceberg tables backed by the AWS Glue catalog.
# Fixes: mixed backslash/parenthesis continuation (backslashes are redundant
# inside a parenthesized expression) and a comment that contradicted the
# mergeSchema setting it annotated.
# NOTE(review): spark.sql.parquet.mergeSchema is enabled even though the CSV
# read below supplies an explicit schema; it only affects Parquet reads, so
# it is kept unchanged for backward compatibility — confirm whether it can
# be dropped.
spark = (
    SparkSession.builder.appName("DataAggregations")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://bdp-wallets-aggregations/")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.glue.id", "982534349340")
    # Adaptive query execution keeps shuffle partitions at a similar size.
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
75+
# Parse and validate the anomaly quantile before any heavy work starts.
# Previously a malformed QUANTILE value raised an unhandled ValueError from
# float() before validate_params ever ran; route that failure through the
# job's standard error-and-exit path instead.
try:
    quantile = float(args["QUANTILE"])
except (KeyError, ValueError):
    print("ERROR: parameter quantile required")
    sys.exit(1)
validate_params(quantile)

# Standard Glue job bootstrap around the existing SparkSession.
glueContext = GlueContext(spark)
job = Job(glueContext)

job.init(args['JOB_NAME'], args)
83+
# Load the raw k-means inference output: a headerless CSV with the columns
# declared in input_schema.
inference_path = (
    "s3://bdp-inference-results/kmeans/"
    "part-00000-609399e4-4d11-42c0-94eb-3d82fbc5d896-c000.csv.out"
)
df = spark.read.schema(input_schema).option("header", "false").csv(inference_path)

# Make sure the target database/table exist, then start from an empty table
# so each run fully replaces the previous results.
setup_blockchain_db(spark)
setup_iceberg_table(spark)
spark.sql("TRUNCATE TABLE glue_catalog.bdp.anomaly_detection")
89+
# Distance cutoff: rows above the requested quantile are flagged anomalous.
# The original bare `except:` also swallowed SystemExit/KeyboardInterrupt;
# narrow it to Exception while keeping the historical fallback quantile.
try:
    threshold = df.approxQuantile("distance", [quantile], 0.001)[0]
except Exception:
    # Fallback quantile used when the requested one cannot be computed
    # (e.g. all-null distance column); value inherited from prior runs.
    quantile = 0.673
    threshold = df.approxQuantile("distance", [quantile], 0.001)[0]

df = df.withColumn("is_anomaly", when(col("distance") > threshold, True).otherwise(False))

df = df.drop(*columns_to_drop)
# network_name arrives as a numeric 0/1 flag: 0 -> "ethereum", else "bitcoin".
# NOTE(review): `== False` relies on Spark coercing False to 0.0 for the
# DoubleType column — confirm; an explicit `== 0` comparison would be clearer.
df = df.withColumn("network_name", when(col("network_name") == False, "ethereum").otherwise("bitcoin"))
100+
# Write the labelled rows through the Glue catalog so the Iceberg table's
# declared schema and partitioning are honoured.
write_options = {
    "useCatalogSchema": True,
    "useSparkDataSource": True,
}
glueContext.write_data_frame.from_catalog(
    frame=df,
    database="bdp",
    table_name="anomaly_detection",
    additional_options=write_options,
)

job.commit()
0 commit comments