Pipeline Examples

This page shows example pipelines for common scenarios.

Overview

Use these pipelines to quickly get started with a few common scenarios. Follow the Getting Started Guide to get familiar with how to deploy and run these pipelines with cameras or streams.

You can also visit the Solutions tab in the Analytics Library to see a list of ready-to-use pipelines, clone them with a click, and modify them to suit your needs.

Detect objects with an AI Model

This pipeline detects objects using an Object Detection model from the Marketplace, displays a bounding box around the detected objects, and uses custom code to blur them.

from lumeopipeline import VideoFrame  # Auto added by Lumeo
import cv2                    # Auto added by Lumeo
import numpy                  # Auto added by Lumeo


def process_frame(frame: VideoFrame, **kwargs) -> bool:
    width = frame.video_info().width
    height = frame.video_info().height

    with frame.data() as mat:
        try:        
            objects = frame.meta().get_field("objects")
            for object in objects:            
                # Look for the `face` label in the metadata. The label will 
                # depend on the model you use here.          
                if object['label'] == "face":
                    object_rect = object['rect']
                    object_rect = {key: int(value) for key, value in object_rect.items()}
                    y = object_rect['top']
                    yEnd = object_rect['top'] + object_rect['height']
                    x = object_rect['left']
                    xEnd = object_rect['left'] + object_rect['width']
                    mat[y:yEnd,x:xEnd] = cv2.blur(mat[y:yEnd,x:xEnd], (23,23) )
        except Exception:
            # No objects in this frame's metadata; nothing to blur.
            pass

    return True
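
The width and height computed above come in handy for bounds checking: a model can occasionally report a rectangle that extends past the frame edges, and negative slice indices wrap around in numpy, blurring the wrong region. A minimal sketch of a clamping helper (the helper name is ours, not part of the Lumeo API):

def clamp_rect(object_rect, width, height):
    # Clamp the rect's slice bounds so they stay inside the frame.
    # Assumes the top/left/width/height keys shown above.
    y = max(0, int(object_rect['top']))
    x = max(0, int(object_rect['left']))
    y_end = min(height, int(object_rect['top'] + object_rect['height']))
    x_end = min(width, int(object_rect['left'] + object_rect['width']))
    return x, y, x_end, y_end

Then blur with mat[y:y_end, x:x_end] = cv2.blur(mat[y:y_end, x:x_end], (23, 23)), skipping the blur when the clamped region is empty (x_end <= x or y_end <= y).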

Detect & classify objects with 2 AI Models

This pipeline detects objects using an Object Detection model and a Classification model from the Marketplace, displays a bounding box around the detected objects, and uses custom code to blur only the objects that carry a given attribute (here, faces classified as masked).

from lumeopipeline import VideoFrame  # Auto added by Lumeo
import cv2                    # Auto added by Lumeo
import numpy                  # Auto added by Lumeo


def process_frame(frame: VideoFrame, **kwargs) -> bool:
  width = frame.video_info().width
  height = frame.video_info().height

  with frame.data() as mat:
    try:        
      objects = frame.meta().get_field("objects")
      for object in objects:            
        # Look for the `face` label in the metadata, and blur all masked faces        
        if object['label'] == "face" and object_has_attribute(object,"mask"):
          object_rect = object['rect']
          object_rect = {key: int(value) for key, value in object_rect.items()}
          y = object_rect['top']
          yEnd = object_rect['top'] + object_rect['height']
          x = object_rect['left']
          xEnd = object_rect['left'] + object_rect['width']
          mat[y:yEnd,x:xEnd] = cv2.blur(mat[y:yEnd,x:xEnd], (23,23) )
    except Exception:
      # No objects in this frame's metadata; nothing to blur.
      pass

  return True

def object_has_attribute(object, attribute_label) -> bool:
  for attribute in object['attributes']:
    if attribute['label'] == attribute_label:
      return True
  return False
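
If you need more than a yes/no answer, a small variant can return the matching attribute entry itself, so you can inspect whatever other keys your classification model populates (the exact keys are model-dependent):

def get_attribute(object, attribute_label):
    # Return the first attribute dict whose label matches, else None.
    # Assumes the same object['attributes'] structure used above.
    for attribute in object.get('attributes', []):
        if attribute['label'] == attribute_label:
            return attribute
    return None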

Save snapshots based on AI Model detections

This pipeline does two things: (1) streams the output of face detection and mask classification models, and (2) saves a snapshot whenever a face is detected without a mask.

from lumeopipeline import VideoFrame  # Auto added by Lumeo
import cv2                    # Auto added by Lumeo
import numpy                  # Auto added by Lumeo

# This global variable persists across frames.
capture_enabled = True

def process_frame(frame: VideoFrame, **kwargs) -> bool:

  global capture_enabled
  no_face_found = True

  try:        
    objects = frame.meta().get_field("objects")
    for object in objects:            
        # Look for the face label in the metadata
        if object['label'] == "face":
          print(object)
          no_face_found = False                        
          if object_has_attribute(object,"mask") is False and capture_enabled is True:
            # Stop capturing more frames till this face goes out 
            # of the frame. (Otherwise you end up capturing every
            # frame this face appears in. For a more real world solution, 
            # add a tracker node before this to identify unique faces.)
            capture_enabled = False
            
            # Returning true passes the frame to the downstream node.
            # Which in this case, is the snapshot node.
            return True

    if no_face_found:
      # Use the lack of a face to reset our capture flag, so that 
      # we capture the next face that comes into view.
      capture_enabled = True

  except Exception as e:
    print("Exception: ", e)

  # Returning false drops this frame from further processing by a
  # downstream node.
  return False

def object_has_attribute(object, attribute_label) -> bool:
  for attribute in object['attributes']:
    if attribute['label'] == attribute_label:
      return True
  return False
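
As the comment in process_frame notes, resetting capture_enabled only when no face is visible is a blunt heuristic. With a Track Objects node added before the Function node (as in the loitering example later on this page), each object carries a unique id, and you can deduplicate per face instead. A minimal sketch, assuming that id field is present:

captured_ids = set()  # tracker IDs we have already snapshotted

def should_capture(object) -> bool:
    # Snapshot each unmasked face at most once, keyed on the unique
    # tracker ID added by an upstream Track Objects node.
    tracker_id = object.get('id')
    if tracker_id is None or tracker_id in captured_ids:
        return False
    if object_has_attribute(object, "mask"):
        return False
    captured_ids.add(tracker_id)
    return True

In process_frame, return should_capture(object) for each face instead of juggling the capture_enabled flag. Note that captured_ids grows without bound; prune it periodically in a long-running pipeline.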

Save Clips based on AI model detections

This pipeline does two things: (1) streams the output of face detection and mask classification models, and (2) saves a clip whenever a face is detected without a mask.

from lumeopipeline import VideoFrame  # Auto added by Lumeo
import cv2                    # Auto added by Lumeo
import numpy                  # Auto added by Lumeo

def process_frame(frame: VideoFrame, **kwargs) -> bool:

  try:        
    objects = frame.meta().get_field("objects")
    for object in objects:            
        # Look for the face label in the metadata
        if object['label'] == "face" and object_has_attribute(object,"mask") is False:
            # Returning true passes the frame to the downstream node.
            # Which in this case, is the encode/save clip node.
            return True
  except Exception:
    # No objects in this frame's metadata; drop the frame below.
    pass

  # Returning false drops this frame from further processing by a
  # downstream node.
  return False

def object_has_attribute(object, attribute_label) -> bool:
  for attribute in object['attributes']:
    if attribute['label'] == attribute_label:
      return True
  return False

Inject metadata and conditionally save snapshots

This pipeline injects metadata from a network socket using the Add Metadata Node, displays that metadata on the video and saves a snapshot every time the metadata changes. The saved snapshot can be accessed with the files API along with the injected metadata.

An alternate approach to injecting the metadata is also possible: use a Function node to source the metadata and add it to the frame using the meta.set_user_field() method. This metadata will also be saved along with the file and can be accessed using the files API.
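
A minimal sketch of that alternate approach in a Function node; the environment variable here is just a stand-in for whatever socket, file, or HTTP source you actually read from:

from lumeopipeline import VideoFrame  # Auto added by Lumeo
import os

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Stand-in metadata source; replace with your own socket/file/HTTP read.
    value = os.environ.get("MY_METADATA")
    if value:
        # Saved along with any downstream snapshot or clip, and
        # retrievable later via the files API.
        frame.meta().set_user_field("external_metadata", value)
    return True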

To inject metadata into the pipeline, you can run netcat on the gateway device to listen on the socket, then inject any metadata you type in the terminal:

nc -l 127.0.0.1 19999

Then type any text at the prompt and press Enter. The Add Metadata node connects to this port and receives whatever text you type into the netcat prompt.
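
If you would rather script the injection than type it by hand, a plain Python socket listener can stand in for netcat (same address and port as above; each line sent becomes the injected metadata):

import socket

# Listen where the Add Metadata node expects to connect.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("127.0.0.1", 19999))
server.listen(1)

conn, _ = server.accept()           # wait for the node to connect
conn.sendall(b"camera=lobby\n")     # one line of metadata
conn.close()
server.close()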

This metadata is saved with the corresponding Snapshot (or Clip, if you prefer) and can be accessed using the files API.

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations

# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
previous_meta = None
frame_count = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    
    global previous_meta
    global frame_count
    capture_snapshot = False

    try:
        meta = frame.meta()
        if meta is not None:
            # Print metadata on video frame.
            yidx = 100
            with frame.data() as mat:
              for (key, value) in meta.get_fields("*"): 
                  if value is not None:
                      cv2.putText(mat, key + " : " + value, (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                      yidx = yidx + 25
            
            # Add Custom user metadata to the frame which will
            # get saved along with the snapshot(s) or clips. 
            # (Access it using the Lumeo Files API)
            frame_count = frame_count + 1
            meta.set_user_field("frame_count", str(frame_count))          
            
            # Metadata inserted by "Add Metadata" Node has the key
            # "<nodeid>.metadata". Capture a snapshot every time this
            # metadata changes.
            for (key, value) in meta.get_fields("*.metadata"):
              if previous_meta != value:
                capture_snapshot = True
                previous_meta = value

    except Exception as error:
        print(error, flush=True)


    # Return False to drop this frame, True to continue processing.
    return capture_snapshot

Detect Loitering using Object Tracker node

This pipeline detects the presence of a given object in the view for more than a certain amount of time. It uses a person detection model, coupled with an Object Tracker node, to identify a unique object as it moves through the scene across video frames.

Then it processes the object information to determine if a given object has been in the view for more than a threshold amount of time, and draws a bounding box around it to alert the viewer.

Detect loitering using a person detection model and an Object Tracker node

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2  # OpenCV for image manipulations
import numpy  # numpy for image manipulations

# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
object_tracker = {}
frame_count = -1

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    frame_count += 1

    loitering_objects = []
    with frame.data() as mat:

        meta = frame.meta()

        objects = meta.get_field("objects")

        # Go through all detected objects
        for obj in objects:
            region_label = obj['label'].lower()

            # Only track "person" objects
            if region_label == "person":
                if region_label not in object_tracker:
                    object_tracker[region_label] = {}

                # Extract object's unique ID added by the Track Objects node
                tracker_id = obj['id'] if 'id' in obj else -1

                # If we are seeing this object for the first time, add it to our
                # set of tracked objects
                if tracker_id not in object_tracker[region_label]:
                    object_tracker[region_label][tracker_id] = {}
                    object_tracker[region_label][tracker_id]['first_seen'] = frame_count

                # How many frames have we seen this object in?
                num_frames_seen = frame_count - object_tracker[region_label][tracker_id]['first_seen']

                # Display object label
                object_rect = obj['rect']
                object_coords = [int(object_rect['left']), int(object_rect['top']), \
                                 int(object_rect['left'] + object_rect['width']),
                                 int(object_rect['top'] + object_rect['height'])]
                object_label = str(tracker_id) + " : " + str(num_frames_seen)
                cv2.putText(mat, object_label, (object_coords[0], object_coords[1]), cv2.FONT_HERSHEY_DUPLEX, 1,
                            (255, 255, 255), 2)

                # Highlight the object if it's been around in the view
                # for over 900 frames ("loitering"). Alternatively, this may be done using clock time (1 minute)
                # to make this frame rate invariant.
                if num_frames_seen > 900:
                    loitering_objects.append(tracker_id)
                    cv2.rectangle(mat, (object_coords[0], object_coords[1]), (object_coords[2], object_coords[3]),
                                  (0, 215, 255), 4)


    # Add loitering object id's to the metadata so it is captured in any clip we save
    # downstream.
    if len(loitering_objects) > 0:
        meta.set_field("loitering_objects", str(loitering_objects))
        meta.save()

    # Return False to drop this frame, True to continue processing.
    return True
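
As the comment above suggests, counting seconds instead of frames makes the loitering threshold independent of frame rate. Only the bookkeeping changes; a minimal sketch:

import time

LOITER_SECONDS = 60  # flag objects present in view for over a minute

first_seen = {}  # tracker_id -> monotonic timestamp of first sighting

def seconds_in_view(tracker_id) -> float:
    # Record the first sighting, then report elapsed wall-clock time.
    now = time.monotonic()
    first_seen.setdefault(tracker_id, now)
    return now - first_seen[tracker_id]

In process_frame, replace the num_frames_seen arithmetic with seconds_in_view(tracker_id) and compare the result against LOITER_SECONDS.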