Custom Function
Manually process or modify frames and metadata using Python code
Overview
The Custom Function node lets you run custom Python code to process or modify frames, and to extract or add frame metadata.
Inputs & Outputs
- Inputs: 1, Media Format: Raw Video
- Outputs: 1, Media Format: Raw Video
- Output Metadata: See below.
Properties
| Property | Value | 
|---|---|
| name | Function name | 
| code | Python code to execute in the function. | 
For your convenience, the following reserved variables, containing useful deployment information, are passed to the process_frame function.
Minimal function signature:
```python
def process_frame(frame: VideoFrame, **kwargs) -> bool:
```
Exhaustive list:
```python
def process_frame(frame: VideoFrame, application_id, application_name, deployment_id, deployment_name, node_id, **kwargs) -> bool:
```
| Variable | Description | 
|---|---|
| application_id | Application UUID | 
| application_name | Application Name | 
| deployment_id | Deployment UUID | 
| deployment_name | Deployment Name | 
| node_id | The ID (string) that identifies this Function node in the pipeline, e.g. function1 | 
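For instance, a minimal sketch that reads a couple of these reserved variables (any you don't list explicitly still arrive via **kwargs):

```python
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, node_id=None, deployment_name=None, **kwargs) -> bool:
    # Reserved variables are passed as keyword arguments.
    print("node {} running in deployment {}".format(node_id, deployment_name))
    return True
```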
Metadata
Access this metadata using a downstream Function node, or via the API from a snapshot or clip saved downstream of this node.
| Metadata Property | Description | 
|---|---|
| <custom metadata key> | Custom metadata, if added by your code using frame.meta().set_field() (see reference below). | 
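For instance, a minimal sketch that adds a custom key to every frame (the key name my_key is purely illustrative):

```python
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    meta = frame.meta()
    meta.set_field("my_key", "my_value")  # "my_key" is a hypothetical custom key
    meta.save()  # persist so downstream nodes, clips and snapshots see it
    return True
```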
Function Template
The Function node can run any Python 3 code that you provide. It uses the lumeopipeline Python library to access and process frames, and bundles OpenCV and NumPy so you can use those libraries to manipulate frames.
An example template is listed below (when you create this node in the Pipeline Editor, it comes pre-populated with a template to get you started quickly). A few things to note:
- The process_frame method is mandatory in the Function.
- Return True from the function to continue processing the frame, or False to drop the frame from further processing.
- You currently cannot alter the frame size with this node.
- You can import and use additional libraries in the Function, as long as they are installed on the system or you install them explicitly using the lumeopipeline.Utils.install_import method.
- You can make API calls in the Function, but keep in mind that the process_frame method blocks the rest of the pipeline: if your function takes too long to execute, it will delay downstream processing. See the Integrations examples below for a background-thread pattern that avoids this.
```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Insert your code here.
    global frame_count
    frame_count = frame_count + 1
    with frame.data() as mat:
        # Here's an example that uses OpenCV to put arbitrary text on the frame
        cv2.putText(mat, "Frame " + str(frame_count), (50,50), cv2.FONT_HERSHEY_DUPLEX, 0.7, (0, 0, 255), 1)
        # Here's an example of how to access the frame-level metadata and print it on the frame
        meta = frame.meta()
        yidx = 100
        for (key, value) in meta.get_all().items(): 
            cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 1, cv2.LINE_AA)
            yidx = yidx + 50
        # Add your own metadata to the frame
        meta.set_field("frame_count", frame_count)
        meta.save()
    # Return False to drop this frame, True to continue processing.
    return True
```

Debugging and Logging
Head over to the Logs tab under the Deployment to view the logs in real time while the pipeline is running.
You can use print() in the Function node to log data to the Console, or add a Display Stream Info node and set its Log Metadata property to True.
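For instance, a minimal sketch that logs each frame's object metadata; flush=True makes the output appear promptly in the Deployment's Logs tab:

```python
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Printed output shows up in the Deployment's Logs tab.
    print("objects:", frame.meta().get_field("objects"), flush=True)
    return True
```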
 
Accessing Metadata
Most nodes in the pipeline add their own metadata to each frame. This metadata can be accessed using the lumeopipeline library in the Function (see the reference below). You can find more information about the metadata added by specific nodes in each node's reference.
- AI Model and Track Objects nodes: metadata is stored under the objects top-level key.
- Other nodes: metadata is stored under the nodes top-level key.
You can also add your own custom metadata to the frame. This metadata will be passed along to any webhooks sent out by subsequent nodes, and saved along with clips and snapshots generated by the Save Clip and Save Snapshot nodes. For example:
```python
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)
        meta = frame.meta()
        if meta is not None:
            print(meta.get_all())
    except Exception as error:
        print(error)
        pass
    return True
```

lumeopipeline Reference
| method | Description | 
|---|---|
| VideoFrame.video_info() -> VideoInfo | Returns information about the video frame: VideoInfo.height (frame height), VideoInfo.width (frame width), VideoInfo.fps_n (frame-rate numerator), VideoInfo.fps_d (frame-rate denominator). | 
| VideoFrame.data() -> numpy.ndarray | Returns a height x width x 4 array containing the frame's pixel data, in BGRA format. Altering this array alters the raw video frame output by this node. | 
| VideoFrame.meta() -> Meta | Returns the Meta object containing the metadata inserted by pipeline nodes. | 
| Meta.get_all() -> Dictionary | Returns all metadata fields (key, value pairs) in the frame. | 
| Meta.get_field(key) -> string | Returns the value of a specific field. | 
| Meta.set_field(key, value) -> bool | Sets the value of a specific field. | 
| Meta.save() -> None | Saves updated metadata if it was modified using set_field or set_user_field. | 
| Meta.get_fields(pattern) -> Generator of (field_name, field_value) tuples | Returns all fields whose names match the given pattern. Field names are dot-separated words; the pattern allows wildcarding of unneeded words with *, and the rightmost star matches all remaining words. Example patterns: *.inference.label skips the first word and matches inference_node_0123.inference.label and inference_node_5678.inference.label; user.* returns all fields whose names start with "user", such as user.property1, user.property2, user.complex.property; * matches everything. | 
| VideoFrame.tensors() -> Vec<Tensor> | Returns a list of the output tensors of Inference node(s). Can be used to apply custom parsing to the raw inference outputs. See Custom Model Parser for more details. | 
| Tensor.layers -> Vec<Layer> | Iterate over this field to get the multiple output layers. | 
| Layer.name -> string | The output layer name. | 
| Layer.dimensions -> Vec<Int> | The shape of the output layer. | 
| Layer.data -> raw_buffer | The raw output data of this layer. | 
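A minimal sketch exercising a few of these methods (it assumes an upstream AI Model node, so inference metadata and raw output tensors are present):

```python
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    info = frame.video_info()
    print(info.width, info.height, info.fps_n, info.fps_d)
    # Match metadata fields by pattern.
    for field_name, field_value in frame.meta().get_fields("*.inference.label"):
        print(field_name, field_value)
    # Inspect raw inference output tensors.
    for tensor in frame.tensors():
        for layer in tensor.layers:
            print(layer.name, layer.dimensions)
    return True
```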
lumeopipeline Utils class
For convenience, we provide a utility class containing functions that are used across several of our dynamic nodes.
You can access them by adding from lumeopipeline import Utils to the Python import section.
See "Use Utils" under Examples below for reference.
| method | Description | 
|---|---|
| install_import(package_name=None, attribute_name=None, module_name=None, version=None) | Allows users to install and import additional Python modules on devices. Examples: install and import a module: pytz = install_import('pytz'); import an attribute from a module: sleep = install_import('time', 'sleep'); import a module whose name differs from its package: box = install_import('python-box', module_name='box'); install a specific version: jwt = install_import('PyJWT', module_name='jwt', version='1.5.3') | 
| write_label_on_frame(mat, xidx, yidx, label) | Auxiliary function to write text on the frame | 
| num_milliseconds_seen(last_seen_frame, first_seen_frame, video_info) | Returns the number of milliseconds elapsed between two frames | 
| get_timestamp(frame, frame_count) | Returns the timestamp of a given frame, based on the current frame_count and the video FPS | 
| send_webhook(webhook_url, body, compression=True) | Sends a webhook to the given URL. Compression is enabled by default; pass False as the third argument to disable it. | 
Examples
Basics
```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
frames_to_skip = 5700
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frames_to_skip
    if frames_to_skip > 0:
        frames_to_skip = frames_to_skip - 1
        return False
               
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Insert your code here.
    try:
        meta = frame.meta()
        # All Metadata in this frame
        print(meta.get_all())
        # Metadata about any objects in this frame from Model Inference node
        print(meta.get_field("objects"))
    except Exception as error:
        print(error)
        pass
            
    # Return False to drop this frame, True to continue processing.
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        meta = frame.meta()
        if meta is not None:
            yidx = 100
            with frame.data() as mat:
              for (key, value) in meta.get_all().items(): 
                  if value is not None:
                      cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                      yidx = yidx + 25
    except Exception as error:
        print(error, flush=True)
        pass
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    frame_count = frame_count + 1
    try:
        meta = frame.meta()
        if meta is not None:
          # Set user metadata that will be saved along with a
          # snapshot or clip. (Access it using the Lumeo Files API)
          meta.set_field("frame_count", str(frame_count))
    except Exception as error:
        print(error, flush=True)
        pass
          
    # Return False to drop this frame, True to continue processing.
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
# Installs and import additional python modules on device, 
# or just imports it if already installed
#
# Usage:
# - install and import module:
#   pytz_ref = install_import('pytz')
#
# - import attribute from module:
#   sleep = install_import('time', 'sleep')
#
# - import a module which has a different name from the package it belongs
#   box = install_import('python-box', module_name='box')
#
# - install a specific version
#   jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')
pytz = Utils.install_import('pytz')
from pytz import reference as pytz_ref
frame_count = 0
# Set to False if the Webhook endpoint does not support compression
compression = True
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    global compression
    frame_count = frame_count + 1
    with frame.data() as mat:
      Utils.write_label_on_frame(mat, 10, 10, 'Example of lumeopipeline Utils')
      print("Number of milliseconds: " + str(Utils.num_milliseconds_seen(frame_count, 0, frame.video_info())))
      print("Frame timestamp: " + str(Utils.get_timestamp(frame, frame_count)))
      try:
        # This will be converted to JSON internally by the send_webhook function
        body = {'name': "Example", 'city': "New York"}
        Utils.send_webhook('http://127.0.0.1', body, compression)
      except Exception as e:
        print(e)
    # Return False to drop this frame, True to continue processing.
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
frame_count = 0
total_objects_passed = 0
objects_crossed_per = 0
prev_timestamp = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
  global frame_count
  global total_objects_passed
  global objects_crossed_per
  global prev_timestamp
  try:
    
    with frame.data() as mat:
      
      # Get Current Frame's relative timestamp (in seconds) since 
      # start of stream
      cur_timestamp = Utils.get_timestamp(frame, frame_count)
      frame_count += 1
      
      # Grab metadata from previous nodes in this pipeline
      meta = frame.meta()
      nodes = meta.get_field('nodes')
  
      # Iterate through line counter nodes and add up
      # objects that have crossed any line 
      for node_id in nodes:
        if nodes[node_id]['type'] == 'annotate_line_counter':
          for line_id in nodes[node_id]['lines']:
            total_objects_passed += nodes[node_id]['lines'][line_id]['total_objects_crossed_delta']
      # Every 2 seconds, compute the throughput (objects crossed) per minute
      # Then reset the counts
      if (cur_timestamp - prev_timestamp > 2):
        objects_crossed_per = total_objects_passed * 30
        total_objects_passed = 0
        prev_timestamp = cur_timestamp
  
      # Display values on the frame
      Utils.write_label_on_frame(mat, 50,50, "Throughput : {} objects/minute".format(objects_crossed_per))
      meta.set_field('objects_crossed_per', objects_crossed_per)
      meta.save()
  except Exception as e: 
    print(e)
    pass
  # Return False to drop this frame, True to continue processing.
  return True
```

Image and Media Manipulation
```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
from urllib.request import urlopen
print("module name:  ", __name__)
print("watermark.py : opencv ver : ", cv2.__version__)
watermarkurl = "https://s.gravatar.com/avatar/f8066a0ab6f5ffa7d121aa210d988980?s=80"
watermarkimage = numpy.asarray(bytearray(urlopen(watermarkurl).read()), dtype="uint8")
watermark = cv2.imdecode(watermarkimage, cv2.IMREAD_UNCHANGED)
if watermark.shape[2] != 4:
    watermark = cv2.cvtColor(watermark,cv2.COLOR_BGR2BGRA)
#watermark = cv2.imread("watermark.png", cv2.IMREAD_UNCHANGED)
watermark_overlay = None
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global watermark
    global watermark_overlay
    with frame.data() as mat:
        if watermark_overlay is None:
            (wH, wW) = watermark.shape[:2]
            width = frame.video_info().width
            height = frame.video_info().height
            watermark_overlay = numpy.zeros((height,width,4), dtype="uint8")
            watermark_overlay[10:10+wH, 10:10+wW] = watermark
        cv2.addWeighted(watermark_overlay, 1.0, mat, 1.0, 0, mat)
        #cv2.putText(mat, "LUMEO", (50,50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 2)
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Insert your code here.
    with frame.data() as mat:
      
        # Grab all the metadata stored with this frame. Includes
        # Object level metadata as well as metadata added by each node.
        meta = frame.meta().get_all()
        
        # Alternatively, you can also do: 
        # objects = frame.meta().get_field("objects")
        # nodes = frame.meta().get_fields("nodes")
        objects = meta['objects']
        nodes = meta['nodes']
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for obj in objects:
          
            # Draw a red bounding box
            obj_rect = obj['rect']
            cv2.rectangle(mat, 
                          (obj_rect['left'], obj_rect['top']), 
                          (obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']), 
                          (0,0,255), thickness=2)
            
            # Write label. 
            # obj['label'] -- object class
            # obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames. 
            # obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
            label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
                                     obj['label'],
                                     ",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
            Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
            
    # Return False to drop this frame, True to continue processing.
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import base64                         # Base64 encoding for image data
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    with frame.data() as mat:
                
        # Downscale and get entire frame as base64 encoded string
        frame_base64 = get_image_as_base64(mat, max_dim=640)
        # do something with the frame_base64
        
        # Get all objects as base64 encoded strings
        meta = frame.meta().get_all()
        objects = meta['objects']
        
        for obj in objects:
            obj_rect = obj['rect']
            obj_base64 = get_image_as_base64(mat, max_dim=160, roi_coords=obj_rect)
            # do something with the obj_base64
        
        
    return True
def downscale_image(image, max_dim=640):
    
    largest_dim = max(image.shape[0], image.shape[1])
    scale_factor = 1
    if largest_dim > max_dim:
        scale_factor = max_dim / largest_dim
        
    if scale_factor != 1:
        scaled_image = cv2.resize(image, 
                                  (int(image.shape[1]*scale_factor), int(image.shape[0]*scale_factor)), 
                                  interpolation = cv2.INTER_CUBIC)
        return scaled_image
    return image
def get_image_as_base64(image, max_dim, roi_coords=None):
    # Crop to ROI
    if roi_coords:
        img_cropped = image[max(roi_coords['top'],0):min(roi_coords['top']+roi_coords['height'],image.shape[0]), 
                            max(roi_coords['left'],0):min(roi_coords['left']+roi_coords['width'],image.shape[1])]
    else:
        img_cropped = image    
        
    # Downscale image if it is too large
    img_cropped = downscale_image(img_cropped, max_dim=max_dim)
    # Encode to JPEG
    img_encoded = cv2.imencode('.jpg', img_cropped, [int(cv2.IMWRITE_JPEG_QUALITY), 90])[1]
    return base64.b64encode(img_encoded).decode('utf-8')
```

Access and Manipulate Metadata
```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
prev_frame_objects = []
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Insert your code here.
    global prev_frame_objects
    with frame.data() as mat:
      
        # Grab all the metadata stored with this frame. Includes
        # Object level metadata as well as metadata added by each node.
        meta = frame.meta().get_all()
        
        # Alternatively, you can also do: 
        # objects = frame.meta().get_field("objects")
        # nodes = frame.meta().get_fields("nodes")
        objects = meta['objects']
        nodes = meta['nodes']
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for obj in objects:
          
            # Draw a red bounding box
            obj_rect = obj['rect']
            cv2.rectangle(mat, 
                          (obj_rect['left'], obj_rect['top']), 
                          (obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']), 
                          (0,0,255), thickness=2)
            
            # Write label. 
            # obj['label'] -- object class
            # obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames. 
            # obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
            label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
                                     obj['label'],
                                     ",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
            Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
            
    # Return False to drop this frame, True to continue processing.
    return True
```

```python
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)
        meta = frame.meta()
        if meta is not None:
            # Log metadata added by the all Nodes
            # nodes is a dictionary that contains each node's 
            # metadata. Refer to node specific docs for the property
            # values.
            nodes = meta.get_field("nodes")
            for node_id in nodes:
              	print("{} : {}".format(node_id,nodes[node_id]))
    except Exception as e:
        print(e)
    return True
```

```python
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    try:
        video_info = frame.video_info()
        print(video_info.fps_d, video_info.fps_n)
        meta = frame.meta()
        if meta is not None:
            print(meta.get_all())
            meta.set_field("key", "value")
            meta.save()
    except Exception as error:
        print(error)
        pass
    return True
```

```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    with frame.data() as mat:
      
        # Grab the object metadata 
        meta = frame.meta()
        objects = meta.get_field("objects")
        
        # Iterate through all objects
        # See here for properties of each object : 
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        probability_threshold = 0.5  # Set your threshold value here
        objects = [obj for obj in objects if obj['probability'] >= probability_threshold]
        meta.set_field("objects", objects)
        meta.save()
    # Return False to drop this frame, True to continue processing.
    return True
```

Object Tracking
```python
from lumeopipeline import VideoFrame, Utils
import cv2
import numpy as np
# Node-specific variables
node = {
    'tracked_objects': {},
    'max_frames_missing': 600
}
def IoU(box1, box2):
    """Calculate Intersection over Union (IoU) between two bounding boxes."""
    x1, y1, x2, y2 = box1
    x1_p, y1_p, x2_p, y2_p = box2
    inter_x1 = max(x1, x1_p)
    inter_y1 = max(y1, y1_p)
    inter_x2 = min(x2, x2_p)
    inter_y2 = min(y2, y2_p)
    inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
    box1_area = (x2 - x1) * (y2 - y1)
    box2_area = (x2_p - x1_p) * (y2_p - y1_p)
    iou = inter_area / float(box1_area + box2_area - inter_area)
    return iou
def track_objects(node, frame, detected_objects):
    new_tracked_objects = {}
    for obj in detected_objects:
        bbox = obj['rect']
        box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
        best_iou = 0
        best_id = None
        for obj_id, tracked in node['tracked_objects'].items():
            iou = IoU(box, tracked['rect'])
            if iou > best_iou:
                best_iou = iou
                best_id = obj_id
        if best_iou > 0.3:  # IoU threshold to match objects
            new_tracked_objects[best_id] = {
                'rect': box,
                'label': obj['label'],
                'probability': obj['probability'],
                'frames_missing': 0
            }
        else:
            new_id = len(node['tracked_objects']) + len(new_tracked_objects)
            new_tracked_objects[new_id] = {
                'rect': box,
                'label': obj['label'],
                'probability': obj['probability'],
                'frames_missing': 0
            }
    for obj_id, tracked in node['tracked_objects'].items():
        if obj_id not in new_tracked_objects:
            tracked['frames_missing'] += 1
            if tracked['frames_missing'] < node['max_frames_missing']:
                new_tracked_objects[obj_id] = tracked
    node['tracked_objects'] = new_tracked_objects
    for obj_id, tracked in node['tracked_objects'].items():
        for obj in detected_objects:
            bbox = obj['rect']
            box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
            if tracked['rect'] == box:
                obj['id'] = obj_id
    save_metadata(frame, detected_objects)
def save_metadata(frame, detected_objects):
    try:
        meta = frame.meta()
        meta.set_field("objects", detected_objects)
        meta.save()
    except Exception as error:
        print(error)
        pass
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    with frame.data() as mat:
        meta = frame.meta()
        objects = meta.get_field("objects")
        track_objects(node, frame, objects)
    return True
```
Integrations
```python
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures
frame_count = 0
last_webhook_frame_count = 0
# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    global last_webhook_frame_count
    global background_thread
    should_send_webhook = False  # renamed so it doesn't shadow the send_webhook() helper below
    frame_count = frame_count + 1
    
    # Grab all the metadata stored with this frame. Includes
    # Object level metadata as well as metadata added by each node.
    meta = frame.meta().get_all()
    objects = meta['objects']
    nodes = meta['nodes']
    # Iterate through all objects to see if we detect a person
    # See here for properties of each object : 
    # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
    # Assumes you have a AI Model node before this Custom function node, configured
    # with a Person Detection Model.
    for obj in objects:
        if obj['label'] == 'person':
            should_send_webhook = True
    # Send a webhook if a person was detected, but no more frequently than
    # once every 5 seconds (~50 frames at 10 FPS), to avoid spamming the
    # server multiple times a second.
    if should_send_webhook and \
       (last_webhook_frame_count == 0 or frame_count - last_webhook_frame_count > 50):
        
        try:
            # The body will be converted to JSON by the send_webhook function
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body) 
        except Exception as e:
            print(e)
        last_webhook_frame_count = frame_count
    return True
def send_webhook(body):
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with an application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e: 
        print("Send webhook error : {}".format(e))from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures
frame_count = 0
last_webhook_frame_count = 0
# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()
def process_frame(frame: VideoFrame, **kwargs) -> bool:
    global frame_count
    global last_webhook_frame_count
    global background_thread
    should_send_webhook = False  # renamed so it doesn't shadow the send_webhook() helper below
    frame_count = frame_count + 1
    
    # Grab all the metadata stored with this frame. Includes
    # Object level metadata as well as metadata added by each node.
    meta = frame.meta().get_all()
    objects = meta['objects']
    nodes = meta['nodes']
    # Look through node properties and detect when a person has crossed a line.
    # Assumes there is a Line counter node before this custom function.
    # Metadata format: https://docs.lumeo.com/docs/line-crossing-counter-node
    if 'annotate_line_counter1' in nodes and \
       nodes['annotate_line_counter1']['lines']['line1']['total_objects_crossed_delta'] > 0:
        should_send_webhook = True
    # Send a webhook when a line crossing is detected, but no more frequently
    # than once every 5 seconds (~50 frames at 10 FPS), to avoid spamming the
    # server multiple times a second.
    if should_send_webhook and \
       (last_webhook_frame_count == 0 or frame_count - last_webhook_frame_count > 50):
        
        try:
            # The body will be converted to JSON by the send_webhook function
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body) 
        except Exception as e:
            print(e)
        last_webhook_frame_count = frame_count
    return True
def send_webhook(body):
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with an application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e: 
        print("Send webhook error : {}".format(e))from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import os
import json
import concurrent.futures
from datetime import datetime
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
node_properties = {
    "background_thread": concurrent.futures.ThreadPoolExecutor(),
    'storage_dir': '/var/lib/lumeo/userdata/',
    'file_name': 'mydata.jsonl',
    'prev_objects': [],
    'initialized': False
}
def process_frame(frame: VideoFrame, **kwargs) -> bool:
      
    object_properties = {}
      
    if not node_properties['initialized']:
        init_persistent_storage()
        node_properties['initialized'] = True
      
    # Grab all the object metadata stored with this frame.
    frame_meta = frame.meta()
    meta = frame_meta.get_all()
    # Iterate over all the objects in the frame and count any new objects.
    objects = meta['objects']
    for obj in objects:
        if obj.get('id', -1) not in node_properties['prev_objects']:
            object_properties[obj['label']] = object_properties.get(obj['label'], 0) + 1
                
    # Append the object properties to the persistent storage
    # TODO: Clean up persistent storage if it gets too large.
    # Note: Persistent storage file is within a docker container and will be lost if the container is deleted.
    if any(value > 0 for value in object_properties.values()):
        object_properties['timestamp'] = datetime.utcnow().isoformat() + "Z"
        node_properties['background_thread'].submit(append_to_persistent_storage, object_properties)
    
    # Update the list of objects in the frame.
    node_properties['prev_objects'] = [obj.get('id', -1) for obj in objects]
    
    return True
    
    
def init_persistent_storage(reset=True):    
    os.makedirs(f"{node_properties['storage_dir']}", exist_ok=True)
    if reset:
        file_path = f"{node_properties['storage_dir']}{node_properties['file_name']}"
        if os.path.exists(file_path):
            os.remove(file_path)
    
    
def append_to_persistent_storage(row):
    jsonl_data = json.dumps(row) + "\n"
    with open(f"{node_properties['storage_dir']}{node_properties['file_name']}", "a") as f:
        f.write(jsonl_data)
```