Custom Function Node

Run custom code to process or modify frames, and extract or add metadata to frames.

Inputs & Outputs

  • Inputs : 1, Media Format : Raw Video
  • Outputs : 1, Media Format: Raw Video
  • Output Metadata : See below.

Properties

Property

Value

name

Function name

code

Python code to execute in the function.

Metadata

Access metadata using the Function Node or using the API, from the snapshot or clip saved downstream of this node.

Metadata Property

Description

<custom metadata key>

Custom metadata, if added by the code using frame.meta().set_field() (see reference below).

Function Template

The Function node can run any Python 3 code that you provide. It uses the lumeopipeline python library to access and process frames, and bundles OpenCV and Numpy along with it so you can use those libraries to manipulate frames.

An example template is listed below (when you create this node in the Pipeline Editor, it comes pre-populated with a template to get you started quickly). A few things to note:

  • The process_frame method is mandatory in the Function.
  • Return True from the function to continue processing the frame, False to drop the frame from further processing.
  • You cannot alter the frame size with this Node currently.
  • You can import and use additional libraries in the Function as long as they are installed on the system, or you install them explicitly using lumeopipeline.Utils.install_import method.
  • You can make API calls in the Function, but must keep in mind that the process_frame method blocks the rest of the pipeline, so if your function takes too long to execute, it will delay processing downstream.
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Module-level state survives between frames; one-time setup belongs here too.
frame_count = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Template Function: stamp a frame counter and all frame metadata on the frame.

    Increments the module-level ``frame_count``, draws it on the frame, draws
    every frame-level metadata field as a ``key : value`` line, and stores the
    counter in the frame metadata under ``frame_count``.

    Args:
        frame: The frame handed to the Function by the pipeline.
        **kwargs: Unused.

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global frame_count
    frame_count += 1

    with frame.data() as mat:
        font = cv2.FONT_HERSHEY_DUPLEX

        # Example: use OpenCV to write arbitrary text on the frame.
        cv2.putText(mat, "Frame " + str(frame_count), (50, 50), font, 0.7, (0, 0, 255), 1)

        # Example: read the frame-level metadata and print it on the frame,
        # one field per line, starting at y=100 and stepping 50 pixels.
        frame_meta = frame.meta()
        for row, (key, value) in enumerate(frame_meta.get_all().items()):
            cv2.putText(mat, key + " : " + str(value), (50, 100 + 50 * row),
                        font, 0.7, (255, 255, 255), 1, cv2.LINE_AA)

        # Example: attach your own metadata to the frame and persist it.
        frame_meta.set_field("frame_count", frame_count)
        frame_meta.save()

    return True

Debugging and Logging

Head over to the Logs tab under the Deployment to view the logs in real time while the pipeline is running.

You can use print() in the Function node to log data to the Console, or add a Display Stream Info Node and set its Log Metadata property to True.

32463246

Accessing Metadata

Most nodes in the Pipeline add their own metadata to each frame. This metadata can be accessed using the lumeopipeline library in the Function (reference below). You can find more information about metadata added by specific nodes in each Node's reference.

You can also add your own custom metadata to the frame. This metadata will be passed along to any webhooks sent out by subsequent nodes and saved along with clips and snapshots generated by the Save Clip Node and Save Snapshot Node.

from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame-rate fields and all metadata of the frame.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is None:
            return True
        print(frame_meta.get_all())
    except Exception as error:
        print(error)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Module-level state survives between frames; one-time setup belongs here too.
