forked from Nerdy-Things/raspberry-pi-ai-camera-yolov8
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecognition.py
More file actions
255 lines (204 loc) · 9.4 KB
/
recognition.py
File metadata and controls
255 lines (204 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# This is a modified file of an official Raspberry PI example. Origin file:
# https://github.com/raspberrypi/picamera2/blob/main/examples/imx500/imx500_object_detection_demo.py
# BSD 2-Clause License. For additional information read: LICENCE-Raspberry-PI
import sys
import argparse
from functools import lru_cache
import cv2
import numpy as np
import time
import os
from itkacher.date_utils import DateUtils
from itkacher.file_utils import FileUtils
from itkacher.video_recorder import VideoRecorder
from picamera2 import MappedArray, Picamera2
from picamera2.devices import IMX500
from picamera2.devices.imx500 import (NetworkIntrinsics,
postprocess_nanodet_detection)
last_detections = []
threshold = 0.55
iou = 0.65
max_detections = 10
class Detection:
def __init__(self, coords, category, conf, metadata):
"""Create a Detection object, recording the bounding box, category and confidence."""
self.category = category
self.conf = conf
self.box = imx500.convert_inference_coords(coords, metadata, picam2)
def parse_detections(metadata: dict):
"""Parse the output tensor into a number of detected objects, scaled to the ISP out."""
global last_detections
bbox_normalization = intrinsics.bbox_normalization
np_outputs = imx500.get_outputs(metadata, add_batch=True)
input_w, input_h = imx500.get_input_size()
if np_outputs is None:
return last_detections
if intrinsics.postprocess == "nanodet":
boxes, scores, classes = \
postprocess_nanodet_detection(outputs=np_outputs[0], conf=threshold, iou_thres=iou,
max_out_dets=max_detections)[0]
from picamera2.devices.imx500.postprocess import scale_boxes
boxes = scale_boxes(boxes, 1, 1, input_h, input_w, False, False)
else:
boxes, scores, classes = np_outputs[0][0], np_outputs[1][0], np_outputs[2][0]
if bbox_normalization:
boxes = boxes / input_h
boxes = np.array_split(boxes, 4, axis=1)
boxes = zip(*boxes)
last_detections = [
Detection(box, category, score, metadata)
for box, score, category in zip(boxes, scores, classes)
if score > threshold
]
# Add tensor saving here
try:
timestamp = DateUtils.get_time()
tensor_folder = f"./data/tensors/{DateUtils.get_date()}/"
tensor_outputs = [boxes, scores, classes]
# Create VideoRecorder instance (if not already created)
video_recorder = VideoRecorder()
video_recorder.save_tensor_data(tensor_outputs, timestamp, tensor_folder)
except Exception as e:
print(f"Error saving tensor data: {e}")
return last_detections
@lru_cache
def get_labels():
labels = intrinsics.labels
if intrinsics.ignore_dash_labels:
labels = [label for label in labels if label and label != "-"]
return labels
def draw_detections(request, stream="main"):
"""Draw the detections for this request onto the ISP output."""
detections = last_results
if detections is None:
return
labels = get_labels()
with MappedArray(request, stream) as m:
for detection in detections:
x, y, w, h = detection.box
label = f"{labels[int(detection.category)]} ({detection.conf:.2f})"
# Calculate text size and position
(text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
text_x = x + 5
text_y = y + 15
# Create a copy of the array to draw the background with opacity
overlay = m.array.copy()
# Draw the background rectangle on the overlay
cv2.rectangle(overlay,
(text_x, text_y - text_height),
(text_x + text_width, text_y + baseline),
(255, 255, 255), # Background color (white)
cv2.FILLED)
alpha = 0.30
cv2.addWeighted(overlay, alpha, m.array, 1 - alpha, 0, m.array)
# Draw text on top of the background
cv2.putText(m.array, label, (text_x, text_y),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
# Draw detection box
cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0), thickness=2)
if intrinsics.preserve_aspect_ratio:
b_x, b_y, b_w, b_h = imx500.get_roi_scaled(request)
color = (255, 0, 0) # red
cv2.putText(m.array, "ROI", (b_x + 5, b_y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
cv2.rectangle(m.array, (b_x, b_y), (b_x + b_w, b_y + b_h), (255, 0, 0, 0))
if __name__ == "__main__":
# Add argument parsing
parser = argparse.ArgumentParser()
parser.add_argument('--save_tensors', action='store_true', help='Save tensor data')
parser.add_argument('--record_video', action='store_true', help='Record video from images')
args = parser.parse_args()
model = "./imx500-models-backup/imx500_network_yolov8n_pp.rpk"
# Initialize video recorder if needed
video_recorder = VideoRecorder() if args.record_video else None
# This must be called before instantiation of Picamera2
imx500 = IMX500(model)
intrinsics = imx500.network_intrinsics
# Initialize the Picamera2 object
picam2 = Picamera2()
# Configure the camera with proper error handling
try:
# Get the input size for the camera configuration
input_size = imx500.get_input_size()
print(f"Camera input size: {input_size}")
# Check if get_transform exists
transform = None
if hasattr(imx500, 'get_transform'):
transform = imx500.get_transform()
# Create the camera configuration
camera_config = picam2.create_preview_configuration(
main={"size": input_size},
transform=transform,
buffer_count=4
)
picam2.configure(camera_config)
# Set up camera metadata
if hasattr(imx500, 'post_callback'):
picam2.post_callback = imx500.post_callback
else:
print("Warning: imx500.post_callback not found, skipping this step")
except Exception as e:
print(f"Error configuring camera: {e}")
# Fallback configuration if the specialized configuration fails
print("Attempting to use default camera configuration...")
default_config = picam2.create_preview_configuration()
picam2.configure(default_config)
print("Using default camera configuration.")
# Start the camera
picam2.start()
# Allow time for camera to initialize
time.sleep(2)
# Get the labels
labels = get_labels()
# Your existing setup code...
# Modify your main loop
image_count = 0
IMAGES_PER_VIDEO = 300 # Will create a 10-second video at 30fps
try:
while True:
last_results = parse_detections(picam2.capture_metadata())
# Record file to SD card
data_folder = f"./data/images/{DateUtils.get_date()}/"
try:
# Ensure the folder exists
FileUtils.create_folders(data_folder)
# Save image
current_time = DateUtils.get_time()
image_path = f"{data_folder}/{current_time}.jpg"
picam2.capture_file(image_path)
image_count += 1
# Save tensors if enabled
if args.save_tensors and len(last_results) > 0:
tensor_folder = f"./data/tensors/{DateUtils.get_date()}/"
FileUtils.create_folders(tensor_folder)
try:
# Use the tensor outputs from the last detection
if 'boxes' in locals() and 'scores' in locals() and 'classes' in locals():
tensor_outputs = [boxes, scores, classes]
video_recorder.save_tensor_data(tensor_outputs, current_time, tensor_folder)
except Exception as error:
print(f"Error saving tensor data: {error}")
# Create video if enough frames collected
if args.record_video and image_count >= IMAGES_PER_VIDEO:
try:
video_folder = f"./data/videos/{DateUtils.get_date()}/"
FileUtils.create_folders(video_folder)
output_video = f"{video_folder}/video_{current_time}.mp4"
video_recorder.record_video(data_folder, output_video)
image_count = 0 # Reset counter
except Exception as error:
print(f"Error creating video: {error}")
except Exception as e:
print(f"Error in main loop: {e}")
FileUtils.create_folders(data_folder)
if (len(last_results) > 0):
for result in last_results:
label = f"{labels[int(result.category)]} ({result.conf:.2f})"
print(f"Detected {label}")
# Optional: add a small delay to reduce CPU usage
time.sleep(0.01)
except KeyboardInterrupt:
print("Program terminated by user")
finally:
# Clean up
picam2.stop()
print("Camera stopped and resources released")