-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathUpdateStayDuration.py
More file actions
151 lines (120 loc) · 5.23 KB
/
UpdateStayDuration.py
File metadata and controls
151 lines (120 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
'''
Update the stay duration of every record of one user.
If a stay's duration is smaller than the duration constraint threshold, its
records are detached from the cluster center (the stay fields are reset to -1).
input:
    user stay information
    duration constraint threshold
output:
    updated user stay information
'''
import sys, json,os, psutil, csv, time, func_timeout
import numpy as np
from distance import distance
from class_cluster import cluster
from collections import defaultdict
from multiprocessing import Pool
from multiprocessing import current_process, Lock, cpu_count
def init(l):
    """Pool initializer: expose the shared lock as the module global ``lock``.

    Passed as ``initializer`` to ``multiprocessing.Pool`` so that it runs in
    the worker processes; ``func`` later acquires ``lock`` around its writes
    to the output file.

    :param l: the lock object handed over through ``initargs``
    """
    global lock
    lock = l
def update_duration(user, dur_constr):
    """Recompute the stay duration of every record of one user, in place.

    Records are addressed by column index; going by the header this script
    writes, column 0 is ``unix_start_t`` and columns 6-9 are ``stay_lat``,
    ``stay_long``, ``stay_unc`` and ``stay_dur`` (assumes the input rows use
    the same layout -- TODO confirm).

    Within each day, consecutive records with identical columns 6 and 7 form
    one run.  Every record of a run receives the run's duration: column 0 of
    the run's last record minus column 0 of its first record, stored as a
    string.  Afterwards, records whose column 6 equals -1 (not clustered) or
    whose duration is below ``dur_constr`` get columns 6-9 reset to -1.

    :param user: mapping of day key -> list of records (mutable sequences
        with at least 10 fields); modified in place
    :param dur_constr: minimum duration for a run to remain a stay, in the
        same unit as column 0
    :return: the same ``user`` mapping that was passed in
    """
    # Pass 1: give every run of identical stay coordinates its duration.
    for traces in user.values():
        for trace in traces:
            trace[9] = -1  # clear any stale duration before recomputing

        total = len(traces)
        start = 0
        while start < total:
            stay_lat, stay_long = traces[start][6], traces[start][7]
            # Extend the run while the stay coordinates stay the same; a run
            # that reaches the end of the day is closed by the bounds check.
            end = start + 1
            while (end < total
                   and traces[end][6] == stay_lat
                   and traces[end][7] == stay_long):
                end += 1
            # Column 9 was just reset, so the run duration is simply the
            # difference between the last and the first start time.
            dur = str(int(traces[end - 1][0]) - int(traces[start][0]))
            for trace in traces[start:end]:
                trace[9] = dur
            start = end

    # Pass 2: default output format gives -1 to every non-stay record.
    for traces in user.values():
        for trace in traces:
            # Records with stay coordinates -1,-1 (not clustered) must not
            # carry a duration.
            if float(trace[6]) == -1:
                trace[9] = -1
            # Too-short runs are not stays: do not report a cluster center.
            if float(trace[9]) < dur_constr:
                trace[6], trace[7], trace[8], trace[9] = -1, -1, -1, -1
    return user
def func(args):
    """Worker task: update one user's stay durations and append them to a CSV.

    Relies on two module-level names: ``update_duration`` and the global
    ``lock`` installed by ``init`` in each pool worker.  The lock is held for
    the whole write so rows of different users are not interleaved.

    Processing is best-effort: if anything fails for this user, the error is
    reported on stderr and the user is skipped (rows already written before
    the failure stay in the file).

    :param args: tuple ``(name, user, dur_constraint, outputFile)`` --
        the user ID written into column 1 of every row, the mapping
        day -> list of records, the duration threshold passed to
        ``update_duration``, and the path of the CSV file to append to
    :return: None
    """
    name, user, dur_constraint, outputFile = args
    try:
        user = update_duration(user, dur_constraint)
        with lock:
            # newline='' is what the csv module expects for files it writes;
            # the context manager closes the file even if a row fails.
            with open(outputFile, 'a', newline='') as f:
                writeCSV = csv.writer(f, delimiter=',')
                for day in sorted(user.keys()):
                    for trace in user[day]:
                        trace[1] = name
                        writeCSV.writerow(trace)
    except Exception as err:
        # Keep the batch running, but do not lose the user silently.
        sys.stderr.write('skipping user %s: %r\n' % (name, err))
        return
if __name__ == '__main__':
    # Command line: <input csv> <output csv> <duration constraint (int)>
    if len(sys.argv) < 4:
        sys.exit('usage: %s <input.csv> <output.csv> <duration_constraint>' % sys.argv[0])
    inputFile = sys.argv[1]
    outputFile_real = sys.argv[2]
    duration_constraint = int(sys.argv[3])

    # Workers append to a temporary sibling file that replaces the real output
    # only after everything succeeded.  splitext touches only the final
    # extension, so the temporary path always differs from the real one
    # (e.g. out.csv -> out_tmp.csv, out -> out_tmp).
    root, ext = os.path.splitext(outputFile_real)
    outputFile = root + '_tmp' + ext

    with open(outputFile, 'w') as f:
        f.write('unix_start_t,user_ID,mark_1,orig_lat,orig_long,orig_unc,stay_lat,stay_long,stay_unc,stay_dur,stay_ind,human_start_t\n')

    l = Lock()  # inter-process lock serializing writes to the output file
    pool = Pool(cpu_count(), initializer=init, initargs=(l,))

    try:
        # fixed param
        user_num_in_mem = 1000  # number of users loaded into memory per bulk

        # First pass over the input: collect the distinct user IDs.
        usernames = set()
        with open(inputFile, 'r', newline='') as csvfile:
            readCSV = csv.reader(csvfile, delimiter=',')
            next(readCSV, None)  # skip the header row, as the second pass does
            for row in readCSV:
                if len(row) != 12:
                    continue
                usernames.add(row[1])  # the second column is the user ID
        usernamelist = list(usernames)

        print('total number of users to be processed: ', len(usernamelist))

        def divide_chunks(usernamelist, n):
            """Yield successive slices of at most ``n`` user names."""
            for i in range(0, len(usernamelist), n):
                yield usernamelist[i:i + n]

        usernamechunks = list(divide_chunks(usernamelist, user_num_in_mem))

        print('number of chunks to be processed', len(usernamechunks))

        ## read and process traces for one bulk
        while usernamechunks:
            namechunk = usernamechunks.pop()
            print("Start processing bulk: ", len(usernamechunks) + 1, ' at time: ', time.strftime("%m%d-%H:%M"), ' memory: ', psutil.virtual_memory().percent)

            # user ID -> day key -> list of rows
            UserList = {name: defaultdict(list) for name in namechunk}

            with open(inputFile, 'r', newline='') as readfile:
                readCSV = csv.reader(readfile, delimiter=',')
                next(readCSV, None)  # skip the header row
                for row in readCSV:
                    # Same shape filter as the first pass: blank or malformed
                    # lines would otherwise fail on row[1] here, or make the
                    # worker drop the whole user.
                    if len(row) != 12:
                        continue
                    # Earlier data-quality filters, left disabled:
                    #if '.' not in row[3] or '.' not in row[4]: continue # debug a data issue: not '.' in lat or long
                    #if(len(row[6].split('.'))>2 or len(row[7].split('.'))>2): continue
                    #if (('-' in row[6]) and (not row[6][0]=='-')) or (('-' in row[7]) and (not row[7][0]=='-')): continue
                    name = row[1]
                    if name in UserList:
                        # The first six characters of the last column are the day key.
                        UserList[name][row[-1][:6]].append(row)

            print("End reading")

            # One pool task per user; wait for the whole bulk before loading
            # the next one.  (For serial debugging, call func(task) directly.)
            tasks = [
                pool.apply_async(func, ((name, days, duration_constraint, outputFile),))
                for name, days in UserList.items()
            ]
            for task in tasks:
                task.get()
    finally:
        # Always shut the workers down, even if a bulk failed.
        pool.close()
        pool.join()

    # Publish the result; os.replace overwrites an existing output file.
    os.replace(outputFile, outputFile_real)