
Commit 49d1586

add format conversion scripts to glue and spearman feature selection
1 parent f7b26f6 commit 49d1586

3 files changed

Lines changed: 251 additions & 0 deletions


Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
import boto3


def get_selected_columns():
    """Read the selected feature names (one per line) from S3."""
    bucket_name = "bdp-feature-selection"
    file_key = "data/selected_columns.txt"

    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    file_content = response['Body'].read().decode('utf-8')
    return file_content.splitlines()


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = (
    SparkSession.builder
    .appName("DataAggregations")
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://bdp-scaled-features/")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.glue.id", "982534349340")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

glueContext = GlueContext(spark)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columns_to_select = get_selected_columns()

# Read only the selected columns from the Iceberg table in the Glue catalog.
features_df = glueContext.create_data_frame.from_catalog(
    database="bdp",
    table_name="scaled_features",
    additional_options={
        "useCatalogSchema": True,
        "useSparkDataSource": True
    }
).select(*columns_to_select)

# fraction = 1_000_000 / features_df.count()
# sampled_df = features_df.sample(withReplacement=False, fraction=fraction, seed=42)

# Map the boolean network_name flag to numeric: True -> 1.0; False and NULL -> 0.0.
features_df = features_df.withColumn("network_name", when(col("network_name") == True, 1.0).otherwise(0.0))

# Write a single headerless CSV part file for downstream consumption.
features_df.coalesce(1).write \
    .format("csv") \
    .option("header", "false") \
    .mode("append") \
    .save("s3://bdp-test-data/scaled/")

job.commit()
spark.stop()
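Note: the when/otherwise above maps the boolean network_name flag to 1.0/0.0 (with NULLs falling through to 0.0) so the CSV output is purely numeric. A minimal standalone sketch of the same cast, assuming only plain PySpark; the toy frame and app name are illustrative, not part of the commit:

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

spark = SparkSession.builder.appName("BoolCastSketch").getOrCreate()

# Toy stand-in for the catalog table read in the job above.
df = spark.createDataFrame([(True,), (False,), (None,)], ["network_name"])

# Same mapping as the job: True -> 1.0; False and NULL -> 0.0.
df = df.withColumn("network_name", when(col("network_name") == True, 1.0).otherwise(0.0))
df.show()
spark.stop()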
Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array, array_to_vector
import sagemaker_pyspark
import boto3


def get_selected_columns():
    """Read the selected feature names (one per line) from S3."""
    bucket_name = "bdp-feature-selection"
    file_key = "data/selected_columns.txt"

    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    file_content = response['Body'].read().decode('utf-8')
    return file_content.splitlines()


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Put the SageMaker Spark SDK jars on the driver and executor classpaths.
classpath = ":".join(sagemaker_pyspark.classpath_jars())

spark = (
    SparkSession.builder
    .appName("DataAggregations")
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://bdp-scaled-features/")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.glue.id", "982534349340")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.executor.extraClassPath", classpath)
    .getOrCreate()
)

glueContext = GlueContext(spark)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columns_to_select = get_selected_columns()

features_df = glueContext.create_data_frame.from_catalog(
    database="bdp",
    table_name="scaled_features",
    additional_options={
        "useCatalogSchema": True,
        "useSparkDataSource": True
    }
).select(*columns_to_select)

# Map the boolean network_name flag to numeric: True -> 1.0; False and NULL -> 0.0.
features_df = features_df.withColumn("network_name", when(col("network_name") == True, 1.0).otherwise(0.0))

# Assemble the selected columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=columns_to_select,
    outputCol="features"
)

features_vector_df = assembler.transform(features_df)

# Round-trip through array<double> to force a dense vector representation.
dense_features_df = features_vector_df.withColumn(
    "features",
    array_to_vector(vector_to_array(col("features")))
)

# Write the feature vectors as recordIO-protobuf for SageMaker training.
dense_features_df.select("features").write \
    .format("sagemaker") \
    .option("recordio-protobuf", "true") \
    .option("featureDim", len(columns_to_select)) \
    .mode("overwrite") \
    .save("s3://bdp-recordio/train/")

job.commit()
spark.stop()
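Note: the vector_to_array/array_to_vector round-trip above forces every features vector into a dense representation before the recordIO-protobuf write (VectorAssembler may otherwise emit sparse vectors). A minimal sketch of that conversion in isolation, assuming PySpark 3.1+ where pyspark.ml.functions provides both helpers; the toy vectors and app name are illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array, array_to_vector

spark = SparkSession.builder.appName("DenseVectorSketch").getOrCreate()

# One sparse and one dense toy vector, standing in for assembled features.
df = spark.createDataFrame(
    [(Vectors.sparse(3, [0], [1.0]),), (Vectors.dense([0.5, 0.5, 0.5]),)],
    ["features"],
)

# vector_to_array yields array<double>; array_to_vector rebuilds a DenseVector.
dense = df.withColumn("features", array_to_vector(vector_to_array(col("features"))))
dense.show(truncate=False)
spark.stop()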

scripts/cloud/etl/spearman.py

Lines changed: 104 additions & 0 deletions
@@ -0,0 +1,104 @@
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3
import io


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = (
    SparkSession.builder
    .appName("FeatureSelection")
    .config("spark.sql.parquet.enableVectorizedReader", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://bdp-scaled-features/")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.glue.id", "982534349340")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

glueContext = GlueContext(spark)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

features_df = glueContext.create_data_frame.from_catalog(
    database="bdp",
    table_name="scaled_features",
    additional_options={
        "useCatalogSchema": True,
        "useSparkDataSource": True
    }
)

feature_cols = list(features_df.columns)

# Assemble all columns into one vector and compute the Spearman correlation
# matrix on the driver (toArray() already returns a NumPy ndarray).
df_vectorized = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(features_df)
df_vectorized.cache()
correlation_matrix_np = Correlation.corr(df_vectorized, "features", method="spearman").head()[0].toArray()
correlation_matrix_df = pd.DataFrame(correlation_matrix_np, index=feature_cols, columns=feature_cols)
df_vectorized.unpersist()

output_bucket = "bdp-feature-selection"

# Save the correlation matrix to S3 as CSV.
s3_client = boto3.client('s3')
correlation_csv_buffer = io.StringIO()
correlation_matrix_df.to_csv(correlation_csv_buffer)
s3_client.put_object(
    Bucket=output_bucket,
    Key="data/correlation_matrix.csv",
    Body=correlation_csv_buffer.getvalue()
)

# Greedily drop the later column of any pair whose absolute Spearman
# correlation exceeds the threshold.
threshold = 0.9
to_remove = set()
for i in range(len(correlation_matrix_np)):
    for j in range(i + 1, len(correlation_matrix_np)):
        if abs(correlation_matrix_np[i, j]) > threshold:
            to_remove.add(feature_cols[j])

selected_columns = [col for col in feature_cols if col not in to_remove]

# Save the selected column names to S3 as a plain text file, one per line.
selected_columns_buffer = io.StringIO()
selected_columns_buffer.write("\n".join(selected_columns))
s3_client.put_object(
    Bucket=output_bucket,
    Key="data/selected_columns.txt",
    Body=selected_columns_buffer.getvalue(),
    ContentType="text/plain"
)

# Generate and save the heatmap plot.
plt.figure(figsize=(12, 8))
sns.heatmap(correlation_matrix_df, cmap="coolwarm", fmt=".2f", vmin=-1, vmax=1, annot=False)
plt.title("Correlation Heatmap")
heatmap_buffer = io.BytesIO()
plt.savefig(heatmap_buffer, format='png', bbox_inches='tight')
heatmap_buffer.seek(0)
s3_client.put_object(
    Bucket=output_bucket,
    Key="data/correlation_heatmap.png",
    Body=heatmap_buffer,
    ContentType='image/png'
)

job.commit()
spark.stop()
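Note: the pruning pass above keeps the first feature of any pair whose absolute Spearman correlation exceeds 0.9 and drops the second. A minimal NumPy-only sketch of the same greedy loop on a toy 3x3 matrix; the column names and correlation values are illustrative:

import numpy as np

feature_cols = ["a", "b", "c"]
# Toy matrix: "b" is almost perfectly correlated with "a".
corr = np.array([
    [1.00, 0.95, 0.10],
    [0.95, 1.00, 0.20],
    [0.10, 0.20, 1.00],
])

threshold = 0.9
to_remove = set()
for i in range(len(corr)):
    for j in range(i + 1, len(corr)):
        if abs(corr[i, j]) > threshold:
            to_remove.add(feature_cols[j])  # drop the later column of the pair

print([c for c in feature_cols if c not in to_remove])  # -> ['a', 'c']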
