Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-ingestion/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ authors = [
readme = "README.md"
requires-python = "==3.13.*"
dependencies = [
"dagster==1.11.10",
"dagster>=1.11.10",
"dagster-docker>=0.27.10",
"dagster-postgres>=0.27.10",
"massive>=2.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,28 @@
import pandas as pd
from dagster import AssetExecutionContext
from datetime import datetime
from typing import Literal, List
from typing import Literal, List, Annotated

from finwar_data_ingestion.resources import PostgreSQLResource, MassiveResource


# NOTE(review): presumably the maximum number of attempts for a single Massive API
# request; the code that reads this constant is not visible in this section — confirm.
MAX_MASSIVE_REQ_RETRY = 3


def resolve_maybe_file(context: dg.ResolutionContext, raw: str) -> str:
    """Resolve a value that is given either inline or through a file.

    Args:
        context: Resolution context passed in by the caller. It is not used by
            this function.
        raw: Either the literal value, or a path prefixed by ``file://`` that
            points to a file containing the value.

    Returns:
        ``raw`` unchanged when it does not start with ``file://``; otherwise the
        content of the referenced file with trailing whitespace removed.

    Raises:
        OSError: If the referenced file cannot be opened or read.
        UnicodeDecodeError: If the referenced file is not valid UTF-8.
    """
    prefix = 'file://'
    if not raw.startswith(prefix):
        return raw

    # Decode explicitly as UTF-8 so the value read does not depend on the
    # locale of the host or container that loads the definitions.
    with open(raw.removeprefix(prefix), encoding='utf-8') as file:
        # Trailing whitespace (typically the final newline of a secret file)
        # is not part of the value.
        return file.read().rstrip()


# ``str`` annotated with a ``dg.Resolver`` that runs ``resolve_maybe_file``, so a
# field declared with this type accepts either a literal value or a
# ``file://``-prefixed path to a file holding the value.
# NOTE(review): ``model_field_type=str`` presumably declares the unresolved
# (configuration-side) field as a plain string — confirm against the dagster docs.
StringOrFile = Annotated[
str,
dg.Resolver(resolve_maybe_file, model_field_type=str),
]


class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
"""Represents historical stock quotation for one or more ticker symbols.

Expand All @@ -25,14 +39,20 @@ class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
time_partitioning (Literal["daily", "weekly", "monthly"]): The partitioning frequency
for the data retrieval. The partitioning unit should be shorter or equal to the
requested time window. Default is ``"monthly"``.
massive_api_key (str): API key for authenticating with the Massive API.
massive_api_key (StringOrFile): API key for authenticating with the Massive API. It can
either be defined as a raw string or as a filepath prefixed by "file://" leading to
the file containing the secret value.
precision (Literal["second", "minute", "day", "week", "month", "quarter", "year"]):
The granularity of the bars. Must be finer than or equal to the requested
time window. Default is ``"day"``.
adjusted (bool): Whether to return data adjusted for splits. Default is ``False``.
table_name (str): The name of the created TimescaleDB table. Default is ``"stocks_history"``.
timescaledb_username (str): The username for the TimescaleDB authentication.
timescaledb_password (str): The password for the TimescaleDB authentication.
timescaledb_username (StringOrFile): The username for the TimescaleDB authentication. It
can either be defined as a raw string or as a filepath prefixed by "file://" leading to
the file containing the secret value.
timescaledb_password (StringOrFile): The password for the TimescaleDB authentication. It
can either be defined as a raw string or as a filepath prefixed by "file://" leading to
the file containing the secret value.
timescaledb_host (str): The host of the TimescaleDB database.
timescaledb_port (str | int): The open port of the TimescaleDB database.
timescaledb_db (str): The name of the TimescaleDB database.
Expand All @@ -42,12 +62,12 @@ class StocksIntradayHistoryComponent(dg.Component, dg.Model, dg.Resolvable):
start_date: str
end_date: str | None = None
time_partitioning: Literal['daily', 'weekly', 'monthly'] = 'monthly'
massive_api_key: str
massive_api_key: StringOrFile
precision: Literal['second', 'minute', 'day', 'week', 'month', 'quarter', 'year'] = 'day'
adjusted: bool = False
table_name: str = 'stocks_history'
timescaledb_username: str
timescaledb_password: str
timescaledb_username: StringOrFile
timescaledb_password: StringOrFile
timescaledb_host: str
timescaledb_port: str | int
timescaledb_db: str
Expand Down Expand Up @@ -183,27 +203,27 @@ def load_stocks_intraday_history(
time,
symbol,
open,
high,
low,
close,
volume
)
VALUES (
%s,
%s,
%s,
%s,
%s,
%s,
%s
)
ON CONFLICT (time, symbol)
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume
high,
low,
close,
volume
)
VALUES (
%s,
%s,
%s,
%s,
%s,
%s,
%s
)
ON CONFLICT (time, symbol)
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume
""",
data,
)
Expand Down
Loading