Skip to content

Commit a57963c

Browse files
committed
refactured pdu executor - made it inner class of persistent connection
1 parent af87c20 commit a57963c

File tree

11 files changed

+226
-133
lines changed

11 files changed

+226
-133
lines changed

src/net/sharksystem/asap/ASAPEngine.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ public abstract class ASAPEngine implements ASAPStorage, ASAPProtocolEngine {
3131
protected ASAPMemento memento = null;
3232

3333
/* private */ final private ASAPChunkStorage chunkStorage;
34-
private boolean dropDeliveredChunks = false;
34+
protected boolean dropDeliveredChunks = false;
3535

3636
private ASAPOnlineMessageSender asapOnlineMessageSender;
3737
protected boolean contentChanged = false;
38-
private boolean sendReceivedChunks = false;
38+
protected boolean sendReceivedChunks = false;
3939

4040
protected ASAPEngine(ASAPChunkStorage chunkStorage, CharSequence chunkContentFormat)
4141
throws ASAPException, IOException {
@@ -436,16 +436,24 @@ public void handleASAPInterest(ASAP_Interest_PDU_1_0 asapInterest, ASAP_1_0 prot
436436
//>>>>>>>>>>>>>>>>>>>debug
437437

438438
if(this.isSendReceivedChunks()) {
439+
System.out.println(this.getLogStart() + "send also received chunks - if any");
439440

441+
for(CharSequence sender : this.getSender()) {
442+
System.out.println(this.getLogStart() + "send chunks received from: " + sender);
443+
ASAPChunkStorage incomingChunkStorage = this.getIncomingChunkStorage(sender);
444+
445+
this.sendChunks(peer, incomingChunkStorage, protocol, workingEra, lastEra, os);
446+
}
440447
}
441448
}
442449

443450
private boolean isSendReceivedChunks() {
444451
return this.sendReceivedChunks;
445452
}
446453

447-
public void setSendReceivedChunks(boolean on) {
454+
public void setSendReceivedChunks(boolean on) throws IOException {
448455
this.sendReceivedChunks = on;
456+
this.saveStatus();
449457
}
450458

451459
private void sendChunks(String peer, ASAPChunkStorage chunkStorage,
@@ -564,8 +572,9 @@ private boolean isDropDeliveredChunks() {
564572
return this.dropDeliveredChunks;
565573
}
566574

567-
public void setDropDeliveredChunks(boolean drop) {
575+
public void setDropDeliveredChunks(boolean drop) throws IOException {
568576
this.dropDeliveredChunks = drop;
577+
this.saveStatus();
569578
}
570579

571580
static int nextEra(int workingEra) {

src/net/sharksystem/asap/ASAPMementoFS.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public void save(ASAPEngine engine) throws IOException {
4141
dos.writeInt(engine.era);
4242
dos.writeInt(engine.oldestEra);
4343
dos.writeBoolean(engine.contentChanged);
44+
dos.writeBoolean(engine.dropDeliveredChunks);
45+
dos.writeBoolean(engine.sendReceivedChunks);
4446

4547
// write lastSeen hash map
4648
if(engine.lastSeen != null && !engine.lastSeen.isEmpty()) {
@@ -61,6 +63,8 @@ private void setDefaults(ASAPEngine engine) {
6163
engine.era = ASAPEngine.DEFAULT_INIT_ERA;
6264
engine.oldestEra = ASAPEngine.DEFAULT_INIT_ERA;
6365
engine.lastSeen = new HashMap<>();
66+
engine.dropDeliveredChunks = false;
67+
engine.sendReceivedChunks = false;
6468
}
6569

6670
public void restore(ASAPEngine engine) throws IOException {
@@ -80,6 +84,8 @@ public void restore(ASAPEngine engine) throws IOException {
8084
engine.era = dis.readInt();
8185
engine.oldestEra = dis.readInt();
8286
engine.contentChanged = dis.readBoolean();
87+
engine.dropDeliveredChunks = dis.readBoolean();
88+
engine.sendReceivedChunks = dis.readBoolean();
8389

8490
// try to read lastSeen list
8591
boolean first = true;

src/net/sharksystem/asap/ASAPPDUExecutor.java

Lines changed: 0 additions & 93 deletions
This file was deleted.

src/net/sharksystem/asap/ASAPProtocolEngine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ void handleASAPAssimilate(ASAP_AssimilationPDU_1_0 asap_assimilation, ASAP_1_0 p
3232
* to all their recipients. If no set, they remain intact.
3333
* @param drop
3434
*/
35-
void setDropDeliveredChunks(boolean drop);
35+
void setDropDeliveredChunks(boolean drop) throws IOException;
3636

3737
/**
3838
* engine can deliver local message but also received messages - default false - send no received messages
3939
* @param on
4040
*/
41-
void setSendReceivedChunks(boolean on);
41+
void setSendReceivedChunks(boolean on) throws IOException;
4242
}

src/net/sharksystem/asap/ASAPStorage.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,23 @@ public interface ASAPStorage {
7070
*/
7171
public void setRecipients(CharSequence urlTarget, List<CharSequence> recipients) throws IOException;
7272

73+
/**
74+
* Chunks are delivered when seeing other peers. This flag allows to decide whether delivered chunks
75+
* are to be deleted.
76+
* @param drop
77+
*/
78+
void setDropDeliveredChunks(boolean drop) throws IOException;
79+
80+
/**
81+
* Chunks are delivered when seeing other peers. Default behaviour is to send only message which
82+
* are in local peers own storage. A peer can also have received messages in an incoming storage.
83+
* This flag allows to force even delivery of received messages from incoming storages. Basis of
84+
* multihop communication.
85+
*
86+
* @param drop
87+
*/
88+
void setSendReceivedChunks(boolean drop) throws IOException;
89+
7390
/**
7491
/**
7592
* returns recipient list

src/net/sharksystem/asap/EngineSetting.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package net.sharksystem.asap;
22

3-
class EngineSetting {
4-
final CharSequence folder;
5-
ASAPChunkReceivedListener listener;
6-
ASAPEngine engine;
3+
public class EngineSetting {
4+
public final CharSequence folder;
5+
public ASAPChunkReceivedListener listener;
6+
public ASAPEngine engine;
77

88
EngineSetting(CharSequence folder, ASAPChunkReceivedListener listener) {
99
this.folder = folder;

src/net/sharksystem/asap/MultiASAPEngineFS.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,6 @@ Thread getExecutorThread(ASAP_PDU_1_0 asappdu, InputStream is, OutputStream os,
6565
void addOnlinePeersChangedListener(ASAPOnlinePeersChangedListener listener);
6666

6767
void removeOnlinePeersChangedListener(ASAPOnlinePeersChangedListener listener);
68+
69+
EngineSetting getEngineSettings(CharSequence format) throws ASAPException;
6870
}

src/net/sharksystem/asap/MultiASAPEngineFS_Impl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public ASAPEngine getASAPEngine(CharSequence appName, CharSequence format)
147147
return asapEngine;
148148
}
149149

150-
private EngineSetting getEngineSettings(CharSequence format) throws ASAPException {
150+
public EngineSetting getEngineSettings(CharSequence format) throws ASAPException {
151151
EngineSetting folderAndListener = folderMap.get(format);
152152
if(folderAndListener == null)
153153
throw new ASAPException("no folder for owner / format: " + owner + "/" + format);
@@ -351,7 +351,7 @@ public void pushInterests(OutputStream os) throws IOException, ASAPException {
351351
public Thread getExecutorThread(ASAP_PDU_1_0 asappdu, InputStream is, OutputStream os,
352352
ThreadFinishedListener threadFinishedListener) throws ASAPException {
353353
// process pdu
354-
return new ASAPPDUExecutor(asappdu, is, os,
354+
return new ASAPPersistentConnection.ASAPPDUExecutor(asappdu, is, os,
355355
this.getEngineSettings(asappdu.getFormat()),
356356
new ASAP_Modem_Impl(), threadFinishedListener);
357357
}

src/net/sharksystem/asap/protocol/ASAPPersistentConnection.java

Lines changed: 110 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public ASAPPersistentConnection(InputStream is, OutputStream os, MultiASAPEngine
3636
}
3737

3838
private String getLogStart() {
39-
return this.getClass().getSimpleName() + ": ";
39+
return this.getClass().getSimpleName() + "(connected to: " + this.remotePeer + "): ";
4040
}
4141

4242
private void setRemotePeer(String remotePeerName) {
@@ -225,26 +225,37 @@ public void run() {
225225
System.out.println(this.getLogStart() + "read valid pdu");
226226
this.setRemotePeer(asappdu.getPeer());
227227
// process received pdu
228-
try {
229-
this.executor =
230-
this.multiASAPEngineFS.getExecutorThread(asappdu, this.is, this.os, this);
231-
// get exclusive access to streams
232-
System.out.println(this.startLog() + "asap pdu executor going to wait for stream access");
233-
this.wait4ExclusiveStreamsAccess();
228+
229+
if(asappdu.getFormat().equalsIgnoreCase(ASAP_1_0.ASAP_MANAGEMENT_FORMAT.toString())) {
230+
System.out.println(this.getLogStart()
231+
+ "got asap management message - not processed, took remote peer name only");
232+
} else {
234233
try {
235-
System.out.println(this.startLog() + "asap pdu executor got stream access - process pdu");
236-
this.runObservedThread(executor, maxExecutionTime);
237-
} catch (ASAPExecTimeExceededException e) {
238-
System.out.println(this.startLog() + "asap pdu processing took longer than allowed");
239-
this.terminate("asap pdu processing took longer than allowed", e);
240-
break;
241-
} finally {
242-
// wake waiting thread if any
243-
this.releaseStreamsLock();
244-
System.out.println(this.startLog() + "asap pdu executor release locks");
234+
this.executor =
235+
new ASAPPDUExecutor(asappdu,
236+
this.is, this.os,
237+
this.multiASAPEngineFS.getEngineSettings(asappdu.getFormat()),
238+
new ASAP_Modem_Impl(),
239+
this);
240+
// this.multiASAPEngineFS.getExecutorThread(asappdu, this.is, this.os, this);
241+
// get exclusive access to streams
242+
System.out.println(this.startLog() + "asap pdu executor going to wait for stream access");
243+
this.wait4ExclusiveStreamsAccess();
244+
try {
245+
System.out.println(this.startLog() + "asap pdu executor got stream access - process pdu");
246+
this.runObservedThread(executor, maxExecutionTime);
247+
} catch (ASAPExecTimeExceededException e) {
248+
System.out.println(this.startLog() + "asap pdu processing took longer than allowed");
249+
this.terminate("asap pdu processing took longer than allowed", e);
250+
break;
251+
} finally {
252+
// wake waiting thread if any
253+
this.releaseStreamsLock();
254+
System.out.println(this.startLog() + "asap pdu executor release locks");
255+
}
256+
} catch (ASAPException e) {
257+
System.out.println(this.getLogStart() + " problem when executing asap received pdu: " + e);
245258
}
246-
} catch (ASAPException e) {
247-
System.out.println(this.getLogStart() + " problem when executing asap received pdu: " + e);
248259
}
249260
}
250261
}
@@ -327,5 +338,84 @@ private StringBuilder startLog() {
327338

328339
return sb;
329340
}
330-
}
331341

342+
public static class ASAPPDUExecutor extends Thread {
343+
private final ASAP_PDU_1_0 asapPDU;
344+
private final InputStream is;
345+
private final OutputStream os;
346+
private final EngineSetting engineSetting;
347+
private final ASAP_1_0 protocol;
348+
private final ThreadFinishedListener threadFinishedListener;
349+
350+
public ASAPPDUExecutor(ASAP_PDU_1_0 asapPDU, InputStream is, OutputStream os,
351+
EngineSetting engineSetting, ASAP_1_0 protocol,
352+
ThreadFinishedListener threadFinishedListener) {
353+
this.asapPDU = asapPDU;
354+
this.is = is;
355+
this.os = os;
356+
this.engineSetting = engineSetting;
357+
this.protocol = protocol;
358+
this.threadFinishedListener = threadFinishedListener;
359+
360+
StringBuilder sb = new StringBuilder();
361+
sb.append("ASAPPDUExecutor created - ");
362+
sb.append("folder: " + engineSetting.folder + " | ");
363+
sb.append("engine: " + engineSetting.engine.getClass().getSimpleName() + " | ");
364+
if(engineSetting.listener != null) {
365+
sb.append("listener: " + engineSetting.listener.getClass().getSimpleName());
366+
}
367+
368+
System.out.println(sb.toString());
369+
}
370+
371+
private void finish() {
372+
if(this.threadFinishedListener != null) {
373+
this.threadFinishedListener.finished(this);
374+
}
375+
}
376+
377+
public void run() {
378+
if(engineSetting.engine == null) {
379+
System.err.println("ASAPPDUExecutor called without engine set - fatal");
380+
this.finish();
381+
return;
382+
}
383+
384+
System.out.println("ASAPPDUExecutor calls engine: " + engineSetting.engine.getClass().getSimpleName());
385+
386+
try {
387+
switch (asapPDU.getCommand()) {
388+
case ASAP_1_0.INTEREST_CMD:
389+
System.out.println("ASAPPDUExecutor call handleASAPInterest");
390+
engineSetting.engine.handleASAPInterest((ASAP_Interest_PDU_1_0) asapPDU, protocol, os);
391+
break;
392+
case ASAP_1_0.OFFER_CMD:
393+
System.out.println("ASAPPDUExecutor call handleASAPOffer");
394+
engineSetting.engine.handleASAPOffer((ASAP_OfferPDU_1_0) asapPDU, protocol, os);
395+
break;
396+
case ASAP_1_0.ASSIMILATE_CMD:
397+
System.out.println("ASAPPDUExecutor call handleASAPAssimilate");
398+
engineSetting.engine.handleASAPAssimilate((ASAP_AssimilationPDU_1_0) asapPDU, protocol, is, os,
399+
engineSetting.listener);
400+
break;
401+
402+
default:
403+
System.err.println(
404+
this.getClass().getSimpleName() + ": " + "unknown ASAP command: " + asapPDU.getCommand());
405+
}
406+
}
407+
catch(IOException | ASAPException e) {
408+
System.err.println("Exception while processing ASAP PDU - close streams" + e.getLocalizedMessage());
409+
try {
410+
os.close(); // more important to close than input stream - do it first
411+
is.close();
412+
} catch (IOException ex) {
413+
ex.printStackTrace();
414+
}
415+
}
416+
finally {
417+
this.finish();
418+
}
419+
}
420+
}
421+
}

0 commit comments

Comments
 (0)