Custom Function
Manually process or modify frames and metadata using python code
Overview
Custom Function node lets you run custom python code to process or modify frames, and extract or add metadata to frames.
Inputs & Outputs
- Inputs : 1, Media Format : Raw Video
- Outputs : 1, Media Format: Raw Video
- Output Metadata : See below.
Properties
Property | Value |
---|---|
name | Function name |
code | Python code to execute in the function. |
For your convenience the following variables containing useful information are reserved and passed to the process_frame
function.
Minimal function signature:
def process_frame(frame: VideoFrame, **kwargs) -> bool:
Exhaustive list:
def process_frame(frame: VideoFrame, application_id, application_name, deployment_id, deployment_name, node_id, **kwargs) -> bool:
Variable | Description |
---|---|
application_id | Application UUID |
application_name | Application Name |
deployment_id | Deployment UUID |
deployment_name | Deployment Name |
node_id | The ID (String) that identifies this Function Node in the pipeline. ex: function1 |
Metadata
Access metadata using the Function Node or using the API, from the snapshot or clip saved downstream of this node.
Metadata Property | Description |
---|---|
<custom metadata key> | Custom metadata, if added by the code using frame.meta().set_field() (see reference below). |
Function Template
The Function node can run any Python 3 code that you provide. It uses the lumeopipeline
python library to access and process frames, and bundles OpenCV and Numpy along with it so you can use those libraries to manipulate frames.
An example template is listed below (when you create this node in the Pipeline Editor, it comes pre-populated with a template to get you started quickly). A few things to note:
- The
process_frame
method is mandatory in the Function. - Return
True
from the function to continue processing the frame,False
to drop the frame from further processing. - You cannot alter the frame size with this Node currently.
- You can import and use additional libraries in the Function as long as they are installed on the system, or you install them explicitly using
lumeopipeline.Utils.install_import
method. - You can make API calls in the Function, but must keep in mind that the
process_frame
method blocks the rest of the pipeline, so if your function takes too long to execute, it will delay processing downstream.
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
# Insert your code here.
global frame_count
frame_count = frame_count + 1
with frame.data() as mat:
# Here's an example that uses OpenCV to put arbitrary text on the frame
cv2.putText(mat, "Frame " + str(frame_count), (50,50), cv2.FONT_HERSHEY_DUPLEX, 0.7, (0, 0, 255), 1)
# Here's an example of how to access the frame level metadata an print on the frame
meta = frame.meta()
yidx = 100
for (key, value) in meta.get_all().items():
cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 1, cv2.LINE_AA)
yidx = yidx + 50
# Add your own metadata to the frame
meta.set_field("frame_count", frame_count)
meta.save()
# Return False to drop this frame, True to continue processing.
return True
Debugging and Logging
Head over to the Logs tab under the Deployment to view the logs in real time while the pipeline is running.
You can use print()
in the Function node to log data to the Console or add a Display Stream Info Node and set it's Log Metadata property to True.
Accessing Metadata
Most nodes in the Pipeline add their own metadata to each frame. This metadata can be accessed using the lumeopipeline
library in the Function (reference below). You can find more information about metadata added by specific nodes in each Node's reference.
- AI Model Node and Track Objects Node : Metadata is stored under
objects
top level key. - Other Nodes : Metadata is stored under
nodes
top level key.
You can also add your own custom metadata to the frame. This metadata will be passed along to any webhooks sent out by subsequent nodes and saved along with clips and snapshots generated by Save Clip Node & Save Snapshot Node .
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
print(meta.get_all())
except Exception as error:
print(error)
pass
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
prev_frame_objects = []
def process_frame(frame: VideoFrame, **kwargs) -> bool:
# Insert your code here.
global prev_frame_objects
with frame.data() as mat:
# Grab all the metadata stored with this frame. Includes
# Object level metadata as well as metadata added by each node.
meta = frame.meta().get_all()
# Alternatively, you can also do:
# objects = frame.meta().get_field("objects")
# nodes = frame.meta().get_fields("nodes")
objects = meta['objects']
nodes = meta['nodes']
# Iterate through all objects
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
for obj in objects:
# Draw a red bounding box
obj_rect = obj['rect']
cv2.rectangle(mat,
(obj_rect['left'], obj_rect['top']),
(obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']),
(0,0,255), thickness=2)
# Write label.
# obj['label'] -- object class
# obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames.
# obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
obj['label'],
",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
# Log metadata added by the all Nodes
# nodes is a dictionary that contains each node's
# metadata. Refer to node specific docs for the property
# values.
nodes = meta.get_field("nodes")
for node_id in nodes:
print("{} : {}".format(node_id,nodes[node_id]))
except Exception as e:
print(e)
return True
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
print(meta.get_all())
meta.set_field("key", "value")
meta.save()
except Exception as error:
print(error)
pass
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
def process_frame(frame: VideoFrame, **kwargs) -> bool:
with frame.data() as mat:
# Grab the object metadata
meta = frame.meta()
objects = meta.get_field("objects")
# Iterate through all objects
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
probability_threshold = 0.5 # Set your threshold value here
objects = [obj for obj in objects if obj['probability'] >= probability_threshold]
meta.set_field("objects", objects)
meta.save()
# Return False to drop this frame, True to continue processing.
return True
lumeopipeline Reference
method | Description |
---|---|
VideoFrame.video_info() --> VideoInfo | Returns information about the video frame. VideoInfo.height : Frame height VideoInfo.width : Frame width VideoInfo.fps_n : Numerator of Frame rate VideoInfo.fps_d : Denominator of Frame rate |
VideoFrame.data() -> numpy.ndarray | Returns a Width x Height x 4 dimensional array containing the frame's pixel data. The array is in the BGRA format. Altering this array will alter the raw video frame output by this node. |
VideoFrame.meta() --> Meta | Returns metadata object that contains inserted by Pipeline nodes, as a dictionary. |
Meta.get_all() -> Dictionary | Returns all metadata fields (key, value pairs) in the frame. |
Meta.get_field(key) -> string | Returns value for a specific field |
Meta.set_field(key, value) -> bool | Sets the value for a specific field |
Meta.save() -> None | Save updated metadata if it was modified using set_field or set_user_field |
Meta.get_fields(pattern) -> Generator of tuples (field_name, field_value) | Get all fields whose names match given pattern Field names are in form of dot-separated words. The pattern allows for wildcarding of unneeded words with * . The rightmost star matches all the remaining words. Example patterns: - *.inference.label skips the first word, it would match -- inference_node_0123.inference.label -- inference_node_5678.inference.label - user.* returns all fields whose names start with "user", it would match -- user.property1 -- user.property2 -- user.complex.property - * matches everything |
VideoFrame.tensors() --> Vec<Tensor> | Returns a list of the output tensors of Inference Node(s). Can be used to apply custom parsing using the raw inference outputs results. Check Custom Model Parser for more details. |
Tensor.layers -> Vec<Layer> | Iterate over this field to get the multiple output layers |
Layer.name -> string | The output layer name |
Layer.dimensions -> Vec<Int> | The shape of the output layer |
Layer.data -> raw_buffer | The raw output data of this layer |
lumeopipeline Utils class
For convenience we have an utility class which contains functions that are used in multiple of our dynamic nodes.
You can access them by adding to the Python import section: from lumeopipeline import Utils
See "Use Utils" under Examples below for reference.
method | Description |
---|---|
install_import(package_name=None, attribute_name=None, module_name=None, version=None) | Allows users to install and import additional python modules on devices. Examples: install and import module: pytz = install_import('pytz') , import attribute from module: sleep = install_import('time', 'sleep') , import a module which has a different name from the package it belongs to. box = install_import('python-box', module_name='box') , Install a specific version jwt = install_import('PyJWT', module_name='jwt', version='1.5.3') |
write_label_on_frame(mat, xidx, yidx, label) | Auxiliary function to write text on frame |
num_milliseconds_seen(last_seen_frame, first_seen_frame, video_info) | Returns the total of milliseconds passed between two frames |
get_timestamp(frame, frame_count) | Returns the timestamp of a certain frame, based on the current frame_count and video FPS |
send_webhook(webhook_url, body, compression=True) | Sends a webhook to a specific URL. The compression is enabled by default, you should pass False to the 3rd argument of the method to disable it. |
Examples
Basics
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
frames_to_skip = 5700
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frames_to_skip
if frames_to_skip > 0:
frames_to_skip = frames_to_skip - 1
return False
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
def process_frame(frame: VideoFrame, **kwargs) -> bool:
# Insert your code here.
try:
meta = frame.meta()
# All Metadata in this frame
print(meta.get_all())
# Metadata about any objects in this frame from Model Inference node
print(meta.get_field("objects"))
except Exception as error:
print(error)
pass
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
meta = frame.meta()
if meta is not None:
yidx = 100
with frame.data() as mat:
for (key, value) in meta.get_all().items():
if value is not None:
cv2.putText(mat, key + " : " + str(value), (50,yidx), cv2.FONT_HERSHEY_DUPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
yidx = yidx + 25
except Exception as error:
print(error, flush=True)
pass
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
frame_count = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frame_count
frame_count = frame_count + 1
try:
meta = frame.meta()
if meta is not None:
# Set user metadata that will be saved along with a
# snapshot or clip. (Access it using the Lumeo Files API)
meta.set_field("frame_count", str(frame_count))
except Exception as error:
print(error, flush=True)
pass
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
# Installs and import additional python modules on device,
# or just imports it if already installed
#
# Usage:
# - install and import module:
# pytz_ref = install_import('pytz')
#
# - import attribute from module:
# sleep = install_import('time', 'sleep')
#
# - import a module which has a different name from the package it belongs
# box = install_import('python-box', module_name='box')
#
# - install a specific version
# jwt = install_import('PyJWT', module_name='jwt', version='1.5.3')
pytz = Utils.install_import('pytz')
from pytz import reference as pytz_ref
frame_count = 0
# Set to False if the Webhook endpoint does not support compression
compression = True
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frame_count
global compression
frame_count = frame_count + 1
with frame.data() as mat:
Utils.write_label_on_frame(mat, 10, 10, 'Example of lumeopipeline Utils')
print("Number of milliseconds: " + str(Utils.num_milliseconds_seen(frame_count, 0, frame.video_info())))
print("Frame timestamp: " + str(Utils.get_timestamp(frame, frame_count)))
try:
# This will be converted to JSON internaly by the send_webhook function
body = {'name': "Example", 'city': "New York"}
Utils.send_webhook('http://127.0.0.1', body, compression)
except Exception as e:
print(e)
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
frame_count = 0
total_objects_passed = 0
objects_crossed_per = 0
prev_timestamp = 0
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frame_count
global total_objects_passed
global objects_crossed_per
global prev_timestamp
try:
with frame.data() as mat:
# Get Current Frame's relative timestamp (in seconds) since
# start of stream
cur_timestamp = Utils.get_timestamp(frame, frame_count)
frame_count += 1
# Grab metadata from previous nodes in this pipeline
meta = frame.meta()
nodes = meta.get_field('nodes')
# Iterate through line counter nodes and add up
# objects that have crossed any line
for node_id in nodes:
if nodes[node_id]['type'] == 'annotate_line_counter':
for line_id in nodes[node_id]['lines']:
total_objects_passed += nodes[node_id]['lines'][line_id]['total_objects_crossed_delta']
# Every 2 seconds, compute the throughput (objects crossed) per minute
# Then reset the counts
if (cur_timestamp - prev_timestamp > 2):
objects_crossed_per = total_objects_passed * 30
total_objects_passed = 0
prev_timestamp = cur_timestamp
# Display values on the frame
Utils.write_label_on_frame(mat, 50,50, "Throughput : {} objects/minute".format(objects_crossed_per))
meta.set_field('objects_crossed_per', objects_crossed_per)
meta.save()
except Exception as e:
print(e)
pass
# Return False to drop this frame, True to continue processing.
return True
Image and Media Manipulation
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
from urllib.request import urlopen
print("module name: ", __name__)
print("watermark.py : opencv ver : ", cv2.__version__)
watermarkurl = "https://s.gravatar.com/avatar/f8066a0ab6f5ffa7d121aa210d988980?s=80"
watermarkimage = numpy.asarray(bytearray(urlopen(watermarkurl).read()), dtype="uint8")
watermark = cv2.imdecode(watermarkimage, cv2.IMREAD_UNCHANGED)
if watermark.shape[2] != 4:
watermark = cv2.cvtColor(watermark,cv2.COLOR_BGR2BGRA)
#watermark = cv2.imread("watermark.png", cv2.IMREAD_UNCHANGED)
watermark_overlay = None
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global watermark
global watermark_overlay
with frame.data() as mat:
if watermark_overlay is None:
(wH, wW) = watermark.shape[:2]
width = frame.video_info().width
height = frame.video_info().height
watermark_overlay = numpy.zeros((height,width,4), dtype="uint8")
watermark_overlay[10:10+wH, 10:10+wW] = watermark
cv2.addWeighted(watermark_overlay, 1.0, mat, 1.0, 0, mat)
#cv2.putText(mat, "LUMEO", (50,50), cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 2)
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
def process_frame(frame: VideoFrame, **kwargs) -> bool:
# Insert your code here.
with frame.data() as mat:
# Grab all the metadata stored with this frame. Includes
# Object level metadata as well as metadata added by each node.
meta = frame.meta().get_all()
# Alternatively, you can also do:
# objects = frame.meta().get_field("objects")
# nodes = frame.meta().get_fields("nodes")
objects = meta['objects']
nodes = meta['nodes']
# Iterate through all objects
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
for obj in objects:
# Draw a red bounding box
obj_rect = obj['rect']
cv2.rectangle(mat,
(obj_rect['left'], obj_rect['top']),
(obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']),
(0,0,255), thickness=2)
# Write label.
# obj['label'] -- object class
# obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames.
# obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
obj['label'],
",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
import cv2 # OpenCV for image manipulations
import base64 # Base64 encoding for image data
def process_frame(frame: VideoFrame, **kwargs) -> bool:
with frame.data() as mat:
# Downscale and get entire frame as base64 encoded string
frame_base64 = get_image_as_base64(mat, max_dim=640)
# do something with the frame_base64
# Get all objects as base64 encoded strings
meta = frame.meta().get_all()
objects = meta['objects']
for obj in objects:
obj_rect = obj['rect']
obj_base64 = get_image_as_base64(mat, max_dim=160, roi_coords=obj_rect)
# do something with the obj_base64
return True
def downscale_image(image, max_dim=640):
largest_dim = max(image.shape[0], image.shape[1])
scale_factor = 1
if largest_dim > max_dim:
scale_factor = max_dim / largest_dim
if scale_factor != 1:
scaled_image = cv2.resize(image,
(int(image.shape[1]*scale_factor), int(image.shape[0]*scale_factor)),
interpolation = cv2.INTER_CUBIC)
return scaled_image
return image
def get_image_as_base64(image, max_dim, roi_coords=None):
# Crop to ROI
if roi_coords:
img_cropped = image[max(roi_coords['top'],0):min(roi_coords['top']+roi_coords['height'],image.shape[0]),
max(roi_coords['left'],0):min(roi_coords['left']+roi_coords['width'],image.shape[1])]
else:
img_cropped = image
# Downscale image if it is too large
img_cropped = downscale_image(img_cropped, max_dim=max_dim)
# Encode to JPEG
img_encoded = cv2.imencode('.jpg', img_cropped, [int(cv2.IMWRITE_JPEG_QUALITY), 90])[1]
return base64.b64encode(img_encoded).decode('utf-8')
Access and Manipulate Metadata
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
print(meta.get_all())
except Exception as error:
print(error)
pass
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import cv2 # OpenCV for image manipulations
import numpy # numpy for image manipulations
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
prev_frame_objects = []
def process_frame(frame: VideoFrame, **kwargs) -> bool:
# Insert your code here.
global prev_frame_objects
with frame.data() as mat:
# Grab all the metadata stored with this frame. Includes
# Object level metadata as well as metadata added by each node.
meta = frame.meta().get_all()
# Alternatively, you can also do:
# objects = frame.meta().get_field("objects")
# nodes = frame.meta().get_fields("nodes")
objects = meta['objects']
nodes = meta['nodes']
# Iterate through all objects
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
for obj in objects:
# Draw a red bounding box
obj_rect = obj['rect']
cv2.rectangle(mat,
(obj_rect['left'], obj_rect['top']),
(obj_rect['left'] + obj_rect['width'], obj_rect['top'] + obj_rect['height']),
(0,0,255), thickness=2)
# Write label.
# obj['label'] -- object class
# obj['id'] -- unique id assigned to this object by Track Objects node, persistent across frames.
# obj['attributes'] -- if you use secondary classifiers, this contains additional object attributes.
label ="{} {} {}".format(obj['id'] if 'id' in obj else 'not-tracked',
obj['label'],
",".join([obj_attr['label'] for obj_attr in obj['attributes']]))
Utils.write_label_on_frame(mat, obj_rect['left'], obj_rect['top'], label)
# Return False to drop this frame, True to continue processing.
return True
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
# Log metadata added by the all Nodes
# nodes is a dictionary that contains each node's
# metadata. Refer to node specific docs for the property
# values.
nodes = meta.get_field("nodes")
for node_id in nodes:
print("{} : {}".format(node_id,nodes[node_id]))
except Exception as e:
print(e)
return True
from lumeopipeline import VideoFrame
def process_frame(frame: VideoFrame, **kwargs) -> bool:
try:
video_info = frame.video_info()
print(video_info.fps_d, video_info.fps_n)
meta = frame.meta()
if meta is not None:
print(meta.get_all())
meta.set_field("key", "value")
meta.save()
except Exception as error:
print(error)
pass
return True
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
def process_frame(frame: VideoFrame, **kwargs) -> bool:
with frame.data() as mat:
# Grab the object metadata
meta = frame.meta()
objects = meta.get_field("objects")
# Iterate through all objects
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
probability_threshold = 0.5 # Set your threshold value here
objects = [obj for obj in objects if obj['probability'] >= probability_threshold]
meta.set_field("objects", objects)
meta.save()
# Return False to drop this frame, True to continue processing.
return True
Object Tracking
from lumeopipeline import VideoFrame, Utils
import cv2
import numpy as np
# Node-specific variables
node = {
'tracked_objects': {},
'max_frames_missing': 600
}
def IoU(box1, box2):
"""Calculate Intersection over Union (IoU) between two bounding boxes."""
x1, y1, x2, y2 = box1
x1_p, y1_p, x2_p, y2_p = box2
inter_x1 = max(x1, x1_p)
inter_y1 = max(y1, y1_p)
inter_x2 = min(x2, x2_p)
inter_y2 = min(y2, y2_p)
inter_area = max(0, inter_x2 - inter_x1) * max(0, inter_y2 - inter_y1)
box1_area = (x2 - x1) * (y2 - y1)
box2_area = (x2_p - x1_p) * (y2_p - y1_p)
iou = inter_area / float(box1_area + box2_area - inter_area)
return iou
def track_objects(node, frame, detected_objects):
new_tracked_objects = {}
for obj in detected_objects:
bbox = obj['rect']
box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
best_iou = 0
best_id = None
for obj_id, tracked in node['tracked_objects'].items():
iou = IoU(box, tracked['rect'])
if iou > best_iou:
best_iou = iou
best_id = obj_id
if best_iou > 0.3: # IoU threshold to match objects
new_tracked_objects[best_id] = {
'rect': box,
'label': obj['label'],
'probability': obj['probability'],
'frames_missing': 0
}
else:
new_id = len(node['tracked_objects']) + len(new_tracked_objects)
new_tracked_objects[new_id] = {
'rect': box,
'label': obj['label'],
'probability': obj['probability'],
'frames_missing': 0
}
for obj_id, tracked in node['tracked_objects'].items():
if obj_id not in new_tracked_objects:
tracked['frames_missing'] += 1
if tracked['frames_missing'] < node['max_frames_missing']:
new_tracked_objects[obj_id] = tracked
node['tracked_objects'] = new_tracked_objects
for obj_id, tracked in node['tracked_objects'].items():
for obj in detected_objects:
bbox = obj['rect']
box = [bbox['left'], bbox['top'], bbox['left'] + bbox['width'], bbox['top'] + bbox['height']]
if tracked['rect'] == box:
obj['id'] = obj_id
save_metadata(frame, detected_objects)
def save_metadata(frame, detected_objects):
try:
meta = frame.meta()
meta.set_field("objects", detected_objects)
meta.save()
except Exception as error:
print(error)
pass
def process_frame(frame: VideoFrame, **kwargs) -> bool:
with frame.data() as mat:
meta = frame.meta()
objects = meta.get_field("objects")
track_objects(node, frame, objects)
return True
Integrations
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures
frame_count = 0
last_webhook_frame_count = 0
# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frame_count
global last_webhook_frame_count
global background_thread
send_webhook = False
frame_count = frame_count + 1
# Grab all the metadata stored with this frame. Includes
# Object level metadata as well as metadata added by each node.
meta = frame.meta().get_all()
objects = meta['objects']
nodes = meta['nodes']
# Iterate through all objects to see if we detect a person
# See here for properties of each object :
# https://docs.lumeo.com/docs/infer-with-ai-model-node#metadata
# Assumes you have a AI Model node before this Custom function node, configured
# with a Person Detection Model.
for obj in objects:
if obj['label'] == 'person':
send_webhook = True
# Send a webhook if a person was detected, but no frequently
# than once every 5 seconds (~50 frames at 10 FPS) to avoid spamming the server multiple times
# a second.
if send_webhook and \
(last_webhook_frame_count == 0 or last_webhook_frame_count - frame_count > 50):
try:
# The body will be converted to JSON by the send_webhook function
body = {'event_type': "PersonDetected"}
background_thread.submit(send_webhook, body=body)
except Exception as e:
print(e)
last_webhook_frame_count = frame_count
return True
def send_webhook(body):
server_address = 'http://127.0.0.1'
gzip_compression = False
try:
# This method sends the webhook with a application/json content type
Utils.send_webhook(server_address, body, gzip_compression)
except Exception as e:
print("Send webhook error : {}".format(e))
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import concurrent.futures
frame_count = 0
last_webhook_frame_count = 0
# Use a background thread to send webhooks so that it doesn't
# slow down the video processing pipeline
background_thread = concurrent.futures.ThreadPoolExecutor()
def process_frame(frame: VideoFrame, **kwargs) -> bool:
global frame_count
global last_webhook_frame_count
global background_thread
send_webhook = False
frame_count = frame_count + 1
# Grab all the metadata stored with this frame. Includes
# Object level metadata as well as metadata added by each node.
meta = frame.meta().get_all()
objects = meta['objects']
nodes = meta['nodes']
# Look through node properties and detect when a person has crossed a line.
# Assumes there is a Line counter node before this custom function.
# Metadata format: https://docs.lumeo.com/docs/line-crossing-counter-node
if 'annotate_line_counter1' in nodes and \
nodes['annotate_line_counter1']['lines']['line1']['total_objects_crossed_delta'] > 0:
send_webhook = True
# Send a webhook if a person was detected, but no frequently
# than once every 5 seconds (~50 frames at 10 FPS) to avoid spamming the server multiple times
# a second.
if send_webhook and \
(last_webhook_frame_count == 0 or last_webhook_frame_count - frame_count > 50):
try:
# The body will be converted to JSON by the send_webhook function
body = {'event_type': "PersonDetected"}
background_thread.submit(send_webhook, body=body)
except Exception as e:
print(e)
last_webhook_frame_count = frame_count
return True
def send_webhook(body):
server_address = 'http://127.0.0.1'
gzip_compression = False
try:
# This method sends the webhook with a application/json content type
Utils.send_webhook(server_address, body, gzip_compression)
except Exception as e:
print("Send webhook error : {}".format(e))
from lumeopipeline import VideoFrame # Lumeo lib to access frame and metadata
from lumeopipeline import Utils
import os
import json
import concurrent.futures
from datetime import datetime
# Global variables that persist across frames go here.
# Onetime initialization code can also live here.
node_properties = {
"background_thread": concurrent.futures.ThreadPoolExecutor(),
'storage_dir': '/var/lib/lumeo/userdata/',
'file_name': 'mydata.jsonl',
'prev_objects': [],
'initialized': False
}
def process_frame(frame: VideoFrame, **kwargs) -> bool:
object_properties = {}
if not node_properties['initialized']:
init_persistent_storage()
node_properties['initialized'] = True
# Grab all the object metadata stored with this frame.
frame_meta = frame.meta()
meta = frame_meta.get_all()
# Iterate over all the objects in the frame and count any new objects.
objects = meta['objects']
for obj in objects:
if obj.get('id', -1) not in node_properties['prev_objects']:
object_properties[obj['label']] = object_properties.get(obj['label'], 0) + 1
# Append the object properties to the persistent storage
# TODO: Clean up persistent storage if it gets too large.
# Note: Persistent storage file is within a docker container and will be lost if the container is deleted.
if any(value > 0 for value in object_properties.values()):
object_properties['timestamp'] = datetime.utcnow().isoformat() + "Z"
node_properties['background_thread'].submit(append_to_persistent_storage, object_properties)
# Update the list of objects in the frame.
node_properties['prev_objects'] = [obj.get('id', -1) for obj in objects]
return True
def init_persistent_storage(reset=True):
os.makedirs(f"{node_properties['storage_dir']}", exist_ok=True)
if reset:
file_path = f"{node_properties['storage_dir']}{node_properties['file_name']}"
if os.path.exists(file_path):
os.remove(file_path)
def append_to_persistent_storage(row):
jsonl_data = json.dumps(row) + "\n"
with open(f"{node_properties['storage_dir']}{node_properties['file_name']}", "a") as f:
f.write(jsonl_data)
Updated about 1 month ago