From 407234975ff2bb5f861d6cbe95cd5e0b010de972 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Fri, 21 Feb 2025 13:30:14 -0600 Subject: [PATCH 01/12] add initial processing script for full text dumps --- scripts/sde_dump_processing/clean_sde_dump.py | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 scripts/sde_dump_processing/clean_sde_dump.py diff --git a/scripts/sde_dump_processing/clean_sde_dump.py b/scripts/sde_dump_processing/clean_sde_dump.py new file mode 100644 index 00000000..7748e66c --- /dev/null +++ b/scripts/sde_dump_processing/clean_sde_dump.py @@ -0,0 +1,54 @@ +import csv + + +def process_large_csv(input_filename, output_filename): + # Open the input file for reading and the output file for writing. + # We assume UTF-8 encoding (adjust if necessary). + with open(input_filename, encoding="utf-8") as infile, open( + output_filename, "w", encoding="utf-8", newline="" + ) as outfile: + + writer = csv.writer(outfile) + # Write header if needed: + writer.writerow(["id", "url1", "title", "collection", "treepath", "sourcestr56", "text", "sourcebool3"]) + + current_record = "" + for line in infile: + # Remove the trailing newline from the line. + line = line.rstrip("\n") + # If the line starts with /SDE/, it signals the beginning of a new row. + if line.startswith("/SDE/"): + # If we already have a record accumulated, process it. + if current_record: + # Split the record into exactly 8 fields. + # Using maxsplit=7 ensures that any additional occurrences of '火' + # (for example in the text field) remain intact. + parts = current_record.split("火", 7) + # Optional: normalize the text field if needed. + # For example, replace literal newline characters within the text with "\n". + if len(parts) == 8: + parts[6] = parts[6].replace("\n", "\\n") + writer.writerow(parts) + else: + # Handle unexpected formatting issues (e.g. log or skip the record). + print("Warning: Expected 8 fields, got", len(parts)) + # Start a new record with the current line. + current_record = line + else: + # Otherwise, this line is a continuation of the current record. + current_record += "\n" + line + + # After the loop, process the last accumulated record. + if current_record: + parts = current_record.split("火", 7) + if len(parts) == 8: + parts[6] = parts[6].replace("\n", "\\n") + writer.writerow(parts) + else: + print("Warning: Expected 8 fields, got", len(parts)) + + +if __name__ == "__main__": + # Replace with your actual file names. + # process_large_csv("./dump.csv", "./cleaned_dump.csv") + process_large_csv("./tests/original.csv", "./actual_script_output.csv") From c650645249f7611150980bd0bce59a92f454c4bf Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Tue, 25 Feb 2025 16:46:46 -0600 Subject: [PATCH 02/12] add processing scripts for csv --- scripts/sde_dump_processing/clean_sde_dump.py | 24 +++------ .../validate_csv_structure.py | 53 +++++++++++++++++++ scripts/sde_dump_processing/view_one_row.py | 37 +++++++++++++ 3 files changed, 97 insertions(+), 17 deletions(-) create mode 100644 scripts/sde_dump_processing/validate_csv_structure.py create mode 100644 scripts/sde_dump_processing/view_one_row.py diff --git a/scripts/sde_dump_processing/clean_sde_dump.py b/scripts/sde_dump_processing/clean_sde_dump.py index 7748e66c..b8dfeace 100644 --- a/scripts/sde_dump_processing/clean_sde_dump.py +++ b/scripts/sde_dump_processing/clean_sde_dump.py @@ -3,7 +3,6 @@ def process_large_csv(input_filename, output_filename): # Open the input file for reading and the output file for writing. 
- # We assume UTF-8 encoding (adjust if necessary). with open(input_filename, encoding="utf-8") as infile, open( output_filename, "w", encoding="utf-8", newline="" ) as outfile: @@ -14,33 +13,25 @@ def process_large_csv(input_filename, output_filename): current_record = "" for line in infile: - # Remove the trailing newline from the line. line = line.rstrip("\n") - # If the line starts with /SDE/, it signals the beginning of a new row. - if line.startswith("/SDE/"): - # If we already have a record accumulated, process it. + # Skip lines until the first record is found. + if not current_record and not (line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/")): + continue + if line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/"): if current_record: - # Split the record into exactly 8 fields. - # Using maxsplit=7 ensures that any additional occurrences of '火' - # (for example in the text field) remain intact. - parts = current_record.split("火", 7) - # Optional: normalize the text field if needed. - # For example, replace literal newline characters within the text with "\n". + parts = current_record.split("༜", 7) if len(parts) == 8: parts[6] = parts[6].replace("\n", "\\n") writer.writerow(parts) else: - # Handle unexpected formatting issues (e.g. log or skip the record). print("Warning: Expected 8 fields, got", len(parts)) - # Start a new record with the current line. current_record = line else: - # Otherwise, this line is a continuation of the current record. current_record += "\n" + line # After the loop, process the last accumulated record. if current_record: - parts = current_record.split("火", 7) + parts = current_record.split("༜", 7) if len(parts) == 8: parts[6] = parts[6].replace("\n", "\\n") writer.writerow(parts) @@ -50,5 +41,4 @@ def process_large_csv(input_filename, output_filename): if __name__ == "__main__": # Replace with your actual file names. - # process_large_csv("./dump.csv", "./cleaned_dump.csv") - process_large_csv("./tests/original.csv", "./actual_script_output.csv") + process_large_csv("./inputs/dump_delimeter.csv", "./outputs/cleaned_dump_delimeter.csv") diff --git a/scripts/sde_dump_processing/validate_csv_structure.py b/scripts/sde_dump_processing/validate_csv_structure.py new file mode 100644 index 00000000..97180742 --- /dev/null +++ b/scripts/sde_dump_processing/validate_csv_structure.py @@ -0,0 +1,53 @@ +import csv +import sys + + +def analyze_output_csv(filename): + # Increase CSV field size limit + csv.field_size_limit(sys.maxsize) + + total_rows = 0 + correct_rows = 0 + incorrect_rows = 0 + + with open(filename, encoding="utf-8") as f: + reader = csv.reader(f) + + # Attempt to read a header row + header = next(reader, None) + if header: + print("Header row:", header) + + for row_index, row in enumerate(reader, start=2): + total_rows += 1 + + # Check for exactly 8 columns + if len(row) != 8: + incorrect_rows += 1 + print(f"[WARNING] Row {row_index} has {len(row)} columns (expected 8). 
Row data") + continue + + # Optional: ensure last column is strictly 'true' or 'false' + if row[7] not in ("true", "false"): + incorrect_rows += 1 + print(f"[WARNING] Row {row_index} last column not 'true' or 'false': {row[0]}, {row[7][:100]}") + continue + + correct_rows += 1 + + print("\n=== ANALYSIS COMPLETE ===") + print(f"Total data rows (excluding header): {total_rows}") + print(f"Correctly formatted rows: {correct_rows}") + print(f"Incorrectly formatted rows: {incorrect_rows}") + + +def main(): + import sys + + filename = sys.argv[1] if len(sys.argv) > 1 else "outputs/cleaned_dump_delimeter.csv" + print(f"Analyzing: {filename}") + analyze_output_csv(filename) + + +if __name__ == "__main__": + main() diff --git a/scripts/sde_dump_processing/view_one_row.py b/scripts/sde_dump_processing/view_one_row.py new file mode 100644 index 00000000..2ee98479 --- /dev/null +++ b/scripts/sde_dump_processing/view_one_row.py @@ -0,0 +1,37 @@ +import csv +import sys + + +def write_id_and_url_for_row(filename, row_number, output_file): + # Increase CSV field size limit so large fields won't cause errors + csv.field_size_limit(sys.maxsize) + + with open(filename, encoding="utf-8") as f: + reader = csv.reader(f) + + # Skip the header row + next(reader, None) + + # IMPORTANT: start=2 to match the validation script's row numbering + for current_row_index, row in enumerate(reader, start=2): + if current_row_index == row_number: + with open(output_file, "w", encoding="utf-8", newline="") as out_f: + writer = csv.writer(out_f) + writer.writerow(row) + return + + # If you get here, that row_number didn't exist + with open(output_file, "w", encoding="utf-8") as out_f: + out_f.write(f"Row {row_number} does not exist in {filename}.\n") + + +def main(): + # Example usage: + filename = "outputs/cleaned_dump_delimeter.csv" + output_file = "outputs/row_output.csv" + desired_row_number = 175655 # or wherever you want + write_id_and_url_for_row(filename, desired_row_number, output_file) + + +if __name__ == "__main__": + main() From 8f8d64e1267418195c0a12ee6234e88a16db5839 Mon Sep 17 00:00:00 2001 From: Dhanur Sharma Date: Thu, 3 Apr 2025 20:58:37 -0500 Subject: [PATCH 03/12] Added command --- .../management/commands/export_urls_to_csv.py | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 sde_collections/management/commands/export_urls_to_csv.py diff --git a/sde_collections/management/commands/export_urls_to_csv.py b/sde_collections/management/commands/export_urls_to_csv.py new file mode 100644 index 00000000..afaa11d1 --- /dev/null +++ b/sde_collections/management/commands/export_urls_to_csv.py @@ -0,0 +1,157 @@ +"""" + docker-compose -f local.yml run --rm django python manage.py export_urls_to_csv \ + --output physics_of_the_cosmos.csv --collections physics_of_the_cosmos + +""" + +import csv +import json +import os +from pathlib import Path + +from django.apps import apps +from django.core.management.base import BaseCommand + +from sde_collections.models.collection import Collection +from sde_collections.models.collection_choice_fields import Divisions, DocumentTypes + + +class Command(BaseCommand): + help = "Export URLs from DumpUrl, DeltaUrl, or CuratedUrl models to CSV" + + def add_arguments(self, parser): + parser.add_argument( + "--model", + type=str, + default="CuratedUrl", + choices=["DumpUrl", "DeltaUrl", "CuratedUrl"], + help="Model to export (default: CuratedUrl)", + ) + parser.add_argument( + "--collections", nargs="+", type=str, help="Collection config_folders to filter by 
(default: all)" + ) + parser.add_argument( + "--output", type=str, default="exported_urls.csv", help="Output CSV file path (default: exported_urls.csv)" + ) + parser.add_argument( + "--batch-size", type=int, default=1000, help="Number of records to process in each batch (default: 1000)" + ) + + def handle(self, *args, **options): + model_name = options["model"] + collection_folders = options["collections"] + output_file = options["output"] + batch_size = options["batch_size"] + + csv_exports_dir = Path("csv_exports") + csv_exports_dir.mkdir(parents=True, exist_ok=True) + + output_file = os.path.join(csv_exports_dir, os.path.basename(output_file)) + + self.stdout.write(f"Exporting {model_name} to {output_file}") + + # Get the model class + model_class = apps.get_model("sde_collections", model_name) + + # Build the queryset + queryset = model_class.objects.all() + + # Filter by collections if specified + if collection_folders: + collections = Collection.objects.filter(config_folder__in=collection_folders) + if not collections.exists(): + self.stderr.write(self.style.ERROR("No collections found with the specified folder names")) + return + queryset = queryset.filter(collection__in=collections) + self.stdout.write(f"Filtering by {len(collections)} collections") + + # Get total count for progress reporting + total_count = queryset.count() + if total_count == 0: + self.stdout.write(self.style.WARNING(f"No {model_name} records found matching the criteria")) + return + + self.stdout.write(f"Found {total_count} records to export") + + # Define all fields to export + base_fields = [ + "url", + "scraped_title", + "scraped_text", + "generated_title", + "visited", + "document_type", + "division", + ] + + # Add paired field tags separately + tag_fields = ["tdamm_tag_manual", "tdamm_tag_ml"] + + # Add model-specific fields + model_specific_fields = [] + if model_name == "DeltaUrl": + model_specific_fields.append("to_delete") + + # Add collection field + collection_fields = ["collection__name", "collection__config_folder"] + + # Combine all fields + fields = base_fields + tag_fields + model_specific_fields + collection_fields + + # Get choice dictionaries for lookup + document_type_choices = dict(DocumentTypes.choices) + division_choices = dict(Divisions.choices) + + # Write to CSV + try: + with open(output_file, "w", newline="", encoding="utf-8") as csv_file: + writer = csv.writer(csv_file) + + # Write header + writer.writerow(fields) + + # Write data rows in batches + processed_count = 0 + for i in range(0, total_count, batch_size): + batch = queryset[i : i + batch_size] + + for obj in batch: + row = [] + for field in fields: + if field == "collection__name": + value = obj.collection.name + elif field == "collection__config_folder": + value = obj.collection.config_folder + elif field == "document_type": + doc_type = getattr(obj, field) + if doc_type is not None: + value = document_type_choices.get(doc_type, str(doc_type)) + else: + value = "" + elif field == "division": + div = getattr(obj, field) + if div is not None: + value = division_choices.get(div, str(div)) + else: + value = "" + elif field in tag_fields: + tags = getattr(obj, field) + if tags: + value = json.dumps(tags) + else: + value = "" + else: + value = getattr(obj, field, "") + row.append(value) + writer.writerow(row) + processed_count += 1 + + # Report progress + progress_pct = processed_count / total_count * 100 + self.stdout.write(f"Processed {processed_count}/{total_count} records ({progress_pct:.1f}%)") + + 
self.stdout.write(self.style.SUCCESS(f"Successfully exported {processed_count} URLs to {output_file}")) + + except Exception as e: + self.stderr.write(self.style.ERROR(f"Error exporting to CSV: {e}")) + raise From c456efe7edfd1472f5b961099abc22073356f7f9 Mon Sep 17 00:00:00 2001 From: Dhanur Sharma Date: Thu, 3 Apr 2025 21:06:25 -0500 Subject: [PATCH 04/12] Added option for full_text export --- .../management/commands/export_urls_to_csv.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/sde_collections/management/commands/export_urls_to_csv.py b/sde_collections/management/commands/export_urls_to_csv.py index afaa11d1..c62118e8 100644 --- a/sde_collections/management/commands/export_urls_to_csv.py +++ b/sde_collections/management/commands/export_urls_to_csv.py @@ -36,6 +36,9 @@ def add_arguments(self, parser): parser.add_argument( "--batch-size", type=int, default=1000, help="Number of records to process in each batch (default: 1000)" ) + parser.add_argument( + "--full_text", action="store_true", default=False, help="Include full text in export (default: False)" + ) def handle(self, *args, **options): model_name = options["model"] @@ -77,13 +80,19 @@ def handle(self, *args, **options): base_fields = [ "url", "scraped_title", - "scraped_text", "generated_title", "visited", "document_type", "division", ] + # Add scraped_text only if full_text is True + if options["full_text"]: + base_fields.append("scraped_text") + self.stdout.write("Including full text content in export") + else: + self.stdout.write("Excluding full text content from export (use --full_text to include)") + # Add paired field tags separately tag_fields = ["tdamm_tag_manual", "tdamm_tag_ml"] From 7f177246648a109414c98a84a12834e7b1f7d79d Mon Sep 17 00:00:00 2001 From: Dhanur Sharma Date: Thu, 3 Apr 2025 21:25:52 -0500 Subject: [PATCH 05/12] Added changelog --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ab909b0..84efc532 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -183,3 +183,14 @@ For each PR made, an entry should be added to this changelog. It should contain - physics_of_the_cosmos - stsci_space_telescope_science_institute - Once the front end has been updated to allow for tag edits, all astrophysics collections will be marked to be run through the pipeline + +- 1298-csv-export-command-for-urls + - Description: Added a new Django management command to export URLs (DumpUrl, DeltaUrl, or CuratedUrl) to CSV files for analysis or backup purposes. The command allows filtering by collection and provides configurable export options. 
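+    Example invocation (from the usage note at the top of the command file): `docker-compose -f local.yml run --rm django python manage.py export_urls_to_csv --output physics_of_the_cosmos.csv --collections physics_of_the_cosmos`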
+ - Changes: + - Created a new management command `export_urls_to_csv.py` to extract URL data to CSV format + - Implemented options to filter exports by model type and specific collections + - Added support for excluding full text content with the `--full_text` flag to reduce file size + - Included proper handling for paired fields (tdamm_tag_manual, tdamm_tag_ml) + - Added automatic creation of a dedicated `csv_exports` directory for storing export files + - Implemented batched processing to efficiently handle large datasets + - Added progress reporting during export operations From 4b2f32c9b8e7ea8c6f7910026f3261f090752812 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Mon, 7 Apr 2025 09:47:55 -0500 Subject: [PATCH 06/12] Update sde_collections/management/commands/export_urls_to_csv.py --- sde_collections/management/commands/export_urls_to_csv.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sde_collections/management/commands/export_urls_to_csv.py b/sde_collections/management/commands/export_urls_to_csv.py index c62118e8..6e5c332d 100644 --- a/sde_collections/management/commands/export_urls_to_csv.py +++ b/sde_collections/management/commands/export_urls_to_csv.py @@ -81,7 +81,6 @@ def handle(self, *args, **options): "url", "scraped_title", "generated_title", - "visited", "document_type", "division", ] From c0e017e250cd7a5a80ec7ad57ec12afb16df4e51 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Thu, 24 Apr 2025 10:03:45 -0500 Subject: [PATCH 07/12] add script to dump curated url list with excludes --- scripts/dump_url_list_excludes_includes.py | 168 +++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 scripts/dump_url_list_excludes_includes.py diff --git a/scripts/dump_url_list_excludes_includes.py b/scripts/dump_url_list_excludes_includes.py new file mode 100644 index 00000000..6fd11065 --- /dev/null +++ b/scripts/dump_url_list_excludes_includes.py @@ -0,0 +1,168 @@ +""" +this is meant to be run from within a shell. you can do it in the following way: + +establish a coding container + +```shell +tmux new -s docker_django +tmux attach -t docker_django +tmux kill-session -t docker_django +``` + +```bash +dmshell +``` + +copy paste this code into the shell and run it + +getting the info out of the container + +```bash +docker cp 593dab064a15:/tmp/curated_urls_status.json ./curated_urls_status.json +``` + +move it onto local +```bash +scp sde:/home/ec2-user/sde_indexing_helper/curated_urls_status.json . 
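+# (the host alias and remote path above are examples; substitute the values for your own deployment)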
+``` + +""" + +import concurrent.futures +import json +import os +from collections import defaultdict + +from django.db import connection + +from sde_collections.models.delta_url import CuratedUrl + + +def process_chunk(chunk_start, chunk_size, total_count): + """Process a chunk of curated URLs and return data grouped by collection""" + # Close any existing DB connections to avoid sharing connections between processes + connection.close() + + # Get the chunk of data with collection information + curated_urls_chunk = ( + CuratedUrl.objects.select_related("collection") + .all() + .with_exclusion_status() + .order_by("url")[chunk_start : chunk_start + chunk_size] + ) + + # Group URLs by collection folder name + collection_data = defaultdict(list) + for url in curated_urls_chunk: + collection_folder = url.collection.config_folder + included = not url.excluded # Convert to boolean inclusion status + + collection_data[collection_folder].append({"url": url.url, "included": included}) + + # Save to a temporary file + temp_path = f"/tmp/chunk{chunk_start}.json" + with open(temp_path, "w") as f: + json.dump(dict(collection_data), f) + + processed = min(chunk_start + chunk_size, total_count) + print(f"Processed {processed}/{total_count} URLs") + + return temp_path + + +def export_curated_urls_with_status(): + """Export all curated URLs with their inclusion status, grouped by collection""" + output_path = "/tmp/curated_urls_status.json" + + # Get the total count and status statistics + curated_urls = CuratedUrl.objects.all().with_exclusion_status() + total_count = curated_urls.count() + excluded_count = curated_urls.filter(excluded=True).count() + included_count = curated_urls.filter(excluded=False).count() + + print(f"Total URLs: {total_count}") + print(f" Excluded: {excluded_count}") + print(f" Included: {included_count}") + + # Define chunk size and calculate number of chunks + chunk_size = 10000 + chunk_starts = list(range(0, total_count, chunk_size)) + + # Process chunks in parallel + temp_files = [] + with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: + # Submit all tasks + future_to_chunk = { + executor.submit(process_chunk, chunk_start, chunk_size, total_count): chunk_start + for chunk_start in chunk_starts + } + + # Collect results as they complete + for future in concurrent.futures.as_completed(future_to_chunk): + chunk_start = future_to_chunk[future] + try: + temp_file = future.result() + temp_files.append(temp_file) + except Exception as e: + print(f"Chunk starting at {chunk_start} generated an exception: {e}") + + # Combine all temp files into final output + combined_data = {} + + # Sort temp files by chunk start position + temp_files.sort(key=lambda x: int(os.path.basename(x).replace("chunk", "").split(".")[0])) + + for temp_file in temp_files: + with open(temp_file) as infile: + chunk_data = json.load(infile) + # Merge chunk data into combined data + for collection_folder, urls in chunk_data.items(): + if collection_folder not in combined_data: + combined_data[collection_folder] = [] + combined_data[collection_folder].extend(urls) + + # Clean up temp file + os.unlink(temp_file) + + # Write the final combined data + with open(output_path, "w") as outfile: + json.dump(combined_data, outfile, indent=2) + + # Verify export completed successfully + if os.path.exists(output_path): + file_size_mb = os.path.getsize(output_path) / (1024 * 1024) + print(f"Export complete. 
File saved to: {output_path}") + print(f"File size: {file_size_mb:.2f} MB") + + # Sanity check: Count the total included and excluded URLs in the final file + final_included = 0 + final_excluded = 0 + + # Read the file back and count + with open(output_path) as infile: + file_data = json.load(infile) + for collection_folder, urls in file_data.items(): + for url_data in urls: + if url_data["included"]: + final_included += 1 + else: + final_excluded += 1 + + print("\nSanity check on final file:") + print(f"Total URLs in file: {final_included + final_excluded}") + print(f" Included: {final_included}") + print(f" Excluded: {final_excluded}") + + # Check if counts match + if final_included == included_count and final_excluded == excluded_count: + print("✅ Counts match database query results!") + else: + print("⚠️ Warning: Final counts don't match initial database query!") + print(f" Database included: {included_count}, File included: {final_included}") + print(f" Database excluded: {excluded_count}, File excluded: {final_excluded}") + else: + print("ERROR: Output file was not created!") + + +# Run the export function +export_curated_urls_with_status() From 52798571ae06317147f0e61004818f36877726f2 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Tue, 29 Apr 2025 14:48:01 -0500 Subject: [PATCH 08/12] update the processing script --- scripts/sde_dump_processing/clean_sde_dump.py | 383 ++++++++++++++++-- 1 file changed, 348 insertions(+), 35 deletions(-) diff --git a/scripts/sde_dump_processing/clean_sde_dump.py b/scripts/sde_dump_processing/clean_sde_dump.py index b8dfeace..11ac191c 100644 --- a/scripts/sde_dump_processing/clean_sde_dump.py +++ b/scripts/sde_dump_processing/clean_sde_dump.py @@ -1,44 +1,357 @@ +""" +to run +python clean_sde_dump.py inputs/sde_init_dump_04_28_2025.csv outputs/input.csv -v + +to compress +7z a -t7z -m0=lzma2 -mx=9 -mmt=on -md=512m outputs/output.7z outputs/input.csv +""" + +import argparse import csv +import logging +import os +import sys +import time +from datetime import timedelta +# Set up logging +# Define a custom formatter to include milliseconds +log_formatter = logging.Formatter("%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S") +# Get the root logger +logger = logging.getLogger() +logger.setLevel(logging.INFO) # Default level +# Create a handler and set the formatter +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(log_formatter) +# Add the handler to the logger +# Check if handlers already exist to avoid duplicates in interactive environments +if not logger.hasHandlers(): + logger.addHandler(stream_handler) -def process_large_csv(input_filename, output_filename): - # Open the input file for reading and the output file for writing. - with open(input_filename, encoding="utf-8") as infile, open( - output_filename, "w", encoding="utf-8", newline="" - ) as outfile: - - writer = csv.writer(outfile) - # Write header if needed: - writer.writerow(["id", "url1", "title", "collection", "treepath", "sourcestr56", "text", "sourcebool3"]) - - current_record = "" - for line in infile: - line = line.rstrip("\n") - # Skip lines until the first record is found. 
- if not current_record and not (line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/")): - continue - if line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/"): - if current_record: - parts = current_record.split("༜", 7) - if len(parts) == 8: - parts[6] = parts[6].replace("\n", "\\n") - writer.writerow(parts) - else: - print("Warning: Expected 8 fields, got", len(parts)) - current_record = line - else: - current_record += "\n" + line - # After the loop, process the last accumulated record. +def assemble_records_generator(input_filename, skip_header=False, progress_interval=100000): + """ + Reads the input file line by line and yields assembled multi-line records. + Optionally skips the first line if it's a header. + """ + current_record = "" + record_started = False + lines_processed = 0 + start_time = time.time() + last_report_time = start_time + record_count = 0 # Keep track of records yielded + + logger.info(f"Starting to process lines from {input_filename}") + + try: + with open(input_filename, encoding="utf-8") as infile: + # Skip the header line if requested + if skip_header: + try: + first_line = next(infile) + lines_processed += 1 + logger.info(f"Skipped header line: {first_line.rstrip()}") + except StopIteration: + logger.warning("Input file appears to be empty or only contains a header.") + return # Stop if file is empty after header + + # Process remaining lines + while True: + try: + line = next(infile) + lines_processed += 1 + + # Report progress periodically based on lines read + if lines_processed % progress_interval == 0: + current_time = time.time() + elapsed = current_time - start_time + rate = lines_processed / elapsed if elapsed > 0 else 0 + + # Only log if at least 5 seconds have passed since the last report + if current_time - last_report_time >= 5: + logger.info(f"Read {lines_processed:,} lines - " f"Rate: {rate:.0f} lines/sec") + last_report_time = current_time + + line = line.rstrip("\n") + + # Check if this line is the start of a new record + is_record_start = ( + line.startswith("/SDE/") or line.startswith("/SDE-TDAMM/") or line.startswith("/scrapers/") + ) + + # Skip lines until the first record is found + # (This check might be less relevant if the first data line always starts a record) + if not record_started and not is_record_start: + # If we skipped the header, the first line *should* be a record start + # Log a warning if it's not, but continue processing anyway + if skip_header and lines_processed == 2: # Only warn on the first actual data line + logger.warning( + f"First data line does not start with a recognized record prefix: {line[:100]}..." 
+ ) + # continue -> We should actually process this line if it's part of the first record + + if is_record_start: + # If we've already been building a record, yield it + if current_record: + yield current_record + record_count += 1 + + # Start a new record + current_record = line + record_started = True + elif record_started: # Only append if a record has started + # Append to the current record + current_record += "\n" + line + # Handle case where a non-starting line is encountered before the first record start + # This might happen if skip_header=False and there's preamble before the first /SDE/ + elif not record_started and not is_record_start: + continue # Explicitly skip lines before the first record start marker + + except StopIteration: # End of file + break + except UnicodeDecodeError as e: + logger.warning(f"Skipping line {lines_processed} due to UnicodeDecodeError: {e}") + continue # Skip the problematic line + + # Yield the last record if there is one if current_record: - parts = current_record.split("༜", 7) - if len(parts) == 8: - parts[6] = parts[6].replace("\n", "\\n") - writer.writerow(parts) + yield current_record + record_count += 1 + + elapsed = time.time() - start_time + logger.info( + f"Finished reading {lines_processed:,} lines and found {record_count:,} raw records in {elapsed:.2f} seconds" # noqa + ) + + except FileNotFoundError: + logger.error(f"Input file not found: {input_filename}") + raise + + +def process_records_generator(raw_records_iterator, delimiter="༜", expected_fields=None, batch_size=50000): + """ + Processes records from an iterator, yielding valid processed records. + Requires expected_fields to be specified. + """ + if expected_fields is None: + raise ValueError("process_records_generator requires 'expected_fields' to be specified.") + + processed_count = 0 + invalid_records = 0 + start_time = time.time() + last_report_time = start_time + input_record_index = 0 # Track input records processed for logging + + logger.info(f"Starting record processing (expecting {expected_fields} fields)...") + + for i, record in enumerate(raw_records_iterator): + input_record_index = i + 1 + # Split into AT MOST expected_fields parts. The last part will contain the rest. + parts = record.split(delimiter, expected_fields - 1) + + if len(parts) == expected_fields: + # Replace literal newlines with '\\n' string in the text field + # The text field is the last one, index expected_fields - 1 + text_field_index = expected_fields - 1 + if text_field_index < len(parts): # Ensure index exists + parts[text_field_index] = parts[text_field_index].replace("\n", "\\n") + else: + # This case should technically not happen if len(parts) == expected_fields + # but adding a safeguard. + logger.warning( + f"Record ~{input_record_index}: Field index {text_field_index} out of bounds unexpectedly." + ) + + processed_count += 1 + yield parts # Yield the valid processed record + else: + invalid_records += 1 + if invalid_records <= 10: # Log only the first 10 invalid records + logger.warning( + f"Record ~{input_record_index}: Expected {expected_fields} fields, got {len(parts)}. Skipping." 
+ ) + # Optionally log the problematic record content (careful with large fields) + # logger.debug(f"Problematic record content (first 100 chars): {record[:100]}") + # Do not yield invalid records + + # Report progress periodically based on input records processed + if input_record_index % batch_size == 0: + current_time = time.time() + if current_time - last_report_time >= 5: + elapsed = current_time - start_time + rate = input_record_index / elapsed if elapsed > 0 else 0 + logger.info( + f"Processed {input_record_index:,} raw records ({processed_count:,} valid) - " + f"Rate: {rate:.0f} records/sec" + ) + last_report_time = current_time + + if invalid_records > 10: + logger.warning(f"Additional {invalid_records - 10} invalid records were skipped (not shown in logs)") + + logger.info( + f"Finished processing {input_record_index:,} raw records. Found {processed_count:,} valid records and {invalid_records:,} invalid records." # noqa + ) + + +def write_output_file(output_filename, processed_records_iterator, header=None, batch_size=100000): + """Write the processed records from an iterator to the output CSV file.""" + # Create the directory if it doesn't exist + output_dir = os.path.dirname(output_filename) + # Check if output_dir is not empty before creating + if output_dir and not os.path.exists(output_dir): + logger.info(f"Creating output directory: {output_dir}") + os.makedirs(output_dir, exist_ok=True) + + records_written = 0 + start_time = time.time() + last_report_time = start_time + + logger.info(f"Starting to write records to {output_filename}") + + try: + with open(output_filename, "w", encoding="utf-8", newline="") as outfile: + writer = csv.writer(outfile) + + # Write header if provided + if header: + logger.info(f"Writing header: {header}") + writer.writerow(header) else: - print("Warning: Expected 8 fields, got", len(parts)) + logger.warning("No header provided for output file.") + + # Write records as they come from the iterator + for i, record in enumerate(processed_records_iterator): + writer.writerow(record) + records_written += 1 + + # Report progress periodically + if records_written % batch_size == 0: + current_time = time.time() + if current_time - last_report_time >= 5: + elapsed = current_time - start_time + rate = records_written / elapsed if elapsed > 0 else 0 + logger.info(f"Wrote {records_written:,} records - " f"Rate: {rate:.0f} records/sec") + last_report_time = current_time + + total_time = time.time() - start_time + logger.info(f"Finished writing {records_written:,} records in {total_time:.2f} seconds") + + except OSError as e: + logger.error(f"Error writing to output file {output_filename}: {e}") + raise + + return records_written + + +def process_large_csv(input_filename, output_filename, delimiter="༜", verbose=False): + """ + Process a large CSV file iteratively using generators. + Determines header and field count from the first line of the input file. 
+ """ + # Adjust logging level based on verbosity - should be set in main() + if verbose: + logger.setLevel(logging.INFO) + + logger.info(f"Processing {input_filename} to {output_filename} with delimiter '{delimiter}'") + total_start_time = time.time() + + header = None + expected_fields = 0 + + # --- Read header and determine expected fields --- + try: + with open(input_filename, encoding="utf-8") as infile: + first_line = next(infile).rstrip("\n") + header = first_line.split(delimiter) + expected_fields = len(header) + logger.info(f"Detected header: {header}") + logger.info(f"Expecting {expected_fields} fields based on header.") + if expected_fields == 0: + logger.error("Header line is empty or could not be split. Cannot proceed.") + return -1 + except FileNotFoundError: + logger.error(f"Input file not found: {input_filename}") + return -1 # Indicate file not found error + except StopIteration: + logger.error(f"Input file is empty: {input_filename}") + return -1 # Indicate empty file error + except Exception as e: + logger.error(f"Error reading header from {input_filename}: {e}", exc_info=True) + return -1 # Indicate general error during header read + # --- End header reading --- + + try: + # Create iterators/generators + # Pass skip_header=True to avoid processing the header line again + raw_records_iter = assemble_records_generator(input_filename, skip_header=True) + # Pass the dynamically determined expected_fields + processed_records_iter = process_records_generator(raw_records_iter, delimiter, expected_fields) + + # Write to output file by consuming the processed records iterator, using the detected header + records_written = write_output_file(output_filename, processed_records_iter, header) + + total_elapsed = time.time() - total_start_time + rate = records_written / total_elapsed if total_elapsed > 0 else 0 + logger.info( + f"Complete! Processed {records_written:,} valid records in {timedelta(seconds=int(total_elapsed))} " + f"(Average rate: {rate:.1f} records/sec)" + ) + return records_written + + except FileNotFoundError: + # Error already logged by assemble_records_generator if it happens later + # This is unlikely if header reading succeeded, but kept for safety + logger.error(f"Input file disappeared after reading header: {input_filename}") + return -1 + except Exception as e: + logger.error(f"An unexpected error occurred during processing: {e}", exc_info=True) + return -1 # Indicate general error + + +def main(): + """Main entry point for the script.""" + # Set up command line arguments + parser = argparse.ArgumentParser( + description="Process large CSV files with special formatting efficiently, detecting header automatically." 
+ ) + parser.add_argument("input", help="Input file path") + parser.add_argument("output", help="Output file path") + parser.add_argument("--delimiter", default="༜", help="Field delimiter character (default: '༜')") + # Removed --fields argument + parser.add_argument("-v", "--verbose", action="store_true", help="Show detailed progress information (INFO level)") + parser.add_argument("-q", "--quiet", action="store_true", help="Suppress all output except errors (ERROR level)") + + args = parser.parse_args() + + # Adjust logging level based on args BEFORE calling processing function + if args.quiet: + logger.setLevel(logging.ERROR) + elif args.verbose: + logger.setLevel(logging.INFO) + else: + # Default level if neither -v nor -q is specified + logger.setLevel(logging.WARNING) + + # Process the CSV file using the arguments passed + # No longer passing expected_fields + records_processed = process_large_csv( + args.input, args.output, delimiter=args.delimiter, verbose=args.verbose # Pass verbose flag + ) + + if records_processed >= 0 and not args.quiet: + # Use standard print for final user confirmation message + print(f"Successfully processed {records_processed:,} records.") + return 0 + elif records_processed < 0: + # Error messages already logged + print("Processing failed. Check logs for details.", file=sys.stderr) + return 1 + else: # records_processed == 0, potentially valid but maybe unexpected + if not args.quiet: + print("Processing finished, but 0 valid records were written.") + return 0 if __name__ == "__main__": - # Replace with your actual file names. - process_large_csv("./inputs/dump_delimeter.csv", "./outputs/cleaned_dump_delimeter.csv") + sys.exit(main()) From 14b1fea91f1850ddb2cabaf41d3c314fc46458a8 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Wed, 30 Apr 2025 09:39:46 -0500 Subject: [PATCH 09/12] add changelog entry for clean_text_dump.py --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84efc532..1c0d7ca1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,13 @@ For each PR made, an entry should be added to this changelog. It should contain - etc. ## Changelog +### 3.1.?? +- 1232-process-the-full-text-dump + - Description: A script was added `/scripts/sde_dump_processing/clean_text_dump.py` which cleans dumps from sinequa. The sinequa dump does not respect normal csv new line formatting, so that a dump of 1.8 million records becomes a csv of 900 million lines. This script can detect the headers and process the dump with the three possible sources TDAMM, SDE, and scripts, in order to create a final, clean csv. It has a simple CLI which allows setting the input and output, the verbosity of the logs, etc. Because the input files can be very large, the script streams them instead of holding them in memory. + - Changes: + - add file /scripts/sde_dump_processing/clean_text_dump.py` +### 3.1.0 - 1209-bug-fix-document-type-creator-form - Description: The dropdown on the pattern creation form needs to be set as multi as the default option since this is why the doc type creator form is used for the majority of multi-URL pattern creations. This should be applied to doc types, division types, and titles as well. 
- Changes: From 77031c7aa8c4596608c7fa5a8f065b15fa153601 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Wed, 11 Jun 2025 09:56:11 -0500 Subject: [PATCH 10/12] add a config restricting blank issues --- .github/ISSUE_TEMPLATE/config.yml | 1 + 1 file changed, 1 insertion(+) create mode 100644 .github/ISSUE_TEMPLATE/config.yml diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 00000000..3ba13e0c --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1 @@ +blank_issues_enabled: false From 809c11fd49ca4a488c80b52748aa2eb0747e862e Mon Sep 17 00:00:00 2001 From: Dhanur Sharma Date: Fri, 21 Nov 2025 01:02:30 -0500 Subject: [PATCH 11/12] Added https://science.data.nasa.gov/ to CORS Allowed origins --- config/settings/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/config/settings/base.py b/config/settings/base.py index 45097ab5..c9fa22fd 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -102,6 +102,7 @@ "http://sciencediscoveryengine.nasa.gov", "https://localhost:4200", "http://localhost:4200", + "https://science.data.nasa.gov/", ] # MIGRATIONS From 24b34173b64ccbcbb1299a61e8617ebc99532847 Mon Sep 17 00:00:00 2001 From: Carson Davis Date: Fri, 21 Nov 2025 11:34:34 -0600 Subject: [PATCH 12/12] add pbr as a bandit dependency --- .pre-commit-config.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e00d2c3f..a0f4c06a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -61,18 +61,18 @@ repos: - types-requests - repo: https://github.com/PyCQA/bandit - rev: '1.7.0' + rev: "1.7.0" hooks: - id: bandit - args: ['-r', '--configfile=bandit-config.yml'] + args: ["-r", "--configfile=bandit-config.yml"] + additional_dependencies: + - pbr - repo: https://github.com/zricethezav/gitleaks - rev: 'v8.0.4' + rev: "v8.0.4" hooks: - id: gitleaks - args: ['--config=gitleaks-config.toml'] - - + args: ["--config=gitleaks-config.toml"] ci: autoupdate_schedule: weekly