Skip to content

ETL пайплайн для ежедневных данных. Весь путь от API источника до анализа витрин данных в BI системе

Notifications You must be signed in to change notification settings

MrDan1el/ETL-Pipeline-Last.fm

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

44 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

ETL Pipeline Last.fm

Схема проекта

Проект по построению ETL пайплайна для хранения, обработки и анализа данных, получаемых ежедневно из API источника.

Технологический стек и инструменты

  • Docker - создание контейнеров для основных компонентов
  • Airflow - оркестрация процессов
  • Minio S3 - озеро данных
  • Postgres - хранилище данных
  • Metabase - визуализация и анализ данных
  • Python - извлечение и обработка данных
  • PostgreSQL - написание SQL скриптов
  • DBeaver - подключение и работа с базой данных

Структура репозитория

ETL-Pipeline_Last.fm/
│
├── dags/                               # DAG'и Airflow
│   ├── from_dds_to_dm_pg.py            # DAG для обновления витрин данных
│   ├── from_ods_to_dds_pg.py           # DAG для перемещения данных из ODS в DDS слой
│   ├── raw_from_api_to_s3.py           # DAG для получения данных из API в S3 хранилище
│   ├── transformed_from_s3_to_pg.py    # DAG для ETL из S3 в Postgres
|
├── pictures/                           # Изображения схем и архитектуры проекта
│
├── scripts/                            # SQL скрипты
│   ├── ddl_dds.sql                     # DDL скрипт для создания таблиц DDS слоя
│   ├── ddl_dm.sql                      # DDL скрипт для создания витрин данных
│   ├── ddl_ods.sql                     # DDL скрипт для создания таблиц ODS слоя
│
├── .gitignore                          # Файлы и директории, игнорируемые GIT
├── README.md                           # Обзор проекта
├── docker-compose.yaml                 # Конфигурационный файл для развертывания контейнеров
└── requirements.txt                    # Список необходимых библиотек

Описание проекта

Цель проекта - построить ETL Pipeline для хранения и анализа данных, получаемых ежедневно из API источника.

Задачи:

  1. Настроить извлечение сырых данных из API источника.
  2. Спроектировать хранилище данных.
  3. Настроить ETL процесс для ежедневной загрузки данных в хранилище.
  4. Построить витрины данных для дальнейшей визуализации и анализа в BI системе.

Извлечение данных из API

В качестве источника данных используется открытое API Last.fm. Из доступных методов применяется geo.getTopTracks, который возвращает информацию о топе треков на текущий момент времени по запрашиваемой стране. В параметрах метода задается ограничение на количество треков и формат данных. В данном проекте это 100 и JSON соответственно. Запрос отправляется при помощи библиотеки requests языка Python для трех стран: Russian Federation, United States, Kazakhstan.

Полученные JSON файлы в необработанном виде помещаются в озеро данных (S3 хранилище). Запись данных осуществляется по ключу "bucket/top_100/raw/{date}/{country}_{date}.json", где {date} - текущая дата, {country} - страна, для которой хранятся данные.

Процесс запускается ежедневно при помощи оркестратора Airflow, DAG - raw_from_api_to_s3.py. Подключение к S3 осуществляется через S3Hook, параметры подключения задаются через панель администратора в Airflow. Ключ для подключения к API хранится в виде переменной Airflow.

Построение ETL процесса

Подключение к БД и создание таблиц происходит посредством DBeaver. DDL скрипты находятся в директории scripts. В качестве хранилища данных используется Postgres, состоящий из 3-х слоев:

  • ODS - таблица для хранения прошедших через ETL процесс данных;
  • DDS - данные хранятся в виде модели "звезда", подготовленные для дальнейших аналитических запросов;
  • DM - слой для витрин данных.

Для построения ETL процесса применяется язык Python, так как получаемый объем данных небольшой, параллельная обработка не требуется. DAG для ETL процесса - transformed_from_s3_to_pg.py. В нем применяется ExternalTaskSensor, который ожидает завершения выполнения DAG'a с предыдущего шага, а затем начинает работу запланированных task'ов.

Первым делом извлекаются ключи для файлов из S3 хранилища за текущую дату, а затем передаются в следующий task через xcom. Получаемые по ключам данные из JSON файлов проходят через трансформацию: извлекается необходимая информация, происходит очистка и преобразование типов данных. Трансформированные данные объединяются во временный CSV файл. Затем осуществляется загрузка в Postgres базу данных - сначала данные копируются во временную таблицу, а уже затем переносятся в ODS слой при помощи SQL скрипта.

Пример данных, хранящихся в ODS слое:

Пример ODS

Модель данных "звезда"

В ходе проверки данных было обнаружено, что у некоторых записей в поле "duration_sec" стоит значение "0". В таких случаях значение заменяется на среднюю продолжительность треков за дату получения.

Следующим шагом, данные из ODS слоя передаются в DDS слой в виде "звезды" для дальнейших аналитических запросов. За это отвечает DAG - from_ods_to_dds_pg.py. В нем применяется SQLExecuteQueryOperator, передающий данные с помощью SQL скриптов.

Диаграмма DDS слоя:

Диаграмма_DDS

Создание витрин данных для BI системы

В данном проекте, для наглядности, были созданы три витрины данных: "avg_song_duration_by_country", "artist_appearances_by_date" и "expected_artist_royalties_by_date". Для их заполнения используется DAG - from_dds_to_dm_pg.py.

В качестве BI системы применяется Metabase, в которой строятся графики и дашборды по имеющимся витринам данных.

Пример визуализации по витрине "artist_appearances_by_date":

Пример визуализации

About

ETL пайплайн для ежедневных данных. Весь путь от API источника до анализа витрин данных в BI системе

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages