Skip to content

Commit 36c2be5

Browse files
committed
point 2 point tests work even with corrections in era counting after assimilation
1 parent 3d5eaf8 commit 36c2be5

File tree

4 files changed

+54
-40
lines changed

4 files changed

+54
-40
lines changed

src/net/sharksystem/asap/ASAPEngine.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,9 @@ private String getLogStart() {
196196
StringBuilder b = new StringBuilder();
197197
b.append("ASAPEngine (");
198198
b.append(this.owner);
199-
b.append(") ");
199+
b.append(", era: ");
200+
b.append(this.era);
201+
b.append("): ");
200202

201203
return b.toString();
202204
}
@@ -269,6 +271,8 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
269271
System.out.println(b.toString());
270272
//>>>>>>>>>>>>>>>>>>>debug
271273

274+
boolean changed = false;
275+
272276
try {
273277
// read URI
274278
String uri = asapAssimiliationPDU.getChannel();
@@ -315,7 +319,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
315319
System.out.println(b.toString());
316320
//>>>>>>>>>>>>>>>>>>>debug
317321
chunk.addMessage(protocolInputStream, nextOffset - offset);
318-
322+
if(!changed) { changed = true; this.contentChanged();}
319323
offset = nextOffset;
320324
}
321325

@@ -331,6 +335,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
331335
//>>>>>>>>>>>>>>>>>>>debug
332336

333337
chunk.addMessage(protocolInputStream, asapAssimiliationPDU.getLength() - offset);
338+
if(!changed) { changed = true; this.contentChanged();}
334339

