diff --git a/.gitignore b/.gitignore index 23b99e089..2815e7977 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,14 @@ __pycache__/ bibliovenv/ Bibenv/ -.idea/ \ No newline at end of file +.idea/ + +# ETL test files +scopus_test.csv +scopus_standardized.csv +openalex_standardized.csv +pubmed_standardized.csv +test_etl.py + +# Jupyter checkpoints +.ipynb_checkpoints/ \ No newline at end of file diff --git a/demo_etl_pipeline.ipynb b/demo_etl_pipeline.ipynb new file mode 100644 index 000000000..6c45c60c2 --- /dev/null +++ b/demo_etl_pipeline.ipynb @@ -0,0 +1,674 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "12ef2c5b-d603-48eb-a551-a5e36def9c2d", + "metadata": {}, + "source": [ + "# Bibliometrix ETL Pipeline - Demo Notebook\n", + "## From Heterogeneous Bibliographic Data to a Unified Schema\n", + "\n", + "This notebook demonstrates the full ETL pipeline developed for the Bibliometrix-Python project. The pipeline standardizes bibliographic data from multiple sources (Scopus, PubMed, OpenAlex) into a unified Web of Science-compatible schema.\n", + "\n", + "### Pipeline Architecture\n", + "- **Extract**: Load data from local files or REST APIs\n", + "- **Transform**: Rename columns, enforce types, handle nulls, calculate SR\n", + "- **Validate**: Check schema, types, and null values before analysis" + ] + }, + { + "cell_type": "markdown", + "id": "d081bd6b-c2ee-4fed-a5c9-8e832ac3f85e", + "metadata": {}, + "source": [ + "## 1. Setup and Imports" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "edabf0ee-54f2-4840-974d-be5f0c6885f2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All modules imported successfully!\n" + ] + } + ], + "source": [ + "import sys\n", + "sys.path.insert(0, r\"C:\\Users\\mlosc\\bibliometrix-python\")\n", + "import pandas as pd\n", + "from www.services.etl.transformer import transform\n", + "from www.services.etl.validator import validate\n", + "from www.services.etl.api_retriever import retrieve_openalex, retrieve_pubmed\n", + "from www.services.etl.mappings import SCOPUS_CSV_MAPPING\n", + "print(\"All modules imported successfully!\")" + ] + }, + { + "cell_type": "markdown", + "id": "ab5a1dbb-93ad-40a5-9e73-5ee7efdc2fa4", + "metadata": {}, + "source": [ + "## 2. Base Level - Loading a Scopus CSV File\n", + "In this section we demonstrate the BASE LEVEL of the pipeline.\n", + "We load a manually exported CSV file from Scopus and standardized it." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "e8046794-78ff-4348-b19c-b84b63523845", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Raw shape: (20, 45)\n", + "\n", + "Raw column names (Scopus format):\n", + "['Authors', 'Author full names', 'Author(s) ID', 'Title', 'Year', 'Source title', 'Volume', 'Issue', 'Art. No.', 'Page start', 'Page end', 'Cited by', 'DOI', 'Link', 'Affiliations', 'Authors with affiliations', 'Abstract', 'Author Keywords', 'Index Keywords', 'Molecular Sequence Numbers', 'Chemicals/CAS', 'Tradenames', 'Manufacturers', 'Funding Details', 'Funding Texts', 'References', 'Correspondence Address', 'Editors', 'Publisher', 'Sponsors', 'Conference name', 'Conference date', 'Conference location', 'Conference code', 'ISSN', 'ISBN', 'CODEN', 'PubMed ID', 'Language of Original Document', 'Abbreviated Source Title', 'Document Type', 'Publication Stage', 'Open Access', 'Source', 'EID']\n" + ] + } + ], + "source": [ + "df_raw = pd.read_csv(\"scopus_test.csv\",encoding=\"utf-8\")\n", + "print(f\"Raw shape: {df_raw.shape}\")\n", + "print(f\"\\nRaw column names (Scopus format):\")\n", + "print(df_raw.columns.tolist())" + ] + }, + { + "cell_type": "markdown", + "id": "8a2faa93-e9fc-493e-b58a-2eb92af18b4f", + "metadata": {}, + "source": [ + "## 2.1 Transform - Applying the ETL Pipeline\n", + "We apply the mapping dictionary to rename columns to WoS tags, enforce correct data types, fill missing values, and calculate the SR field." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "de137c4b-0f66-4bd4-b3c2-09cbd123e51f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[TRANSFORM] Starting transformation for source: SCOPUS\n", + "[TRANSFORM] Columns renamed.\n", + "[TRANSFORM] List columns enforced\n", + "[TRANSFORM] Integer columns enforced.\n", + "[TRANSFORM] Missing columns filled.\n", + "[TRANSFORM] Null values filled.\n", + "[TRANSFORM] SR field calculated.\n", + "[TRANSFORM] Done. Shape: (20, 47).\n", + "\n", + "Standardized shape: (20, 47)\n", + "\n", + "Standardized column names (WoS format):\n", + "['AU', 'AF', 'Author(s) ID', 'TI', 'PY', 'SO', 'VL', 'IS', 'Art. No.', 'BP', 'EP', 'TC', 'DI', 'Link', 'C1', 'Authors with affiliations', 'AB', 'DE', 'ID', 'Molecular Sequence Numbers', 'Chemicals/CAS', 'Tradenames', 'Manufacturers', 'Funding Details', 'Funding Texts', 'CR', 'RP', 'Editors', 'Publisher', 'Sponsors', 'Conference name', 'Conference date', 'Conference location', 'Conference code', 'ISSN', 'ISBN', 'CODEN', 'PMID', 'LA', 'JI', 'DT', 'Publication Stage', 'Open Access', 'Source', 'UT', 'DB', 'SR']\n" + ] + } + ], + "source": [ + "df_scopus = transform(df_raw, SCOPUS_CSV_MAPPING, \"SCOPUS\")\n", + "print(f\"\\nStandardized shape: {df_scopus.shape}\")\n", + "print(f\"\\nStandardized column names (WoS format):\")\n", + "print(df_scopus.columns.tolist())" + ] + }, + { + "cell_type": "markdown", + "id": "7f7f7c73-eb45-4f5d-ab46-d6385369cfc0", + "metadata": {}, + "source": [ + "## 2.2 Validate - Checking the Standardized DataFrame\n", + "The validator checks thaat all mandatory columns are present, no null values remain, and list columns are correctly typed.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "e218f9b8-793b-4ce9-a0cf-a94361b2b9c2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "---Running ETL Validation---\n", + "[OK] All mandatory columns are present.\n", + "[OK] No null values found.\n", + "[OK] All list columns are correctly typed.\n", + "---Validation PASSED---\n", + "\n", + "Sample of key standardized columns:\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
TIAUPYSOTCSR
0Investigation on the suppression of Gas-Coaldu...[Bin L., Jinzhang J., YangQiang, Dongming W., ...2027Fuel0Bin L., 2027, Fuel
1Impurity effects on CO2 trapping indices: A nu...[Alkhowaildi M., Tariq Z., AlTammar M.J., Hote...2027Fuel0Alkhowaildi M., 2027, Fuel
2Fully elucidating catalyst-driven combustion m...[Wen M., Han J., Zhang X., Zhao Y., Zhang Y., ...2027Fuel0Wen M., 2027, Fuel
3Wettability of geological formations in a CO2 ...[Aboushanab M., Arif M.]2027Fuel0Aboushanab M., 2027, Fuel
4Physics-informed dual integration machine lear...[Zhang M., Zhu W., Mao T., Cao J., Meng X., Bi...2027Fuel0Zhang M., 2027, Fuel
\n", + "
" + ], + "text/plain": [ + " TI \\\n", + "0 Investigation on the suppression of Gas-Coaldu... \n", + "1 Impurity effects on CO2 trapping indices: A nu... \n", + "2 Fully elucidating catalyst-driven combustion m... \n", + "3 Wettability of geological formations in a CO2 ... \n", + "4 Physics-informed dual integration machine lear... \n", + "\n", + " AU PY SO TC \\\n", + "0 [Bin L., Jinzhang J., YangQiang, Dongming W., ... 2027 Fuel 0 \n", + "1 [Alkhowaildi M., Tariq Z., AlTammar M.J., Hote... 2027 Fuel 0 \n", + "2 [Wen M., Han J., Zhang X., Zhao Y., Zhang Y., ... 2027 Fuel 0 \n", + "3 [Aboushanab M., Arif M.] 2027 Fuel 0 \n", + "4 [Zhang M., Zhu W., Mao T., Cao J., Meng X., Bi... 2027 Fuel 0 \n", + "\n", + " SR \n", + "0 Bin L., 2027, Fuel \n", + "1 Alkhowaildi M., 2027, Fuel \n", + "2 Wen M., 2027, Fuel \n", + "3 Aboushanab M., 2027, Fuel \n", + "4 Zhang M., 2027, Fuel " + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "result = validate(df_scopus)\n", + "print(\"\\nSample of key standardized columns:\")\n", + "df_scopus[[\"TI\",\"AU\",\"PY\",\"SO\",\"TC\",\"SR\"]].head(5)" + ] + }, + { + "cell_type": "markdown", + "id": "0d067e7a-db8d-4a8e-ba22-c6e12b7c07ea", + "metadata": {}, + "source": [ + "## 3. Advanced Level - Retrieving Data via API\n", + "Here we demonstrate the ADVANCED LEVEL of the pipeline.\n", + "Data is retrieved automatically from OpenAlex and PubMed REST APIs using a simple text query, with no manual download required." + ] + }, + { + "cell_type": "markdown", + "id": "244029a4-cac5-43b6-864f-e275e436d198", + "metadata": {}, + "source": [ + "### 3.1 OpenAlex API" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "b8872656-19b5-4461-9f55-620bed659f65", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[OpenAlex] Searching for: 'machine learning' (max 10 results)\n", + "[OpenAlex] Page 1: retrieved 25 records. Total so far: 10\n", + "[OpenAlex] Done. Total records retrieved: 10\n", + "\n", + "Retrieved shape: (10, 23)\n", + "\n", + "First 3 titles:\n", + " - Scikit-learn: Machine Learning in Python\n", + " - Genetic algorithms in search, optimization, and machine learning\n", + " - C4.5: Programs for Machine Learning\n" + ] + } + ], + "source": [ + "df_openalex_raw = retrieve_openalex(query=\"machine learning\", max_results=10)\n", + "print(f\"\\nRetrieved shape: {df_openalex_raw.shape}\")\n", + "print(f\"\\nFirst 3 titles:\")\n", + "for title in df_openalex_raw[\"TI\"].head(3):\n", + " print(f\" - {title}\")" + ] + }, + { + "cell_type": "markdown", + "id": "c9114131-27f9-46b8-a897-fd8ca583621d", + "metadata": {}, + "source": [ + "### 3.2 Transform and Validate OpenAlex data" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "1e1622a4-6787-4d30-b8a7-7073bdf097bd", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[TRANSFORM] Starting transformation for source: OPENALEX\n", + "[TRANSFORM] Columns renamed.\n", + "[TRANSFORM] List columns enforced\n", + "[TRANSFORM] Integer columns enforced.\n", + "[TRANSFORM] Missing columns filled.\n", + "[TRANSFORM] Null values filled.\n", + "[TRANSFORM] SR field calculated.\n", + "[TRANSFORM] Done. Shape: (10, 24).\n", + "---Running ETL Validation---\n", + "[OK] All mandatory columns are present.\n", + "[OK] No null values found.\n", + "[OK] All list columns are correctly typed.\n", + "---Validation PASSED---\n", + "\n", + "Sample of key standardized columns:\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
TIAUPYSOTCSR
0Scikit-learn: Machine Learning in Python[]201263678, 2012,
1Genetic algorithms in search, optimization, an...[]198949333, 1989,
2C4.5: Programs for Machine Learning[]199223696, 1992,
\n", + "
" + ], + "text/plain": [ + " TI AU PY SO TC \\\n", + "0 Scikit-learn: Machine Learning in Python [] 2012 63678 \n", + "1 Genetic algorithms in search, optimization, an... [] 1989 49333 \n", + "2 C4.5: Programs for Machine Learning [] 1992 23696 \n", + "\n", + " SR \n", + "0 , 2012, \n", + "1 , 1989, \n", + "2 , 1992, " + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df_openalex = transform(df_openalex_raw, {}, \"OPENALEX\")\n", + "result = validate(df_openalex)\n", + "print(\"\\nSample of key standardized columns:\")\n", + "df_openalex[[\"TI\",\"AU\",\"PY\",\"SO\",\"TC\",\"SR\"]].head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "356be294-611c-4e12-b362-14865b3c205f", + "metadata": {}, + "source": [ + "### 3.3 PubMed API" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "bcec90db-76aa-4fcf-a5a8-8d363742d1b6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[PubMed] Searching for: 'machine learning' (max 10 results)\n", + "[PubMed] Found 10 IDs.\n", + "[PubMed] Fetched batch 1.Total so far: 10\n", + "[PubMed] Done. Total records retrieved: 10\n", + "\n", + "Retrieve shape: (10, 23)\n", + "\n", + "First 3 titles:\n", + " - Prediction of an fMRI-based schizophrenia biomarker from EEG using dynamic\n", + " - fNIRS Single-trial decoding improves systematically with higher optode density,\n", + " - Comprehensive analysis of m6A RNA methylation regulators and the immune\n" + ] + } + ], + "source": [ + "df_pubmed_raw = retrieve_pubmed(query=\"machine learning\", max_results=10)\n", + "print(f\"\\nRetrieve shape: {df_pubmed_raw.shape}\")\n", + "print(f\"\\nFirst 3 titles:\")\n", + "for title in df_pubmed_raw[\"TI\"].head(3):\n", + " print(f\" - {title}\")" + ] + }, + { + "cell_type": "markdown", + "id": "7d0e6149-a384-4b49-a9d5-1034c6f96051", + "metadata": {}, + "source": [ + "### 3.4 Transform and Validate PubMed data" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "86ba80c8-d695-4781-a081-b581b9a8b317", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[TRANSFORM] Starting transformation for source: PUBMED\n", + "[TRANSFORM] Columns renamed.\n", + "[TRANSFORM] List columns enforced\n", + "[TRANSFORM] Integer columns enforced.\n", + "[TRANSFORM] Missing columns filled.\n", + "[TRANSFORM] Null values filled.\n", + "[TRANSFORM] SR field calculated.\n", + "[TRANSFORM] Done. Shape: (10, 24).\n", + "---Running ETL Validation---\n", + "[OK] All mandatory columns are present.\n", + "[OK] No null values found.\n", + "[OK] All list columns are correctly typed.\n", + "---Validation PASSED---\n", + "n\\Sample of key standardized columns:\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
TIAUPYSOTCSR
0Prediction of an fMRI-based schizophrenia biom...[Tamano R, Ogawa T, Katagiri A, Cai C, Kawanab...2026Biomedical physics & engineering express0Tamano R, 2026, Biomedical physics & engineeri...
1fNIRS Single-trial decoding improves systemati...[Fischer T, Middell E, Moradi S, von Luhmann A]2026Journal of neural engineering0Fischer T, 2026, Journal of neural engineering
2Comprehensive analysis of m6A RNA methylation ...[Liu X, Hu J, Shi G, Zhu W, Hao Q]2026Frontiers in neurology0Liu X, 2026, Frontiers in neurology
\n", + "
" + ], + "text/plain": [ + " TI \\\n", + "0 Prediction of an fMRI-based schizophrenia biom... \n", + "1 fNIRS Single-trial decoding improves systemati... \n", + "2 Comprehensive analysis of m6A RNA methylation ... \n", + "\n", + " AU PY \\\n", + "0 [Tamano R, Ogawa T, Katagiri A, Cai C, Kawanab... 2026 \n", + "1 [Fischer T, Middell E, Moradi S, von Luhmann A] 2026 \n", + "2 [Liu X, Hu J, Shi G, Zhu W, Hao Q] 2026 \n", + "\n", + " SO TC \\\n", + "0 Biomedical physics & engineering express 0 \n", + "1 Journal of neural engineering 0 \n", + "2 Frontiers in neurology 0 \n", + "\n", + " SR \n", + "0 Tamano R, 2026, Biomedical physics & engineeri... \n", + "1 Fischer T, 2026, Journal of neural engineering \n", + "2 Liu X, 2026, Frontiers in neurology " + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df_pubmed = transform(df_pubmed_raw, {}, \"PUBMED\")\n", + "result = validate(df_pubmed)\n", + "print(\"n\\Sample of key standardized columns:\")\n", + "df_pubmed[[\"TI\",\"AU\",\"PY\",\"SO\",\"TC\",\"SR\"]].head(3)" + ] + }, + { + "cell_type": "markdown", + "id": "1682906a-734a-436d-a702-196c1251835b", + "metadata": {}, + "source": [ + "## 4. Exporting the Standardized DataFrame to CSV\n", + "The standardized DataFrame can be exported to CSV for use with the Bibliometrix-Python analytical functions." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "4c63ab59-a3ec-4a1d-96ec-71302f1eaaf7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Scopus standardized CSV saved: scopus_standardized.csv\n", + "OpenAlex standardized CSV saved: openalex_standardized.csv\n", + "PubMed standardized CSV saved: pubmed_standardized.csv\n", + "\n", + "All files exported successfully!\n" + ] + } + ], + "source": [ + "df_scopus.to_csv(\"scopus_standardized.csv\", index=False)\n", + "print(\"Scopus standardized CSV saved: scopus_standardized.csv\")\n", + "df_openalex.to_csv(\"openalex_standardized.csv\", index=False)\n", + "print(\"OpenAlex standardized CSV saved: openalex_standardized.csv\")\n", + "df_pubmed.to_csv(\"pubmed_standardized.csv\", index=False)\n", + "print(\"PubMed standardized CSV saved: pubmed_standardized.csv\")\n", + "print(\"\\nAll files exported successfully!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "afa6b4a8-8275-43c9-af7d-9398a1cd117b", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/www/services/__init__.py b/www/services/__init__.py index 28584e105..9a16e7bea 100644 --- a/www/services/__init__.py +++ b/www/services/__init__.py @@ -1,17 +1,2 @@ -from .biblionetwork import * -from .cocmatrix import * -from .couplingmap import * -from .format_functions import * -from .histnetwork import * -from .histplot import * -from .htmldownload import * -from .igraph2vis import * -from .metatagextraction import * -from .networkplot import * -from .parsers import * -from .plotlydownload import * -from .savereport import * -from .tabletag import * -from .termextraction import * -from .thematicmap import * -from .utils import * \ No newline at end of file +# Selective imports to avoid loading heavy dependencies automatically. +# Individual modules can still be imported directly when needed. \ No newline at end of file diff --git a/www/services/etl/__init__.py b/www/services/etl/__init__.py new file mode 100644 index 000000000..5f9941761 --- /dev/null +++ b/www/services/etl/__init__.py @@ -0,0 +1,4 @@ +from .standardizer import convert2df +from .transformer import transform +from .validator import validate +from .api_retriever import retrieve_openalex, retrieve_pubmed diff --git a/www/services/etl/api_retriever.py b/www/services/etl/api_retriever.py new file mode 100644 index 000000000..3ae35586b --- /dev/null +++ b/www/services/etl/api_retriever.py @@ -0,0 +1,281 @@ +""" +API Retriever module for the Bibliometrix ETL pipeline. +Retrieves bibliographic data from PubMed and OpenAlex REST APIs. +Handles pagination, rate limits, and retrieves automatically. +""" +import requests +import time +import pandas as pd + +OPENALEX_BASE_URL = "https://api.openalex.org/works" +PUBMED_SEARCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi" +PUBMED_FETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi" + +MAX_RETRIES = 3 +RETRY_DELAY = 2 +PAGE_SIZE = 25 + +def _get_with_retry(url:str, params:dict) -> dict: + """ + Perform a GET request with automatic retry on failure. + Waits RETRY_DELAY seconds between attempts + Args: + url:The endpoint URL to call. + params:Query parameters to include in the request. + Returns: + The parsed JSON response as a dictionary. + Raises: + RuntimeError: If all retry attempts fail. + """ + for attempt in range(1,MAX_RETRIES+1): + try: + response = requests.get(url, params=params, timeout=10) + response.raise_for_status() + return response.json() + except requests.RequestException as e: + print(f"[API] Attempt {attempt}/{MAX_RETRIES} failed: {e}") + if attempt < MAX_RETRIES: + time.sleep(RETRY_DELAY) + raise RuntimeError(f"All {MAX_RETRIES} attempts failed for URL: {url}") + +def _parse_openalex_record(work:dict) -> dict: + """ + Parse a single OpenAlex work record into a flat dictionary + using the standard WoS-compatible field names. + Args: + work:A single work object from the OpenAlex API response. + Returns: + A flat dictionary with standardized field names. + """ + authors = [] + authors_full = [] + affiliations = [] + for autorship in work.get("autorship",[]): + author_name = autorship.get("author",{}).get("display_name","") + authors.append(author_name) + authors_full.append(author_name) + for inst in autorship.get("institutions",[]): + affiliations.append(inst.get("display_name","")) + abstract = "" + inverted_index = work.get("abstract_inverted_index",{}) + if inverted_index: + words = [""] * (max( + pos for psitions in inverted_index.values() + for pos in psitions + )+1) + for word, positions in inverted_index.items(): + for pos in positions: + words[pos] = word + abstract = " ".join(words) + keywords = [kw.get("display_name","") for kw in work.get("keywords",[])] + cited_refs = work.get("referenced_works", []) + source = work.get("primary_location",{}) or {} + source_info = source.get("source",{}) or {} + journal = source.get("display_name","") + journal_abbr = source.get("issn_1","") + biblio = work.get("biblio",{}) or {} + doi = work.get("doi","") or "" + doi = doi.replace("https://doi.org/","") + return { + "TI":work.get("title",""), + "AB":abstract, + "PY":str(work.get("publication_year","")), + "SO":journal, + "JI":journal_abbr, + "VL":str(biblio.get("volume","") or ""), + "IS":str(biblio.get("issue","") or ""), + "BP":str(biblio.get("first_page","") or ""), + "EP":str(biblio.get("last_page","") or ""), + "DI":doi, + "UT":work.get("id",""), + "PMID":str(work.get("ids",{}).get("pmid","") or "").replace("https://pubmed.ncbi.nlm.nih.gov/",""), + "DT":work.get("type",""), + "LA":work.get("language",""), + "TC":work.get("cited_by_count",0), + "AU":authors, + "AF":authors_full, + "C1":affiliations, + "CR":cited_refs, + "DE":keywords, + "ID":keywords, + "RP":"", + "DB":"OPENALEX" + } + +def retrieve_openalex(query:str, max_results:int = 100) -> pd.DataFrame: + """ + Retrieve bibliographic records form OpenAlex for a given query. + Handles pagination automatically. + Args: + query:The search query string (e.g. 'machine learning'). + max_results: Maximum number of records to retrieve. + Returns: + A DataFrame with no row per record, using standard WoS field names. + """ + print(f"[OpenAlex] Searching for: '{query}' (max {max_results} results)") + records = [] + page = 1 + while len(records) < max_results: + params = { + "search":query, + "per-page":PAGE_SIZE, + "page":page, + "select":"id,title,abstract_inverted_index,publication_year,primary_location,biblio,doi,ids,type,language,cited_by_count,authorships,keywords,referenced_works", + } + data = _get_with_retry(OPENALEX_BASE_URL, params) + works = data.get("results",[]) + if not works: + print("[OpenAlex] No more results.") + break + for work in works: + records.append(_parse_openalex_record(work)) + if len(records) >= max_results: + break + print(f"[OpenAlex] Page {page}: retrieved {len(works)} records. Total so far: {len(records)}") + page += 1 + time.sleep(0.5) + print(f"[OpenAlex] Done. Total records retrieved: {len(records)}") + return pd.DataFrame(records) + +def _fetch_pubmed_ids(query:str, max_results:int) -> list: + """ + Search PubMed for a query and return a list of PubMed IDs. + Args: + query:The search query string. + max_results:Maximum number of IDs to retrieve. + Returns: + A list of PubMed ID strings. + """ + params = { + "db":"pubmed", + "term":query, + "retmax":max_results, + "retmode":"json" + } + data = _get_with_retry(PUBMED_SEARCH_URL,params) + ids = data.get("esearchresult", {}).get("idlist",[]) + print(f"[PubMed] Found {len(ids)} IDs.") + return ids + +def _fetch_pubmed_records(pmids:list) -> list: + """ + Fetch full records for a list of PubMed IDs in batches. + Args: + pmids:List of PubMed ID strings. + Returns: + A list of flat dictionaries with standardized field names. + """ + records = [] + batch_size = 20 + for i in range(0,len(pmids),batch_size): + batch = pmids[i:i+batch_size] + params = { + "db":"pubmed", + "id":",".join(batch), + "retmode":"text", + "rettype":"medline" + } + for attempt in range(1, MAX_RETRIES+1): + try: + response = requests.get(PUBMED_FETCH_URL, params=params, timeout=10) + response.raise_for_status() + break + except requests.RequestException as e: + print(f"[PubMed] Attempt {attempt}/{MAX_RETRIES} failed: {e}") + if attempt < MAX_RETRIES: + time.sleep(RETRY_DELAY) + else: + raise RuntimeError(f"Failed to fetch PubMed batch starting at index {i}") + records.extend(_parse_pubmed_text(response.text)) + print(f"[PubMed] Fetched batch {i//batch_size+1}.Total so far: {len(records)}") + time.sleep(0.5) + return records + +def _medline_to_standard(record:dict) -> dict: + """ + Convert a raw MEDLINE record dictionary to standard WoS field names. + Args: + record:A raw dictionary with MEDLINE field tags as keys. + Returns: + A flat dictionary with standardized WoS field names. + """ + authors = [a.strip() for a in record.get("AU","").split(";") if a.strip()] + authors_full = [a.strip() for a in record.get("FAU","").split(";") if a.strip()] + dp = record.get("DP","") + import re + year_match = re.search(r"\d{4}", dp) + year = year_match.group(0) if year_match else "" + keywords = [k.strip().replace("*","") for k in record.get("MH","").split(";") if k.strip()] + return { + "TI":record.get("TI", ""), + "AB":record.get("AB", ""), + "PY":year, + "SO":record.get("JT", ""), + "JI":record.get("TA", ""), + "VL":record.get("VI", ""), + "IS":record.get("IP", ""), + "BP":record.get("PG", "").split("-")[0] if record.get("PG") else "", + "EP":record.get("PG", "").split("-")[-1] if record.get("PG") else "", + "DI":record.get("LID", ""), + "UT":record.get("PMID", ""), + "PMID":record.get("PMID", ""), + "DT":record.get("PT", ""), + "LA":record.get("LA", ""), + "TC":0, + "AU":authors, + "AF":authors_full, + "C1":[a.strip() for a in record.get("AD", "").split(";") if a.strip()], + "CR":[], + "DE":keywords, + "ID":keywords, + "RP":"", + "DB":"PUBMED" + } + +def _parse_pubmed_text(text:str) -> list: + """ + Parse PubMed MEDLINE text format into a list of flat dictionaries. + Args: + text:Raw MEDLINE text from the PubMed efetch API. + Returns: + A list of flat dictionaries with standardized field names. + """ + records = [] + current = {} + current_key = None + for line in text.splitlines(): + if line.strip() == "": + if current: + records.append(_medline_to_standard(current)) + current = {} + current_key = None + continue + if line[:4].strip() and line [4:6] == "- ": + current_key = line[:4].strip() + value = line[6:].strip() + if current_key in current: + current[current_key] += ";" + value + else: + current[current_key] = value + if current: + records.append(_medline_to_standard(current)) + return records + +def retrieve_pubmed(query:str, max_results:int=100) -> pd.DataFrame: + """ + Retrieve bibliographic records from PubMed for a given query. + Args: + query:The search query string (e.g. 'machine learning'). + max_results:Maximum number of records to retrieve. + Returns: + A DataFrame with one row per record, using standard WoS field names. + """ + print(f"[PubMed] Searching for: '{query}' (max {max_results} results)") + pmids = _fetch_pubmed_ids(query, max_results) + if not pmids: + print("[PubMed] No results found.") + return pd.DataFrame() + records = _fetch_pubmed_records(pmids) + print(f"[PubMed] Done. Total records retrieved: {len(records)}") + return pd.DataFrame(records) + diff --git a/www/services/etl/mappings/__init__.py b/www/services/etl/mappings/__init__.py new file mode 100644 index 000000000..b5aa1178f --- /dev/null +++ b/www/services/etl/mappings/__init__.py @@ -0,0 +1,5 @@ +from .scopus_mappin import SCOPUS_CSV_MAPPING +from .dimensions_mapping import DIMENSIONS_MAPPING +from .pubmed_mapping import PUBMED_MAPPING +from .openlex_mapping import OPENALEX_MAPPING + diff --git a/www/services/etl/mappings/dimensions_mapping.py b/www/services/etl/mappings/dimensions_mapping.py new file mode 100644 index 000000000..850359ab2 --- /dev/null +++ b/www/services/etl/mappings/dimensions_mapping.py @@ -0,0 +1,21 @@ +""" +Mapping dictionary for Dimensions exported data. +Maps raw Dimensions column names to standard WoS field tags +""" +DIMENSIONS_MAPPING = { + "Title":"TI", + "Abstract":"AB", + "PubYear":"PY", + "Source title":"SO", + "Volume":"VL", + "Issue":"IS", + "Pagination":"BP", + "DOI":"DI", + "Publication ID":"UT", + "PMID":"PMID", + "Publication Type":"DT", + "Times cited":"TC", + "Authors":"AU", + "Corresponding Authors":"RP", + "MeSH terms":"DE" +} \ No newline at end of file diff --git a/www/services/etl/mappings/openalex_mapping.py b/www/services/etl/mappings/openalex_mapping.py new file mode 100644 index 000000000..f218c047a --- /dev/null +++ b/www/services/etl/mappings/openalex_mapping.py @@ -0,0 +1,21 @@ +""" +Mapping dictionary for OpenAlex API response data. +Maps raw OpenAlex field names to standard WoS field tags. +""" +OPENALEX_MAPPING = { + "title":"TI", + "abstract":"AB", + "publication_year":"PY", + "primary_location.source.display_name":"SO", + "primary_location.source.issn_1":"JI", + "biblio.volume":"VL", + "biblio.issue":"IS", + "biblio.first_page":"BP", + "biblio.last_page":"EP", + "doi":"DI", + "id":"UT", + "ids.pmid":"PMID", + "type":"DT", + "language":"LA", + "citied_by_count":"TC" +} \ No newline at end of file diff --git a/www/services/etl/mappings/pubmed_mapping.py b/www/services/etl/mappings/pubmed_mapping.py new file mode 100644 index 000000000..d669248ce --- /dev/null +++ b/www/services/etl/mappings/pubmed_mapping.py @@ -0,0 +1,23 @@ +""" +Mapping dictionary for PubMed exported data +Maps raw PubMed field tags to standard WoS field tags. +""" +PUBMED_MAPPING = { + "TI":"TI", + "AB":"AB", + "DP":"PY", + "JT":"SO", + "TA":"JI", + "VI":"VL", + "IP":"IS", + "PG":"BP", + "LID":"DI", + "PMID":"PMID", + "PT":"DT", + "LA":"LA", + "AU":"AU", + "FAU":"AF", + "AD":"C1", + "MH":"DE", + "GR":"FU" +} \ No newline at end of file diff --git a/www/services/etl/mappings/scopus_mapping.py b/www/services/etl/mappings/scopus_mapping.py new file mode 100644 index 000000000..b6327c17e --- /dev/null +++ b/www/services/etl/mappings/scopus_mapping.py @@ -0,0 +1,29 @@ +""" +Mapping dictionary for Scopus exported data. +Maps raw Scopus column names to standard WoS tags +""" + +SCOPUS_CSV_MAPPING = { + "Title":"TI", + "Abstract":"AB", + "Year":"PY", + "Source title":"SO", + "Abbreviated Source Title":"JI", + "Volume":"VL", + "Issue":"IS", + "Page start":"BP", + "Page end":"EP", + "DOI":"DI", + "EID":"UT", + "PubMed ID":"PMID", + "Document Type":"DT", + "Language of Original Document":"LA", + "Cited by":"TC", + "Authors":"AU", + "Author full names":"AF", + "Affiliations":"C1", + "Correspondence Address":"RP", + "References":"CR", + "Author Keywords":"DE", + "Index Keywords":"ID" +} \ No newline at end of file diff --git a/www/services/etl/standardizer.py b/www/services/etl/standardizer.py new file mode 100644 index 000000000..579af14e4 --- /dev/null +++ b/www/services/etl/standardizer.py @@ -0,0 +1,117 @@ +""" +Standardizer module for the Bibliometrix ETL pipeline. +This is the main point of the pipeline, equivalent to the convert2df() function in the R version of the Bibliometrix. +Usage: + from www.services.etl.standardizer import convert2df + #From a local file: + df = convert2df(source="scopus",filepath="data/scopus_export.csv") + #From an API query: + df = convert2df(source="openlax",query="machine learning",max_results=100) +""" +import pandas as pd +from www.services.etl.transformer import transform +from www.services.etl.validator import validate +from www.services.etl.api_retriever import retrieve_openalex, retrieve_pubmed +from www.services.etl.mappings import (SCOPUS_CSV_MAPPING, DIMENSIONS_MAPPING, PUBMED_MAPPING, OPENALEX_MAPPING) + +FILE_SOURCES = { + "scopus":SCOPUS_CSV_MAPPING, + "dimension":DIMENSIONS_MAPPING, + "pubmed":PUBMED_MAPPING +} + +API_SOURCES = { + "openalex":retrieve_openalex, + "pubmed":retrieve_pubmed +} + +def extract_file(source:str,filepath:str) -> pd.DataFrame: + """ + Extract raw data from a local file based on the source type. + Supports CSV, XLSX, and TXT (PubMed MEDLINE format). + Args: + source:The source database name (e.g. 'scopus','dimensions'). + filepath:The path to the local file to load. + Returns: + A raw DataFrame loaded from the file. + Raises: + ValueError:If the source or the file type is not supported. + """ + print(f"[EXTRACT] Loading file: {filepath} (source: {source})") + if source == "scopus": + if filepath.endswith(".csv"): + return pd.read_csv(filepath,encoding="utf-8") + else: + raise ValueError(f"Scopus only supports .csv files. Go: {filepath}") + elif source == "dimensions": + if filepath.endswith(".xlsx"): + return pd.read_excel(filepath,skiprows=1) + elif filepath.endswith(".csv"): + return pd.read_csv(filepath,skiprows=1,encoding="utf-8") + else: + raise ValueError(f"Dimensions only support .csv or .xlsx files.Got: {filepath}") + elif source == "pubmed": + if filepath.endswith(".txt"): + from www.services.etl.api_retriever import _parse_pubmed_text + with open(filepath, "r", encoding="utf-8") as f: + text = f.read() + records = _parse_pubmed_text(text) + return pd.DataFrame(records) + else: + raise ValueError(f"PubMed only supports .txt files.Got: {filepath}") + else: + raise ValueError(f"Unsupported file source: '{source}'." + f"Supported sources: {list(FILE_SOURCES.keys())}") + +def convert2df( + source:str, + filepath:str=None, + query:str=None, + max_results:int=100, + run_validation:bool=True, +) -> pd.DataFrame: + """ + Main entry point of the Bibliometrix ETL pipeline. + Converts heterogeneous bibliographic data into a standardized DataFrame. + Equivalent to the convert2df() function in the R version of the Bibliometrix. + Can operate in two modes: + - FILE MODE:loads a manually exported file (Base Level) + - API MODE:retrieves data automatically via REST API (Advanced Level) + Args: + source:The data source. Supported values: + File mode: 'scopus','dimensions';'pubmed' + API mode:'openalex','pubmed' + filepath:Path to the local file (required for file mode). + query:Search query string (required for API mode). + max_results:Maximum number of records to retrieve (API mode only). + run_validation:If True, runs the validator before returning the DataFrame. + Returns: + A fully standardized pandas DataFrame ready for Bibliometrix analysis. + Raises: + ValueError: If neither filepath nor query is provided, or if the source is not supported. + Examples: + >>> df = convert2df(source="scopus",filepath="scopus_sxport.csv") + >>> df = convert2df(source="openalex",query="deep learning",max_results=50) + """ + print(f"[convert2df] Source: {source} | Mode: {'API' if query else 'FILE'}") + if query is not None: + if source not in API_SOURCES: + raise ValueError(f"API not supported for source: '{source}'. Supported API sources: {list(API_SOURCES.keys())}") + retriever = API_SOURCES[source] + df = retriever(query=query, max_results=max_results) + mapping = [] + db_name = source.upper() + elif filepath is not None: + if source not in FILE_SOURCES: + raise ValueError(f"File mode not supported for source: '{source}'. Supported file sources: {list(FILE_SOURCES.keys())}") + df = extract_file(source, filepath) + mapping = FILE_SOURCES[source] + db_name = source.upper() + else: + raise ValueError("You must provide either 'filepath' (file mode) or 'query' (API mode).") + df = transform(df,mapping,db_name) + if run_validation: + validate(df) + return df + + diff --git a/www/services/etl/transformer.py b/www/services/etl/transformer.py new file mode 100644 index 000000000..90bd374fd --- /dev/null +++ b/www/services/etl/transformer.py @@ -0,0 +1,175 @@ +""" +Transformer module for the Bibliometrix ETL pipeline. +Handles column renaming, type enforcement, null handling, +and derived field calculation. +""" +import pandas as pd +import re + +LIST_COLUMNS = ["AU","AF","C1","CR","DE","ID"] +INT_COLUMNS = ["TC"] +COLUMN_DEFAULT = { + "DB":"", + "UT":"", + "DI":"", + "PMID":"", + "TI":"", + "SO":"", + "JI":"", + "PY":"", + "DT":"", + "LA":"", + "TC":0, + "AU":[], + "AF":[], + "C1":[], + "RP":"", + "CR":[], + "DE":[], + "ID":[], + "AB":"", + "VL":"", + "IS":"", + "BP":"", + "EP":"", + "SR":"" +} + +def rename_columns(df:pd.DataFrame, mapping:dict) ->pd.DataFrame: + """ + Rename raw source columns to standard WoS field tags using a mapping dictionary. + Args: + df: The raw DataFrame from the source. + mapping: A dictionary mapping raw column names to WoS tags. + Returns: + A new DataFrame with renamed columns. + """ + existing_mapping = {k: v for k,v in mapping.items() if k in df.columns} + return df.rename(columns=existing_mapping) + +def enforce_list_columns(df:pd.DataFrame) -> pd.DataFrame: + """ + Ensure that all list columns contain Python lists of strings. + Splits string values using semicolon as delimeter. + Args: + df: The DataFrame after column renaming. + Returns: + The DataFrame with list columns properly typed. + """ + for col in LIST_COLUMNS: + if col in df.columns: + def to_list(val): + if isinstance(val,list): + return val + if pd.isna(val) or val == "" or val is None: + return [] + if isinstance(val,str): + return [item.strip() for item in val.split(";") if item.strip()] + return [str(val)] + df[col] = df[col].apply(to_list) + return df + +def enforce_int_columns(df:pd.DataFrame) -> pd.DataFrame: + """ + Ensure that integer columns are properly cast to integers. + Replaces nulls with 0. + Args: + df:The DataFrame after columns renaming. + Returns: + The DataFrame with integer columns properly typed. + """ + for col in INT_COLUMNS: + if col in df.columns: + df[col] = pd.to_numeric(df[col],errors="coerce").fillna(0).astype(int) + return df + +def fill_missing_columns(df:pd.DataFrame, db_name:str) -> pd.DataFrame: + """ + Add any missing mandatory columns with their default empty values. + Also sets the DB column to identify the data source. + Args: + df:The DataFrame after type enforcement. + db_name:The name of the source database (e.g. 'SCOPUS', 'PUBMED'). + Returns: + The DataFrame with all mandatory columns present. + """ + for col,default in COLUMN_DEFAULT.items(): + if col not in df.columns: + if isinstance(default,list): + df[col] = [[] for _ in range(len(df))] + else: + df[col] = default + df["DB"] = db_name.upper() + return df + +def fill_null_values(df:pd.DataFrame) -> pd.DataFrame: + """ + Replace all remaining NaN and None values with appropriate defaluts. + List columns get empty lists, all others get empty strings. + Args: + df:The DataFrame before final export. + Returns: + The DataFrame with no null values remaining. + """ + for col in df.columns: + if col in LIST_COLUMNS: + df[col] = df[col].apply(lambda x: x if isinstance(x,list) else []) + elif col == "TC": + df[col] = df[col].fillna(0) + else: + df[col] = df[col].fillna("") + return df + +def calculate_sr(df:pd.DataFrame) -> pd.DataFrame: + """ + Calculate the Short Reference (SR) field for each record. + Format: 'FirstAuthorSurname, PublicationYear, JournalName' + This field is used as a primary key in a citation network analyses. + Args: + df:The DataFrame with AU, PY, and SO columns populated. + Returns: + The DataFrame with the SR column filled. + """ + def build_sr(row): + authors = row.get("AU", []) + if isinstance(authors,list) and len(authors)>0: + first_author = authors[0].split(",")[0].strip() + else: + first_author = "" + year = str(row.get("PY","")).strip() + journal = str(row.get("SO","")).strip() + return f"{first_author}, {year}, {journal}" + df["SR"] = df.apply(build_sr, axis=1) + return df + +def transform(df:pd.DataFrame, mapping:dict, db_name:str) -> pd.DataFrame: + """ + Run the full transformation pipeline on a row DataFrame. + Steps: rename -> enforce types -> fill missing columns -> fill nulls -> calculate SR. + Args: + df:The raw DataFrame loaded from the source file or API. + mapping:The column mapping dictionary for this source. + db_name:The name of the source database. + Returns: + A fully standardized DataFrame ready for validation and analysis. + """ + print(f"[TRANSFORM] Starting transformation for source: {db_name}") + df = rename_columns(df,mapping) + print("[TRANSFORM] Columns renamed.") + df = enforce_list_columns(df) + print("[TRANSFORM] List columns enforced") + df = enforce_int_columns(df) + print("[TRANSFORM] Integer columns enforced.") + df = fill_missing_columns(df, db_name) + print("[TRANSFORM] Missing columns filled.") + df = fill_null_values(df) + print("[TRANSFORM] Null values filled.") + df = calculate_sr(df) + print("[TRANSFORM] SR field calculated.") + print(f"[TRANSFORM] Done. Shape: {df.shape}.") + return df + + + + + \ No newline at end of file diff --git a/www/services/etl/validator.py b/www/services/etl/validator.py new file mode 100644 index 000000000..98f881d97 --- /dev/null +++ b/www/services/etl/validator.py @@ -0,0 +1,92 @@ +""" +Validator module for the Bibliometrix ETL pipeline. +Checks that the standardized DataFrame meets the required schema +before it is passed to the analytical functions. +""" +import pandas as pd + +MANDATORY_COLUMNS = [ + "DB","UT","DI","PMID","TI","SO","JI","PY","DT","LA","TC","AU","AF","C1","RP","CR","DE","ID","AB","VL","IS","BP","EP","SR" +] + +LIST_COLUMNS = ["AU","AF","C1","CR","DE","ID"] + +def check_mandatory_columns(df: pd.DataFrame) -> list: + """ + Check that all mandatory columns are present in the DataFrame. + Args: + df:The standardized DataFrame to validate. + Returns: + A list of missing column names. Empty list means all columns are present. + """ + missing = [col for col in MANDATORY_COLUMNS if col not in df.columns] + return missing + +def check_no_nulls(df:pd.DataFrame) -> dict: + """ + Check that no Nan or None value remain in the DataFrame. + Args: + df:The standardized DataFrame to validate. + Returns: + A dictionary mapping column names to the count of null values found. + Empty dict means no nulls found + """ + null_counts = {} + for col in df.columns: + count = df[col].isna().sum() + if count > 0: + null_counts[col] = int(count) + return null_counts + +def check_list_columns(df:pd.DataFrame) -> list: + """ + Check that list columns contain actual Python lists and not strings. + Args: + df:The standardized DataFrame to validate. + Returns: + A list of column name where the type contract is violated. + """ + violations = [] + for col in LIST_COLUMNS: + if col in df.columns: + sample = df[col].dropna() + for val in sample: + if not isinstance(val,list): + violations.append(col) + break + return violations + +def validate(df: pd.DataFrame) -> bool: + """ + Run all validation checks on the standardized DataFrame. + Prints a report of any issues found. + Args: + df:The standardized DataFrame to validate. + Returns: + True if all checks pass, False if any check fails. + """ + print("---Running ETL Validation---") + passed = True + missing_cols = check_mandatory_columns(df) + if missing_cols: + print(f"[FAIL] Missing mandatory columns: {missing_cols}") + passed = False + else: + print("[OK] All mandatory columns are present.") + null_counts = check_no_nulls(df) + if null_counts: + print(f"[FAIL] Null values found: {null_counts}") + passed = False + else: + print("[OK] No null values found.") + violations = check_list_columns(df) + if violations: + print(f"[FAIL] List type violations in columns: {violations}") + passed = False + else: + print("[OK] All list columns are correctly typed.") + if passed: + print("---Validation PASSED---") + else: + print("---Validation FAILED---") + return passed