Skip to content

Commit 605ba12

Browse files
committed
Fix bot to fetch data from single table (netflow_flows)
1 parent 3505c09 commit 605ba12

File tree

3 files changed

+84
-99
lines changed

3 files changed

+84
-99
lines changed

dbutils.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,5 +147,6 @@ def migration_step_2():
147147
ipv4_src_addr INET NOT NULL
148148
);
149149
""")
150+
c.execute(f'CREATE INDEX {DB_PREFIX}flows_ts on {DB_PREFIX}flows (ts);')
150151

151152
c.execute(f'CREATE TABLE {DB_PREFIX}bot_jobs (job_id TEXT NOT NULL PRIMARY KEY, last_used_ts NUMERIC(16,6) NOT NULL);')

netflowbot.py

Lines changed: 79 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -52,27 +52,27 @@ def path_part_encode(s):
5252
return s.replace(".", '%2e')
5353

5454

55-
def _get_last_used_seq(job_id):
55+
def _get_last_used_ts(job_id):
5656
with get_db_cursor() as c:
57-
c.execute(f'SELECT j.last_used_seq, r.ts FROM {DB_PREFIX}bot_jobs j, {DB_PREFIX}records r WHERE j.job_id = %s AND j.last_used_seq = r.seq;', (job_id,))
57+
c.execute(f'SELECT j.last_used_ts FROM {DB_PREFIX}bot_jobs j WHERE j.job_id = %s;', (job_id,))
5858
rec = c.fetchone()
5959
if rec is None:
60-
return None, None
61-
last_used_seq, ts = rec
62-
return last_used_seq, ts
60+
return None
61+
last_used_ts, = rec
62+
return last_used_ts
6363

64-
def _get_current_max_seq():
64+
def _get_current_max_ts():
6565
with get_db_cursor() as c:
66-
c.execute(f"SELECT seq, ts FROM {DB_PREFIX}records WHERE seq = (SELECT MAX(seq) FROM {DB_PREFIX}records);")
66+
c.execute(f"SELECT MAX(ts) FROM {DB_PREFIX}flows;")
6767
rec = c.fetchone()
6868
if rec is None:
69-
return None, None
70-
max_seq, now_ts = rec
71-
return max_seq, now_ts
69+
return None
70+
max_ts, = rec
71+
return max_ts
7272

73-
def _save_current_max_seq(job_id, seq):
73+
def _save_current_max_ts(job_id, max_ts):
7474
with get_db_cursor() as c:
75-
c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_seq) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_seq = %s;", (job_id, seq, seq))
75+
c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_ts) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_ts = %s;", (job_id, max_ts, max_ts))
7676

7777

7878
class NetFlowBot(Collector):
@@ -101,33 +101,33 @@ def jobs(self):
101101
@staticmethod
102102
def perform_account_aggr_job(*args, **job_params):
103103
# \d netflow_flows
104-
# Column | Type | Description
105-
# ---------------+----------+-------------
106-
# record | integer | // FK -> netflow_records.seq (PK)
107-
# in_bytes | integer | number of bytes associated with an IP Flow
108-
# protocol | smallint | IP protocol (see lookup.py -> PROTOCOLS)
109-
# direction | smallint | flow direction: 0 - ingress flow, 1 - egress flow
110-
# l4_dst_port | integer | destination port
111-
# l4_src_port | integer | source port
112-
# input_snmp | smallint | input interface index
113-
# output_snmp | smallint | output interface index
114-
# ipv4_src_addr | text | source IP
115-
# ipv4_dst_addr | text | destination IP
116-
# ---------------+----------+-------------
104+
# Column | Type | Description
105+
# ---------------+---------------+------------
106+
# ts | numeric(16,6) | UNIX timestamp
107+
# client_ip | inet | entity IP address
108+
# in_bytes | integer | number of bytes associated with an IP Flow
109+
# protocol | smallint | IP protocol (see lookup.py -> PROTOCOLS)
110+
# direction | smallint | flow direction: 0 - ingress flow, 1 - egress flow
111+
# l4_dst_port | integer | destination port
112+
# l4_src_port | integer | source port
113+
# input_snmp | smallint | input interface index
114+
# output_snmp | smallint | output interface index
115+
# ipv4_dst_addr | inet | destination IP
116+
# ipv4_src_addr | inet | source IP
117117

118118
job_id = job_params["job_id"]
119119
interval_label = job_params["interval_label"]
120120
account_id = job_params["account_id"]
121121
entities = [(entity_info["entity_id"], entity_info["details"]["ipv4"],) for entity_info in job_params["entities_infos"]]
122122
log.info(f"Starting {interval_label} aggregation job for account {account_id}...")
123123

124-
last_used_seq, last_used_ts = _get_last_used_seq(job_id)
125-
max_seq, max_ts = _get_current_max_seq()
126-
if max_seq is None or last_used_ts == max_ts:
124+
last_used_ts = _get_last_used_ts(job_id)
125+
max_ts = _get_current_max_ts()
126+
if max_ts is None or last_used_ts == max_ts:
127127
log.info(f"No netflow data found for job {job_id}, skipping.")
128128
return
129-
_save_current_max_seq(job_id, max_seq)
130-
if last_used_seq is None:
129+
_save_current_max_ts(job_id, max_ts)
130+
if last_used_ts is None:
131131
log.info(f"Counter was not yet initialized for job {job_id}, skipping.")
132132
return
133133
#time_between = float(max_ts - last_used_ts)
@@ -138,10 +138,10 @@ def perform_account_aggr_job(*args, **job_params):
138138
sum_traffic_egress = 0
139139
sum_traffic_ingress = 0
140140
for entity_id, entity_ip in entities:
141-
v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_EGRESS, entity_id, entity_ip)
141+
v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_EGRESS, entity_id, entity_ip)
142142
values.extend(v)
143143
sum_traffic_egress += s
144-
v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_INGRESS, entity_id, entity_ip)
144+
v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, DIRECTION_INGRESS, entity_id, entity_ip)
145145
values.extend(v)
146146
sum_traffic_ingress += s
147147

@@ -160,10 +160,10 @@ def perform_account_aggr_job(*args, **job_params):
160160
# top N IPs:
161161
for entity_id, entity_ip in entities:
162162
for direction in [DIRECTION_EGRESS, DIRECTION_INGRESS]:
163-
values.extend(NetFlowBot.get_top_N_IPs_for_entity(interval_label, last_used_seq, max_seq, time_between, direction, entity_id, entity_ip))
164-
values.extend(NetFlowBot.get_top_N_IPs_for_entity_interfaces(interval_label, last_used_seq, max_seq, time_between, direction, entity_id, entity_ip))
165-
values.extend(NetFlowBot.get_top_N_protocols_for_entity(interval_label, last_used_seq, max_seq, time_between, direction, entity_id, entity_ip))
166-
values.extend(NetFlowBot.get_top_N_protocols_for_entity_interfaces(interval_label, last_used_seq, max_seq, time_between, direction, entity_id, entity_ip))
163+
values.extend(NetFlowBot.get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
164+
values.extend(NetFlowBot.get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
165+
values.extend(NetFlowBot.get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
166+
values.extend(NetFlowBot.get_top_N_protocols_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip))
167167

168168
if not values:
169169
log.warning("No values found to be sent to Grafolean")
@@ -192,7 +192,7 @@ def construct_output_path_prefix(interval_label, direction, entity_id, interface
192192

193193
@staticmethod
194194
@slow_down
195-
def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip):
195+
def get_traffic_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip):
196196
# returns cumulative traffic for the whole entity, and traffic per interface for this entity
197197
with get_db_cursor() as c:
198198

@@ -201,17 +201,15 @@ def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, dire
201201
f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'},
202202
sum(f.in_bytes)
203203
FROM
204-
{DB_PREFIX}records "r",
205204
{DB_PREFIX}flows "f"
206205
WHERE
207-
r.client_ip = %s AND
208-
r.seq > %s AND
209-
r.seq <= %s AND
210-
r.seq = f.record AND
206+
f.client_ip = %s AND
207+
f.ts > %s AND
208+
f.ts <= %s AND
211209
f.direction = %s
212210
GROUP BY
213211
f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'}
214-
""", (entity_ip, last_seq, max_seq, direction))
212+
""", (entity_ip, last_used_ts, max_ts, direction))
215213

216214
values = []
217215
sum_traffic = 0
@@ -233,45 +231,41 @@ def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, dire
233231

234232
@staticmethod
235233
@slow_down
236-
def get_top_N_IPs_for_entity_interfaces(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip):
234+
def get_top_N_IPs_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip):
237235
with get_db_cursor() as c, get_db_cursor() as c2:
238236

239237
values = []
240238
c.execute(f"""
241239
SELECT
242240
distinct(f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'}) "interface_index"
243241
FROM
244-
netflow_records "r",
245-
netflow_flows "f"
242+
{DB_PREFIX}flows "f"
246243
WHERE
247-
r.client_ip = %s AND
248-
r.seq > %s AND
249-
r.seq <= %s AND
250-
r.seq = f.record AND
244+
f.client_ip = %s AND
245+
f.ts > %s AND
246+
f.ts <= %s AND
251247
f.direction = %s
252-
""", (entity_ip, last_seq, max_seq, direction,))
248+
""", (entity_ip, last_used_ts, max_ts, direction,))
253249

