Skip to content

Commit 44041f2

Browse files
committed
multiple monitors enabled
1 parent 5c527e7 commit 44041f2

File tree

2 files changed

+130
-9
lines changed

2 files changed

+130
-9
lines changed

src/main/kotlin/no/uio/microobject/data/StreamManager.kt

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
6161
var lastTimestamp: Long = 0
6262
var lastWindowTs: Long? = null
6363

64-
var lastTriggerTs: Long? = null
65-
var lastModel: Model? = null
64+
// monitor state
65+
var lastTriggerTs: MutableMap<LiteralExpr, Long> = mutableMapOf()
66+
var lastModels: MutableMap<LiteralExpr, Model> = mutableMapOf()
6667

6768
var nStaticGraphsPushed = 0
6869

@@ -139,12 +140,12 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
139140
val expressions = interpreter!!.staticInfo.streamersTable[className]!![methodName]!!
140141

141142
val timestamp = getTimestamp()
142-
if (lastTriggerTs == null || timestamp > lastTriggerTs!!) {
143+
if (!lastTriggerTs.containsKey(obj) || timestamp > lastTriggerTs[obj]!!) {
143144
// put model only if time has advanced (guarantee one graph per timestamp)
144-
if (lastTriggerTs != null)
145-
stream.putGraph(lastModel!!.getGraph(), lastTriggerTs!!)
146-
lastModel = ModelFactory.createDefaultModel()
147-
lastTriggerTs = timestamp
145+
if (lastTriggerTs.containsKey(obj))
146+
stream.putGraph(lastModels[obj]!!.getGraph(), lastTriggerTs[obj]!!)
147+
lastModels[obj] = ModelFactory.createDefaultModel()
148+
lastTriggerTs[obj] = timestamp
148149
}
149150

150151
for (expr in expressions) {
@@ -167,8 +168,8 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
167168
predIri = "${settings.progPrefix}${(currentObj.tag as BaseType).name}_${exprParts.last()}"
168169
}
169170

170-
val stmt = lastModel!!.createStatement(lastModel!!.createResource(subjIri), lastModel!!.createProperty(predIri), literalToIri(lastModel!!, res, settings))
171-
lastModel!!.add(stmt)
171+
val stmt = lastModels[obj]!!.createStatement(lastModels[obj]!!.createResource(subjIri), lastModels[obj]!!.createProperty(predIri), literalToIri(lastModels[obj]!!, res, settings))
172+
lastModels[obj]!!.add(stmt)
172173
}
173174
}
174175

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// reada src/test/resources/csparql/14_multiple_monitors.smol
2+
3+
class Observation(Int x, Int ts) end
4+
5+
class Controller(List<C> streamers)
6+
7+
Unit register()
8+
List<C> objs = this.streamers;
9+
while objs != null do
10+
C o = objs.content;
11+
o.register();
12+
objs = objs.next;
13+
end
14+
end
15+
16+
Unit doStream(Int ts)
17+
List<C> objs = this.streamers;
18+
while objs != null do
19+
C o = objs.content;
20+
o.doStream(ts);
21+
objs = objs.next;
22+
end
23+
end
24+
25+
String windowToString()
26+
String s = "";
27+
List<C> objs = this.streamers;
28+
while objs != null do
29+
C o = objs.content;
30+
String res = o.windowToString();
31+
s = s ++ o ++ " ( " ++ res ++ ") ";
32+
objs = objs.next;
33+
end
34+
return s;
35+
end
36+
end
37+
38+
streamer class C(Int x, Observation obs, Monitor<Int> m)
39+
40+
Unit register()
41+
Monitor<Int> m = monitor("
42+
SELECT ?x
43+
FROM NAMED WINDOW <win> ON %1 [RANGE PT4.001s STEP PT2s]
44+
WHERE {
45+
WINDOW ?w { ?s prog:Observation_x ?x. }
46+
}
47+
",
48+
this
49+
);
50+
this.m = m;
51+
end
52+
53+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
54+
this.obs = new Observation(this.x, ts);
55+
this.x = this.x + 1;
56+
end
57+
58+
59+
String windowToString()
60+
String s = "";
61+
List<Int> l = window(this.m);
62+
while l != null do
63+
s = s ++ intToString(l.content) ++ " ";
64+
l = l.next;
65+
end
66+
return s;
67+
end
68+
69+
end
70+
71+
main
72+
// configure simulation
73+
clock Int i = 100;
74+
Int endAt = 200;
75+
76+
// initialize stream
77+
C o1 = new C(0, null, null);
78+
C o2 = new C(10, null, null);
79+
C o3 = new C(20, null, null);
80+
List<C> l3 = new List<C>(o3, null);
81+
List<C> l2 = new List<C>(o2, l3);
82+
List<C> streamers = new List<C>(o1, l2);
83+
Controller ctrl = new Controller(streamers);
84+
85+
// initialize monitor
86+
ctrl.register();
87+
88+
// run simulation
89+
while i < endAt do
90+
91+
// put triples in the stream
92+
ctrl.doStream(i);
93+
94+
// read window contents
95+
String res = ctrl.windowToString();
96+
print(">>" ++ intToString(i) ++ ": " ++ res);
97+
98+
// advance clock (timestamp)
99+
i = i + 1;
100+
end
101+
end
102+
103+
// Output:
104+
// >>100: obj7 ( ) obj8 ( ) obj9 ( )
105+
// >>101: obj7 ( ) obj8 ( ) obj9 ( )
106+
// >>102: obj7 ( ) obj8 ( ) obj9 ( )
107+
// >>103: obj7 ( 0 1 ) obj8 ( 10 11 ) obj9 ( 20 21 )
108+
// >>104: obj7 ( 0 1 ) obj8 ( 10 11 ) obj9 ( 20 21 )
109+
// >>105: obj7 ( 0 1 2 3 ) obj8 ( 10 11 12 13 ) obj9 ( 20 21 22 23 )
110+
// >>106: obj7 ( 0 1 2 3 ) obj8 ( 10 11 12 13 ) obj9 ( 20 21 22 23 )
111+
// >>107: obj7 ( 2 3 4 5 ) obj8 ( 12 13 14 15 ) obj9 ( 22 23 24 25 )
112+
// >>108: obj7 ( 2 3 4 5 ) obj8 ( 12 13 14 15 ) obj9 ( 22 23 24 25 )
113+
// >>109: obj7 ( 4 5 6 7 ) obj8 ( 14 15 16 17 ) obj9 ( 24 25 26 27 )
114+
// >>110: obj7 ( 4 5 6 7 ) obj8 ( 14 15 16 17 ) obj9 ( 24 25 26 27 )
115+
// >>111: obj7 ( 6 7 8 9 ) obj8 ( 16 17 18 19 ) obj9 ( 26 27 28 29 )
116+
// >>112: obj7 ( 6 7 8 9 ) obj8 ( 16 17 18 19 ) obj9 ( 26 27 28 29 )
117+
// >>113: obj7 ( 8 9 10 11 ) obj8 ( 18 19 20 21 ) obj9 ( 28 29 30 31 )
118+
119+
// Notes:
120+
// - Multiple monitors on multiple streams are supported

0 commit comments

Comments
 (0)