335340
// read all messages
336341
if(listener != null) {
@@ -385,21 +390,27 @@ public void handleASAPInterest(ASAP_Interest_PDU_1_0 asapInterest, ASAP_1_0 prot
385390

386391
// check conflict
387392
if(!this.permission2ProceedConversation(peer)) {
388-
b = Log.startLog(this);
393+
b = new StringBuilder();
394+
b.append(this.getLogStart());
389395
b.append("no permission to communicate with remote peer: ");
390396
b.append(peer);
391397
System.err.println(b.toString());
392398
throw new ASAPException("no permission to communicate with remote peer: " + peer);
399+
} else {
400+
System.out.println(this.getLogStart() + "permission ok, process interest");
393401
}
394402

395403
// era we are about to transmit
396404
int workingEra = this.getEraStartSync(peer);
405+
System.out.println(this.getLogStart() + "last_seen: " + workingEra + " | era: " + this.era);
397406

398407
if(workingEra == this.era) {
399408
// nothing todo
400-
b = Log.startLog(this);
401-
b.append("there are not information before that era\n");
409+
b = new StringBuilder();
410+
b.append(this.getLogStart());
411+
b.append("there are no information before that era; ");
402412
b.append("we only deliver information from previous eras - nothing todo here.");
413+
System.out.println(b.toString());
403414
return;
404415
}
405416

@@ -662,7 +673,8 @@ private boolean isPublic(ASAPChunk chunk) {
662673

663674
@Override
664675
public void newEra() {
665-
StringBuilder sb = Log.startLog(this);
676+
StringBuilder sb = new StringBuilder();
677+
sb.append(this.getLogStart());
666678
sb.append("newEra() | owner: ");
667679
sb.append(this.owner);
668680
sb.append(" | format: ");

src/net/sharksystem/asap/protocol/ASAPPersistentConnection.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ public void finished(Thread t) {
105105

106106
private void terminate(String message, Exception e) {
107107
// write log
108-
StringBuilder sb = this.startLog();
108+
StringBuilder sb = new StringBuilder();
109+
sb.append(this.getLogStart());
109110
sb.append(message);
110111
if(e != null) {
111112
sb.append(e.getLocalizedMessage());
@@ -122,7 +123,8 @@ private void sendOnlineMessages() throws IOException {
122123
this.onlineMessageSources = new ArrayList<>();
123124
while(!copy.isEmpty()) {
124125
ASAPOnlineMessageSource asapOnline = copy.remove(0);
125-
StringBuilder sb = this.startLog();
126+
StringBuilder sb = new StringBuilder();
127+
sb.append(this.getLogStart());
126128
sb.append("going to send online message");
127129
System.out.println(sb.toString());
128130
asapOnline.sendMessages(this, this.os);
@@ -134,9 +136,9 @@ private class OnlineMessageSenderThread extends Thread {
134136
public void run() {
135137
try {
136138
// get exclusive access to streams
137-
System.out.println(startLog() + "online sender is going to wait for stream access");
139+
System.out.println(getLogStart() + "online sender is going to wait for stream access");
138140
wait4ExclusiveStreamsAccess();
139-
System.out.println(startLog() + "online sender got stream access");
141+
System.out.println(getLogStart() + "online sender got stream access");
140142
sendOnlineMessages();
141143
// prepare a graceful death
142144
onlineMessageSenderThread = null;
@@ -146,7 +148,7 @@ public void run() {
146148
terminate("could not write data into stream", e);
147149
}
148150
finally {
149-
System.out.println(startLog() + "online sender releases lock");
151+
System.out.println(getLogStart() + "online sender releases lock");
150152
releaseStreamsLock();
151153
}
152154
}
@@ -201,10 +203,10 @@ public void run() {
201203
while (!this.terminated) {
202204
this.pduReader = new ASAPPDUReader(protocol, is, this);
203205
try {
204-
System.out.println(this.startLog() + "start reading");
206+
System.out.println(this.getLogStart() + "start reading");
205207
this.runObservedThread(pduReader, this.maxExecutionTime);
206208
} catch (ASAPExecTimeExceededException e) {
207-
System.out.println(this.startLog() + "reading on stream took longer than allowed");
209+
System.out.println(this.getLogStart() + "reading on stream took longer than allowed");
208210
}
209211

210212
System.out.println(this.getLogStart() + "back from reading");
@@ -236,19 +238,19 @@ public void run() {
236238
protocol,this);
237239

238240
// get exclusive access to streams
239-
System.out.println(this.startLog() + "asap pdu executor going to wait for stream access");
241+
System.out.println(this.getLogStart() + "asap pdu executor going to wait for stream access");
240242
this.wait4ExclusiveStreamsAccess();
241243
try {
242-
System.out.println(this.startLog() + "asap pdu executor got stream access - process pdu");
244+
System.out.println(this.getLogStart() + "asap pdu executor got stream access - process pdu");
243245
this.runObservedThread(executor, maxExecutionTime);
244246
} catch (ASAPExecTimeExceededException e) {
245-
System.out.println(this.startLog() + "asap pdu processing took longer than allowed");
247+
System.out.println(this.getLogStart() + "asap pdu processing took longer than allowed");
246248
this.terminate("asap pdu processing took longer than allowed", e);
247249
break;
248250
} finally {
249251
// wake waiting thread if any
250252
this.releaseStreamsLock();
251-
System.out.println(this.startLog() + "asap pdu executor release locks");
253+
System.out.println(this.getLogStart() + "asap pdu executor release locks");
252254
}
253255
} catch (ASAPException e) {
254256
System.out.println(this.getLogStart() + " problem when executing asap received pdu: " + e);
@@ -327,15 +329,6 @@ private void runObservedThread(Thread t, long maxExecutionTime) throws ASAPExecT
327329
throw new ASAPExecTimeExceededException(sb.toString());
328330
}
329331

330-
private StringBuilder startLog() {
331-
StringBuilder sb = net.sharksystem.asap.util.Log.startLog(this);
332-
sb.append(" recipient: ");
333-
sb.append(this.remotePeer);
334-
sb.append(" | ");
335-
336-
return sb;
337-
}
338-
339332
private class ASAPPDUExecutor extends Thread {
340333
private final ASAP_PDU_1_0 asapPDU;
341334
private final InputStream is;
@@ -355,6 +348,7 @@ public ASAPPDUExecutor(ASAP_PDU_1_0 asapPDU, InputStream is, OutputStream os,
355348
this.threadFinishedListener = threadFinishedListener;
356349

357350
StringBuilder sb = new StringBuilder();
351+
sb.append(getLogStart());
358352
sb.append("ASAPPDUExecutor created - ");
359353
sb.append("folder: " + engineSetting.folder + " | ");
360354
sb.append("engine: " + engineSetting.engine.getClass().getSimpleName() + " | ");
@@ -373,25 +367,26 @@ private void finish() {
373367

374368
public void run() {
375369
if(engineSetting.engine == null) {
376-
System.err.println("ASAPPDUExecutor called without engine set - fatal");
370+
System.err.println(getLogStart() + "ASAPPDUExecutor called without engine set - fatal");
377371
this.finish();
378372
return;
379373
}
380374

381-
System.out.println("ASAPPDUExecutor calls engine: " + engineSetting.engine.getClass().getSimpleName());
375+
System.out.println(getLogStart() + "ASAPPDUExecutor calls engine: "
376+
+ engineSetting.engine.getClass().getSimpleName());
382377

383378
try {
384379
switch (asapPDU.getCommand()) {
385380
case ASAP_1_0.INTEREST_CMD:
386-
System.out.println("ASAPPDUExecutor call handleASAPInterest");
381+
System.out.println(getLogStart() + "ASAPPDUExecutor call handleASAPInterest");
387382
engineSetting.engine.handleASAPInterest((ASAP_Interest_PDU_1_0) asapPDU, protocol, os);
388383
break;
389384
case ASAP_1_0.OFFER_CMD:
390-
System.out.println("ASAPPDUExecutor call handleASAPOffer");
385+
System.out.println(getLogStart() + "ASAPPDUExecutor call handleASAPOffer");
391386
engineSetting.engine.handleASAPOffer((ASAP_OfferPDU_1_0) asapPDU, protocol, os);
392387
break;
393388
case ASAP_1_0.ASSIMILATE_CMD:
394-
System.out.println("ASAPPDUExecutor call handleASAPAssimilate");
389+
System.out.println("getLogStart() + ASAPPDUExecutor call handleASAPAssimilate");
395390
engineSetting.engine.handleASAPAssimilate((ASAP_AssimilationPDU_1_0) asapPDU, protocol, is, os,
396391
engineSetting.listener);
397392
break;
@@ -445,7 +440,6 @@ ASAP_PDU_1_0 getASAPPDU() {
445440
public void run() {
446441
try {
447442
this.asapPDU = protocol.readPDU(is);
448-
// this.pduReaderListener.finished(this);
449443
} catch (IOException e) {
450444
this.ioException = e;
451445
} catch (ASAPException e) {

test/net/sharksystem/asap/MultihopTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package net.sharksystem.asap;
22

33
import net.sharksystem.cmdline.CmdLineUI;
4+
import org.junit.Assert;
45
import org.junit.Test;
56

67
import java.io.IOException;
@@ -28,7 +29,8 @@ public void twoHops() throws IOException, ASAPException, InterruptedException {
2829
ui.doSetSendReceivedMessage("Clara:twoHops on");
2930

3031
// add message to alice storage
31-
ui.doCreateASAPMessage("Alice twoHops abcChat HiClara");
32+
String messageAlice2Clara = "Alice twoHops abcChat HiClara";
33+
ui.doCreateASAPMessage(messageAlice2Clara);
3234

3335
System.out.println("**************************************************************************");
3436
System.out.println("** connect Alice with Bob **");
@@ -48,6 +50,9 @@ public void twoHops() throws IOException, ASAPException, InterruptedException {
4850
// kill connections
4951
ui.doKill("all");
5052

53+
// wait a moment
54+
Thread.sleep(1000);
55+
5156
System.out.println("**************************************************************************");
5257
System.out.println("** connect Bob with Clara **");
5358
System.out.println("**************************************************************************");
@@ -66,6 +71,7 @@ public void twoHops() throws IOException, ASAPException, InterruptedException {
6671
ASAPStorage clara = ui.getStorage("Clara:twoHops");
6772
ASAPChunkStorage claraBob = clara.getIncomingChunkStorage("Bob");
6873
ASAPChunk claraABCChat = claraBob.getChunk("abcChat", clara.getEra());
69-
74+
CharSequence message = claraABCChat.getMessages().next();
75+
Assert.assertTrue(message.toString().equalsIgnoreCase(messageAlice2Clara));
7076
}
7177
}

test/net/sharksystem/asap/Point2PointTests.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void openMessageExchange() throws IOException, ASAPException, Interrupted
168168

169169
// simulate a sync
170170
bobStorage = ASAPEngineFS.getASAPStorage(BOB, BOB_APP_FOLDER, CHAT_FORMAT);
171-
Assert.assertEquals(1, bobStorage.getEra());
171+
Assert.assertEquals(2, bobStorage.getEra());
172172
}
173173

174174
@Test
@@ -303,7 +303,7 @@ public void notOpenMessageChunkExchange() throws IOException, ASAPException, Int
303303

304304
// simulate a sync
305305
bobStorage = ASAPEngineFS.getASAPStorage(BOB, BOB_APP_FOLDER, CHAT_FORMAT);
306-
Assert.assertEquals(1, bobStorage.getEra());
306+
Assert.assertEquals(2, bobStorage.getEra());
307307
}
308308

309309
@Test
@@ -316,21 +316,23 @@ public void usageWithImmediateSync() throws IOException, ASAPException, Interrup
316316
ASAPEngineFS.removeFolder(BOB_ROOT_FOLDER); // clean previous version before
317317

318318
// alice writes a message into chunkStorage
319-
ASAPStorage aliceStorage =
320-
ASAPEngineFS.getASAPStorage(ALICE, ALICE_APP_FOLDER, CHAT_FORMAT);
319+
ASAPStorage aliceStorage = ASAPEngineFS.getASAPStorage(ALICE, ALICE_APP_FOLDER, CHAT_FORMAT);
321320

322321
int aliceInitialEra = aliceStorage.getEra();
323322

324323
aliceStorage.addRecipient(ALICE_BOB_CHAT_URL, BOB);
324+
325+
// content changed - next change in topology should increase alice era.
325326
aliceStorage.add(ALICE_BOB_CHAT_URL, ALICE2BOB_MESSAGE);
326327

327328
// bob does the same
328-
ASAPStorage bobStorage =
329-
ASAPEngineFS.getASAPStorage(BOB, BOB_APP_FOLDER, CHAT_FORMAT);
329+
ASAPStorage bobStorage = ASAPEngineFS.getASAPStorage(BOB, BOB_APP_FOLDER, CHAT_FORMAT);
330330

331331
int bobInitialEra = bobStorage.getEra();
332332

333333
bobStorage.addRecipient(ALICE_BOB_CHAT_URL, ALICE);
334+
335+
// content changed - next change in topology should increase bob era.
334336
bobStorage.add(ALICE_BOB_CHAT_URL, BOB2ALICE_MESSAGE);
335337

336338
///////////////////////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)