
Building a Custom GStreamer Plugin for NVIDIA DeepStream

NVIDIA DeepStream provides a production-ready pipeline for multi-stream video analytics, built entirely on GStreamer: hardware-accelerated decoding, tracking, on-screen rendering, and messaging. For standard detection models deployed with TensorRT, nvinfer handles everything.

However, the standard path has its limits. Vision-language models, custom post-processing, rotated bounding boxes, or the need to hot-swap models at runtime: these are areas where nvinfer starts to fall short. Sometimes you have a mature PyTorch stack that your team has painstakingly tuned, and you want DeepStream to call it instead of re-implementing it in a configuration file.

It's worth noting that for the YOLO family of models in particular, Marcos Luciano's DeepStream-Yolo has already done a great job of implementing custom processing in C++. If C++ is on the table, start there. This article takes a different angle: achieving the same result in Python, using a custom GStreamer plugin built with pyservicemaker, without sacrificing throughput.

A key insight makes this possible: downstream elements such as nvtracker, nvdsosd, and nvmsgconv don't care which component generated the detection metadata. Write to the DeepStream metadata structures correctly and the entire ecosystem works as it should, even though nvinfer is never in the picture.

DeepStream metadata

Every buffer that flows through a DeepStream pipeline carries more than just pixel data. From the moment frames pass through nvstreammux, each GstBuffer has an NvDsBatchMeta structure attached to it. The hierarchy is straightforward and is described in the official documentation.

NvDsBatchMeta
├── NvDsUserMeta                        (batch-level custom metadata)
└── NvDsFrameMeta                       (one per source stream)
    ├── NvDsUserMeta                    (frame-level custom metadata)
    └── NvDsObjectMeta                  (one per detected object)
        ├── NvDsClassifierMeta
        └── NvDsUserMeta                (object-level custom metadata)

NvDsBatchMeta describes the entire batch. Each NvDsFrameMeta corresponds to a single source stream and carries frame-level information such as the source ID and frame number. Each NvDsObjectMeta represents a single detection, which means that when our plugin writes detections, it writes one NvDsObjectMeta per detection.

The important thing to understand is that none of this belongs to nvinfer. It is a shared data contract. Any GStreamer element in the pipeline can read from it, write to it, or both:

  • nvtracker reads object bounding boxes and writes tracking IDs.
  • nvdsosd reads boxes and labels to draw overlays.
  • nvmsgconv reads the entire structure to generate message payloads.

Our custom plugin simply writes to this structure the same way nvinfer does, and everything downstream works without modification. One important constraint to understand before we write any code: NvDsObjectMeta instances cannot be constructed directly from Python. Trying to instantiate the class raises a "No constructor defined!" error at runtime.

The reason is memory management. DeepStream manages its metadata objects using memory pools: pre-allocated blocks that are reused across frames to avoid repeated allocations and deallocations in the pipeline's hot path. These pools are owned by NvDsBatchMeta and live on the C side of the boundary. The Python bindings expose access to those pools but intentionally don't expose a Python-side constructor, because creating an NvDsObjectMeta outside the pool would bypass the lifecycle management that keeps DeepStream's memory usage predictable. The correct way to obtain one is to ask the batch: batch_meta.acquire_object_meta(), which hands you a pre-allocated instance from the pool. Once the frame is done, DeepStream returns it to the pool automatically.
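To make the pool idea concrete, here is a toy, pure-Python analogy of the acquire-and-recycle lifecycle. It is not DeepStream code: ToyBatchMeta, ObjectMeta, and release_all are hypothetical stand-ins, and only the method name acquire_object_meta mirrors the real API. The point is that callers never construct metadata themselves; they borrow slots that were allocated up front.

```python
from dataclasses import dataclass, field


@dataclass
class ObjectMeta:
    """Toy stand-in for NvDsObjectMeta; not the real DeepStream type."""
    class_id: int = -1
    confidence: float = 0.0


@dataclass
class ToyBatchMeta:
    """Toy batch that owns a fixed pool of pre-allocated ObjectMeta slots."""
    pool_size: int = 4
    _free: list = field(default_factory=list, init=False)
    _in_use: list = field(default_factory=list, init=False)

    def __post_init__(self):
        # Allocate every slot up front, mimicking DeepStream's metadata pools
        self._free = [ObjectMeta() for _ in range(self.pool_size)]

    def acquire_object_meta(self) -> ObjectMeta:
        # Hand out a pre-allocated slot instead of constructing a new object
        if not self._free:
            raise RuntimeError("object meta pool exhausted")
        meta = self._free.pop()
        self._in_use.append(meta)
        return meta

    def release_all(self) -> None:
        # End of batch: reset every slot in use and return it to the pool
        for meta in self._in_use:
            meta.class_id, meta.confidence = -1, 0.0
        self._free.extend(self._in_use)
        self._in_use.clear()
```

In a real pipeline you never call anything like release_all yourself; DeepStream recycles the metadata when the buffer is finished.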

Python bridge: pyservicemaker

To interact with DeepStream metadata from Python, we'll use pyservicemaker, NVIDIA's current, supported Python SDK for DeepStream. The official documentation covers the basics of pipelines and flows, but stops short of showing how to write and attach metadata from a custom inference element. That's the gap this article fills.

The core abstraction is BatchMetadataOperator. Subclass it and implement handle_metadata(batch_meta), and you get full access to NvDsBatchMeta for every buffer flowing through the pipeline. From there, iterating over frames is as simple as looping over batch_meta.frame_items and attaching an object.
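As a minimal sketch of that pattern, here is a read-only operator that counts the objects attached to each frame. It assumes frame metadata exposes an object_items iterable, as in NVIDIA's Service Maker samples; the import fallback to object is only there so the logic can be exercised without DeepStream installed, and ObjectCounter is a hypothetical name.

```python
# Fall back to a plain base class so the logic can run without DeepStream
try:
    from pyservicemaker import BatchMetadataOperator
except ImportError:
    BatchMetadataOperator = object


class ObjectCounter(BatchMetadataOperator):
    """Counts detections per frame; a read-only example of handle_metadata."""

    def __init__(self):
        super().__init__()
        self.counts = []  # one list of per-frame counts for each batch seen

    def handle_metadata(self, batch_meta):
        per_frame = []
        for frame_meta in batch_meta.frame_items:
            # object_items iterates the object metadata attached to this frame
            per_frame.append(sum(1 for _ in frame_meta.object_items))
        self.counts.append(per_frame)
```

In the Service Maker samples, operators like this are typically wrapped in a Probe and attached to a named element of the pipeline; check the pyservicemaker reference for the exact call in your DeepStream version.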

pyservicemaker also provides a Buffer wrapper around Gst.Buffer that exposes batch_meta directly and, importantly, an extract(batch_id) method that returns a DLPack handle to each frame's GPU memory. That's what makes zero-copy possible: we can hand the frame to TensorRT without it ever leaving the GPU.
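The DLPack behavior is easy to check in isolation. The sketch below uses NumPy on the CPU as a stand-in (it assumes NumPy 1.22 or newer, where np.from_dlpack exists); torch.utils.dlpack.from_dlpack behaves the same way for CUDA memory. Importing through DLPack shares memory with the producer, whereas stacking frames into a batch allocates a new array, so torch.stack in the plugin is a copy, just one that stays on the GPU.

```python
import numpy as np

# Small stand-in for one decoded RGB frame (H, W, C); in the plugin it is GPU memory
frame = np.zeros((4, 6, 3), dtype=np.uint8)

# Importing through DLPack wraps the same memory instead of copying it
view = np.from_dlpack(frame)

# Stacking frames into a batch, by contrast, allocates a new contiguous array
batch = np.stack([view, view], axis=0)
```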

Instead of using BatchMetadataOperator in a standalone probe, we'll wrap the same pattern inside our custom plugin's do_transform_ip method, which gives us control over the element's lifecycle, its properties, and its caps, along with metadata access. But first, we need to build that plugin.

Making a Python GStreamer plugin discoverable

GStreamer finds plugins at runtime by scanning the directories listed in GST_PLUGIN_PATH. For Python plugins specifically (loaded through gst-python's Python plugin loader, which must be installed), it looks inside a python/ subdirectory within each of those paths. That means your plugin is just a .py file dropped into the right location: no compilation, no CMake, no shared library. The tradeoff is that the registration pattern is rigid, and getting it wrong produces silent failures that are hard to debug.

$GST_PLUGIN_PATH/
└── python/
    └── gstexampleplugin.py   # your plugin

Set GST_PLUGIN_PATH to point at the parent directory, and GStreamer will pick up python/gstexampleplugin.py automatically on the next pipeline run.
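When a plugin doesn't show up, the first thing to rule out is a file in the wrong directory. The following is a small hypothetical debugging helper (not part of GStreamer) that lists the .py files sitting in a python/ subdirectory of each GST_PLUGIN_PATH entry. It only checks the layout described above, not whether the file actually registers.

```python
import os
from pathlib import Path


def find_python_plugins(gst_plugin_path: str) -> list[Path]:
    """List .py files in the python/ subdirectory of each GST_PLUGIN_PATH entry."""
    found = []
    for entry in gst_plugin_path.split(os.pathsep):
        if not entry:
            continue  # tolerate empty segments such as a trailing separator
        python_dir = Path(entry) / "python"
        if python_dir.is_dir():
            found.extend(sorted(python_dir.glob("*.py")))
    return found


if __name__ == "__main__":
    print(find_python_plugins(os.environ.get("GST_PLUGIN_PATH", "")))
```

A plugin file placed directly in the GST_PLUGIN_PATH directory, rather than in its python/ subdirectory, will not appear in the output, which is exactly the mistake this is meant to catch.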

Plugin Skeleton

Here's a minimal skeleton of a passthrough inference element: it receives batched video buffers, runs inference, attaches metadata, and passes the buffer downstream unmodified.

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GstBase, GObject

import torch
from pyservicemaker import Buffer

GST_PLUGIN_NAME = "gstexampleplugin"

Gst.init(None)

class GstExamplePlugin(GstBase.BaseTransform):

    __gstmetadata__ = (
        'GstExamplePlugin',                     # name
        'Filter/Effect/Video',                  # classification
        'Custom inference element',             # description
        'Your Name'                             # author
    )

    src_format = Gst.Caps.from_string(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )
    sink_format = Gst.Caps.from_string(
        "video/x-raw(memory:NVMM), format=RGB, "
        "width=(int)[ 1, 2147483647 ], height=(int)[ 1, 2147483647 ], "
        "framerate=(fraction)[ 0/1, 2147483647/1 ]"
    )

    src_pad_template = Gst.PadTemplate.new(
        "src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, src_format
    )
    sink_pad_template = Gst.PadTemplate.new(
        "sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, sink_format
    )
    __gsttemplates__ = (src_pad_template, sink_pad_template)

    __gproperties__ = {
        'model-engine': (
            str,
            'TensorRT engine path',
            'Path to the .engine file',
            '',
            GObject.ParamFlags.READWRITE
        ),
        'confidence-threshold': (
            float,
            'Confidence threshold',
            'Minimum confidence to attach a detection',
            0.0, 1.0, 0.5,
            GObject.ParamFlags.READWRITE
        ),
    }

    def __init__(self):
        super().__init__()
        self.model_engine = ''
        self.confidence_threshold = 0.5
        self.engine = None

    def do_get_property(self, prop):
        if prop.name == 'model-engine':
            return self.model_engine
        elif prop.name == 'confidence-threshold':
            return self.confidence_threshold

    def do_set_property(self, prop, value):
        if prop.name == 'model-engine':
            self.model_engine = value
        elif prop.name == 'confidence-threshold':
            self.confidence_threshold = value

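    def load_engine(self, path):
        # Placeholder: return a callable that takes a batch tensor and returns
        # one list of detections per frame. Implement this for your runtime
        # (TensorRT, Ultralytics, etc.).
        raise NotImplementedError("load_engine must be implemented")

    def do_start(self):
        # Load the engine once, when the element starts streaming
        self.engine = self.load_engine(self.model_engine)
        return True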

    def do_transform_ip(self, gst_buffer: Gst.Buffer) -> Gst.FlowReturn:
        """In-place transform: attach metadata, pass buffer unchanged."""
        buffer = Buffer(gst_buffer)
        batch_meta = buffer.batch_meta
        frame_metas = list(batch_meta.frame_items)
        if not frame_metas:
            return Gst.FlowReturn.OK  # nothing to infer on

        # Zero-copy: wrap each frame's GPU memory as a torch tensor via DLPack
        frames = []
        for frame_meta in frame_metas:
            t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
            frames.append(t)
        batch = torch.stack(frames, dim=0)

        # Run your model inference; assumed to return one result list per frame
        results = self.engine(batch)

        # Attach each frame's results to that frame: NvDsObjectMeta for
        # detection/segmentation, user meta for anything else.
        # The fill-in step is pseudocode and depends on your model's outputs.
        for frame_meta, frame_results in zip(frame_metas, results):
            for det in frame_results:
                # Skip detections below self.confidence_threshold here
                obj = batch_meta.acquire_object_meta()
                # Fill obj from det (class ID, confidence, bounding box, ...)
                ...
                frame_meta.append(obj)

        return Gst.FlowReturn.OK


# --- Registration ---
GObject.type_register(GstExamplePlugin)
__gstelementfactory__ = (GST_PLUGIN_NAME, Gst.Rank.NONE, GstExamplePlugin)

A few things to know about this skeleton:

GstBase.BaseTransform is the right base class for an in-place filter: an element that receives a buffer, modifies it (here, by attaching metadata), and passes it downstream. We override do_transform_ip rather than do_transform because we never need to allocate a new output buffer.

__gstmetadata__ and __gsttemplates__ are not optional: GStreamer will not register the element without them. The caps string video/x-raw(memory:NVMM) tells GStreamer that this element works with NVIDIA device memory, which is essential for staying on the GPU in a DeepStream pipeline.

__gproperties__ exposes model-engine and confidence-threshold as first-class GStreamer properties, which means you can set them from the gst-launch command line or from Python pipeline code without touching the source.

The last two lines are required for registration: GObject.type_register tells the GObject type system about the class, and __gstelementfactory__ tells GStreamer which element name to expose and which class to instantiate.

Validating the plugin. Once the file is in place and the GStreamer registry cache has been cleared (for example, by removing ~/.cache/gstreamer-1.0/), confirm the registration with:

GST_PLUGIN_PATH=/path/to/your/plugins gst-inspect-1.0 gstexampleplugin

You should see the element metadata, pad templates, and both properties listed. If you do, GStreamer knows about your plugin and you're ready to drop it into a pipeline.

End-to-End Inference Example with Ultralytics

With the plugin skeleton in place, it's time to fill in the inference logic. The full working code is available as a GitHub Gist. Once you have it, you can validate it with gst-inspect-1.0 as before, or launch a pipeline directly. Here's a simple example that just runs inference and displays the FPS:

gst-launch-1.0 -v \
  nvstreammux name=m width=1280 height=720 batch-size=1 \
    batched-push-timeout=33000 ! \
  nvvideoconvert nvbuf-memory-type=0 ! \
  'video/x-raw(memory:NVMM), format=RGB' ! \
  gstyoloplugin model-path=/path/to/yolo26s.engine ! \
  fpsdisplaysink text-overlay=false silent=false sync=false \
    video-sink=fakesink \
  uridecodebin uri=file:///path/to/video.mp4 ! m.sink_0
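Scaling the same command to several streams mostly means raising batch-size and linking each source to its own nvstreammux sink pad. The hypothetical helper below generates that description string, either to print for gst-launch-1.0 or to pass to Gst.parse_launch from Python. The element name gstyoloplugin and its model-path property are taken from the command above (the Gist's plugin); adjust them for your own element.

```python
def build_launch_string(uris, engine_path, width=1280, height=720):
    """Build a pipeline description that batches one decoder per URI.

    Mirrors the single-stream command: one nvstreammux whose batch-size
    equals the number of sources, with source i linked to pad m.sink_i.
    """
    if not uris:
        raise ValueError("need at least one source URI")
    parts = [
        f"nvstreammux name=m width={width} height={height} "
        f"batch-size={len(uris)} batched-push-timeout=33000",
        "nvvideoconvert nvbuf-memory-type=0",
        "video/x-raw(memory:NVMM), format=RGB",
        f"gstyoloplugin model-path={engine_path}",
        "fpsdisplaysink text-overlay=false silent=false sync=false video-sink=fakesink",
    ]
    description = " ! ".join(parts)
    # Each decoder is a separate chain that feeds its own muxer request pad
    for i, uri in enumerate(uris):
        description += f" uridecodebin uri={uri} ! m.sink_{i}"
    return description
```

With two sources, the plugin's batch_meta.frame_items will contain up to two frames per buffer, which is why the skeleton iterates over frames rather than assuming a single one.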

Code Testing

Compatibility problem

If you read the code, you may have noticed that we override the built-in tuple, but only inside the ultralytics.nn.backends.tensorrt module, since that is where the problem lies. There is a known conflict between the TensorRT Python bindings and GStreamer's Python bindings (PyGObject) that will crash your pipeline with the infamous “Segmentation fault (core dumped).” That's why we needed the following snippet to preserve the intended behavior:

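import ultralytics.nn.backends.tensorrt as trt_backend

_original_tuple = tuple

def safe_tuple(obj):
    # TensorRT Dims objects: build the tuple by explicit indexing
    if "tensorrt" in type(obj).__module__ and type(obj).__name__ == "Dims":
        return _original_tuple(obj[i] for i in range(len(obj)))
    # Everything else: defer to the built-in tuple
    return _original_tuple(obj)

# Shadow tuple only inside the Ultralytics TensorRT backend module
trt_backend.tuple = safe_tuple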

This replaces the tuple reference inside the Ultralytics backend's namespace at runtime with a version that falls back to index-based access for TensorRT Dims objects, leaving everything else untouched. It's not pretty, but it's surgical, and it needs to happen at import time, before any model is loaded.
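Why is this surgical? Python resolves a bare name like tuple by checking the module's globals before the builtins, so assigning trt_backend.tuple affects only code defined in that module. A minimal stdlib demonstration, using a throwaway module (fake_backend and shape_of are hypothetical names standing in for the Ultralytics backend):

```python
import builtins
import types

# A throwaway module standing in for ultralytics.nn.backends.tensorrt;
# exec defines shape_of with this module's namespace as its globals
backend = types.ModuleType("fake_backend")
exec("def shape_of(x):\n    return tuple(x)\n", backend.__dict__)

calls = []

def safe_tuple(obj):
    # Record which type was converted, then defer to the real built-in
    calls.append(type(obj).__name__)
    return builtins.tuple(obj)

# Binding a module-level name shadows the built-in for code in that module only
backend.tuple = safe_tuple
```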

The Inference Loop

The inference loop itself is straightforward:

  1. Extract frames from the buffer
  2. Preprocess and run inference
  3. Attach the results to each frame's metadata so that downstream DeepStream elements can use them
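Step 3 is where most of the model-specific work happens. Before an acquired NvDsObjectMeta can be filled, detections usually need to be filtered by the confidence threshold and converted from corner format to the left/top/width/height layout that NvDsObjectMeta's rect_params uses. A pure-Python sketch of that preparation; Detection, to_rect_params, and select_for_frame are hypothetical, and how you then assign these values onto the object depends on the pyservicemaker version you use.

```python
from dataclasses import dataclass


@dataclass
class Detection:
    """Hypothetical model output in source-frame pixels (x1, y1, x2, y2)."""
    x1: float
    y1: float
    x2: float
    y2: float
    confidence: float
    class_id: int


def to_rect_params(det, frame_w, frame_h):
    """Clip a corner-format box to the frame; return (left, top, width, height)."""
    x1 = min(max(det.x1, 0.0), frame_w)
    y1 = min(max(det.y1, 0.0), frame_h)
    x2 = min(max(det.x2, 0.0), frame_w)
    y2 = min(max(det.y2, 0.0), frame_h)
    return x1, y1, max(x2 - x1, 0.0), max(y2 - y1, 0.0)


def select_for_frame(dets, threshold, frame_w, frame_h):
    """Keep detections above threshold whose box is non-empty after clipping."""
    kept = []
    for det in dets:
        if det.confidence < threshold:
            continue
        rect = to_rect_params(det, frame_w, frame_h)
        if rect[2] > 0 and rect[3] > 0:
            kept.append((det.class_id, det.confidence, rect))
    return kept
```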

Below is the zero-copy frame extraction using DLPack:

frames = []
for frame_meta in batch_meta.frame_items:
    # Wrap the frame's GPU memory as a torch tensor without copying it
    t = torch.utils.dlpack.from_dlpack(buffer.extract(frame_meta.batch_id))
    frames.append(t)
batch = torch.stack(frames, dim=0)

Preprocessing the Input

YOLO models, when you pass them a torch.Tensor, expect a fixed input shape, (N, 3, 640, 640) by default. However, the frames coming out of nvstreammux will be at whatever resolution your source provides (or whatever nvstreammux was configured to output). The standard approach is letterboxing: we scale the frame to fit within the target dimensions while preserving the aspect ratio, and then pad the remaining space. The key insight here is that we can do this entirely on the GPU, for the whole batch at once, without touching CPU memory.
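The geometry behind letterboxing is worth pinning down, because the same numbers are needed again to map boxes back onto the original frame before attaching them. A pure-Python sketch with centered padding; the names are hypothetical, and the exact rounding in your training pipeline may differ slightly. In the plugin itself, the resize and pad would be applied to the whole (N, C, H, W) batch on the GPU, for example with torch.nn.functional.interpolate and torch.nn.functional.pad.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Letterbox:
    scale: float  # resize factor applied to both axes
    new_w: int    # resized width before padding
    new_h: int    # resized height before padding
    pad_x: int    # padding added on the left
    pad_y: int    # padding added on the top


def letterbox_params(src_w, src_h, dst_w=640, dst_h=640):
    """Scale to fit inside dst while keeping aspect ratio, then center-pad."""
    scale = min(dst_w / src_w, dst_h / src_h)
    new_w, new_h = round(src_w * scale), round(src_h * scale)
    pad_x, pad_y = (dst_w - new_w) // 2, (dst_h - new_h) // 2
    return Letterbox(scale, new_w, new_h, pad_x, pad_y)


def unletterbox_box(box, lb):
    """Map an (x1, y1, x2, y2) box from model input space back to source pixels."""
    x1, y1, x2, y2 = box
    return ((x1 - lb.pad_x) / lb.scale, (y1 - lb.pad_y) / lb.scale,
            (x2 - lb.pad_x) / lb.scale, (y2 - lb.pad_y) / lb.scale)
```

For the 1280x720 frames in the example pipeline, the frame is halved to 640x360 and padded by 140 rows top and bottom, so a box covering the whole padded image region maps back to the full frame.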

With frame extraction, letterboxing, inference, and postprocessing all happening on the GPU in a single do_transform_ip call, the plugin looks exactly like nvinfer to everything downstream, but with the full flexibility of the underlying Python stack.

From here, the rest of the DeepStream pipeline takes over: nvtracker assigns IDs, nvdsosd draws overlays, and nvmsgconv builds message payloads.

Key Takeaways and Next Steps

If you've followed along this far, you have a working pattern for replacing nvinfer with your own Python inference element and, more importantly, you understand why each piece is the way it is.

The pattern generalizes. Everything described here (the plugin skeleton, GPU-side preprocessing, and metadata attachment) is model-agnostic. Swapping Ultralytics YOLO for Roboflow's rfdetr is straightforward, and the GStreamer and pyservicemaker scaffolding stays the same. The same goes for less common architectures: NVIDIA's deepstream_reference_apps repository includes a working example of integrating a Vision-Language Model with vLLM using this same plugin approach, which is worth reading if you're moving into video understanding.
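One way to keep that swap cheap is to have the element depend on a narrow callable interface rather than on a specific library. A hypothetical sketch using typing.Protocol: FakeYolo and FakeRfDetr are placeholders for real wrappers around Ultralytics and rfdetr, and the only contract enforced is "one result list per frame", which is what the metadata loop in the skeleton relies on.

```python
from typing import Protocol, Sequence


class Detector(Protocol):
    """Hypothetical interface the plugin could depend on instead of a concrete model."""

    def __call__(self, batch: Sequence) -> list:
        """Return one list of detections per frame in the batch."""
        ...


class FakeYolo:
    def __call__(self, batch):
        return [[("yolo", i)] for i, _ in enumerate(batch)]


class FakeRfDetr:
    def __call__(self, batch):
        return [[("rfdetr", i)] for i, _ in enumerate(batch)]


def run_batch(detector: Detector, batch):
    # Enforce the contract the metadata-attachment loop depends on
    results = detector(batch)
    if len(results) != len(batch):
        raise ValueError("detector must return one result list per frame")
    return results
```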

The full plugin code is available as a GitHub Gist. If you build something on top of it, such as a different model, a multi-stream setup, or a VLM integration, I'd be curious to hear how it goes. Happy coding!