254250
for interface_index, in c.fetchall():
255251
c2.execute(f"""
256252
SELECT
257253
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'},
258254
sum(f.in_bytes) "traffic"
259255
FROM
260-
netflow_records "r",
261-
netflow_flows "f"
256+
{DB_PREFIX}flows "f"
262257
WHERE
263-
r.client_ip = %s AND
264-
r.seq > %s AND
265-
r.seq <= %s AND
266-
r.seq = f.record AND
258+
f.client_ip = %s AND
259+
f.ts > %s AND
260+
f.ts <= %s AND
267261
f.direction = %s AND
268262
f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'} = %s
269263
GROUP BY
270264
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'}
271265
ORDER BY
272266
traffic desc
273267
LIMIT {TOP_N_MAX};
274-
""", (entity_ip, last_seq, max_seq, direction, interface_index,))
268+
""", (entity_ip, last_used_ts, max_ts, direction, interface_index,))
275269

276270
output_path_interface = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id, interface=interface_index)
277271
for top_ip, traffic_bytes in c2.fetchall():
@@ -285,28 +279,26 @@ def get_top_N_IPs_for_entity_interfaces(interval_label, last_seq, max_seq, time_
285279

286280
@staticmethod
287281
@slow_down
288-
def get_top_N_IPs_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip):
282+
def get_top_N_IPs_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip):
289283
with get_db_cursor() as c:
290284
values = []
291285
c.execute(f"""
292286
SELECT
293287
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'},
294288
sum(f.in_bytes) "traffic"
295289
FROM
296-
netflow_records "r",
297-
netflow_flows "f"
290+
{DB_PREFIX}flows "f"
298291
WHERE
299-
r.client_ip = %s AND
300-
r.seq > %s AND
301-
r.seq <= %s AND
302-
r.seq = f.record AND
292+
f.client_ip = %s AND
293+
f.ts > %s AND
294+
f.ts <= %s AND
303295
f.direction = %s
304296
GROUP BY
305297
f.{'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr'}
306298
ORDER BY
307299
traffic desc
308300
LIMIT {TOP_N_MAX};
309-
""", (entity_ip, last_seq, max_seq, direction,))
301+
""", (entity_ip, last_used_ts, max_ts, direction,))
310302

