1 change: 1 addition & 0 deletions changelog.md
@@ -9,6 +9,7 @@
## Improvements

- `GdsSessions.get_or_create` now allows specifying the `aura_instance_id` instead of the `uri` as part of the `db_connection`. This is required if the instance id cannot be derived from the provided database connection URI, such as for Multi-Database instances.
- `GdsSessions.estimate` now recommends smaller sizes such as `2GB`. It also allows specifying property and label counts for better estimates.

## Other changes

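As an illustration of the changelog entry above, here is a minimal sketch of the updated `estimate` call. The `sessions` object is assumed to be an existing `GdsSessions` instance and the import path for `AlgorithmCategory` is an assumption; the counts mirror the documentation example further down.

```python
from graphdatascience.session import AlgorithmCategory  # assumed import path

# `sessions` is assumed to be an existing GdsSessions instance; the counts are illustrative.
memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
    node_label_count=1,
    node_property_count=1,
    relationship_property_count=1,
)
print(memory)  # the recommended SessionMemory size, e.g. SessionMemory.m_8GB
```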
3 changes: 3 additions & 0 deletions doc/modules/ROOT/pages/graph-analytics-serverless.adoc
@@ -219,6 +219,9 @@ memory = sessions.estimate(
    node_count=20,
    relationship_count=50,
    algorithm_categories=[AlgorithmCategory.CENTRALITY, AlgorithmCategory.NODE_EMBEDDING],
    node_label_count=1,
    node_property_count=1,
    relationship_property_count=1
)
----

4 changes: 2 additions & 2 deletions justfile
@@ -4,8 +4,8 @@ style skip_notebooks="false":
convert-notebooks:
    ./scripts/nb2doc/convert.sh

unit-tests:
    pytest tests/unit
unit-tests extra_options="":
    pytest tests/unit {{extra_options}}

# just it test true "--durations=20"
it filter="" enterprise="true" extra_options="":
16 changes: 13 additions & 3 deletions src/graphdatascience/session/aura_api.py
@@ -305,16 +305,26 @@ def wait_for_instance_running(
        return WaitResult.from_error(f"Instance is not running after waiting for {waited_time} seconds")

    def estimate_size(
        self, node_count: int, relationship_count: int, algorithm_categories: list[AlgorithmCategory]
        self,
        node_count: int,
        node_label_count: int,
        node_property_count: int,
        relationship_count: int,
        relationship_property_count: int,
        algorithm_categories: list[AlgorithmCategory],
    ) -> EstimationDetails:
        data = {
            "node_count": node_count,
            "node_label_count": node_label_count,
            "node_property_count": node_property_count,
            "relationship_count": relationship_count,
            "relationship_property_count": relationship_property_count,
            "algorithm_categories": [i.value for i in algorithm_categories],
            "instance_type": "dsenterprise",
        }

        response = self._request_session.post(f"{self._base_uri}/{AuraApi.API_VERSION}/instances/sizing", json=data)
        response = self._request_session.post(
            f"{self._base_uri}/{AuraApi.API_VERSION}/graph-analytics/sessions/sizing", json=data
        )
        self._check_resp(response)

        return EstimationDetails.from_json(response.json()["data"])
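For orientation, the request body built by `estimate_size` above looks roughly like the following. The endpoint path and field names are taken from the diff (and the mocked URL in the tests below), while the concrete values are illustrative.

```python
import json

# Illustrative payload for POST https://api.neo4j.io/v1/graph-analytics/sessions/sizing
# (field names as in the diff above; values are made up).
data = {
    "node_count": 100,
    "node_label_count": 1,
    "node_property_count": 1,
    "relationship_count": 10,
    "relationship_property_count": 1,
    "algorithm_categories": ["centrality"],  # AlgorithmCategory members are sent as their string values
    "instance_type": "dsenterprise",
}
print(json.dumps(data, indent=2))
```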
23 changes: 21 additions & 2 deletions src/graphdatascience/session/aura_api_responses.py
@@ -169,9 +169,8 @@ def from_json(cls, json: dict[str, Any]) -> InstanceCreateDetails:

@dataclass(repr=True, frozen=True)
class EstimationDetails:
    min_required_memory: str
    estimated_memory: str
    recommended_size: str
    did_exceed_maximum: bool

    @classmethod
    def from_json(cls, json: dict[str, Any]) -> EstimationDetails:
@@ -181,6 +180,26 @@ def from_json(cls, json: dict[str, Any]) -> EstimationDetails:

        return cls(**{f.name: json[f.name] for f in fields})

    def exceeds_recommended(self) -> bool:
        return EstimationDetails._memory_in_bytes(self.estimated_memory) > EstimationDetails._memory_in_bytes(
            self.recommended_size
        )

    @staticmethod
    def _memory_in_bytes(size: str) -> float:
        size_str = size.upper().strip()
        # treat GB, Gi and G the same as it's only used for comparing internally
        size_str = size_str.removesuffix("B").removesuffix("I")

        if size_str.endswith("G"):
            return float(size_str[:-1]) * 1024**3  # 1GB = 1024^3 bytes
        elif size_str.endswith("M"):
            return float(size_str[:-1]) * 1024**2  # 1MB = 1024^2 bytes
        elif size_str.endswith("K"):
            return float(size_str[:-1]) * 1024  # 1KB = 1024 bytes
        else:
            return float(size_str)  # assume bytes


class WaitResult(NamedTuple):
    connection_url: str
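A quick sketch of how the comparison above behaves; the values are arbitrary, and `GB`, `Gi`, and `G` suffixes are normalized identically because the result is only used for this internal comparison.

```python
from graphdatascience.session.aura_api_responses import EstimationDetails

# Arbitrary example: a 16 GiB estimate against an 8 GB recommended size.
estimation = EstimationDetails(estimated_memory="16Gi", recommended_size="8GB")
assert estimation.exceeds_recommended()  # 16 * 1024**3 > 8 * 1024**3
```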
22 changes: 18 additions & 4 deletions src/graphdatascience/session/dedicated_sessions.py
@@ -30,15 +30,29 @@ def estimate(
        self,
        node_count: int,
        relationship_count: int,
        algorithm_categories: list[AlgorithmCategory] | None = None,
        algorithm_categories: list[AlgorithmCategory] | list[str] | None = None,
        node_label_count: int = 0,
        node_property_count: int = 0,
        relationship_property_count: int = 0,
    ) -> SessionMemory:
        if algorithm_categories is None:
            algorithm_categories = []
        estimation = self._aura_api.estimate_size(node_count, relationship_count, algorithm_categories)
        else:
            algorithm_categories = [
                AlgorithmCategory(cat) if isinstance(cat, str) else cat for cat in algorithm_categories
            ]
        estimation = self._aura_api.estimate_size(
            node_count=node_count,
            node_label_count=node_label_count,
            node_property_count=node_property_count,
            relationship_count=relationship_count,
            relationship_property_count=relationship_property_count,
            algorithm_categories=algorithm_categories,
        )

        if estimation.did_exceed_maximum:
        if estimation.exceeds_recommended():
            warnings.warn(
                f"The estimated memory `{estimation.min_required_memory}` exceeds the maximum size"
                f"The estimated memory `{estimation.estimated_memory}` exceeds the maximum size"
                f" supported by your Aura project (`{estimation.recommended_size}`).",
                ResourceWarning,
            )
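A small usage sketch of the coercion added above: plain strings are now accepted for `algorithm_categories` and converted to `AlgorithmCategory` members. The `sessions` object and the import path are assumptions for the example.

```python
from graphdatascience.session import AlgorithmCategory  # assumed import path

# `sessions` is assumed to be an existing sessions object; the two calls are equivalent.
memory_a = sessions.estimate(node_count=1_000, relationship_count=5_000, algorithm_categories=["centrality"])
memory_b = sessions.estimate(
    node_count=1_000,
    relationship_count=5_000,
    algorithm_categories=[AlgorithmCategory.CENTRALITY],
)
assert memory_a == memory_b  # the string form is coerced to the enum member
```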
47 changes: 36 additions & 11 deletions src/graphdatascience/session/gds_sessions.py
@@ -55,8 +55,10 @@ def __init__(self, api_credentials: AuraAPICredentials) -> None:
"""
Initializes a new instance of the GdsSessions class.

Args:
api_credentials (AuraAPICredentials): The Aura API credentials used for establishing a connection.
Parameters
----------
api_credentials
The Aura API credentials used for establishing a connection.
"""
aura_env = os.environ.get("AURA_ENV")
aura_api = AuraApi(
@@ -71,22 +73,45 @@ def estimate(
        self,
        node_count: int,
        relationship_count: int,
        algorithm_categories: list[AlgorithmCategory] | None = None,
        algorithm_categories: list[AlgorithmCategory] | list[str] | None = None,
        node_label_count: int = 0,
        node_property_count: int = 0,
        relationship_property_count: int = 0,
    ) -> SessionMemory:
        """
        Estimates the memory required for a session with the given node and relationship counts.

        Args:
            node_count (int): The number of nodes.
            relationship_count (int): The number of relationships.
            algorithm_categories (list[AlgorithmCategory] | None): The algorithm categories to consider.

        Returns:
            SessionMemory: The estimated memory required for the session.
        Parameters
        ----------
        node_count
            Number of nodes.
        relationship_count
            Number of relationships.
        algorithm_categories
            The algorithm categories to consider.
        node_label_count
            Number of node labels.
        node_property_count
            Number of node properties.
        relationship_property_count
            Number of relationship properties.


        Returns
        -------
        SessionMemory
            The estimated memory required for the session.
        """
        if algorithm_categories is None:
            algorithm_categories = []
        return self._impl.estimate(node_count, relationship_count, algorithm_categories)
        return self._impl.estimate(
            node_count=node_count,
            relationship_count=relationship_count,
            algorithm_categories=algorithm_categories,
            node_label_count=node_label_count,
            node_property_count=node_property_count,
            relationship_property_count=relationship_property_count,
        )

    def available_cloud_locations(self) -> list[CloudLocation]:
        """
21 changes: 17 additions & 4 deletions tests/unit/session/test_dedicated_sessions.py
@@ -49,7 +49,7 @@ def __init__(
        self.id_counter = 0
        self.time = 0
        self._status_after_creating = status_after_creating
        self._size_estimation = size_estimation or EstimationDetails("1GB", "8GB", False)
        self._size_estimation = size_estimation or EstimationDetails("1GB", "8GB")
        self._console_user = console_user
        self._admin_user = admin_user

@@ -225,7 +225,13 @@ def project_details(self) -> ProjectDetails:
        return ProjectDetails(id=self._project_id, cloud_locations={CloudLocation("aws", "leipzig-1")})

    def estimate_size(
        self, node_count: int, relationship_count: int, algorithm_categories: list[AlgorithmCategory]
        self,
        node_count: int,
        node_label_count: int,
        node_property_count: int,
        relationship_count: int,
        relationship_property_count: int,
        algorithm_categories: list[AlgorithmCategory],
    ) -> EstimationDetails:
        return self._size_estimation

@@ -893,14 +899,21 @@ def test_create_waiting_forever(


def test_estimate_size() -> None:
    aura_api = FakeAuraApi(size_estimation=EstimationDetails("1GB", "8GB", False))
    aura_api = FakeAuraApi(size_estimation=EstimationDetails("1GB", "8GB"))
    sessions = DedicatedSessions(aura_api)

    assert sessions.estimate(1, 1, [AlgorithmCategory.CENTRALITY]) == SessionMemory.m_8GB


def test_estimate_str_categories_size() -> None:
    aura_api = FakeAuraApi(size_estimation=EstimationDetails("1GB", "8GB"))
    sessions = DedicatedSessions(aura_api)

    assert sessions.estimate(1, 1, ["centrality"]) == SessionMemory.m_8GB


def test_estimate_size_exceeds() -> None:
    aura_api = FakeAuraApi(size_estimation=EstimationDetails("16GB", "8GB", True))
    aura_api = FakeAuraApi(size_estimation=EstimationDetails("16GB", "8GB"))
    sessions = DedicatedSessions(aura_api)

    with pytest.warns(
39 changes: 30 additions & 9 deletions tests/unit/test_aura_api.py
@@ -586,11 +586,11 @@ def test_dont_wait_forever_for_session(requests_mock: Mocker, caplog: LogCapture

    with caplog.at_level(logging.DEBUG):
        assert (
            "Session `id0` is not running after 0.2 seconds"
            in api.wait_for_session_running("id0", sleep_time=0.05, max_wait_time=0.2).error
            "Session `id0` is not running after 0.01 seconds"
            in api.wait_for_session_running("id0", sleep_time=0.001, max_wait_time=0.01).error
        )

    assert "Session `id0` is not yet running. Current status: Creating Host: foo.bar. Retrying in 0.1" in caplog.text
    assert "Session `id0` is not yet running. Current status: Creating Host: foo.bar. Retrying in 0.001" in caplog.text


def test_wait_for_session_running(requests_mock: Mocker) -> None:
@@ -1024,11 +1024,11 @@ def test_dont_wait_forever(requests_mock: Mocker, caplog: LogCaptureFixture) ->

    with caplog.at_level(logging.DEBUG):
        assert (
            "Instance is not running after waiting for 0.7"
            in api.wait_for_instance_running("id0", max_wait_time=0.7).error
            "Instance is not running after waiting for 0.01"
            in api.wait_for_instance_running("id0", max_wait_time=0.01, sleep_time=0.001).error
        )

    assert "Instance `id0` is not yet running. Current status: creating. Retrying in 0.2 seconds..." in caplog.text
    assert "Instance `id0` is not yet running. Current status: creating. Retrying in 0.001 seconds..." in caplog.text


def test_wait_for_instance_running(requests_mock: Mocker) -> None:
@@ -1099,12 +1099,14 @@ def test_wait_for_instance_deleting(requests_mock: Mocker) -> None:
def test_estimate_size(requests_mock: Mocker) -> None:
    mock_auth_token(requests_mock)
    requests_mock.post(
        "https://api.neo4j.io/v1/instances/sizing",
        json={"data": {"did_exceed_maximum": True, "min_required_memory": "307GB", "recommended_size": "96GB"}},
        "https://api.neo4j.io/v1/graph-analytics/sessions/sizing",
        json={"data": {"estimated_memory": "3070GB", "recommended_size": "512GB"}},
    )

    api = AuraApi("", "", project_id="some-tenant")
    assert api.estimate_size(100, 10, [AlgorithmCategory.NODE_EMBEDDING]) == EstimationDetails("307GB", "96GB", True)
    assert api.estimate_size(100, 1, 1, 10, 1, [AlgorithmCategory.NODE_EMBEDDING]) == EstimationDetails(
        estimated_memory="3070GB", recommended_size="512GB"
    )


def test_extract_id() -> None:
@@ -1215,3 +1217,22 @@ def test_parse_session_info_without_optionals() -> None:
        project_id="tenant-1",
        user_id="user-1",
    )


def test_estimate_size_parsing() -> None:
    assert EstimationDetails._memory_in_bytes("8GB") == 8589934592
    assert EstimationDetails._memory_in_bytes("8G") == 8589934592
    assert EstimationDetails._memory_in_bytes("512MB") == 536870912
    assert EstimationDetails._memory_in_bytes("256KB") == 262144
    assert EstimationDetails._memory_in_bytes("1024B") == 1024
    assert EstimationDetails._memory_in_bytes("12345") == 12345
    assert EstimationDetails._memory_in_bytes("8Gi") == 8589934592
    assert EstimationDetails._memory_in_bytes("8gb") == 8589934592


def test_estimate_exceeds_maximum() -> None:
    estimation = EstimationDetails(estimated_memory="16Gi", recommended_size="8Gi")
    assert estimation.exceeds_recommended() is True

    estimation = EstimationDetails(estimated_memory="8Gi", recommended_size="16Gi")
    assert estimation.exceeds_recommended() is False