prev_frame_objects = []

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Draw a red box and a text label for every object in the frame metadata.

    The label is "<id or 'not-tracked'> <label> <comma-joined attribute labels>".

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global prev_frame_objects

    with frame.data() as mat:

        # All metadata stored with this frame: object-level metadata plus the
        # metadata added by each node. The two entries can also be fetched
        # individually with frame.meta().get_field("objects") and
        # frame.meta().get_field("nodes").
        all_meta = frame.meta().get_all()
        detections = all_meta['objects']
        nodes = all_meta['nodes']  # looked up for illustration; not used below

        # Object properties are described at:
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for det in detections:

            # Red bounding box (colour is given in BGR order).
            box = det['rect']
            top_left = (box['left'], box['top'])
            bottom_right = (box['left'] + box['width'], box['top'] + box['height'])
            cv2.rectangle(mat, top_left, bottom_right, (0, 0, 255), thickness=2)

            # det['id']         -- id assigned by the Track Objects node,
            #                      persistent across frames (absent if untracked).
            # det['label']      -- object class.
            # det['attributes'] -- extra attributes from secondary classifiers.
            track_id = det['id'] if 'id' in det else 'not-tracked'
            class_label = det['label']
            attr_labels = ",".join([attr['label'] for attr in det['attributes']])
            label = "{} {} {}".format(track_id, class_label, attr_labels)
            Utils.write_label_on_frame(mat, box['left'], box['top'], label)

    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame-rate fields and the metadata added by each node.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is not None:
            # "nodes" maps each node id to that node's metadata; see the
            # node-specific docs for the meaning of the individual properties.
            per_node = frame_meta.get_field("nodes")
            for node_id in per_node:
                node_meta = per_node[node_id]
                print("{} : {}".format(node_id, node_meta))

    except Exception as e:
        print(e)

    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame's metadata, then add a custom "key" field and save it.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is None:
            return True

        print(frame_meta.get_all())
        # set_field only stages the change; save() persists it on the frame.
        frame_meta.set_field("key", "value")
        frame_meta.save()

    except Exception as error:
        print(error)

    return True

lumeopipeline Reference

method

Description

VideoFrame.video_info() --> VideoInfo

Returns information about the video frame.

VideoInfo.height : Frame height
VideoInfo.width : Frame width
VideoInfo.fps_n : Numerator of Frame rate
VideoInfo.fps_d : Denominator of Frame rate

VideoFrame.data() -> numpy.ndarray

Returns a Height x Width x 4 dimensional array containing the frame's pixel data. The array is in the BGRA format.

Altering this array will alter the raw video frame output by this node.

VideoFrame.meta() --> Meta

Returns a Meta object that gives access to the metadata inserted by Pipeline nodes.

Meta.get_all() -> Dictionary

Returns all metadata fields (key, value pairs) in the frame.

Meta.get_field(key) -> string

Returns value for a specific field

Meta.set_field(key, value) -> bool

Sets the value for a specific field

Meta.save() -> None

Save updated metadata if it was modified using set_field or set_user_field

Meta.get_fields(pattern) -> Generator of tuples (field_name, field_value)

Get all fields whose names match given pattern

Field names are in form of dot-separated words. The pattern allows for wildcarding of unneeded words with *. The rightmost star matches all the remaining words.

Example patterns:

  • *.inference.label skips the first word, it would match
    -- inference_node_0123.inference.label
    -- inference_node_5678.inference.label

  • user.* returns all fields whose names start with "user", it would match
    -- user.property1
    -- user.property2
    -- user.complex.property

  • * matches everything

lumeopipeline Utils class

For convenience, we provide a utility class containing functions that are used in several of our dynamic nodes.

You can access them by adding to the Python import section: from lumeopipeline import Utils

See "Use Utils" under Examples below for reference.

method

Description

install_import(package_name=None, attribute_name=None, module_name=None, version=None)

Allows users to install and import additional python modules on devices.

Examples:
install and import module:

pytz = install_import('pytz')

import attribute from module:

sleep = install_import('time', 'sleep')

import a module which has a different name from the package it belongs to.

box = install_import('python-box', module_name='box')

Install a specific version

jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')

write_label_on_frame(mat, xidx, yidx, label)

Auxiliary function to write text on frame

num_milliseconds_seen(last_seen_frame, first_seen_frame, video_info)

Returns the number of milliseconds elapsed between two frames

get_timestamp(frame, frame_count)

Returns the timestamp of a certain frame, based on the current frame_count and video FPS

send_webhook(webhook_url, body, compression=True)

Sends a webhook to a specific URL.
The compression is enabled by default, you should pass False to the 3rd argument of the method to disable it.

