Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 45 additions & 14 deletions scripts/sde_dump_processing/clean_sde_dump.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""
to run
python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v
python3 scripts/sde_dump_processing/clean_sde_dump.py \
/Users/dhanursharma/Everything/sde/full_text_dump/dev_sde_index_cmr_related_urls_sample_2025-08-19.csv \
/Users/dhanursharma/Everything/sde/full_text_dump/processed_dev_sde_index_cmr_related_urls_sample_2025-08-19.csv -v

to compress
7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv
Expand Down Expand Up @@ -128,13 +131,15 @@ def assemble_records_generator(input_filename, skip_header=False, progress_inter
raise


def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000):
def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000, header=None):
"""
Processes records from an iterator, yielding valid processed records.
Requires expected_fields to be specified.
Requires expected_fields and a header to be specified.
"""
if expected_fields is None:
raise ValueError("process_records_generator requires 'expected_fields' to be specified.")
if header is None:
raise ValueError("process_records_generator requires a 'header' list to identify target fields.")

processed_count = 0
invalid_records = 0
Expand All @@ -144,23 +149,34 @@ def process_records_generator(raw_records_iterator, delimiter="༜", expected_fi

logger.info(f"Starting record processing (expecting {expected_fields} fields)...")

# --- Find indices of target full-text fields from the header ---
# Default to -1; if a field is not found, its processing will be skipped.
text_field_index = -1
data_product_desc_index = -1
try:
text_field_index = header.index("text")
logger.info(f"Target field 'text' found in header at index: {text_field_index}")
except ValueError:
logger.info("Target field 'text' not found in header. It will be skipped if not present.")
try:
data_product_desc_index = header.index("data_product_desc")
logger.info(f"Target field 'data_product_desc' found in header at index: {data_product_desc_index}")
except ValueError:
logger.info("Target field 'data_product_desc' not found in header. It will be skipped if not present.")

for i, record in enumerate(raw_records_iterator):
input_record_index = i + 1
# Split into AT MOST expected_fields parts. The last part will contain the rest.
parts = record.split(delimiter, expected_fields - 1)

if len(parts) == expected_fields:
# Replace literal newlines with '\\n' string in the text field
# The text field is the last one, index expected_fields - 1
text_field_index = expected_fields - 1
if text_field_index < len(parts): # Ensure index exists
# Replace newlines in the 'text' field if it was found
if text_field_index != -1 and text_field_index < len(parts):
parts[text_field_index] = parts[text_field_index].replace("\n", "\\n")
else:
# This case should technically not happen if len(parts) == expected_fields
# but adding a safeguard.
logger.warning(
f"Record ~{input_record_index}: Field index {text_field_index} out of bounds unexpectedly."
)

# Replace newlines in the 'data_product_desc' field if it was found
if data_product_desc_index != -1 and data_product_desc_index < len(parts):
parts[data_product_desc_index] = parts[data_product_desc_index].replace("\n", "\\n")

processed_count += 1
yield parts # Yield the valid processed record
Expand Down Expand Up @@ -265,6 +281,21 @@ def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=
first_line = next(infile).rstrip("\n")
header = first_line.split(delimiter)
expected_fields = len(header)
# try:
# sourcestr15_index = header.index('sourcestr15')
# print("\n--- SCRIPT DEBUGGING INFO ---")
# print(f"Total fields found in header: {expected_fields}")
# print(f"0-based index of 'sourcestr15': {sourcestr15_index}")

# number_to_subtract = expected_fields - sourcestr15_index
# print(f"THE NUMBER TO SUBTRACT IS: {number_to_subtract}")
# print("-----------------------------\n")

# except ValueError:
# print("\n--- SCRIPT DEBUGGING ERROR ---")
# print("CRITICAL: 'sourcestr15' was NOT found in the input file's header.")
# print("This means the cleaning cannot be done.")
# print("-------------------------------\n")
logger.info(f"Detected header: {header}")
logger.info(f"Expecting {expected_fields} fields based on header.")
if expected_fields == 0:
Expand All @@ -285,8 +316,8 @@ def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=
# Create iterators/generators
# Pass skip_header=True to avoid processing the header line again
raw_records_iter = assemble_records_generator(input_filename, skip_header=True)
# Pass the dynamically determined expected_fields
processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields)
# Pass the dynamically determined expected_fields and the header
processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields, header=header)

# Write to output file by consuming the processed records iterator, using the detected header
records_written = write_output_file(output_filename, processed_records_iter, header)
Expand Down