Skip to content

OAK LR

介绍

这个脚本用于创建一个基于 depthai 库的 Pipeline,用于处理摄像头输入和目标检测神经网络推理。它可以根据传入的参数配置摄像头和神经网络模型,并构建一个数据流处理的 Pipeline。

源码

create_pipeline_for_lr.py
# coding=utf-8
from enum import Enum
from typing import Union

import depthai as dai


class StereoPair(str, Enum):
    """Identifies which two board sockets form the stereo camera pair.

    Members:
        LR: CAM_LEFT (CAM_B) as the left camera, CAM_RIGHT (CAM_C) as the right camera.
        LC: CAM_LEFT (CAM_B) as the left camera, CAM_CENTER (CAM_A) as the right camera.
        CR: CAM_CENTER (CAM_A) as the left camera, CAM_RIGHT (CAM_C) as the right camera.
    """

    LR = "LR"
    LC = "LC"
    CR = "CR"


# (left socket, right socket) for each stereo-pair configuration.
_PAIR_SOCKETS = {
    StereoPair.LR: (dai.CameraBoardSocket.CAM_B, dai.CameraBoardSocket.CAM_C),
    StereoPair.LC: (dai.CameraBoardSocket.CAM_B, dai.CameraBoardSocket.CAM_A),
    StereoPair.CR: (dai.CameraBoardSocket.CAM_A, dai.CameraBoardSocket.CAM_C),
}

# Board socket used as the LEFT camera of each pair.
cam_left_socket = {pair: sockets[0] for pair, sockets in _PAIR_SOCKETS.items()}

# Board socket used as the RIGHT camera of each pair.
cam_right_socket = {pair: sockets[1] for pair, sockets in _PAIR_SOCKETS.items()}


def create_pipeline(**kwargs):
    """Build a DepthAI pipeline: color camera -> YOLO detection network.

    Keyword Args:
        model_data: Model blob (or blob path) handed to ``setBlob``.
        config_data: Parsed configuration object exposing ``nn_config``
            (with ``nn_width``/``nn_height`` and ``NN_specific_metadata``).
            Note: this is an object, not a file path.
        spatial (bool): If True, use a YoloSpatialDetectionNetwork backed by
            stereo depth (see ``create_stereo``). Default False.
        color_res: Color sensor resolution. Default THE_1200_P.
        isp_scale (tuple): ISP downscale factor. Default (2, 3).
        stereo_pair (StereoPair): Which board sockets form the pair. Default LR.
        fps: Camera frame rate. Default 30.
        fullFov (bool): Keep the full field of view (disable aspect-ratio
            cropping of the preview). Default False.
        syncNN (bool): Output the NN passthrough frame (synchronized with
            detections) instead of the raw camera stream. Default False.
        high_res (bool): Output the full-resolution ``video`` stream instead
            of the NN-sized preview. Default False.

    Returns:
        dai.Pipeline: Pipeline with "image" and "nn" XLinkOut streams.
    """
    model_data = kwargs.get("model_data")
    config_data = kwargs.get("config_data")
    nn_config = config_data.nn_config
    color_res = kwargs.get("color_res", dai.ColorCameraProperties.SensorResolution.THE_1200_P)
    isp_scale = kwargs.get("isp_scale", (2, 3))
    stereo_pair = kwargs.get("stereo_pair", StereoPair.LR)
    pipeline = dai.Pipeline()

    cam_rgb = pipeline.create(dai.node.ColorCamera)
    # BUG FIX: create_stereo reads kwargs["cam_left"]; the previous call
    # passed the camera as ``cam_rgb=``, so the spatial path always raised
    # KeyError. Pass it under the expected keyword.
    detection_network = (
        create_stereo(pipeline, cam_left=cam_rgb, **kwargs)
        if kwargs.get("spatial", False)
        else pipeline.create(dai.node.YoloDetectionNetwork)
    )

    xout_image = pipeline.create(dai.node.XLinkOut)
    nn_out = pipeline.create(dai.node.XLinkOut)

    xout_image.setStreamName("image")
    nn_out.setStreamName("nn")

    # Preview is sized to the network input; the camera itself uses the
    # left socket of the selected stereo pair.
    cam_rgb.setPreviewSize(nn_config.nn_width, nn_config.nn_height)
    cam_rgb.setBoardSocket(cam_left_socket[stereo_pair])
    cam_rgb.setResolution(color_res)
    cam_rgb.setIspScale(isp_scale)
    cam_rgb.setInterleaved(False)
    cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR)
    cam_rgb.setFps(kwargs.get("fps", 30))
    cam_rgb.setPreviewKeepAspectRatio(not kwargs.get("fullFov", False))

    # YOLO decoding parameters come from the parsed model config.
    detection_network.setConfidenceThreshold(nn_config.NN_specific_metadata.confidence_threshold)
    detection_network.setNumClasses(nn_config.NN_specific_metadata.classes)
    detection_network.setCoordinateSize(nn_config.NN_specific_metadata.coordinates)
    detection_network.setAnchors(nn_config.NN_specific_metadata.anchors)
    detection_network.setAnchorMasks(nn_config.NN_specific_metadata.anchor_masks)
    detection_network.setIouThreshold(nn_config.NN_specific_metadata.iou_threshold)
    detection_network.setBlob(model_data)
    # Drop stale frames instead of blocking the camera when inference lags.
    detection_network.input.setBlocking(False)
    detection_network.input.setQueueSize(1)

    cam_rgb.preview.link(detection_network.input)
    if kwargs.get("syncNN", False):
        # Frame that was actually fed to the NN, synchronized with results.
        detection_network.passthrough.link(xout_image.input)
    elif kwargs.get("high_res", False):
        cam_rgb.video.link(xout_image.input)
    else:
        cam_rgb.preview.link(xout_image.input)
    detection_network.out.link(nn_out.input)

    return pipeline