311303
output_path_entity = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id, interface=None)
312304
for top_ip, traffic_bytes in c.fetchall():
@@ -321,45 +313,41 @@ def get_top_N_IPs_for_entity(interval_label, last_seq, max_seq, time_between, di
321313

322314
@staticmethod
323315
@slow_down
324-
def get_top_N_protocols_for_entity_interfaces(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip):
316+
def get_top_N_protocols_for_entity_interfaces(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip):
325317
with get_db_cursor() as c, get_db_cursor() as c2:
326318

327319
values = []
328320
c.execute(f"""
329321
SELECT
330322
distinct(f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'}) "interface_index"
331323
FROM
332-
netflow_records "r",
333-
netflow_flows "f"
324+
{DB_PREFIX}flows "f"
334325
WHERE
335-
r.client_ip = %s AND
336-
r.seq > %s AND
337-
r.seq <= %s AND
338-
r.seq = f.record AND
326+
f.client_ip = %s AND
327+
f.ts > %s AND
328+
f.ts <= %s AND
339329
f.direction = %s
340-
""", (entity_ip, last_seq, max_seq, direction,))
330+
""", (entity_ip, last_used_ts, max_ts, direction,))
341331

342332
for interface_index, in c.fetchall():
343333
c2.execute(f"""
344334
SELECT
345335
f.protocol,
346336
sum(f.in_bytes) "traffic"
347337
FROM
348-
netflow_records "r",
349-
netflow_flows "f"
338+
{DB_PREFIX}flows "f"
350339
WHERE
351-
r.client_ip = %s AND
352-
r.seq > %s AND
353-
r.seq <= %s AND
354-
r.seq = f.record AND
340+
f.client_ip = %s AND
341+
f.ts > %s AND
342+
f.ts <= %s AND
355343
f.direction = %s AND
356344
f.{'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp'} = %s
357345
GROUP BY
358346
f.protocol
359347
ORDER BY
360348
traffic desc
361349
LIMIT {TOP_N_MAX};
362-
""", (entity_ip, last_seq, max_seq, direction, interface_index,))
350+
""", (entity_ip, last_used_ts, max_ts, direction, interface_index,))
363351

