-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathReadAndPartition.py
More file actions
149 lines (114 loc) · 5.07 KB
/
ReadAndPartition.py
File metadata and controls
149 lines (114 loc) · 5.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
'''
Read the user information (device location records): Timestamp, ID, ID Type, Latitude, Longitude, Accuracy (uncertainty radius), Human Time,
then partition the records into two groups by applying a partition threshold to the uncertainty radius of each record.
input:
raw user information
output:
partitioned user information: GPS stay information / cellular stay information
'''
from __future__ import print_function
import csv, time, os, copy, psutil, sys
from collections import defaultdict
import shutil, json
from multiprocessing import Pool
from multiprocessing import current_process, Lock, cpu_count
def init(l):
    """Store the shared lock in a module-level global.

    Passed as the ``initializer`` of the multiprocessing ``Pool`` so that
    every worker process ends up with the same lock object available under
    the global name ``lock``.

    Args:
        l: the lock object handed over through ``initargs``.
    """
    global lock
    lock = l
def partition(user, partition_Threshold):
    """Split one user's traces into a low-uncertainty and a high-uncertainty set.

    Args:
        user: mapping of day key -> list of trace records; element 5 of each
            record is the uncertainty radius (anything ``int()`` accepts).
        partition_Threshold: records whose radius is less than or equal to
            this value go to the GPS set, all others to the cellular set.

    Returns:
        A ``(user_gps, user_cell)`` tuple of plain dicts. Both contain every
        day key of ``user`` (possibly mapped to an empty list) and hold the
        original record objects, not copies.
    """
    user_gps = {}
    user_cell = {}
    for day, traces in user.items():
        gps_bucket = []
        cell_bucket = []
        for record in traces:
            # Pick the destination list first, then append once.
            if int(record[5]) <= partition_Threshold:
                bucket = gps_bucket
            else:
                bucket = cell_bucket
            bucket.append(record)
        user_gps[day] = gps_bucket
        user_cell[day] = cell_bucket
    return user_gps, user_cell
def func(args):
    """Partition one user's traces and append them to the two output CSVs.

    Runs inside a pool worker. The module-level ``lock`` (installed by
    ``init``) is held for the whole write so that rows coming from different
    workers are not interleaved in the shared output files.

    Args:
        args: a 5-tuple ``(name, user, partitionThreshold, outputFileGps,
            outputFileCell)`` where
            name is the user ID written into column 1 of every row,
            user maps day key -> list of trace records,
            partitionThreshold is the uncertainty-radius cut-off handed to
            ``partition``,
            outputFileGps / outputFileCell are the paths of the CSV files
            that are appended to.

    Returns:
        None. Nothing is written when the user has no day buckets at all
        (``partition`` returns one key per day of ``user``, so both dicts
        are empty exactly in that case).
    """
    name, user, partitionThreshold, outputFileGps, outputFileCell = args
    userGps, userCell = partition(user, partitionThreshold)
    if not userGps or not userCell:
        return

    # IO: the context managers guarantee both files are closed (and the lock
    # released) even if writing a row raises.
    with lock:
        with open(outputFileGps, 'a') as gps_file, \
                open(outputFileCell, 'a') as cell_file:
            gps_writer = csv.writer(gps_file, delimiter=',')
            for day in sorted(userGps):
                for trace in userGps[day]:
                    trace[1] = name  # restore the user ID blanked by the reader
                    gps_writer.writerow(trace)

            cell_writer = csv.writer(cell_file, delimiter=',')
            for day in sorted(userCell):
                for trace in userCell[day]:
                    trace[1] = name
                    cell_writer.writerow(trace)
if __name__ == '__main__':
    # Command line: <input csv> <gps output csv> <cellular output csv> <threshold>
    inputFile = sys.argv[1]
    outputFileGps = sys.argv[2]
    outputFileCell = sys.argv[3]
    partitionThreshold = int(sys.argv[4])

    # Both outputs get the same header. It is built from adjacent string
    # literals so that no stray whitespace from line continuations ends up
    # inside the column names.
    header = ('unix_start_t,user_ID,mark_1,orig_lat,orig_long,orig_unc,'
              'stay_lat,stay_long,stay_unc,stay_dur,stay_ind,human_start_t\n')
    for outputFile in (outputFileGps, outputFileCell):
        with open(outputFile, 'w') as f:
            f.write(header)

    l = Lock()  # multiprocessing lock shared with every worker through init()
    pool = Pool(cpu_count(), initializer=init, initargs=(l,))
    try:
        # fixed param: how many users are held in memory per pass over the input
        user_num_in_mem = 1000

        ## collect the distinct user IDs (second column) from the input file
        usernamelist = set()
        with open(inputFile, 'r') as csvfile:  # read-only: the input is never modified
            for row in csv.reader(csvfile, delimiter=','):
                if not row:
                    continue  # csv.reader yields [] for blank lines
                usernamelist.add(row[1])
        usernamelist = list(usernamelist)
        print('total number of users to be processed: ', len(usernamelist))

        def divide_chunks(usernamelist, n):
            """Yield consecutive slices of ``usernamelist`` with at most ``n`` items."""
            for i in range(0, len(usernamelist), n):
                yield usernamelist[i:i + n]

        ## user_num_in_mem: How many elements each chunk should have
        usernamechunks = list(divide_chunks(usernamelist, user_num_in_mem))
        print('number of chunks to be processed', len(usernamechunks))

        ## read and process traces for one bulk
        while usernamechunks:
            namechunk = usernamechunks.pop()
            print("Start processing bulk: ", len(usernamechunks) + 1, ' at time: ', time.strftime("%m%d-%H:%M"), ' memory: ', psutil.virtual_memory().percent)

            # user ID -> {first six characters of the human-time column -> rows}
            UserList = {name: defaultdict(list) for name in namechunk}
            with open(inputFile, 'r') as readfile:
                for row in csv.reader(readfile, delimiter=','):
                    if not row:
                        continue
                    # data issue: skip rows whose lat or long has no '.'
                    if '.' not in row[3] or '.' not in row[4]:
                        continue
                    name = row[1]
                    if name in UserList:
                        UserList[name][row[-1][:6]].append(row)

            for days in UserList.values():
                for rows in days.values():
                    for row in rows:
                        row[1] = None  # save memory: user id is long; func() puts it back
                        row[5] = int(float(row[5]))  # convert uncertainty radius to integer
                        row.extend([-1, -1, -1, -1, -1])  # placeholders to be filled by stay info
                        row[6], row[-1] = row[-1], row[6]  # push human time to the last column
            print("End reading")

            # pool: one task per user; get() blocks until the task is done and
            # re-raises any exception that occurred in the worker
            tasks = [
                pool.apply_async(func, ((name, traces, partitionThreshold, outputFileGps, outputFileCell),))
                for name, traces in UserList.items()
            ]
            for task in tasks:
                task.get()
    finally:
        # always shut the workers down, also when reading or a task failed
        pool.close()
        pool.join()