@@ -38,6 +38,9 @@ import org.streamreasoning.rsp4j.api.sds.SDSConfiguration
3838import org.apache.jena.rdf.model.*;
3939import org.apache.jena.atlas.lib.tuple.Tuple
4040
41+ // Per-monitor state
42+ data class WindowBuf (var tick : Long? = null , val current : MutableList <Table > = mutableListOf())
43+
4144// Class managing streams
4245class StreamManager (private val settings : Settings , val staticTable : StaticTable , private val interpreter : Interpreter ? ) {
4346
@@ -50,12 +53,14 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
5053 private var streams: MutableMap <String , MutableMap <LiteralExpr , StreamObject >> = mutableMapOf ()
5154 private var monitors: MutableMap <LiteralExpr , MonitorObject > = mutableMapOf ()
5255
53- private var queryResults: MutableMap <LiteralExpr , Table ?> = ConcurrentHashMap ()
56+ private val windowBufs = ConcurrentHashMap <LiteralExpr , WindowBuf >()
57+ private val queryResults = ConcurrentHashMap <LiteralExpr , List <Table >>() // read-only snapshots
5458
5559 var clockVar : String? = null
5660 var clockTimestampSec : String? = null
5761 var lastTimestamp: Long = 0
58-
62+ var lastWindowTs: Long? = null
63+
5964 var nStaticGraphsPushed = 0
6065
6166 init {
@@ -89,12 +94,12 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
8994 sdsConfig = SDSConfiguration (defaultPath)
9095 }
9196
92- engine = CSPARQLEngine (0 , ec)
97+ engine = CSPARQLEngine (ts , ec)
9398 engineInitialized = true
9499 }
95100 }
96101
97- public fun getQueryResults (name : LiteralExpr ): Table ? {
102+ public fun getQueryResults (name : LiteralExpr ): List < Table > ? {
98103 return queryResults[name]
99104 }
100105
@@ -130,13 +135,27 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
130135 val stream = streams[className]!! [obj]!!
131136 val expressions = interpreter!! .staticInfo.streamersTable[className]!! [methodName]!!
132137
133- val subjIri = " ${settings.runPrefix}${obj.literal} "
134138 val timestamp = getTimestamp()
135139
136140 for (expr in expressions) {
137141 val res = interpreter.eval(expr, stackEntry)
138142
139- val predIri = " ${settings.progPrefix}${className} _${expr.toString().removePrefix(" this." ).replace(' .' , ' _' )} "
143+ val exprParts = expr.toString().split(' .' )
144+ var subjIri: String = " ${settings.runPrefix}${obj.literal} "
145+ var predIri = " ${settings.progPrefix}${className} _${expr.toString().removePrefix(" this." ).replace(' .' , ' _' )} "
146+ if (exprParts.size < 2 ) {
147+ throw Exception (" Streamer expression must be of the form this.field or this.obj.field" )
148+ }
149+ if (exprParts.size > 2 ) {
150+ // find the one before last part
151+ var currentObj: LiteralExpr = obj
152+ for (i in 1 until exprParts.size - 1 ) {
153+ val fieldName = exprParts[i]
154+ currentObj = interpreter.heap[currentObj]!! [fieldName]!!
155+ }
156+ subjIri = " ${settings.runPrefix}${currentObj.literal} "
157+ predIri = " ${settings.progPrefix}${(currentObj.tag as BaseType ).name} _${exprParts.last()} "
158+ }
140159
141160 val m = ModelFactory .createDefaultModel()
142161 val stmt = m.createStatement(m.createResource(subjIri), m.createProperty(predIri), literalToIri(m, res, settings))
@@ -162,14 +181,25 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
162181 public fun registerQuery (name : LiteralExpr , queryExpr : Expression , params : List <Expression >, stackMemory : Memory , heap : GlobalMemory , obj : LiteralExpr , SPARQL : Boolean = true, declaredType : Type ): JenaContinuousQueryExecution {
163182 initEngineIfNeeded()
164183 val queryStr = prepareQuery(name, queryExpr, params, stackMemory, heap, obj, SPARQL )
184+ if (settings.verbose) println (" Registering query:\n $queryStr " )
165185
166186 val cqe = engine!! .register(queryStr, sdsConfig) as JenaContinuousQueryExecution
167187 monitors[name] = MonitorObject (name, declaredType)
168188
189+ windowBufs.computeIfAbsent(name) { WindowBuf () }
190+
169191 val outputStream = cqe.outstream()
170- outputStream?.addConsumer { arg, ts ->
171- val results: Table = arg as Table
172- queryResults[name] = results
192+ outputStream?.addConsumer { arg, ts ->
193+ val table = arg as Table
194+ val b = windowBufs.getOrPut(name) { WindowBuf () }
195+
196+ if (b.tick != ts) {
197+ b.tick = ts
198+ b.current.clear()
199+ }
200+ b.current + = table
201+
202+ queryResults[name] = b.current.toList()
173203 }
174204
175205 return cqe
@@ -181,7 +211,7 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
181211 var prefixes = " "
182212 for ((key, value) in settings.prefixMap()) prefixes + = " PREFIX $key : <$value >\n "
183213
184- val queryHeader = " REGISTER RSTREAM <${settings.runPrefix}${name.literal} > AS "
214+ val queryHeader = " REGISTER RSTREAM <${settings.runPrefix}${name.literal} /out > AS "
185215
186216 val queryBody = interpreter!! .prepareQuery(queryExpr, params, stackMemory, heap, obj, SPARQL )
187217 .removePrefix(" \" " ).removeSuffix(" \" " )
0 commit comments