Skip to content

Commit b2eb3dc

Browse files
committed
transient message exchange works; no infinite routing
1 parent 6680eb9 commit b2eb3dc

File tree

3 files changed

+53
-44
lines changed

3 files changed

+53
-44
lines changed

src/main/java/net/sharksystem/asap/engine/ASAPEngine.java

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -412,12 +412,18 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimilationPDU, A
412412
MessagesContainer messagesContainer = null;
413413
ASAPInMemoTransientMessages transientMessages = null;
414414

415-
if(eraSender != ASAP.TRANSIENT_ERA) {
416-
incomingChunk = this.getIncomingChunk(asapAssimilationPDU);
417-
messagesContainer = incomingChunk;
418-
} else {
419-
transientMessages = new ASAPInMemoTransientMessages(asapAssimilationPDU);
420-
messagesContainer = transientMessages;
415+
try {
416+
if(eraSender != ASAP.TRANSIENT_ERA) {
417+
incomingChunk = this.getIncomingChunk(encounteredPeer, asapAssimilationPDU);
418+
messagesContainer = incomingChunk;
419+
} else {
420+
transientMessages = new ASAPInMemoTransientMessages(asapAssimilationPDU);
421+
messagesContainer = transientMessages;
422+
}
423+
}
424+
catch(ASAPException e) {
425+
asapAssimilationPDU.takeDataFromStream();
426+
throw e;
421427
}
422428

423429
// put messages into container - incoming chunk or transient message container
@@ -476,7 +482,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimilationPDU, A
476482
}
477483
}
478484