def create_stereo(pipeline, **kwargs):
    """Add a stereo-depth + spatial YOLO detection subgraph to *pipeline*.

    Args:
        pipeline: The dai.Pipeline being built.

    Keyword Args:
        cam_left: Already-created ColorCamera node used as the left camera
            of the stereo pair (``cam_rgb`` is accepted as a legacy alias,
            since ``create_pipeline`` historically passed it under that name).
        color_res: Right-camera sensor resolution. Default THE_1200_P.
        isp_scale (tuple): ISP downscale factor. Default (2, 3).
        stereo_pair (StereoPair): Which board sockets form the pair. Default LR.
        fps: Right-camera frame rate. Default 30.
        lr_check (bool): Enable left-right consistency check. Default False.
        extended_disparity (bool): Enable extended disparity. Default False.
        subpixel (bool): Enable subpixel disparity. Default False.

    Returns:
        The YoloSpatialDetectionNetwork node. The caller is still responsible
        for configuring its blob, anchors and thresholds, and for linking its
        RGB input.

    Raises:
        KeyError: If neither ``cam_left`` nor ``cam_rgb`` is supplied.
    """
    color_res = kwargs.get("color_res", dai.ColorCameraProperties.SensorResolution.THE_1200_P)
    isp_scale = kwargs.get("isp_scale", (2, 3))
    stereo_pair = kwargs.get("stereo_pair", StereoPair.LR)
    # BUG FIX: accept the ``cam_rgb`` spelling that create_pipeline used to
    # pass; previously kwargs["cam_left"] raised KeyError on every call.
    cam_left = kwargs.get("cam_left") or kwargs.get("cam_rgb")
    if cam_left is None:
        raise KeyError("cam_left")

    cam_right = pipeline.create(dai.node.ColorCamera)
    cam_right.setBoardSocket(cam_right_socket[stereo_pair])
    cam_right.setResolution(color_res)
    cam_right.setIspScale(isp_scale)
    cam_right.setFps(kwargs.get("fps", 30))

    stereo = pipeline.createStereoDepth()
    # 245/255: keep only high-confidence disparity pixels.
    stereo.initialConfig.setConfidenceThreshold(245)
    stereo.setLeftRightCheck(kwargs.get("lr_check", False))
    stereo.setExtendedDisparity(kwargs.get("extended_disparity", False))
    stereo.setSubpixel(kwargs.get("subpixel", False))
    # Align depth to the left camera, which also feeds the NN preview.
    stereo.setDepthAlign(cam_left.getBoardSocket())

    detection_network: Union[dai.node.NeuralNetwork, dai.node.YoloSpatialDetectionNetwork] = pipeline.create(
        dai.node.YoloSpatialDetectionNetwork
    )
    detection_network.setDepthLowerThreshold(100)  # mm
    detection_network.setDepthUpperThreshold(10_000)  # mm
    # Shrink each bounding box to 30% before averaging depth inside it.
    detection_network.setBoundingBoxScaleFactor(0.3)

    cam_left.isp.link(stereo.left)
    cam_right.isp.link(stereo.right)
    stereo.depth.link(detection_network.inputDepth)

    return detection_network