Examples

Basics

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Number of frames still to be dropped before frames are let through.
frames_to_skip = 5700

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Drop the first ``frames_to_skip`` frames, then pass every later frame.

    Returns:
        False (drop the frame) while the skip budget is not used up,
        True (continue processing) afterwards.
    """
    global frames_to_skip

    if frames_to_skip <= 0:
        return True

    frames_to_skip -= 1
    return False
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Draw a red box and a text label for every object in the frame metadata.

    The label is "<id or 'not-tracked'> <label> <comma-joined attribute labels>".

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    with frame.data() as mat:

        # All metadata stored with this frame: object-level metadata plus the
        # metadata added by each node. The two entries can also be fetched
        # individually with frame.meta().get_field("objects") and
        # frame.meta().get_field("nodes").
        all_meta = frame.meta().get_all()
        detections = all_meta['objects']
        nodes = all_meta['nodes']  # looked up for illustration; not used below

        # Object properties are described at:
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for det in detections:

            # Red bounding box (colour is given in BGR order).
            box = det['rect']
            top_left = (box['left'], box['top'])
            bottom_right = (box['left'] + box['width'], box['top'] + box['height'])
            cv2.rectangle(mat, top_left, bottom_right, (0, 0, 255), thickness=2)

            # det['id']         -- id assigned by the Track Objects node,
            #                      persistent across frames (absent if untracked).
            # det['label']      -- object class.
            # det['attributes'] -- extra attributes from secondary classifiers.
            track_id = det['id'] if 'id' in det else 'not-tracked'
            class_label = det['label']
            attr_labels = ",".join([attr['label'] for attr in det['attributes']])
            label = "{} {} {}".format(track_id, class_label, attr_labels)
            Utils.write_label_on_frame(mat, box['left'], box['top'], label)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Print all frame metadata, then just the "objects" field.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    try:
        frame_meta = frame.meta()

        # Everything stored on this frame.
        everything = frame_meta.get_all()
        print(everything)

        # Only the objects reported by the Model Inference node.
        detections = frame_meta.get_field("objects")
        print(detections)

    except Exception as error:
        print(error)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations
from urllib.request import urlopen

print("module name:  ", __name__)
print("watermark.py : opencv ver : ", cv2.__version__)

# One-time setup, executed when the Function is loaded: download the
# watermark image and decode it into a 4-channel (BGRA) array so it can be
# blended with the BGRA frame in process_frame.
watermarkurl = "https://s.gravatar.com/avatar/f8066a0ab6f5ffa7d121aa210d988980?s=80"

# The context manager closes the HTTP response once the bytes are read.
with urlopen(watermarkurl) as _watermark_response:
    watermarkimage = numpy.asarray(bytearray(_watermark_response.read()), dtype="uint8")

# Alternative: load from a local file with
# cv2.imread("watermark.png", cv2.IMREAD_UNCHANGED).
watermark = cv2.imdecode(watermarkimage, cv2.IMREAD_UNCHANGED)
if watermark is None:
    # imdecode signals undecodable data by returning None; fail with a clear
    # message instead of an AttributeError on `.shape` below.
    raise ValueError("Could not decode watermark image from " + watermarkurl)

if watermark.ndim == 2:
    # Single-channel (grayscale) image: there is no shape[2] to inspect.
    watermark = cv2.cvtColor(watermark, cv2.COLOR_GRAY2BGRA)
elif watermark.shape[2] != 4:
    watermark = cv2.cvtColor(watermark, cv2.COLOR_BGR2BGRA)

# Full-frame overlay; built lazily on the first frame, when the frame size is known.
watermark_overlay = None

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Blend the module-level watermark into the frame.

    On the first frame a frame-sized, all-zero 4-channel overlay is created
    and the watermark is copied into it at offset (10, 10); that overlay is
    cached in ``watermark_overlay`` and added onto every frame.

    Returns:
        Always True (continue processing the frame).
    """
    global watermark
    global watermark_overlay

    with frame.data() as mat:
        if watermark_overlay is None:
            mark_h, mark_w = watermark.shape[:2]
            frame_w = frame.video_info().width
            frame_h = frame.video_info().height
            # numpy image arrays are indexed rows (height) first.
            watermark_overlay = numpy.zeros((frame_h, frame_w, 4), dtype="uint8")
            watermark_overlay[10:10 + mark_h, 10:10 + mark_w] = watermark

        # Weighted sum with both weights at 1.0, written back into the frame.
        cv2.addWeighted(watermark_overlay, 1.0, mat, 1.0, 0, mat)
        # A text watermark is an alternative:
        # cv2.putText(mat, "LUMEO", (50,50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 2)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Print every non-None frame metadata field on the frame as ``key : value``.

    Lines start at y=100 and advance by 25 pixels. Any exception is printed
    (flushed immediately) and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        frame_meta = frame.meta()

        if frame_meta is not None:
            text_y = 100
            with frame.data() as mat:
                for key, value in frame_meta.get_all().items():
                    if value is None:
                        continue
                    cv2.putText(mat, key + " : " + str(value), (50, text_y),
                                cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                    text_y += 25

    except Exception as error:
        print(error, flush=True)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Store a running frame counter in the frame's metadata.

    The counter is written as a string under the ``frame_count`` field and
    saved, so it travels with any snapshot or clip created downstream
    (readable through the Lumeo Files API).

    Any exception is printed (flushed immediately) and swallowed so the
    frame is never dropped.

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global frame_count
    frame_count = frame_count + 1

    try:
        meta = frame.meta()
        if meta is not None:
            meta.set_field("frame_count", str(frame_count))
            # set_field only stages the change; save() persists it.
            meta.save()
    except Exception as error:
        print(error, flush=True)

    # Must sit outside the try/except: returning only from the except branch
    # would make the success path return None instead of True.
    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils

# Installs and import additional python modules on device, 
# or just imports it if already installed
#
# Usage:
# - install and import module:
#   pytz_ref = install_import('pytz')
#
# - import attribute from module:
#   sleep = install_import('time', 'sleep')
#
# - import a module which has a different name from the package it belongs
#   box = install_import('python-box', module_name='box')
#
# - install a specific version
#   jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')

pytz = Utils.install_import('pytz')
from pytz import reference as pytz_ref

frame_count = 0

# Webhook payload compression; switch to False when the receiving endpoint
# cannot handle compressed bodies.
compression = True

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Demonstrate the lumeopipeline ``Utils`` helpers on every frame.

    Writes a label on the frame, prints the milliseconds elapsed since frame 0
    and the frame timestamp, and sends a sample webhook.

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global frame_count
    global compression
    frame_count += 1

    with frame.data() as mat:
        Utils.write_label_on_frame(mat, 10, 10, 'Example of lumeopipeline Utils')

        elapsed_ms = Utils.num_milliseconds_seen(frame_count, 0, frame.video_info())
        print("Number of milliseconds: " + str(elapsed_ms))

        timestamp = Utils.get_timestamp(frame, frame_count)
        print("Frame timestamp: " + str(timestamp))

        # NOTE: this call runs inside process_frame, so it is made for every
        # frame and the pipeline waits for it.
        try:
            # send_webhook converts the dict to JSON internally.
            payload = {'name': "Example", 'city': "New York"}
            Utils.send_webhook('http://127.0.0.1', payload, compression)
        except Exception as e:
            print(e)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations

frame_count = 0
total_objects_passed = 0
objects_crossed_per = 0
prev_timestamp = 0

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Estimate line-crossing throughput (objects per minute) and show it.

    Sums ``total_objects_crossed_delta`` over every line of every
    ``annotate_line_counter`` node, turns the running sum into a per-minute
    figure whenever more than 2 timestamp units have passed, writes the figure
    on the frame and stores it in the ``objects_crossed_per`` metadata field.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global frame_count
    global total_objects_passed
    global objects_crossed_per
    global prev_timestamp

    try:
        with frame.data() as mat:

            # Timestamp of this frame relative to the start of the stream
            # (in seconds, per the original example).
            cur_timestamp = Utils.get_timestamp(frame, frame_count)
            frame_count += 1

            # Metadata written by the nodes upstream of this Function.
            frame_meta = frame.meta()
            nodes = frame_meta.get_field('nodes')

            # Accumulate the crossings reported by every line counter node.
            for node_id in nodes:
                node = nodes[node_id]
                if node['type'] != 'annotate_line_counter':
                    continue
                lines = node['lines']
                for line_id in lines:
                    total_objects_passed += lines[line_id]['total_objects_crossed_delta']

            # Every 2 seconds: scale the 2-second count to a minute (x30),
            # then start a new window.
            elapsed = cur_timestamp - prev_timestamp
            if elapsed > 2:
                objects_crossed_per = total_objects_passed * 30
                total_objects_passed = 0
                prev_timestamp = cur_timestamp

            # Show the latest figure and attach it to the frame metadata.
            caption = "Throughput : {} objects/minute".format(objects_crossed_per)
            Utils.write_label_on_frame(mat, 50, 50, caption)
            frame_meta.set_field('objects_crossed_per', objects_crossed_per)
            frame_meta.save()

    except Exception as e:
        print(e)

    return True

Access and Manipulate Metadata

from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame-rate fields and all metadata of the frame.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is None:
            return True
        print(frame_meta.get_all())
    except Exception as error:
        print(error)

    return True
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2                            # OpenCV for image manipulations
import numpy                          # numpy for image manipulations


# Module-level state survives between frames; one-time setup belongs here too.
prev_frame_objects = []

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Draw a red box and a text label for every object in the frame metadata.

    The label is "<id or 'not-tracked'> <label> <comma-joined attribute labels>".

    Returns:
        Always True (continue processing the frame). Returning False would
        drop the frame.
    """
    global prev_frame_objects

    with frame.data() as mat:

        # All metadata stored with this frame: object-level metadata plus the
        # metadata added by each node. The two entries can also be fetched
        # individually with frame.meta().get_field("objects") and
        # frame.meta().get_field("nodes").
        all_meta = frame.meta().get_all()
        detections = all_meta['objects']
        nodes = all_meta['nodes']  # looked up for illustration; not used below

        # Object properties are described at:
        # https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
        for det in detections:

            # Red bounding box (colour is given in BGR order).
            box = det['rect']
            top_left = (box['left'], box['top'])
            bottom_right = (box['left'] + box['width'], box['top'] + box['height'])
            cv2.rectangle(mat, top_left, bottom_right, (0, 0, 255), thickness=2)

            # det['id']         -- id assigned by the Track Objects node,
            #                      persistent across frames (absent if untracked).
            # det['label']      -- object class.
            # det['attributes'] -- extra attributes from secondary classifiers.
            track_id = det['id'] if 'id' in det else 'not-tracked'
            class_label = det['label']
            attr_labels = ",".join([attr['label'] for attr in det['attributes']])
            label = "{} {} {}".format(track_id, class_label, attr_labels)
            Utils.write_label_on_frame(mat, box['left'], box['top'], label)

    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame-rate fields and the metadata added by each node.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is not None:
            # "nodes" maps each node id to that node's metadata; see the
            # node-specific docs for the meaning of the individual properties.
            per_node = frame_meta.get_field("nodes")
            for node_id in per_node:
                node_meta = per_node[node_id]
                print("{} : {}".format(node_id, node_meta))

    except Exception as e:
        print(e)

    return True
from lumeopipeline import VideoFrame

def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Log the frame's metadata, then add a custom "key" field and save it.

    Any exception is printed and swallowed so the frame is never dropped.

    Returns:
        Always True (continue processing the frame).
    """
    try:
        info = frame.video_info()
        print(info.fps_d, info.fps_n)

        frame_meta = frame.meta()
        if frame_meta is None:
            return True

        print(frame_meta.get_all())
        # set_field only stages the change; save() persists it on the frame.
        frame_meta.set_field("key", "value")
        frame_meta.save()

    except Exception as error:
        print(error)

    return True

Webhooks

from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures

frame_count = 0
last_webhook_frame_count = 0

# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()

# Minimum number of frames between two webhooks
# (~5 seconds at 10 FPS) so the server is not spammed several times a second.
MIN_FRAMES_BETWEEN_WEBHOOKS = 50


def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Send a throttled webhook whenever a person is detected in the frame.

    Assumes an AI Model node configured with a Person Detection Model runs
    before this Function, so that the frame metadata has an ``objects`` list.
    Object properties are described at
    https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata

    Returns:
        Always True (continue processing the frame).
    """
    global frame_count
    global last_webhook_frame_count

    frame_count = frame_count + 1

    # Grab all the metadata stored with this frame and look for a person.
    meta = frame.meta().get_all()
    person_detected = any(obj['label'] == 'person' for obj in meta['objects'])

    # The flag must not be called `send_webhook`: a local of that name would
    # shadow the function below and a bool would be submitted to the executor.
    # The throttle compares "frames elapsed since the last webhook", i.e.
    # current count minus last count.
    throttle_open = (
        last_webhook_frame_count == 0
        or frame_count - last_webhook_frame_count > MIN_FRAMES_BETWEEN_WEBHOOKS
    )

    if person_detected and throttle_open:
        try:
            # The body will be converted to JSON by the send_webhook function
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body)
        except Exception as e:
            print(e)

        last_webhook_frame_count = frame_count

    return True


def send_webhook(body):
    """Post ``body`` to the webhook server; runs on the background thread.

    Errors are printed instead of raised, because nothing inspects the
    future returned by the executor.
    """
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with a application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e:
        print("Send webhook error : {}".format(e))
from lumeopipeline import VideoFrame  # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures

frame_count = 0
last_webhook_frame_count = 0

# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()

# Minimum number of frames between two webhooks
# (~5 seconds at 10 FPS) so the server is not spammed several times a second.
MIN_FRAMES_BETWEEN_WEBHOOKS = 50


def process_frame(frame: VideoFrame, **kwargs) -> bool:
    """Send a throttled webhook whenever an object crosses the counted line.

    Assumes a Line counter node with id ``annotate_line_counter1`` and a line
    named ``line1`` runs before this Function.
    Metadata format: https://docs.lumeo.com/docs/line-crossing-counter-node

    Returns:
        Always True (continue processing the frame).
    """
    global frame_count
    global last_webhook_frame_count

    frame_count = frame_count + 1

    # Grab the metadata added by the upstream nodes.
    nodes = frame.meta().get_all()['nodes']

    # Did anything cross the line on this frame?
    line_crossed = (
        'annotate_line_counter1' in nodes
        and nodes['annotate_line_counter1']['lines']['line1']['total_objects_crossed_delta'] > 0
    )

    # The flag must not be called `send_webhook`: a local of that name would
    # shadow the function below and a bool would be submitted to the executor.
    # The throttle compares "frames elapsed since the last webhook", i.e.
    # current count minus last count.
    throttle_open = (
        last_webhook_frame_count == 0
        or frame_count - last_webhook_frame_count > MIN_FRAMES_BETWEEN_WEBHOOKS
    )

    if line_crossed and throttle_open:
        try:
            # The body will be converted to JSON by the send_webhook function.
            # event_type is kept as in the original example; adapt it to
            # whatever your receiver expects.
            body = {'event_type': "PersonDetected"}
            background_thread.submit(send_webhook, body=body)
        except Exception as e:
            print(e)

        last_webhook_frame_count = frame_count

    return True


def send_webhook(body):
    """Post ``body`` to the webhook server; runs on the background thread.

    Errors are printed instead of raised, because nothing inspects the
    future returned by the executor.
    """
    server_address = 'http://127.0.0.1'
    gzip_compression = False
    try:
        # This method sends the webhook with a application/json content type
        Utils.send_webhook(server_address, body, gzip_compression)
    except Exception as e:
        print("Send webhook error : {}".format(e))

Did this page help you?