Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition = "2024"

[dependencies]
csv = "1.4.0"
egg = "0.10.0"
libc = "0.2"
oxrdf = "0.3.3"
oxttl = "0.2.3"
Expand Down
4 changes: 4 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ fn regenerate_bindings() {
.allowlist_function("GrB_Vector_extractTuples_BOOL")
.allowlist_function("GrB_vxm")
.allowlist_item("LAGRAPH_MSG_LEN")
.allowlist_item("RPQMatrixOp")
.allowlist_type("RPQMatrixPlan")
.allowlist_type("LAGraph_Graph")
.allowlist_type("LAGraph_Kind")
.allowlist_function("LAGraph_CheckGraph")
Expand All @@ -83,6 +85,8 @@ fn regenerate_bindings() {
.allowlist_function("LAGraph_Delete")
.allowlist_function("LAGraph_Cached_AT")
.allowlist_function("LAGraph_MMRead")
.allowlist_function("LAGraph_RPQMatrix")
.allowlist_function("LAGraph_DestroyRpqMatrixPlan")
.default_enum_style(bindgen::EnumVariation::Rust {
non_exhaustive: false,
})
Expand Down
1 change: 1 addition & 0 deletions src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl Drop for LagraphGraph {
unsafe impl Send for LagraphGraph {}
unsafe impl Sync for LagraphGraph {}

#[derive(Debug)]
pub struct GraphblasVector {
pub inner: GrB_Vector,
}
Expand Down
29 changes: 29 additions & 0 deletions src/lagraph_sys_generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,32 @@ unsafe extern "C" {
msg: *mut ::std::os::raw::c_char,
) -> ::std::os::raw::c_int;
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
pub enum RPQMatrixOp {
RPQ_MATRIX_OP_LABEL = 0,
RPQ_MATRIX_OP_LOR = 1,
RPQ_MATRIX_OP_CONCAT = 2,
RPQ_MATRIX_OP_KLEENE = 3,
RPQ_MATRIX_OP_KLEENE_L = 4,
RPQ_MATRIX_OP_KLEENE_R = 5,
}
#[repr(C)]
#[derive(Debug, Copy, Clone)]
pub struct RPQMatrixPlan {
pub op: RPQMatrixOp,
pub lhs: *mut RPQMatrixPlan,
pub rhs: *mut RPQMatrixPlan,
pub mat: GrB_Matrix,
pub res_mat: GrB_Matrix,
}
unsafe extern "C" {
pub fn LAGraph_RPQMatrix(
nnz: *mut GrB_Index,
plan: *mut RPQMatrixPlan,
msg: *mut ::std::os::raw::c_char,
) -> GrB_Info;
}
unsafe extern "C" {
pub fn LAGraph_DestroyRpqMatrixPlan(plan: *mut RPQMatrixPlan) -> GrB_Info;
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod formats;
pub mod graph;
pub mod rpq;
pub mod sparql;
#[allow(unused_unsafe, dead_code)]
pub(crate) mod utils;
pub mod utils;

pub mod lagraph_sys;
54 changes: 54 additions & 0 deletions src/rpq/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Regular Path Query (RPQ) evaluation over edge-labeled graphs.
//! ```rust,ignore
//! use pathrex::sparql::parse_rpq;
//! use pathrex::rpq::{RpqEvaluator, nfarpq::NfaRpqEvaluator};
//!
//! let triple = parse_rpq("SELECT ?x ?y WHERE { ?x <knows>/<likes>* ?y . }")?;
//! let result = NfaRpqEvaluator.evaluate(&triple.subject, &triple.path, &triple.object, &graph)?;
//! ```

pub mod rpqmatrix;

use crate::graph::GraphDecomposition;
use crate::graph::GraphblasVector;
use crate::sparql::ExtractError;
use spargebra::SparqlSyntaxError;
use spargebra::algebra::PropertyPathExpression;
use spargebra::term::TermPattern;
use thiserror::Error;

#[derive(Debug, Error)]
pub enum RpqError {
#[error("SPARQL syntax error: {0}")]
Parse(#[from] SparqlSyntaxError),

#[error("query extraction error: {0}")]
Extract(#[from] ExtractError),

#[error("unsupported path expression: {0}")]
UnsupportedPath(String),

#[error("label not found in graph: '{0}'")]
LabelNotFound(String),

#[error("vertex not found in graph: '{0}'")]
VertexNotFound(String),

#[error("GraphBLAS/LAGraph error: {0}")]
GraphBlas(String),
}

#[derive(Debug)]
pub struct RpqResult {
pub reachable: GraphblasVector,
}

pub trait RpqEvaluator {
fn evaluate<G: GraphDecomposition>(
&self,
subject: &TermPattern,
path: &PropertyPathExpression,
object: &TermPattern,
graph: &G,
) -> Result<RpqResult, RpqError>;
}
214 changes: 214 additions & 0 deletions src/rpq/rpqmatrix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
//! Plan-based RPQ evaluation using `LAGraph_RPQMatrix`.

use std::ptr::null_mut;

use egg::{Id, RecExpr, define_language};
use spargebra::algebra::PropertyPathExpression;
use spargebra::term::TermPattern;

use crate::graph::{GraphDecomposition, GraphblasVector, ensure_grb_init};
use crate::lagraph_sys::*;
use crate::rpq::{RpqError, RpqEvaluator, RpqResult};
use crate::{grb_ok, la_ok};

unsafe impl Send for RPQMatrixPlan {}

define_language! {
pub enum RpqPlan {
Label(String),
"/" = Seq([Id; 2]),
"|" = Alt([Id; 2]),
"*" = Star([Id; 1]),
}
}

/// Compile a [`PropertyPathExpression`] into [`RecExpr<RpqPlan>`].
pub fn to_expr(path: &PropertyPathExpression) -> Result<RecExpr<RpqPlan>, RpqError> {
let mut expr = RecExpr::default();
to_expr_aux(path, &mut expr)?;
Ok(expr)
}

fn to_expr_aux(
path: &PropertyPathExpression,
expr: &mut RecExpr<RpqPlan>,
) -> Result<Id, RpqError> {
match path {
PropertyPathExpression::NamedNode(nn) => {
Ok(expr.add(RpqPlan::Label(nn.as_str().to_owned())))
}

PropertyPathExpression::Sequence(lhs, rhs) => {
let l = to_expr_aux(lhs, expr)?;
let r = to_expr_aux(rhs, expr)?;
Ok(expr.add(RpqPlan::Seq([l, r])))
}

PropertyPathExpression::Alternative(lhs, rhs) => {
let l = to_expr_aux(lhs, expr)?;
let r = to_expr_aux(rhs, expr)?;
Ok(expr.add(RpqPlan::Alt([l, r])))
}

PropertyPathExpression::ZeroOrMore(inner) => {
let i = to_expr_aux(inner, expr)?;
Ok(expr.add(RpqPlan::Star([i])))
}

PropertyPathExpression::OneOrMore(inner) => {
let e = to_expr_aux(inner, expr)?;
let s = expr.add(RpqPlan::Star([e]));
Ok(expr.add(RpqPlan::Seq([e, s])))
}

PropertyPathExpression::ZeroOrOne(_) => Err(RpqError::UnsupportedPath(
"ZeroOrOne (?) is not supported by RPQMatrix".into(),
)),

PropertyPathExpression::Reverse(_) => Err(RpqError::UnsupportedPath(
"Reverse paths are not supported".into(),
)),

PropertyPathExpression::NegatedPropertySet(_) => Err(RpqError::UnsupportedPath(
"NegatedPropertySet paths are not supported".into(),
)),
}
}

/// Convert a [`RecExpr<RpqPlan>`] into the flat [`RPQMatrixPlan`] array that
/// `LAGraph_RPQMatrix` expects.
pub fn materialize<G: GraphDecomposition>(
expr: &RecExpr<RpqPlan>,
graph: &G,
) -> Result<Vec<RPQMatrixPlan>, RpqError> {
let null_plan = RPQMatrixPlan {
op: RPQMatrixOp::RPQ_MATRIX_OP_LABEL,
lhs: null_mut(),
rhs: null_mut(),
mat: null_mut(),
res_mat: null_mut(),
};
let mut plans = vec![null_plan; expr.len()];

for (id, node) in expr.as_ref().iter().enumerate() {
plans[id] = match node {
RpqPlan::Label(label) => {
let lg = graph
.get_graph(label)
.map_err(|_| RpqError::LabelNotFound(label.clone()))?;
let mat = unsafe { (*lg.inner).A };
RPQMatrixPlan {
op: RPQMatrixOp::RPQ_MATRIX_OP_LABEL,
lhs: null_mut(),
rhs: null_mut(),
mat,
res_mat: null_mut(),
}
}

RpqPlan::Seq([l, r]) => RPQMatrixPlan {
op: RPQMatrixOp::RPQ_MATRIX_OP_CONCAT,
lhs: unsafe { plans.as_mut_ptr().add(usize::from(*l)) },
rhs: unsafe { plans.as_mut_ptr().add(usize::from(*r)) },
mat: null_mut(),
res_mat: null_mut(),
},

RpqPlan::Alt([l, r]) => RPQMatrixPlan {
op: RPQMatrixOp::RPQ_MATRIX_OP_LOR,
lhs: unsafe { plans.as_mut_ptr().add(usize::from(*l)) },
rhs: unsafe { plans.as_mut_ptr().add(usize::from(*r)) },
mat: null_mut(),
res_mat: null_mut(),
},

RpqPlan::Star([i]) => RPQMatrixPlan {
op: RPQMatrixOp::RPQ_MATRIX_OP_KLEENE,
lhs: null_mut(),
rhs: unsafe { plans.as_mut_ptr().add(usize::from(*i)) },
mat: null_mut(),
res_mat: null_mut(),
},
};
}

Ok(plans)
}

/// RPQ evaluator backed by `LAGraph_RPQMatrix`.
pub struct RpqMatrixEvaluator;

impl RpqEvaluator for RpqMatrixEvaluator {
fn evaluate<G: GraphDecomposition>(
&self,
subject: &TermPattern,
path: &PropertyPathExpression,
object: &TermPattern,
graph: &G,
) -> Result<RpqResult, RpqError> {
if !matches!(object, TermPattern::Variable(_)) {
return Err(RpqError::UnsupportedPath(
"bound object term is not yet supported by RpqMatrixEvaluator".into(),
));
}

ensure_grb_init().map_err(|e| RpqError::GraphBlas(e.to_string()))?;

let n = graph.num_nodes() as GrB_Index;

let expr = to_expr(path)?;

let mut plans = materialize(&expr, graph)?;
let root_ptr = unsafe { plans.as_mut_ptr().add(plans.len() - 1) };

let mut nnz: GrB_Index = 0;
la_ok!(LAGraph_RPQMatrix(&mut nnz, root_ptr))
.map_err(|e| RpqError::GraphBlas(e.to_string()))?;

let res_mat = unsafe { (*root_ptr).res_mat };

let src = unsafe {
GraphblasVector::new_bool(n).map_err(|e| RpqError::GraphBlas(e.to_string()))?
};
match subject {
TermPattern::NamedNode(nn) => {
let id = graph
.get_node_id(nn.as_str())
.ok_or_else(|| RpqError::VertexNotFound(nn.as_str().to_owned()))?
as GrB_Index;
grb_ok!(GrB_Vector_setElement_BOOL(src.inner, true, id))
.map_err(|e| RpqError::GraphBlas(e.to_string()))?;
}
TermPattern::Variable(_) => {
for i in 0..n {
grb_ok!(GrB_Vector_setElement_BOOL(src.inner, true, i))
.map_err(|e| RpqError::GraphBlas(e.to_string()))?;
}
}
_ => {
return Err(RpqError::UnsupportedPath(
"subject must be a variable or named node".into(),
));
}
}

let result = unsafe {
GraphblasVector::new_bool(n).map_err(|e| RpqError::GraphBlas(e.to_string()))?
};
grb_ok!(GrB_vxm(
result.inner,
null_mut(),
null_mut(),
GrB_LOR_LAND_SEMIRING_BOOL,
src.inner,
res_mat,
null_mut(),
))
.map_err(|e| RpqError::GraphBlas(e.to_string()))?;

grb_ok!(LAGraph_DestroyRpqMatrixPlan(root_ptr))
.map_err(|e| RpqError::GraphBlas(e.to_string()))?;

Ok(RpqResult { reachable: result })
}
}
20 changes: 20 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
use crate::{graph::*, lagraph_sys::*};
use std::{fmt::Display, sync::Arc};

pub fn build_graph(edges: &[(&str, &str, &str)]) -> <InMemory as Backend>::Graph {
let builder = InMemoryBuilder::new();
let edges = edges
.iter()
.cloned()
.map(|(s, t, l)| {
Ok(Edge {
source: s.to_string(),
label: l.to_string(),
target: t.to_string(),
})
})
.collect::<Vec<Result<Edge, GraphError>>>();
builder
.with_stream(edges.into_iter())
.expect("Should insert edges stream")
.build()
.expect("build must succeed")
}

pub struct CountOutput<E: std::error::Error>(pub usize, std::marker::PhantomData<E>);

impl<E: std::error::Error> CountOutput<E> {
Expand Down
Loading
Loading