Skip to content

Commit 1743521

Browse files
committed
Encounter Manager ready to be used in large scale a-hoc network.
1 parent f2e1271 commit 1743521

File tree

14 files changed

+352
-121
lines changed

14 files changed

+352
-121
lines changed

src/main/java/net/sharksystem/asap/ASAPEncounterManagerAdmin.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,37 @@ public interface ASAPEncounterManagerAdmin {
1717
/**
1818
* @return set of ID to which an open connection exists right now.
1919
*/
20-
public abstract Set<CharSequence> getConnectedPeerIDs();
20+
Set<CharSequence> getConnectedPeerIDs();
2121

2222
/**
2323
* Add a peerID to what no connection should be established.
2424
* Note: Adding a peer to the deny list does not necessarily terminate an
2525
* existing connection to that peer.
2626
* @param peerID
2727
*/
28-
public abstract void addToDenyList(CharSequence peerID);
28+
void addToDenyList(CharSequence peerID);
2929

3030
/**
3131
* Remove a peerID from deny list
3232
* @param peerID
3333
*/
34-
public abstract void removeFromDenyList(CharSequence peerID);
34+
void removeFromDenyList(CharSequence peerID);
3535

3636
/**
3737
* Get PeerID set to which no connection should be established
3838
* @return
3939
*/
40-
public abstract Set<CharSequence> getDenyList();
40+
Set<CharSequence> getDenyList();
41+
42+
/**
43+
* remove all entries from deny list
44+
*/
45+
void clearDenyList();
4146

4247
/**
4348
* Cancel a connection to a peer. This method call does not change the deny list.
4449
* @param peerID
4550
*/
46-
public abstract void cancelConnection(CharSequence peerID);
51+
void cancelConnection(CharSequence peerID);
4752

4853
}

src/main/java/net/sharksystem/asap/ASAPEncounterManagerImpl.java

Lines changed: 113 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
package net.sharksystem.asap;
22

3+
import net.sharksystem.SharkException;
4+
import net.sharksystem.asap.fs.ExtraDataFS;
35
import net.sharksystem.asap.protocol.ASAPConnection;
46
import net.sharksystem.asap.protocol.ASAPConnectionListener;
7+
import net.sharksystem.asap.utils.ASAPSerialization;
58
import net.sharksystem.asap.utils.PeerIDHelper;
69
import net.sharksystem.utils.streams.StreamPair;
710
import net.sharksystem.utils.Log;
811

912
import java.io.*;
1013
import java.util.*;
1114

