Custom Model Parser
Lumeo is continuously improving inference support for different model formats and architectures. However, there may be cases where Lumeo doesn't support parsing the inference output of a custom or lesser-known architecture. In those cases, you can access the raw output tensors and parse them yourself in a Function Node.
Below are the required model and pipeline configuration steps, along with snippets of Python code that access the raw tensors, parse them, and store the detected objects and classes in the frame metadata.
1) Configure the inference model to disable internal parsing

Select "Architecture -> Other" in model "Type & weights" tab

Select "No clustering" on the "Clustering algorithm" of "Parameters" tab
2) Add a Custom Function node to process the raw inference output tensors
Reference: Custom Function Node

Connect a "Custom Function" after the "AI Model" node
3) Write the Python code in the "Custom Function" node to parse the inference output and store the results in frame metadata
The first snippet below shows the general structure for accessing and inspecting the raw tensors, using a pose-estimation model (heatmap and part-affinity-field outputs) as an example. A complete parser for a YOLO-style detector follows it.
from lumeopipeline import VideoFrame
import cv2
import numpy as np

# Global variables that persist across frames go here.
# One-time initialization code can also live here.

def process_frame(frame: VideoFrame, deployment_id=None, node_id=None, **kwargs) -> bool:

    with frame.data() as mat:

        # Get the inference model raw output tensor(s)
        tensors = frame.tensors()
        print("Tensors length = {}".format(len(tensors)))

        # Variables where we will store the output tensors as numpy arrays
        heatmaps = pafs = None

        for tensor in tensors:
            # If there are multiple upstream inference nodes, it might be necessary to filter
            # the tensors by the corresponding inference node 'source_node_id'
            print("tensor.source_node_id = {}".format(tensor.source_node_id))

            for layer in tensor.layers:
                print(" Layer name = {}".format(layer.name))
                print(" dimensions = {}".format(layer.dimensions))
                print(" data = {}".format(layer.data))

                # Copy the raw tensor data, filtering by output tensor 'layer.name'
                if layer.name == "output_conf:0":
                    heatmaps = np.asarray(layer.data.copy()).reshape(layer.dimensions)
                elif layer.name == "output_paf:0":
                    pafs = np.asarray(layer.data.copy()).reshape(layer.dimensions)

        # Get the height and width of the frame
        height, width, _ = mat.shape

        if heatmaps is not None and pafs is not None:

            ###### This step might not be required for your model:
            # just an example of how to resize the output tensors to half the size of the input image
            heatmaps = np.transpose(heatmaps, (1, 2, 0))
            heatmaps = cv2.resize(heatmaps, (int(width / 2), int(height / 2)), interpolation=cv2.INTER_CUBIC)
            pafs = np.transpose(pafs, (1, 2, 0))
            pafs = cv2.resize(pafs, (int(width / 2), int(height / 2)), interpolation=cv2.INTER_CUBIC)
            ######

            current_poses = []
            # Insert here the code to parse the output tensors and extract meaningful
            # information (detected objects, classifier results, etc.)
            #
            # Tracking logic and object clustering (for example NMS / Non-Maximum Suppression)
            # can also be applied here.

            # Save the metadata on the Lumeo frame
            save_metadata(frame, current_poses)

    return True

def save_metadata(frame, current_poses):
    try:
        # Access the frame metadata
        meta = frame.meta()

        # Get the "objects" field
        objects = meta.get_field("objects")

        # Iterate over the detected objects on this frame, and create a new object (or update existing ones)
        for pose in current_poses:
            pose_obj = {
                "label": "person",
                "class_id": 0,
                "probability": pose.confidence,
                "rect": {
                    "left": pose.bbox[0],
                    "top": pose.bbox[1],
                    "width": pose.bbox[2],
                    "height": pose.bbox[3],
                },
            }
            objects.append(pose_obj)

        # Save the results on the Lumeo frame metadata, so they can be accessed later in downstream nodes
        meta.set_field("objects", objects)
        meta.save()

    except Exception as error:
        print(error)
        pass
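The snippet above leaves `current_poses` empty: what goes in it depends entirely on your model. As a minimal sketch, note that `save_metadata()` only requires objects exposing a `confidence` attribute and a `bbox` of `(left, top, width, height)` in pixels; the `Pose` class below is hypothetical and only illustrates that contract.

from dataclasses import dataclass

# Hypothetical structure showing what save_metadata() above expects from
# each entry in 'current_poses'. Your tensor-parsing code would build these.
@dataclass
class Pose:
    confidence: float  # detection confidence, 0.0 - 1.0
    bbox: tuple        # (left, top, width, height) in pixels

# Example: current_poses.append(Pose(confidence=0.87, bbox=(120, 40, 64, 180)))

The next snippet is a complete, end-to-end example of the same pattern for a YOLO-style detector: it reads the "output0" tensor, decodes boxes, classes and scores, filters out low-confidence and tiny detections, and stores the results in the frame metadata.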
from lumeopipeline import VideoFrame
import cv2
import numpy as np
import json
import torch
import torchvision

# Global variables that persist across frames go here.
# One-time initialization code can also live here.

def process_frame(frame: VideoFrame, deployment_id=None, node_id=None, **kwargs) -> bool:

    labels = ['animal', 'person', 'vehicle']

    with frame.data() as mat:

        # Get the inference model raw output tensor(s)
        tensors = frame.tensors()
        print("Tensors length = {}".format(len(tensors)))

        # Variables where we will store the output tensor as numpy arrays
        output_data = xyxy = classes = scores = None

        for tensor in tensors:
            # If there are multiple upstream inference nodes, it might be necessary to filter
            # the tensors by the corresponding inference node 'source_node_id'
            print("tensor.source_node_id = {}".format(tensor.source_node_id))

            for layer in tensor.layers:
                print(" Layer name = {}".format(layer.name))
                print(" dimensions = {}".format(layer.dimensions))
                # print(" data = {}".format(layer.data))

                # Copy the raw tensor data, filtering by output tensor 'layer.name'
                if layer.name == "output0":
                    output_data = np.asarray(layer.data.copy()).reshape(layer.dimensions)
                    break

        # Get the height and width of the frame
        height, width, _ = mat.shape

        if output_data is not None:
            # Insert here the code to parse the output tensors and extract meaningful
            # information (detected objects, classifier results, etc.)
            detected_objects = []
            xyxy, classes, scores = YOLOdetect(output_data)  # boxes (x,y,x,y), classes (int), scores (float) [25200]

            if xyxy is not None and classes is not None and scores is not None:

                # Tracking logic and object clustering (for example NMS / Non-Maximum Suppression)
                # can also be applied here.
                # nms_output = torchvision.ops.nms(torch.tensor(xyxy, dtype=torch.float),
                #                                  torch.tensor(scores, dtype=torch.float),
                #                                  0.5)

                # Normalize coordinates to the model input dimensions (640x640 in this example)
                xyxy = xyxy / 640

                for i in range(scores.size):
                    # Filter out objects with a low probability
                    if (scores[i] > 0.1) and (scores[i] <= 1.0):
                        xmin = int(max(1, xyxy[i][0] * width))
                        ymin = int(max(1, xyxy[i][1] * height))
                        xmax = int(min(width, xyxy[i][2] * width))
                        ymax = int(min(height, xyxy[i][3] * height))
                        obj_width = xmax - xmin
                        obj_height = ymax - ymin

                        # Filter objects based on minimum dimensions
                        if obj_width > 5 and obj_height > 5:
                            detected_objects.append({
                                "label": labels[classes[i]],
                                "class_id": int(classes[i]),
                                "probability": float(scores[i]),
                                "rect": {
                                    "left": xmin,
                                    "top": ymin,
                                    "width": obj_width,
                                    "height": obj_height
                                }
                            })

            # print(json.dumps(detected_objects))

            # Save the metadata on the Lumeo frame
            save_metadata(frame, detected_objects)

    return True

def save_metadata(frame, detected_objects):
    try:
        # Access the frame metadata
        meta = frame.meta()

        # Get the "objects" field
        objects = meta.get_field("objects")

        # Append each object detected on this frame to the existing list
        for obj in detected_objects:
            objects.append(obj)

        # Save the results on the Lumeo frame metadata, so they can be accessed later in downstream nodes
        meta.set_field("objects", objects)
        meta.save()

    except Exception as error:
        print(error)
        pass

def classFilter(classdata):
    classes = []  # create a list
    for i in range(classdata.shape[0]):  # loop through all predictions
        classes.append(classdata[i].argmax())  # get the index of the highest-scoring class
    return classes  # return classes (int)

# Input is the raw inference output; output is boxes (xyxy), classes and scores
def YOLOdetect(inference_output):
    boxes = np.squeeze(inference_output[..., :4])    # boxes [25200, 4]
    scores = np.squeeze(inference_output[..., 4:5])  # confidences [25200, 1]
    classes = classFilter(inference_output[..., 5:])  # get classes

    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    x, y, w, h = boxes[..., 0], boxes[..., 1], boxes[..., 2], boxes[..., 3]  # xywh
    xyxy = [x - w / 2, y - h / 2, x + w / 2, y + h / 2]  # xywh to xyxy [4, 25200]
    xyxy = np.transpose(np.array(xyxy))  # [25200, 4]

    return xyxy, classes, scores  # boxes (x,y,x,y), classes (int), scores (float) [prediction count]
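The commented-out `torchvision.ops.nms` call in the example above can be turned into a working post-processing step. Below is a minimal sketch, assuming the `xyxy` and `scores` arrays returned by `YOLOdetect()`; the `apply_nms` helper name is ours, not part of the Lumeo API. Note that `torchvision.ops.nms` is class-agnostic; for per-class suppression, `torchvision.ops.batched_nms` can be used instead.

import numpy as np
import torch
import torchvision

# Hypothetical helper: drop overlapping boxes via Non-Maximum Suppression.
# 'xyxy' is an [N, 4] numpy array, 'scores' an [N] numpy array, 'classes' a list.
def apply_nms(xyxy, classes, scores, iou_threshold=0.5):
    keep = torchvision.ops.nms(torch.tensor(xyxy, dtype=torch.float),
                               torch.tensor(scores, dtype=torch.float),
                               iou_threshold).numpy()
    return xyxy[keep], [classes[i] for i in keep], scores[keep]

# Usage, before the per-object loop in process_frame():
# xyxy, classes, scores = apply_nms(xyxy, classes, scores)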