Skip to content

Commit c81ee8c

Browse files
committed
working on asap management message hopping
1 parent cf9c84c commit c81ee8c

File tree

6 files changed

+45
-62
lines changed

6 files changed

+45
-62
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package net.sharksystem.asap;
22

33
import java.io.IOException;
4+
import java.util.HashMap;
45
import java.util.Set;
56

67
public interface ASAPChannel {
78
CharSequence getOwner() throws IOException;
89
CharSequence getUri() throws IOException;
910
Set<CharSequence> getRecipients() throws IOException;
11+
HashMap<String, String> getExtraData() throws IOException;
1012
}

src/net/sharksystem/asap/ASAPChannelImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package net.sharksystem.asap;
22

33
import java.io.IOException;
4+
import java.util.HashMap;
45
import java.util.Set;
56

67
public class ASAPChannelImpl implements ASAPChannel {
@@ -28,6 +29,11 @@ public Set<CharSequence> getRecipients() throws IOException {
2829
return this.asapEngine.getRecipients(this.getUri());
2930
}
3031

32+
@Override
33+
public HashMap<String, String> getExtraData() throws IOException {
34+
return this.asapEngine.getStorage().getChunk(uri, this.asapEngine.getEra()).getExtraData();
35+
}
36+
3137
public void setOwner(CharSequence owner) throws IOException {
3238
this.asapEngine.putExtra(this.getUri(), CHANNEL_OWNER, owner.toString());
3339
}

src/net/sharksystem/asap/ASAPChunk.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,6 @@ public interface ASAPChunk {
124124
void deliveredTo(String peer) throws IOException;
125125

126126
List<CharSequence> getDeliveredTo();
127+
128+
void copyMetaData(ASAPChannel channel) throws IOException;
127129
}

src/net/sharksystem/asap/ASAPChunkFS.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,15 @@ public void clone(ASAPChunk chunkSource) throws IOException {
3636
this.saveStatus();
3737
}
3838

39+
@Override
40+
public void copyMetaData(ASAPChannel channel) throws IOException {
41+
this.uri = channel.getUri().toString();
42+
this.recipients = channel.getRecipients();
43+
this.extraData = channel.getExtraData();
44+
45+
this.saveStatus();
46+
}
47+
3948
public HashMap<String, String> getExtraData() {
4049
return this.extraData;
4150
}

src/net/sharksystem/asap/ASAPEngine.java

Lines changed: 25 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -338,17 +338,6 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
338338
InputStream is, OutputStream os, ASAPChunkReceivedListener listener)
339339
throws ASAPException, IOException {
340340

341-
/*
342-
if(this.isASAPManagementMessage(asapAssimiliationPDU)) {
343-
//<<<<<<<<<<<<<<<<<<debug
344-
StringBuilder b = new StringBuilder();
345-
b.append(this.getLogStart());
346-
b.append("got asap management assimilate - it's the wrong place for that - multiengine should be called");
347-
System.out.println(b.toString());
348-
//>>>>>>>>>>>>>>>>>>>debug
349-
throw new ASAPException("got asap management assimilate - it's the wrong place for that - multiengine should be called");
350-
}
351-
*/
352341
String sender = asapAssimiliationPDU.getPeer();
353342
int eraSender = asapAssimiliationPDU.getEra();
354343

@@ -363,7 +352,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
363352
//>>>>>>>>>>>>>>>>>>>debug
364353

365354
// get received storage
366-
ASAPChunkStorage senderStorage = this.getIncomingChunkStorage(sender);
355+
ASAPChunkStorage incomingSenderStorage = this.getIncomingChunkStorage(sender);
367356
//<<<<<<<<<<<<<<<<<<debug
368357
b = new StringBuilder();
369358
b.append(this.getLogStart());
@@ -377,42 +366,28 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
377366
try {
378367
// read URI
379368
String uri = asapAssimiliationPDU.getChannelUri();
380-
if(!senderStorage.existsChunk(uri, eraSender) && asapAssimiliationPDU.recipientPeerSet()) {
369+
370+
// get local target for data to come
371+
ASAPChunk incomingChunk = incomingSenderStorage.getChunk(uri, eraSender);
372+
373+
if(!incomingSenderStorage.existsChunk(uri, eraSender) && asapAssimiliationPDU.recipientPeerSet()) {
381374
//<<<<<<<<<<<<<<<<<<debug
382375
b = new StringBuilder();
383376
b.append(this.getLogStart());
384377
b.append("recipient (");
385378
b.append(asapAssimiliationPDU.getRecipientPeer());
386-
b.append(") but no chunk on my side - should run asap management before?:");
379+
b.append(") no incoming chunk yet:");
387380
b.append(asapAssimiliationPDU.toString());
388-
System.out.println(b.toString());
389381
//>>>>>>>>>>>>>>>>>>>debug
390-
}
391-
392-
ASAPChunk chunk = senderStorage.getChunk(uri, eraSender);
393382

394-
if(chunk != null) {
395-
//<<<<<<<<<<<<<<<<<<debug
396-
b = new StringBuilder();
397-
b.append(this.getLogStart());
398-
b.append("got local chunk to store messages for uri: ");
399-
b.append(uri);
400-
b.append(" | era: ");
401-
b.append(eraSender);
383+
// is there a local chunk - to clone recipients from?
384+
if(this.channelExists(uri)) {
385+
b.append("local chunk exists - copy meta data");
386+
incomingChunk.copyMetaData(this.getChannel(uri));
387+
} else {
388+
b.append("no local chunk to copy meta data from");
389+
}
402390
System.out.println(b.toString());
403-
//>>>>>>>>>>>>>>>>>>>debug
404-
} else {
405-
//<<<<<<<<<<<<<<<<<<debug
406-
b = new StringBuilder();
407-
b.append(this.getLogStart());
408-
b.append("ERROR: no chunk found for sender/uri: ");
409-
b.append(" | ");
410-
b.append(uri);
411-
b.append(" | era: ");
412-
b.append(eraSender);
413-
System.err.println(b.toString());
414-
//>>>>>>>>>>>>>>>>>>>debug
415-
throw new ASAPException("couldn't create local chunk storage - give up");
416391
}
417392

418393
List<Integer> messageOffsets = asapAssimiliationPDU.getMessageOffsets();
@@ -431,7 +406,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
431406
b.append(")");
432407
System.out.println(b.toString());
433408
//>>>>>>>>>>>>>>>>>>>debug
434-
chunk.addMessage(protocolInputStream, nextOffset - offset);
409+
incomingChunk.addMessage(protocolInputStream, nextOffset - offset);
435410
if(!changed) { changed = true; this.contentChanged();}
436411
offset = nextOffset;
437412
}
@@ -447,7 +422,7 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
447422
System.out.println(b.toString());
448423
//>>>>>>>>>>>>>>>>>>>debug
449424

450-
chunk.addMessage(protocolInputStream, asapAssimiliationPDU.getLength() - offset);
425+
incomingChunk.addMessage(protocolInputStream, asapAssimiliationPDU.getLength() - offset);
451426
if(!changed) { changed = true; this.contentChanged();}
452427

453428
// read all messages
@@ -490,18 +465,6 @@ public void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asapAssimiliationPDU,
490465
public void handleASAPInterest(ASAP_Interest_PDU_1_0 asapInterest, ASAP_1_0 protocol, OutputStream os)
491466
throws ASAPException, IOException {
492467

493-
/*
494-
if(this.isASAPManagementMessage(asapInterest)) {
495-
//<<<<<<<<<<<<<<<<<<debug
496-
StringBuilder b = new StringBuilder();
497-
b.append(this.getLogStart());
498-
b.append("got asap management interest - not handled in this implementation.");
499-
System.out.println(b.toString());
500-
//>>>>>>>>>>>>>>>>>>>debug
501-
return;
502-
}
503-
*/
504-
505468
// get remote peer
506469
String peer = asapInterest.getPeer();
507470

@@ -564,6 +527,7 @@ public void handleASAPInterest(ASAP_Interest_PDU_1_0 asapInterest, ASAP_1_0 prot
564527
//>>>>>>>>>>>>>>>>>>>debug
565528

566529
this.sendChunks(this.owner, peer, this.getChunkStorage(), protocol, workingEra, lastEra, os);
530+
567531
//<<<<<<<<<<<<<<<<<<debug
568532
b = new StringBuilder();
569533
b.append(this.getLogStart());
@@ -594,7 +558,7 @@ public void setSendReceivedChunks(boolean on) throws IOException {
594558
this.saveStatus();
595559
}
596560

597-
private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage chunkStorage,
561+
private void sendChunks(CharSequence sender, String remotePeer, ASAPChunkStorage chunkStorage,
598562
ASAP_1_0 protocol, int workingEra,
599563
int lastEra, OutputStream os) throws IOException, ASAPException {
600564
/*
@@ -642,7 +606,7 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
642606
// is not a public chunk
643607
if (goAhead && !this.isPublic(chunk)) {
644608
Set<CharSequence> recipients = chunk.getRecipients();
645-
if (recipients == null || !recipients.contains(recipient)) {
609+
if (recipients == null || !recipients.contains(remotePeer)) {
646610
goAhead = false;
647611
}
648612
}
@@ -655,8 +619,8 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
655619
System.out.println(b.toString());
656620
//>>>>>>>>>>>>>>>>>>>debug
657621

658-
protocol.assimilate(sender, // recipient
659-
recipient, // recipient
622+
protocol.assimilate(sender, // remotePeer
623+
remotePeer, // remotePeer
660624
this.format,
661625
chunk.getUri(), // channel ok
662626
workingEra, // era ok
@@ -667,18 +631,18 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
667631
false);
668632

669633
// remember sent
670-
chunk.deliveredTo(recipient);
634+
chunk.deliveredTo(remotePeer);
671635
//<<<<<<<<<<<<<<<<<<debug
672636
b = new StringBuilder();
673637
b.append(this.getLogStart());
674638
b.append("remembered delivered to ");
675-
b.append(recipient);
639+
b.append(remotePeer);
676640
System.out.println(b.toString());
677641
//>>>>>>>>>>>>>>>>>>>debug
678642
// sent to all recipients
679643
if (chunk.getRecipients().size() == chunk.getDeliveredTo().size()) {
680644
b = Log.startLog(this);
681-
b.append("#recipients == #deliveredTo chunk delivered to any potential recipient - could drop it");
645+
b.append("#recipients == #deliveredTo chunk delivered to any potential remotePeer - could drop it");
682646
System.out.println(b.toString());
683647
if (this.isDropDeliveredChunks()) {
684648
chunk.drop();
@@ -696,7 +660,7 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
696660
}
697661

698662
// remember that we are in sync until that era
699-
this.setLastSeen(recipient, workingEra);
663+
this.setLastSeen(remotePeer, workingEra);
700664

701665
// make a breakpoint here
702666
if(this.memento != null) this.memento.save(this);

test/net/sharksystem/asap/MultihopTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void twoHops() throws IOException, ASAPException, InterruptedException {
9999
}
100100

101101
@Test
102-
public void createNonOpenStorage() throws IOException, ASAPException, InterruptedException {
102+
public void closedChannelTest() throws IOException, ASAPException, InterruptedException {
103103
CmdLineUI ui = new CmdLineUI(System.out);
104104
ui.doResetASAPStorages();
105105

0 commit comments

Comments
 (0)