Проект по построению ETL пайплайна для хранения, обработки и анализа данных, получаемых ежедневно из API источника.
- Docker - создание контейнеров для основных компонентов
- Airflow - оркестрация процессов
- Minio S3 - озеро данных
- Postgres - хранилище данных
- Metabase - визуализация и анализ данных
- Python - извлечение и обработка данных
- PostgreSQL - написание SQL скриптов
- DBeaver - подключение и работа с базой данных
ETL-Pipeline_Last.fm/
│
├── dags/ # DAG'и Airflow
│ ├── from_dds_to_dm_pg.py # DAG для обновления витрин данных
│ ├── from_ods_to_dds_pg.py # DAG для перемещения данных из ODS в DDS слой
│ ├── raw_from_api_to_s3.py # DAG для получения данных из API в S3 хранилище
│ ├── transformed_from_s3_to_pg.py # DAG для ETL из S3 в Postgres
|
├── pictures/ # Изображения схем и архитектуры проекта
│
├── scripts/ # SQL скрипты
│ ├── ddl_dds.sql # DDL скрипт для создания таблиц DDS слоя
│ ├── ddl_dm.sql # DDL скрипт для создания витрин данных
│ ├── ddl_ods.sql # DDL скрипт для создания таблиц ODS слоя
│
├── .gitignore # Файлы и директории, игнорируемые GIT
├── README.md # Обзор проекта
├── docker-compose.yaml # Конфигурационный файл для развертывания контейнеров
└── requirements.txt # Список необходимых библиотек
Цель проекта - построить ETL Pipeline для хранения и анализа данных, получаемых ежедневно из API источника.
Задачи:
- Настроить извлечение сырых данных из API источника.
- Спроектировать хранилище данных.
- Настроить ETL процесс для ежедневной загрузки данных в хранилище.
- Построить витрины данных для дальнейшей визуализации и анализа в BI системе.
В качестве источника данных используется открытое API Last.fm. Из доступных методов применяется geo.getTopTracks, который возвращает информацию о топе треков на текущий момент времени по запрашиваемой стране. В параметрах метода задается ограничение на количество треков и формат данных. В данном проекте это 100 и JSON соответственно. Запрос отправляется при помощи библиотеки requests языка Python для трех стран: Russian Federation, United States, Kazakhstan.
Полученные JSON файлы в необработанном виде помещаются в озеро данных (S3 хранилище). Запись данных осуществляется по ключу "bucket/top_100/raw/{date}/{country}_{date}.json", где {date} - текущая дата, {country} - страна, для которой хранятся данные.
Процесс запускается ежедневно при помощи оркестратора Airflow, DAG - raw_from_api_to_s3.py. Подключение к S3 осуществляется через S3Hook, параметры подключения задаются через панель администратора в Airflow. Ключ для подключения к API хранится в виде переменной Airflow.
Подключение к БД и создание таблиц происходит посредством DBeaver. DDL скрипты находятся в директории scripts. В качестве хранилища данных используется Postgres, состоящий из 3-х слоев:
- ODS - таблица для хранения прошедших через ETL процесс данных;
- DDS - данные хранятся в виде модели "звезда", подготовленные для дальнейших аналитических запросов;
- DM - слой для витрин данных.
Для построения ETL процесса применяется язык Python, так как получаемый объем данных небольшой, параллельная обработка не требуется. DAG для ETL процесса - transformed_from_s3_to_pg.py. В нем применяется ExternalTaskSensor, который ожидает завершения выполнения DAG'a с предыдущего шага, а затем начинает работу запланированных task'ов.
Первым делом извлекаются ключи для файлов из S3 хранилища за текущую дату, а затем передаются в следующий task через xcom. Получаемые по ключам данные из JSON файлов проходят через трансформацию: извлекается необходимая информация, происходит очистка и преобразование типов данных. Трансформированные данные объединяются во временный CSV файл. Затем осуществляется загрузка в Postgres базу данных - сначала данные копируются во временную таблицу, а уже затем переносятся в ODS слой при помощи SQL скрипта.
Пример данных, хранящихся в ODS слое:
В ходе проверки данных было обнаружено, что у некоторых записей в поле "duration_sec" стоит значение "0". В таких случаях значение заменяется на среднюю продолжительность треков за дату получения.
Следующим шагом, данные из ODS слоя передаются в DDS слой в виде "звезды" для дальнейших аналитических запросов. За это отвечает DAG - from_ods_to_dds_pg.py. В нем применяется SQLExecuteQueryOperator, передающий данные с помощью SQL скриптов.
Диаграмма DDS слоя:
В данном проекте, для наглядности, были созданы три витрины данных: "avg_song_duration_by_country", "artist_appearances_by_date" и "expected_artist_royalties_by_date". Для их заполнения используется DAG - from_dds_to_dm_pg.py.
В качестве BI системы применяется Metabase, в которой строятся графики и дашборды по имеющимся витринам данных.
Пример визуализации по витрине "artist_appearances_by_date":



