diff --git a/src/obelisk/asynchronous/core.py b/src/obelisk/asynchronous/core.py index e4eff93..65866a8 100644 --- a/src/obelisk/asynchronous/core.py +++ b/src/obelisk/asynchronous/core.py @@ -126,7 +126,9 @@ def check_metric_type(self) -> Self: return self -def serialize_comma_string(input: Any, handler: SerializerFunctionWrapHandler) -> str | None: +def serialize_comma_string( + input: Any, handler: SerializerFunctionWrapHandler +) -> str | None: if val := handler(input): return ",".join(val) return None @@ -140,11 +142,16 @@ class QueryParams(BaseModel): Contrary to the name, this does not correlate directly to URL query parameters sent to Obelisk. """ + dataset: str - groupBy: Annotated[list[FieldName] | None, WrapSerializer(serialize_comma_string)] = None + groupBy: Annotated[ + list[FieldName] | None, WrapSerializer(serialize_comma_string) + ] = None """List of Field Names to aggregate by as defined in Obelisk docs, None selects the server-side defaults.""" aggregator: Aggregator | None = None - fields: Annotated[list[FieldName] | None, WrapSerializer(serialize_comma_string)] = None + fields: Annotated[ + list[FieldName] | None, WrapSerializer(serialize_comma_string) + ] = None """List of Field Names as defined in Obelisk docs, None selects the server-side defaults.""" orderBy: Annotated[list[str] | None, WrapSerializer(serialize_comma_string)] = None """List of Field Names, with their potential prefixes and suffixes, to select ordering. None user server defaults.""" @@ -170,7 +177,9 @@ def check_datatype_needed(self) -> Self: return self def to_dict(self) -> dict[str, Any]: - return self.model_dump(exclude_none=True, by_alias=True, mode='json', exclude={"dataset"}) + return self.model_dump( + exclude_none=True, by_alias=True, mode="json", exclude={"dataset"} + ) class ChunkedParams(BaseModel): @@ -180,6 +189,7 @@ class ChunkedParams(BaseModel): for example processing weeks of data one hour at a time. This limits memory useage. """ + dataset: str groupBy: list[FieldName] | None = None aggregator: Aggregator | None = None @@ -228,6 +238,7 @@ def chunks(self) -> Iterator[QueryParams]: class QueryResult(BaseModel): """The data returned by a single chunk fetch""" + cursor: str | None = None """Cursors always point to the next page of data matched by filters. They are none if there is no more data, they do not consider datapoint count limits.""" @@ -327,7 +338,7 @@ async def query(self, params: QueryParams) -> list[Datapoint]: result_limit = params.limit # Obelisk CORE does not actually stop emitting a cursor when done, limit serves as page limit - params.limit = self.page_limit + params.limit = min(self.page_limit, result_limit) while True: result: QueryResult = await self.fetch_single_chunk(params) diff --git a/src/obelisk/types/__init__.py b/src/obelisk/types/__init__.py index 2b54443..7bb2cc3 100644 --- a/src/obelisk/types/__init__.py +++ b/src/obelisk/types/__init__.py @@ -38,6 +38,7 @@ class TimestampPrecision(str, Enum): class Datapoint(BaseModel): """An Obelisk Classic / HFS datapoint. May contain more or less fields""" + timestamp: int value: Any dataset: str | None = None @@ -46,19 +47,19 @@ class Datapoint(BaseModel): userId: int | None = None """This field is only used on HFS, and has a different name in some deployments.""" - model_config = ConfigDict( - extra='allow' - ) + model_config = ConfigDict(extra="allow") class QueryResult(BaseModel): """Result of a query""" + items: list[Datapoint] cursor: str | None = None class ObeliskKind(str, Enum): """Defines which variety of Obelisk a Client should use, and provides some URLs and config information.""" + CLASSIC = "classic" HFS = "hfs" CORE = "core"