Skip to content

Commit 5c527e7

Browse files
committed
multiple streams enabled (aligned with csparql-int branch)
1 parent a20f647 commit 5c527e7

File tree

9 files changed

+275
-109
lines changed

9 files changed

+275
-109
lines changed

src/main/kotlin/no/uio/microobject/data/StreamManager.kt

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
6161
var lastTimestamp: Long = 0
6262
var lastWindowTs: Long? = null
6363

64+
var lastTriggerTs: Long? = null
65+
var lastModel: Model? = null
66+
6467
var nStaticGraphsPushed = 0
6568

6669
init {
@@ -136,6 +139,13 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
136139
val expressions = interpreter!!.staticInfo.streamersTable[className]!![methodName]!!
137140

138141
val timestamp = getTimestamp()
142+
if (lastTriggerTs == null || timestamp > lastTriggerTs!!) {
143+
// put model only if time has advanced (guarantee one graph per timestamp)
144+
if (lastTriggerTs != null)
145+
stream.putGraph(lastModel!!.getGraph(), lastTriggerTs!!)
146+
lastModel = ModelFactory.createDefaultModel()
147+
lastTriggerTs = timestamp
148+
}
139149

140150
for (expr in expressions) {
141151
val res = interpreter.eval(expr, stackEntry)
@@ -157,11 +167,8 @@ class StreamManager(private val settings: Settings, val staticTable: StaticTable
157167
predIri = "${settings.progPrefix}${(currentObj.tag as BaseType).name}_${exprParts.last()}"
158168
}
159169

160-
val m = ModelFactory.createDefaultModel()
161-
val stmt = m.createStatement(m.createResource(subjIri), m.createProperty(predIri), literalToIri(m, res, settings))
162-
m.add(stmt)
163-
164-
stream.putGraph(m.getGraph(), timestamp)
170+
val stmt = lastModel!!.createStatement(lastModel!!.createResource(subjIri), lastModel!!.createProperty(predIri), literalToIri(lastModel!!, res, settings))
171+
lastModel!!.add(stmt)
165172
}
166173
}
167174

src/test/resources/csparql/03_logical_window_step.smol renamed to src/test/resources/csparql/03_sum.smol

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
1-
// reada src/test/resources/csparql/03_logical_window_step.smol
1+
// reada src/test/resources/csparql/03_sum.smol
22

3-
streamer class C(Int x, Monitor<Int> m)
3+
class Observation(Int x, Int ts) end
4+
5+
streamer class C(Int x, Observation obs, Monitor<Int> m)
46

