Latency when using GStreamer streaming

Hi. I am planning to use DeGirum to run YOLOv8 segmentation and object detection concurrently, so I started with the tutorial at hailo_examples/examples/016_custom_video_source.ipynb at main · DeGirum/hailo_examples · GitHub, under ‘Using Gstreamer using Pygobject module’, to test out the basic functions. However, I noticed a significant amount of latency in the live video, unlike the detection code at hailo-apps-infra/hailo_apps/hailo_app_python/apps/detection at main · hailo-ai/hailo-apps-infra · GitHub, which runs YOLOv8m at 30 FPS with no noticeable latency. I also modified the pipeline to
pipeline_str = (
"v4l2src device=/dev/video0 ! "
"image/jpeg, width=1280, height=720, framerate=30/1 ! "
"jpegdec ! videoconvert ! video/x-raw, format=BGR ! "
"appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
and the video plays more smoothly, but the latency is still there. Any idea what is causing it?

Hi @hao004

Welcome to the DeGirum community. Thank you for bringing this to our notice. Our team is taking a look at this issue. Can you please let us know how you noticed or measured this latency? Is the throughput good?

Hi @hao004 ,
Thank you for taking the time to dive deeper into this and for bringing it to our attention. We truly appreciate you using PySDK for your use case! I’m currently working on replicating the issue you’re facing, and to help us investigate further, it would be really helpful if you could share the following details:

  1. Which models have you tested with the custom video generator using GStreamer and PySDK?

  2. What FPS did you observe with each of these models?

  3. Did you notice any latency issues? If so, how severe were they?

  4. What FPS and latency did you observe using the Hailo framework for the above models?

In the meantime, while we work on a permanent fix, here are two quick adjustments you can try that should significantly improve FPS:

1. Specify width and height in the GStreamer pipeline
PySDK resizes input images internally to match the model's input size, but this happens on the CPU, which adds latency. If you specify the width and height directly in the GStreamer pipeline, this overhead is avoided, resulting in higher FPS.

Example:

v4l2src device=/dev/video0 ! videoscale ! videoconvert ! video/x-raw, width=640, height=640, format=BGR ! appsink name=sink
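
If it helps, embedded in the custom video generator from the tutorial this could look as follows (a sketch; the 640x640 size assumes a model with a 640x640 input, so adjust width and height to match your model):

# Sketch: let GStreamer do the resize so PySDK does not have to.
# Assumes a 640x640 model input; adjust width/height to your model.
pipeline_str = (
    "v4l2src device=/dev/video0 ! "
    "videoscale ! videoconvert ! "
    "video/x-raw, width=640, height=640, format=BGR ! "
    "appsink name=sink emit-signals=true max-buffers=1 drop=true"
)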

2. Use our pipeline with the OpenCV backend
This approach delivers higher FPS, almost comparable to Hailo’s performance.

Example:

import degirum as dg, degirum_tools

inference_host_address = "@local"
zoo_url = "degirum/hailo"
token = '' 
device_type = "HAILORT/HAILO8L"

# set model name and video source
model_name = "yolov8n_coco--640x640_quant_hailort_multidevice_1"
video_source = 0 # replace with your camera device id 

# load model
model = dg.load_model(
    model_name=model_name, 
    inference_host_address=inference_host_address,
    zoo_url=zoo_url,
    token=token,
    device_type=device_type
)

# run AI inference on video stream and display the results
with degirum_tools.Display("AI Camera") as output_display:
    for inference_result in degirum_tools.predict_stream(model, video_source):
        output_display.show(inference_result.image_overlay)

We’d love to hear back with your test details, as they’ll really help us zero in on the issue. Thanks again for your patience and support!

Thanks for replying. I noticed the latency when I looked away from the monitor and then quickly looked back: I could still see my eyeball in its previous position (looking away) on the screen. To measure this latency, I used the following code, which records a software timestamp when each frame is captured and compares it against the time the corresponding result is displayed. I measure about 270 ms of latency. The throughput is good, about 30 FPS.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import cv2
import degirum as dg
import time
from collections import deque

# Initialize GStreamer
Gst.init(None)

# Load Hailo model 
model = dg.load_model(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    inference_host_address="@local",
    zoo_url="./models",
    device_type=['HAILORT/HAILO8']
)

# Custom generator with software timestamp
def custom_video_generator(ts_queue):
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "image/jpeg, width=640, height=480, framerate=30/1 ! "
        "jpegdec ! videoconvert ! video/x-raw, format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")

    pipeline.set_state(Gst.State.PLAYING)

    try:
        while True:
            sample = sink.emit("pull-sample")
            if sample:
                buf = sample.get_buffer()
                caps = sample.get_caps()
                width = caps.get_structure(0).get_value("width")
                height = caps.get_structure(0).get_value("height")

                success, map_info = buf.map(Gst.MapFlags.READ)
                if not success:
                    continue

                frame = np.frombuffer(map_info.data, np.uint8).reshape((height, width, 3))
                buf.unmap(map_info)

                # Save capture timestamp in queue
                ts_queue.append(time.time())
                yield frame
            else:
                break
    finally:
        pipeline.set_state(Gst.State.NULL)


# FPS tracking
frame_count = 0
start_time = time.time()
ts_queue = deque()

# Run batch prediction and latency measurement
for result in model.predict_batch(custom_video_generator(ts_queue)):
    display_time = time.time()

    if ts_queue:  # get the matching capture timestamp
        capture_time = ts_queue.popleft()
        latency = (display_time - capture_time) * 1000  # ms
    else:
        latency = float('nan')

    frame_count += 1
    elapsed_time = time.time() - start_time

    # Print FPS every second
    if elapsed_time >= 1.0:
        fps = frame_count / elapsed_time
        print(f"FPS: {fps:.2f} | Latency: {latency:.1f} ms")
        frame_count = 0
        start_time = time.time()

    cv2.imshow("Webcam Inference", result.image_overlay)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cv2.destroyAllWindows()

1. Currently I am testing with yolov8n_coco--640x640_quant_hailort_multidevice_1, as shown in the code above.

2. 30 FPS.

3. Yes, about 270 ms of latency.

4. 30 FPS. I did not measure the latency, though, as I didn't notice any perceptible latency in the live stream.

I tried the above adjustments but didn’t see any improvements.

Currently I am experimenting with the predict() function instead of predict_batch(), and it runs at 30 FPS with much lower latency, about 16 ms. The code is shown below. Are there any drawbacks if I use this instead of predict_batch()?

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import cv2
import degirum as dg
import time

# Initialize GStreamer
Gst.init(None)

# Load Hailo model 
model = dg.load_model(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    inference_host_address="@local",
    zoo_url="./models",
    device_type=['HAILORT/HAILO8']
)

# Custom generator with software timestamp
def custom_video_generator():
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "image/jpeg, width=640, height=480, framerate=30/1 ! "
        "jpegdec ! videoconvert ! video/x-raw, format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")

    pipeline.set_state(Gst.State.PLAYING)

    try:
        while True:
            sample = sink.emit("pull-sample")
            if sample:
                buf = sample.get_buffer()
                caps = sample.get_caps()
                width = caps.get_structure(0).get_value("width")
                height = caps.get_structure(0).get_value("height")

                success, map_info = buf.map(Gst.MapFlags.READ)
                if not success:
                    continue

                frame = np.frombuffer(map_info.data, np.uint8).reshape((height, width, 3))
                buf.unmap(map_info)

                # Attach software capture timestamp
                capture_time = time.time()
                yield frame, capture_time
            else:
                break
    finally:
        pipeline.set_state(Gst.State.NULL)


# FPS + latency tracking
frame_count = 0
start_time = time.time()

for frame, capture_time in custom_video_generator():
    # Run single-frame inference
    result = model.predict(frame)

    # Measure latency
    display_time = time.time()
    latency = (display_time - capture_time) * 1000  # ms

    # Count frames for FPS
    frame_count += 1
    elapsed_time = time.time() - start_time

    # Print FPS once per second
    if elapsed_time >= 1.0:
        fps = frame_count / elapsed_time
        print(f"FPS: {fps:.2f} | Latency: {latency:.1f} ms")
        frame_count = 0
        start_time = time.time()

    # Show annotated frame
    cv2.imshow("Webcam Inference", result.image_overlay)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cv2.destroyAllWindows()

Hi @hao004

Thank you for providing detailed answers to all our queries. We will dig deeper to understand the source of the latency.

Batch predict is optimized for throughput. If your system runs multiple applications that use multiple models, this optimization may be crucial.
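
To make the tradeoff concrete, here is a small benchmark sketch you can run (it loads the same model as the snippets above; the dummy frames skip camera capture entirely, and the exact numbers will depend on your host and device):

import time
import numpy as np
import degirum as dg

# Load the same model used in the snippets above
model = dg.load_model(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    inference_host_address="@local",
    zoo_url="degirum/hailo"
)

# Dummy frames so we measure inference scheduling, not camera capture
frames = [np.zeros((640, 640, 3), dtype=np.uint8) for _ in range(30)]

# predict(): one frame in flight at a time -- lowest latency, but the
# device can sit idle while the host pre/post-processes each frame
t0 = time.time()
for frame in frames:
    _ = model.predict(frame)  # blocks until this frame is fully processed
print(f"predict():       {len(frames) / (time.time() - t0):.1f} FPS")

# predict_batch(): frames are queued and pipelined -- higher throughput,
# but results can wait in the internal queue, which shows up as latency
t0 = time.time()
for _ in model.predict_batch(iter(frames)):
    pass
print(f"predict_batch(): {len(frames) / (time.time() - t0):.1f} FPS")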

@hao004 we have identified the issue and are working on a fix. Meanwhile, I would like to suggest a hot fix that will bring down the latency significantly: run a warm-up inference on a dummy frame before starting the loop. This loads the model before you begin capturing actual frames from the camera.

Here’s how to do it:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import cv2
import degirum as dg
import time
from collections import deque

# Initialize GStreamer
Gst.init(None)

model = dg.load_model(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    inference_host_address="@local",
    zoo_url="degirum/hailo"
)

# Custom generator with software timestamp
def custom_video_generator(ts_queue):
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "image/jpeg, width=640, height=480, framerate=30/1 ! "
        "jpegdec ! videoconvert ! video/x-raw, format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")

    pipeline.set_state(Gst.State.PLAYING)

    try:
        while True:
            sample = sink.emit("pull-sample")
            if sample:
                buf = sample.get_buffer()
                caps = sample.get_caps()
                width = caps.get_structure(0).get_value("width")
                height = caps.get_structure(0).get_value("height")

                success, map_info = buf.map(Gst.MapFlags.READ)
                if not success:
                    continue

                frame = np.frombuffer(map_info.data, np.uint8).reshape((height, width, 3))
                buf.unmap(map_info)

                # Save capture timestamp in queue
                ts_queue.append(time.time())
                yield frame
            else:
                break
    finally:
        pipeline.set_state(Gst.State.NULL)

# Warm up inference
print("Performing warmup inference...")
dummy_frame = np.zeros((640, 640, 3), dtype=np.uint8)
_ = model.predict(dummy_frame)
print("Warmup complete")

# FPS tracking
frame_count = 0
start_time = time.time()
ts_queue = deque()

# Run batch prediction and latency measurement
for result in model.predict_batch(custom_video_generator(ts_queue)):
    display_time = time.time()

    if ts_queue:  # get the matching capture timestamp
        capture_time = ts_queue.popleft()
        latency = (display_time - capture_time) * 1000  # ms
    else:
        latency = float('nan')

    frame_count += 1
    elapsed_time = time.time() - start_time

    # Print FPS every second
    if elapsed_time >= 1.0:
        fps = frame_count / elapsed_time
        print(f"FPS: {fps:.2f} | Latency: {latency:.1f} ms")
        frame_count = 0
        start_time = time.time()

    cv2.imshow("Webcam Inference", result.image_overlay)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cv2.destroyAllWindows()

Please give this a try and let me know if you face any issues. Thanks again for using PySDK and bringing this to our attention.

Thanks for the swift action. I tried it, and it gives about 35 ms of latency, a huge improvement compared to before.
