|
|
@@ -0,0 +1,111 @@
|
|
|
+#!/usr/bin/env -S uv run --script
|
|
|
+# /// script
|
|
|
+# requires-python = ">=3.11"
|
|
|
+# dependencies = [
|
|
|
+# "requests",
|
|
|
+# "opencv-python",
|
|
|
+# "numpy",
|
|
|
+# "ntfy",
|
|
|
+# ]
|
|
|
+# ///
|
|
|
+import requests
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+import time
|
|
|
+
|
|
|
+# URL for the video stream
|
|
|
+stream_url = 'http://loge.local:8083/stream'
|
|
|
+public_url = "https://mail.see.unbl.ink/stream"
|
|
|
+
|
|
|
+# Load a template image of a school bus (you need to have this template)
|
|
|
+template = cv2.imread('/home/powellc/var/inbox/school_bus_template.jpg', 0)
|
|
|
+w, h = template.shape[::-1]
|
|
|
+
|
|
|
+# Open the stream using OpenCV
|
|
|
+cap = cv2.VideoCapture(stream_url)
|
|
|
+
|
|
|
+# Check if the stream opened successfully
|
|
|
+if not cap.isOpened():
|
|
|
+ print("Error: Could not open video stream.")
|
|
|
+ exit()
|
|
|
+
|
|
|
+ # Configure ntfy server and topic
|
|
|
+ntfy_server = "https://ntfy.unbl.ink" # Replace with your ntfy server
|
|
|
+topic = "school-bus-alert" # Replace with your desired topic
|
|
|
+
|
|
|
+# Send notification function with custom server and topic
|
|
|
+def send_ntfy_notification(title, message):
|
|
|
+ # Construct the full URL with topic (this is how ntfy expects the URL)
|
|
|
+ url = f"{ntfy_server}"
|
|
|
+
|
|
|
+ # Payload for the notification
|
|
|
+ headers = {"Title": title}
|
|
|
+ payload = {
|
|
|
+ "topic": topic,
|
|
|
+ "message": message,
|
|
|
+ }
|
|
|
+
|
|
|
+ # Send the POST request to the ntfy server
|
|
|
+ response = requests.post(url, json=payload, headers=headers)
|
|
|
+
|
|
|
+ if response.status_code == 200:
|
|
|
+ print("Notification sent successfully.")
|
|
|
+ else:
|
|
|
+ print(f"Failed to send notification: {response.status_code} - {response.text}")
|
|
|
+
|
|
|
+while True:
|
|
|
+ # Read a frame from the stream
|
|
|
+ ret, frame = cap.read()
|
|
|
+
|
|
|
+ if not ret:
|
|
|
+ print("Failed to capture frame.")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Convert the frame to grayscale for template matching
|
|
|
+ gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
|
|
+
|
|
|
+ # Perform template matching on the current frame
|
|
|
+ res = cv2.matchTemplate(gray_frame, template, cv2.TM_CCOEFF_NORMED)
|
|
|
+ threshold = 0.80 # Set your threshold for matching
|
|
|
+ loc = np.where(res >= threshold)
|
|
|
+
|
|
|
+ # If a match is found, send a notification
|
|
|
+ if len(loc[0]) > 0:
|
|
|
+ print("Bus detected!")
|
|
|
+
|
|
|
+ # Save the frame with bounding boxes to a file in /tmp
|
|
|
+ timestamp = time.strftime("%Y%m%d_%H%M")
|
|
|
+ filename = f"school_bus_detection_{timestamp}.jpg"
|
|
|
+ output_path = f"/media/photos/misc/school_bus/{filename}"
|
|
|
+
|
|
|
+ cv2.imwrite(output_path, frame)
|
|
|
+ print(f"Frame with match locations saved to {output_path}")
|
|
|
+
|
|
|
+ # Send notification using ntfy (with both title and message)
|
|
|
+ title = "School Bus Detected!"
|
|
|
+ message = f"Check the video feed at: {public_url}\nMatched image at https://files.lab.unbl.ink/school_bus/{filename}"
|
|
|
+
|
|
|
+ # Send the notification with both title and message
|
|
|
+ send_ntfy_notification(title, message)
|
|
|
+
|
|
|
+ # Draw bounding boxes on the detected matches
|
|
|
+ for pt in zip(*loc[::-1]): # Switch x and y for rectangle drawing
|
|
|
+ cv2.rectangle(frame, pt, (pt[0] + w, pt[1] + h), (0, 0, 255), 2)
|
|
|
+
|
|
|
+
|
|
|
+ print("Pausing for 3 minutes to let the bus leave...")
|
|
|
+ time.sleep(180) # Sleep for 3 minutes (180 seconds)
|
|
|
+ else:
|
|
|
+ print("No bus found, sleeping for 2 seconds")
|
|
|
+
|
|
|
+ # Optional: Display the frame with the match location (for debugging)
|
|
|
+ #for pt in zip(*loc[::-1]):
|
|
|
+ # cv2.rectangle(frame, pt, (pt[0] + w, pt[1] + h), (0, 0, 255), 2)
|
|
|
+
|
|
|
+ ## Display the frame (for debugging)
|
|
|
+ #cv2.imshow('Frame', frame)
|
|
|
+
|
|
|
+ time.sleep(2) # two seconds so we don't spam the camera
|
|
|
+ # Break the loop on key press (e.g., 'q' to quit)
|
|
|
+ if cv2.waitKey(1) & 0xFF == ord('q'):
|
|
|
+ break
|