-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdraw_map.py
More file actions
127 lines (102 loc) · 4.06 KB
/
draw_map.py
File metadata and controls
127 lines (102 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#
# Draw maps using Folium and write each one to an HTML file.
# Author: N. Beckstead
#
# TODO: Debug when a marker field is None.
# TODO: Draw heatmap using log_mapper.attempts table.
# TODO: Optimize by looking up geo data first. Then call make_marker()
#
import folium
from folium import plugins
import db_helper as database
import geo_helper as geo
import server_vars
# Radius handed to every folium.CircleMarker created by make_marker().
MARKER_RADIUS = 4
# Shared database handles used by all functions in this module.
# NOTE: database.connect() is called at import time, not only when the
# file is run as a script.
db, curs = database.connect()
#
# Main map function. Draw all maps.
#
def draw():
    """Build the markers map and the heatmap and save each one.

    Both maps share the same starting centre and zoom level and differ only
    in their tile set. The output paths are read from
    ``server_vars.MAP_LOCATION`` and ``server_vars.HEATMAP_LOCATION``.
    """
    centre = (24.635246, 2.616971)
    zoom = 3

    # Both base maps are created up front, before any layer is populated.
    marker_canvas = folium.Map(location=list(centre), zoom_start=zoom, tiles='CartoDB dark_matter')
    heat_canvas = folium.Map(location=list(centre), zoom_start=zoom, tiles='CartoDB positron')

    make_markersmap(marker_canvas).save(server_vars.MAP_LOCATION)
    print("Markers done.")

    make_heatmap(heat_canvas).save(server_vars.HEATMAP_LOCATION)
    print("Heatmap done.")
#
# Produce a heatmap from all attempts
#
def make_heatmap(map_obj):
    """Add a heat layer weighted by per-IP attempt counts to *map_obj*.

    Every IP in the ``markers`` table is resolved with ``geo.lookup``. IPs
    for which the lookup returns ``None``, or whose result has no location,
    are reported on stdout and skipped. Each remaining IP contributes one
    ``[latitude, longitude, attempts]`` point to the heat layer.

    Args:
        map_obj: Object the ``HeatMap`` layer is added to (a Folium map when
            called from ``draw``).

    Returns:
        The same *map_obj*, with the heat layer attached.
    """
    curs.execute("SELECT ip FROM markers ORDER BY INET_ATON(ip);")
    addresses = [row[0] for row in curs.fetchall()]

    points = []
    # (ip, count) of the busiest address seen so far; reported at the end.
    max_attempts = ("0.0.0.0", 0)

    for ip in addresses:
        # Bind the IP as a query parameter instead of formatting it into the
        # SQL text, so quoting/escaping is left to the driver.
        # NOTE(review): "%s" is the placeholder used by the MySQL DB-API
        # drivers; the INET_ATON() call above suggests MySQL -- confirm.
        curs.execute("SELECT COUNT(stamp) FROM attempts WHERE ip=%s", (ip,))
        attempts = int(curs.fetchone()[0])

        marker = geo.lookup(ip)
        if marker is None or marker.location is None:
            print("Error with {}:{}".format(ip, attempts))
            continue

        print("Adding {}:{} to map.".format(ip, attempts))
        if attempts > max_attempts[1]:
            max_attempts = (ip, attempts)
        points.append([marker.location[0], marker.location[1], attempts])

    # Use the imported ``plugins`` alias, as make_markersmap() does.
    plugins.HeatMap(points, radius=12).add_to(map_obj)
    print("Max attempts: {}:{}".format(max_attempts[0], max_attempts[1]))
    return map_obj
#
# Function to make a map with circle markers.
#
def make_markersmap(map_obj):
    """Add one clustered circle marker per IP in ``log_mapper.markers``.

    Rows whose IP is ``None`` are skipped. Marker creation is best-effort:
    if ``make_marker`` raises for an IP, the failure is reported on stdout
    and processing continues with the next IP.

    Args:
        map_obj: Object the ``MarkerCluster`` is attached to (a Folium map
            when called from ``draw``).

    Returns:
        The same *map_obj*, with the marker cluster added as a child.
    """
    curs.execute("SELECT ip from log_mapper.markers ORDER BY INET_ATON(ip);")
    rows = curs.fetchall()

    cluster = plugins.MarkerCluster()
    for row in rows:
        ip = row[0]
        if ip is None:
            continue
        try:
            make_marker(cluster, str(ip))
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still stop the run.
            print("[*] Error with IP: {}".format(ip))

    map_obj.add_child(cluster)
    return map_obj
#
# Add a marker to a Folium map object
#
def make_marker(map_obj, ip):
    """Add a circle marker with an informational popup for *ip* to *map_obj*.

    The geo record comes from ``geo.lookup``; the sensor, success flag and
    marker colour come from the ``database`` helpers. Nothing is added when
    the lookup returns ``None`` or the record has no location, matching the
    check ``make_heatmap`` performs before using ``marker.location``.

    Args:
        map_obj: Object the ``CircleMarker`` is added to (a ``MarkerCluster``
            when called from ``make_markersmap``).
        ip: IP address string to look up and display.

    Returns:
        None.
    """
    marker = geo.lookup(ip)
    # Without a location the marker cannot be placed, and indexing
    # ``marker.location`` below would raise TypeError.
    if marker is None or marker.location is None:
        return None

    host = database.get_sensor(db, curs, ip)

    # Translate the stored flag into a label; None and any value other than
    # 0 or 1 are shown as "Unknown".
    flag = database.get_success(db, curs, ip)
    success = "Unknown"
    if flag is not None:
        if flag == 0:
            success = "Failed"
        elif flag == 1:
            success = "Successful"

    popup_text = """<a href=\"{}\" target=\"_blank\">{}</a><br>
Sensor: {}<br>
Success: {}<br>
Country: {}<br>
Continent: {}<br>
Latitude: {}<br>
Longitude: {}<br>
<a href=\"https://shodan.io/host/{}\" target=\"_blank\">Shodan</a><br>
<a href=\"https://www.censys.io/ipv4/{}\" target=\"_blank\">Censys</a><br>
<a href=\"https://www.talosintelligence.com/reputation_center/lookup?search={}\" target=\"_blank\">Talos</a><br>
"""
    # TODO: Print debug message if another field in marker (country,
    # continent) is None; those are currently rendered as the text "None".
    popup_text = popup_text.format(
        server_vars.POPUP_URL.format(ip),
        ip,
        host,
        success,
        marker.country,
        marker.continent,
        marker.location[0],
        marker.location[1],
        ip,
        ip,
        ip,
    )

    marker_color = database.get_color(db, curs, ip)
    folium.CircleMarker(
        location=marker.location,
        radius=MARKER_RADIUS,
        color=marker_color,
        fill=False,
        popup=popup_text,
    ).add_to(map_obj)
# Render both maps only when this file is executed directly as a script.
if __name__ == "__main__":
    draw()