364352
output_path_interface = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id, interface=interface_index)
365353
for protocol, traffic_bytes in c2.fetchall():
@@ -373,28 +361,26 @@ def get_top_N_protocols_for_entity_interfaces(interval_label, last_seq, max_seq,
373361

374362
@staticmethod
375363
@slow_down
376-
def get_top_N_protocols_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip):
364+
def get_top_N_protocols_for_entity(interval_label, last_used_ts, max_ts, time_between, direction, entity_id, entity_ip):
377365
with get_db_cursor() as c:
378366
values = []
379367
c.execute(f"""
380368
SELECT
381369
f.protocol,
382370
sum(f.in_bytes) "traffic"
383371
FROM
384-
netflow_records "r",
385-
netflow_flows "f"
372+
{DB_PREFIX}flows "f"
386373
WHERE
387-
r.client_ip = %s AND
388-
r.seq > %s AND
389-
r.seq <= %s AND
390-
r.seq = f.record AND
374+
f.client_ip = %s AND
375+
f.ts > %s AND
376+
f.ts <= %s AND
391377
f.direction = %s
392378
GROUP BY
393379
f.protocol
394380
ORDER BY
395381
traffic desc
396382
LIMIT {TOP_N_MAX};
397-
""", (entity_ip, last_seq, max_seq, direction,))
383+
""", (entity_ip, last_used_ts, max_ts, direction,))
398384

399385
output_path_entity = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id, interface=None)
400386
for protocol, traffic_bytes in c.fetchall():
@@ -410,18 +396,16 @@ def get_top_N_protocols_for_entity(interval_label, last_seq, max_seq, time_betwe
410396
# @slow_down
411397
# def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
412398
# with get_db_cursor() as c:
413-
# # TODO: missing check for IP: r.client_ip = %s AND
399+
# # TODO: missing check for IP: f.client_ip = %s AND
414400
# c.execute(f"""
415401
# SELECT
416402
# f.PROTOCOL,
417403
# sum(f.IN_BYTES) "traffic"
418404
# FROM
419-
# netflow_records "r",
420-
# netflow_flows "f"
405+
# {DB_PREFIX}flows "f"
421406
# WHERE
422-
# r.ts >= %s AND
423-
# r.ts < %s AND
424-
# r.seq = f.record AND
407+
# f.ts >= %s AND
408+
# f.ts < %s AND
425409
# f.{'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP'} = %s AND
426410
# f.DIRECTION = {'0' if is_direction_in else '1'}
427411
# GROUP BY

netflowwriter.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ def write_record(ts, client, export):
109109
# check for missing NetFlow records:
110110
last_record_seq = last_record_seqs.get(client_ip)
111111
if last_record_seq is None:
112-
log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}")
112+
log.warning(f"[{client_ip}] Last record sequence number is not known, starting with {export.header.sequence}")
113113
elif export.header.sequence != last_record_seq + 1:
114-
log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
114+
log.error(f"[{client_ip}] Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
115115
last_record_seqs[client_ip] = export.header.sequence
116116

117-
log.debug(f"Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)} from {client_ip}")
117+
log.debug(f"[{client_ip}] Received record [{export.header.sequence}]: {datetime.utcfromtimestamp(ts)}")
118118
with get_db_cursor() as c:
119119
# save each of the flows within the record, but use execute_values() to perform bulk insert:
120120
def _get_data(netflow_version, ts, client_ip, flows):
@@ -169,7 +169,7 @@ def _get_data(netflow_version, ts, client_ip, flows):
169169
socket.inet_ntoa(struct.pack('!I', f.data["IPV4_SRC_ADDR"])),
170170
)
171171
else:
172-
log.error(f"Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
172+
log.error(f"[{client_ip}] Only Netflow v5 and v9 currently supported, ignoring record (version: [{export.header.version}])")
173173
return
174174

175175
data_iterator = _get_data(export.header.version, ts, client_ip, export.flows)

0 commit comments

Comments
 (0)