
Commit f7b26f6

Features/bdp 94 lof (#37)
* refactor in local folder
* lof poc
1 parent 6f295ed commit f7b26f6

20 files changed: 739 additions & 118 deletions
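The branch name (bdp 94 lof) and the "lof poc" note suggest this commit introduces a Local Outlier Factor proof of concept; the LOF code itself is not among the hunks shown below. Purely as a sketch of the technique, assuming a scikit-learn-style PoC (which this page does not confirm):

# Illustrative only: a minimal Local Outlier Factor run with scikit-learn.
# The commit's actual LOF code is not visible in the diffs below, and the
# feature matrix here is synthetic.
import numpy as np
from sklearn.neighbors import LocalOutlierFactor

X = np.random.default_rng(42).normal(size=(1000, 8))  # stand-in for per-address features

lof = LocalOutlierFactor(n_neighbors=20, contamination=0.01)
labels = lof.fit_predict(X)              # -1 = outlier, 1 = inlier
scores = -lof.negative_outlier_factor_   # higher means more anomalous

print(f"{int((labels == -1).sum())} of {len(X)} points flagged as anomalous")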
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from pyspark.sql import SparkSession
+from scripts.local.shared.schemas import transaction_schema
+from scripts.local.aggregate.components.aggregations import aggregate
+
+spark = (
+    SparkSession.builder.appName("DataAggregations")
+    .config("spark.sql.parquet.enableVectorizedReader", "true")
+    .config("spark.sql.parquet.mergeSchema", "false")  # No need as we explicitly specify the schema
+    .config("spark.executor.memory", "6g")
+    .config("spark.driver.memory", "2g")
+    # .config("spark.local.dir", "/mnt/d/spark-temp")  # Change the temp directory
+    .getOrCreate()
+)
+
+source_dir = "data/historical/etl/transactions"
+output_dir = "data/historical/aggregations"
+
+aggregate(spark, source_dir, output_dir, transaction_schema)
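This runner imports transaction_schema from scripts/local/shared/schemas, which is not part of this diff. Purely for orientation, a hypothetical shape consistent with the columns the aggregation code touches might be:

# Hypothetical sketch of transaction_schema, inferred from the columns
# referenced in aggregations.py; the real definition is not shown in this
# commit, and the "value" field in particular is a guess.
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, LongType, DecimalType,
)

transaction_schema = StructType([
    StructField("transaction_id", StringType()),    # dropped before aggregating
    StructField("block_number", LongType()),        # dropped before aggregating
    StructField("transaction_index", LongType()),   # dropped before aggregating
    StructField("sender_address", StringType()),
    StructField("receiver_address", StringType()),
    StructField("block_timestamp", TimestampType()),
    StructField("network_name", StringType()),      # "bitcoin" or "ethereum"
    StructField("value", DecimalType(38, 0)),       # hypothetical amount field
])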
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from pyspark.sql import SparkSession
+from scripts.local.shared.benchmark_schemas import benchmark_transaction_schema
+from components.aggregations import aggregate
+
+spark = (
+    SparkSession.builder.appName("DataAggregations")
+    .config("spark.sql.parquet.enableVectorizedReader", "true")
+    .config("spark.sql.parquet.mergeSchema", "false")  # No need as we explicitly specify the schema
+    .config("spark.executor.memory", "6g")
+    .config("spark.driver.memory", "2g")
+    # .config("spark.local.dir", "/mnt/d/spark-temp")  # Change the temp directory
+    .getOrCreate()
+)
+
+source_dir = "data/benchmark/etl/transactions"
+output_dir = "data/benchmark/aggregations"
+
+aggregate(spark, source_dir, output_dir, benchmark_transaction_schema)
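This benchmark runner duplicates the historical runner's session setup verbatim, differing only in schema, paths, and the import path used for aggregate. A shared factory, sketched below, is one way the duplication could be factored out; it is not part of this commit:

# Sketch only, not in this commit: the builder chain repeated in both runner
# scripts, moved into one helper (name and location hypothetical).
from pyspark.sql import SparkSession

def build_spark(app_name: str = "DataAggregations") -> SparkSession:
    return (
        SparkSession.builder.appName(app_name)
        .config("spark.sql.parquet.enableVectorizedReader", "true")
        .config("spark.sql.parquet.mergeSchema", "false")
        .config("spark.executor.memory", "6g")
        .config("spark.driver.memory", "2g")
        .getOrCreate()
    )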

scripts/local/etl/aggregations.py renamed to scripts/local/aggregate/components/aggregations.py

Lines changed: 16 additions & 30 deletions
@@ -1,7 +1,6 @@
 from pyspark.sql.functions import mean, mode, stddev, count, median, sum, min, max, col, lit, count_distinct, unix_timestamp, lag, first, when, monotonically_increasing_id
-from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql import DataFrame
 from pyspark.sql.window import Window
-from scripts.local.shared.schemas import transaction_schema
 
 def calculate_aggregations(df):
     sender_window = Window.partitionBy("sender_address").orderBy("block_timestamp")
@@ -147,39 +146,26 @@ def preprocess_btc_df(df):
 
     return df_btc_send.unionByName(df_btc_receive)
 
-
-spark = (
-    SparkSession.builder.appName("DataAggregations")
-    .config("spark.sql.parquet.enableVectorizedReader", "true")
-    .config("spark.sql.parquet.mergeSchema", "false")  # No need as we explicitly specify the schema
-    .config("spark.executor.memory", "6g")
-    .config("spark.driver.memory", "2g")
-    # .config("spark.local.dir", "/mnt/d/spark-temp")  # Change the temp directory
-    .getOrCreate()
-)
-
-source_dir = "data/historical/etl/transactions"  # data/benchmark/etl/transactions or data/historical/etl/transactions
-output_dir = "data/historical/aggregations"  # data/benchmark/aggregations or data/historical/aggregations
-
-cols_to_drop = ["transaction_id", "block_number", "transaction_index"]
-transaction_df = spark.read.schema(transaction_schema).parquet(source_dir).drop(*cols_to_drop)
+def aggregate(spark, source_dir, output_dir, schema):
+    cols_to_drop = ["transaction_id", "block_number", "transaction_index"]
+    transaction_df = spark.read.schema(schema).parquet(source_dir).drop(*cols_to_drop)
 
-unique_degrees_df = calculate_unique_degrees(transaction_df)
+    unique_degrees_df = calculate_unique_degrees(transaction_df)
 
-df_eth = transaction_df.where(col("network_name") == "ethereum")
-df_btc = transaction_df.where(col("network_name") == "bitcoin")
+    df_eth = transaction_df.where(col("network_name") == "ethereum")
+    df_btc = transaction_df.where(col("network_name") == "bitcoin")
 
-df_btc = preprocess_btc_df(df_btc)
+    df_btc = preprocess_btc_df(df_btc)
 
-df_btc_aggregations = calculate_aggregations(df_btc)
-df_eth_aggregations = calculate_aggregations(df_eth)
+    df_btc_aggregations = calculate_aggregations(df_btc)
+    df_eth_aggregations = calculate_aggregations(df_eth)
 
-df_btc_aggregations = df_btc_aggregations.withColumn("network_name", lit("bitcoin"))
-df_eth_aggregations = df_eth_aggregations.withColumn("network_name", lit("ethereum"))
+    df_btc_aggregations = df_btc_aggregations.withColumn("network_name", lit("bitcoin"))
+    df_eth_aggregations = df_eth_aggregations.withColumn("network_name", lit("ethereum"))
 
-aggregations_df = df_btc_aggregations.unionByName(df_eth_aggregations)
-aggregations_df = aggregations_df.join(unique_degrees_df, "address", "outer").na.fill(0)
+    aggregations_df = df_btc_aggregations.unionByName(df_eth_aggregations)
+    aggregations_df = aggregations_df.join(unique_degrees_df, "address", "outer").na.fill(0)
 
-aggregations_df.coalesce(1).write.parquet(output_dir, mode="overwrite", compression="zstd")
+    aggregations_df.coalesce(1).write.parquet(output_dir, mode="overwrite", compression="zstd")
 
-spark.stop()
+    spark.stop()
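The surviving imports (lag, unix_timestamp, first, when) and the sender_window definition at the top of the file point to per-address, time-ordered window features. A generic illustration of that pattern, not the repo's actual feature code:

# Generic window-function illustration; calculate_aggregations' real feature
# set lies outside the hunks shown above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", "2024-01-01 00:00:00"), ("a", "2024-01-01 00:05:00")],
    ["sender_address", "block_timestamp"],
).withColumn("block_timestamp", col("block_timestamp").cast("timestamp"))

sender_window = Window.partitionBy("sender_address").orderBy("block_timestamp")

# Seconds since the same sender's previous transaction (null for the first).
df = df.withColumn(
    "seconds_since_prev_tx",
    unix_timestamp("block_timestamp")
    - lag(unix_timestamp("block_timestamp")).over(sender_window),
)
df.show()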

scripts/local/anomalies_detection/preprocessing/components/preprocess_data_second_step.py

Lines changed: 0 additions & 11 deletions
This file was deleted.

scripts/local/anomalies_detection/preprocessing/preprocess_benchmark_datasets.py

Lines changed: 0 additions & 23 deletions
This file was deleted.

scripts/local/etl/benchmark_components/etl.py

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 from functools import reduce
 from typing import Callable
 from pyspark.sql import SparkSession, DataFrame
-from scripts.local.shared.schemas import benchmark_input_schema
+from scripts.local.shared.benchmark_schemas import benchmark_input_schema
 
 def extract(spark: SparkSession, path: str) -> DataFrame:
     df = spark.read.schema(benchmark_input_schema).csv(path, header=True)
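The functools.reduce and Callable imports in etl.py suggest transforms are composed as a pipeline of DataFrame-to-DataFrame functions; the transform code itself is outside this hunk. A common shape for that pattern, as a sketch:

# Sketch of the reduce/Callable composition pattern the imports hint at;
# the transform names below are hypothetical.
from functools import reduce
from typing import Callable
from pyspark.sql import DataFrame

Transform = Callable[[DataFrame], DataFrame]

def pipeline(df: DataFrame, transforms: list[Transform]) -> DataFrame:
    # Feed the output of each transform into the next, left to right.
    return reduce(lambda acc, fn: fn(acc), transforms, df)

# e.g. pipeline(raw_df, [drop_nulls, cast_types, add_features])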
File renamed without changes.

scripts/local/anomalies_detection/preprocessing/components/join_transactions_with_aggregations.py renamed to scripts/local/preprocessing/components/join_transactions_with_aggregations.py

Lines changed: 3 additions & 4 deletions
@@ -1,9 +1,8 @@
 from pyspark.sql.functions import col
-from scripts.local.shared.schemas import transaction_scaled_schema, aggregations_scaled_schema
 from scripts.local.shared.consts import sender_fields, receiver_fields, common_fields
 
-def join_transactions_with_aggregations(spark, transactions_dir, aggregations_dir, output_dir):
-    transactions_df = spark.read.schema(transaction_scaled_schema).parquet(transactions_dir)
+def join_transactions_with_aggregations(spark, transactions_dir, aggregations_dir, output_dir, transactions_scaled_schema, aggregations_scaled_schema):
+    transactions_df = spark.read.schema(transactions_scaled_schema).parquet(transactions_dir)
     aggregations_df = spark.read.schema(aggregations_scaled_schema).parquet(aggregations_dir)
 
     sender_aggregations = aggregations_df.select(
@@ -30,4 +29,4 @@ def join_transactions_with_aggregations(spark, transactions_dir, aggregations_di
         compression="zstd"
     )
 
-    return final_df
+    return final_df
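With the schemas now injected as parameters rather than imported, callers choose which scaled schemas to read with. A hypothetical call site under the new signature (schema names taken from the removed import; directory paths invented for illustration):

# Hypothetical call site; the paths are made up, and the schema module shown
# is the one the removed import pointed at.
from scripts.local.shared.schemas import (
    transaction_scaled_schema,
    aggregations_scaled_schema,
)

final_df = join_transactions_with_aggregations(
    spark,
    transactions_dir="data/historical/scaled/transactions",
    aggregations_dir="data/historical/scaled/aggregations",
    output_dir="data/historical/joined",
    transactions_scaled_schema=transaction_scaled_schema,
    aggregations_scaled_schema=aggregations_scaled_schema,
)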

scripts/local/anomalies_detection/preprocessing/components/preprocess_data_first_step.py renamed to scripts/local/preprocessing/components/preprocess_data_first_step.py

File renamed without changes.
