Add graph-store PPR E2E wiring #655
Conversation
logger = Logger()

# Default number of inference processes per machine in case one isn't provided in inference args
In Graph Store mode, the source of truth should be cluster_info.num_processes_per_compute, not a local CPU/GPU heuristic. The previous fallback could make inference spawn a different number of compute processes than storage expected, causing storage rendezvous failures like “only N/M clients joined.”
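A minimal sketch of the fail-fast check this comment is asking for. `ClusterInfo` here is a hypothetical stand-in that only carries `num_processes_per_compute`, and `resolve_local_world_size` is an illustrative helper name, not something taken from the PR:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClusterInfo:
    """Hypothetical stand-in for the Graph Store cluster description."""

    num_processes_per_compute: int


def resolve_local_world_size(
    cluster_info: ClusterInfo, requested: Optional[int] = None
) -> int:
    """Use the cluster topology as the source of truth for the process count.

    If the caller also passes an explicit value, it must agree with the
    topology; otherwise we raise before any process is spawned.
    """
    expected = cluster_info.num_processes_per_compute
    if requested is not None and requested != expected:
        raise ValueError(
            f"local_world_size={requested} disagrees with "
            f"cluster_info.num_processes_per_compute={expected}"
        )
    return expected
```

Raising here, rather than spawning and waiting, should surface a mismatch as a clear config error instead of a storage rendezvous timeout.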
def parse_sampler_options(args: Mapping[str, str]) -> Optional[SamplerOptions]:
    sampler_type = args.get("sampler_type", "khop").strip().lower().replace("-", "_")
    if sampler_type == "":
        sampler_type = "khop"

    if sampler_type in {"khop", "k_hop", "neighbor", "neighbor_sampler"}:
        return None

    if sampler_type != "ppr":
        raise ValueError(
            f"Unsupported sampler_type={sampler_type}. Expected one of: khop, ppr."
        )

    max_ppr_nodes = args.get("ppr_max_nodes")
    if max_ppr_nodes is None:
        max_ppr_nodes = args.get("ppr_max_ppr_nodes", "50")

    num_neighbors_per_hop = args.get("ppr_neighbors_per_hop")
    if num_neighbors_per_hop is None:
        num_neighbors_per_hop = args.get("ppr_num_neighbors_per_hop", "1000")

    return PPRSamplerOptions(
        alpha=float(args.get("ppr_alpha", "0.5")),
        eps=float(args.get("ppr_eps", "0.0001")),
        max_ppr_nodes=int(max_ppr_nodes),
        num_neighbors_per_hop=int(num_neighbors_per_hop),
        max_fetch_iterations=_parse_optional_int(args.get("ppr_max_fetch_iterations")),
    )
instead of this can we encode PPRSamplerOptions as a json dict in the config?
Also IMO it's a bit weird to have the sampling parameterized like this since I don't think the model can / should be the same for PPR vs khop sampling right?
> Also IMO it's a bit weird to have the sampling parameterized like this since I don't think the model can / should be the same for PPR vs khop sampling right?
Ping on this :)
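For reference, a rough sketch of what the JSON-dict suggestion could look like. The `PPRSamplerOptions` dataclass below is a hypothetical stand-in that mirrors the fields and defaults in the diff above, and `parse_sampler_options_json` is an illustrative name; the real class and config key may differ:

```python
import json
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class PPRSamplerOptions:
    """Hypothetical stand-in mirroring the fields used in the diff."""

    alpha: float = 0.5
    eps: float = 1e-4
    max_ppr_nodes: int = 50
    num_neighbors_per_hop: int = 1000
    max_fetch_iterations: Optional[int] = None


def parse_sampler_options_json(raw: Optional[str]) -> Optional[PPRSamplerOptions]:
    """Parse PPR options from a JSON object string.

    Returns None when nothing is supplied, which callers can treat as
    "use the default k-hop sampler".
    """
    if raw is None or raw.strip() == "":
        return None
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError("Sampler options must be a JSON object.")
    # Reject misspelled keys instead of silently ignoring them.
    known = {f.name for f in fields(PPRSamplerOptions)}
    unknown = set(payload) - known
    if unknown:
        raise ValueError(f"Unknown PPR sampler option(s): {sorted(unknown)}")
    return PPRSamplerOptions(**payload)
```

With this shape, a single config value such as `{"alpha": 0.25, "max_ppr_nodes": 100}` would replace the separate `ppr_*` string args and their alias handling. Unset fields fall back to the dataclass defaults.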
/e2e_test
GiGL Automation @ 20:52:09 UTC: 🔄 @ 22:16:13 UTC: ✅ Workflow completed successfully.
kmontemayor2-sc
left a comment
Actually, is there a reason we want a full e2e test here? Why not just extend the graph store integration test 1?
We don't really need the "full" e2e test suite here right? And I feel like doing it this way makes our examples more confusing.
Summary
Stacked on #645.
Enables PPR sampling in the existing homogeneous Graph Store training and inference entrypoints, then adds a short E2E test that exercises that path.
Changes include:
- Add `parse_sampler_options` for `sampler_type: ppr` task args.
- Pass `SamplerOptions` through homogeneous Graph Store training and inference loaders.
- Read `local_world_size` from `GraphStoreInfo` and fail fast when it disagrees with the cluster topology.