Custom Model Parser

Lumeo is continuously improving inference support for different model formats and architectures. However, there may be cases where Lumeo doesn't support parsing the inference output of a custom or lesser-known architecture. In those cases, you can access the raw output tensors and parse them yourself in a Function Node.

Below are the required steps for the model and pipeline configuration, along with Python code snippets that access the raw tensors, parse them, and store the detected objects and classes in the frame metadata.

1) Configure the inference model to disable internal parsing

Select "Architecture -> Other" in model "Type & weights" tab

Select "No clustering" on the "Clustering algorithm" of "Parameters" tab

2) Add a Custom Function node to process inference raw output tensors

Reference: Custom Function Node

Connect a "Custom Function" after the "AI Model" node
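
Once the pipeline is wired up, a quick way to discover which output layer names and dimensions to filter on is to deploy a function that only prints the tensor layout. A minimal sketch, using the same frame.tensors() API as the full examples below:

from lumeopipeline import VideoFrame


def process_frame(frame: VideoFrame, **kwargs) -> bool:
    # Print the name and dimensions of every raw output layer,
    # so you know which 'layer.name' values to filter on when parsing.
    for tensor in frame.tensors():
        for layer in tensor.layers:
            print("layer {} -> dimensions {}".format(layer.name, layer.dimensions))
    return True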

3) Write the Python code in the "Custom Function" node to parse the inference output and store the results in frame metadata

from lumeopipeline import VideoFrame
import cv2
import numpy as np

# Global variables that persist across frames go here.
# One time initialization code can also live here.


def process_frame(frame: VideoFrame, deployment_id=None, node_id=None, **kwargs) -> bool:

    with frame.data() as mat:

        # Get the inference model raw output tensor(s)
        tensors = frame.tensors()
        print("Tensors length = {}".format(len(tensors)))

        # Variables where we will store the output tensor as numpy array
        heatmaps = pafs = None

        for tensor in tensors:
            # If there are multiple upstream inference nodes, you may need to filter the tensors by
            # the corresponding inference node 'source_node_id'
            print("tensor.source_node_id = {}".format(tensor.source_node_id))

            for layer in tensor.layers:
                print("  Layer name = {}".format(layer.name))
                print("  dimensions = {}".format(layer.dimensions))
                print("  data = {}".format(layer.data))

                # Copy the raw tensor data, filtering by output tensor 'layer.name'
                if layer.name == "output_conf:0":
                    heatmaps = np.asarray(layer.data.copy()).reshape(layer.dimensions)
                elif layer.name == "output_paf:0":
                    pafs = np.asarray(layer.data.copy()).reshape(layer.dimensions)

        # Get the height and width of the frame
        height, width, _ = mat.shape

        if heatmaps is not None and pafs is not None:

            ###### This step might not be required for your model
            # An example of how to resize the output tensors to half the size of the input image
            heatmaps = np.transpose(heatmaps, (1, 2, 0))
            heatmaps = cv2.resize(heatmaps, (int(width/2), int(height/2)), interpolation=cv2.INTER_CUBIC)
            
            pafs = np.transpose(pafs, (1, 2, 0))
            pafs = cv2.resize(pafs, (int(width/2), int(height/2)), interpolation=cv2.INTER_CUBIC)
            ######

            
            current_poses = []
            # Insert here the code to parse the output tensors and extract meaningful information (detected objects, classifier results, etc.)
            #
            # Tracking logic and object clustering (for example NMS / Non-maximum Suppression) can also be applied here.

            # Save the metadata on the Lumeo frame
            save_metadata(frame, current_poses)

    return True


def save_metadata(frame, current_poses):
    try:
        # Access frame metadata
        meta = frame.meta()
        
        # Get the "objects" field
        objects = meta.get_field("objects")

        # Iterate over the detected objects on this frame, and create a new object (or update existing ones)
        for pose in current_poses:
            pose_obj = {
                "label": "person",
                "class_id": 0,
                "probability": pose.confidence,
                "rect": {
                    "left": pose.bbox[0],
                    "top": pose.bbox[1],
                    "width": pose.bbox[2],
                    "height": pose.bbox[3],
                },
            }
            objects.append(pose_obj)

        # Save results in the Lumeo frame metadata, so they can be accessed later in downstream nodes
        meta.set_field("objects", objects)
        meta.save()

    except Exception as error:
        print(error)
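
The template above shows the general structure. The complete example below parses the raw output of a YOLO-style detector (a single "output0" layer whose rows contain box coordinates, an objectness score, and per-class scores), converts the detected boxes to pixel coordinates, and saves them to the frame metadata.
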
from lumeopipeline import VideoFrame
import cv2
import numpy as np
import json

import torch
import torchvision

# Global variables that persist across frames go here.
# One time initialization code can also live here.

def process_frame(frame: VideoFrame, deployment_id=None, node_id=None, **kwargs) -> bool:

    labels = ['animal', 'person', 'vehicle']

    with frame.data() as mat:

        # Get the inference model raw output tensor(s)
        tensors = frame.tensors()
        print("Tensors length = {}".format(len(tensors)))

        # Variables where we will store the output tensor as numpy array
        output_data = xyxy = classes = scores = None

        for tensor in tensors:
            # If there are multiple upstream inference nodes, you may need to filter the tensors by
            # the corresponding inference node 'source_node_id'
            print("tensor.source_node_id = {}".format(tensor.source_node_id))

            for layer in tensor.layers:
                print("  Layer name = {}".format(layer.name))
                print("  dimensions = {}".format(layer.dimensions))
                #print("  data = {}".format(layer.data))

                # Copy the raw tensor data, filtering by output tensor 'layer.name'
                if layer.name == "output0":
                    output_data = np.asarray(layer.data.copy()).reshape(layer.dimensions)
                    break
                

        # Get the height and width of the frame
        height, width, _ = mat.shape

        if output_data is not None:

            # Insert here the code to parse the output tensors and extract meaningful information (detected objects, classifier results, etc.)
            detected_objects = []
            xyxy, classes, scores = YOLOdetect(output_data)  # boxes (x, y, x, y), classes (int), scores (float), one entry per prediction

            if xyxy is not None and classes is not None and scores is not None:

                # Tracking logic and object clustering (for example NMS / Non-Maximum Suppression) can also be applied here.
                # nms_output = torchvision.ops.nms(torch.tensor(xyxy, dtype=torch.float),
                #                                  torch.tensor(scores, dtype=torch.float),
                #                                  0.5)

                # Normalize to the model input dimensions, 640x640 in this example.
                xyxy = xyxy / 640

                for i in range(scores.size):

                    # Filter out objects with low probability
                    if (scores[i] > 0.1) and (scores[i] <= 1.0):
                        xmin = int(max(1, xyxy[i][0] * width))
                        ymin = int(max(1, xyxy[i][1] * height))
                        xmax = int(min(width, xyxy[i][2] * width))
                        ymax = int(min(height, xyxy[i][3] * height))

                        obj_width = xmax - xmin
                        obj_height = ymax - ymin

                        # Filter objects based on minimum dimensions
                        if obj_width > 5 and obj_height > 5:
                            detected_objects.append({
                                "label": labels[classes[i]],
                                "class_id": int(classes[i]),
                                "probability": float(scores[i]),
                                "rect": {
                                    "left": xmin,
                                    "top": ymin,
                                    "width": obj_width,
                                    "height": obj_height
                                }
                            })

                # print(json.dumps(detected_objects))

                # Save the metadata on the Lumeo frame
                save_metadata(frame, detected_objects)

    return True


def save_metadata(frame, detected_objects):
    try:
        # Access frame metadata
        meta = frame.meta()
        
        # Get the "objects" field
        objects = meta.get_field("objects")

        # Iterate over the detected objects on this frame, and create a new object (or update existing ones)
        for obj in detected_objects:
            objects.append(obj)

        # Save results in the Lumeo frame metadata, so they can be accessed later in downstream nodes
        meta.set_field("objects", objects)
        meta.save()

    except Exception as error:
        print(error)

      
def classFilter(classdata):
    classes = []  # create a list
    for i in range(classdata.shape[0]):         # loop through all predictions
        classes.append(classdata[i].argmax())   # get the best classification location
    return classes  # return classes (int)


# input is inference_output, output is boxes(xyxy), classes, scores
def YOLOdetect(inference_output):

    boxes = np.squeeze(inference_output[..., :4])                           # boxes  [25200, 4]
    scores = np.squeeze(inference_output[..., 4:5])                         # confidences  [25200, 1]
    classes = classFilter(inference_output[..., 5:])                        # get classes
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    x, y, w, h = boxes[..., 0], boxes[..., 1], boxes[..., 2], boxes[..., 3] #xywh
    xyxy = [x - w / 2, y - h / 2, x + w / 2, y + h / 2]                     # xywh to xyxy   [4, 25200]
    xyxy = np.transpose(np.array(xyxy))                                     # [25200, 4]

    return xyxy, classes, scores  # output is boxes(x,y,x,y), classes(int), scores(float) [predictions length]
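
The commented-out torchvision.ops.nms call in the example above hints at how overlapping duplicate boxes can be suppressed. A minimal sketch of applying it between YOLOdetect and the per-object loop, assuming torch and torchvision are available on the gateway (they are imported at the top of the example) and using an illustrative IoU threshold of 0.5:

xyxy, classes, scores = YOLOdetect(output_data)

# Indices of the boxes kept by Non-Maximum Suppression, sorted by decreasing score
keep = torchvision.ops.nms(torch.tensor(xyxy, dtype=torch.float),
                           torch.tensor(scores, dtype=torch.float),
                           0.5).numpy()

# Keep only the surviving predictions before building 'detected_objects'
xyxy = xyxy[keep]
scores = scores[keep]
classes = [classes[i] for i in keep]

The 0.5 threshold is illustrative: a lower value suppresses more duplicates, at the risk of discarding genuinely adjacent objects.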