Skip to content

Commit cf9c84c

Browse files
committed
working on asap management message hopping
1 parent f2c23d7 commit cf9c84c

File tree

7 files changed

+181
-66
lines changed

7 files changed

+181
-66
lines changed

src/net/sharksystem/asap/ASAPEngine.java

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,8 @@ public void handleASAPInterest(ASAP_Interest_PDU_1_0 asapInterest, ASAP_1_0 prot
580580

581581
this.sendChunks(sender, peer, incomingChunkStorage, protocol, workingEra, lastEra, os);
582582
}
583+
} else {
584+
System.out.println(this.getLogStart() + "engine does not send received chunks");
583585
}
584586
}
585587

@@ -608,6 +610,7 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
608610

609611
boolean lastRound = false; // assume more than one round
610612
do {
613+
boolean goAhead = true; // to avoid deep if-if-if-if structures
611614
lastRound = workingEra == lastEra;
612615

613616
List<ASAPChunk> chunks = chunkStorage.getChunks(workingEra);
@@ -625,68 +628,69 @@ private void sendChunks(CharSequence sender, String recipient, ASAPChunkStorage
625628
b.append(this.getLogStart());
626629
b.append("chunkUrl: ");
627630
b.append(chunk.getUri());
628-
b.append(" / isPublic: ");
631+
b.append(" | isPublic: ");
629632
b.append(this.isPublic(chunk));
633+
b.append(" | len: ");
634+
b.append(chunk.getLength());
630635
System.out.println(b.toString());
631636
//>>>>>>>>>>>>>>>>>>>debug
632637

638+
if(chunk.getLength() < 1) {
639+
goAhead = false;
640+
}
641+
633642
// is not a public chunk
634-
if (!this.isPublic(chunk)) {
643+
if (goAhead && !this.isPublic(chunk)) {
635644
Set<CharSequence> recipients = chunk.getRecipients();
636-
637-
if (!recipients.contains(recipient)) {
638-
continue;
645+
if (recipients == null || !recipients.contains(recipient)) {
646+
goAhead = false;
639647
}
640648
}
641649

642-
//<<<<<<<<<<<<<<<<<<debug
643-
b = new StringBuilder();
644-
b.append(this.getLogStart());
645-
b.append("send chunk");
646-
System.out.println(b.toString());
647-
//>>>>>>>>>>>>>>>>>>>debug
648-
649-
/*
650-
void assimilate(CharSequence recipient, CharSequence recipientPeer, CharSequence format, CharSequence channel, int era,
651-
int length, List<Integer> offsets, InputStream dataIS, OutputStream os, boolean signed)
652-
throws IOException, ASAPException;
653-
*/
654-
655-
protocol.assimilate(sender, // recipient
656-
recipient, // recipient
657-
this.format,
658-
chunk.getUri(), // channel ok
659-
workingEra, // era ok
660-
chunk.getLength(), // data length
661-
chunk.getOffsetList(),
662-
chunk.getMessageInputStream(),
663-
os,
664-
false);
665-
666-
// remember sent
667-
chunk.deliveredTo(recipient);
668-
//<<<<<<<<<<<<<<<<<<debug
669-
b = new StringBuilder();
670-
b.append(this.getLogStart());
671-
b.append("remembered delivered to ");
672-
b.append(recipient);
673-
System.out.println(b.toString());
674-
//>>>>>>>>>>>>>>>>>>>debug
675-
// sent to all recipients
676-
if (chunk.getRecipients().size() == chunk.getDeliveredTo().size()) {
677-
b = Log.startLog(this);
678-
b.append("#recipients == #deliveredTo chunk delivered to any potential recipient - could drop it");
650+
if (goAhead) {
651+
//<<<<<<<<<<<<<<<<<<debug
652+
b = new StringBuilder();
653+
b.append(this.getLogStart());
654+
b.append("send chunk");
679655
System.out.println(b.toString());
680-
if (this.isDropDeliveredChunks()) {
681-
chunk.drop();
682-
//<<<<<<<<<<<<<<<<<<debug
683-
b = Log.startLog(this);
684-
b.append("chunk dropped");
685-
System.out.println(b.toString());
686-
} else {
656+
//>>>>>>>>>>>>>>>>>>>debug
657+
658+
protocol.assimilate(sender, // recipient
659+
recipient, // recipient
660+
this.format,
661+
chunk.getUri(), // channel ok
662+
workingEra, // era ok
663+
chunk.getLength(), // data length
664+
chunk.getOffsetList(),
665+
chunk.getMessageInputStream(),
666+
os,
667+
false);
668+
669+
// remember sent
670+
chunk.deliveredTo(recipient);
671+
//<<<<<<<<<<<<<<<<<<debug
672+
b = new StringBuilder();
673+
b.append(this.getLogStart());
674+
b.append("remembered delivered to ");
675+
b.append(recipient);
676+
System.out.println(b.toString());
677+
//>>>>>>>>>>>>>>>>>>>debug
678+
// sent to all recipients
679+
if (chunk.getRecipients().size() == chunk.getDeliveredTo().size()) {
687680
b = Log.startLog(this);
688-
b.append("drop flag set false - engine does not remove delivered chunks");
681+
b.append("#recipients == #deliveredTo chunk delivered to any potential recipient - could drop it");
689682
System.out.println(b.toString());
683+
if (this.isDropDeliveredChunks()) {
684+
chunk.drop();
685+
//<<<<<<<<<<<<<<<<<<debug
686+
b = Log.startLog(this);
687+
b.append("chunk dropped");
688+
System.out.println(b.toString());
689+
} else {
690+
b = Log.startLog(this);
691+
b.append("drop flag set false - engine does not remove delivered chunks");
692+
System.out.println(b.toString());
693+
}
690694
}
691695
}
692696
}

src/net/sharksystem/asap/MultiASAPEngineFS.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,9 @@ public interface MultiASAPEngineFS {
6666
boolean isASAPManagementEngineRunning();
6767

6868
EngineSetting getEngineSettings(CharSequence format) throws ASAPException;
69+
70+
/**
71+
* @return all formats currently supported by this mulit engine
72+
*/
73+
Set<CharSequence> getFormats();
6974
}

src/net/sharksystem/asap/MultiASAPEngineFS_Impl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ public EngineSetting getEngineSettings(CharSequence format) throws ASAPException
182182
return folderAndListener;
183183
}
184184

185+
public Set<CharSequence> getFormats() {
186+
return this.folderMap.keySet();
187+
}
188+
189+
185190
////////////////////////////////////////////////////////////////////////////////////////////////////////////
186191
// connection management //
187192
////////////////////////////////////////////////////////////////////////////////////////////////////////////

src/net/sharksystem/asap/management/ASAPManagementMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public ASAPManagementMessageHandler(MultiASAPEngineFS multiASAPEngine) throws IO
2323
@Override
2424
public void chunkReceived(String sender, String uri, int era) {
2525
System.out.println(this.getLogStart()
26-
+ "handle received chunk (sender/uri/era)" + sender + "/" + uri + "/" + era);
26+
+ "handle received chunk (sender|uri|era) " + sender + "|" + uri + "|" + era);
2727
try {
2828
ASAPEngine asapManagementEngine = multiASAPEngine.getEngineByFormat(ASAP_1_0.ASAP_MANAGEMENT_FORMAT);
2929
CharSequence owner = asapManagementEngine.getOwner();

src/net/sharksystem/asap/protocol/ASAPPersistentConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,12 +350,12 @@ public ASAPPDUExecutor(ASAP_PDU_1_0 asapPDU, InputStream is, OutputStream os,
350350

351351
StringBuilder sb = new StringBuilder();
352352
sb.append(getLogStart());
353-
sb.append("ASAPPDUExecutor created - ");
354-
sb.append("folder: " + engineSetting.folder + " | ");
353+
sb.append("ASAPPDUExecutor: ");
355354
sb.append("engine: " + engineSetting.engine.getClass().getSimpleName() + " | ");
356355
if(engineSetting.listener != null) {
357-
sb.append("listener: " + engineSetting.listener.getClass().getSimpleName());
356+
sb.append("listener: " + engineSetting.listener.getClass().getSimpleName() + " | ");
358357
}
358+
sb.append("folder: " + engineSetting.folder);
359359

360360
System.out.println(sb.toString());
361361
}

src/net/sharksystem/cmdline/CmdLineUI.java

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class CmdLineUI {
2424
public static final String RESET_ASAP_STORAGES = "resetstorage";
2525
public static final String SET_SEND_RECEIVED_MESSAGES = "sendReceived";
2626
public static final String PRINT_CHANNEL_INFORMATION = "printChannelInfo";
27+
public static final String PRINT_STORAGE_INFORMATION = "printStorageInfo";
28+
public static final String PRINT_ALL_INFORMATION = "printAll";
2729

2830
private final PrintStream consoleOutput;
2931
private final BufferedReader userInput;
@@ -95,6 +97,12 @@ public void printUsage() {
9597
b.append(SET_SEND_RECEIVED_MESSAGES);
9698
b.append(".. set whether received message are to be sent");
9799
b.append("\n");
100+
b.append(PRINT_ALL_INFORMATION);
101+
b.append(".. print general information of all storages of a user");
102+
b.append("\n");
103+
b.append(PRINT_STORAGE_INFORMATION);
104+
b.append(".. print general information about a storage");
105+
b.append("\n");
98106
b.append(PRINT_CHANNEL_INFORMATION);
99107
b.append(".. print general information about a channel");
100108
b.append("\n");
@@ -171,6 +179,14 @@ public void printUsage(String cmdString, String comment) {
171179
out.println(PRINT_CHANNEL_INFORMATION + " user appName uri");
172180
out.println("example: " + PRINT_CHANNEL_INFORMATION + " Alice chat sn2://abChat");
173181
break;
182+
case PRINT_STORAGE_INFORMATION:
183+
out.println(PRINT_STORAGE_INFORMATION + " user appName");
184+
out.println("example: " + PRINT_STORAGE_INFORMATION + " Alice chat");
185+
break;
186+
case PRINT_ALL_INFORMATION:
187+
out.println(PRINT_ALL_INFORMATION + "user");
188+
out.println("example: " + PRINT_ALL_INFORMATION + " Alice ");
189+
break;
174190
default:
175191
out.println("unknown command: " + cmdString);
176192
}
@@ -228,6 +244,10 @@ public void runCommandLoop() {
228244
this.doSetSendReceivedMessage(parameterString); break;
229245
case PRINT_CHANNEL_INFORMATION:
230246
this.doPrintChannelInformation(parameterString); break;
247+
case PRINT_STORAGE_INFORMATION:
248+
this.doPrintStorageInformation(parameterString); break;
249+
case PRINT_ALL_INFORMATION:
250+
this.doPrintAllInformation(parameterString); break;
231251
case "q": // convenience
232252
case EXIT:
233253
this.doKill("all");
@@ -499,6 +519,52 @@ public void doSetSendReceivedMessage(String parameterString) {
499519
}
500520
}
501521

522+
public void doPrintAllInformation(String parameterString) {
523+
StringTokenizer st = new StringTokenizer(parameterString);
524+
525+
try {
526+
String owner = st.nextToken();
527+
528+
MultiASAPEngineFS multiEngine =
529+
MultiASAPEngineFS_Impl.createMultiEngine(TESTS_ROOT_FOLDER + "/" + owner, null);
530+
531+
for(CharSequence format : multiEngine.getFormats()) {
532+
ASAPEngine asapStorage = multiEngine.getEngineByFormat(format);
533+
System.out.println("storage: " + format);
534+
for (CharSequence uri : asapStorage.getChannelURIs()) {
535+
this.printChannelInfo(asapStorage, uri, format);
536+
}
537+
}
538+
}
539+
catch(RuntimeException | IOException | ASAPException e) {
540+
this.printUsage(PRINT_ALL_INFORMATION, e.getLocalizedMessage());
541+
}
542+
}
543+
544+
public void doPrintStorageInformation(String parameterString) {
545+
StringTokenizer st = new StringTokenizer(parameterString);
546+
547+
try {
548+
String owner = st.nextToken();
549+
String appName = st.nextToken();
550+
551+
// first - get storage
552+
ASAPStorage asapStorage = this.storages.get(this.getStorageKey(owner, appName));
553+
if(asapStorage == null) {
554+
System.err.println("storage does not exist: " + this.getStorageKey(owner, appName));
555+
return;
556+
}
557+
558+
// iterate URI
559+
for(CharSequence uri : asapStorage.getChannelURIs()) {
560+
this.doPrintChannelInformation(parameterString + " " + uri);
561+
}
562+
}
563+
catch(RuntimeException | IOException e) {
564+
this.printUsage(PRINT_STORAGE_INFORMATION, e.getLocalizedMessage());
565+
}
566+
}
567+
502568
public void doPrintChannelInformation(String parameterString) {
503569
// out.println("example: " + PRINT_CHANNEL_INFORMATION + " Alice chat sn2://abChat");
504570
StringTokenizer st = new StringTokenizer(parameterString);
@@ -515,20 +581,26 @@ public void doPrintChannelInformation(String parameterString) {
515581
return;
516582
}
517583

518-
ASAPChannel channel = asapStorage.getChannel(uri);
519-
Set<CharSequence> recipients = channel.getRecipients();
584+
this.printChannelInfo(asapStorage, uri, appName);
520585

521-
System.out.println("Owner:App:Channel == " + channel.getOwner() + ":" + appName + ":" + channel.getUri());
522-
System.out.println("#Recipients == " + recipients.size());
523-
for(CharSequence recipient : recipients) {
524-
System.out.println(recipient);
525-
}
526586
}
527587
catch(RuntimeException | ASAPException | IOException e) {
528588
this.printUsage(CREATE_ASAP_MESSAGE, e.getLocalizedMessage());
529589
}
530590
}
531591

592+
private void printChannelInfo(ASAPStorage asapStorage, CharSequence uri, CharSequence appName)
593+
throws IOException, ASAPException {
594+
ASAPChannel channel = asapStorage.getChannel(uri);
595+
Set<CharSequence> recipients = channel.getRecipients();
596+
597+
System.out.println("Owner:App:Channel == " + channel.getOwner() + ":" + appName + ":" + channel.getUri());
598+
System.out.println("#Recipients == " + recipients.size());
599+
for(CharSequence recipient : recipients) {
600+
System.out.println(recipient);
601+
}
602+
}
603+
532604
////////////////////////////////////////////////////////////////////////////////////////////////////////////
533605
// helper methods //
534606
////////////////////////////////////////////////////////////////////////////////////////////////////////////

test/net/sharksystem/asap/MultihopTests.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ public void twoHops() throws IOException, ASAPException, InterruptedException {
9292

9393
// clara era was increased after connection terminated - message from bob is in era before current one
9494
// int eraToLook = ASAPEngine.previousEra(clara.getEra());
95-
int eraToLook = clara.getEra();
9695
ASAPChunk claraABCChat = claraAlice.getChunk("sn2://abc", aliceEraWhenIssuedMessage);
9796
CharSequence message = claraABCChat.getMessages().next();
9897
boolean same = messageAlice2Clara.equalsIgnoreCase(message.toString());
@@ -108,6 +107,12 @@ public void createNonOpenStorage() throws IOException, ASAPException, Interrupte
108107
ui.doCreateASAPApp("Alice chat");
109108
ui.doCreateASAPApp("Bob chat");
110109
ui.doCreateASAPApp("Clara chat");
110+
ui.doCreateASAPApp("David chat");
111+
112+
ui.doSetSendReceivedMessage("Alice:chat on");
113+
ui.doSetSendReceivedMessage("Bob:chat on");
114+
ui.doSetSendReceivedMessage("Clara:chat on");
115+
ui.doSetSendReceivedMessage("David:chat on");
111116

112117
// create closed channel with Alice
113118
ui.doCreateASAPChannel(" Alice chat sn2://closedChannel Bob Clara");
@@ -119,6 +124,10 @@ public void createNonOpenStorage() throws IOException, ASAPException, Interrupte
119124
String parameters = "Alice chat sn2://closedChannel " + messageAlice2Clara;
120125
ui.doCreateASAPMessage(parameters);
121126

127+
// remember Alice' era
128+
ASAPStorage aliceStorage = this.getFreshStorageByName(ui, "Alice:chat");
129+
int aliceEraWhenIssuedMessage = aliceStorage.getEra();
130+
122131
System.out.println("**************************************************************************");
123132
System.out.println("** connect Alice with Bob **");
124133
System.out.println("**************************************************************************");
@@ -165,8 +174,7 @@ public void createNonOpenStorage() throws IOException, ASAPException, Interrupte
165174
// message received?
166175
ASAPChunkStorage bobAlice = bobStorage.getIncomingChunkStorage("Alice");
167176
// clara era was increased after connection terminated - message from bob is in era before current one
168-
int eraToLook = ASAPEngine.previousEra(bobStorage.getEra());
169-
ASAPChunk bobABCChat = bobAlice.getChunk("sn2://closedChannel", eraToLook);
177+
ASAPChunk bobABCChat = bobAlice.getChunk("sn2://closedChannel", aliceEraWhenIssuedMessage);
170178
CharSequence message = bobABCChat.getMessages().next();
171179
Assert.assertTrue(messageAlice2Clara.equalsIgnoreCase(message.toString()));
172180

@@ -216,10 +224,31 @@ public void createNonOpenStorage() throws IOException, ASAPException, Interrupte
216224
// message received?
217225
ASAPChunkStorage claraAlice = claraStorage.getIncomingChunkStorage("Alice");
218226
// clara era was increased after connection terminated - message from bob is in era before current one
219-
eraToLook = ASAPEngine.previousEra(bobStorage.getEra());
220-
ASAPChunk claraABCChat = claraAlice.getChunk("sn2://closedChannel", eraToLook);
227+
ASAPChunk claraABCChat = claraAlice.getChunk("sn2://closedChannel", aliceEraWhenIssuedMessage);
221228
message = claraABCChat.getMessages().next();
222229
Assert.assertTrue(messageAlice2Clara.equalsIgnoreCase(message.toString()));
230+
231+
System.out.println("**************************************************************************");
232+
System.out.println("** connect Clara with David **");
233+
System.out.println("**************************************************************************");
234+
// connect alice with bob
235+
ui.doCreateASAPMultiEngine("Clara");
236+
ui.doOpen("7072 Clara");
237+
// wait a moment to give server socket time to be created
238+
Thread.sleep(10);
239+
ui.doCreateASAPMultiEngine("David");
240+
ui.doConnect("7072 David");
241+
242+
// wait a moment
243+
Thread.sleep(1000);
244+
// kill connections
245+
ui.doKill("all");
246+
// wait a moment
247+
Thread.sleep(1000);
248+
// Bob should now have created an closed asap storage with three recipients
249+
ASAPStorage davidStorage = this.getFreshStorageByName(ui, "David:chat");
250+
251+
Assert.assertFalse(davidStorage.channelExists("sn2://closedChannel"));
223252
}
224253

225254
private ASAPStorage getFreshStorageByName(CmdLineUI ui, String storageName) throws ASAPException, IOException {

0 commit comments

Comments
 (0)