12-
public class ASAPEncounterManagerImpl implements ASAPEncounterManager,
13-
ASAPEncounterManagerAdmin,
15+
public class ASAPEncounterManagerImpl implements ASAPEncounterManager, ASAPEncounterManagerAdmin,
1416
ASAPConnectionListener {
1517
public static final long DEFAULT_WAIT_BEFORE_RECONNECT_TIME = 1000; // a second - debugging
1618
public static final long DEFAULT_WAIT_TO_AVOID_RACE_CONDITION = 500; // milliseconds - worked fine with BT.
19+
public static final String DATASTORAGE_FILE_EXTENSION = "em";
20+
private static final CharSequence ENCOUNTER_MANAGER_DENY_LIST_KEY = "denylist";
21+
private final CharSequence peerID;
22+
private ExtraDataFS extraDataStorage = null;
1723

1824
private int randomValue;
1925
private long waitBeforeReconnect;
@@ -41,23 +47,35 @@ public class ASAPEncounterManagerImpl implements ASAPEncounterManager,
4147
/** remember remote address of peers (they can have more than one): peerID -> remote address */
4248
private Map<CharSequence, Set<CharSequence>> peerRemoteAddresses = new HashMap<>();
4349

44-
public ASAPEncounterManagerImpl(ASAPConnectionHandler asapConnectionHandler) {
45-
this(asapConnectionHandler, DEFAULT_WAIT_BEFORE_RECONNECT_TIME);
50+
public ASAPEncounterManagerImpl(ASAPConnectionHandler asapConnectionHandler, CharSequence peerID) throws SharkException, IOException {
51+
this(asapConnectionHandler, peerID, DEFAULT_WAIT_BEFORE_RECONNECT_TIME);
4652
}
4753

48-
public ASAPEncounterManagerImpl(ASAPConnectionHandler asapConnectionHandler, long waitingPeriod) {
54+
public ASAPEncounterManagerImpl(ASAPConnectionHandler asapConnectionHandler, CharSequence peerID,
55+
long waitingPeriod, CharSequence rootFolder) throws SharkException, IOException {
4956
this.asapConnectionHandler = asapConnectionHandler;
57+
this.peerID = peerID;
5058
this.randomValue = new Random(System.currentTimeMillis()).nextInt();
5159
this.waitBeforeReconnect = waitingPeriod;
52-
this.restoreDenyList();
60+
61+
if(rootFolder != null && rootFolder.length() > 0) {
62+
// create folder
63+
this.extraDataStorage = new ExtraDataFS(rootFolder + "/" + ENCOUNTER_MANAGER_DENY_LIST_KEY, DATASTORAGE_FILE_EXTENSION);
64+
this.restoreDenyList();
65+
}
66+
}
67+
68+
public ASAPEncounterManagerImpl(ASAPConnectionHandler asapConnectionHandler, CharSequence peerID,
69+
long waitingPeriod) throws SharkException, IOException {
70+
this(asapConnectionHandler, peerID, waitingPeriod, null);
5371
}
5472

5573
private boolean coolDownOver(CharSequence id, ASAPEncounterConnectionType connectionType) {
5674
Date now = new Date();
5775
Date lastEncounter = this.encounterDate.get(id);
5876

5977
if(lastEncounter == null) {
60-
Log.writeLog(this, this.toString(), "device/peer not in encounteredDevices - should connect");
78+
Log.writeLog(this, this.toString(), "device/peer not in encounteredDevices");
6179
this.encounterDate.put(id, now);
6280
return true;
6381
}
@@ -87,7 +105,13 @@ private boolean coolDownOver(CharSequence id, ASAPEncounterConnectionType connec
87105
}
88106

89107
@Override
90-
public boolean shouldCreateConnectionToPeer(CharSequence remoteAdressOrPeerID, ASAPEncounterConnectionType connectionType) {
108+
public boolean shouldCreateConnectionToPeer(CharSequence remoteAdressOrPeerID,
109+
ASAPEncounterConnectionType connectionType) {
110+
Log.writeLog(this, this.toString(), "should connect to " + remoteAdressOrPeerID + " ?");
111+
// on deny list?
112+
if(this.denyList.contains(remoteAdressOrPeerID)) return false;
113+
Log.writeLog(this, this.toString(), remoteAdressOrPeerID + " not on deny list");
114+
91115
// do we have a connection under a peerID?
92116
StreamPair streamPair = this.openStreamPairs.get(remoteAdressOrPeerID);
93117
if(streamPair != null) {
@@ -131,29 +155,39 @@ public void handleEncounter(StreamPair streamPair, ASAPEncounterConnectionType c
131155

132156
private void handleEncounter(StreamPair streamPair, ASAPEncounterConnectionType connectionType, boolean initiator,
133157
boolean raceCondition) throws IOException {
158+
// always exchange peerIDs
159+
DataOutputStream dos = new DataOutputStream(streamPair.getOutputStream());
160+
dos.writeUTF(this.peerID.toString());
161+
DataInputStream dis = new DataInputStream(streamPair.getInputStream());
162+
String remotePeerID = dis.readUTF();
163+
164+
if(remotePeerID != null && remotePeerID.length() > 0) {
165+
streamPair.setEndpointID(remotePeerID);
166+
}
134167

135-
CharSequence streamPairID = streamPair.getSessionID();
168+
CharSequence connectionID = streamPair.getEndpointID();
169+
if(connectionID == null || connectionID.length() == 0) connectionID = streamPair.getSessionID();
136170

137-
Log.writeLog(this, this.toString(), "socket called: handle new encounter" + streamPair);
171+
Log.writeLog(this, this.toString(), "decide whether to pursue this new encounter: " + streamPair);
138172

139173
// should we connect in the first place
140-
if (!this.shouldCreateConnectionToPeer(streamPairID, connectionType)) {
174+
if (!this.shouldCreateConnectionToPeer(connectionID, connectionType)) {
141175
// no - than shut it down.
176+
Log.writeLog(this, this.toString(),
177+
"close connection (on deny list or in cool down)");
142178
streamPair.close();
143179
return;
144180
}
145181

146182
// new stream pair is ok. Is there a race condition expected ?
147183
if(raceCondition) {
148184
// avoid the nasty race condition
149-
boolean waited = this.waitBeforeASAPSessionLaunch(
150-
streamPair.getInputStream(),
151-
streamPair.getOutputStream(),
152-
initiator, DEFAULT_WAIT_TO_AVOID_RACE_CONDITION);
185+
Log.writeLog(this, this.toString(), "solve race condition");
186+
boolean waited = this.solveRaceCondition(streamPair, initiator, DEFAULT_WAIT_TO_AVOID_RACE_CONDITION);
153187

154188
// ask again?
155189
if (waited) {
156-
if (!this.shouldCreateConnectionToPeer(streamPairID, connectionType)) {
190+
if (!this.shouldCreateConnectionToPeer(connectionID, connectionType)) {
157191
streamPair.close();
158192
return;
159193
}
@@ -162,7 +196,7 @@ private void handleEncounter(StreamPair streamPair, ASAPEncounterConnectionType
162196

163197
// we a through with it - remember that new stream pair
164198
Log.writeLog(this, this.toString(), "remember streamPair: " + streamPair);
165-
this.openStreamPairs.put(streamPairID, streamPair);
199+
this.openStreamPairs.put(connectionID, streamPair);
166200

167201
Log.writeLog(this, this.toString(), "going to launch a new asap connection");
168202

@@ -174,35 +208,46 @@ private void handleEncounter(StreamPair streamPair, ASAPEncounterConnectionType
174208

175209
asapConnection.addASAPConnectionListener(this);
176210

177-
this.openASAPConnections.put(asapConnection, streamPairID);
211+
this.openASAPConnections.put(asapConnection, connectionID);
178212

179213
} catch (IOException | ASAPException e) {
180214
Log.writeLog(this, this.toString(), "while launching asap connection: "
181215
+ e.getLocalizedMessage());
182216
}
183217
}
184218

185-
private boolean waitBeforeASAPSessionLaunch(InputStream is, OutputStream os, boolean connectionInitiator,
186-
long waitInMillis) throws IOException {
219+
private boolean solveRaceCondition(StreamPair streamPair, boolean connectionInitiator,
220+
long waitInMillis) throws IOException {
187221
// run a little negotiation before we start
188-
DataOutputStream dos = new DataOutputStream(os);
222+
DataOutputStream dos = new DataOutputStream(streamPair.getOutputStream());
189223
int remoteValue = 0;
224+
String remotePeerID = null;
190225

191226
try {
227+
// write protocol unit
192228
dos.writeInt(this.randomValue);
193-
DataInputStream dis = new DataInputStream(is);
229+
dos.writeUTF(this.peerID.toString());
230+
231+
// read it
232+
DataInputStream dis = new DataInputStream(streamPair.getInputStream());
194233
remoteValue = dis.readInt();
234+
remotePeerID = dis.readUTF();
235+
236+
if(remotePeerID != null && remotePeerID.length() > 0) streamPair.setEndpointID(remotePeerID);
195237
} catch (IOException e) {
196238
// decision is made - this connection is dead
197-
os.close();
198-
is.close();
239+
streamPair.close();
199240
}
200241

201242
StringBuilder sb = new StringBuilder();
202-
sb.append("try to solve race condition: localValue == ");
243+
sb.append("try to solve race condition: random (local/remote) == ");
203244
sb.append(this.randomValue);
204-
sb.append(" | remoteValue == ");
245+
sb.append("/");
205246
sb.append(remoteValue);
247+
sb.append(" | peerID == ");
248+
sb.append(this.peerID);
249+
sb.append("/");
250+
sb.append(remotePeerID);
206251
sb.append(" | initiator == ");
207252
sb.append(connectionInitiator);
208253

@@ -301,30 +346,59 @@ public synchronized void asapConnectionTerminated(Exception terminatingException
301346
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
302347

303348
private Set<CharSequence> denyList = new HashSet<>();
304-
305-
//// housekeeping
306-
private void restoreDenyList() {
307-
// TODO
308-
//this.denyList = new HashSet<>();
309-
Log.writeLog(this, "need to implement restoreDenyList");
349+
350+
//// housekeeping deny list
351+
public void clearDenyList() {
352+
this.denyList = new HashSet<>();
353+
try {
354+
this.saveDenyList();
355+
} catch (IOException | SharkException e) {
356+
Log.writeLogErr(this, this.toString(), "cannot persist deny list: " + e.getLocalizedMessage());
357+
}
358+
}
359+
private void restoreDenyList() throws SharkException, IOException {
360+
if(this.extraDataStorage == null) {
361+
Log.writeLog(this, this.toString(), "no persistent storage for deny list");
362+
} else {
363+
byte[] denyListBytes = this.extraDataStorage.getExtra(ENCOUNTER_MANAGER_DENY_LIST_KEY);
364+
if(denyListBytes != null && denyListBytes.length > 0) {
365+
ByteArrayInputStream bais = new ByteArrayInputStream(denyListBytes);
366+
this.denyList = ASAPSerialization.readCharSequenceSetParameter(bais);
367+
} else {
368+
this.clearDenyList();
369+
}
370+
}
310371
}
311372

312-
private void saveDenyList() {
313-
// TODO
314-
Log.writeLog(this, "need to implement saveDenyList");
373+
private void saveDenyList() throws IOException, SharkException {
374+
if(this.extraDataStorage == null) {
375+
Log.writeLog(this, this.toString(), "no persistent storage for deny list");
376+
} else {
377+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
378+
ASAPSerialization.writeCharSequenceSetParameter(this.denyList, baos);
379+
this.extraDataStorage.putExtra(ENCOUNTER_MANAGER_DENY_LIST_KEY, baos.toByteArray());
380+
}
315381
}
316382

317383
///// manage deny list
318384
@Override
319385
public void addToDenyList(CharSequence peerID) {
320386
this.denyList.add(peerID);
321-
this.saveDenyList();
387+
try {
388+
this.saveDenyList();
389+
} catch (IOException | SharkException e) {
390+
Log.writeLogErr(this, this.toString(), "cannot persist deny list: " + e.getLocalizedMessage());
391+
}
322392
}
323393

324394
@Override
325395
public void removeFromDenyList(CharSequence peerID) {
326396
this.denyList.remove(peerID);
327-
this.saveDenyList();
397+
try {
398+
this.saveDenyList();
399+
} catch (IOException | SharkException e) {
400+
Log.writeLogErr(this, this.toString(), "cannot persist deny list: " + e.getLocalizedMessage());
401+
}
328402
}
329403

330404
@Override
@@ -349,7 +423,8 @@ public void cancelConnection(CharSequence peerID) {
349423
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
350424

351425
public String toString() {
352-
return this.asapConnectionHandler.toString();
426+
if(this.asapConnectionHandler != null) return this.asapConnectionHandler.toString();
427+
else return "null";
353428
}
354429

355430
}

src/main/java/net/sharksystem/asap/apps/TCPServerSocketAcceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public TCPServerSocketAcceptor(int portNumber, ASAPEncounterManager encounterMan
1616
this.encounterManager = encounterManager;
1717
SocketFactory socketFactory = new SocketFactory(portNumber, this);
1818

19-
Log.writeLog(this, "start socket factory");
19+
Log.writeLog(this, "start socket factory - no race condition assumed");
2020
new Thread(socketFactory).start();
2121
}
2222

src/main/java/net/sharksystem/asap/fs/ExtraDataFS.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ public ExtraDataFS(CharSequence rootFolderName) throws SharkException, IOExcepti
2020

2121
}
2222

23-
public ExtraDataFS(CharSequence rootFolderName, CharSequence fileExtension) throws IOException, SharkException {
24-
this.rootFolderName = rootFolderName;
25-
this.fileExtension = fileExtension;
23+
public ExtraDataFS(CharSequence rootFolder, CharSequence fileName) throws IOException, SharkException {
24+
this.rootFolderName = rootFolder;
25+
this.fileExtension = fileName;
26+
File rootFolderFile = new File(rootFolder.toString());
27+
if(!rootFolderFile.exists()) {
28+
// create
29+
rootFolderFile.mkdirs();
30+
}
2631
this.restoreExtraData();
2732
}
2833

@@ -76,7 +81,8 @@ public void saveExtraData() throws IOException {
7681
// write key
7782
ASAPSerialization.writeCharSequenceParameter(key, os);
7883
// value
79-
ASAPSerialization.writeByteArray(this.extraData.get(key), os);
84+
byte[] v = this.extraData.get(key);
85+
ASAPSerialization.writeByteArray(v, os);
8086
}
8187

8288
os.close();

src/main/java/net/sharksystem/utils/streams/StreamPair.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,7 @@ public interface StreamPair {
1616

1717
CharSequence getSessionID();
1818

19-
CharSequence getEndpointAddress();
19+
CharSequence getEndpointID();
20+
21+
void setEndpointID(CharSequence peerID);
2022
}

0 commit comments

Comments
 (0)