Skip to content

Commit 853b4c2

Browse files
authored
iceberg partitioning (#29)
* iceberg partitioning
* table name fix
1 parent f381aed commit 853b4c2

File tree

5 files changed, with 18 additions and 225 deletions.

infrastructure/modules/glue_catalog/main.tf

Lines changed: 9 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -12,14 +12,13 @@ resource "aws_glue_catalog_table" "cleaned_transactions" {
1212
metadata_operation = "CREATE"
1313
}
1414
}
15-
16-
partition_keys {
15+
//Commented because https://github.com/hashicorp/terraform-provider-aws/issues/36531
16+
/*partition_keys {
1717
name = "network_name"
1818
type = "string"
19-
}
19+
}*/
2020

2121
parameters = {
22-
"table_type" = "ICEBERG",
2322
"write.format.default" = "parquet"
2423
"write.parquet.compression-codec" = "zstd"
2524
}
@@ -133,13 +132,13 @@ resource "aws_glue_catalog_table" "wallets_aggregations" {
133132
}
134133
}
135134

136-
partition_keys {
135+
//Commented because https://github.com/hashicorp/terraform-provider-aws/issues/36531
136+
/*partition_keys {
137137
name = "network_name"
138138
type = "string"
139-
}
139+
}*/
140140

141141
parameters = {
142-
"table_type" = "ICEBERG",
143142
"write.format.default" = "parquet",
144143
"write.parquet.compression-codec" = "zstd",
145144
"write.bucketed-columns" = "address",
@@ -407,13 +406,13 @@ resource "aws_glue_catalog_table" "features" {
407406
}
408407
}
409408

410-
partition_keys {
409+
//Commented because https://github.com/hashicorp/terraform-provider-aws/issues/36531
410+
/*partition_keys {
411411
name = "network_name"
412412
type = "boolean"
413-
}
413+
}*/
414414

415415
parameters = {
416-
"table_type" = "ICEBERG",
417416
"write.format.default" = "parquet",
418417
"write.parquet.compression-codec" = "zstd"
419418
}

scripts/cloud/etl/orphan_files_removal.py

Lines changed: 0 additions & 30 deletions
This file was deleted.

scripts/cloud/etl/preprocessing.py

Lines changed: 3 additions & 87 deletions
Original file line number | Diff line number | Diff line change
@@ -20,93 +20,9 @@ def setup_blockchain_db(spark):
2020

2121

2222
def setup_iceberg_table(spark):
23-
spark.sql("""
24-
CREATE TABLE IF NOT EXISTS glue_catalog.bdp.features (
25-
block_timestamp FLOAT,
26-
block_number FLOAT,
27-
transaction_index FLOAT,
28-
fee FLOAT,
29-
total_transferred_value FLOAT,
30-
total_input_value FLOAT,
31-
sent_value FLOAT,
32-
received_value FLOAT,
33-
network_name BOOLEAN,
34-
35-
avg_sent_value FLOAT,
36-
avg_received_value FLOAT,
37-
avg_total_value_for_sender FLOAT,
38-
avg_total_value_for_receiver FLOAT,
39-
40-
sum_sent_value FLOAT,
41-
sum_received_value FLOAT,
42-
sum_total_value_for_sender FLOAT,
43-
sum_total_value_for_receiver FLOAT,
44-
45-
min_sent_value FLOAT,
46-
min_received_value FLOAT,
47-
min_total_value_for_sender FLOAT,
48-
min_total_value_for_receiver FLOAT,
49-
50-
max_sent_value FLOAT,
51-
max_received_value FLOAT,
52-
max_total_value_for_sender FLOAT,
53-
max_total_value_for_receiver FLOAT,
54-
55-
median_sent_value FLOAT,
56-
median_received_value FLOAT,
57-
median_total_value_for_sender FLOAT,
58-
median_total_value_for_receiver FLOAT,
59-
60-
mode_sent_value FLOAT,
61-
mode_received_value FLOAT,
62-
mode_total_value_for_sender FLOAT,
63-
mode_total_value_for_receiver FLOAT,
64-
65-
stddev_sent_value FLOAT,
66-
stddev_received_value FLOAT,
67-
stddev_total_value_for_sender FLOAT,
68-
stddev_total_value_for_receiver FLOAT,
69-
70-
num_sent_transactions FLOAT,
71-
num_received_transactions FLOAT,
72-
73-
avg_time_between_sent_transactions FLOAT,
74-
avg_time_between_received_transactions FLOAT,
75-
76-
avg_outgoing_speed_count FLOAT,
77-
avg_incoming_speed_count FLOAT,
78-
avg_outgoing_speed_value FLOAT,
79-
avg_incoming_speed_value FLOAT,
80-
81-
avg_outgoing_acceleration_count FLOAT,
82-
avg_incoming_acceleration_count FLOAT,
83-
avg_outgoing_acceleration_value FLOAT,
84-
avg_incoming_acceleration_value FLOAT,
85-
86-
avg_fee_paid FLOAT,
87-
total_fee_paid FLOAT,
88-
min_fee_paid FLOAT,
89-
max_fee_paid FLOAT,
90-
91-
activity_duration_for_sender FLOAT,
92-
first_transaction_timestamp_for_sender FLOAT,
93-
last_transaction_timestamp_for_sender FLOAT,
94-
95-
activity_duration_for_receiver FLOAT,
96-
first_transaction_timestamp_for_receiver FLOAT,
97-
last_transaction_timestamp_for_receiver FLOAT,
98-
99-
unique_out_degree FLOAT,
100-
unique_in_degree FLOAT
101-
)
102-
PARTITIONED BY (network_name)
103-
LOCATION 's3://bdp-features'
104-
TBLPROPERTIES (
105-
'table_type' = 'ICEBERG',
106-
'write.format.default' = 'parquet',
107-
'write.parquet.compression-codec' = 'zstd'
108-
)
109-
""")
23+
spark.sql(""" ALTER TABLE glue_catalog.bdp.features ADD IF NOT EXISTS
24+
PARTITION (network_name = true)
25+
PARTITION (network_name = false) """)
11026

11127

11228
cols_dict = {

scripts/cloud/etl/transactions_cleaning.py

Lines changed: 3 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -179,26 +179,9 @@ def setup_blockchain_db(spark):
179179

180180

181181
def setup_iceberg_table(spark):
182-
spark.sql("""
183-
CREATE TABLE IF NOT EXISTS glue_catalog.bdp.cleaned_transactions (
184-
transaction_id STRING,
185-
block_timestamp TIMESTAMP,
186-
block_number BIGINT,
187-
transaction_hash STRING,
188-
transaction_index BIGINT,
189-
fee DOUBLE,
190-
sender_address STRING,
191-
receiver_address STRING,
192-
total_transferred_value DOUBLE,
193-
total_input_value DOUBLE,
194-
sent_value DOUBLE,
195-
received_value DOUBLE,
196-
network_name STRING
197-
)
198-
PARTITIONED BY (network_name)
199-
LOCATION 's3://bdp-cleaned-transactions'
200-
TBLPROPERTIES ('table_type' = 'ICEBERG', 'write.format.default'='parquet', 'write.parquet.compression-codec'='zstd')
201-
""")
182+
spark.sql("""ALTER TABLE glue_catalog.bdp.cleaned_transactions ADD IF NOT EXISTS
183+
PARTITION (network_name = 'ethereum')
184+
PARTITION (network_name = 'bitcoin')""")
202185

203186
def validate_date(date_str: str) -> bool:
204187
try:

scripts/cloud/etl/wallets_aggregations.py

Lines changed: 3 additions & 78 deletions
Original file line number | Diff line number | Diff line change
@@ -161,84 +161,9 @@ def setup_blockchain_db(spark):
161161
""")
162162

163163
def setup_iceberg_table(spark):
164-
spark.sql("""
165-
CREATE TABLE IF NOT EXISTS glue_catalog.bdp.wallets_aggregations (
166-
address STRING,
167-
network_name STRING,
168-
169-
avg_sent_value DOUBLE,
170-
avg_received_value DOUBLE,
171-
avg_total_value_for_sender DOUBLE,
172-
avg_total_value_for_receiver DOUBLE,
173-
174-
sum_sent_value DOUBLE,
175-
sum_received_value DOUBLE,
176-
sum_total_value_for_sender DOUBLE,
177-
sum_total_value_for_receiver DOUBLE,
178-
179-
min_sent_value DOUBLE,
180-
min_received_value DOUBLE,
181-
min_total_value_for_sender DOUBLE,
182-
min_total_value_for_receiver DOUBLE,
183-
184-
max_sent_value DOUBLE,
185-
max_received_value DOUBLE,
186-
max_total_value_for_sender DOUBLE,
187-
max_total_value_for_receiver DOUBLE,
188-
189-
median_sent_value DOUBLE,
190-
median_received_value DOUBLE,
191-
median_total_value_for_sender DOUBLE,
192-
median_total_value_for_receiver DOUBLE,
193-
194-
mode_sent_value DOUBLE,
195-
mode_received_value DOUBLE,
196-
mode_total_value_for_sender DOUBLE,
197-
mode_total_value_for_receiver DOUBLE,
198-
199-
stddev_sent_value DOUBLE,
200-
stddev_received_value DOUBLE,
201-
stddev_total_value_for_sender DOUBLE,
202-
stddev_total_value_for_receiver DOUBLE,
203-
204-
num_sent_transactions BIGINT,
205-
num_received_transactions BIGINT,
206-
207-
avg_time_between_sent_transactions DOUBLE,
208-
avg_time_between_received_transactions DOUBLE,
209-
210-
avg_outgoing_speed_count DOUBLE,
211-
avg_incoming_speed_count DOUBLE,
212-
avg_outgoing_speed_value DOUBLE,
213-
avg_incoming_speed_value DOUBLE,
214-
215-
avg_outgoing_acceleration_count DOUBLE,
216-
avg_incoming_acceleration_count DOUBLE,
217-
avg_outgoing_acceleration_value DOUBLE,
218-
avg_incoming_acceleration_value DOUBLE,
219-
220-
avg_fee_paid DOUBLE,
221-
total_fee_paid DOUBLE,
222-
min_fee_paid DOUBLE,
223-
max_fee_paid DOUBLE,
224-
225-
activity_duration BIGINT,
226-
first_transaction_timestamp TIMESTAMP,
227-
last_transaction_timestamp TIMESTAMP,
228-
229-
unique_out_degree BIGINT,
230-
unique_in_degree BIGINT
231-
)
232-
PARTITIONED BY (network_name)
233-
LOCATION 's3://bdp-wallets-aggregations'
234-
TBLPROPERTIES (
235-
'table_type' = 'ICEBERG',
236-
'write.format.default' = 'parquet',
237-
'write.parquet.compression-codec' = 'zstd',
238-
'write.bucketed-columns' = 'address',
239-
'write.num-buckets' = '2048'
240-
)
241-
""")
164+
spark.sql("""ALTER TABLE glue_catalog.bdp.wallets_aggregations ADD IF NOT EXISTS
165+
PARTITION (network_name = 'ethereum')
166+
PARTITION (network_name = 'bitcoin') """)
242167

243168
spark = (
244169
SparkSession.builder.appName("DataAggregations")

0 commit comments

Comments
 (0)