|
36 | 36 | TOP_N_MAX = 10 |
37 | 37 |
|
38 | 38 |
|
| 39 | +# Decorator: to avoid overwhelming the system, we sleep some amount after performing |
| 40 | +# demanding tasks. For how long? Some factor of the timed interval. |
| 41 | +def slow_down(func): |
| 42 | + def wrapper(*args, **kwargs): |
| 43 | + start = time.time() |
| 44 | + ret = func(*args, **kwargs) |
| 45 | + end = time.time() |
| 46 | + time.sleep((end - start) * 0.5) |
| 47 | + return ret |
| 48 | + return wrapper |
| 49 | + |
| 50 | + |
39 | 51 | def path_part_encode(s): |
40 | 52 | return s.replace(".", '%2e') |
41 | 53 |
|
@@ -107,7 +119,7 @@ def perform_account_aggr_job(*args, **job_params): |
107 | 119 | interval_label = job_params["interval_label"] |
108 | 120 | account_id = job_params["account_id"] |
109 | 121 | entities = [(entity_info["entity_id"], entity_info["details"]["ipv4"],) for entity_info in job_params["entities_infos"]] |
110 | | - |
| 122 | + log.info(f"Starting {interval_label} aggregation job for account {account_id}...") |
111 | 123 |
|
112 | 124 | last_used_seq, last_used_ts = _get_last_used_seq(job_id) |
113 | 125 | max_seq, max_ts = _get_current_max_seq() |
@@ -179,6 +191,7 @@ def construct_output_path_prefix(interval_label, direction, entity_id, interface |
179 | 191 |
|
180 | 192 |
|
181 | 193 | @staticmethod |
| 194 | + @slow_down |
182 | 195 | def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip): |
183 | 196 | # returns cumulative traffic for the whole entity, and traffic per interface for this entity |
184 | 197 | with get_db_cursor() as c: |
@@ -219,6 +232,7 @@ def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, dire |
219 | 232 |
|
220 | 233 |
|
221 | 234 | @staticmethod |
| 235 | + @slow_down |
222 | 236 | def get_top_N_IPs_for_entity_interfaces(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip): |
223 | 237 | with get_db_cursor() as c, get_db_cursor() as c2: |
224 | 238 |
|
@@ -270,6 +284,7 @@ def get_top_N_IPs_for_entity_interfaces(interval_label, last_seq, max_seq, time_ |
270 | 284 | return values |
271 | 285 |
|
272 | 286 | @staticmethod |
| 287 | + @slow_down |
273 | 288 | def get_top_N_IPs_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip): |
274 | 289 | with get_db_cursor() as c: |
275 | 290 | values = [] |
@@ -305,6 +320,7 @@ def get_top_N_IPs_for_entity(interval_label, last_seq, max_seq, time_between, di |
305 | 320 |
|
306 | 321 |
|
307 | 322 | @staticmethod |
| 323 | + @slow_down |
308 | 324 | def get_top_N_protocols_for_entity_interfaces(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip): |
309 | 325 | with get_db_cursor() as c, get_db_cursor() as c2: |
310 | 326 |
|
@@ -356,6 +372,7 @@ def get_top_N_protocols_for_entity_interfaces(interval_label, last_seq, max_seq, |
356 | 372 | return values |
357 | 373 |
|
358 | 374 | @staticmethod |
| 375 | + @slow_down |
359 | 376 | def get_top_N_protocols_for_entity(interval_label, last_seq, max_seq, time_between, direction, entity_id, entity_ip): |
360 | 377 | with get_db_cursor() as c: |
361 | 378 | values = [] |
@@ -390,6 +407,7 @@ def get_top_N_protocols_for_entity(interval_label, last_seq, max_seq, time_betwe |
390 | 407 | return values |
391 | 408 |
|
392 | 409 | # @staticmethod |
| 410 | + # @slow_down |
393 | 411 | # def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True): |
394 | 412 | # with get_db_cursor() as c: |
395 | 413 | # # TODO: missing check for IP: r.client_ip = %s AND |
|
0 commit comments