-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery.py
More file actions
198 lines (161 loc) · 5.99 KB
/
query.py
File metadata and controls
198 lines (161 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""
query.py — Command-line log viewer
Run this script to search and filter the events log without opening DB Browser.
Usage examples:
python query.py # show last 50 events
python query.py --limit 100 # show last 100 events
python query.py --type DELETED # filter by event type
python query.py --type RENAMED # filter by event type
python query.py --file budget.xlsx # search by filename
python query.py --today # events from today only
python query.py --date 2026-05-26 # events from a specific date
python query.py --summary # count of each event type
"""
import sqlite3
import argparse
import os
import sys
from datetime import date
# ------------------------------------------------------------------
# CONFIG — default database location. Edit DEFAULT_DB_PATH below (or pass
# --db on the command line) if your log directory differs.
# ------------------------------------------------------------------
DEFAULT_DB_PATH = r"C:\Users\primelink\Desktop\LOGS\filelog.db"
# ------------------------------------------------------------------
# HELPERS
# ------------------------------------------------------------------
def get_connection(db_path: str) -> sqlite3.Connection:
    """Open the event-log database, exiting if the file does not exist.

    The path is checked up front because ``sqlite3.connect`` would otherwise
    create a new, empty database at a mistyped location.

    Args:
        db_path: Filesystem path to the SQLite database file.

    Returns:
        An open connection; the caller is responsible for closing it.

    Raises:
        SystemExit: With status 1 when ``db_path`` is not an existing file.
    """
    # isfile rather than exists: a directory path can never be opened as a
    # database and should get the same friendly error as a missing file.
    if not os.path.isfile(db_path):
        # Diagnostics go to stderr so redirected/piped stdout stays clean.
        print(f"[ERROR] Database not found at: {db_path}", file=sys.stderr)
        print("Make sure the file watcher has been run at least once.",
              file=sys.stderr)
        sys.exit(1)
    return sqlite3.connect(db_path)
def format_row(row: tuple) -> str:
    """Format one event row as a multi-line block for terminal output.

    Args:
        row: An 8-tuple ``(id, timestamp, event_type, src_path, dest_path,
            file_size, md5_hash, prev_hash)``.

    Returns:
        A header line ``[timestamp] event_type`` followed by detail lines:
        FROM/TO/SIZE when ``dest_path`` is set; PATH/SIZE/BEFORE/AFTER for a
        MODIFIED event that carries a previous hash; PATH/SIZE otherwise.
        The text ends with a newline.
    """
    _, timestamp, event_type, src_path, dest_path, file_size, md5_hash, prev_hash = row

    # A size of 0 is a real value (empty file) and must not be shown as
    # "N/A"; any other falsy value (None, "") still means "size unknown".
    has_size = bool(file_size) or file_size == 0
    size_str = f"{file_size:,} bytes" if has_size else "N/A"

    header = f"[{timestamp}] {event_type}\n"

    # Events with a destination path show both ends of the move.
    if dest_path:
        return (header
                + f" FROM : {src_path}\n"
                + f" TO : {dest_path}\n"
                + f" SIZE : {size_str}\n")

    # Content changes show the hash before and after, when the old one is known.
    if event_type in ("MODIFIED", "MODIFIED (offline)") and prev_hash:
        return (header
                + f" PATH : {src_path}\n"
                + f" SIZE : {size_str}\n"
                + f" BEFORE: {prev_hash}\n"
                + f" AFTER : {md5_hash}\n")

    return (header
            + f" PATH : {src_path}\n"
            + f" SIZE : {size_str}\n")
# ------------------------------------------------------------------
# QUERY FUNCTIONS
# ------------------------------------------------------------------
def _escape_like(text: str) -> str:
    """Escape the backslash, ``%`` and ``_`` so *text* matches literally in LIKE."""
    # The backslash must be doubled first, or the escapes added for % and _
    # would themselves get doubled.
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def query_events(conn: sqlite3.Connection, event_type: str = None,
                 filename: str = None, date_filter: str = None,
                 limit: int = 50) -> list:
    """Run a filtered SELECT against the ``events`` table, newest first.

    All filters are optional and are combined with AND. User-supplied text is
    matched literally: ``%`` and ``_`` are escaped, so a filename such as
    ``my_file`` does not also match ``myXfile``.

    Args:
        conn: Open connection to the event-log database.
        event_type: Substring of the event type to match (upper-cased first).
        filename: Substring to look for in either the source or the
            destination path.
        date_filter: Prefix the timestamp must start with (e.g. ``2026-05-26``).
        limit: Maximum number of rows to return.

    Returns:
        A list of ``(id, timestamp, event_type, src_path, dest_path,
        file_size, md5_hash, prev_hash)`` tuples ordered by timestamp
        descending, with ``id`` descending as a tie-breaker.
    """
    # ESCAPE '\' makes the backslashes added by _escape_like mean "literal".
    like = "LIKE ? ESCAPE '\\'"
    conditions = []
    params = []

    if event_type:
        conditions.append(f"event_type {like}")
        params.append(f"%{_escape_like(event_type.upper())}%")

    if filename:
        pattern = f"%{_escape_like(filename)}%"
        conditions.append(f"(src_path {like} OR dest_path {like})")
        params.extend([pattern, pattern])

    if date_filter:
        # Prefix match only: the timestamp has to begin with the given date.
        conditions.append(f"timestamp {like}")
        params.append(f"{_escape_like(date_filter)}%")

    where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
    params.append(limit)

    # Only fixed fragments are interpolated into the SQL text; every
    # user-supplied value is bound through a ? placeholder.
    query = f"""
        SELECT id, timestamp, event_type, src_path, dest_path, file_size, md5_hash, prev_hash
        FROM events
        {where}
        ORDER BY timestamp DESC, id DESC
        LIMIT ?
    """
    return conn.execute(query, params).fetchall()
def query_summary(conn: sqlite3.Connection) -> None:
    """Print how many events of each type are logged, most frequent first.

    Output is a heading, one aligned line per event type, and a TOTAL line.
    When the table is empty, a short notice is printed instead.

    Args:
        conn: Open connection to the event-log database.
    """
    # The secondary sort key keeps the listing stable when counts are tied.
    rows = conn.execute("""
        SELECT event_type, COUNT(*) AS count
        FROM events
        GROUP BY event_type
        ORDER BY count DESC, event_type
    """).fetchall()

    print("\n=== EVENT SUMMARY ===\n")
    if not rows:
        print("No events logged yet.")
        return

    # A NULL event_type arrives as None, which has no len() and cannot be
    # padded by the format spec, so give it a printable label.
    counts = [
        ("(unknown)" if event_type is None else str(event_type), count)
        for event_type, count in rows
    ]

    # Include the TOTAL label in the width so the last line stays aligned
    # even when every event type is shorter than it.
    width = max(len("TOTAL"), max(len(label) for label, _ in counts))

    for label, count in counts:
        print(f" {label:<{width}} {count:>6} event(s)")

    total = sum(count for _, count in counts)
    print(f"\n {'TOTAL':<{width}} {total:>6} event(s)\n")
# ------------------------------------------------------------------
# ENTRY POINT
# ------------------------------------------------------------------
def main() -> None:
    """Parse command-line options, run the requested query, and print results.

    With ``--summary`` a per-type count is printed. Otherwise the matching
    events are listed, filtered by any combination of ``--type``, ``--file``
    and a date (``--today`` takes precedence over ``--date``), and capped at
    ``--limit`` rows.
    """
    parser = argparse.ArgumentParser(
        description="Query the file watcher event log."
    )
    parser.add_argument(
        "--db", default=DEFAULT_DB_PATH,
        help="Path to filelog.db (default: configured path)"
    )
    parser.add_argument(
        "--type", dest="event_type",
        help="Filter by event type (e.g. DELETED, RENAMED, MOVED, CREATED, MODIFIED)"
    )
    parser.add_argument(
        "--file", dest="filename",
        help="Filter by filename or partial path"
    )
    parser.add_argument(
        "--today", action="store_true",
        help="Show only events from today"
    )
    parser.add_argument(
        "--date",
        help="Show events from a specific date (format: YYYY-MM-DD)"
    )
    parser.add_argument(
        "--limit", type=int, default=50,
        help="Maximum number of results to show (default: 50)"
    )
    parser.add_argument(
        "--summary", action="store_true",
        help="Show a count of each event type instead of individual events"
    )
    args = parser.parse_args()

    conn = get_connection(args.db)
    # try/finally so the connection is closed on every exit path, including
    # the early --summary return and a query that raises.
    try:
        if args.summary:
            query_summary(conn)
            return

        # --today wins when both date options are given.
        date_filter = None
        if args.today:
            date_filter = date.today().isoformat()
        elif args.date:
            date_filter = args.date

        rows = query_events(
            conn,
            event_type=args.event_type,
            filename=args.filename,
            date_filter=date_filter,
            limit=args.limit,
        )

        print(f"\n=== {len(rows)} EVENT(S) ===\n")
        if not rows:
            print("No events match your filter.")
        else:
            for row in rows:
                print(format_row(row))
    finally:
        conn.close()


if __name__ == "__main__":
    main()