-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmb_shell.py
More file actions
executable file
·811 lines (761 loc) · 36 KB
/
mb_shell.py
File metadata and controls
executable file
·811 lines (761 loc) · 36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
#!/usr/bin/env python
import cmd
import pyodbc
import json
import threading
import binascii
from bluepy.btle import Scanner, DefaultDelegate, BTLEException
import re
import sys
import copy
import struct
import datetime
import Queue
import ConfigParser
import argparse
import os
import string
import random
import time
base_route = os.path.dirname(os.path.realpath(__file__))
sys.path.append(base_route + '/lib')
from mibandalarm import MiBandAlarm
from miband2 import MiBand2
from miband3 import MiBand3
import mibanddb as mbdb
# Name of the environment sub-directory under configuration/ holding server.conf.
ENV_CONFIG = "development"
# Preset section of mb_presets.conf used when `configure` gets no preset name.
CONFIG_MODE = "MB2"
config_route = base_route + "/configuration"
env_route = config_route + "/" + ENV_CONFIG
# Work queue of device addresses whose activity data should be fetched.
q = Queue.Queue()
# Number of worker threads consuming `q`.
max_connections = 2
# For automated download establish a period in which we don't download data
# activity_fetch_cooldown = 6 * 60
# Address -> connected MiBand object, shared by every thread in this module.
connected_devices = {}
mibands = {}
try:
    env = ConfigParser.ConfigParser()
    # Close the handle as soon as the parser has consumed the file.
    with open(env_route + '/server.conf') as env_file:
        env.readfp(env_file)
except Exception as e:
    print(e)
    print("unrecognised config mode [%s]" % ENV_CONFIG)
    sys.exit(-1)
config = ConfigParser.ConfigParser()
with open(config_route + '/mb_presets.conf') as presets_file:
    config.readfp(presets_file)
cnxn = {
    "server": env.get('DATABASE', "server"),
    "database": env.get('DATABASE', "database"),
    "username": env.get('DATABASE', "username"),
    "password": env.get('DATABASE', "password"),
}
cnxn_string = ('DRIVER={ODBC Driver 17 for SQL Server};Server=' + cnxn["server"] +
               ';Database=' + cnxn["database"] + ';uid=' + cnxn["username"] +
               ';pwd=' + cnxn["password"])