用法

这个脚本可以通过调用create_pipeline函数来创建一个 Pipeline 对象。函数接受以下参数:

  • model_data:神经网络模型的数据文件路径。
  • config_data:神经网络模型的配置对象(需包含 nn_config 属性,例如由配置 JSON 文件解析得到),而非文件路径。
  • spatial:一个布尔值,指示是否使用空间目标检测网络。默认为False
  • color_res:彩色摄像头的分辨率。默认为dai.ColorCameraProperties.SensorResolution.THE_1200_P
  • isp_scale:图像信号处理(ISP)的缩放因子。默认为(2, 3)
  • stereo_pair:立体相机对的类型。默认为StereoPair.LR
  • fps:摄像头的帧率。默认为 30。
  • fullFov:一个布尔值,指示是否保持全视场比例。默认为False
  • syncNN:一个布尔值,指示是否同步神经网络推理。默认为False
  • high_res:一个布尔值,指示是否使用高分辨率预览。默认为False

示例

下面是一个使用示例:

import depthai as dai
from create_pipeline_for_lr import create_pipeline, StereoPair

# 指定模型文件路径,并解析配置文件
model_data = "path/to/model.blob"
# 注意:config_data 需为解析后的、包含 nn_config 属性的配置对象,而非 JSON 文件路径字符串
config_data = parse_config("path/to/config.json")

# 创建 Pipeline
pipeline = create_pipeline(model_data=model_data, config_data=config_data, spatial=True, stereo_pair=StereoPair.LR)

# 创建 Device
with dai.Device(pipeline) as device:
    # Pipeline 在 dai.Device(pipeline) 构造时已自动启动;startPipeline() 已被弃用,此调用可省略
    device.startPipeline()

    # 获取输出流
    image_queue = device.getOutputQueue(name="image", maxSize=4, blocking=False)
    nn_queue = device.getOutputQueue(name="nn", maxSize=4, blocking=False)

    while True:
        # 从输出队列中获取数据
        image_data = image_queue.get()
        nn_data = nn_queue.get()

        # 处理数据...

常见问题解答

Q: 该脚本支持哪些深度摄像头设备?

A: 该脚本使用 depthai 库,支持 OAK-D-LR 以及使用 AR0234 相机模组作为左右相机的 OAK-FFC。

Q: 如何指定自定义的模型和配置文件?

A: 在调用 create_pipeline 函数时,通过 model_data 和 config_data 参数分别指定模型文件与解析后的配置对象。

Q: 是否支持其他类型的神经网络模型?

A: 目前这个脚本支持 YoloDetectionNetwork 和 YoloSpatialDetectionNetwork 两种类型的神经网络模型。