Custom Function

Manually process or modify frames and metadata using Python code

Overview

The Custom Function node lets you run custom Python code to process or modify frames, and to extract metadata from frames or add metadata to them.

Inputs & Outputs

  • Inputs: 1, Media Format: Raw Video
  • Outputs: 1, Media Format: Raw Video
  • Output Metadata: See below.

Properties

Property | Value
name | Function name
code | Python code to execute in the function.

For your convenience, the following reserved variables, which contain useful information, are passed to the process_frame function.

Minimal function signature:
def process_frame(frame: VideoFrame, **kwargs) -> bool:

Exhaustive list:
def process_frame(frame: VideoFrame, application_id, application_name, deployment_id, deployment_name, node_id, **kwargs) -> bool:

Variable | Description
application_id | Application UUID
application_name | Application Name
deployment_id | Deployment UUID
deployment_name | Deployment Name
node_id | The ID (string) that identifies this Function Node in the pipeline, e.g. function1
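
As a rough sketch of how the reserved variables might be used, the function below picks two of them out by name and leaves the rest in kwargs. The log_prefix helper, the made-up values, and the stand-in call at the bottom are illustrative assumptions and not part of lumeopipeline; inside Lumeo the pipeline supplies the real frame and values, and the frame parameter would carry the VideoFrame annotation.

```python
def log_prefix(deployment_name, node_id):
    """Hypothetical helper: build a tag that shows where a log line came from."""
    return "[{} / {}]".format(deployment_name, node_id)


def process_frame(frame, deployment_name=None, node_id=None, **kwargs) -> bool:
    # deployment_name and node_id are picked out by name; the other reserved
    # variables (application_id, application_name, deployment_id) stay in kwargs.
    print(log_prefix(deployment_name, node_id), "other variables:", sorted(kwargs))
    return True


# Stand-in call for trying the function outside Lumeo (all values are made up).
result = process_frame(
    None,
    application_id="app-uuid",
    application_name="Demo App",
    deployment_id="dep-uuid",
    deployment_name="Front Door",
    node_id="function1",
)
```

Naming only the variables you need keeps the signature short, while **kwargs absorbs the remaining ones.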

Metadata

Access metadata using the Function Node or using the API, from the snapshot or clip saved downstream of this node.

Metadata Property | Description
<custom metadata key> | Custom metadata, if added by the code using frame.meta().set_field() (see reference below).

Function Template

The Function node can run any Python 3 code that you provide. It uses the lumeopipeline Python library to access and process frames, and bundles OpenCV and NumPy so you can use those libraries to manipulate frames.

An example template is listed below (when you create this node in the Pipeline Editor, it comes pre-populated with a template to get you started quickly). A few things to note:

  • The process_frame method is mandatory in the Function.
  • Return True from the function to continue processing the frame, False to drop the frame from further processing.
  • You cannot alter the frame size with this Node currently.
  • You can import and use additional libraries in the Function as long as they are installed on the system, or you install them explicitly using the lumeopipeline.Utils.install_import method.
  • You can make API calls in the Function, but keep in mind that the process_frame method blocks the rest of the pipeline: if your function takes too long to execute, it will delay processing downstream.
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    # Insert your code here.
    global frame_count
    frame_count = frame_count + 1

    with frame.data() as mat:

        # Here's an example that uses OpenCV to put arbitrary text on the frame
        cv2.putText(mat, "Frame " + str(frame_count), (50,50), cv2.FONT_HERSHEY_DUPLEX, 0.7, (0, 0, 255), 1)

        # Here's an example of how to access the frame-level metadata and print it on the frame
        meta = frame.meta()
        yidx = 100
        for (key, value) in meta.get_all().items(): 
            cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 1, cv2.LINE_AA)
            yidx = yidx + 50

        # Add your own metadata to the frame
        meta.set_field("frame_count", frame_count)
        meta.save()


    # Return False to drop this frame, True to continue processing.
    return True
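
Because process_frame blocks the rest of the pipeline, it can help to compare how long your code takes with the time available per frame. The sketch below assumes the per-frame budget is the frame interval derived from VideoInfo.fps_n and VideoInfo.fps_d; the helper names are hypothetical, and the 30/1 frame rate and the timed workload are example values only.

```python
import time


def frame_budget_ms(fps_n: int, fps_d: int) -> float:
    """Time between frames in milliseconds for a frame rate of fps_n / fps_d."""
    return 1000.0 * fps_d / fps_n


def timed_call(func, budget_ms: float):
    """Run func() and return (result, elapsed_ms, over_budget)."""
    start = time.perf_counter()
    result = func()
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms, elapsed_ms > budget_ms


# Example: 30/1 FPS leaves roughly 33.3 ms per frame.
budget = frame_budget_ms(30, 1)
result, elapsed_ms, over_budget = timed_call(lambda: sum(range(1000)), budget)
if over_budget:
    print("Processing took {:.1f} ms, budget is {:.1f} ms".format(elapsed_ms, budget), flush=True)
```

Inside a Function, the same timing could wrap the body of process_frame, with fps_n and fps_d read from frame.video_info().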

Debugging and Logging

Head over to the Logs tab under the Deployment to view the logs in real time while the pipeline is running.

You can use print() in the Function node to log data to the Console, or add a Display Stream Info Node and set its Log Metadata property to True.

Accessing Metadata

Most nodes in the Pipeline add their own metadata to each frame. This metadata can be accessed using the lumeopipeline library in the Function (reference below). You can find more information about metadata added by specific nodes in each Node's reference.

You can also add your own custom metadata to the frame. This metadata will be passed along to any webhooks sent out by subsequent nodes and saved along with clips and snapshots generated by the Save Clip Node and Save Snapshot Node.

from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)

        meta = frame.meta()
        if meta is not None:
            print(meta.get_all())

    except Exception as error:
        print(error)
        pass

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
prev_frame_objects = []

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    # Insert your code here.
    global prev_frame_objects

    with frame.data() as mat:
      
        # Grab all the metadata stored with this frame. Includes
        # Object level metadata as well as metadata added by each node.
        meta = frame.meta().get_all()
        
        # Alternatively, you can also do: 
        # objects = frame.meta().get_field("objects")
        # nodes = frame.meta().get_field("nodes")
        objects = meta['objects']
        nodes = meta['nodes']
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for obj in objects:
          
            # Draw a red bounding box
            obj_rect = obj['rect']
            cv2.rectangle(mat, 
                          (obj_rect['left'], obj_rect['top']), 
                          (obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']), 
                          (0,0,255), thickness=2)
            
            # Write label. 
            # obj['label'] -- object class
            # obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames. 
            # obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
            label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
                                     obj['label'],
                                     ",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
            Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
            


    # Return False to drop this frame, True to continue processing.
    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)

        meta = frame.meta()
        if meta is not None:
            # Log the metadata added by all Nodes.
            # nodes is a dictionary that contains each node's
            # metadata. Refer to the node-specific docs for the property
            # values.
            nodes = meta.get_field("nodes")
            for node_id in nodes:
                print("{} : {}".format(node_id, nodes[node_id]))

    except Exception as e:
        print(e)

    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)

        meta = frame.meta()
        if meta is not None:
            print(meta.get_all())
            meta.set_field("key", "value")
            meta.save()

    except Exception as error:
        print(error)
        pass

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    with frame.data() as mat:
      
        # Grab the object metadata 
        meta = frame.meta()
        objects = meta.get_field("objects")
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        probability_threshold = 0.5  # Set your threshold value here
        objects = [obj for obj in objects if obj['probability'] >= probability_threshold]

        meta.set_field("objects", objects)
        meta.save()

    # Return False to drop this frame, True to continue processing.
    return True

lumeopipeline Reference

Method | Description
VideoFrame.video_info() -> VideoInfo | Returns information about the video frame.
    VideoInfo.height : Frame height
    VideoInfo.width : Frame width
    VideoInfo.fps_n : Numerator of the frame rate
    VideoInfo.fps_d : Denominator of the frame rate
VideoFrame.data() -> numpy.ndarray | Returns a Height x Width x 4 array containing the frame's pixel data. The array is in the BGRA format.
    Altering this array will alter the raw video frame output by this node.
VideoFrame.meta() -> Meta | Returns the metadata object, which holds the metadata inserted by Pipeline nodes (retrieve it as a dictionary with get_all()).
Meta.get_all() -> Dictionary | Returns all metadata fields (key, value pairs) in the frame.
Meta.get_field(key) -> string | Returns the value of a specific field.
Meta.set_field(key, value) -> bool | Sets the value of a specific field.
Meta.save() -> None | Saves updated metadata if it was modified using set_field or set_user_field.
Meta.get_fields(pattern) -> Generator of tuples (field_name, field_value) | Gets all fields whose names match the given pattern.
    Field names are dot-separated words. The pattern allows unneeded words to be wildcarded with *. The rightmost star matches all the remaining words.
    Example patterns:
    - *.inference.label skips the first word; it would match
      -- inference_node_0123.inference.label
      -- inference_node_5678.inference.label
    - user.* returns all fields whose names start with "user"; it would match
      -- user.property1
      -- user.property2
      -- user.complex.property
    - * matches everything
VideoFrame.tensors() -> Vec<Tensor> | Returns a list of the output tensors of Inference Node(s).
    Can be used to apply custom parsing to the raw inference outputs.
    Check Custom Model Parser for more details.
Tensor.layers -> Vec<Layer> | Iterate over this field to get the output layers.
Layer.name -> string | The output layer name.
Layer.dimensions -> Vec<Int> | The shape of the output layer.
Layer.data -> raw_buffer | The raw output data of this layer.
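
To make the Meta.get_fields pattern rules above more concrete, here is a pure-Python sketch of one reading of them: a * in the last position matches one or more remaining words, and a * anywhere else matches exactly one word. The field_matches function is hypothetical and is not the lumeopipeline implementation; it only illustrates how the example patterns would behave under that reading.

```python
def field_matches(pattern: str, field_name: str) -> bool:
    """Check a dot-separated field name against a wildcard pattern (assumed rules)."""
    pattern_words = pattern.split(".")
    field_words = field_name.split(".")
    for index, pattern_word in enumerate(pattern_words):
        is_last = index == len(pattern_words) - 1
        if pattern_word == "*" and is_last:
            # Trailing star: accept as long as at least one word remains.
            return len(field_words) > index
        if index >= len(field_words):
            return False
        if pattern_word != "*" and pattern_word != field_words[index]:
            return False
    # Without a trailing star, the word counts must agree.
    return len(field_words) == len(pattern_words)


# Made-up field names, filtered the way get_fields("user.*") is described to work.
fields = {"user.property1": 1, "user.complex.property": 2, "node1.inference.label": "car"}
user_fields = [(name, value) for name, value in fields.items() if field_matches("user.*", name)]
```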

lumeopipeline Utils class

For convenience, lumeopipeline provides a utility class containing functions that are used in several of our dynamic nodes.

You can access them by adding to the Python import section: from lumeopipeline import Utils

See "Use Utils" under Examples below for reference.

Method | Description
install_import(package_name=None, attribute_name=None, module_name=None, version=None) | Allows users to install and import additional Python modules on devices. Examples:
    - Install and import a module: pytz = install_import('pytz')
    - Import an attribute from a module: sleep = install_import('time', 'sleep')
    - Import a module whose name differs from the package it belongs to: box = install_import('python-box', module_name='box')
    - Install a specific version: jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')
write_label_on_frame(mat, xidx, yidx, label) | Auxiliary function to write text on the frame.
num_milliseconds_seen(last_seen_frame, first_seen_frame, video_info) | Returns the number of milliseconds elapsed between two frames.
get_timestamp(frame, frame_count) | Returns the timestamp of a given frame, based on the current frame_count and the video FPS.
send_webhook(webhook_url, body, compression=True) | Sends a webhook to a specific URL. Compression is enabled by default; pass False as the third argument to disable it.
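
When compression is enabled, the receiving endpoint has to decompress the body before parsing it. The sketch below assumes the compression is gzip applied to a JSON body; that is an assumption suggested by the gzip_compression flag name used in the Integrations example, not confirmed behaviour of send_webhook. Both helper names are hypothetical and use only the Python standard library.

```python
import gzip
import json


def encode_body(body: dict, compression: bool = True) -> bytes:
    """Serialize a body to JSON and optionally gzip it (assumed sender behaviour)."""
    raw = json.dumps(body).encode("utf-8")
    return gzip.compress(raw) if compression else raw


def decode_body(payload: bytes) -> dict:
    """Receiver side: accept either gzip-compressed or plain JSON bytes."""
    # gzip streams start with the magic bytes 0x1f 0x8b.
    if payload[:2] == b"\x1f\x8b":
        payload = gzip.decompress(payload)
    return json.loads(payload.decode("utf-8"))


body = {"name": "Example", "city": "New York"}
round_trip = decode_body(encode_body(body, compression=True))
```

If your endpoint cannot decompress request bodies, passing False for the compression argument avoids the issue entirely.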

Examples

Basics

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


frames_to_skip = 5700

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    global frames_to_skip

    if frames_to_skip > 0:
        frames_to_skip = frames_to_skip - 1
        return False
               
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


def process_frame(frame: VideoFrame, **kwargs) -> bool:

    # Insert your code here.
    try:
        meta = frame.meta()

        # All Metadata in this frame
        print(meta.get_all())

        # Metadata about any objects in this frame from Model Inference node
        print(meta.get_field("objects"))

    except Exception as error:
        print(error)
        pass
            
    # Return False to drop this frame, True to continue processing.
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    try:
        meta = frame.meta()

        if meta is not None:
            yidx = 100
            with frame.data() as mat:
              for (key, value) in meta.get_all().items(): 
                  if value is not None:
                      cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                      yidx = yidx + 25


    except Exception as error:
        print(error, flush=True)
        pass

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    global frame_count
    frame_count = frame_count + 1

    try:
        meta = frame.meta()
        if meta is not None:
            # Set user metadata that will be saved along with a
            # snapshot or clip. (Access it using the Lumeo Files API)
            meta.set_field("frame_count", str(frame_count))
            # Save the updated metadata (see Meta.save() in the reference).
            meta.save()
    except Exception as error:
        print(error, flush=True)

    # Return False to drop this frame, True to continue processing.
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils

# Installs and imports additional Python modules on the device,
# or just imports them if they are already installed
#
# Usage:
# - install and import module:
#   pytz_ref = install_import('pytz')
#
# - import attribute from module:
#   sleep = install_import('time', 'sleep')
#
# - import a module which has a different name from the package it belongs to
#   box = install_import('python-box', module_name='box')
#
# - install a specific version
#   jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')

pytz = Utils.install_import('pytz')
from pytz import reference as pytz_ref

frame_count = 0

# Set to False if the Webhook endpoint does not support compression
compression = True

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    global compression
    frame_count = frame_count + 1

    with frame.data() as mat:
      Utils.write_label_on_frame(mat, 10, 10, 'Example of lumeopipeline Utils')

      print("Number of milliseconds: " + str(Utils.num_milliseconds_seen(frame_count, 0, frame.video_info())))
      print("Frame timestamp: " + str(Utils.get_timestamp(frame, frame_count)))

      try:
        # This will be converted to JSON internally by the send_webhook function
        body = {'name': "Example", 'city': "New York"}
        Utils.send_webhook('http://127.0.0.1', body, compression)
      except Exception as e:
        print(e)

    # Return False to drop this frame, True to continue processing.
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations

frame_count = 0
total_objects_passed = 0
objects_crossed_per = 0
prev_timestamp = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:

  global frame_count
  global total_objects_passed
  global objects_crossed_per
  global prev_timestamp

  try:
    
    with frame.data() as mat:
      
      # Get Current Frame's relative timestamp (in seconds) since 
      # start of stream
      cur_timestamp = Utils.get_timestamp(frame, frame_count)
      frame_count += 1
      
      # Grab metadata from previous nodes in this pipeline
      meta = frame.meta()
      nodes = meta.get_field('nodes')
  
      # Iterate through line counter nodes and add up 
      # objects that have crossed any line 
      for node_id in nodes:
        if nodes[node_id]['type'] == 'annotate_line_counter':
          for line_id in nodes[node_id]['lines']:
            total_objects_passed += nodes[node_id]['lines'][line_id]['total_objects_crossed_delta']

      # Every 2 seconds, compute the throughput (objects crossed) per minute
      # Then reset the counts
      if (cur_timestamp - prev_timestamp > 2):
        objects_crossed_per = total_objects_passed * 30
        total_objects_passed = 0
        prev_timestamp = cur_timestamp
  
      # Display values on the frame
      Utils.write_label_on_frame(mat, 50,50, "Throughput : {} objects/minute".format(objects_crossed_per))
      meta.set_field('objects_crossed_per', objects_crossed_per)
      meta.save()

  except Exception as e: 
    print(e)
    pass

  # Return False to drop this frame, True to continue processing.
  return True

Image and Media Manipulation

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
from urllib.request import urlopen

print("module name:  ", __name__)
print("watermark.py : opencv ver : ", cv2.__version__)

watermarkurl = "https://s.gravatar.com/avatar/f8066a0ab6f5ffa7d121aa210d988980?s=80"
watermarkimage = numpy.asarray(bytearray(urlopen(watermarkurl).read()), dtype="uint8")
watermark = cv2.imdecode(watermarkimage, cv2.IMREAD_UNCHANGED)
if watermark.shape[2] != 4:
    watermark = cv2.cvtColor(watermark,cv2.COLOR_BGR2BGRA)
#watermark = cv2.imread("watermark.png", cv2.IMREAD_UNCHANGED)
watermark_overlay = None

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    global watermark
    global watermark_overlay

    with frame.data() as mat:
        if watermark_overlay is None:
            (wH, wW) = watermark.shape[:2]
            width = frame.video_info().width
            height = frame.video_info().height
            watermark_overlay = numpy.zeros((height,width,4), dtype="uint8")
            watermark_overlay[10:10+wH, 10:10+wW] = watermark

        cv2.addWeighted(watermark_overlay, 1.0, mat, 1.0, 0, mat)
        #cv2.putText(mat, "LUMEO", (50,50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 2)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    # Insert your code here.

    with frame.data() as mat:
      
        # Grab all the metadata stored with this frame. Includes
        # Object level metadata as well as metadata added by each node.
        meta = frame.meta().get_all()
        
        # Alternatively, you can also do: 
        # objects = frame.meta().get_field("objects")
        # nodes = frame.meta().get_field("nodes")
        objects = meta['objects']
        nodes = meta['nodes']
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for obj in objects:
          
            # Draw a red bounding box
            obj_rect = obj['rect']
            cv2.rectangle(mat, 
                          (obj_rect['left'], obj_rect['top']), 
                          (obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']), 
                          (0,0,255), thickness=2)
            
            # Write label. 
            # obj['label'] -- object class
            # obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames. 
            # obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
            label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
                                     obj['label'],
                                     ",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
            Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
            


    # Return False to drop this frame, True to continue processing.
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import base64                         # Base64 encoding for image data

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    with frame.data() as mat:
                
        # Downscale and get entire frame as base64 encoded string
        frame_base64 = get_image_as_base64(mat, max_dim=640)
        # do something with the frame_base64
        
        # Get all objects as base64 encoded strings
        meta = frame.meta().get_all()
        objects = meta['objects']
        
        for obj in objects:
            obj_rect = obj['rect']
            obj_base64 = get_image_as_base64(mat, max_dim=160, roi_coords=obj_rect)
            # do something with the obj_base64
        
        
    return True

def downscale_image(image, max_dim=640):
    
    largest_dim = max(image.shape[0], image.shape[1])
    scale_factor = 1
    if largest_dim > max_dim:
        scale_factor = max_dim / largest_dim
        
    if scale_factor != 1:
        scaled_image = cv2.resize(image, 
                                  (int(image.shape[1]*scale_factor), int(image.shape[0]*scale_factor)), 
                                  interpolation = cv2.INTER_CUBIC)
        return scaled_image
    return image

def get_image_as_base64(image, max_dim, roi_coords=None):

    # Crop to ROI
    if roi_coords:
        img_cropped = image[max(roi_coords['top'],0):min(roi_coords['top']+roi_coords['height'],image.shape[0]), 
                            max(roi_coords['left'],0):min(roi_coords['left']+roi_coords['width'],image.shape[1])]
    else:
        img_cropped = image    
        
    # Downscale image if it is too large
    img_cropped = downscale_image(img_cropped, max_dim=max_dim)

    # Encode to JPEG
    img_encoded = cv2.imencode('.jpg', img_cropped, [int(cv2.IMWRITE_JPEG_QUALITY), 90])[1]

    return base64.b64encode(img_encoded).decode('utf-8')

Access and Manipulate Metadata

The examples for this category are the same as those listed under Accessing Metadata above.

Object Tracking

from lumeopipeline import VideoFrame, Utils
import cv2
import numpy as np

# Node-specific variables
node = {
    'tracked_objects': {},
    'max_frames_missing': 600
}

def IoU(box1, box2):
    """Calculate Intersection over Union (IoU) between two bounding boxes."""
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2

    inter_x1 = max(x1, x1_p)
    inter_y1 = max(y1, y1_p)
    inter_x2 = min(x2, x2_p)
    inter_y2 = min(y2, y2_p)

    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)

    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_p - x1_p) * (y2_p - y1_p)

    iou = inter_area / float(box1_area + box2_area - inter_area)

    return iou

def track_objects(node, frame, detected_objects):
    new_tracked_objects = {}

    for obj in detected_objects:
        bbox = obj['rect']
        box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
        best_iou = 0
        best_id = None

        for obj_id, tracked in node['tracked_objects'].items():
            iou = IoU(box, tracked['rect'])
            if iou > best_iou:
                best_iou = iou
                best_id = obj_id

        if best_iou > 0.3:  # IoU threshold to match objects
            new_tracked_objects[best_id] = {
                'rect': box,
                'label': obj['label'],
                'probability': obj['probability'],
                'frames_missing': 0
            }
        else:
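            # Use a monotonically increasing counter so that a new detection
            # never reuses the ID of an object that is still being tracked.
            new_id = node.get('next_id', 0)
            node['next_id'] = new_id + 1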
            new_tracked_objects[new_id] = {
                'rect': box,
                'label': obj['label'],
                'probability': obj['probability'],
                'frames_missing': 0
            }

    for obj_id, tracked in node['tracked_objects'].items():
        if obj_id not in new_tracked_objects:
            tracked['frames_missing'] += 1
            if tracked['frames_missing'] < node['max_frames_missing']:
                new_tracked_objects[obj_id] = tracked

    node['tracked_objects'] = new_tracked_objects

    for obj_id, tracked in node['tracked_objects'].items():
        for obj in detected_objects:
            bbox = obj['rect']
            box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
            if tracked['rect'] == box:
                obj['id'] = obj_id

    save_metadata(frame, detected_objects)

def save_metadata(frame, detected_objects):
    try:
        meta = frame.meta()
        meta.set_field("objects", detected_objects)
        meta.save()
    except Exception as error:
        print(error)
        pass

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    with frame.data() as mat:
        meta = frame.meta()
        objects = meta.get_field("objects")

        track_objects(node, frame, objects)

    return True
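
To make the 0.3 matching threshold more concrete, the sketch below recomputes IoU for two example boxes in the same [left, top, right, bottom] form that track_objects builds. The box values are made up for illustration, and the compact iou function is a restatement for this example (with a guard against an empty union), not a replacement for the code above.

```python
def iou(box1, box2):
    """Intersection over Union for boxes given as [left, top, right, bottom]."""
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2
    inter_w = max(0, min(x2, x2_p) - max(x1, x1_p))
    inter_h = max(0, min(y2, y2_p) - max(y1, y1_p))
    inter_area = inter_w * inter_h
    union_area = (x2 - x1) * (y2 - y1) + (x2_p - x1_p) * (y2_p - y1_p) - inter_area
    return inter_area / union_area if union_area > 0 else 0.0


previous = [0, 0, 10, 10]
shifted_slightly = [2, 0, 12, 10]  # overlap 8 x 10 = 80, union 120
shifted_far = [5, 5, 15, 15]       # overlap 5 x 5 = 25, union 175

print(iou(previous, shifted_slightly))  # 80 / 120, above the 0.3 threshold
print(iou(previous, shifted_far))       # 25 / 175, below the 0.3 threshold
```

With these values, the slightly shifted box would be matched to the existing track, while the far-shifted box would be treated as a new object.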

Integrations

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures

frame_count = 0
last_webhook_frame_count = 0

# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    global frame_count
    global last_webhook_frame_count
    global background_thread

    # Named person_detected so that it does not shadow the send_webhook()
    # function defined below.
    person_detected = False
    frame_count = frame_count + 1
    
    # Grab all the metadata stored with this frame. Includes
    # Object level metadata as well as metadata added by each node.
    meta = frame.meta().get_all()
    objects = meta['objects']
    nodes = meta['nodes']

    # Iterate through all objects to see if we detect a person
    # See here for properties of each object : 
    # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
    # Assumes you have an AI Model node before this Custom Function node, configured
    # with a Person Detection Model.
    for obj in objects:
        if obj['label'] == 'person':
            person_detected = True

    # Send a webhook if a person was detected, but no more frequently
    # than once every 5 seconds (~50 frames at 10 FPS) to avoid spamming the server multiple times
    # a second.
    if person_detected and \
       (last_webhook_frame_count == 0 or frame_count - last_webhook_frame_count > 50):
        
        try:
            # The body will be converted to JSON by the send_webhook function
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body) 
        except Exception as e:
            print(e)

        last_webhook_frame_count = frame_count

    return True


def send_webhook(body):
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with a application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e: 
        print("Send webhook error : {}".format(e))

# Example: send a webhook when a Line Counter node reports a crossing
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures

frame_count = 0
last_webhook_frame_count = 0

# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()

def process_frame(frame: VideoFrame, **kwargs) -> bool:

    global frame_count
    global last_webhook_frame_count
    global background_thread

    should_send_webhook = False
    frame_count = frame_count + 1
    
    # Grab all the metadata stored with this frame. Includes
    # object-level metadata as well as metadata added by each node.
    meta = frame.meta().get_all()
    objects = meta['objects']
    nodes = meta['nodes']

    # Look through node properties and detect when a person has crossed a line.
    # Assumes there is a Line Counter node before this Custom Function node.
    # Metadata format: https://docs.lumeo.com/docs/line-crossing-counter-node
    if 'annotate_line_counter1' in nodes and \
       nodes['annotate_line_counter1']['lines']['line1']['total_objects_crossed_delta'] > 0:
        should_send_webhook = True

    # Send a webhook if a line crossing was detected, but no more frequently
    # than once every 5 seconds (~50 frames at 10 FPS) to avoid spamming the server
    # multiple times a second.
    if should_send_webhook and \
       (last_webhook_frame_count == 0 or frame_count - last_webhook_frame_count > 50):
        
        try:
            # The body will be converted to JSON by the send_webhook function.
            # send_webhook here refers to the helper function defined below.
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body) 
        except Exception as e:
            print(e)

        last_webhook_frame_count = frame_count

    return True


def send_webhook(body):
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with a application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e: 
        print("Send webhook error : {}".format(e))
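
Both webhook examples above limit how often a webhook is sent by comparing frame counts. The check can be pulled out into a small helper, which makes the direction of the comparison easier to see. This is a minimal sketch, not part of lumeopipeline: the class name FrameThrottle and its allow method are hypothetical, and it assumes the caller passes in a frame counter that only increases.

```python
class FrameThrottle:
    """Hypothetical helper: allow an action at most once per frame gap."""

    def __init__(self, min_gap_frames):
        # Number of frames that must pass after the last allowed action
        self.min_gap_frames = min_gap_frames
        # None means nothing has been sent yet
        self.last_sent_frame = None

    def allow(self, frame_count):
        """Return True and record the frame if enough frames have passed."""
        if (self.last_sent_frame is None
                or frame_count - self.last_sent_frame > self.min_gap_frames):
            self.last_sent_frame = frame_count
            return True
        return False


# Roughly 5 seconds at 10 FPS, matching the examples above
throttle = FrameThrottle(min_gap_frames=50)
```

Inside process_frame, the condition would then read something like `if should_send_webhook and throttle.allow(frame_count):`. If the source frame rate varies, a throttle based on wall-clock time may be a better fit than one based on frame counts.
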

# Example: append counts of new objects to a persistent JSONL file
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import os
import json
import concurrent.futures
from datetime import datetime

# Global variables that persist across frames go here.
# One-time initialization code can also live here.
node_properties = {
    "background_thread": concurrent.futures.ThreadPoolExecutor(),
    'storage_dir': '/var/lib/lumeo/userdata/',
    'file_name': 'mydata.jsonl',
    'prev_objects': [],
    'initialized': False
}

def process_frame(frame: VideoFrame, **kwargs) -> bool:
      
    object_properties = {}
      
    if not node_properties['initialized']:
        init_persistent_storage()
        node_properties['initialized'] = True
      
    # Grab all the object metadata stored with this frame.
    frame_meta = frame.meta()
    meta = frame_meta.get_all()

    # Iterate over all the objects in the frame and count any new objects.
    objects = meta['objects']
    for obj in objects:
        if obj.get('id', -1) not in node_properties['prev_objects']:
            object_properties[obj['label']] = object_properties.get(obj['label'], 0) + 1
                
    # Append the object properties to the persistent storage
    # TODO: Clean up persistent storage if it gets too large.
    # Note: Persistent storage file is within a docker container and will be lost if the container is deleted.
    if any(value > 0 for value in object_properties.values()):
        object_properties['timestamp'] = datetime.utcnow().isoformat() + "Z"
        node_properties['background_thread'].submit(append_to_persistent_storage, object_properties)
    
    # Update the list of objects in the frame.
    node_properties['prev_objects'] = [obj.get('id', -1) for obj in objects]
    
    return True
    
    
def init_persistent_storage(reset=True):    
    os.makedirs(f"{node_properties['storage_dir']}", exist_ok=True)
    if reset:
        file_path = f"{node_properties['storage_dir']}{node_properties['file_name']}"
        if os.path.exists(file_path):
            os.remove(file_path)
    
    
def append_to_persistent_storage(row):
    jsonl_data = json.dumps(row) + "\n"
    with open(f"{node_properties['storage_dir']}{node_properties['file_name']}", "a") as f:
        f.write(jsonl_data)
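
The example above leaves cleanup of the storage file as a TODO. One possible approach is to keep only the newest rows once the file grows past a size limit. The sketch below uses only the Python standard library; the function name trim_jsonl and its max_bytes and keep_lines parameters are hypothetical, and it does not coordinate with appends running on the background thread, so a row written while the trim is in progress could be lost.

```python
import os


def trim_jsonl(file_path, max_bytes, keep_lines):
    """Keep only the newest keep_lines rows once the file exceeds max_bytes.

    Returns True if the file was trimmed, False otherwise.
    """
    if not os.path.exists(file_path) or os.path.getsize(file_path) <= max_bytes:
        return False

    with open(file_path, "r") as f:
        lines = f.readlines()

    # lines[-0:] would return every line, so handle keep_lines == 0 explicitly
    newest = lines[-keep_lines:] if keep_lines > 0 else []

    # Write to a temporary file and swap it in, so a reader never sees
    # a partially written file
    tmp_path = file_path + ".tmp"
    with open(tmp_path, "w") as f:
        f.writelines(newest)
    os.replace(tmp_path, file_path)
    return True
```

It could be called occasionally, for example every few thousand frames, with the same path that append_to_persistent_storage writes to.
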