Playing with the Gizmos chain, I have a proposal for a specific use case.
In my case I need 24/7 inference on RTSP cameras.
VideoSourceGizmo works fine until a camera disconnects or the source is exhausted (network error, camera maintenance, etc.). The VideoSourceGizmo code is clear:
def run(self):
    """Run the video capture loop.

    Continuously reads frames from the video source and sends each frame
    (with metadata) downstream until the source is exhausted or abort is signaled.
    """
    with open_video_stream(self._video_source) as src:
        meta = {
            self.key_frame_width: int(src.get(cv2.CAP_PROP_FRAME_WIDTH)),
            self.key_frame_height: int(src.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            self.key_fps: src.get(cv2.CAP_PROP_FPS),
            self.key_frame_count: int(src.get(cv2.CAP_PROP_FRAME_COUNT)),
        }
        while not self._abort:
            ret, data = src.read()
            if not ret:
                break
            else:
                meta2 = copy.copy(meta)
                meta2[self.key_frame_id] = self.result_cnt
                meta2[self.key_timestamp] = time.time()
                self.send_result(
                    StreamData(data, StreamMeta(meta2, self.get_tags()))
                )
    if self._stop_composition_on_end:
        # Stop composition if video source is over (to halt other sources still running).
        if self.composition is not None and not self._abort:
            self.composition.stop()
I’m starting to touch the code here and there to add the possibility of catching cv2 source-streaming exceptions and implementing a non-blocking chain.
I’m thinking of adding a switch to choose between a blocking source stream and waiting for the source stream to come back.
This is what an NVR does: if a camera is not connected you get the error, but as soon as the camera is up and running again it continues to record.
I want to avoid working at the OS level, for example respawning the dead process.
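A minimal sketch of that NVR-style behavior (hypothetical helper names, not dgtools API): instead of ending the read loop on failure, keep reopening the source after a delay. The capture factory is injectable so the same loop works with cv2.VideoCapture or anything with the same interface.

```python
import time

def resilient_frames(open_capture, retry_delay=5.0, max_retries=None):
    """Yield frames forever, reopening the source when it fails.

    open_capture: zero-arg callable returning an object with
        .isOpened(), .read() -> (ok, frame), and .release(),
        e.g. lambda: cv2.VideoCapture(rtsp_url).
    max_retries: give up after this many consecutive failed reopens
        (None = retry forever, NVR-style).
    """
    retries = 0
    while max_retries is None or retries <= max_retries:
        cap = open_capture()
        try:
            if not cap.isOpened():
                raise RuntimeError("source failed to open")
            while True:
                ok, frame = cap.read()
                if not ok:
                    break  # stream ended or stalled: fall through to reopen
                retries = 0  # a healthy read resets the retry counter
                yield frame
        except RuntimeError:
            pass  # treat open failure like a dropped stream
        finally:
            cap.release()
        retries += 1
        time.sleep(retry_delay)  # back off before reconnecting
```

Note this only helps once `.read()` actually returns False; if the underlying backend blocks forever, a read timeout (or a watchdog thread) is still needed on top.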
Yes, this is a great proposal. I have a question: when using the cv2.VideoCapture class, both .grab() and .read() block indefinitely when a camera disconnects from the network. ChatGPT told me that this is how ffmpeg-based implementations work and nothing can be done about it. Any ideas?
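One partial workaround (support depends on your OpenCV and FFmpeg versions, so treat this as a hedged sketch): give the FFmpeg backend explicit timeouts so reads fail instead of blocking forever. OpenCV reads FFmpeg options from the OPENCV_FFMPEG_CAPTURE_OPTIONS environment variable in "key;value|key;value" form, and recent OpenCV also accepts per-capture timeout properties.

```python
import os

def ffmpeg_capture_options(**opts):
    """Build the OPENCV_FFMPEG_CAPTURE_OPTIONS string ("key;value|key;value")."""
    return "|".join(f"{k};{v}" for k, v in opts.items())

# Must be set BEFORE the first VideoCapture is created with the FFmpeg backend.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = ffmpeg_capture_options(
    rtsp_transport="tcp",
    stimeout=5_000_000,  # RTSP socket timeout in microseconds ("timeout" on newer ffmpeg)
)

# With OpenCV >= 4.5.2 one can also pass timeouts per capture (FFmpeg backend only):
# import cv2
# cap = cv2.VideoCapture(
#     rtsp_url, cv2.CAP_FFMPEG,
#     [cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000,
#      cv2.CAP_PROP_READ_TIMEOUT_MSEC, 5000],
# )
```

With a timeout in place, a stalled .read() eventually returns False, which a retry loop or watchdog can then act on.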
Talking about improvements: by touching the code I'm trying to enable hardware acceleration on the Rpi 5 for H265 decoding (there is no hardware-accelerated decoding for H264). It should be a parameter in the stream opening, but I'm stuck.
Looking into the processes, Python consumes CPU for each RTSP input stream proportionally to the input fps.
Yes, we just released a VideoSource improvement which retries on errors. Unfortunately, this is not enough, since for RTSP cameras the underlying ffmpeg (which OpenCV uses for camera handling) just waits with a very long timeout for a disconnected camera without raising any exceptions. To deal with that we developed FPSStabilizingGizmo. Insert it just after the video source in the pipeline. This gizmo detects stalls and repeats the last frame (gradually shadowing it to reflect that it is not live) to maintain FPS.
Regarding HW acceleration, one way is to use ffmpeg as a separate process. We are working on such a video source for dgtools.
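A rough sketch of that separate-process approach (hypothetical helper names; the right -hwaccel value for the Pi 5's HEVC decoder depends on your ffmpeg build, so it is left as a parameter): ffmpeg decodes the RTSP stream and writes raw BGR frames to a pipe, and Python reads the pipe frame by frame.

```python
import subprocess

def ffmpeg_rtsp_command(url, width, height, hwaccel=None):
    """Build an ffmpeg command that decodes an RTSP stream to raw BGR24 on stdout.

    hwaccel: optional -hwaccel method; what works for HEVC on a Pi 5
    depends on the ffmpeg build, so this is deliberately generic.
    """
    cmd = ["ffmpeg", "-loglevel", "error"]
    if hwaccel:
        cmd += ["-hwaccel", hwaccel]
    cmd += [
        "-rtsp_transport", "tcp",
        "-i", url,
        "-f", "rawvideo",
        "-pix_fmt", "bgr24",
        "-s", f"{width}x{height}",
        "-",  # write raw frames to stdout
    ]
    return cmd

def read_frames(url, width, height, hwaccel=None):
    """Yield raw BGR frames (bytes of length width*height*3) from ffmpeg."""
    frame_bytes = width * height * 3
    proc = subprocess.Popen(
        ffmpeg_rtsp_command(url, width, height, hwaccel),
        stdout=subprocess.PIPE,
    )
    try:
        while True:
            buf = proc.stdout.read(frame_bytes)
            if len(buf) < frame_bytes:
                break  # stream ended or ffmpeg died
            # np.frombuffer(buf, np.uint8).reshape(height, width, 3)
            # turns this into a cv2-style frame if needed.
            yield buf
    finally:
        proc.kill()
```

This also sidesteps the blocking-read problem somewhat: if ffmpeg dies, the pipe read returns short and the loop exits, so the supervisor can restart the process.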
I have another idea that works, using a watchdog thread that checks whether the VideoSource thread is alive and, if it is not, calls a function that starts another VideoSource thread.
Here is the example code of the watchdog, where target is the first VideoSource thread and action is a function that starts a new VideoSource thread:
from threading import Thread
from time import sleep

# task for a watchdog thread
def watchdog(target, action):
    # run forever
    print('Watchdog running')
    while True:
        # check the status of the target thread
        if not target.is_alive():
            # report fault
            print('Watchdog: target thread is not running, restarting...')
            # restart the target thread
            target = action()
        # block for a moment
        sleep(0.5)

...

# create and start a worker thread, returns instance of thread
def boot_worker():
    # create and configure the thread
    worker = Thread(target=worker_task, name="Worker")
    # start the thread
    worker.start()
    # return the instance of the thread
    return worker

...

# create and start the worker, then the watchdog monitoring it
# (use a distinct name so the thread does not shadow the watchdog function)
worker = boot_worker()
watchdog_thread = Thread(target=watchdog, args=(worker, boot_worker), daemon=True, name="Watchdog")
# start the watchdog
watchdog_thread.start()
...
Probably the entire Gizmo chain must be restarted as well.
Yep, another thread is needed one way or another to monitor a stalled VideoCapture.
Our experiments with RTSP cameras that disconnect and reconnect show that the combination of FPSStabilizingGizmo (which keeps the pipeline running with fake frames) and VideoSourceGizmo with retries-on-error provides adequate recovery.
Also, in the latest release of dgtools we implemented the streams.Watchdog class for similar purposes. Usage:
watchdog = Watchdog(time_limit=20, tps_threshold=1, smoothing=0.95)
# time_limit (float): Maximum allowed time (in seconds) since the last tick.
# tps_threshold (float): Minimum required filtered ticks per second.
# smoothing (float): Smoothing factor for the low-pass filter (0 < smoothing < 1).
some_gizmo.watchdog = watchdog  # attach the watchdog to the gizmo you want to monitor
...
# somewhere else:
if not watchdog.check():
# pipeline is not running
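For intuition, here is a minimal toy version of how such a tick-based watchdog might work internally (an illustrative sketch, not the dgtools implementation): each tick updates a low-pass-filtered ticks-per-second estimate, and check() fails if ticks stop arriving within time_limit or the filtered rate drops below tps_threshold. The clock is injectable for testing.

```python
import time

class TickWatchdog:
    """Toy tick-based watchdog (illustrative only, not dgtools' streams.Watchdog)."""

    def __init__(self, time_limit=20.0, tps_threshold=1.0, smoothing=0.95,
                 clock=time.monotonic):
        self._time_limit = time_limit        # max seconds since last tick
        self._tps_threshold = tps_threshold  # min filtered ticks per second
        self._smoothing = smoothing          # low-pass factor (0 < smoothing < 1)
        self._clock = clock
        self._last_tick = clock()
        self._filtered_tps = tps_threshold   # start at the threshold (optimistic)

    def tick(self):
        """Call once per processed frame/result."""
        now = self._clock()
        dt = max(now - self._last_tick, 1e-9)
        self._last_tick = now
        # low-pass filter of the instantaneous ticks-per-second
        self._filtered_tps = (
            self._smoothing * self._filtered_tps + (1.0 - self._smoothing) / dt
        )

    def check(self):
        """True while the monitored loop looks healthy."""
        recent = (self._clock() - self._last_tick) <= self._time_limit
        return recent and self._filtered_tps >= self._tps_threshold
```

The smoothing keeps one slow frame from tripping the alarm, while the time_limit catches a fully stalled pipeline that stops ticking altogether.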
As soon as I disconnect the PoE camera from the switch I no longer get an exception, but after the PoE camera restarted, the stream didn't resume flowing through the chain, and I got this warning every 30 s in the shell:
[ WARN:5@69.122] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30034.920097 ms
[ WARN:5@99.165] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30034.571256 ms
[ WARN:5@129.228] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30034.666748 ms
[ WARN:5@159.262] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30034.033006 ms
[ WARN:5@189.296] global cap_ffmpeg_impl.hpp:453 _opencv_ffmpeg_interrupt_callback Stream timeout triggered after 30033.321534 ms