@@ -7,14 +7,11 @@ import no.uio.microobject.ast.stmt.MonitorObject
77import no.uio.microobject.main.Settings
88import no.uio.microobject.type.*
99
10-
11- import eu.larkc.csparql.core.engine.*
12- import eu.larkc.csparql.cep.api.*
13- import eu.larkc.csparql.core.*
14- import eu.larkc.csparql.common.*
10+ import java.nio.file.*
1511import java.util.Observer;
1612import java.util.Observable
1713import java.util.concurrent.ConcurrentHashMap
14+ import java.util.function.Consumer
1815import java.io.File
1916import java.io.StringWriter
2017import org.slf4j.Logger
@@ -24,32 +21,41 @@ import org.apache.log4j.Level
2421import org.apache.log4j.Logger as Log4jLogger
2522import org.apache.log4j.PropertyConfigurator
2623import org.apache.jena.rdf.model.Model
24+ import org.apache.jena.sparql.algebra.Table
25+ import org.apache.jena.graph.Graph
2726import kotlin.text.toLong
2827
29- class ResultPusher (private val name : LiteralExpr , private val resultTable : MutableMap <LiteralExpr , RDFTable ?>) : Observer {
30- public override fun update (o : Observable , arg : Any ) {
31- val results: RDFTable = arg as RDFTable
32- resultTable[name] = results
33- }
34- }
28+ import org.streamreasoning.rsp4j.csparql2.engine.CSPARQLEngine
29+ import org.streamreasoning.rsp4j.csparql2.engine.JenaContinuousQueryExecution
30+ import org.streamreasoning.rsp4j.csparql2.stream.GraphStreamSchema
31+ import org.streamreasoning.rsp4j.csparql2.sysout.ResponseFormatterFactory
32+ import org.streamreasoning.rsp4j.csparql2.syntax.QueryFactory
33+ import org.streamreasoning.rsp4j.csparql2.sysout.GenericResponseSysOutFormatter
34+ import org.streamreasoning.rsp4j.api.engine.config.EngineConfiguration;
35+ import org.streamreasoning.rsp4j.io.DataStreamImpl
36+ import org.streamreasoning.rsp4j.api.stream.data.DataStream
37+ import org.streamreasoning.rsp4j.api.sds.SDSConfiguration
38+ import org.apache.jena.rdf.model.*;
39+ import org.apache.jena.atlas.lib.tuple.Tuple
3540
3641// Class managing streams
3742class StreamManager (private val settings : Settings , val staticTable : StaticTable , private val interpreter : Interpreter ? ) {
3843
39- private val engine: CsparqlEngineImpl = CsparqlEngineImpl ()
44+ private var engine: CSPARQLEngine ? = null
4045 private var engineInitialized = false
46+ private var sdsConfig: SDSConfiguration ? = null
47+ private var ec: EngineConfiguration ? = null
4148 val LOG : Logger ? = LoggerFactory .getLogger(StreamManager ::class .java)
4249
43- private var streams: MutableMap <String , MutableMap <LiteralExpr , RdfStream >> = mutableMapOf ()
50+ private var streams: MutableMap <String , MutableMap <LiteralExpr , StreamObject >> = mutableMapOf ()
4451 private var monitors: MutableMap <LiteralExpr , MonitorObject > = mutableMapOf ()
45-
46- private var queryResults: MutableMap <LiteralExpr , RDFTable ?> = ConcurrentHashMap ()
52+
53+ private var queryResults: MutableMap <LiteralExpr , Table ?> = ConcurrentHashMap ()
4754
4855 var clockVar : String? = null
4956 var clockTimestampSec : String? = null
5057 var lastTimestamp: Long = 0
51- var firstTsPassed: Boolean = false
52-
58+
5359 var nStaticGraphsPushed = 0
5460
5561 init {
@@ -65,22 +71,41 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
6571 System .setProperty(org.slf4j.impl.SimpleLogger .DEFAULT_LOG_LEVEL_KEY , " ERROR" )
6672 }
6773 }
74+
75+ private fun initEngineIfNeeded () {
76+ if (! engineInitialized) {
77+ val ts = getTimestamp()
78+
79+ val configPath = " src/main/resources/csparql2.properties"
80+ val defaultPath = " src/main/resources/default-csparql2.properties"
81+
82+ if (File (configPath).exists()) {
83+ ec = EngineConfiguration (configPath)
84+ sdsConfig = SDSConfiguration (configPath)
85+ println (" Loaded csparql2.properties successfully at ts=$ts ." )
86+ } else {
87+ println (" Failed to load csparql2.properties, falling back to default-csparql2.properties at ts=$ts ." )
88+ ec = EngineConfiguration (defaultPath)
89+ sdsConfig = SDSConfiguration (defaultPath)
90+ }
6891
69- private fun initEngine () {
70-
71- engine.initialize(true ) // timestamp enabled
72- engineInitialized = true
92+ engine = CSPARQLEngine (0 , ec)
93+ engineInitialized = true
94+ }
7395 }
7496
75- public fun getQueryResults (name : LiteralExpr ): RDFTable ? {
97+ public fun getQueryResults (name : LiteralExpr ): Table ? {
7698 return queryResults[name]
7799 }
78100
79101 public fun registerStream (className : String , obj : LiteralExpr ) {
80- if ( ! engineInitialized) initEngine ()
102+ initEngineIfNeeded ()
81103 val streamIri = " ${settings.runPrefix}${obj.toString()} "
82- val stream = RdfStream (streamIri)
83- engine.registerStream(stream)
104+
105+ val stream = StreamObject (streamIri)
106+ val reg = engine!! .register(stream)
107+ stream.setWritable(reg)
108+
84109 if (! streams.containsKey(className)) streams[className] = mutableMapOf ()
85110 streams[className]!! [obj] = stream
86111 }
@@ -89,13 +114,10 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
89114 var ms: Long
90115 if (clockTimestampSec != null ) {
91116 ms = secToMs(clockTimestampSec!! .toLong())
92- if (! firstTsPassed) ms - = 1 // workaround: first timestamp is decreased by 1 ms
93117 } else {
94118 ms = System .currentTimeMillis()
95119 }
96120
97- if (ms > lastTimestamp && ! firstTsPassed) firstTsPassed = true
98-
99121 lastTimestamp = ms
100122 return ms
101123 }
@@ -115,72 +137,72 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
115137 val res = interpreter.eval(expr, stackEntry)
116138
117139 val predIri = " ${settings.progPrefix}${className} _${expr.toString().removePrefix(" this." ).replace(' .' , ' _' )} "
118- val objIri = literalToIri(res)
119140
120- val quad = RdfQuadruple (subjIri, predIri, objIri, timestamp)
121- // println(quad.toString())
122- stream.put(quad)
123- }
124- }
141+ val m = ModelFactory .createDefaultModel()
142+ val stmt = m.createStatement(m.createResource(subjIri), m.createProperty(predIri), literalToIri(m, res, settings))
143+ m.add(stmt)
125144
126- private fun literalToIri (lit : LiteralExpr ): String {
127- if (lit.tag == INTTYPE ) return " \" ${lit.literal} \" ^^http://www.w3.org/2001/XMLSchema#integer"
128- if (lit.tag == BOOLEANTYPE ) return " \" ${lit.literal} \" ^^http://www.w3.org/2001/XMLSchema#boolean"
129- if (lit.tag == DOUBLETYPE ) return " \" ${lit.literal} \" ^^http://www.w3.org/2001/XMLSchema#double"
130- if (lit.tag == STRINGTYPE ) return " \" ${lit.literal} \" ^^http://www.w3.org/2001/XMLSchema#string"
131- return " ${settings.runPrefix}${lit.literal} "
145+ stream.putGraph(m.getGraph(), timestamp)
146+ }
132147 }
133148
134- public fun addMonitor (name : LiteralExpr , monitor : MonitorObject ) {
135- monitors[name] = monitor
149+ private fun literalToIri (m : Model , lit : LiteralExpr , settings : Settings ): RDFNode =
150+ when (lit.tag) {
151+ INTTYPE -> m.createTypedLiteral(lit.literal.toInt())
152+ DOUBLETYPE -> m.createTypedLiteral(lit.literal.toDouble())
153+ BOOLEANTYPE -> m.createTypedLiteral(lit.literal.toBoolean())
154+ STRINGTYPE -> m.createLiteral(lit.literal)
155+ else -> m.createResource(" ${settings.runPrefix}${lit.literal} " )
136156 }
137157
138158 public fun getMonitor (name : LiteralExpr ): MonitorObject ? {
139159 return monitors[name]
140160 }
141161
142- public fun registerQuery (name : LiteralExpr , queryExpr : Expression , params : List <Expression >, stackMemory : Memory , heap : GlobalMemory , obj : LiteralExpr , SPARQL : Boolean = true): CsparqlQueryResultProxy {
143- if ( ! engineInitialized) initEngine ()
162+ public fun registerQuery (name : LiteralExpr , queryExpr : Expression , params : List <Expression >, stackMemory : Memory , heap : GlobalMemory , obj : LiteralExpr , SPARQL : Boolean = true, declaredType : Type ): JenaContinuousQueryExecution {
163+ initEngineIfNeeded ()
144164 val queryStr = prepareQuery(name, queryExpr, params, stackMemory, heap, obj, SPARQL )
145- if (settings.verbose) println (" Registering query:\n $queryStr " )
146- var resultProxy = engine.registerQuery(queryStr, false ) // reasoning disabled
147- resultProxy.addObserver(ResultPusher (name, queryResults)) // each key is only used by one observer
148- return resultProxy
165+
166+ val cqe = engine!! .register(queryStr, sdsConfig) as JenaContinuousQueryExecution
167+ monitors[name] = MonitorObject (name, declaredType)
168+
169+ val outputStream = cqe.outstream()
170+ outputStream?.addConsumer { arg, ts ->
171+ val results: Table = arg as Table
172+ queryResults[name] = results
173+ }
174+
175+ return cqe
149176 }
150177
151- private fun prepareQuery (name : LiteralExpr , queryExpr : Expression , params : List <Expression >, stackMemory : Memory , heap : GlobalMemory , obj : LiteralExpr , SPARQL : Boolean = true) : String {
152- val queryHeader = " REGISTER QUERY Query${name.literal} AS "
178+ private fun prepareQuery (name : LiteralExpr , queryExpr : Expression , params : List <Expression >,
179+ stackMemory : Memory , heap : GlobalMemory , obj : LiteralExpr , SPARQL : Boolean = true) : String {
180+
181+ var prefixes = " "
182+ for ((key, value) in settings.prefixMap()) prefixes + = " PREFIX $key : <$value >\n "
183+
184+ val queryHeader = " REGISTER RSTREAM <${settings.runPrefix}${name.literal} > AS "
153185
154186 val queryBody = interpreter!! .prepareQuery(queryExpr, params, stackMemory, heap, obj, SPARQL )
155187 .removePrefix(" \" " ).removeSuffix(" \" " )
156188
157- var queryWithPrefixes = queryHeader
158- // for ((key, value) in settings.prefixMap()) queryWithPrefixes += "PREFIX $key: <$value>\n"
159- queryWithPrefixes + = queryBody
189+ var queryWithPrefixes = prefixes + queryHeader + queryBody
160190 queryWithPrefixes = queryWithPrefixes.replace(" \\\" " , " \" " )
161191
162- // Replace occurrences of value:x with <keyx> for each prefix in prefixMap
163- for ((key, value) in settings.prefixMap()) {
164- // Regex to match key: followed by a valid identifier (e.g., obj4)
165- val regex = Regex (""" ${Regex .escape(key)} :([A-Za-z0-9_]+)""" )
166- queryWithPrefixes = queryWithPrefixes.replace(regex) { matchResult ->
167- " <$value${matchResult.groupValues[1 ]} >"
168- }
169- }
170-
171192 return queryWithPrefixes
172193 }
173194
174- public fun putStaticNamedGraph (iri : String , model : Model ) {
175- if (! engineInitialized) initEngine()
195+ // todo implement
196+ // public fun putStaticNamedGraph(iri: String, model: Model) {
197+ // if (!engineInitialized) initEngine()
176198
177- // serialize the model (RDF/XML matches the engine's first attempt)
178- val sw = StringWriter ()
179- model.write(sw, " RDF/XML" )
199+ // // serialize the model (RDF/XML matches the engine's first attempt)
200+ // val sw = StringWriter()
201+ // model.write(sw, "RDF/XML")
180202
181- // hand it to the C-SPARQL engine
182- engine.putStaticNamedModel(iri, sw.toString())
183- }
203+ // // hand it to the C-SPARQL engine
204+ // engine.putStaticNamedModel(iri, sw.toString())
205+ // }
184206
185207 public fun getStaticNamedIri (): String {
186208 val s = " ${settings.runPrefix} loadStatic${nStaticGraphsPushed} "
@@ -189,4 +211,18 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
189211 }
190212
191213}
214+
215+ class StreamObject (var iri : String ) : DataStreamImpl<Graph>(iri) {
216+
217+ private var s: DataStream <Graph >? = null
218+
219+ fun setWritable (s : DataStream <Graph >) {
220+ this .s = s
221+ }
222+
223+ fun putGraph (m : Graph , t : Long ) {
224+ if (s == null ) throw Exception (" Stream $iri is not writable" )
225+ s!! .put(m, t)
226+ }
227+ }
192228
0 commit comments