class MiBandScanDelegate(DefaultDelegate):
    """Scan delegate that records advertisements from Mi Band 2 / Mi Band 3.

    Matching devices are stored in ``tmp_devices`` keyed by address; every
    (re)discovery overwrites the entry and resets its ``strikes`` to 0.
    """

    def __init__(self, thresh):
        """Remember ``thresh``, the minimum RSSI a device needs to be recorded."""
        DefaultDelegate.__init__(self)
        self.tmp_devices = {}
        self.thresh = thresh

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Record ``dev`` when it advertises the expected service and name."""
        model_by_name = {'MI Band 2': "mb2", 'Mi Band 3': "mb3"}
        try:
            name = dev.getValue(9)
            serv = dev.getValueText(2)
            if serv != '0000fee0-0000-1000-8000-00805f9b34fb':
                return
            if not (dev.addr and dev.rssi >= self.thresh):
                return
            model = model_by_name.get(name)
            if model is not None:
                self.tmp_devices[dev.addr] = {"device": dev, "name": name, "model": model, "strikes": 0}
        except Exception as e:
            print(e)
            print("ERROR")
def read_json(filename, default="{}"):
    """Return the parsed JSON content of ``filename``.

    If the file cannot be opened for reading it is first created with
    ``default`` (a JSON document given as text) as its content, so the
    function then returns the parsed default.

    The handle is closed even when the content is not valid JSON; the
    decoding error raised by ``json.load`` is propagated to the caller.
    """
    try:
        f = open(filename)
    except IOError:
        with open(filename, 'w') as created:
            created.write(default)
        f = open(filename)
    with f:
        return json.load(f)
def save_local(cmd):
    """Write the shell's local-storage state to the JSON files in localdata/.

    Does nothing unless the script runs in ``json`` storage mode.  The mode
    used to be compared against ``"local"``, a value the argument parser never
    produces, so the state was never persisted.

    :param cmd: the shell instance holding ``registered_devices``,
        ``devices_last_sync``, ``devices_alarms`` and ``devices_keys``.
    """
    if args.mode != "json":
        return
    targets = (
        ('registered_devices.json', cmd.registered_devices),
        ('devices_last_sync.json', cmd.devices_last_sync),
        ('devices_alarms.json', cmd.devices_alarms),
        ('devices_keys.json', cmd.devices_keys),
    )
    for filename, payload in targets:
        with open(base_route + '/localdata/' + filename, 'w') as outfile:
            json.dump(payload, outfile)
# Scanning process that checks for new devices and gives them "strikes".
# Note that a far away device won't disappear from the scanner devices list
# but will keep its data static, so after every scan step each known,
# non-connected device whose RSSI did not increase (or is below the threshold)
# gets one strike.  A device reaching the strike limit is dropped from the list.
def scan_miband2(scanner, strikes, thresh):
    """Run the scan loop until the current thread's ``do_scan`` is set falsy.

    :param scanner: scanner whose delegate exposes the ``tmp_devices`` dict.
    :param strikes: number of strikes at which a device is removed.
    :param thresh: RSSI below which a device always receives a strike.
    """
    print("Scanning!")
    scanner.clear()
    scanner.start()
    this_thread = threading.currentThread()
    while getattr(this_thread, "do_scan", True):
        previous = copy.deepcopy(scanner.delegate.tmp_devices)
        scanner.process(1)
        current = scanner.delegate.tmp_devices
        for addr in previous.keys():
            if addr not in current.keys() or addr in connected_devices.keys():
                continue
            if ((previous[addr]["device"].rssi >= current[addr]["device"].rssi)
                    or current[addr]["device"].rssi < thresh):
                current[addr]["strikes"] += 1
            if current[addr]["strikes"] >= strikes:
                del current[addr]
    print("Stopped scanning...")
    scanner.stop()
# Ping thread to check if devices are still alive, this doesn't work well (causes interferences)
def ping_connected(sleeptime):
    """Periodically read every connected device's battery characteristic.

    A device whose read fails is force-disconnected and removed from
    ``connected_devices``.  The loop ends once the current thread's
    ``do_ping`` attribute is set to a falsy value.

    :param sleeptime: seconds to sleep between two rounds of pings.
    """
    print("Pinging connected devices...")
    this_thread = threading.currentThread()
    while getattr(this_thread, "do_ping", True):
        # Iterate over a snapshot of the addresses: entries may be deleted below.
        for addr in list(connected_devices):
            try:
                connected_devices[addr].char_battery.read()
            except Exception as e:
                print(e)
                connected_devices[addr].force_disconnect()
                del connected_devices[addr]
        time.sleep(sleeptime)
    print("Stopped pinging...")
def random_key(length=16):
    """Return a random string of ``length`` ASCII letters and digits.

    The result is used as a device key, so characters are drawn from the
    operating system's entropy source (``random.SystemRandom``) rather than
    from the default, predictable generator.
    """
    alphabet = string.ascii_uppercase + string.digits + string.ascii_lowercase
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(length))
def get_device_name(device):
    """Return the text of advertising-data field 9 of ``device``.

    Field 9 is the same field the scan delegate reads as the device name.
    """
    name_field = 9
    return device.getValueText(name_field)
# Advertised device name -> model tag.  The pairs mirror the ones the scan
# delegate uses when it records a discovered device.
DEVICE_MODELS = {'MI Band 2': "mb2", 'Mi Band 3': "mb3"}


def get_device_model(device):
    """Return the model tag (``"mb2"`` or ``"mb3"``) for a scanned ``device``.

    :raises KeyError: if the advertised name is not a known MiBand name.
    """
    return DEVICE_MODELS[get_device_name(device)]
def worker(cmd):
    """Consume device addresses from ``q`` forever and fetch their activity.

    ``q.task_done()`` is always called, even when the fetch raises, because
    the ``activity`` command blocks on ``q.join()``; a skipped ``task_done``
    would hang the shell.  Unexpected errors are printed instead of being
    allowed to kill the worker thread.

    :param cmd: the shell instance, forwarded to ``do_fetch_activity``.
    """
    while True:
        item = q.get()
        try:
            do_fetch_activity(item, cmd)
        except Exception as e:
            print("Unexpected error while fetching MiBand [%s] activity" % item)
            print(e)
        finally:
            q.task_done()
def do_fetch_activity(item, cmd):
    """Download the activity data of the MiBand with address ``item``.

    If the device is not in ``connected_devices`` a connection is opened
    first (generating a key for it when none is stored).  The data is then
    fetched starting at the last known sync date and written to the database
    (db mode) or to a file under ``localdata/activity_log/`` (json mode).

    :param item: device address, as used for the keys of ``connected_devices``.
    :param cmd: the shell instance providing ``devices_keys``, ``models`` and
        ``devices_last_sync``.
    """
    print("Fetching MiBand [%s] activity!" % item)
    if item not in connected_devices:
        try:
            if item not in cmd.devices_keys:
                cmd.devices_keys[item] = random_key()
            # This is a plain function: the state lives on `cmd` and the
            # address is `item` (the code used undefined `self` and `addr`).
            model = cmd.models[item].upper()
            if model == "MB2":
                mb = MiBand2(item, cmd.devices_keys[item], initialize=False)
            elif model == "MB3":
                mb = MiBand3(item, cmd.devices_keys[item], initialize=False)
            else:
                print("Unsupported MiBand model [%s]" % model)
                return
            connected_devices[item] = mb
        except BTLEException as e:
            print("There was a problem connecting this MiBand, try again later")
            print(e)
            # Without a connection there is nothing to fetch from.
            return
    try:
        if args.mode == "db":
            last_sync = mbdb.get_device_last_sync(cnxn_string, item)
        else:
            last_sync = cmd.devices_last_sync.get(item)
        band = connected_devices[item]
        if last_sync is not None:
            band.setLastSyncDate(last_sync)
        # An alert is sent right before and right after the transfer.
        band.send_alert(b'\x03')
        band.fetch_activity_data()
        band.send_alert(b'\x03')
        if len(band.getActivityDataBuffer()) > 0:
            print("Saving Data to DB...")
            if args.mode == "db":
                mbdb.write_activity_data(cnxn_string, band)
            else:
                band.store_activity_data_file(base_route + '/localdata/activity_log/')
        print("Finished fetching MiBand [%s] activity!" % item)
    except BTLEException as e:
        print("There was a problem retrieving this MiBand's activity, try again later")
        print(e)
class MiBandCMD(cmd.Cmd):
    """Command processor for interacting with many MiBands at a time.

    Devices are addressed by their index in the listing printed by the
    ``devices`` command, so that command has to be run before any other
    device command.  Persistence depends on the module-level ``args.mode``:
    ``"db"`` goes through ``mbdb``; ``"json"`` uses the ``registered_devices``,
    ``devices_last_sync``, ``devices_alarms`` and ``devices_keys`` attributes.
    """

    def __init__(self):
        """Start the scan thread and the activity-fetch worker threads."""
        cmd.Cmd.__init__(self)
        threshold = -70
        strikes = 5
        self.sc = Scanner()
        self.scd = MiBandScanDelegate(threshold)
        self.sc.withDelegate(self.scd)
        # Filled by do_devices.  Dicts (not a list) so that index-based
        # commands issued before the first `devices` print an error instead
        # of failing on `.keys()`.
        self.mibands = {}
        self.models = {}
        # Local-storage state.  __main__ replaces these in json mode; the
        # defaults keep the attributes defined in db mode as well.
        self.registered_devices = []
        self.devices_last_sync = {}
        self.devices_alarms = {}
        self.devices_keys = {}
        self.scan_thread = threading.Thread(target=scan_miband2, args=(self.sc, strikes, threshold))
        self.scan_thread.start()
        # The ping thread (ping_connected) is intentionally not started.
        for _ in range(max_connections):
            fetcher = threading.Thread(target=worker, args=(self,))
            fetcher.daemon = True
            fetcher.start()
        self.prompt = 'MBS => '

    # ------------------------------------------------------------------
    # Internal helpers (names do not start with "do_", so they are not commands)
    # ------------------------------------------------------------------
    def _known_index(self, dev_id):
        """Return True if ``dev_id`` indexes the last listing, else print an error."""
        if 0 <= dev_id < len(self.mibands):
            return True
        print("*** device not in the device list")
        return False

    def _single_index(self, params, not_a_number="*** argument should be number"):
        """Parse ``params`` as one valid device index; return None after printing on failure."""
        try:
            dev_id = int(params)
        except ValueError:
            print(not_a_number)
            return None
        return dev_id if self._known_index(dev_id) else None

    def _addr_at(self, dev_id):
        """Return the address listed at position ``dev_id`` by the last ``devices``."""
        return list(self.mibands.keys())[dev_id]

    def _is_registered(self, addr):
        """Return whether ``addr`` is registered in the active storage mode."""
        if args.mode == "db":
            return mbdb.is_device_registered(cnxn_string, addr)
        return args.mode == "json" and addr in self.registered_devices

    def _connected_band(self, dev_id, unregistered_msg, disconnected_msg):
        """Return the connected band at ``dev_id``, or None after printing why not."""
        addr = self._addr_at(dev_id)
        if not self._is_registered(addr):
            print(unregistered_msg)
            return None
        if addr not in connected_devices:
            print(disconnected_msg)
            return None
        return connected_devices[addr]

    def _new_band(self, addr):
        """Create the MiBand object matching the scanned model of ``addr``.

        A key is generated and stored for the address if none exists yet.
        Raises KeyError when the address has no known model and ValueError
        when the model is neither MB2 nor MB3.
        """
        model = self.models[addr].upper()
        if addr not in self.devices_keys:
            self.devices_keys[addr] = random_key()
        if model == "MB2":
            return MiBand2(addr, self.devices_keys[addr], initialize=False)
        if model == "MB3":
            return MiBand3(addr, self.devices_keys[addr], initialize=False)
        raise ValueError("Unsupported MiBand model [%s]" % model)

    def _apply_preset(self, mb, preset):
        """Send every option present in config section ``preset`` to ``mb``."""
        def opt_int(option, fallback):
            if config.has_option(preset, option):
                return config.getint(preset, option)
            return fallback

        # (option, config reader, name of the band method receiving the value)
        simple_options = (
            ("MonitorHRSleep", config.getint, "monitorHeartRateSleep"),
            ("MonitorHRInterval", config.getint, "setMonitorHeartRateInterval"),
            ("DisplayTimeFormat", config.get, "setDisplayTimeFormat"),
            ("DisplayTimeHours", config.getint, "setDisplayTimeHours"),
            ("DistanceUnit", config.get, "setDistanceUnit"),
            ("LiftWristActivate", config.getint, "setLiftWristToActivate"),
            ("RotateWristSwitch", config.getint, "setRotateWristToSwitchInfo"),
        )
        for option, reader, setter_name in simple_options:
            if config.has_option(preset, option):
                # Look the method up lazily: only touched when the option is set.
                getattr(mb, setter_name)(reader(preset, option))
        if config.has_option(preset, "DisplayItems"):
            shown = [x.strip() for x in config.get(preset, 'DisplayItems').split(',')]
            mb.setDisplayItems(steps='steps' in shown, distance='distance' in shown,
                               calories='calories' in shown, heartrate='heartrate' in shown,
                               battery='battery' in shown)
        if config.has_option(preset, "DoNotDisturb"):
            enable_lift = opt_int("DoNotDisturbLift", 1)
            mb.setDoNotDisturb(config.get(preset, "DoNotDisturb"), enableLift=enable_lift)
        if config.has_option(preset, "InactivityWarnings"):
            start = opt_int("InactivityWarningsStart", 8)
            end = opt_int("InactivityWarningsEnd", 19)
            threshold = opt_int("InactivityWarningsThresholdHours", 1)
            mb.setInactivityWarnings(config.getint(preset, "InactivityWarnings"),
                                     threshold=threshold * 60, start=(start, 0), end=(end, 0))
        if config.has_option(preset, "DisplayCaller"):
            mb.setDisplayCaller(config.getint(preset, "DisplayCaller"))

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def exit_safely(self):
        """Stop the scan thread, disconnect every device and return True."""
        self.scan_thread.do_scan = False
        self.scan_thread.join()
        print("Disconnecting from %s devices" % len(connected_devices.values()))
        for con in connected_devices.values():
            con.disconnect()
        return True

    # ------------------------------------------------------------------
    # Commands
    # ------------------------------------------------------------------
    def do_devices(self, line):
        """devices -- list the scanned MiBands; the index shown is used by all other commands."""
        snapshot = copy.deepcopy(self.scd.tmp_devices)
        self.mibands = {k: v["device"] for k, v in snapshot.items()}
        self.models = {k: v["model"] for k, v in snapshot.items()}
        strikes = {k: v["strikes"] for k, v in snapshot.items()}
        for idx, addr in enumerate(self.mibands.keys()):
            name = "Someone"
            uid = 0
            udata = None
            if args.mode == "db":
                devid = mbdb.get_device_id(cnxn_string, addr.upper())
                devuser = mbdb.get_device_user(cnxn_string, devid)
                if devuser != -1:
                    udata = mbdb.get_user_data(cnxn_string, devuser)
            # TODO: User Data on local storage???
            if udata:
                name = udata["alias"]
                uid = udata["id"]
            entry = "[%s]%10s's %s <U:%05d> (%s) %sdB S:%s " % (
                idx, name, self.models[addr].upper(), uid, addr,
                self.mibands[addr].rssi, "X" * strikes[addr])
            if self._is_registered(addr):
                entry += "[R]"
            if addr in connected_devices:
                band = connected_devices[addr]
                if args.mode == "db":
                    mbdb.update_battery(cnxn_string, band.addr, band.battery_info['level'])
                # Battery comes from the connected band object, not the address string.
                entry += "[C] [B:{0:03d}%]".format(band.battery_info["level"])
            print(entry)

    def do_reboot(self, params):
        """reboot <dev_id> -- reboot a registered, connected MiBand."""
        dev_id = self._single_index(params, "*** arguments should be numbers")
        if dev_id is None:
            return
        mb = self._connected_band(dev_id, "That MiBand is not registered",
                                  "That MiBand is not connected!")
        if mb is None:
            return
        try:
            mb.reboot()
        except BTLEException:
            print("There was a problem rebooting this MiBand, try again later")

    def do_alert(self, params):
        """alert <dev_id> <alert_int> -- send a one-byte alert to a connected MiBand."""
        tokens = params.split()
        if len(tokens) != 2:
            print("*** invalid number of arguments")
            return
        try:
            dev_id, alert_int = [int(t) for t in tokens]
        except ValueError:
            print("*** arguments should be numbers")
            return
        if not self._known_index(dev_id):
            return
        mb = self._connected_band(dev_id, "That MiBand is not registered",
                                  "That MiBand is not connected!")
        if mb is None:
            return
        try:
            data = struct.pack('B', alert_int)
            mb.send_alert(data)
            print("Sending Notification: " + binascii.hexlify(data))
        except struct.error:
            # 'B' packs a single unsigned byte.
            print("*** alert value should be between 0 and 255")
        except BTLEException:
            print("There was a problem alerting this MiBand, try again later")

    def do_configure(self, params):
        """configure <dev_id> [preset] -- apply a preset section of mb_presets.conf."""
        tokens = params.split()
        try:
            dev_id = int(tokens[0])
        except ValueError:
            print("*** argument 1 should be number")
            return
        except IndexError:
            print("*** configure takes at least one parameter")
            return
        preset = tokens[1] if len(tokens) > 1 else ""
        if not self._known_index(dev_id):
            return
        if preset == "":
            print("Using default configuration preset [%s]" % CONFIG_MODE)
            preset = CONFIG_MODE
        if not config.has_section(preset):
            print("*** invalid configuration preset '%s'" % preset)
            return
        self.configure_miband(dev_id, preset)

    def configure_miband(self, dev_id, preset):
        """Apply config section ``preset`` to the connected MiBand at ``dev_id``."""
        if not self._known_index(dev_id):
            return
        mb = self._connected_band(
            dev_id,
            "That MiBand is not registered, please register it before configuring.",
            "That MiBand is not connected, please connect it before configuring.")
        if mb is None:
            return
        try:
            print("Configuring MiBand to [%s] presets" % preset)
            self._apply_preset(mb, preset)
        except BTLEException as e:
            print("There was a problem configuring this MiBand, try again later")
            print(e)

    def do_setuser(self, params):
        """setuser -- store user data on a connected MiBand.

        db mode:   setuser <dev_id> <user_id> <left|right>
        json mode: setuser <dev_id> <alias> <gender> <birth_year> <birth_month> <weight> <height>
        """
        udata = None  # only filled in db mode; must exist for the check below
        try:
            l = params.split()
            dev_id = int(l[0])
            if args.mode == "db":
                user_id = int(l[1])
            else:
                # TODO: Not persisted
                user_alias = l[1]
                user_gender = {"M": 0, "F": 1}.get(l[2], 2)
                user_bd_year = int(l[3])
                user_bd_month = int(l[4])
                user_bd_day = 0
                user_weight = float(l[5])
                user_height = int(l[6])
            # NOTE(review): the wear location is read from the third token in
            # both modes, but in json mode that token is also the gender, so
            # "M"/"F" can never pass this check there -- confirm the intended
            # json-mode syntax.
            if l[2] == "left":
                position = (0, "left")
            elif l[2] == "right":
                position = (1, "right")
            else:
                print("*** only left and right supported")
                return
        except ValueError:
            print("*** argument should be number")
            return
        except IndexError:
            print("*** setuser takes at least one parameter")
            return
        if not self._known_index(dev_id):
            return
        if args.mode == "db":
            udata = mbdb.get_user_data(cnxn_string, user_id)
        if not (udata or args.mode == "json"):
            print("*** user with id %s doesn't exist" % user_id)
            return
        mb = self._connected_band(dev_id,
                                  "MiBand should be registered before setting user data",
                                  "MiBand should be connected before setting user data")
        if mb is None:
            return
        try:
            if args.mode == "db":
                if mbdb.set_device_user(cnxn_string, mb.addr, user_id, position[0]):
                    mb.setUserInfo(udata["alias"], udata["sex"], udata["height"],
                                   udata["weight"], udata["birth"])
            else:
                mb.setUserInfo(user_alias, user_gender, user_height, user_weight,
                               (user_bd_year, user_bd_month, user_bd_day))
            mb.setWearLocation(position[1])
        except BTLEException as e:
            print("There was a problem setting user data on this MiBand, try again later")
            print(e)

    def do_reluser(self, params):
        """reluser <dev_id> <user_id> -- release a MiBand from its user (db mode only)."""
        if args.mode != "db":
            # TODO: If storage, release properly
            print("This operation is only available for DB mode")
            return
        try:
            l = params.split()
            dev_id = int(l[0])
            user_id = int(l[1])
        except ValueError:
            print("*** argument should be number")
            return
        except IndexError:
            print("*** reluser takes at least one parameter")
            return
        if not self._known_index(dev_id):
            return
        if not mbdb.get_user_data(cnxn_string, user_id):
            print("*** user with id %s doesn't exist" % user_id)
            return
        mb = self._connected_band(dev_id,
                                  "MiBand should be registered before releasing user data",
                                  "MiBand should be connected before releasing user data")
        if mb is None:
            return
        if mbdb.release_device_user(cnxn_string, mb.addr, user_id):
            print("MiBand Released from user")
        else:
            print("There was a problem releasing this MiBand")

    def do_connect(self, params):
        """connect <dev_id> -- connect to a registered MiBand and load its alarms."""
        dev_id = self._single_index(params)
        if dev_id is None:
            return
        if len(connected_devices.keys()) >= 5:
            print("Can't connect to more than 5 devices at the same time, disconnect some")
            return
        addr = self._addr_at(dev_id)
        if not self._is_registered(addr):
            print("You have to register the MiBand before connecting to it")
            return
        if addr in connected_devices:
            print("That MiBand is already connected")
            return
        try:
            mb = self._new_band(addr)
            connected_devices[addr] = mb
            if args.mode == "db":
                alarms = mbdb.get_device_alarms(cnxn_string, mb.addr)
                mbdb.update_battery(cnxn_string, mb.addr, mb.battery_info['level'])
            else:
                alarms = self.devices_alarms.get(mb.addr, [])
            for a in alarms:
                mb.alarms += [MiBandAlarm(a["hour"], a["minute"], enabled=a["enabled"],
                                          repetitionMask=a["repetition"])]
        except BTLEException as e:
            print("There was a problem connecting to this MiBand, try again later")
            print(e)
        except ValueError as e:
            print(e)

    def do_disconnect(self, params):
        """disconnect <dev_id> -- disconnect a connected MiBand."""
        dev_id = self._single_index(params)
        if dev_id is None:
            return
        addr = self._addr_at(dev_id)
        if addr not in connected_devices:
            print("That MiBand isn't connected!")
            return
        try:
            connected_devices[addr].disconnect()
            del connected_devices[addr]
            print("MiBand disconnected!")
        except BTLEException as e:
            print("There was a problem disconnecting this MiBand, try again later")
            print(e)

    def do_register(self, params):
        """register <dev_id> -- register a scanned MiBand (clears its alarms)."""
        dev_id = self._single_index(params)
        if dev_id is None:
            return
        addr = self._addr_at(dev_id)
        if self._is_registered(addr):
            print("That MiBand is already registered")
            return
        try:
            mb = self._new_band(addr)
            mb.cleanAlarms()
            if args.mode == "db":
                db_id = mbdb.get_device_id(cnxn_string, mb.addr)
                mbdb.delete_all_alarms(cnxn_string, db_id)
                mbdb.register_device(cnxn_string, mb.addr)
                mbdb.update_battery(cnxn_string, mb.addr, mb.battery_info['level'])
            else:
                self.registered_devices += [mb.addr]
            # Device stays connected after initialize, but we don't want that
            mb.disconnect()
        except BTLEException as e:
            print("There was a problem registering this MiBand, try again later")
            print(e)
        except KeyError as e:
            print("Device was kicked out")
            print(e)
        except ValueError as e:
            print(e)

    def do_unregister(self, params):
        """unregister <dev_id> -- unregister a MiBand that is not connected."""
        dev_id = self._single_index(params)
        if dev_id is None:
            return
        addr = self._addr_at(dev_id)
        if not self._is_registered(addr):
            print("That MiBand is not registered")
            return
        # connected_devices is keyed by address; its values are band objects,
        # so the address has to be looked up among the keys.
        if addr in connected_devices:
            print("Disconnect the miBand2 first!")
            return
        try:
            if args.mode == "db":
                mbdb.unregister_device(cnxn_string, addr)
                mbdb.delete_all_alarms(cnxn_string, addr)
            else:
                self.registered_devices.remove(addr)
            # The stored key is dropped too; it may be absent if the device
            # was never connected in this session.
            self.devices_keys.pop(addr, None)
            print("MiBand unregistered!")
        except BTLEException:
            print("There was a problem unregistering this MiBand, try again later")

    def do_activity(self, params):
        """activity <dev_id> -- fetch activity data and wait until it is stored."""
        dev_id = self._single_index(params)
        if dev_id is None:
            return
        mb = self._connected_band(dev_id,
                                  "MiBand should be registered before fetching activity data",
                                  "MiBand should be connected before fetching activity data")
        if mb is None:
            return
        q.put(self._addr_at(dev_id))
        # Block the prompt until a worker has processed the request.
        q.join()

    def do_alarms(self, params):
        """alarms <dev_id> [command] -- view or change the alarms of a connected MiBand.

        Commands: list (default) | clear | queue HH:MM | set <alarm_id> HH:MM |
        toggle <alarm_id> | toggleday <alarm_id> <day 1-7> | delete <alarm_id>
        """
        l = params.split()
        try:
            dev_id = int(l[0])
        except ValueError:
            print("*** argument 1 should be number")
            return
        except IndexError:
            print("*** alarms takes at least one parameter")
            return
        command = l[1] if len(l) > 1 else "list"
        if command not in ['list', 'queue', 'set', 'toggle', 'toggleday', 'delete', 'clear']:
            print("*** invalid alarm command, see help")
            return
        if not self._known_index(dev_id):
            return
        mb = self._connected_band(dev_id,
                                  "MiBand should be registered before viewing/changing alarms",
                                  "MiBand should be connected before viewing/changing alarms")
        if mb is None:
            return
        addr = self._addr_at(dev_id)
        if args.mode == "db":
            alarms = mbdb.get_device_alarms(cnxn_string, addr)
        else:
            alarms = self.devices_alarms.get(addr, [])

        if command == 'list':
            if len(alarms) > 0:
                for idx, a in enumerate(mb.alarms):
                    print("[%s]" % idx + str(a))
        elif command == 'clear':
            if len(alarms) > 0:
                mb.cleanAlarms()
                if args.mode == "db":
                    mbdb.delete_all_alarms(cnxn_string, mb.addr)
                else:
                    self.devices_alarms[addr] = []
        elif command == 'queue':
            try:
                hour, minute = [int(x) for x in l[2].split(":")]
                alarm_id = mb.queueAlarm(hour, minute)
                if args.mode == "db":
                    mbdb.set_alarm(cnxn_string, mb.addr, mb.alarms[alarm_id], alarm_id)
                else:
                    self.devices_alarms.setdefault(addr, []).append(
                        {"enabled": True, "repetition": 128, "hour": hour, "minute": minute})
            except (IndexError, ValueError):
                print("*** queue takes an hour parameter in format HH:MM")
        elif command == 'delete':
            try:
                alarm_id = int(l[2])
                mb.deleteAlarm(alarm_id)
                if len(alarms) > 0:
                    if args.mode == "db":
                        mbdb.delete_alarm(cnxn_string, mb.addr, alarm_id)
                    else:
                        del self.devices_alarms[addr][alarm_id]
            except IndexError:
                print("*** delete takes an alarm_id parameter")
            except ValueError:
                print("*** delete's alarm_id should be a number")
        elif command == 'toggle':
            try:
                alarm_id = int(l[2])
                mb.toggleAlarm(alarm_id)
                if args.mode == "db":
                    mbdb.set_alarm(cnxn_string, mb.addr, mb.alarms[alarm_id], alarm_id)
                else:
                    self.devices_alarms[addr][alarm_id]["enabled"] = mb.alarms[alarm_id].enabled
            except IndexError:
                print("*** toggle takes an alarm_id parameter")
            except ValueError:
                print("*** toggle's alarm_id should be a number")
        elif command == 'toggleday':
            try:
                alarm_id = int(l[2])
                day_id = int(l[3])
                if day_id not in range(1, 8):
                    print("*** day_id should be between 1 (Monday) and 7 (Sunday)")
                    return
                mb.toggleAlarmDay(alarm_id, day_id - 1)
                if args.mode == "db":
                    mbdb.set_alarm(cnxn_string, mb.addr, mb.alarms[alarm_id], alarm_id)
                else:
                    self.devices_alarms[addr][alarm_id]["repetition"] = mb.alarms[alarm_id].repetitionMask
            except IndexError:
                print("*** toggleday takes an alarm_id parameter and a day_id parameter (1-7)")
            except ValueError:
                print("*** toggleday's alarm_id and day_id should be both numbers")
        elif command == "set":
            try:
                alarm_id = int(l[2])
                hour, minute = [int(x) for x in l[3].split(":")]
                mb.changeAlarmTime(alarm_id, hour, minute)
                if args.mode == "db":
                    mbdb.set_alarm(cnxn_string, mb.addr, mb.alarms[alarm_id], alarm_id)
                else:
                    self.devices_alarms[addr][alarm_id]["hour"] = mb.alarms[alarm_id].hour
                    self.devices_alarms[addr][alarm_id]["minute"] = mb.alarms[alarm_id].minute
            except IndexError:
                print("*** set takes an alarm_id parameter and an hour parameter in format HH:MM")
            except ValueError:
                print("*** set's alarm_id and hour (HH:MM) should be both numbers")

    def do_save(self, line):
        """save -- write the local data files (json mode only)."""
        if args.mode == "json":
            print("Saving local data")
            save_local(self)
        else:
            print("This command is only available to local mode")

    def do_exit(self, line):
        """exit -- save local data, disconnect every device and leave the shell."""
        print("Saving local data before exiting")
        save_local(self)
        return self.exit_safely()

    def do_EOF(self, line):
        """EOF (Ctrl-D) -- same as exit."""
        print("Saving local data before exiting")
        save_local(self)
        return self.exit_safely()
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='MB2 Command Shell')
    parser.add_argument('-m', '--mode', default="json", choices=("json", "db"),
                        help='Storage mode')
    args = parser.parse_args()
    if args.mode == "db":
        # Probe the database BEFORE creating the shell: the shell constructor
        # starts a non-daemon scan thread, which would keep the process alive
        # after sys.exit() if the probe failed afterwards.
        try:
            pyodbc.connect(cnxn_string, timeout=3).close()
        except pyodbc.OperationalError as e:
            print(str(e.args[1]))
            sys.exit(-1)
    mb2cmd = MiBandCMD()
    if args.mode == "json":
        # Load (or create with empty defaults) the local storage files.
        mb2cmd.registered_devices = read_json(base_route + '/localdata/registered_devices.json', default="[]")
        mb2cmd.devices_last_sync = read_json(base_route + '/localdata/devices_last_sync.json')
        mb2cmd.devices_alarms = read_json(base_route + '/localdata/devices_alarms.json')
        mb2cmd.devices_keys = read_json(base_route + '/localdata/devices_keys.json')
    mb2cmd.cmdloop()