57
Unit register()
68
Monitor<Int> m = monitor("
79
SELECT (SUM(?x) AS ?sumX)
8-
FROM NAMED WINDOW <win> ON %1 [RANGE PT6s STEP PT2s]
10+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
911
WHERE {
10-
WINDOW ?w { ?s prog:C_x ?x }
12+
WINDOW ?w { ?s prog:Observation_x ?x }
1113
}
1214
", this);
1315
this.m = m;
1416
end
1517

16-
Unit doStream() emits(this.x)
18+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
1719
this.x = this.x + 1;
20+
this.obs = new Observation(this.x, ts);
1821
end
1922

2023
String windowToString()
@@ -34,7 +37,7 @@ main
3437
Int endAt = 200;
3538

3639
// initialize stream
37-
C o = new C(0, null);
40+
C o = new C(0, null, null);
3841

3942
// initialize monitor
4043
o.register();
@@ -43,7 +46,7 @@ main
4346
while i < endAt do
4447

4548
// put triples in the stream
46-
o.doStream();
49+
o.doStream(i);
4750

4851
// read window contents
4952
String res = o.windowToString();
@@ -69,6 +72,4 @@ end
6972
// ..
7073

7174
// Notes:
72-
// - Step 2 seconds = new window every 2 seconds
73-
// - Range 6s: Windows have {2, 4, 5, 5, 5, ...} elements
74-
75+
// - SUM aggregation works as expected

src/test/resources/csparql/05_count.smol

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
streamer class C(Int x, Monitor<Int> m)
1+
// reada src/test/resources/csparql/05_count.smol
2+
3+
class Observation(Int x, Int ts) end
4+
5+
streamer class C(Int x, Observation obs, Monitor<Int> m)
26

37
Unit register()
48
Monitor<Int> m = monitor("
5-
SELECT (COUNT(?x) as ?countX)
6-
FROM STREAM %1 [RANGE 5s STEP 2s]
7-
WHERE { ?s prog:C_x ?x }
9+
SELECT (COUNT(?x) AS ?countX)
10+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
11+
WHERE {
12+
WINDOW ?w { ?s prog:Observation_x ?x }
13+
}
814
", this);
915
this.m = m;
1016
end
1117

12-
Unit doStream() emits(this.x)
18+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
1319
this.x = this.x + 1;
20+
this.obs = new Observation(this.x, ts);
1421
end
1522

1623
String windowToString()
@@ -25,16 +32,27 @@ streamer class C(Int x, Monitor<Int> m)
2532
end
2633

2734
main
28-
C o = new C(0, null);
29-
o.register();
30-
35+
// configure simulation
3136
clock Int i = 100;
3237
Int endAt = 200;
3338

39+
// initialize stream
40+
C o = new C(0, null, null);
41+
42+
// initialize monitor
43+
o.register();
44+
45+
// run simulation
3446
while i < endAt do
35-
o.doStream();
47+
48+
// put triples in the stream
49+
o.doStream(i);
50+
51+
// read window contents
3652
String res = o.windowToString();
3753
print(">>" ++ intToString(i) ++ ": " ++ res);
54+
55+
// advance clock (timestamp)
3856
i = i + 1;
3957
end
4058
end
@@ -50,3 +68,6 @@ end
5068
// >>107: 5
5169
// >>108: 5
5270
// ..
71+
72+
// Notes:
73+
// - SUM aggregation works as expected

src/test/resources/csparql/06_average.smol

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
streamer class C(Int x, Monitor<Double> m)
1+
// reada src/test/resources/csparql/06_average.smol
2+
3+
class Observation(Int x, Int ts) end
4+
5+
streamer class C(Int x, Observation obs, Monitor<Double> m)
26

37
Unit register()
48
Monitor<Double> m = monitor("
5-
SELECT (AVG(?x) as ?avgX)
6-
FROM STREAM %1 [RANGE 5s STEP 2s]
7-
WHERE { ?s prog:C_x ?x }
9+
SELECT (AVG(?x) AS ?avgX)
10+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
11+
WHERE {
12+
WINDOW ?w { ?s prog:Observation_x ?x }
13+
}
814
", this);
915
this.m = m;
1016
end
1117

12-
Unit doStream() emits(this.x)
18+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
1319
this.x = this.x + 1;
20+
this.obs = new Observation(this.x, ts);
1421
end
1522

1623
String windowToString()
@@ -25,16 +32,27 @@ streamer class C(Int x, Monitor<Double> m)
2532
end
2633

2734
main
28-
C o = new C(0, null);
29-
o.register();
30-
35+
// configure simulation
3136
clock Int i = 100;
3237
Int endAt = 200;
3338

39+
// initialize stream
40+
C o = new C(0, null, null);
41+
42+
// initialize monitor
43+
o.register();
44+
45+
// run simulation
3446
while i < endAt do
35-
o.doStream();
47+
48+
// put triples in the stream
49+
o.doStream(i);
50+
51+
// read window contents
3652
String res = o.windowToString();
3753
print(">>" ++ intToString(i) ++ ": " ++ res);
54+
55+
// advance clock (timestamp)
3856
i = i + 1;
3957
end
4058
end
@@ -54,4 +72,5 @@ end
5472
// ..
5573

5674
// Notes:
75+
// - AVG aggregation works as expected
5776
// - Average returns double values (not integer)
Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,28 @@
1-
streamer class C(Int x, Monitor<C> m)
1+
// reada src/test/resources/csparql/07_stream_object.smol
2+
3+
class Observation(Int x, Int ts) end
4+
5+
streamer class C(Int x, Observation obs, Monitor<Observation> m)
26

37
Unit register()
4-
Monitor<C> m = monitor("
8+
Monitor<Observation> m = monitor("
59
SELECT ?s
6-
FROM STREAM %1 [RANGE 6s STEP 4s]
7-
WHERE { ?s prog:C_x ?x }
10+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
11+
WHERE {
12+
WINDOW ?w { ?s prog:Observation_x ?x }
13+
}
814
", this);
915
this.m = m;
1016
end
1117

12-
Unit doStream() emits(this, this.x)
18+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
1319
this.x = this.x + 1;
20+
this.obs = new Observation(this.x, ts);
1421
end
1522

1623
String windowToString()
1724
String s = "";
18-
List<C> l = window(this.m);
25+
List<Observation> l = window(this.m);
1926
while l != null do
2027
s = s ++ l.content ++ " ";
2128
l = l.next;
@@ -25,29 +32,43 @@ streamer class C(Int x, Monitor<C> m)
2532
end
2633

2734
main
28-
C o = new C(0, null);
29-
o.register();
30-
35+
// configure simulation
3136
clock Int i = 100;
3237
Int endAt = 200;
3338

39+
// initialize stream
40+
C o = new C(0, null, null);
41+
42+
// initialize monitor
43+
o.register();
44+
45+
// run simulation
3446
while i < endAt do
35-
o.doStream();
47+
48+
// put triples in the stream
49+
o.doStream(i);
50+
51+
// read window contents
3652
String res = o.windowToString();
3753
print(">>" ++ intToString(i) ++ ": " ++ res);
54+
55+
// advance clock (timestamp)
3856
i = i + 1;
3957
end
4058
end
4159

4260
// Output:
4361
// >>100:
4462
// >>101:
45-
// >>102:
46-
// >>103:
47-
// >>104: obj5 obj5 obj5 obj5
48-
// >>105: obj5 obj5 obj5 obj5
49-
// >>106: obj5 obj5 obj5 obj5
50-
// >>107: obj5 obj5 obj5 obj5
51-
// >>108: obj5 obj5 obj5 obj5 obj5 obj5
52-
// >>109: obj5 obj5 obj5 obj5 obj5 obj5
63+
// >>102: obj639 obj640
64+
// >>103: obj639 obj640
65+
// >>104: obj639 obj640 obj641 obj644
66+
// >>105: obj639 obj640 obj641 obj644
67+
// >>106: obj640 obj641 obj644 obj647 obj652
68+
// >>107: obj640 obj641 obj644 obj647 obj652
69+
// >>108: obj644 obj647 obj652 obj657 obj663
70+
// >>109: obj644 obj647 obj652 obj657 obj663
5371
// ..
72+
73+
// Notes:
74+
// - Objects can be used as stream elements

src/test/resources/csparql/08_use_window_results.smol

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
1-
streamer class C(Int x, Monitor<Int> m)
1+
// reada src/test/resources/csparql/08_use_window_results.smol
2+
3+
class Observation(Int x, Int ts) end
4+
5+
streamer class C(Int x, Observation obs, Monitor<Int> m)
26

37
Unit register()
48
Monitor<Int> m = monitor("
5-
SELECT (SUM(?x) as ?sumX)
6-
FROM STREAM %1 [RANGE 5s STEP 2s]
7-
WHERE { ?s prog:C_x ?x }
9+
SELECT (SUM(?x) AS ?sumX)
10+
FROM NAMED WINDOW <win> ON %1 [RANGE PT5.001s STEP PT2s]
11+
WHERE {
12+
WINDOW ?w { ?s prog:Observation_x ?x }
13+
}
814
", this);
915
this.m = m;
1016
end
1117

12-
Unit doStream() emits(this.x)
18+
Unit doStream(Int ts) emits(this.obs, this.obs.x, this.obs.ts)
1319
this.x = this.x + 1;
20+
this.obs = new Observation(this.x, ts);
1421
end
1522

1623
String windowToString()
@@ -28,16 +35,27 @@ streamer class C(Int x, Monitor<Int> m)
2835
end
2936

3037
main
31-
C o = new C(0, null);
32-
o.register();
33-
38+
// configure simulation
3439
clock Int i = 100;
3540
Int endAt = 200;
3641

42+
// initialize stream
43+
C o = new C(0, null, null);
44+
45+
// initialize monitor
46+
o.register();
47+
48+
// run simulation
3749
while i < endAt do
38-
o.doStream();
50+
51+
// put triples in the stream
52+
o.doStream(i);
53+
54+
// read window contents
3955
String res = o.windowToString();
4056
print(">>" ++ intToString(i) ++ ": " ++ res);
57+
58+
// advance clock (timestamp)
4159
i = i + 1;
4260
end
4361
end
@@ -56,6 +74,7 @@ end
5674
// >>110: 40 mult20
5775
// >>111: 40 mult20
5876
// >>112: 50
77+
// ..
5978

6079
// Notes:
6180
// - Shows how to use the window results (e.g., check if the sum is multiple of 20)

0 commit comments

Comments
 (0)