479-
private ASAPInternalChunk getIncomingChunk(ASAP_AssimilationPDU_1_0 asapAssimilationPDU)
485+
private ASAPInternalChunk getIncomingChunk(String encounteredPeer, ASAP_AssimilationPDU_1_0 asapAssimilationPDU)
480486
throws IOException, ASAPException {
481487

482488
String uri = asapAssimilationPDU.getChannelUri();
@@ -486,50 +492,53 @@ private ASAPInternalChunk getIncomingChunk(ASAP_AssimilationPDU_1_0 asapAssimila
486492
ASAPChunkStorage incomingChunkStorage = incomingStorage.getChunkStorage();
487493
Log.writeLog(this, this.toString(), "got incoming chunk storage for senderE2E: " + senderE2E);
488494

489-
try {
490495
// get local target for data to come
496+
if (!incomingChunkStorage.existsChunk(uri, eraSender)) {
491497
ASAPInternalChunk localChunk = null;
498+
// is there a local chunk - to clone recipients from?
499+
if (this.channelExists(uri)) {
500+
localChunk = this.getStorage().getChunk(uri, this.getEra());
501+
} else {
502+
Log.writeLog(this, this.toString(), "asked to set up new channel: (uri/senderE2E): "
503+
+ uri + " | " + senderE2E);
504+
// this channel is new to local peer - am I allowed to create it?
505+
if (!this.securityAdministrator.allowedToCreateChannel(asapAssimilationPDU)) {
506+
Log.writeLog(this, this.toString(),
507+
".. not allowed .. TODO not yet implemented .. always set up");
492508

493-
if (!incomingChunkStorage.existsChunk(uri, eraSender)) {
494-
// is there a local chunk - to clone recipients from?
495-
if (this.channelExists(uri)) {
496-
localChunk = this.getStorage().getChunk(uri, this.getEra());
509+
//allowedAssimilation = false; // TODO
497510
} else {
498-
Log.writeLog(this, this.toString(), "asked to set up new channel: (uri/senderE2E): "
499-
+ uri + " | " + senderE2E);
500-
// this channel is new to local peer - am I allowed to create it?
501-
if (!this.securityAdministrator.allowedToCreateChannel(asapAssimilationPDU)) {
502-
Log.writeLog(this, this.toString(),
503-
".. not allowed .. TODO not yet implemented .. always set up");
504-
505-
//allowedAssimilation = false; // TODO
506-
} else {
507-
Log.writeLog(this, this.toString(), "allowed. Set it up.");
508-
this.createChannel(uri);
509-
}
511+
Log.writeLog(this, this.toString(), "allowed. Set it up.");
512+
this.createChannel(uri);
510513
}
511-
} else {
512-
Log.writeLog(this, this.toString(), "received chunk that already exists - did nothing: "
513-
+ senderE2E + " | " + eraSender + " | " + uri);
514-
515-
// read assimilation message payload to oblivion!
516-
asapAssimilationPDU.takeDataFromStream();
517514
}
518-
519515
ASAPInternalChunk incomingChunk = incomingStorage.createNewChunk(uri, eraSender);
520-
//ASAPInternalChunk incomingChunk = incomingChunkStorage.getChunk(uri, eraSender);
521516

522517
if (localChunk != null) {
523518
Log.writeLog(this, this.toString(), "copy local meta data into newly created incoming chunk");
524519
incomingChunk.copyMetaData(this.getChannel(uri));
525520
}
526-
527521
return incomingChunk;
522+
} else {
523+
// already exists. Add message if sender == originator and era last era of this channel
524+
if(incomingStorage.getEra() == eraSender // era is current era
525+
&& PeerIDHelper.sameID(encounteredPeer, senderE2E) // E2E sender == P2P sender
526+
) {
527+
Log.writeLog(this, this.toString(),
528+
"received chunk exists but sender is originator and current era: "
529+
+ senderE2E + " | " + eraSender + " | " + uri);
528530

529-
} catch (IOException | ASAPException e) {
530-
Log.writeLogErr(this, this.toString(),
531-
"exception (give up, keep streams untouched): " + e.getLocalizedMessage());
532-
throw e;
531+
// finish routing...
532+
if(incomingChunkStorage.getChunk(uri, eraSender).getASAPHopList().size() > 1) {
533+
throw new ASAPException("chunk was already routed to me - met originator - will no overwrite chunk");
534+
}
535+
536+
return incomingChunkStorage.getChunk(uri, eraSender);
537+
} else {
538+
// read assimilation message payload to oblivion!
539+
throw new ASAPException("chunk that already exists; not current era or not originator: "
540+
+ senderE2E + " | " + eraSender + " | " + uri);
541+
}
533542
}
534543
}
535544

src/main/java/net/sharksystem/asap/protocol/ASAPPersistentConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,7 @@ public void run() {
429429
}
430430
}
431431
catch(ASAPException asape) {
432-
Log.writeLogErr(this, "asap exception while processing PDU - but go ahead: "
432+
Log.writeLog(this, "while processing PDU (go ahead): "
433433
+ asape.getLocalizedMessage());
434434
}
435435
catch(IOException ioe) {

src/test/java/net/sharksystem/asap/engine/MultihopTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,9 @@ public void asapRoutingIsFiniteAndCheckEra() throws IOException, ASAPException,
358358
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
359359
alicePeer.startEncounter(TestHelper.getPortNumber(), bobPeer);
360360
// give your app a moment to process
361-
Thread.sleep(1000);
361+
Thread.sleep(500);
362362
alicePeer.stopEncounter(bobPeer);
363-
Thread.sleep(1000);
363+
Thread.sleep(500);
364364
Assert.assertEquals(1, bobListener.numberOfMessages);
365365

366366
// check local eras
@@ -375,9 +375,9 @@ public void asapRoutingIsFiniteAndCheckEra() throws IOException, ASAPException,
375375
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
376376
claraPeer.startEncounter(TestHelper.getPortNumber(), bobPeer);
377377
// give your app a moment to process
378-
Thread.sleep(1000);
378+
Thread.sleep(500);
379379
claraPeer.stopEncounter(bobPeer);
380-
Thread.sleep(1000);
380+
Thread.sleep(500);
381381
Assert.assertEquals(1, claraListener.numberOfMessages);
382382

383383
// no change - receiving message does not change local understanding of an era
@@ -392,9 +392,9 @@ public void asapRoutingIsFiniteAndCheckEra() throws IOException, ASAPException,
392392
System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
393393
alicePeer.startEncounter(TestHelper.getPortNumber(), claraPeer);
394394
// give your app a moment to process
395-
Thread.sleep(1000);
395+
Thread.sleep(500);
396396
alicePeer.stopEncounter(claraPeer);
397-
Thread.sleep(1000);
397+
Thread.sleep(500);
398398
Assert.assertEquals(0, claraListener.numberOfMessages);
399399

400400
// no change - receiving message does not change local understanding of an era

0 commit comments

Comments
 (0)