Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/obelisk/asynchronous/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ def check_metric_type(self) -> Self:
return self


def serialize_comma_string(input: Any, handler: SerializerFunctionWrapHandler) -> str | None:
def serialize_comma_string(
input: Any, handler: SerializerFunctionWrapHandler
) -> str | None:
if val := handler(input):
return ",".join(val)
return None
Expand All @@ -140,11 +142,16 @@ class QueryParams(BaseModel):

Contrary to the name, this does not correlate directly to URL query parameters sent to Obelisk.
"""

dataset: str
groupBy: Annotated[list[FieldName] | None, WrapSerializer(serialize_comma_string)] = None
groupBy: Annotated[
list[FieldName] | None, WrapSerializer(serialize_comma_string)
] = None
"""List of Field Names to aggregate by as defined in Obelisk docs, None selects the server-side defaults."""
aggregator: Aggregator | None = None
fields: Annotated[list[FieldName] | None, WrapSerializer(serialize_comma_string)] = None
fields: Annotated[
list[FieldName] | None, WrapSerializer(serialize_comma_string)
] = None
"""List of Field Names as defined in Obelisk docs, None selects the server-side defaults."""
orderBy: Annotated[list[str] | None, WrapSerializer(serialize_comma_string)] = None
"""List of Field Names, with their potential prefixes and suffixes, to select ordering. None user server defaults."""
Expand All @@ -170,7 +177,9 @@ def check_datatype_needed(self) -> Self:
return self

def to_dict(self) -> dict[str, Any]:
return self.model_dump(exclude_none=True, by_alias=True, mode='json', exclude={"dataset"})
return self.model_dump(
exclude_none=True, by_alias=True, mode="json", exclude={"dataset"}
)


class ChunkedParams(BaseModel):
Expand All @@ -180,6 +189,7 @@ class ChunkedParams(BaseModel):
for example processing weeks of data one hour at a time.
This limits memory useage.
"""

dataset: str
groupBy: list[FieldName] | None = None
aggregator: Aggregator | None = None
Expand Down Expand Up @@ -228,6 +238,7 @@ def chunks(self) -> Iterator[QueryParams]:

class QueryResult(BaseModel):
"""The data returned by a single chunk fetch"""

cursor: str | None = None
"""Cursors always point to the next page of data matched by filters.
They are none if there is no more data, they do not consider datapoint count limits."""
Expand Down Expand Up @@ -327,7 +338,7 @@ async def query(self, params: QueryParams) -> list[Datapoint]:
result_limit = params.limit

# Obelisk CORE does not actually stop emitting a cursor when done, limit serves as page limit
params.limit = self.page_limit
params.limit = min(self.page_limit, result_limit)

while True:
result: QueryResult = await self.fetch_single_chunk(params)
Expand Down
7 changes: 4 additions & 3 deletions src/obelisk/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class TimestampPrecision(str, Enum):

class Datapoint(BaseModel):
"""An Obelisk Classic / HFS datapoint. May contain more or less fields"""

timestamp: int
value: Any
dataset: str | None = None
Expand All @@ -46,19 +47,19 @@ class Datapoint(BaseModel):
userId: int | None = None
"""This field is only used on HFS, and has a different name in some deployments."""

model_config = ConfigDict(
extra='allow'
)
model_config = ConfigDict(extra="allow")


class QueryResult(BaseModel):
"""Result of a query"""

items: list[Datapoint]
cursor: str | None = None


class ObeliskKind(str, Enum):
"""Defines which variety of Obelisk a Client should use, and provides some URLs and config information."""

CLASSIC = "classic"
HFS = "hfs"
CORE = "core"
Expand Down