diff --git a/build.gradle b/build.gradle index 83cb4bd3..3256cae5 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'org.jetbrains.kotlin.jvm' version '1.6.21' + id 'org.jetbrains.kotlin.jvm' version '1.8.22' id 'com.github.johnrengelman.shadow' version '5.2.0' id 'antlr' id "kr.motd.sphinx" version "2.10.0" @@ -17,7 +17,7 @@ java { repositories { mavenCentral() - maven { url "https://overture.au.dk/artifactory/libs-release/" } + maven { url "https://jitpack.io" } } test { @@ -35,20 +35,25 @@ dependencies { testImplementation 'io.kotest:kotest-runner-junit5:5.2.3' testImplementation 'org.jetbrains.kotlin:kotlin-test-junit' testImplementation 'org.apache.commons:commons-lang3:3.12.0' - testImplementation 'org.apache.jena:jena-fuseki-main:3.16.0' + testImplementation 'org.apache.jena:jena-fuseki-main:4.10.0' implementation 'com.github.ajalt.clikt:clikt:3.4.2' implementation 'org.antlr:antlr4:4.8' antlr 'org.antlr:antlr4:4.8' implementation 'net.sourceforge.owlapi:org.semanticweb.hermit:1.4.5.519' implementation 'org.slf4j:slf4j-simple:1.7.25' - implementation 'org.apache.jena:apache-jena-libs:3.16.0' - implementation 'org.apache.jena:jena-core:3.16.0' + implementation 'org.apache.jena:apache-jena-libs:4.10.0' + implementation 'org.apache.jena:jena-core:4.10.0' implementation 'org.siani.javafmi:fmu-wrapper:2.26.3' implementation "com.influxdb:influxdb-client-kotlin:2.3.0" implementation 'com.sksamuel.hoplite:hoplite-core:1.4.1' implementation 'com.sksamuel.hoplite:hoplite-yaml:1.4.1' - implementation 'com.github.owlcs:ontapi:2.1.0' + implementation 'com.github.owlcs:ontapi:3.0.5' implementation 'org.jline:jline:3.21.0' + implementation 'com.github.streamreasoning.rsp4j:api:1.1.8' + implementation 'com.github.streamreasoning.rsp4j:yasper:1.1.8' + implementation 'com.github.streamreasoning.rsp4j:io:1.1.8' + implementation 'com.github.streamreasoning.rsp4j:dsms:1.1.8' + implementation 'com.github.streamreasoning.rsp4j:csparql2:1.1.8' } diff --git a/src/main/antlr/While.g4 b/src/main/antlr/While.g4 index 463c8741..9c23d988 100644 --- a/src/main/antlr/While.g4 +++ b/src/main/antlr/While.g4 @@ -31,7 +31,10 @@ SIMULATE : 'simulate'; VALIDATE : 'validate'; CLASSIFY : 'classify'; ADAPT : 'adapt'; +MONITOR : 'monitor'; TICK : 'tick'; +WINDOW : 'window'; +PUSH_STATIC: 'pushStatic'; BREAKPOINT : 'breakpoint'; SUPER : 'super'; DESTROY : 'destroy'; @@ -49,6 +52,8 @@ CLASSIFIES: 'classifies'; RETRIEVES: 'retrieves'; DOMAIN : 'domain'; CONTEXT : 'context'; +STREAMER : 'streamer'; +EMITS : 'emits'; //Keywords: constants TRUE : 'True'; @@ -94,6 +99,9 @@ FMU : 'FMO'; PORT : 'port'; SPARQLMODE : 'SPARQL'; INFLUXMODE : 'INFLUXDB'; +// MONITORTYPE : 'Monitor'; +// QUERYRESULTTYPE : 'QueryResult'; +CLOCK : 'clock'; // Note that the IN, OUT constants are also used in // TypeChecker.kt:translateType in the FMU branch; adapt strings there if // changing the syntax here @@ -117,13 +125,13 @@ namelist : NAME (COMMA NAME)*; program : (class_def)* MAIN statement END (class_def)*; //classes -class_def : (abs=ABSTRACT)? (hidden=HIDE)? CLASS className = NAME (LT namelist GT)? (EXTENDS superType = type)? OPARAN (external=fieldDeclList)? CPARAN +class_def : (abs=ABSTRACT)? (hidden=HIDE)? (streamer=STREAMER)? CLASS className = NAME (LT namelist GT)? (EXTENDS superType = type)? OPARAN (external=fieldDeclList)? CPARAN (internal = fieldDeclInitList)? (models_block)? (classifies_block (retrieves_block)?)? method_def* END; -method_def : (abs=ABSTRACT)? (builtinrule=RULE)? (domainrule=DOMAIN)? (overriding=OVERRIDE)? type NAME OPARAN paramList? CPARAN (statement END)?; +method_def : (abs=ABSTRACT)? (builtinrule=RULE)? (domainrule=DOMAIN)? (overriding=OVERRIDE)? type NAME OPARAN paramList? CPARAN emits_block? (statement END)?; models_block : MODELS owldescription=STRING SEMI #simple_models_block | MODELS OPARAN guard=expression CPARAN owldescription=STRING SEMI models_block #complex_models_block @@ -132,14 +140,19 @@ classifies_block : CLASSIFIES owldescription=STRING SEMI # adapt ; retrieves_block : RETRIEVES selectquery=STRING SEMI # adaptation_retrieves_block ; +emits_block : EMITS OPARAN expression (COMMA expression)* CPARAN; + //Statements statement : SKIP_S SEMI # skip_statment | ((declType = type)? target=expression ASS)? CLASSIFY OPARAN context=expression CPARAN SEMI # classify_statement | ADAPT OPARAN adapter=expression CPARAN SEMI # adapt_statement - | (declType = type)? expression ASS expression SEMI # assign_statement + | (declType = type)? target=expression ASS MONITOR OPARAN registeredQuery=expression (COMMA expression)* CPARAN SEMI # monitor_statement + | (clock = CLOCK)? (declType = type)? expression ASS expression SEMI # assign_statement | ((declType = type)? target=expression ASS)? SUPER OPARAN (expression (COMMA expression)*)? CPARAN SEMI # super_statement | RETURN expression SEMI # return_statement | fmu=expression DOT TICK OPARAN time=expression CPARAN SEMI # tick_statement + | (declType = type)? target=expression ASS WINDOW OPARAN monitor=expression CPARAN SEMI # window_statement + | (declType = type)? target=expression ASS PUSH_STATIC OPARAN sources=expression CPARAN SEMI # pushStatic_statement | ((declType = type)? target=expression ASS)? expression DOT NAME OPARAN (expression (COMMA expression)*)? CPARAN SEMI # call_statement // TODO: allow new statements without assignment | (declType = type)? target=expression ASS NEW newType = type OPARAN (expression (COMMA expression)*)? CPARAN (MODELS owldescription = expression)? SEMI # create_statement @@ -194,6 +207,7 @@ expression : THIS # this_expression type : NAME #simple_type | NAME LT typelist GT #nested_type | FMU OBRACK fmuParamList? CBRACK #fmu_type + // | MONITORTYPE LT typelist GT #monitor_type ; typelist : type (COMMA type)*; param : type NAME; diff --git a/src/main/kotlin/no/uio/microobject/ast/Translate.kt b/src/main/kotlin/no/uio/microobject/ast/Translate.kt index 16fafa6c..398c5780 100644 --- a/src/main/kotlin/no/uio/microobject/ast/Translate.kt +++ b/src/main/kotlin/no/uio/microobject/ast/Translate.kt @@ -22,6 +22,7 @@ class Translate : WhileBaseVisitor() { private val classifiesTable: MutableMap> = mutableMapOf() private val checkClassifiesTable: MutableMap>> = mutableMapOf() private val contextTable : MutableMap = mutableMapOf() + private val streamersTable : MutableMap>> = mutableMapOf() private fun translateModels(ctx : Models_blockContext) : Pair>, String>{ if(ctx is Simple_models_blockContext) @@ -75,6 +76,11 @@ class Translate : WhileBaseVisitor() { addClassifyQuery(cl.className.text, cl.classifies_block(), null) } } + + // create streamer configuration + if (cl.streamer != null) { + streamersTable[cl.className.text] = mutableMapOf() + } val inFields = if(cl.external != null) { var res = listOf() if(cl.external.fieldDecl() != null) { @@ -141,6 +147,14 @@ class Translate : WhileBaseVisitor() { val params = if (nm.paramList() != null) paramListTranslate(nm.paramList()) else listOf() res[nm.NAME().text] = MethodInfo(SkipStmt(ctx!!.start.line), params, nm.builtinrule != null, nm.domainrule != null, cl.className.text, metType) } + if (nm.emits_block() != null) { + if (!streamersTable.containsKey(cl.className.text)) + throw Exception("Class with methods containing emits clause must be a streamer: ${nm.NAME().text}") + + streamersTable[cl.className.text]!![nm.NAME().text] = mutableSetOf() + for(i in 0 until nm.emits_block().expression().size) + streamersTable[cl.className.text]!![nm.NAME().text]!! += visit(nm.emits_block().expression(i)) as Expression + } } table[cl.className.text] = Pair(fields, res) } @@ -179,7 +193,7 @@ class Translate : WhileBaseVisitor() { return Pair( StackEntry(visit(ctx.statement()) as Statement, mutableMapOf(), Names.getObjName("_Entry_"), Names.getStackId()), - StaticTable(fieldTable, methodTable, hierarchy, modelsTable, hidden, owldescr, checkClassifiesTable, contextTable) + StaticTable(fieldTable, methodTable, hierarchy, modelsTable, hidden, owldescr, checkClassifiesTable, contextTable, streamersTable) ) } @@ -402,6 +416,50 @@ class Translate : WhileBaseVisitor() { return SimulationStmt(target, path, res, ctx!!.start.line, declares) } + override fun visitMonitor_statement(ctx: Monitor_statementContext?): ProgramElement { + val target = visit(ctx!!.target) as Location + if(ctx.declType != null) { + val decl = getClassDecl(ctx) + val className = if(decl == null) ERRORTYPE.name else decl!!.className.text + val targetType = TypeChecker.translateType(ctx.declType, className, mutableMapOf()) + target.setType(targetType) + } + + val query = visit(ctx!!.registeredQuery) as Expression + val ll = emptyList().toMutableList() + for(i in 2 until ctx!!.expression().size) + ll += visit(ctx.expression(i)) as Expression + + return MonitorStmt(target, query, ll, ctx!!.start.line, target.getType()) + } + + override fun visitPushStatic_statement(ctx: PushStatic_statementContext?): ProgramElement { + val target = visit(ctx!!.target) as Location + if(ctx.declType != null) { + val decl = getClassDecl(ctx) + val className = if(decl == null) ERRORTYPE.name else decl!!.className.text + val targetType = TypeChecker.translateType(ctx.declType, className, mutableMapOf()) + target.setType(targetType) + } + + val sources = visit(ctx!!.sources) as Expression + + return PushStaticStmt(target, sources, ctx!!.start.line, target.getType()) + } + + + override fun visitWindow_statement(ctx: Window_statementContext?): ProgramElement { + val target = visit(ctx!!.target) as Location + if(ctx.declType != null) { + val decl = getClassDecl(ctx) + val className = if(decl == null) ERRORTYPE.name else decl!!.className.text + val targetType = TypeChecker.translateType(ctx.declType, className, mutableMapOf()) + target.setType(targetType) + } + val monitor = visit(ctx!!.monitor) as Expression + return WindowStmt(target, monitor, ctx!!.start.line, declares=target.getType()) + } + override fun visitTick_statement(ctx: Tick_statementContext?): ProgramElement { return TickStmt(visit(ctx!!.fmu) as Expression, visit(ctx!!.time) as Expression ) } @@ -418,7 +476,8 @@ class Translate : WhileBaseVisitor() { val def = getClassDecl(ctx as RuleContext) val declares = if(ctx!!.declType == null) null else TypeChecker.translateType(ctx.declType, if(def != null) def!!.className.text else ERRORTYPE.name, mutableMapOf()) - return AssignStmt(visit(ctx!!.expression(0)) as Location, visit(ctx.expression(1)) as Expression, ctx!!.start.line, declares) + val isClock = if(ctx.clock == null) false else true + return AssignStmt(visit(ctx!!.expression(0)) as Location, visit(ctx.expression(1)) as Expression, isClock, ctx!!.start.line, declares) } override fun visitSkip_statment(ctx: Skip_statmentContext?): ProgramElement { diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/AssignStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/AssignStmt.kt index fb2cef62..955c6851 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/AssignStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/AssignStmt.kt @@ -10,9 +10,10 @@ import no.uio.microobject.runtime.Memory import no.uio.microobject.runtime.StackEntry import no.uio.microobject.type.BaseType import no.uio.microobject.type.Type +import no.uio.microobject.type.INTTYPE // Assignment, where value cannot refer to calls or object creations. -data class AssignStmt(val target : Location, val value : Expression, val pos : Int = -1, val declares: Type?) : +data class AssignStmt(val target : Location, val value : Expression, val isClock: Boolean = false, val pos : Int = -1, val declares: Type?) : Statement { override fun toString(): String = "$target := $value" override fun getRDF(): String { @@ -28,7 +29,14 @@ data class AssignStmt(val target : Location, val value : Expression, val pos : I override fun eval(heapObj: Memory, stackFrame : StackEntry, interpreter: Interpreter) : EvalResult { val res = interpreter.eval(value, stackFrame) when (target) { - is LocalVar -> stackFrame.store[target.name] = res + is LocalVar -> { + if (isClock && interpreter.streamManager.clockVar == null) { + interpreter.streamManager.clockVar = target.name + } + if (interpreter.streamManager.clockVar != null && target.name.equals(interpreter.streamManager.clockVar) + && res.tag == INTTYPE) interpreter.streamManager.updateClock(res.literal.toLong()) + stackFrame.store[target.name] = res + } is OwnVar -> { val got = interpreter.staticInfo.fieldTable[(stackFrame.obj.tag as BaseType).name] ?: throw Exception("Cannot find class ${stackFrame.obj.tag.name}") if (!got.map {it.name} .contains(target.name)) diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/CallStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/CallStmt.kt index 8e27012b..efd55c04 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/CallStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/CallStmt.kt @@ -39,8 +39,14 @@ data class CallStmt(val target : Location, val callee : Location, val method : S for (i in m.params.indices) { newMemory[m.params[i]] = interpreter.eval(params[i], stackFrame) } + + var emitFromMethod: String? = null + if (interpreter.staticInfo.streamersTable.containsKey(newObj.tag.name) && + interpreter.staticInfo.streamersTable[newObj.tag.name]!!.containsKey(method)) + emitFromMethod = method + return EvalResult( - StackEntry(StoreReturnStmt(target), stackFrame.store, stackFrame.obj, stackFrame.id), + StackEntry(StoreReturnStmt(target, emitFromMethod=emitFromMethod), stackFrame.store, stackFrame.obj, stackFrame.id), listOf(StackEntry(m.stmt, newMemory, newObj, Names.getStackId())) ) } diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/ConstructStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/ConstructStmt.kt index 1cc8aba7..f569b081 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/ConstructStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/ConstructStmt.kt @@ -109,6 +109,9 @@ data class ConstructStmt(val target : Location, val query: Expression, val param newListMemory["next"] = list interpreter.heap[newListName] = newListMemory list = newListName + if (interpreter.staticInfo.streamersTable.containsKey(className)) { + interpreter.streamManager.registerStream(className, newObjName) + } } } return replaceStmt(AssignStmt(target, list, declares = declares), stackFrame) diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/CreateStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/CreateStmt.kt index 35f90cc7..c80d5e51 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/CreateStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/CreateStmt.kt @@ -50,6 +50,9 @@ data class CreateStmt(val target : Location, val className: String, val params : interpreter.heap[name] = newMemory val localFrame = StackEntry(SkipStmt(), mutableMapOf(Pair("this", name)),name,0) n.filter { it.internalInit != null }.forEach { newMemory[it.name] = interpreter.eval(it.internalInit!!, localFrame) } + if (interpreter.staticInfo.streamersTable.containsKey(className)) { + interpreter.streamManager.registerStream(className, name) + } return replaceStmt(AssignStmt(target, name, declares = declares), stackFrame) } } \ No newline at end of file diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/MonitorStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/MonitorStmt.kt new file mode 100644 index 00000000..d82b12a8 --- /dev/null +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/MonitorStmt.kt @@ -0,0 +1,113 @@ +package no.uio.microobject.ast.stmt + +import com.sksamuel.hoplite.ConfigLoader +import no.uio.microobject.ast.* +import no.uio.microobject.ast.expr.FALSEEXPR +import no.uio.microobject.ast.expr.LiteralExpr +import no.uio.microobject.ast.expr.TRUEEXPR +import no.uio.microobject.runtime.* +import no.uio.microobject.type.* +import org.apache.jena.datatypes.xsd.XSDDatatype +import java.io.File + +data class MonitorStmt(val target : Location, val query: Expression, val params : List, val pos : Int = -1, val declares: Type?) : + Statement { + override fun toString(): String = "$target := monitor($query, ${params.joinToString(",")})" + override fun getRDF(): String { + var s = """ + prog:stmt${this.hashCode()} rdf:type smol:MonitorStatement. + prog:stmt${this.hashCode()} smol:hasTarget prog:loc${target.hashCode()}. + prog:stmt${this.hashCode()} smol:hasQuery prog:expr${query.hashCode()}. + prog:stmt${this.hashCode()} smol:Line '$pos'^^xsd:integer. + + """.trimIndent() + for (i in params.indices){ + s += "prog:stmt${this.hashCode()} smol:hasParameter [smol:hasParameterIndex $i ; smol:hasParameterValue prog:expr${params[i].hashCode()}; ].\n" + s += params[i].getRDF() + } + // return s + target.getRDF() + return s + target.getRDF() + query.getRDF() + // '${literal.removePrefix("\"").removeSuffix("\"")}' + } + + + override fun eval(heapObj: Memory, stackFrame: StackEntry, interpreter: Interpreter): EvalResult { + val name = Names.getObjName("Monitor") + if (declares is ComposedType && declares.getPrimary().getNameString().equals("Monitor")) { + // only consider the first type for now. e.g., Monitor + interpreter.streamManager.registerQuery(name, query, params, stackFrame.store, interpreter.heap, stackFrame.obj, declaredType=declares.params[0]) + } else { + throw Exception("Monitor statement can only be assigned to type Monitor") + } + return replaceStmt(AssignStmt(target, name, declares = declares), stackFrame) + } + +} + +class MonitorObject(private val name: LiteralExpr, private val declaredType: Type) { + + private fun iriToLiteral(iri: String, interpreter: Interpreter): LiteralExpr { + + if (iri.endsWith("^^http://www.w3.org/2001/XMLSchema#integer") || + iri.endsWith("^^http://www.w3.org/2001/XMLSchema#long") || + iri.endsWith("^^http://www.w3.org/2001/XMLSchema#int") + ) + return LiteralExpr(iri.split("^^")[0].removeSurrounding("\""), INTTYPE) + if (iri.endsWith("^^http://www.w3.org/2001/XMLSchema#boolean")) return LiteralExpr(iri.split("^^")[0], BOOLEANTYPE) + if (iri.endsWith("^^http://www.w3.org/2001/XMLSchema#double") || + iri.endsWith("^^http://www.w3.org/2001/XMLSchema#float") || + iri.endsWith("^^http://www.w3.org/2001/XMLSchema#decimal")) { + val raw = iri.split("^^")[0] + val inner = raw.removeSurrounding("\"") + // try to remove scientific notation + try { + val normalized = inner.toDouble() + val normalizedStr = "${normalized}" + return LiteralExpr(normalizedStr, DOUBLETYPE) + } catch (e: NumberFormatException) { + throw Exception("Invalid xsd:double literal: $raw") + } + } + if (iri.endsWith("^^http://www.w3.org/2001/XMLSchema#string")) return LiteralExpr(iri.split("^^")[0], STRINGTYPE) + val literal = iri.removePrefix(interpreter.settings.runPrefix) + for (obj in interpreter.heap.keys + interpreter.simMemory.keys) + if (obj.literal.equals(literal)) return obj + return LiteralExpr("ERROR") + } + + fun getWindowResults(interpreter: Interpreter): LiteralExpr { + val rdfTables = interpreter.streamManager.getQueryResults(name) + + var list = LiteralExpr("null") + + if (rdfTables != null) { + for (rdfTable in rdfTables) { + + val resIt = rdfTable.rows().iterator() + val firstVar = rdfTable.varNames.get(0) + + while (resIt.hasNext()) { + val rdfTuple = resIt.next() + try { + // only consider first result for now + var literal = iriToLiteral(rdfTuple.get(firstVar).toString(), interpreter) + if (literal.tag != declaredType) + throw Exception("Monitor parameter has incorrect type (expected ${declaredType}, got ${literal.tag})") + + val name = Names.getObjName("List") + val newMemory: Memory = mutableMapOf() + newMemory["content"] = literal + newMemory["next"] = list + interpreter.heap[name] = newMemory + list = name + + } catch (e: Exception) { + throw Exception("Error while processing query result: ${e.message}") + } + } + } + } + return list + } +} + diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/PushStaticStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/PushStaticStmt.kt new file mode 100644 index 00000000..f22040ec --- /dev/null +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/PushStaticStmt.kt @@ -0,0 +1,78 @@ +package no.uio.microobject.ast.stmt + +import com.sksamuel.hoplite.ConfigLoader +import no.uio.microobject.ast.* +import no.uio.microobject.ast.expr.FALSEEXPR +import no.uio.microobject.ast.expr.LiteralExpr +import no.uio.microobject.ast.expr.TRUEEXPR +import no.uio.microobject.runtime.* +import no.uio.microobject.type.* +import no.uio.microobject.data.TripleSettings +import no.uio.microobject.main.ReasonerMode +import org.apache.jena.datatypes.xsd.XSDDatatype +import org.apache.jena.rdf.model.Model +import java.io.File +import java.io.FileWriter + +data class PushStaticStmt(val target : Location, val sources: Expression, val pos : Int = -1, val declares: Type?) : Statement { + override fun toString(): String = "$target:=pushStatic(sources=$sources)" + override fun getRDF(): String { + var s = "prog:stmt${this.hashCode()} rdf:type smol:PushStaticStmt.".trimIndent() + return s + } + + + override fun eval(heapObj: Memory, stackFrame: StackEntry, interpreter: Interpreter): EvalResult { + val namedIri = interpreter.streamManager.getStaticNamedIri() + + // parse sources from comma-separated string sources + val sourcesLit = interpreter.eval(sources, stackFrame) + if (sourcesLit.tag != STRINGTYPE) { + throw Exception("The sources parameter in pushStatic statement must be a string literal") + } + val sourcesStr = sourcesLit.literal.removeSurrounding("\"") + val sourcesList = sourcesStr.split(",") + val sourcesMap = hashMapOf( + "heap" to false, + "staticTable" to false, + "vocabularyFile" to false, + "externalOntology" to false, + "urlOntology" to false, + "fmos" to false + ) + var reasonerMode: ReasonerMode = ReasonerMode.off + for (s in sourcesList) { + val trimmed = s.trim() + if (sourcesMap.containsKey(trimmed)) { + sourcesMap[trimmed] = true + } else if (trimmed == "reasoner") { + reasonerMode = interpreter.settings.reasoner + } else { + throw Exception("Unknown source '$trimmed' in pushStatic statement, only comma-separated [heap, staticTable, vocabularyFile, externalOntology, urlOntology, fmos (or reasoner)] are allowed") + } + } + + // load settings from source map + val ts = TripleSettings( + sources = sourcesMap, + guards = hashMapOf("heap" to true, "staticTable" to true), + virtualization = hashMapOf("heap" to true, "staticTable" to true, "fmos" to true), + jenaReasoner = reasonerMode, + cachedModel = null + ) + + // write output to a file + val model = interpreter.tripleManager.getModel(ts) + val file = "output.ttl" + File(interpreter.settings.outdir).mkdirs() + File("${interpreter.settings.outdir}/${file}").createNewFile() + model.write(FileWriter("${interpreter.settings.outdir}/${file}"),"TTL") + val resultPath = "" + + // return the file path as a string literal + println("Saving static data to $resultPath") + val resultLit = LiteralExpr(resultPath, STRINGTYPE) + return replaceStmt(AssignStmt(target, resultLit, declares = declares), stackFrame) + } + +} diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/ReturnStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/ReturnStmt.kt index 883a8831..84ab6321 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/ReturnStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/ReturnStmt.kt @@ -5,6 +5,7 @@ import no.uio.microobject.runtime.EvalResult import no.uio.microobject.runtime.Interpreter import no.uio.microobject.runtime.Memory import no.uio.microobject.runtime.StackEntry +import no.uio.microobject.type.BaseType // Return statement data class ReturnStmt(var value : Expression, val pos : Int = -1) : Statement { @@ -22,6 +23,7 @@ data class ReturnStmt(var value : Expression, val pos : Int = -1) : Statement { val over = interpreter.stack.pop() if (over.active is StoreReturnStmt) { val res = interpreter.eval(value, stackFrame) + checkTriggerStream(over.active, stackFrame, interpreter) return EvalResult( StackEntry( AssignStmt(over.active.target, res, declares = null), @@ -35,8 +37,17 @@ data class ReturnStmt(var value : Expression, val pos : Int = -1) : Statement { val active = over.active.first val next = over.active.second val res = interpreter.eval(value, stackFrame) + checkTriggerStream(over.active.first, stackFrame, interpreter) return replaceStmt(appendStmt(AssignStmt(active.target, res, declares = null), next), over) } throw Exception("Malformed heap") } + + // trigger stream at the end of emission method, right before returning the result + private fun checkTriggerStream(storeReturnStmt: StoreReturnStmt, stackFrame: StackEntry, interpreter: Interpreter) { + if (storeReturnStmt.emitFromMethod != null) { + val className = (stackFrame.obj.tag as BaseType).name + interpreter.streamManager.triggerStream(className, stackFrame.obj, storeReturnStmt.emitFromMethod, stackFrame) + } + } } \ No newline at end of file diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/StoreReturnStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/StoreReturnStmt.kt index f55840b5..8612b4ba 100644 --- a/src/main/kotlin/no/uio/microobject/ast/stmt/StoreReturnStmt.kt +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/StoreReturnStmt.kt @@ -8,7 +8,7 @@ import no.uio.microobject.runtime.Memory import no.uio.microobject.runtime.StackEntry // This is a runtime-syntax only statement which models that we will write the return value of the next method in the stack into target -data class StoreReturnStmt(val target : Location, val pos : Int = -1) : Statement { +data class StoreReturnStmt(val target : Location, val pos : Int = -1, val emitFromMethod: String? = null) : Statement { override fun toString(): String = "$target <- stack" override fun getRDF(): String { return """ diff --git a/src/main/kotlin/no/uio/microobject/ast/stmt/WindowStmt.kt b/src/main/kotlin/no/uio/microobject/ast/stmt/WindowStmt.kt new file mode 100644 index 00000000..f9a382b5 --- /dev/null +++ b/src/main/kotlin/no/uio/microobject/ast/stmt/WindowStmt.kt @@ -0,0 +1,27 @@ +package no.uio.microobject.ast.stmt + +import no.uio.microobject.ast.Expression +import no.uio.microobject.ast.Statement +import no.uio.microobject.ast.Location +import no.uio.microobject.runtime.EvalResult +import no.uio.microobject.runtime.Interpreter +import no.uio.microobject.runtime.Memory +import no.uio.microobject.runtime.StackEntry +import no.uio.microobject.type.Type + +data class WindowStmt(val target: Location, val monitor: Expression, val pos : Int = -1, val declares: Type?) : Statement { + override fun toString(): String = "$target := window($monitor)" + override fun getRDF(): String { + //TODO: extend ontology + return "" + } + + override fun eval(heapObj: Memory, stackFrame: StackEntry, interpreter: Interpreter): EvalResult { + val lit = interpreter.eval(monitor, stackFrame) + val monitorObj = interpreter.streamManager.getMonitor(lit) + if(monitorObj == null) + throw Exception("Object $monitorObj is not a monitor object") + val list = monitorObj.getWindowResults(interpreter) + return replaceStmt(AssignStmt(target, list, pos=pos, declares=declares), stackFrame) + } +} \ No newline at end of file diff --git a/src/main/kotlin/no/uio/microobject/data/StreamManager.kt b/src/main/kotlin/no/uio/microobject/data/StreamManager.kt new file mode 100644 index 00000000..b082821e --- /dev/null +++ b/src/main/kotlin/no/uio/microobject/data/StreamManager.kt @@ -0,0 +1,274 @@ +package no.uio.microobject.data + +import no.uio.microobject.runtime.* +import no.uio.microobject.ast.expr.* +import no.uio.microobject.ast.Expression +import no.uio.microobject.ast.stmt.MonitorObject +import no.uio.microobject.main.Settings +import no.uio.microobject.type.* + +import java.nio.file.* +import java.util.Observer; +import java.util.Observable +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer +import java.io.File +import java.io.StringWriter +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.apache.log4j.BasicConfigurator +import org.apache.log4j.Level +import org.apache.log4j.Logger as Log4jLogger +import org.apache.log4j.PropertyConfigurator +import org.apache.jena.rdf.model.Model +import org.apache.jena.sparql.algebra.Table +import org.apache.jena.graph.Graph +import kotlin.text.toLong + +import org.streamreasoning.rsp4j.csparql2.engine.CSPARQLEngine +import org.streamreasoning.rsp4j.csparql2.engine.JenaContinuousQueryExecution +import org.streamreasoning.rsp4j.csparql2.stream.GraphStreamSchema +import org.streamreasoning.rsp4j.csparql2.sysout.ResponseFormatterFactory +import org.streamreasoning.rsp4j.csparql2.syntax.QueryFactory +import org.streamreasoning.rsp4j.csparql2.sysout.GenericResponseSysOutFormatter +import org.streamreasoning.rsp4j.api.engine.config.EngineConfiguration; +import org.streamreasoning.rsp4j.io.DataStreamImpl +import org.streamreasoning.rsp4j.api.stream.data.DataStream +import org.streamreasoning.rsp4j.api.sds.SDSConfiguration +import org.apache.jena.rdf.model.*; +import org.apache.jena.atlas.lib.tuple.Tuple + +// Per-monitor state +data class WindowBuf(var tick: Long? = null, val current: MutableList = mutableListOf()) + +// Class managing streams +class StreamManager(private val settings: Settings, val staticTable: StaticTable, private val interpreter: Interpreter?) { + + private var engine: CSPARQLEngine? = null + private var engineInitialized = false + private var sdsConfig: SDSConfiguration? = null + private var ec: EngineConfiguration? = null + val LOG : Logger? = LoggerFactory.getLogger(StreamManager::class.java) + + private var streamToClass: MutableMap = mutableMapOf() + private var streams: MutableMap> = mutableMapOf() + private var monitors: MutableMap = mutableMapOf() + + private val windowBufs = ConcurrentHashMap() + private val queryResults = ConcurrentHashMap>() // read-only snapshots + + var clockVar : String? = null + var clockTimestampSec : Long? = null + + // stream state + var lastTriggerTs: MutableMap = mutableMapOf() + var lastModels: MutableMap = mutableMapOf() + + var nStaticGraphsPushed = 0 + + init { + + // configure c-sparql logging + val propFile = File("src/main/resources/log4j_configuration/csparql_log4j.properties") + if (propFile.exists()) { + PropertyConfigurator.configure(propFile.absolutePath) + } else { + // only log errors if the logger config file is not present + BasicConfigurator.configure() + Log4jLogger.getRootLogger().level = Level.ERROR + System.setProperty(org.slf4j.impl.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "ERROR") + } + } + + private fun initEngineIfNeeded() { + if (!engineInitialized) { + val ts = getTimestamp() + + val configPath = "src/main/resources/csparql2.properties" + val defaultPath = "src/main/resources/default-csparql2.properties" + + if (File(configPath).exists()) { + ec = EngineConfiguration(configPath) + sdsConfig = SDSConfiguration(configPath) + println("Loaded csparql2.properties successfully at ts=$ts.") + } else { + println("Failed to load csparql2.properties, falling back to default-csparql2.properties at ts=$ts.") + ec = EngineConfiguration(defaultPath) + sdsConfig = SDSConfiguration(defaultPath) + } + + engine = CSPARQLEngine(ts, ec) + engineInitialized = true + } + } + + public fun getQueryResults(name: LiteralExpr): List
? { + return queryResults[name] + } + + public fun registerStream(className: String, obj: LiteralExpr) { + initEngineIfNeeded() + val streamIri = "${settings.runPrefix}${obj.toString()}" + + val stream = StreamObject(streamIri) + val reg = engine!!.register(stream) + stream.setWritable(reg) + + if (!streams.containsKey(className)) streams[className] = mutableMapOf() + streams[className]!![obj] = stream + streamToClass[obj] = className + } + + private fun getTimestamp(): Long { + if (clockTimestampSec != null) + return secToMs(clockTimestampSec!!) + return System.currentTimeMillis() + } + + private fun secToMs(s: Long): Long { + return s * 1000 + } + + fun updateClock(newTsSec: Long) { + if (clockTimestampSec != newTsSec) { + clockTimestampSec = newTsSec + + // push last models to streams if time has advanced + pushLastModels() + } + } + + private fun pushLastModels() { + for ((obj, model) in lastModels) { + if (model.isEmpty) continue + streams[streamToClass[obj]!!]!![obj]!!.putGraph(model.getGraph(), lastTriggerTs[obj]!!) + lastModels[obj] = ModelFactory.createDefaultModel() + } + } + + public fun triggerStream(className: String, obj: LiteralExpr, methodName: String, stackEntry: StackEntry) { + val stream = streams[className]!![obj]!! + val expressions = interpreter!!.staticInfo.streamersTable[className]!![methodName]!! + + val timestamp = getTimestamp() + if (!lastTriggerTs.containsKey(obj) || timestamp > lastTriggerTs[obj]!!) { + // put model only if time has advanced (guarantee one graph per timestamp) + if (lastTriggerTs.containsKey(obj)) + stream.putGraph(lastModels[obj]!!.getGraph(), lastTriggerTs[obj]!!) + lastModels[obj] = ModelFactory.createDefaultModel() + lastTriggerTs[obj] = timestamp + } + + for (expr in expressions) { + val res = interpreter.eval(expr, stackEntry) + + val exprParts = expr.toString().split('.') + var subjIri: String = "${settings.runPrefix}${obj.literal}" + var predIri = "${settings.progPrefix}${className}_${expr.toString().removePrefix("this.").replace('.', '_')}" + if (exprParts.size < 2) { + throw Exception("Streamer expression must be of the form this.field or this.obj.field") + } + if (exprParts.size > 2) { + // find the one before last part + var currentObj: LiteralExpr = obj + for (i in 1 until exprParts.size - 1) { + val fieldName = exprParts[i] + currentObj = interpreter.heap[currentObj]!![fieldName]!! + } + subjIri = "${settings.runPrefix}${currentObj.literal}" + predIri = "${settings.progPrefix}${(currentObj.tag as BaseType).name}_${exprParts.last()}" + } + + val stmt = lastModels[obj]!!.createStatement(lastModels[obj]!!.createResource(subjIri), lastModels[obj]!!.createProperty(predIri), literalToIri(lastModels[obj]!!, res, settings)) + lastModels[obj]!!.add(stmt) + // println("Added to stream $obj at $timestamp: $stmt") + } + + // if the clock variable is not used, push the model immediately + // assume system time (ms) has advanced or will advance before next trigger + if (clockVar == null) { + stream.putGraph(lastModels[obj]!!.getGraph(), lastTriggerTs[obj]!!) + // println("Pushed immediately to stream $obj at ${lastTriggerTs[obj]}: ${lastModels[obj]!!.size()} triples (now: ${getTimestamp()})") + lastModels[obj] = ModelFactory.createDefaultModel() + } + } + + private fun literalToIri(m: Model, lit: LiteralExpr, settings: Settings): RDFNode = + when (lit.tag) { + INTTYPE -> m.createTypedLiteral(lit.literal.toInt()) + DOUBLETYPE -> m.createTypedLiteral(lit.literal.toDouble()) + BOOLEANTYPE-> m.createTypedLiteral(lit.literal.toBoolean()) + STRINGTYPE -> m.createLiteral(lit.literal) + else -> m.createResource("${settings.runPrefix}${lit.literal}") + } + + public fun getMonitor(name: LiteralExpr): MonitorObject? { + return monitors[name] + } + + public fun registerQuery(name: LiteralExpr, queryExpr : Expression, params: List, stackMemory: Memory, heap: GlobalMemory, obj: LiteralExpr, SPARQL : Boolean = true, declaredType: Type): JenaContinuousQueryExecution { + initEngineIfNeeded() + val queryStr = prepareQuery(name, queryExpr, params, stackMemory, heap, obj, SPARQL) + if (settings.verbose) println("Registering query:\n$queryStr") + + val cqe = engine!!.register(queryStr, sdsConfig) as JenaContinuousQueryExecution + monitors[name] = MonitorObject(name, declaredType) + + windowBufs.computeIfAbsent(name) { WindowBuf() } + + val outputStream = cqe.outstream() + outputStream?.addConsumer { arg, ts -> + val table = arg as Table + val b = windowBufs.getOrPut(name) { WindowBuf() } + + if (b.tick != ts) { + b.tick = ts + b.current.clear() + } + b.current += table + + queryResults[name] = b.current.toList() + } + + return cqe + } + + private fun prepareQuery(name: LiteralExpr, queryExpr : Expression, params : List, + stackMemory: Memory, heap: GlobalMemory, obj: LiteralExpr, SPARQL : Boolean = true) : String{ + + var prefixes = "" + for ((key, value) in settings.prefixMap()) prefixes += "PREFIX $key: <$value>\n" + + val queryHeader = "REGISTER RSTREAM <${settings.runPrefix}${name.literal}/out> AS " + + val queryBody = interpreter!!.prepareQuery(queryExpr, params, stackMemory, heap, obj, SPARQL) + .removePrefix("\"").removeSuffix("\"") + + var queryWithPrefixes = prefixes + queryHeader + queryBody + queryWithPrefixes = queryWithPrefixes.replace("\\\"", "\"") + + return queryWithPrefixes + } + + public fun getStaticNamedIri(): String { + val s = "${settings.runPrefix}loadStatic${nStaticGraphsPushed}" + nStaticGraphsPushed += 1 + return s + } + +} + +class StreamObject(var iri: String) : DataStreamImpl(iri) { + + private var s: DataStream? = null + + fun setWritable(s: DataStream) { + this.s = s + } + + fun putGraph(m: Graph, t: Long) { + if (s == null) throw Exception("Stream $iri is not writable") + s!!.put(m, t) + } +} + \ No newline at end of file diff --git a/src/main/kotlin/no/uio/microobject/data/TripleManager.kt b/src/main/kotlin/no/uio/microobject/data/TripleManager.kt index 20e38a2c..d3c55f51 100644 --- a/src/main/kotlin/no/uio/microobject/data/TripleManager.kt +++ b/src/main/kotlin/no/uio/microobject/data/TripleManager.kt @@ -18,7 +18,7 @@ import org.apache.jena.graph.impl.GraphBase import org.apache.jena.graph.Node import org.apache.jena.graph.Node_URI import org.apache.jena.graph.NodeFactory -import org.apache.jena.graph.Triple +import org.apache.jena.graph.Triple as JenaTriple import org.apache.jena.graph.compose.MultiUnion import org.apache.jena.query.* import org.apache.jena.rdf.model.* @@ -212,30 +212,30 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable // A custom type of (nice)iterator which takes a list as input and iterates over them. // It iterates through all elements in the list from start to end. - private class TripleListIterator(private val tripleList: List): NiceIterator() { + private class TripleListIterator(private val tripleList: List): NiceIterator() { var listIndex: Int = 0 // index of next element override fun hasNext(): Boolean = listIndex < tripleList.size - override fun next(): Triple = tripleList[(listIndex++)] + override fun next(): JenaTriple = tripleList[(listIndex++)] } // Helper method to crate triple with URIs in all three positions - private fun uriTriple(s: String, p: String, o: String): Triple { - return Triple(NodeFactory.createURI(s), NodeFactory.createURI(p), NodeFactory.createURI(o)) + private fun uriTriple(s: String, p: String, o: String): JenaTriple { + return JenaTriple.create(NodeFactory.createURI(s), NodeFactory.createURI(p), NodeFactory.createURI(o)) } // Helper method to crate triple with URIs in two first positions and a literal in object position - private fun literalTriple(s: String, p: String, o: Any?, type: BaseType): Triple? { + private fun literalTriple(s: String, p: String, o: Any?, type: BaseType): JenaTriple? { if (o == null) return null - return Triple( + return JenaTriple.create( NodeFactory.createURI(s), NodeFactory.createURI(p), getLiteralNode(LiteralExpr(o.toString(), type), settings) ) } // If searchTriple matches candidateTriple, then candidateTriple will be added to matchList - private fun addIfMatch(candidateTriple: Triple?, searchTriple: Triple?, matchList: MutableList, pseudo: Boolean) { + private fun addIfMatch(candidateTriple: JenaTriple?, searchTriple: JenaTriple?, matchList: MutableList, pseudo: Boolean) { if (searchTriple == null) return if (candidateTriple == null) return // This is just a quick fix to resolve the problem with > and < in the uris. They appear for example when the stdlib.smol is used, since it has List. @@ -290,7 +290,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable } private inner class FMOGraph : GraphBase() { - override fun graphBaseFind(searchTriple: Triple): ExtendedIterator { + override fun graphBaseFind(searchTriple: JenaTriple): ExtendedIterator { if(interpreter == null) return TripleListIterator(mutableListOf()) @@ -298,7 +298,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable val smol = prefixMap["smol"] val run = prefixMap["run"] - val matchingTriples: MutableList = mutableListOf() + val matchingTriples: MutableList = mutableListOf() for( fmo in interpreter.simMemory ){ val name = fmo.key.literal @@ -345,7 +345,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable // Returns an iterator of all triples in the static table that matches searchTriple // graphBaseFind only constructs the triples that match searchTriple. - public override fun graphBaseFind(searchTriple: Triple): ExtendedIterator { + public override fun graphBaseFind(searchTriple: JenaTriple): ExtendedIterator { val useGuardClauses = tripleSettings.guards.getOrDefault("staticTable", true) val fieldTable: Map = staticTable.fieldTable val methodTable: Map> = staticTable.methodTable @@ -386,7 +386,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable } - val matchingTriples: MutableList = mutableListOf() + val matchingTriples: MutableList = mutableListOf() // Generate triples for fields (and classes) for(classObj in fieldTable){ @@ -545,7 +545,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable // Returns an iterator of all triples in the heap that matches searchTriple // graphBaseFind only constructs/fetches the triples that match searchTriple. - public override fun graphBaseFind(searchTriple: Triple): ExtendedIterator { + public override fun graphBaseFind(searchTriple: JenaTriple): ExtendedIterator { val useGuardClauses = false //tripleSettings.guards.getOrDefault("heap", true) val settings: Settings = interpreter.settings val heap: GlobalMemory = interpreter.heap @@ -568,7 +568,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable } } - val matchingTriples: MutableList = mutableListOf() + val matchingTriples: MutableList = mutableListOf() for(obj in heap.keys){ if(staticTable.hiddenSet.contains(obj.tag.getPrimary().getNameString())) continue; @@ -605,7 +605,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable retVal = interpreter.evalCall(obj.literal, obj.tag.name, m.key) val resNode = getLiteralNode(retVal.second, settings) val resTriple = - Triple( + JenaTriple.create( NodeFactory.createURI(settings.replaceKnownPrefixesNoColon("run:${obj.literal}")), NodeFactory.createURI(predicateString), resNode @@ -631,7 +631,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable if(retVal == null) retVal = interpreter.evalCall(obj.literal, obj.tag.name, m.key) val resNode = getLiteralNode(retVal.second, settings) val resTriple = - Triple( + JenaTriple.create( NodeFactory.createURI(settings.replaceKnownPrefixesNoColon(models)), NodeFactory.createURI(predicateString), resNode @@ -720,7 +720,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable } } - val candidateTriple = Triple( + val candidateTriple = JenaTriple.create( NodeFactory.createURI(settings.replaceKnownPrefixesNoColon(target)), NodeFactory.createURI(predicateString), getLiteralNode(value, settings) @@ -738,7 +738,7 @@ class TripleManager(private val settings: Settings, val staticTable: StaticTable } val target: LiteralExpr = heap[obj]!!.getOrDefault(store, LiteralExpr("ERROR")) - val candidateTriple = Triple( + val candidateTriple = JenaTriple.create( NodeFactory.createURI(subjectString), NodeFactory.createURI(predicateString), getLiteralNode(target, settings) diff --git a/src/main/kotlin/no/uio/microobject/runtime/Interpreter.kt b/src/main/kotlin/no/uio/microobject/runtime/Interpreter.kt index ae1fb91b..4297b78a 100644 --- a/src/main/kotlin/no/uio/microobject/runtime/Interpreter.kt +++ b/src/main/kotlin/no/uio/microobject/runtime/Interpreter.kt @@ -14,7 +14,9 @@ import kotlinx.coroutines.runBlocking import no.uio.microobject.ast.* import no.uio.microobject.ast.expr.LiteralExpr import no.uio.microobject.ast.stmt.ReturnStmt +import no.uio.microobject.ast.stmt.MonitorObject import no.uio.microobject.data.TripleManager +import no.uio.microobject.data.StreamManager import no.uio.microobject.main.Settings import no.uio.microobject.type.* import org.apache.jena.query.QueryExecution @@ -68,6 +70,7 @@ class Interpreter( // TripleManager used to provide virtual triples etc. val tripleManager : TripleManager = TripleManager(settings, staticInfo, this) + val streamManager : StreamManager = StreamManager(settings, staticInfo, this) /** * Evaluates a call on a method of a class diff --git a/src/main/kotlin/no/uio/microobject/runtime/State.kt b/src/main/kotlin/no/uio/microobject/runtime/State.kt index d7958935..cd510cd5 100644 --- a/src/main/kotlin/no/uio/microobject/runtime/State.kt +++ b/src/main/kotlin/no/uio/microobject/runtime/State.kt @@ -3,6 +3,7 @@ package no.uio.microobject.runtime import no.uio.microobject.ast.Expression import no.uio.microobject.ast.expr.LiteralExpr import no.uio.microobject.ast.Statement +import no.uio.microobject.ast.stmt.MonitorObject import no.uio.microobject.type.Type import org.apache.jena.query.ResultSet @@ -29,8 +30,8 @@ data class StaticTable( val hiddenSet: Set,//This set of classes is skipped by the lifting val owldescr: MutableMap, // This maps class names to the default models block val checkClassifiesTable: MutableMap>> = mutableMapOf(), // Queries for classification - val contextTable: MutableMap - + val contextTable: MutableMap, + val streamersTable: MutableMap>> ) { override fun toString(): String = """ diff --git a/src/main/kotlin/no/uio/microobject/type/TypeChecker.kt b/src/main/kotlin/no/uio/microobject/type/TypeChecker.kt index 598a8b66..2b957e24 100644 --- a/src/main/kotlin/no/uio/microobject/type/TypeChecker.kt +++ b/src/main/kotlin/no/uio/microobject/type/TypeChecker.kt @@ -1071,6 +1071,61 @@ class TypeChecker(private val ctx: WhileParser.ProgramContext, private val setti if(tickType != DOUBLETYPE) log("Tick statement expects a Double as second parameter, but got $tickType }.",ctx) } + is WhileParser.Monitor_statementContext -> { + // todo check monitor type + var expType: Type? = null + if (ctx.declType != null) { + val lhs = ctx.expression(0) + if (lhs !is WhileParser.Var_expressionContext) { + log("Variable declaration must declare a variable.", ctx) + } else { + val name = lhs.NAME().text + if (vars.keys.contains(name)) log("Variable $name declared twice.", ctx) + else { + expType = translateType(ctx.type(), className, generics) + vars[name] = expType + } + } + } + } + is WhileParser.Window_statementContext -> { + var expType: Type? = null + if (ctx.declType != null) { + val lhs = ctx.expression(0) + if (lhs !is WhileParser.Var_expressionContext) { + log("Variable declaration must declare a variable.", ctx) + } else { + val name = lhs.NAME().text + if (vars.keys.contains(name)) log("Variable $name declared twice.", ctx) + else { + expType = translateType(ctx.type(), className, generics) + vars[name] = expType + } + } + } + if(ctx.target != null && ctx.target !is WhileParser.Var_expressionContext && inRule) + log("Non-local access in rule method.", ctx) + } + is WhileParser.PushStatic_statementContext -> { + val innerType = getType(ctx.sources, inner, vars, thisType, inRule) + if(innerType != ERRORTYPE && innerType != STRINGTYPE) + log("PushStatic expects a string as its parameter", ctx) + if (ctx.declType != null) { + val lhs = ctx.target + if (lhs !is WhileParser.Var_expressionContext) { + log("Variable declaration must declare a variable.", ctx) + } else { + val name = lhs.NAME().text + if (vars.keys.contains(name)) log("Variable $name declared twice.", ctx) + else { + val expType = translateType(ctx.type(), className, generics) + vars[name] = expType + } + } + if (translateType(ctx.declType, className, generics) != STRINGTYPE) + log("PushStatic expects a String variable to assign to", ctx) + } + } else -> { log("Statements with class ${ctx.javaClass} cannot be type checked",ctx) } diff --git a/src/main/resources/StdLib.smol b/src/main/resources/StdLib.smol index 39037aba..7720c9fb 100644 --- a/src/main/resources/StdLib.smol +++ b/src/main/resources/StdLib.smol @@ -127,4 +127,6 @@ class ExplList(LISTTT content, ExplList next) end end -class ExplPair(PAIRTONET first, PAIRTTWOT second) end \ No newline at end of file +class ExplPair(PAIRTONET first, PAIRTTWOT second) end + +hidden class Monitor() end diff --git a/src/main/resources/default-csparql2.properties b/src/main/resources/default-csparql2.properties new file mode 100644 index 00000000..e3d86f29 --- /dev/null +++ b/src/main/resources/default-csparql2.properties @@ -0,0 +1,18 @@ +rsp_engine.time=EventTime +rsp_engine.stream.item.class=org.streamreasoning.rsp4j.csparql2.stream.GraphStreamSchema + +rsp_engine.response_format=JSON-LD + +rsp_engine.on_window_close=false +rsp_engine.non_empty_content=true +rsp_engine.periodic=false +rsp_engine.on_content_change=false +rsp_engine.tick=TIME_DRIVEN + +rsp_engine.report_grain=SINGLE +rsp_engine.sds.mantainance=NAIVE +rsp_engine.partialwindow=true + +# # to enable reasoning, uncomment the following two lines (entailment options: RDFS, OWL) +# jasper.entailment=RDFS +# rsp_engine.tbox_location=src/test/resources/csparql/example-tbox-rdfxml.owl diff --git a/src/test/kotlin/no/uio/microobject/test/triples/OWLQueryTest.kt b/src/test/kotlin/no/uio/microobject/test/triples/OWLQueryTest.kt index 526bf80d..95169fb1 100644 --- a/src/test/kotlin/no/uio/microobject/test/triples/OWLQueryTest.kt +++ b/src/test/kotlin/no/uio/microobject/test/triples/OWLQueryTest.kt @@ -17,7 +17,7 @@ class OWLQueryTest: MicroObjectTest() { "OWL query 1" { val s = interpreter.owlQuery(q1) - assertEquals(s.count(), 8) + assertEquals(s.count(), 9) } "OWL query 2" { val s = interpreter.owlQuery(q2) diff --git a/src/test/resources/csparql/01_no_aggregation.smol b/src/test/resources/csparql/01_no_aggregation.smol new file mode 100644 index 00000000..e83b0b5f --- /dev/null +++ b/src/test/resources/csparql/01_no_aggregation.smol @@ -0,0 +1,74 @@ +// reada src/test/resources/csparql/01_no_aggregation.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: 1 2 +// >>103: 1 2 +// >>104: 1 2 3 4 +// >>105: 1 2 3 4 +// >>106: 2 3 4 5 6 +// >>107: 2 3 4 5 6 +// >>108: 4 5 6 7 8 +// .. + +// Notes: +// - Step 2 seconds = new window every 2 seconds +// - Range 5.001s: Windows have {2, 4, 5, 5, 5, ...} elements diff --git a/src/test/resources/csparql/03_sum.smol b/src/test/resources/csparql/03_sum.smol new file mode 100644 index 00000000..235d1e1b --- /dev/null +++ b/src/test/resources/csparql/03_sum.smol @@ -0,0 +1,75 @@ +// reada src/test/resources/csparql/03_sum.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT (SUM(?x) AS ?sumX) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: 3 = 1+2 +// >>103: 3 +// >>104: 10 = 1+2+3+4 +// >>105: 10 +// >>106: 20 = 2+3+4+5+6 +// >>107: 20 +// >>108: 30 = 4+5+6+7+8 +// >>109: 30 +// >>110: 40 = 6+7+8+9+10 +// .. + +// Notes: +// - SUM aggregation works as expected diff --git a/src/test/resources/csparql/05_count.smol b/src/test/resources/csparql/05_count.smol new file mode 100644 index 00000000..af389d69 --- /dev/null +++ b/src/test/resources/csparql/05_count.smol @@ -0,0 +1,73 @@ +// reada src/test/resources/csparql/05_count.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT (COUNT(?x) AS ?countX) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: 2 +// >>103: 2 +// >>104: 4 +// >>105: 4 +// >>106: 5 +// >>107: 5 +// >>108: 5 +// .. + +// Notes: +// - SUM aggregation works as expected diff --git a/src/test/resources/csparql/06_average.smol b/src/test/resources/csparql/06_average.smol new file mode 100644 index 00000000..fa4e12cb --- /dev/null +++ b/src/test/resources/csparql/06_average.smol @@ -0,0 +1,76 @@ +// reada src/test/resources/csparql/06_average.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT (AVG(?x) AS ?avgX) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ doubleToString(l.content) ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: 1.5 = (1+2) / 2 +// >>103: 1.5 +// >>104: 2.5 = (1+2+3+4) / 4 +// >>105: 2.5 +// >>106: 4.0 = (2+3+4+5+6) / 5 +// >>107: 4.0 +// >>108: 6.0 = (4+5+6+7+8) / 5 +// >>109: 6.0 +// >>110: 8.0 = (6+7+8+9+10) / 5 +// .. + +// Notes: +// - AVG aggregation works as expected +// - Average returns double values (not integer) diff --git a/src/test/resources/csparql/07_stream_object.smol b/src/test/resources/csparql/07_stream_object.smol new file mode 100644 index 00000000..8af3893a --- /dev/null +++ b/src/test/resources/csparql/07_stream_object.smol @@ -0,0 +1,74 @@ +// reada src/test/resources/csparql/07_stream_object.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?s + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ l.content ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: obj639 obj640 +// >>103: obj639 obj640 +// >>104: obj639 obj640 obj641 obj644 +// >>105: obj639 obj640 obj641 obj644 +// >>106: obj640 obj641 obj644 obj647 obj652 +// >>107: obj640 obj641 obj644 obj647 obj652 +// >>108: obj644 obj647 obj652 obj657 obj663 +// >>109: obj644 obj647 obj652 obj657 obj663 +// .. + +// Notes: +// - Objects can be used as stream elements diff --git a/src/test/resources/csparql/08_use_window_results.smol b/src/test/resources/csparql/08_use_window_results.smol new file mode 100644 index 00000000..4f20f0e6 --- /dev/null +++ b/src/test/resources/csparql/08_use_window_results.smol @@ -0,0 +1,80 @@ +// reada src/test/resources/csparql/08_use_window_results.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT (SUM(?x) AS ?sumX) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + if l.content / 20 * 20 == l.content then + s = s ++ "mult20 "; + end + l = l.next; + end + return s; + end +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: 3 +// >>103: 3 +// >>104: 10 +// >>105: 10 +// >>106: 20 mult20 +// >>107: 20 mult20 +// >>108: 30 +// >>109: 30 +// >>110: 40 mult20 +// >>111: 40 mult20 +// >>112: 50 +// .. + +// Notes: +// - Shows how to use the window results (e.g., check if the sum is multiple of 20) diff --git a/src/test/resources/csparql/10_multiple_streams.smol b/src/test/resources/csparql/10_multiple_streams.smol new file mode 100644 index 00000000..9814e923 --- /dev/null +++ b/src/test/resources/csparql/10_multiple_streams.smol @@ -0,0 +1,108 @@ +// reada src/test/resources/csparql/10_multiple_streams.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT4.001s STEP PT2s] + FROM NAMED WINDOW ON %2 [RANGE PT4.001s STEP PT2s] + FROM NAMED WINDOW ON %3 [RANGE PT4.001s STEP PT2s] + WHERE { + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + } + ", + this.streamers.content, + this.streamers.next.content, + this.streamers.next.next.content + ); + this.m = m; + end + + Unit doStream() + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +streamer class C(Int x) + + Unit doStream() emits(this.x) + this.x = this.x + 1; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o1 = new C(0); + C o2 = new C(10); + C o3 = new C(20); + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + + Controller ctrl = new Controller(streamers, null); + + // initialize monitor + ctrl.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: +// >>103: 1 11 21 2 12 22 : 1-2, 11-12, 21-22 +// >>104: 1 11 21 2 12 22 +// >>105: 1 21 14 4 3 23 11 22 24 12 13 2 : 2-5, 12-15, 22-25 +// >>106: 1 21 14 4 3 23 11 22 24 12 13 2 +// >>107: 25 4 14 3 23 16 26 24 6 5 13 15 : 3-6, 13-16, 23-26 +// >>108: 25 4 14 3 23 16 26 24 6 5 13 15 +// >>109: 25 27 16 26 28 6 17 5 8 7 18 15 : 5-8, 15-18, 25-28 +// >>110: 25 27 16 26 28 6 17 5 8 7 18 15 +// >>111: 30 20 27 28 10 9 17 29 8 19 7 18 : 7-10, 17-20, 27-30 +// >>112: 30 20 27 28 10 9 17 29 8 19 7 18 +// >>113: 30 20 32 21 10 9 22 29 12 11 19 31 : 9-12, 19-22, 29-32 +// .. + +// Notes: +// - Three streams with different starting points (0, 10, 20) diff --git a/src/test/resources/csparql/11_use_domain.smol b/src/test/resources/csparql/11_use_domain.smol new file mode 100644 index 00000000..cf99c73b --- /dev/null +++ b/src/test/resources/csparql/11_use_domain.smol @@ -0,0 +1,112 @@ +// Commands: +// java -jar build/libs/smol.jar -b src/test/resources/csparql/example.owl -p ex=https://example.com/ontology# +// reada src/test/resources/csparql/11_use_domain.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT4.001s STEP PT2s] + FROM NAMED WINDOW ON %2 [RANGE PT4.001s STEP PT2s] + FROM NAMED WINDOW ON %3 [RANGE PT4.001s STEP PT2s] + WHERE { + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + } + ", + this.streamers.content, + this.streamers.next.content, + this.streamers.next.next.content + ); + this.m = m; + end + + Unit doStream() + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +streamer class C(Int x) models "a ex:Class."; + + Unit doStream() emits(this.x) + this.x = this.x + 1; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + List streamers = construct(" + SELECT ?id ?x + WHERE { + ?s a ex:Class ; + ex:id ?id ; + ex:x0 ?x . + } ORDER BY DESC (?id) + "); + Controller ctrl = new Controller(streamers, null); + + // initialize monitor + ctrl.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: +// >>103: 11 21 1 12 22 2 : 1-2, 11-12, 21-22 +// >>104: 11 21 1 12 22 2 +// >>105: 24 21 4 1 12 11 23 2 14 22 13 3 : 2-5, 12-15, 22-25 +// >>106: 24 21 4 1 12 11 23 2 14 22 13 3 +// >>107: 24 15 4 6 23 26 14 25 13 3 5 16 : 3-6, 13-16, 23-26 +// >>108: 24 15 4 6 23 26 14 25 13 3 5 16 +// >>109: 15 27 6 18 26 17 7 28 25 8 5 16 : 5-8, 15-18, 25-28 +// >>110: 15 27 6 18 26 17 7 28 25 8 5 16 +// >>111: 27 10 30 18 17 29 7 20 28 19 9 8 : 7-10, 17-20, 27-30 +// >>112: 27 10 30 18 17 29 7 20 28 19 9 8 +// >>113: 22 10 30 21 32 11 29 20 31 19 12 9 : 9-12, 19-22, 29-32 +// .. + +// Notes: +// - Same output as 10_multiple_streams.smol +// - But now the streamers are created based on the domain knowledge diff --git a/src/test/resources/csparql/12_different_ranges.smol b/src/test/resources/csparql/12_different_ranges.smol new file mode 100644 index 00000000..f373dfb1 --- /dev/null +++ b/src/test/resources/csparql/12_different_ranges.smol @@ -0,0 +1,108 @@ +// reada src/test/resources/csparql/12_different_ranges.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT4.001s STEP PT2s] + FROM NAMED WINDOW ON %2 [RANGE PT3.001s STEP PT1s] + FROM NAMED WINDOW ON %3 [RANGE PT5.001s STEP PT4s] + WHERE { + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + UNION + { WINDOW { ?s prog:C_x ?x. } } + } + ", + this.streamers.content, + this.streamers.next.content, + this.streamers.next.next.content + ); + this.m = m; + end + + Unit doStream() + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +streamer class C(Int x) + + Unit doStream() emits(this.x) + this.x = this.x + 1; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o1 = new C(0); + C o2 = new C(10); + C o3 = new C(20); + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + + Controller ctrl = new Controller(streamers, null); + + // initialize monitor + ctrl.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: +// >>101: +// >>102: +// >>103: 11 21 1 12 22 2 : 1-2, 11-12, 21-22 +// >>104: 11 21 1 12 22 2 +// >>105: 22 4 21 24 12 23 1 11 14 13 2 3 : 2-5, 12-15, 22-25 +// >>106: 22 4 21 24 12 23 1 11 14 13 2 3 +// >>107: 26 4 25 6 24 23 14 13 16 3 5 15 : 3-6, 13-16, 23-26 +// >>108: 26 4 25 6 24 23 14 13 16 3 5 15 +// >>109: 18 26 17 25 6 7 28 27 8 16 5 15 : 5-8, 15-18, 25-28 +// >>110: 18 26 17 25 6 7 28 27 8 16 5 15 +// >>111: 18 17 10 20 7 28 19 27 30 8 29 9 : 7-10, 17-20, 27-30 +// >>112: 18 17 10 20 7 28 19 27 30 8 29 9 +// >>113: 10 11 20 19 22 12 30 21 29 32 31 9 : 9-12, 19-22, 29-32 + + +// Notes: +// - First range and step applies: [RANGE 4s STEP 2s] diff --git a/src/test/resources/csparql/13_static_join.smol b/src/test/resources/csparql/13_static_join.smol new file mode 100644 index 00000000..1a9107bc --- /dev/null +++ b/src/test/resources/csparql/13_static_join.smol @@ -0,0 +1,134 @@ +// reada src/test/resources/csparql/13_static_join.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers) + + Unit register(String static) + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.register(static); + objs = objs.next; + end + end + + Unit doStream(Int ts) + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(ts); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List objs = this.streamers; + while objs != null do + C o = objs.content; + String res = o.windowToString(); + s = s ++ o ++ " ( " ++ res ++ ") "; + objs = objs.next; + end + return s; + end +end + +streamer class C(Int x, Int mult, Observation obs, Monitor m) + + Unit register(String static) + Monitor m = monitor(" + SELECT (?x * ?k AS ?res) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + FROM %2 + WHERE { + ?streamer prog:C_mult ?k. + WINDOW ?w { + ?streamer prog:C_obs ?obs . + ?obs prog:Observation_x ?x . + } + } + ", + this, + static + ); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.obs = new Observation(this.x, ts); + this.x = this.x + 1; + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + C o1 = new C(0, 2, null, null); + C o2 = new C(10, 3, null, null); + C o3 = new C(20, 10, null, null); + + // push static data + String static = pushStatic("heap,staticTable"); + + // initialize stream + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + Controller ctrl = new Controller(streamers); + + // initialize monitor + ctrl.register(static); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: obj8 ( ) obj9 ( ) obj10 ( ) +// >>101: obj8 ( ) obj9 ( ) obj10 ( ) +// >>102: obj8 ( ) obj9 ( ) obj10 ( ) +// >>103: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 ) +// >>104: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 ) +// >>105: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 ) +// >>106: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 ) +// >>107: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 ) +// >>108: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 ) +// >>109: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 ) +// >>110: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 ) +// >>111: obj8 ( 10 12 14 16 18 ) obj9 ( 45 48 51 54 57 ) obj10 ( 250 260 270 280 290 ) +// .. + +// Notes: +// - pushStatic("heap, staticTable") dumps static data to output.ttl +// - This file contains triples for C_mult property of each streamer object (static data) + +// - Results from streamers are multiplied by {2, 3, 10} respectively +// - Stream data is inside the window: ?x +// - Static data is outside the window: ?k +// - Static data is joined with stream data: ?x * ?k diff --git a/src/test/resources/csparql/14_multiple_monitors.smol b/src/test/resources/csparql/14_multiple_monitors.smol new file mode 100644 index 00000000..58aff3c2 --- /dev/null +++ b/src/test/resources/csparql/14_multiple_monitors.smol @@ -0,0 +1,120 @@ +// reada src/test/resources/csparql/14_multiple_monitors.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers) + + Unit register() + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.register(); + objs = objs.next; + end + end + + Unit doStream(Int ts) + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(ts); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List objs = this.streamers; + while objs != null do + C o = objs.content; + String res = o.windowToString(); + s = s ++ o ++ " ( " ++ res ++ ") "; + objs = objs.next; + end + return s; + end +end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT4.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x. } + } + ", + this + ); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.obs = new Observation(this.x, ts); + this.x = this.x + 1; + end + + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o1 = new C(0, null, null); + C o2 = new C(10, null, null); + C o3 = new C(20, null, null); + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + Controller ctrl = new Controller(streamers); + + // initialize monitor + ctrl.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: obj7 ( ) obj8 ( ) obj9 ( ) +// >>101: obj7 ( ) obj8 ( ) obj9 ( ) +// >>102: obj7 ( ) obj8 ( ) obj9 ( ) +// >>103: obj7 ( 0 1 ) obj8 ( 10 11 ) obj9 ( 20 21 ) +// >>104: obj7 ( 0 1 ) obj8 ( 10 11 ) obj9 ( 20 21 ) +// >>105: obj7 ( 0 1 2 3 ) obj8 ( 10 11 12 13 ) obj9 ( 20 21 22 23 ) +// >>106: obj7 ( 0 1 2 3 ) obj8 ( 10 11 12 13 ) obj9 ( 20 21 22 23 ) +// >>107: obj7 ( 2 3 4 5 ) obj8 ( 12 13 14 15 ) obj9 ( 22 23 24 25 ) +// >>108: obj7 ( 2 3 4 5 ) obj8 ( 12 13 14 15 ) obj9 ( 22 23 24 25 ) +// >>109: obj7 ( 4 5 6 7 ) obj8 ( 14 15 16 17 ) obj9 ( 24 25 26 27 ) +// >>110: obj7 ( 4 5 6 7 ) obj8 ( 14 15 16 17 ) obj9 ( 24 25 26 27 ) +// >>111: obj7 ( 6 7 8 9 ) obj8 ( 16 17 18 19 ) obj9 ( 26 27 28 29 ) +// >>112: obj7 ( 6 7 8 9 ) obj8 ( 16 17 18 19 ) obj9 ( 26 27 28 29 ) +// >>113: obj7 ( 8 9 10 11 ) obj8 ( 18 19 20 21 ) obj9 ( 28 29 30 31 ) + +// Notes: +// - Multiple monitors on multiple streams are supported diff --git a/src/test/resources/csparql/15_different_periods.smol b/src/test/resources/csparql/15_different_periods.smol new file mode 100644 index 00000000..57229941 --- /dev/null +++ b/src/test/resources/csparql/15_different_periods.smol @@ -0,0 +1,136 @@ +// reada src/test/resources/csparql/15_different_periods.smol + +class Observation(Int x, Int ts) end + +class Controller(List streamers) + + Unit register() + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.register(); + objs = objs.next; + end + end + + // Unit doStream(Int ts) + // List objs = this.streamers; + // while objs != null do + // C o = objs.content; + // o.doStream(ts); + // objs = objs.next; + // end + // end + + String windowToString() + String s = ""; + List objs = this.streamers; + while objs != null do + C o = objs.content; + String res = o.windowToString(); + s = s ++ o ++ " ( " ++ res ++ ") "; + objs = objs.next; + end + return s; + end +end + +streamer class C(Int x, Int period, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT6.001s STEP PT2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x. } + } + ", + this + ); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.obs = new Observation(this.x, ts); + this.x = this.x + 1; + end + + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + // initialize stream + C o1 = new C(0, 1, null, null); + C o2 = new C(10, 2, null, null); + C o3 = new C(20, 3, null, null); + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + Controller ctrl = new Controller(streamers); + + // initialize monitor + ctrl.register(); + + // run simulation + List objs = null; + while i < endAt do + + // put triples in the stream + objs = streamers; + while objs != null do + C o = objs.content; + if (((i / o.period) * o.period) == i) then + o.doStream(i); + else + o.x = o.x + 1; // increment without streaming + end + objs = objs.next; + end + + // advance clock (timestamp) + i = i + 1; + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + end +end + +// Output: +// >>101: obj6 ( ) obj7 ( ) obj8 ( ) +// >>102: obj6 ( ) obj7 ( ) obj8 ( ) +// >>103: obj6 ( 0 1 ) obj7 ( 10 ) obj8 ( ) +// >>104: obj6 ( 0 1 ) obj7 ( 10 ) obj8 ( ) +// >>105: obj6 ( 0 1 2 3 ) obj7 ( 10 12 ) obj8 ( 22 ) +// >>106: obj6 ( 0 1 2 3 ) obj7 ( 10 12 ) obj8 ( 22 ) +// >>107: obj6 ( 0 1 2 3 4 5 ) obj7 ( 10 12 14 ) obj8 ( 22 25 ) +// >>108: obj6 ( 0 1 2 3 4 5 ) obj7 ( 10 12 14 ) obj8 ( 22 25 ) +// >>109: obj6 ( 2 3 4 5 6 7 ) obj7 ( 12 14 16 ) obj8 ( 22 25 ) +// >>110: obj6 ( 2 3 4 5 6 7 ) obj7 ( 12 14 16 ) obj8 ( 22 25 ) +// >>111: obj6 ( 4 5 6 7 8 9 ) obj7 ( 14 16 18 ) obj8 ( 25 28 ) +// >>112: obj6 ( 4 5 6 7 8 9 ) obj7 ( 14 16 18 ) obj8 ( 25 28 ) +// >>113: obj6 ( 6 7 8 9 10 11 ) obj7 ( 16 18 20 ) obj8 ( 28 31 ) +// >>114: obj6 ( 6 7 8 9 10 11 ) obj7 ( 16 18 20 ) obj8 ( 28 31 ) +// >>115: obj6 ( 8 9 10 11 12 13 ) obj7 ( 18 20 22 ) obj8 ( 28 31 ) +// >>116: obj6 ( 8 9 10 11 12 13 ) obj7 ( 18 20 22 ) obj8 ( 28 31 ) +// >>117: obj6 ( 10 11 12 13 14 15 ) obj7 ( 20 22 24 ) obj8 ( 31 34 ) + +// Notes: +// - Clock variable is increased before reading the window contents (can only read past data) +// - First monitor streams every second: can fit 6 elements in a 6.001s window +// - Second monitor streams every 2 seconds: can fit 3 elements in a 6.001s window +// - Third monitor streams every 3 seconds: can fit 2 elements in a 6.001s window diff --git a/src/test/resources/csparql/16_system_time.smol b/src/test/resources/csparql/16_system_time.smol new file mode 100644 index 00000000..5df173d7 --- /dev/null +++ b/src/test/resources/csparql/16_system_time.smol @@ -0,0 +1,110 @@ +// reada src/test/resources/csparql/16_system_time.smol + +class Observation(Int x, Int ts) end + +streamer class C(Int x, Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT0.5s STEP PT0.2s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.x = this.x + 1; + this.obs = new Observation(this.x, ts); + end + + Unit wait(Int iter) + Int x = 0; + Int j = 0; + Int k = 0; + while j < 100 do + while k < 100 do + x = j * k; + x = 0; + while x < iter do + x = x + 1; + end + k = k + 1; + end + k = 0; + j = j + 1; + end + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end +end + +main + // configure simulation + Int i = 0; + Int endAt = 10; + + // initialize stream + C o = new C(0, null, null); + + // initialize monitor + o.register(); + + // run simulation + while i < endAt do + + // put triples in the stream + o.doStream(i); + + // advance i + i = i + 1; + + // wait + o.wait(i); + + // read window contents + String res = o.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + end +end + +// Example run output: +// >>1: +// >>2: +// >>3: 1 2 +// >>4: 1 2 3 +// >>5: 1 2 3 +// >>6: 1 2 3 +// >>7: 2 3 4 5 6 +// >>8: 2 3 4 5 6 +// >>9: 2 3 4 5 6 +// >>10: 4 5 6 7 8 9 + +// Analysis of the example: +// - Window configuration: Range 500ms, Step 200ms +// - Consider first timestamp (1758566583488) as 0 point, listing (add, push) times in ms relative to that point +// - Ex. (1758566583488 - 1758566583488, 1758566583506 - 1758566583488) = (0, 18) +// - p.w.i = previous window includes (the elements in the last window are monitored) +// * 1: ( 0 , 18 ) -> window (0, 200] +// * 2: (137, 138) -> window (0, 200] +// * 3: (230, 357) -> window (0, 400]; p.w.i. 1, 2 +// * 4: (404, 408) -> window (100, 600]; p.w.i. 1, 2, 3 +// * 5: (451, 451) -> window (100, 600]; p.w.i. 1, 2, 3 +// * 6: (511, 511) -> window (100, 600]; p.w.i. 1, 2, 3 +// * 7: (647, 651) -> window (300, 800]; p.w.i. 2, 3, 4, 5, 6 +// * 8: (706, 707) -> window (300, 800]; p.w.i. 2, 3, 4, 5, 6 +// * 9: (749, 749) -> window (300, 800]; p.w.i. 2, 3, 4, 5, 6 +// * 10:(800, 803) -> window (500,1000]; p.w.i. 4, 5, 6, 7, 8, 9 +// - Full output given in 16_system_time_full_output.txt +// - Note: The actual times may vary based on system performance and load diff --git a/src/test/resources/csparql/16_system_time_full_output.txt b/src/test/resources/csparql/16_system_time_full_output.txt new file mode 100644 index 00000000..17e83cd4 --- /dev/null +++ b/src/test/resources/csparql/16_system_time_full_output.txt @@ -0,0 +1,51 @@ +Full output with debug prints: +Added to stream obj6 at 1758566583488: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj8] +Added to stream obj6 at 1758566583488: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj8, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "1"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583488: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj8, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "0"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583488: 3 triples (now: 1758566583506) +>>1: +Added to stream obj6 at 1758566583625: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj9] +Added to stream obj6 at 1758566583625: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj9, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "2"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583625: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj9, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "1"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583625: 3 triples (now: 1758566583626) +>>2: +Added to stream obj6 at 1758566583718: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj10] +Added to stream obj6 at 1758566583718: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj10, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "3"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583718: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj10, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "2"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583718: 3 triples (now: 1758566583845) +>>3: 1 2 +Added to stream obj6 at 1758566583892: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj13] +Added to stream obj6 at 1758566583892: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj13, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "4"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583892: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj13, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "3"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583892: 3 triples (now: 1758566583896) +>>4: 1 2 3 +Added to stream obj6 at 1758566583939: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj17] +Added to stream obj6 at 1758566583939: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj17, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "5"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583939: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj17, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "4"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583939: 3 triples (now: 1758566583939) +>>5: 1 2 3 +Added to stream obj6 at 1758566583999: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj21] +Added to stream obj6 at 1758566583999: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj21, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "6"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566583999: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj21, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "5"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566583999: 3 triples (now: 1758566583999) +>>6: 1 2 3 +Added to stream obj6 at 1758566584135: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj25] +Added to stream obj6 at 1758566584135: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj25, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "7"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566584135: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj25, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "6"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566584135: 3 triples (now: 1758566584139) +>>7: 2 3 4 5 6 +Added to stream obj6 at 1758566584194: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj31] +Added to stream obj6 at 1758566584194: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj31, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "8"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566584194: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj31, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "7"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566584194: 3 triples (now: 1758566584195) +>>8: 2 3 4 5 6 +Added to stream obj6 at 1758566584237: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj37] +Added to stream obj6 at 1758566584237: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj37, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "9"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566584237: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj37, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "8"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566584237: 3 triples (now: 1758566584237) +>>9: 2 3 4 5 6 +Added to stream obj6 at 1758566584288: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj6, https://github.com/Edkamb/SemanticObjects/Program#C_obs, https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj43] +Added to stream obj6 at 1758566584288: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj43, https://github.com/Edkamb/SemanticObjects/Program#Observation_x, "10"^^http://www.w3.org/2001/XMLSchema#int] +Added to stream obj6 at 1758566584288: [https://github.com/Edkamb/SemanticObjects/Run1758566561585#obj43, https://github.com/Edkamb/SemanticObjects/Program#Observation_ts, "9"^^http://www.w3.org/2001/XMLSchema#int] +Pushed immediately to stream obj6 at 1758566584288: 3 triples (now: 1758566584291) +>>10: 4 5 6 7 8 9 diff --git a/src/test/resources/csparql/17_influxdb.smol b/src/test/resources/csparql/17_influxdb.smol new file mode 100644 index 00000000..635cbb62 --- /dev/null +++ b/src/test/resources/csparql/17_influxdb.smol @@ -0,0 +1,109 @@ +// put config.yml in src/main/resources with InfluxDB connection details +// reada src/test/resources/csparql/17_influxdb.smol + +class Observation(Double x, Int ts) end + +streamer class C(Observation obs, Monitor m) + + Unit register() + Monitor m = monitor(" + SELECT ?x + FROM NAMED WINDOW ON %1 [RANGE PT1200.001s STEP PT120s] + WHERE { + WINDOW ?w { ?s prog:Observation_x ?x } + } + ", this); + this.m = m; + end + + Unit doStream(Double x, Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.obs = new Observation(x, ts); + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ doubleToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +class Reader() + List readInfluxDB(Int from, Int to) + List list = access(" + from(bucket: \"sr-demo\") + |> range(start: %1, stop: %2) + |> filter(fn: (r) => r._measurement == \"sensor\" and r._field == \"soil_moisture\" and r.field == \"FA01\") + |> yield(name: \"_value\") + ", INFLUXDB("src/main/resources/config.yml"), from, to); + return list; + end +end + +main + Reader r = new Reader(); + Int i = 0; + clock Int ts = 1727740800; + Int endAt = 1727745000; + Int tickS = 60; + + // initialize stream + C o = new C(null, null); + + // initialize monitor + o.register(); + + while ts < endAt do + // print("Querying InfluxDB from " ++ intToString(ts) ++ " to " ++ intToString(ts + tickS)); + List l = r.readInfluxDB(ts, ts + tickS); + while l != null do + o.doStream(l.content, ts); + String res = o.windowToString(); + print(intToString(i) ++ "[" ++ intToString(ts) ++ "]: " ++ res); + l = l.next; + end + i = i + 1; + ts = ts + tickS; + end + +end + +// Output: +// 0[1727740800]: +// 2[1727740920]: +// 4[1727741040]: 2.0 +// 6[1727741160]: 2.0 2.011940000178814 +// 8[1727741280]: 2.0 2.011940000178814 2.0358200005364417 +// 10[1727741400]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 +// 12[1727741520]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 2.0835800012516974 +// 14[1727741640]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 2.0835800012516974 2.1074600016093252 +// 16[1727741760]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 2.0835800012516974 2.1074600016093252 2.131340001966953 +// 18[1727741880]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 2.0835800012516974 2.1074600016093252 2.131340001966953 2.155220002324581 +// 20[1727742000]: 2.0 2.011940000178814 2.0358200005364417 2.0597000008940696 2.0835800012516974 2.1074600016093252 2.131340001966953 2.155220002324581 2.1791000026822087 +// 22[1727742120]: 2.0358200005364417 2.1791000026822087 2.0597000008940696 2.0835800012516974 2.0 2.2029800030398365 2.155220002324581 2.1074600016093252 2.011940000178814 2.131340001966953 +// 24[1727742240]: 2.0358200005364417 2.1791000026822087 2.0597000008940696 2.0835800012516974 2.2029800030398365 2.155220002324581 2.2268600033974644 2.1074600016093252 2.011940000178814 2.131340001966953 +// 26[1727742360]: 2.0358200005364417 2.1791000026822087 2.0597000008940696 2.0835800012516974 2.2029800030398365 2.155220002324581 2.1074600016093252 2.2268600033974644 2.250740003755092 2.131340001966953 +// 28[1727742480]: 2.1791000026822087 2.0597000008940696 2.0835800012516974 2.2029800030398365 2.27462000411272 2.155220002324581 2.1074600016093252 2.2268600033974644 2.250740003755092 2.131340001966953 +// 30[1727742600]: 2.1791000026822087 2.0835800012516974 2.298500004470348 2.2029800030398365 2.27462000411272 2.155220002324581 2.1074600016093252 2.2268600033974644 2.250740003755092 2.131340001966953 +// 32[1727742720]: 2.1791000026822087 2.298500004470348 2.2029800030398365 2.27462000411272 2.155220002324581 2.1074600016093252 2.2268600033974644 2.250740003755092 2.3223800048279757 2.131340001966953 +// 34[1727742840]: 2.1791000026822087 2.298500004470348 2.2029800030398365 2.27462000411272 2.155220002324581 2.2268600033974644 2.250740003755092 2.3462600051856035 2.3223800048279757 2.131340001966953 +// 36[1727742960]: 2.1791000026822087 2.298500004470348 2.3701400055432313 2.2029800030398365 2.27462000411272 2.155220002324581 2.2268600033974644 2.250740003755092 2.3462600051856035 2.3223800048279757 +// 38[1727743080]: 2.1791000026822087 2.298500004470348 2.3701400055432313 2.2029800030398365 2.394020005900859 2.27462000411272 2.2268600033974644 2.250740003755092 2.3462600051856035 2.3223800048279757 +// 40[1727743200]: 2.298500004470348 2.2029800030398365 2.3701400055432313 2.394020005900859 2.417900006258487 2.27462000411272 2.2268600033974644 2.250740003755092 2.3462600051856035 2.3223800048279757 +// 42[1727743320]: 2.298500004470348 2.3701400055432313 2.394020005900859 2.417900006258487 2.27462000411272 2.2268600033974644 2.250740003755092 2.3462600051856035 2.441780006616115 2.3223800048279757 +// 44[1727743440]: 2.298500004470348 2.3701400055432313 2.417900006258487 2.394020005900859 2.27462000411272 2.250740003755092 2.3462600051856035 2.441780006616115 2.3223800048279757 2.4656600069737427 +// 46[1727743560]: 2.298500004470348 2.3701400055432313 2.417900006258487 2.394020005900859 2.27462000411272 2.4895400073313705 2.3462600051856035 2.441780006616115 2.3223800048279757 2.4656600069737427 +// 48[1727743680]: 2.5134200076889983 2.298500004470348 2.3701400055432313 2.417900006258487 2.394020005900859 2.4895400073313705 2.3462600051856035 2.441780006616115 2.3223800048279757 2.4656600069737427 +// 50[1727743800]: 2.5134200076889983 2.537300008046626 2.3701400055432313 2.417900006258487 2.394020005900859 2.4895400073313705 2.3462600051856035 2.441780006616115 2.3223800048279757 2.4656600069737427 +// 52[1727743920]: 2.5134200076889983 2.537300008046626 2.3701400055432313 2.417900006258487 2.394020005900859 2.561180008404254 2.4895400073313705 2.3462600051856035 2.441780006616115 2.4656600069737427 +// 54[1727744040]: 2.5134200076889983 2.537300008046626 2.585060008761882 2.3701400055432313 2.417900006258487 2.394020005900859 2.561180008404254 2.4895400073313705 2.441780006616115 2.4656600069737427 +// 56[1727744160]: 2.5134200076889983 2.537300008046626 2.585060008761882 2.417900006258487 2.394020005900859 2.561180008404254 2.4895400073313705 2.441780006616115 2.4656600069737427 2.6089400091195096 +// 58[1727744280]: 2.5134200076889983 2.537300008046626 2.6328200094771375 2.585060008761882 2.417900006258487 2.561180008404254 2.4895400073313705 2.441780006616115 2.4656600069737427 2.6089400091195096 +// 60[1727744400]: 2.5134200076889983 2.537300008046626 2.6328200094771375 2.585060008761882 2.6567000098347653 2.561180008404254 2.4895400073313705 2.441780006616115 2.4656600069737427 2.6089400091195096 + +// Notes: +// - [RANGE 20m STEP 2m]: windows have {0, 0, 2, 3, 4, ..., 9, 10, 10, 10, ...} elements diff --git a/src/test/resources/csparql/18_reasoning.smol b/src/test/resources/csparql/18_reasoning.smol new file mode 100644 index 00000000..8fb4625b --- /dev/null +++ b/src/test/resources/csparql/18_reasoning.smol @@ -0,0 +1,145 @@ +// uncomment the last 2 lines for reasoning in src/main/resources/default-csparql2.properties +// java -jar build/libs/smol.jar -b src/test/resources/csparql/example2.owl -p ex=https://example.com/ontology# +// reada src/test/resources/csparql/18_reasoning.smol + +class SensorData(Int x, Int ts) end + +class Observation extends SensorData() end + +class Controller(List streamers) + + Unit register(String static) + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.register(static); + objs = objs.next; + end + end + + Unit doStream(Int ts) + List objs = this.streamers; + while objs != null do + C o = objs.content; + o.doStream(ts); + objs = objs.next; + end + end + + String windowToString() + String s = ""; + List objs = this.streamers; + while objs != null do + C o = objs.content; + String res = o.windowToString(); + s = s ++ o ++ " ( " ++ res ++ ") "; + objs = objs.next; + end + return s; + end +end + +// C is subclass of D +class D(Int x, Int mult, Observation obs, Monitor m) end + +// ex:StreamerClass is subclass of ex:Class +streamer class C extends D() models "a ex:StreamerClass."; + + Unit register(String static) + Monitor m = monitor(" + SELECT (?x * ?k AS ?res) + FROM NAMED WINDOW ON %1 [RANGE PT5.001s STEP PT2s] + FROM %2 + WHERE { + ?streamer prog:C_mult ?k . + ?streamer domain:models ?model . + + ?model a ex:Class . + ?streamer a prog:D . + + WINDOW ?w { + ?streamer prog:C_obs ?obs . + ?obs prog:Observation_x ?x . + } + } + ", + this, + static + ); + this.m = m; + end + + Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts) + this.obs = new Observation(this.x, ts); + this.x = this.x + 1; + end + + String windowToString() + String s = ""; + List l = window(this.m); + while l != null do + s = s ++ intToString(l.content) ++ " "; + l = l.next; + end + return s; + end + +end + +main + // configure simulation + clock Int i = 100; + Int endAt = 200; + + C o1 = new C(0, 2, null, null); + C o2 = new C(10, 3, null, null); + C o3 = new C(20, 10, null, null); + + // push static data + String static = pushStatic("heap,staticTable,externalOntology"); + + breakpoint; + + // initialize stream + List l3 = new List(o3, null); + List l2 = new List(o2, l3); + List streamers = new List(o1, l2); + Controller ctrl = new Controller(streamers); + + // initialize monitor + ctrl.register(static); + + // run simulation + while i < endAt do + + // put triples in the stream + ctrl.doStream(i); + + // read window contents + String res = ctrl.windowToString(); + print(">>" ++ intToString(i) ++ ": " ++ res); + + // advance clock (timestamp) + i = i + 1; + end +end + +// Output: +// >>100: obj8 ( ) obj9 ( ) obj10 ( ) +// >>101: obj8 ( ) obj9 ( ) obj10 ( ) +// >>102: obj8 ( ) obj9 ( ) obj10 ( ) +// >>103: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 ) +// >>104: obj8 ( 0 2 ) obj9 ( 30 33 ) obj10 ( 200 210 ) +// >>105: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 ) +// >>106: obj8 ( 0 2 4 6 ) obj9 ( 30 33 36 39 ) obj10 ( 200 210 220 230 ) +// >>107: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 ) +// >>108: obj8 ( 2 4 6 8 10 ) obj9 ( 33 36 39 42 45 ) obj10 ( 210 220 230 240 250 ) +// >>109: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 ) +// >>110: obj8 ( 6 8 10 12 14 ) obj9 ( 39 42 45 48 51 ) obj10 ( 230 240 250 260 270 ) +// >>111: obj8 ( 10 12 14 16 18 ) obj9 ( 45 48 51 54 57 ) obj10 ( 250 260 270 280 290 ) +// .. + +// Notes: +// 1. ?model a ex:Class is inferred from ?streamer a ex:StreamerClass and ex:StreamerClass rdfs:subClassOf ex:Class +// 2. ?streamer a prog:D is inferred from ?streamer a prog:C and prog:C rdfs:subClassOf prog:D +// ?obs a prog:SensorData cannot be inferred because ?obs is null diff --git a/src/test/resources/csparql/example-tbox-rdfxml.owl b/src/test/resources/csparql/example-tbox-rdfxml.owl new file mode 100644 index 00000000..0761d8d8 --- /dev/null +++ b/src/test/resources/csparql/example-tbox-rdfxml.owl @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + diff --git a/src/test/resources/csparql/example.owl b/src/test/resources/csparql/example.owl new file mode 100644 index 00000000..9ca7f7cc --- /dev/null +++ b/src/test/resources/csparql/example.owl @@ -0,0 +1,27 @@ +@prefix owl: . +@prefix rdf: . +@prefix xsd: . +@prefix rdfs: . +@prefix ex: . + +# class terminology +ex:Class a owl:Class . + +# property terminology +ex:id a owl:DatatypeProperty ; + rdfs:domain ex:Class ; + rdfs:range xsd:string . +ex:x0 a owl:DatatypeProperty ; + rdfs:domain ex:Class ; + rdfs:range xsd:integer . + +# instances +ex:A a ex:Class ; + ex:id "A" ; + ex:x0 0 . +ex:B a ex:Class ; + ex:id "B" ; + ex:x0 10 . +ex:C a ex:Class ; + ex:id "C" ; + ex:x0 20 . diff --git a/src/test/resources/csparql/example2.owl b/src/test/resources/csparql/example2.owl new file mode 100644 index 00000000..510e74d6 --- /dev/null +++ b/src/test/resources/csparql/example2.owl @@ -0,0 +1,29 @@ +@prefix owl: . +@prefix rdf: . +@prefix xsd: . +@prefix rdfs: . +@prefix ex: . + +# class terminology +ex:Class a owl:Class . +ex:StreamerClass a owl:Class ; + rdfs:subClassOf ex:Class . + +# property terminology +ex:id a owl:DatatypeProperty ; + rdfs:domain ex:Class ; + rdfs:range xsd:string . +ex:x0 a owl:DatatypeProperty ; + rdfs:domain ex:Class ; + rdfs:range xsd:integer . + +# instances +ex:A a ex:StreamerClass ; + ex:id "A" ; + ex:x0 0 . +ex:B a ex:StreamerClass ; + ex:id "B" ; + ex:x0 10 . +ex:C a ex:StreamerClass ; + ex:id "C" ; + ex:x0 20 .