第二十二章 LaneAF框架结构以及接入MMDetection3D模型(车道线感知)

一 前言

近期参与到了手写AI的车道线检测的学习中去,以此系列笔记记录学习与思考的全过程。车道线检测系列会持续更新,力求完整精炼,引人启示。所需前期知识,可以结合手写AI进行系统的学习。

二 LaneAF接入openlane数据集

在这里插入图片描述

2.1 LaneAF文件结构

|-- README.md
|-- datasets  # 数据处理,返回img, seg, mask, af
|   |-- culane.py
|   |-- llamas.py
|   |-- openlane.py
|   |-- transforms.py
|   `-- tusimple.py
|-- infer_culane.py  # 推理
|-- infer_llamas.py
|-- infer_openlane.py
|-- infer_tusimple.py
|-- models  # banckbone
|   |-- dla
|   |   `-- pose_dla_dcn.py
|   |-- enet
|   |   |-- ASNeck.py
|   |   |-- ENet.py
|   |   |-- InitialBlock.py
|   |   |-- RDDNeck.py
|   |   |-- UBNeck.py
|   |-- erfnet
|   |   |-- __init__.py
|   |   |-- erfnet.py
|   |   `-- erfnet_encoder_pretrained.pth.tar
|   `-- loss.py
|-- train_culane.py  # 训练
|-- train_llamas.py
|-- train_openlane.py
|-- train_tusimple.py
|-- utils  # 工具
|   |-- __init__.py
|   |-- affinity_fields.py
|   |-- metrics.py
|   `-- visualize.py

2.2 代码分析

2.2.1 openlane.py

      
import os
import os.path as ops
import shutil
import glob
import json
import sys
sys.path.append("../")

env_path = os.path.join(os.path.dirname(__file__), '..')
if env_path not in sys.path:
    sys.path.append(env_path)

import argparse

import cv2
import numpy as np
from scipy.interpolate import CubicSpline
from PIL import Image

import torch
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

from utils.affinity_fields import generateAFs
import datasets.transforms as tf


class OpenLane(Dataset):
    def __init__(self, images_dir, json_file_dir, random_transforms=False):
        super().__init__()

        self.input_size = (352, 640)
        self.output_scale = 0.25
        self.images_dir = images_dir
        self.json_file_dir = json_file_dir
        self.random_transforms = random_transforms
        self.samples = self.init_dataset_2d(json_file_dir)  # 返回一定量的json文件,根据数据类型不同取到的数据也不同
        
        self.mean = [0.485, 0.456, 0.406]
        self.std = [0.229, 0.224, 0.225]
        self.ignore_label = 255
        
        if self.random_transforms:  # 是否需要数据增强
            self.transforms = transforms.Compose([
                tf.GroupRandomScale(size=(0.5, 0.6), interpolation=(cv2.INTER_LINEAR, cv2.INTER_NEAREST)),
                tf.GroupRandomCropRatio(size=(self.input_size[1], self.input_size[0])),
                tf.GroupRandomHorizontalFlip(),
                tf.GroupRandomRotation(degree=(-1, 1), interpolation=(cv2.INTER_LINEAR, cv2.INTER_NEAREST), 
                                       padding=(self.mean, (self.ignore_label, ))),
                tf.GroupNormalize(mean=(self.mean, (0, )), std=(self.std, (1, )))
            ])
        else:
            self.transforms = transforms.Compose([
                tf.GroupRandomScale(size=(0.5, 0.5), interpolation=(cv2.INTER_LINEAR, cv2.INTER_NEAREST)),
                tf.GroupNormalize(mean=(self.mean, (0, )), std=(self.std, (1, )))
            ])

    def __len__(self):
        return len(self.samples)
    
    def get_seg_mask(self, gt_lanes):
        mask_seg = np.zeros((1280, 1920, 3))
        ins_id = 0
        for id, line in enumerate(gt_lanes):
            coords = line
            ins_id += 1
            # category = gt_category[id]
            # gt_visibility = gt_visibility[id]

            if len(coords) < 2:
                continue
            last_poi_x = 0
            last_poi_y = 0

            for num, pos_pixel in enumerate(coords):
                u, v = pos_pixel[0], pos_pixel[1]

                if num == 0:
                    last_poi_x = int(u)
                    last_poi_y = int(v)
                #draw thickness is attention
                cv2.line(mask_seg, (last_poi_x, last_poi_y),
                            (int(u), int(v)), color=(ins_id, ins_id, ins_id),
                            thickness=1)
                last_poi_x = int(u)
                last_poi_y = int(v)

        return mask_seg

    def __getitem__(self, idx):
        img, seg, mask, af = self.get_data_info(idx, debug=False)
        return img, seg, mask, af
                                                 
    def get_data_info(self, index, debug=True):
        """
        获取数据信息(图片、分割、掩膜、AFs)。
        :param index: 数据集的索引。
        :param debug: 调试模式标志。默认为True。
        :return: 返回处理后的图像、分割、掩膜和AFs。
        """
        
        # 读取json文件路径
        label_json = self.samples[index]
        label_file_path = ops.join(self.json_file_dir, label_json)
        
        # 从json文件中加载信息
        with open(label_file_path, 'r') as f:
            info_dict = json.loads(f.read())
            
        # 构建图像路径,并检查其存在性
        image_path = ops.join(self.images_dir, info_dict['file_path'])
        assert ops.exists(image_path), f'{image_path} is not exist.'
        
        # 读取图像,并将其从BGR转为RGB
        img = cv2.imread(image_path).astype(np.float32) / 255.  
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        # 获取车道信息,并转换其结构
        gt_lanes_packeds = info_dict['lane_lines']
        gt_lanes = []
        for j, gt_lanes_packeds in enumerate(gt_lanes_packeds):
            lane = np.array(gt_lanes_packeds['uv'])
            lane = lane.T
            gt_lanes.append(lane)
            
        # 使用车道信息生成分割掩膜
        seg = self.get_seg_mask(gt_lanes)
        img, seg = self.transforms((img, seg))
        
        # 调整分割大小
        seg = cv2.resize(seg, None, fx=self.output_scale, fy=self.output_scale, interpolation=cv2.INTER_NEAREST)
        # (352, 640, 3)
        
        # 生成二进制掩膜
        mask = seg[..., 0].copy()
        mask[seg[..., 0] >= 1] = 1
        mask[seg[..., 0] == self.ignore_label] = self.ignore_label
        
        # 生成 AFs
        seg_wo_ignore = seg[..., 0].copy()
        seg_wo_ignore[seg_wo_ignore == self.ignore_label] = 0
        vaf, haf = generateAFs(seg_wo_ignore.astype(np.longlong), viz=True)  # (88, 160, 2) (88, 160, 1)
        af = np.concatenate((vaf, haf[..., 0: 1]), axis=2)
        
        # 转换所有数据为Tensor
        img = torch.from_numpy(img).permute(2, 0, 1).contiguous().float()
        seg = torch.from_numpy(seg[..., 0]).contiguous().long().unsqueeze(0)
        mask = torch.from_numpy(mask).contiguous().float().unsqueeze(0)
        af = torch.from_numpy(af).permute(2, 0, 1).contiguous().float()
        
        return img, seg, mask, af

    def init_dataset_2d(self, json_file_dir):
        filter_samples = []
        samples = glob.glob(json_file_dir + '**/**/*.json', recursive=True)
        for i, sample in enumerate(samples):  
        # '/workspace/src/openlane_all/lane3d_300/training/segment-10793018113277660068_2714_540_2734_540_with_camera_labels/150698213937291300.json'
            label_file_path = ops.join(self.json_file_dir, sample)  
            # '/workspace/src/openlane_all/lane3d_300/training/segment-10793018113277660068_2714_540_2734_540_with_camera_labels/150698213937291300.json'
            with open(label_file_path, 'r') as f:
                info_dict = json.loads(f.read())
            image_path = ops.join(self.images_dir, info_dict['file_path'])
            if not ops.exists(image_path):
                continue
            
            filter_samples.append(sample)
            if len(filter_samples) > 10000:  # 先取800个看看效果
                break
            
        return filter_samples
    
if __name__ == '__main__':
    images_dir = '/workspace/src/openlane_all/images/'
    json_file_dir = '/workspace/src/openlane_all/lane3d_300/training/'  # 文件的最后要加/ 不然获取不到

    dataset = OpenLane(images_dir, json_file_dir)
    
    train_datalorder = DataLoader(dataset, batch_size=3, shuffle=True)
    # mean = [0.485, 0.456, 0.406] #[103.939, 116.779, 123.68]
    # std = [0.229, 0.224, 0.225] #[1, 1, 1]
    for batch_img, batch_seg, batch_mask, batch_af in train_datalorder:
        print(batch_img.shape, batch_seg.shape, batch_mask.shape, batch_af.shape)
        img = batch_img[0].permute(1,2,0)
        seg = batch_seg[0].permute(1,2,0)
        cv2.imwrite("/workspace/image.jpg", img.numpy()*255)
        cv2.imwrite("/workspace/seg.jpg", seg.numpy() * 100)

类,函数与方法的调用关系:
在这里插入图片描述

2.2.2 ENet与erfnet网络

ENet与ERFNet是轻量化的神经网络,因为我们的重点是学习体会LaneAF亲和力场的思想。下面是以下学习连接,可以自行查阅。

https://zhuanlan.zhihu.com/p/220337109
https://zhuanlan.zhihu.com/p/137640227
https://zhuanlan.zhihu.com/p/57720412
https://blog.csdn.net/baidu_41902768/article/details/103541094

2.2.3 affinity_fields.py

      
import cv2
import numpy as np
import matplotlib.pyplot as plt

def generateAFs(label, viz=True):
    # 创建透视场数组
    num_lanes = np.amax(label)  # 获取车道线的数量
    VAF = np.zeros((label.shape[0], label.shape[1], 2))  # 垂直透视场
    HAF = np.zeros((label.shape[0], label.shape[1], 1))  # 水平透视场

    # 对每条车道线进行循环处理
    for l in range(1, num_lanes+1):
        # 初始化先前的行和列值
        prev_cols = np.array([], dtype=np.int64)
        prev_row = label.shape[0]

        # 从下到上解析每一行
        for row in range(label.shape[0]-1, -1, -1):
            # [0] :np.where 返回一个元组,其每一维都是一个数组,表示该维度上满足条件的索引。
            # 在这里,我们只关心列索引,所以我们取出这个元组的第一个元素
            cols = np.where(label[row, :] == l)[0]  # 获取当前行的前景列值(即车道线位置)

            # 为每个列值生成水平方向向量
            for c in cols:
                if c < np.mean(cols):
                    HAF[row, c, 0] = 1.0  # 向右指示
                elif c > np.mean(cols):
                    HAF[row, c, 0] = -1.0  # 向左指示
                else:
                    HAF[row, c, 0] = 0.0  # 保持不变
                    
            # 检查先前的列和当前的列是否都非空
            if prev_cols.size == 0:  # 如果没有先前的行/列,更新并继续
                prev_cols = cols
                prev_row = row
                continue
            if cols.size == 0:  # 如果当前没有列,继续
                continue
            col = np.mean(cols)  # 计算列的均值

            # 为先前的列生成垂直方向向量
            for c in prev_cols:
                # 计算方向向量的位置
                vec = np.array([col - c, row - prev_row], dtype=np.float32)
                # 单位标准化
                vec = vec / np.linalg.norm(vec)  # 标准化为单位向量  # 模
                VAF[prev_row, c, 0] = vec[0]
                VAF[prev_row, c, 1] = vec[1]  # 具有像两方向的增值

            # 使用当前的行和列值更新先前的行和列值
            prev_cols = cols
            prev_row = row

    if viz: # visualization
        """
        用于可视化向量场(VAF and HAF)的代码块。
        VAF(Vertical Attraction Field) 和 HAF(Horizontal Attraction Field) 
        是两种描述图像上对象形状和结构信息的向量场。
        这里的代码使用matplotlib的quiver函数来绘制这些向量场。
        """
        
        # 定义一个下采样率,该值表示每多少像素显示一个向量
        down_rate = 1 
        # 初始化一个1x2的子图结构,左边显示VAF,右边显示HAF
        fig, (ax1, ax2) = plt.subplots(1, 2)
        # fig, ax1 = plt.subplots(1, 1)
        
        # 使用quiver函数在每个点上画一个箭头,表示VAF中的向量
        # np.arange创建一个等差数组,用于确定每个箭头的x和y坐标。
        # VAF[::down_rate, ::down_rate, 0] 和 VAF[::down_rate, ::down_rate, 1] 表示箭头的X和Y方向的大小。
        # 使用负号(-)是为了翻转箭头的方向,因为图像坐标和传统坐标系在Y轴方向上是相反的。
        q = ax1.quiver(np.arange(0, label.shape[1], down_rate), 
                    -np.arange(0, label.shape[0], down_rate), 
                    VAF[::down_rate, ::down_rate, 0], 
                    -VAF[::down_rate, ::down_rate, 1], scale=120)
       
        # 可视化HAF
        # 同上,但这次是为了HAF。
        q = ax2.quiver(np.arange(0, label.shape[1], down_rate), 
                    -np.arange(0, label.shape[0], down_rate), 
                    HAF[::down_rate, ::down_rate, 0], 
                    -HAF[::down_rate, ::down_rate, 0], scale=120)

        # 显示图像
        plt.show()

    return VAF, HAF

def decodeAFs(BW, VAF, HAF, fg_thresh=128, err_thresh=5, viz=True):
    # 初始化输出数组为0,其大小与 BW 相同  # BW分割
    output = np.zeros_like(BW, dtype=np.uint8)
    # 用于存储每条车道的末端点的列表
    lane_end_pts = [] 
    # 定义下一个可用的车道ID
    next_lane_id = 1
    
    if viz:
        im_color = cv2.applyColorMap(BW, cv2.COLORMAP_JET)
        cv2.imshow('BW', im_color)
        ret = cv2.waitKey(0)
  
    # 从最后一行开始解码到第一行
    # 求解每一行的中心点
    for row in range(BW.shape[0]-1, -1, -1):
        # 获取当前行中的前景像素列,即车道线像素
        cols = np.where(BW[row, :] > fg_thresh)[0]
        # 初始化簇/集群
        clusters = [[]]     
         
        # 如果存在前景像素,则初始化 prev_col 为第一个前景像素列的位置
        if cols.size > 0:
            prev_col = cols[0]
            
        # 水平地解析像素
        for col in cols:
            # 如果当前列与上一个列之间的差值大于给定的阈值,则视为新的集群开始
            if col - prev_col > err_thresh:
                clusters.append([])  # 新开一个聚类
                clusters[-1].append(col)
                prev_col = col
                continue
            
            # 根据水平透视场(HAF)的值,确定像素点是如何与其它像素相关联的
            if HAF[row, prev_col] >= 0 and HAF[row, col] >= 0: 
                # 继续向右移动
                clusters[-1].append(col)
                prev_col = col
                continue
            elif HAF[row, prev_col] >= 0 and HAF[row, col] < 0: 
                # 找到了车道的中心,处理垂直透视场(VAF)
                clusters[-1].append(col)
                prev_col = col
            elif HAF[row, prev_col] < 0 and HAF[row, col] >= 0: 
                # 找到车道的末端,生成新的车道
                clusters.append([])
                clusters[-1].append(col)
                prev_col = col
                continue
            elif HAF[row, prev_col] < 0 and HAF[row, col] < 0: 
                # 继续向右移动
                clusters[-1].append(col)
                prev_col = col
                continue
            
        # vaf与haf中心点差距
        # 上一行指向的有一个值和本行估计的值进行就差距,在一定范围内则连成一条线
        # 行列嵌套循环  
        # 分配线的lane id
        # 建立每条线与头坐标与当前行聚类点之间的cost矩阵(线头来源于上一行的end_point)
        # cost前向infer做,模型里面不做
        # cost???
        assigned = [False for _ in clusters]
        C = np.Inf*np.ones((len(lane_end_pts), len(clusters)), dtype=np.float64)
        #计算每一个线头坐标点与当前行聚类点之间的dist_error
        for r, pts in enumerate(lane_end_pts): # for each end point in an active lane
            for c, cluster in enumerate(clusters):
                if len(cluster) == 0:
                    continue
                # 计算每一个聚类簇的中心点
                cluster_mean = np.array([[np.mean(cluster), row]], dtype=np.float32)
                # 获取每一个线头的坐标在vaf map上的单位向量
                vafs = np.array([VAF[int(round(x[1])), int(round(x[0])), :] for x in pts], dtype=np.float32)
                vafs = vafs / np.linalg.norm(vafs, axis=1, keepdims=True) 
                # 用计算出来的线头坐标结合vaf计算的单位向量来推算下一行的聚类中心
                pred_points = pts + vafs*np.linalg.norm(pts - cluster_mean, axis=1, keepdims=True)
                # 计算真正的聚类中心与vaf预测的聚类中心之间的error 
                error = np.mean(np.linalg.norm(pred_points - cluster_mean, axis=1))
                # 赋值给线头与当前行的error给cost矩阵
                C[r, c] = error
        # 获取线头点与当前行聚类点在C.shape下的坐标
        row_ind, col_ind = np.unravel_index(np.argsort(C, axis=None), C.shape)
        for r, c in zip(row_ind, col_ind):
            if C[r, c] >= err_thresh:
                break
            if assigned[c]:
                continue
            assigned[c] = True
            # 给当前像素点更新最匹配的lane_id
            output[row, clusters[c]] = r+1
            # 根据当前行匹配好的像素点更新线头坐标列表
            lane_end_pts[r] = np.stack((np.array(clusters[c], dtype=np.float32), row*np.ones_like(clusters[c])), axis=1)
        # 没被分配的线分配新的lane_id
        for c, cluster in enumerate(clusters):
            if len(cluster) == 0:
                continue
            if not assigned[c]:
                output[row, cluster] = next_lane_id
                lane_end_pts.append(np.stack((np.array(cluster, dtype=np.float32), row*np.ones_like(cluster)), axis=1))
                next_lane_id += 1
                
    if viz:
        im_color = cv2.applyColorMap(40*output, cv2.COLORMAP_JET)
        cv2.imshow('Output', im_color)
        ret = cv2.waitKey(0)
        
    return output

代码已做详细注释。

2.2.4 train_openlane.py

      
import os
import io
import json
import shutil
from datetime import datetime
from statistics import mean
import argparse

import numpy as np
import matplotlib
matplotlib.use('Agg')
from matplotlib import pyplot as plt
from sklearn.metrics import accuracy_score, f1_score

import torch
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchsummary import summary
from contextlib import redirect_stdout

# from datasets.tusimple import TuSimple
from datasets.openlane import OpenLane
from models.dla.pose_dla_dcn import get_pose_net
from models.erfnet.erfnet import ERFNet
from models.enet.ENet import ENet
from models.loss import FocalLoss, IoULoss, RegL1Loss

'''
python train_openlane.py --images-dir=/workspace/src/openlane_all/images --json-train-dir /workspace/src/openlane_all/lane3d_300/training/ --json-val-dir /workspace/src/openlane_all/lane3d_300/validation/ --backbone=dla34 --random-transforms
'''
parser = argparse.ArgumentParser('Options for training LaneAF models in PyTorch...')
parser.add_argument('--images-dir', type=str, default="/workspace/src/openlane_all/images", help='path to dataset')
parser.add_argument('--json-train-dir', type=str, default="/workspace/src/openlane_all/lane3d_300/training/", help='path to json')
parser.add_argument('--json-val-dir', type=str, default="/workspace/src/openlane_all/lane3d_300/validation/", help='path to json')
parser.add_argument('--output-dir', type=str, default="./.result/work_train/1019_1300", help='output directory for model and logs')
parser.add_argument('--backbone', type=str, default='erfnet', help='type of model backbone (dla34/erfnet/enet)')
parser.add_argument('--snapshot', type=str, default=None, help='path to pre-trained model snapshot')
parser.add_argument('--batch-size', type=int, default=16, metavar='N', help='batch size for training')
parser.add_argument('--epochs', type=int, default=30, metavar='N', help='number of epochs to train for')
parser.add_argument('--learning-rate', type=float, default=1e-4, metavar='LR', help='learning rate')
parser.add_argument('--weight-decay', type=float, default=1e-3, metavar='WD', help='weight decay')
parser.add_argument('--loss-type', type=str, default='wbce', help='type of classification loss to use (focal/bce/wbce)')
parser.add_argument('--log-schedule', type=int, default=10, metavar='N', help='number of iterations to print/save log after')
parser.add_argument('--seed', type=int, default=1, help='set seed to some constant value to reproduce experiments')
parser.add_argument('--no-cuda', action='store_true', default=False, help='do not use cuda for training')
parser.add_argument('--random-transforms', action='store_true', default=True, help='apply random transforms to input during training')

# 解析命令行参数
args = parser.parse_args()

# 检查是否提供了数据集的路径
if args.images_dir is None:
    assert False, '未提供数据集的路径!'

# 设置是否使用CUDA(依赖于系统是否支持CUDA以及是否设置了no_cuda标志)
args.cuda = not args.no_cuda and torch.cuda.is_available()

# 如果没有提供输出目录,则创建一个默认的输出目录,该目录的名称为当前的日期和时间
if args.output_dir is None:
    args.output_dir = datetime.now().strftime("%Y-%m-%d-%H:%M")
    args.output_dir = os.path.join('.', 'experiments', 'Openlane', args.output_dir)

# 将backbone参数转换为小写,并检查其是否属于给定的预期值之一
args.backbone = args.backbone.lower()
if args.backbone not in ['dla34', 'erfnet', 'enet']:
    assert False, '提供的模型backbone不正确!'
  
# 检查输出目录是否存在
if os.path.exists(args.output_dir):
    # 删除已存在的输出目录及其内容
    shutil.rmtree(args.output_dir)
# 创建输出目录
os.makedirs(args.output_dir)

# 使用open()函数打开一个文件,该文件的路径是由参数指定的输出目录和'config.json'文件名组合而成
# 'w'表示该文件将以写入模式打开
with open(os.path.join(args.output_dir, 'config.json'), 'w') as f:
    # 使用json库的dump()函数将参数args的所有变量(属性)转储(序列化)为JSON格式并写入之前打开的文件f中
    # vars(args)会返回args对象的属性和属性值的字典形式
    json.dump(vars(args), f)

# 设置随机种子以保证实验的可重复性
# 使用Python内置的random模块,你可以使用random.seed(value)来设置种子。
# 如果使用numpy库生成随机数,可以使用numpy.random.seed(value)来设定种子。
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)


kwargs = {'batch_size': args.batch_size, 'shuffle': True, 'num_workers': 0}
train_loader = DataLoader(OpenLane(args.images_dir, args.json_train_dir, args.random_transforms), **kwargs)
# openlane取得一定量的数据,并做了数据增强
kwargs = {'batch_size': args.batch_size, 'shuffle': False, 'num_workers': 0}  
val_loader = DataLoader(OpenLane(args.images_dir, args.json_val_dir, False), **kwargs)  # 数据增强做了一部分

# 全局变量,用于存储所有训练周期中的最佳验证F1分数
best_f1 = 0.0
# 创建文件句柄
f_log = open(os.path.join(args.output_dir, "logs.txt"), "w")  # name './experiments/Openlane/2023-10-19-03:12/logs.txt'


def train(net, epoch):
    # 初始化用于记录本epoch损失和评价指标的列表
    epoch_loss_seg, epoch_loss_vaf, epoch_loss_haf, epoch_loss, epoch_acc, epoch_f1 = list(), list(), list(), list(), list(), list()

    # 设置模型为训练模式
    net.train()

    # 遍历训练数据加载器中的所有样本
    for b_idx, sample in enumerate(train_loader):
        # 从样本中提取图像、分割图、掩码和自动聚焦图
        input_img, input_seg, input_mask, input_af = sample
        # 如果使用CUDA,将数据移动到GPU上
        if args.cuda:
            input_img = input_img.cuda()
            input_seg = input_seg.cuda()
            input_mask = input_mask.cuda()
            input_af = input_af.cuda()

        # 清除优化器中的所有累积渐变
        optimizer.zero_grad()

        # 前向传递,获取模型输出
        outputs = net(input_img)[-1]

        # 创建一个_mask,其值为True(或1.0)表示该位置不应被忽略,False(或0.0)表示应被忽略。
        # 这是基于train_loader中的数据集属性ignore_label来确定的。
        # 如果input_mask中的某个值等于ignore_label,_mask在相应的位置上将为0,否则为1。
        _mask = (input_mask != train_loader.dataset.ignore_label).float()

        # 计算分割损失(loss_seg):
        # 1. 首先,使用_mask来遮盖outputs['hm']和input_mask中应该被忽略的位置。
        # 2. 然后,使用criterion_1(可能是某种损失函数,如MSE或CrossEntropy)来计算两者之间的差异。
        # 3. 对outputs['hm']应用sigmoid激活函数,并与原始的input_mask进行比较以使用criterion_2(又是某种损失函数)计算第二部分的损失。
        # 最后,两部分的损失被加在一起得到总的分割损失。
        loss_seg = criterion_1(outputs['hm']*_mask, input_mask*_mask) + criterion_2(torch.sigmoid(outputs['hm']), input_mask)

        # 计算VAF(速度自动对焦)的损失。
        # 使用criterion_reg(可能是一个回归损失函数)来计算输出的VAF(outputs['vaf'])与真实的VAF(input_af[:, :2, :, :])之间的差异。
        # 最后,损失乘以0.5。
        loss_vaf = 0.5*criterion_reg(outputs['vaf'], input_af[:, :2, :, :], input_mask)

        # 计算HAF(高度自动对焦)的损失。
        # 使用criterion_reg来计算输出的HAF(outputs['haf'])与真实的HAF(input_af[:, 2:3, :, :])之间的差异。
        # 同样,损失乘以0.5。
        loss_haf = 0.5*criterion_reg(outputs['haf'], input_af[:, 2:3, :, :], input_mask)

        # 将模型输出的'hm'(热图)通过sigmoid激活函数进行处理,使其值域在0到1之间。
        # 使用detach()确保此变量不会被跟踪计算图,这意味着它不会保存任何关于模型操作的历史。
        # 使用cpu()将其转移到CPU(如果之前在GPU上),然后将其从PyTorch张量转换为NumPy数组。
        # 最后,使用ravel()将多维数组转换为一维数组。
        pred = torch.sigmoid(outputs['hm']).detach().cpu().numpy().ravel()

        # 为input_mask执行相同的操作,将其转化为一维NumPy数组。
        # 这样做的目的是使'pred'和'target'的维度相同,以便之后计算准确性或其他度量标准。
        target = input_mask.detach().cpu().numpy().ravel()

        # 对忽略的标签进行处理
        pred[target == train_loader.dataset.ignore_label] = 0
        target[target == train_loader.dataset.ignore_label] = 0

        # 计算训练数据的准确度。
        # 通过对'target'和'pred'使用阈值0.5来将它们转换为二进制类标签(0或1)。
        # 例如,如果模型的预测(pred)值大于0.5,那么它将被认为是类1,否则为类0。
        # 使用accuracy_score函数计算这些二进制标签之间的准确度。
        train_acc = accuracy_score((target > 0.5).astype(np.int64), (pred > 0.5).astype(np.int64))

        # 计算训练数据的F1得分。
        # F1得分是精确度和召回率的调和平均值,它为我们提供了一个统一的度量,表示模型的性能。
        # zero_division参数确保当分母为零时(即真正和假正的总数为零时)不会发生除法错误,而是返回1。
        train_f1 = f1_score((target > 0.5).astype(np.int64), (pred > 0.5).astype(np.int64), zero_division=1)

        # 将损失和评价指标添加到记录列表
        epoch_loss_seg.append(loss_seg.item())
        epoch_loss_vaf.append(loss_vaf.item())
        epoch_loss_haf.append(loss_haf.item())
        loss = loss_seg + loss_vaf + loss_haf
        epoch_loss.append(loss.item())
        epoch_acc.append(train_acc)
        epoch_f1.append(train_f1)

        # 反向传播和参数更新
        loss.backward()
        optimizer.step()

        # 每隔args.log_schedule次批次,将当前的训练统计信息打印到控制台和日志文件中。
        # 这样可以在训练过程中定期监视模型的进展,而不必等待整个epoch完成。

        # 检查当前批次的索引是否是args.log_schedule的倍数。
        if b_idx % args.log_schedule == 0:
            # 计算到目前为止已经处理的样本数占整个数据集的百分比。
            percent_complete = 100. * (b_idx+1) * args.batch_size / len(train_loader.dataset)
            
            # 格式化并打印当前epoch、处理的样本数、总体进度百分比、当前的损失和F1得分。
            print('Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tF1-score: {:.4f}'.format(
                epoch, (b_idx+1) * args.batch_size, len(train_loader.dataset), percent_complete, loss.item(), train_f1))
            
            # 将同样的信息写入日志文件。
            f_log.write('Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}\tF1-score: {:.4f}\n'.format(
                epoch, (b_idx+1) * args.batch_size, len(train_loader.dataset), percent_complete, loss.item(), train_f1))

    # 调整学习率
    scheduler.step()

    # 在完成epoch后,计算平均损失和评价指标并记录日志
    avg_loss_seg = mean(epoch_loss_seg)
    avg_loss_vaf = mean(epoch_loss_vaf)
    avg_loss_haf = mean(epoch_loss_haf)
    avg_loss = mean(epoch_loss)
    avg_acc = mean(epoch_acc)
    avg_f1 = mean(epoch_f1)

    # 打印并写入日志文件
    # ... (省略打印和写入日志的代码,因为它们都很相似)
    print("\n------------------------ Training metrics ------------------------")
    f_log.write("\n------------------------ Training metrics ------------------------\n")
    print("Average segmentation loss for epoch = {:.2f}".format(avg_loss_seg))
    f_log.write("Average segmentation loss for epoch = {:.2f}\n".format(avg_loss_seg))
    print("Average VAF loss for epoch = {:.2f}".format(avg_loss_vaf))
    f_log.write("Average VAF loss for epoch = {:.2f}\n".format(avg_loss_vaf))
    print("Average HAF loss for epoch = {:.2f}".format(avg_loss_haf))
    f_log.write("Average HAF loss for epoch = {:.2f}\n".format(avg_loss_haf))
    print("Average loss for epoch = {:.2f}".format(avg_loss))
    f_log.write("Average loss for epoch = {:.2f}\n".format(avg_loss))
    print("Average accuracy for epoch = {:.4f}".format(avg_acc))
    f_log.write("Average accuracy for epoch = {:.4f}\n".format(avg_acc))
    print("Average F1 score for epoch = {:.4f}".format(avg_f1))
    f_log.write("Average F1 score for epoch = {:.4f}\n".format(avg_f1))
    print("------------------------------------------------------------------\n")
    f_log.write("------------------------------------------------------------------\n\n")

    # 返回模型、损失和评价指标
    return net, avg_loss_seg, avg_loss_vaf, avg_loss_haf, avg_loss, avg_acc, avg_f1

# validation function
def val(net, epoch):
    global best_f1
    epoch_loss_seg, epoch_loss_vaf, epoch_loss_haf, epoch_loss, epoch_acc, epoch_f1 = list(), list(), list(), list(), list(), list()
    net.eval()
    
    for b_idx, sample in enumerate(val_loader):
        input_img, input_seg, input_mask, input_af = sample
        if args.cuda:
            input_img = input_img.cuda()
            input_seg = input_seg.cuda()
            input_mask = input_mask.cuda()
            input_af = input_af.cuda()

        # do the forward pass
        outputs = net(input_img)[-1]

        # calculate losses and metrics
        _mask = (input_mask != val_loader.dataset.ignore_label).float()
        loss_seg = criterion_1(outputs['hm'], input_mask) + criterion_2(torch.sigmoid(outputs['hm']), input_mask)
        loss_vaf = 0.5*criterion_reg(outputs['vaf'], input_af[:, :2, :, :], input_mask)
        loss_haf = 0.5*criterion_reg(outputs['haf'], input_af[:, 2:3, :, :], input_mask)
        pred = torch.sigmoid(outputs['hm']).detach().cpu().numpy().ravel()
        target = input_mask.detach().cpu().numpy().ravel()
        pred[target == val_loader.dataset.ignore_label] = 0
        target[target == val_loader.dataset.ignore_label] = 0
        val_acc = accuracy_score((target > 0.5).astype(np.int64), (pred > 0.5).astype(np.int64))
        val_f1 = f1_score((target > 0.5).astype(np.int64), (pred > 0.5).astype(np.int64), zero_division=1)

        epoch_loss_seg.append(loss_seg.item())
        epoch_loss_vaf.append(loss_vaf.item())
        epoch_loss_haf.append(loss_haf.item())
        loss = loss_seg + loss_vaf + loss_haf
        epoch_loss.append(loss.item())
        epoch_acc.append(val_acc)
        epoch_f1.append(val_f1)

        print('Done with image {} out of {}...'.format(min(args.batch_size*(b_idx+1), len(val_loader.dataset)), len(val_loader.dataset)))

    # now that the epoch is completed calculate statistics and store logs
    avg_loss_seg = mean(epoch_loss_seg)
    avg_loss_vaf = mean(epoch_loss_vaf)
    avg_loss_haf = mean(epoch_loss_haf)
    avg_loss = mean(epoch_loss)
    avg_acc = mean(epoch_acc)
    avg_f1 = mean(epoch_f1)
    print("\n------------------------ Validation metrics ------------------------")
    f_log.write("\n------------------------ Validation metrics ------------------------\n")
    print("Average segmentation loss for epoch = {:.2f}".format(avg_loss_seg))
    f_log.write("Average segmentation loss for epoch = {:.2f}\n".format(avg_loss_seg))
    print("Average VAF loss for epoch = {:.2f}".format(avg_loss_vaf))
    f_log.write("Average VAF loss for epoch = {:.2f}\n".format(avg_loss_vaf))
    print("Average HAF loss for epoch = {:.2f}".format(avg_loss_haf))
    f_log.write("Average HAF loss for epoch = {:.2f}\n".format(avg_loss_haf))
    print("Average loss for epoch = {:.2f}".format(avg_loss))
    f_log.write("Average loss for epoch = {:.2f}\n".format(avg_loss))
    print("Average accuracy for epoch = {:.4f}".format(avg_acc))
    f_log.write("Average accuracy for epoch = {:.4f}\n".format(avg_acc))
    print("Average F1 score for epoch = {:.4f}".format(avg_f1))
    f_log.write("Average F1 score for epoch = {:.4f}\n".format(avg_f1))
    print("--------------------------------------------------------------------\n")
    f_log.write("--------------------------------------------------------------------\n\n")

    # now save the model if it has a better F1 score than the best model seen so forward
    if avg_f1 > best_f1:
        # save the model
        torch.save(model.state_dict(), os.path.join(args.output_dir, 'net_' + '%.4d' % (epoch,) + '.pth'))
        best_f1 = avg_f1

    return avg_loss_seg, avg_loss_vaf, avg_loss_haf, avg_loss, avg_acc, avg_f1

if __name__ == "__main__":
    # 定义头部的输出尺寸: 'hm' 是热图, 'vaf' 是一个两通道的向量场, 'haf' 是另一个热图
    heads = {'hm': 1, 'vaf': 2, 'haf': 1}

    # 根据传入参数选择不同的网络骨干
    if args.backbone == 'dla34':
        model = get_pose_net(num_layers=34, heads=heads, head_conv=256, down_ratio=4)
    elif args.backbone == 'erfnet':
        model = ERFNet(heads=heads)
    elif args.backbone == 'enet':
        model = ENet(heads=heads)

    # 如果提供了预训练的快照,从中加载模型的权重
    if args.snapshot is not None:
        model.load_state_dict(torch.load(args.snapshot), strict=True)
    # 如果启用了CUDA,将模型移至GPU
    if args.cuda:
        model.cuda()
    
    # 打印模型结构
    print(model)
    
    # # 创建一个用于保存模型结构的文件夹
    # folder_path = './.model' 
    # os.makedirs(folder_path, exist_ok=True)
    # # 输出模型结构到文件
    # file_path = os.path.join(folder_path, 'model_structure.txt')
    # with open(file_path, 'w') as f:
    #     with redirect_stdout(f):
    #         summary(model, (1, 100), device='cpu')
    
    # 初始化优化器为Adam,并设置学习率和权重衰减
    optimizer = optim.Adam(model.parameters(), lr=args.learning_rate, weight_decay=args.weight_decay)
    # 设置学习率的调度策略,每10个epoch衰减为原来的0.2
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.2)

    # 初始化热图(heatmap)的偏置,以偏向负类
    model.hm[-1].bias.data.uniform_(-4.595, -4.595)  # 更加偏向于预测“背景”类,从而避免过多的假阳性。模型的初始预测会是99%的概率为背景类,只有1%的概率为物体类

    # 根据参数选择不同的损失函数
    # Focal Loss, BCE和加权的BCE用于分类任务,而IoU和L1用于回归任务
    if args.loss_type == 'focal':
    # 改进的交叉熵损失,处理极度不平衡的数据情况
        criterion_1 = FocalLoss(gamma=2.0, alpha=0.25, size_average=True)
    elif args.loss_type == 'bce':
    # 二元交叉熵
        criterion_1 = torch.nn.BCEWithLogitsLoss()
    elif args.loss_type == 'wbce':
    # 是BCE的加权版本,用于处理类别不平衡的情况
        criterion_1 = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor([9.6]).cuda())
    
    # IoU损失函数,用于评估检测框的质量
    # 用于度量两个边界框之间的重叠度。常用于目标检测和实例分割任务。
    criterion_2 = IoULoss()
    # L1损失函数,用于回归任务,如关键点检测或边界框回归
    # 度量预测值与真实值之间的绝对差
    criterion_reg = RegL1Loss()

    # 创建第一个图,用于显示训练过程中的各种损失
    fig1, ax1 = plt.subplots()     # 初始化图形和坐标轴
    plt.grid(True)                 # 添加网格线
    # 以下四行为图形添加四条空曲线,预备显示不同的损失,颜色分别为红色、绿色、蓝色和黑色
    ax1.plot([], 'r', label='Training segmentation loss')   
    ax1.plot([], 'g', label='Training VAF loss')
    ax1.plot([], 'b', label='Training HAF loss')
    ax1.plot([], 'k', label='Training total loss')
    ax1.legend()                   # 显示图例
    # 初始化用于存储训练损失数据的列表
    train_loss_seg, train_loss_vaf, train_loss_haf, train_loss = list(), list(), list(), list()

    # 创建第二个图,用于显示验证过程中的各种损失
    fig2, ax2 = plt.subplots()
    plt.grid(True)
    # 以下四行为图形添加四条空曲线,预备显示不同的损失
    ax2.plot([], 'r', label='Validation segmentation loss')
    ax2.plot([], 'g', label='Validation VAF loss')
    ax2.plot([], 'b', label='Validation HAF loss')
    ax2.plot([], 'k', label='Validation total loss')
    ax2.legend()
    # 初始化用于存储验证损失数据的列表
    val_loss_seg, val_loss_vaf, val_loss_haf, val_loss = list(), list(), list(), list()

    # 创建第三个图,用于显示训练和验证过程中的准确率和F1分数
    fig3, ax3 = plt.subplots()
    plt.grid(True)
    # 以下四行为图形添加四条空曲线,预备显示准确率和F1分数
    ax3.plot([], 'r', label='Training accuracy')
    ax3.plot([], 'g', label='Validation accuracy')
    ax3.plot([], 'b', label='Training F1 score')
    ax3.plot([], 'k', label='Validation F1 score')
    ax3.legend()
    # 初始化用于存储准确率和F1分数数据的列表
    train_acc, val_acc, train_f1, val_f1 = list(), list(), list(), list()

    # 开始训练和验证循环,持续到指定的epoch数
    for i in range(1, args.epochs + 1):
        # 训练阶段
        # `train`函数返回经过训练的模型和一系列的平均损失和评估指标
        model, avg_loss_seg, avg_loss_vaf, avg_loss_haf, avg_loss, avg_acc, avg_f1 = train(model, i)
        # 将每个epoch的平均损失和评估指标保存到对应的列表中
        train_loss_seg.append(avg_loss_seg)
        train_loss_vaf.append(avg_loss_vaf)
        train_loss_haf.append(avg_loss_haf)
        train_loss.append(avg_loss)
        train_acc.append(avg_acc)
        train_f1.append(avg_f1)
        # 绘制并保存训练损失图
        ax1.plot(train_loss_seg, 'r')
        ax1.plot(train_loss_vaf, 'g')
        ax1.plot(train_loss_haf, 'b')
        ax1.plot(train_loss, 'k')
        fig1.savefig(os.path.join(args.output_dir, "train_loss.jpg"))

        # 验证阶段
        # `val`函数返回一系列的平均损失和评估指标
        avg_loss_seg, avg_loss_vaf, avg_loss_haf, avg_loss, avg_acc, avg_f1 = val(model, i)
        # 将每个epoch的平均损失和评估指标保存到对应的列表中
        val_loss_seg.append(avg_loss_seg)
        val_loss_vaf.append(avg_loss_vaf)
        val_loss_haf.append(avg_loss_haf)
        val_loss.append(avg_loss)
        val_acc.append(avg_acc)
        val_f1.append(avg_f1)
        # 绘制并保存验证损失图
        ax2.plot(val_loss_seg, 'r')
        ax2.plot(val_loss_vaf, 'g')
        ax2.plot(val_loss_haf, 'b')
        ax2.plot(val_loss, 'k')
        fig2.savefig(os.path.join(args.output_dir, "val_loss.jpg"))

        # 绘制并保存训练和验证的准确率和F1分数图
        ax3.plot(train_acc, 'r')
        ax3.plot(val_acc, 'g')
        ax3.plot(train_f1, 'b')
        ax3.plot(val_f1, 'k')
        fig3.savefig(os.path.join(args.output_dir, 'trainval_acc_f1.jpg'))

    # 关闭所有的图形界面
    plt.close('all')
    # 关闭日志文件
    f_log.close()   

代码逻辑比较清晰,并已做详细注释,故不作解读。

2.2.5 infer_openlane.py

      
import os
import json
from datetime import datetime
from statistics import mean
import argparse

import numpy as np
import cv2
from sklearn.metrics import accuracy_score, f1_score

import torch
from torch.utils.data import DataLoader

from datasets.openlane import OpenLane
# from models.dla.pose_dla_dcn import get_pose_net
from models.erfnet.erfnet import ERFNet
from models.enet.ENet import ENet
from utils.affinity_fields import decodeAFs
from utils.metrics import match_multi_class, LaneEval
from utils.visualize import tensor2image, create_viz


parser = argparse.ArgumentParser('Options for inference with LaneAF models in PyTorch...')
parser.add_argument('--dataset-dir', type=str, default="/workspace/openlane_all/images", help='path to dataset')
parser.add_argument('--json-val-dir', type=str, default="/workspace/openlane_all/lane3d_300/validation/", help='path to json')
parser.add_argument('--output-dir', type=str, default="./.result/work_test/1019_1300", help='output directory for model and logs')
parser.add_argument('--snapshot', type=str, default=None, help='path to pre-trained model snapshot')
parser.add_argument('--split', type=str, default='/workspace/openlane_all/lane3d_300/test', help='dataset split to evaluate on (train/val/test)')
parser.add_argument('--seed', type=int, default=1, help='set seed to some constant value to reproduce experiments')
parser.add_argument('--no-cuda', action='store_true', default=False, help='do not use cuda for training')
parser.add_argument('--save-viz', action='store_true', default=True, help='save visualization depicting intermediate and final results')

args = parser.parse_args()
# check args
if args.dataset_dir is None:
    assert False, 'Path to dataset not provided!'
if args.snapshot is None:
    assert False, 'Model snapshot not provided!'
if args.split is ['train', 'val', 'test']:
    assert False, 'Incorrect dataset split provided!'

# set batch size to 1 for visualization purposes
args.batch_size = 1

# setup args
args.cuda = not args.no_cuda and torch.cuda.is_available()
if args.output_dir is None:
    args.output_dir = datetime.now().strftime("%Y-%m-%d-%H:%M-infer")
    args.output_dir = os.path.join('.', 'experiments', 'openlane', args.output_dir)

if not os.path.exists(args.output_dir):
    os.makedirs(args.output_dir)
else:
    assert False, 'Output directory already exists!'
    

# load args used from training snapshot (if available)
if os.path.exists(os.path.join(os.path.dirname(args.snapshot), 'config.json')):
    with open(os.path.join(os.path.dirname(args.snapshot), 'config.json')) as f:
        json_args = json.load(f)
    # augment infer args with training args for model consistency
    if 'backbone' in json_args.keys():
        args.backbone = json_args['backbone']
    else:
        args.backbone = 'dla34'

# store config in output directory
with open(os.path.join(args.output_dir, 'config.json'), 'w') as f:
    json.dump(vars(args), f)
    
torch.manual_seed(args.seed)
if args.cuda:
    torch.cuda.manual_seed(args.seed)

kwargs = {'batch_size': args.batch_size, 'shuffle': False, 'num_workers': 1}
test_loader = DataLoader(OpenLane(args.dataset_dir, args.split, False), **kwargs)


def test(net):
    net.eval()
    out_vid = None
    
    for index, sample in enumerate(test_loader):
        input_img, input_seg, input_mask, input_af = sample
        if args.cuda:
            input_img = input_img.cuda()
            input_seg = input_seg.cuda()
            input_mask = input_mask.cuda()
            input_af = input_af.cuda()
            
        st_time = datetime.now()
        
        outputs = net(input_img)[-1]
        
        img = tensor2image(input_img.detach(), np.array(test_loader.dataset.mean),
                           np.array(test_loader.dataset.std))
        mask_out = tensor2image(torch.sigmoid(outputs['hm']).repeat(1, 3, 1, 1).detach(),
                np.array([0.0 for _ in range(3)], dtype='float32'), np.array([1.0 for _ in range(3)], dtype='float32'))
        vaf_out = np.transpose(outputs['vaf'][0, ...].detach().cpu().float().numpy(), (1, 2, 0))
        haf_out = np.transpose(outputs['haf'][0, ...].detach().cpu().float().numpy(), (1, 2, 0))
        
        # decode AFs to get lane instances
        seg_out = decodeAFs(mask_out[..., 0], vaf_out, haf_out, fg_thresh=128, err_thresh=5)
        ed_time = datetime.now()
        
        if torch.any(torch.isnan(input_seg)):
            # 如果标签不可以用 跳过
            pass
        else:
            # 如果标签可用 
            # 重新标记车道线id 以适配真值
            seg_out = match_multi_class(seg_out.astype(np.int64), input_seg[0, 0, :, :].detach().cpu().numpy().astype(np.int64))
        
        # create video visualization  创建视频可视化
        if args.save_viz:
            img_out = create_viz(img, seg_out.astype(np.uint8), mask_out, vaf_out, haf_out)
            
            if out_vid is None:
                out_vid = cv2.VideoWriter(os.path.join(args.output_dir, 'out.mp4'),
                        cv2.VideoWriter_fourcc(*'MJPG'), 5, (img_out.shape[1], img_out.shape[0]))
            cv2.imwrite(os.path.join(args.output_dir, str(index).rjust(6, '0')+'.jpg'), img_out)
            out_vid.write(img_out)
            
        print(f'Done with image {min(args.batch_size*(index+1), len(test_loader.dataset))} out of {len(test_loader.dataset)}')
    
    if args.save_viz:
        if out_vid is not None:
            out_vid.release()
        
    return


if __name__ == '__main__':
    
    heads = {'hm': 1, 'vaf': 2, 'haf': 1}
    model = ERFNet(heads=heads)
    model.load_state_dict(torch.load(args.snapshot), strict=True)
    
    if args.cuda:
        model.cuda()
    
    # print(model)
    
    test(model)

2.3 部分代码解析

2.3.1 model.parameters()

model.parameters()是一个函数调用,返回模型中所有需要学习的参数。这个函数通常用于获取模型中的可学习参数,以便在优化器中对这些参数进行优化。
深度学习中,模型通常由多个可学习的参数组成,如权重和偏置。这些参数通过在训练过程中不断调整来拟合训练数据,并使模型能够更好地进行预测。
model.parameters()函数返回一个可迭代对象(通常是一个生成器)或一个包含模型参数的列表。这允许你通过遍历这些参数并将它们传递给优化器进行优化。
以下是使用model.parameters()的示例代码:

model = MyModel()  # 创建模型实例
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)  # 定义优化器并传递模型参数

在上述代码中,model.parameters()返回模型model中的所有可学习参数,并将它们传递给优化器的构造函数,以便优化器可以更新这些参数。

2.3.2 model.hm[-1].bias.data.uniform_(-4.595, -4.595)

这行代码中的 model.hm[-1].bias.data.uniform_(-4.595, -4.595) 是在初始化神经网络模型中的最后一层热图 (heatmap) 的偏置参数。让我们分解这行代码来理解它的具体作用:

  1. model.hm[-1]: 这部分代表着访问 model 的 hm 属性(假设 hm 是模型中的一系列层的一部分),并获取这个序列中的最后一个层。-1 索引在 Python 中常用来引用序列的最后一个元素。
  2. .bias: 在神经网络的层中,.bias 是一个参数,代表着该层的偏置项。在神经网络中,每个神经元的输出通常计算为输入的加权和加上一个偏置值,即 output = weights * inputs + bias。
  3. .data: 这部分是指向张量中实际数值的引用。在 PyTorch 中,data 属性用来访问张量的原始数据,而不会追踪这些数据的计算历史(这与张量的梯度计算有关)。
  4. uniform_(-4.595, -4.595): 这个方法用于以均匀分布的方式初始化张量的值。方法 uniform_(a, b) 会将张量中的每个元素随机初始化为 a 和 b 之间的一个数值。这里的特殊情况是 a 和 b 都被设置为 -4.595,这意味着所有的偏置值将被初始化为 -4.595。这种初始化方式可能基于模型的某些特定需求或经验调整。
    通常,在神经网络中,权重和偏置的初始化对模型的训练和性能有重要影响。特定的初始化值(如这里的 -4.595)可能是基于经验、实验或论文中的建议选定的,旨在优化模型的学习和收敛性能。
    1.4 loss_seg = criterion_1(outputs[‘hm’]_mask, input_mask_mask) + criterion_2(torch.sigmoid(outputs[‘hm’]), input_mask)
    在您的代码中,使用了两种不同的损失函数(BCEWithLogitsLoss 和 IoULoss)来计算损失 (loss_seg),这是深度学习中常见的一种做法,特别是在复杂或者多任务的问题中。让我们分别理解这两个损失函数的作用及其在此处应用的原因:
  5. BCEWithLogitsLoss(二元交叉熵损失): 这个损失函数通常用于二分类问题。在这里,它被用来比较模型输出 (outputs[‘hm’]) 和目标掩码 (input_mask) 之间的差异。_mask 可能是一个用来指定特定区域(例如感兴趣区域或有效区域)的掩码,它用来确保损失计算只考虑这些区域。pos_weight 参数是用来处理类别不平衡问题的,特别是在正负样本数量差异较大时。
  6. IoULoss(交并比损失): 尽管 IoU 损失通常与目标检测的边界框相关联,但它实际上是衡量两个集合的重叠度的一种通用方法。在分割任务中,IoU 可以作为衡量预测分割掩码和真实掩码之间重叠程度的有效指标。在此上下文中,IoULoss 被用来确保模型的预测 (torch.sigmoid(outputs[‘hm’])) 与真实的掩码 (input_mask) 在空间重叠上尽可能相似。
    为什么同时使用两种损失函数?
  • 互补性: BCEWithLogitsLoss 和 IoULoss 关注损失计算的不同方面。前者关注于像素级别上的分类准确性,后者关注于整体的预测准确性和真实标注之间的重叠程度。这种组合使得模型能够在不同的层面上被优化。
  • 性能提升: 在实践中,将不同的损失函数结合使用通常能带来比单独使用任何一种损失函数更好的性能,尤其是在复杂的任务(如分割)中。
  • 任务特性: 在图像分割,尤其是语义分割和实例分割中,既要考虑到每个像素点的分类准确性,也要考虑预测形状与真实标注形状的匹配度。因此,组合使用这两种损失函数可以同时针对这两个方面进行优化。
    综上所述,在您的案例中,使用这两种损失函数是为了确保模型在分类正确的同时,还能够在形状上更加接近真实的目标,从而提高分割任务的整体性能。

2.3.3 net.eval()与net.train()

在 PyTorch 中,net.eval() 和 net.train() 是用来切换模型在训练和评估(测试)模式之间的函数。这两个模式的主要区别在于模型中某些特定层的行为,比如 Dropout 层和 BatchNorm 层。这些层在训练和评估模式下的行为是不同的:

  1. net.train():
    • 当调用 net.train() 时,模型被设定为训练模式。
    • 在训练模式下,所有的层都是活动的,并且会按照其训练时的行为运行。
    • 例如,Dropout 层会随机地将输入的部分元素置零(根据设定的丢弃概率),以减少模型对特定训练样本的依赖,从而防止过拟合。
    • 同样地,BatchNorm 层会根据每个 batch 的数据计算均值和标准差,并使用这些统计信息来归一化数据。在训练过程中,这些统计量会不断更新。
  2. net.eval():
    • 当调用 net.eval() 时,模型被设定为评估模式。
    • 在评估模式下,某些对训练有益但对模型评估(如验证和测试)可能不合适的层会被调整。例如,Dropout 层在评估模式下不会再随机丢弃任何元素,而是保持全部元素,确保评估时模型的稳定性。
    • 对于 BatchNorm 层,它会停止使用当前 batch 的统计数据,转而使用在训练过程中累积的全局统计数据进行归一化。这样可以保证在评估模型时,模型的性能不会因为单个 batch 数据的波动而受影响。
      总的来说,正确地使用 net.train() 和 net.eval() 对于训练一个稳健的模型以及准确评估模型的性能是非常重要的。在进行模型训练时应使用 net.train(),在进行模型评估、验证或应用于实际测试数据时应使用 net.eval()。

2.3.4 detach().cpu().numpy().float().ravel()

深度学习中,计算图是用来表示计算过程的图结构,其中节点表示操作,边表示数据流。通过计算图,系统可以自动进行反向传播和梯度计算,从而实现参数更新和训练。
在PyTorch中,计算图由张量(Tensor)和操作(Operation)组成。在进行张量运算时,计算图会记录运算过程,可以追踪计算过程中的每个操作和相关的张量。默认情况下,PyTorch会跟踪这些操作,以便进行自动求导和梯度计算。
使用detach()方法可以将一个张量从计算图中分离出来,使得该张量与原始计算图断开联系。也就是说,该张量之后的操作将不再被计算图记录,从而不会对梯度传播产生影响。
使用detach()方法分离张量的主要应用场景是在不需要梯度计算的情况下进行计算,例如在验证或测试阶段,或者在进行推理时。通过分离张量,可以减少计算图的大小,从而提高运行速度和减少内存占用。
需要注意的是,使用detach()方法分离张量后,不能再对其进行梯度计算和梯度更新。如果需要保留梯度信息,可以使用detach().clone()方法来创建一个新的副本。
总之,detach()方法用于将张量从计算图中分离出来,确保后续操作不会对原始计算图产生影响,适用于不需要梯度计算的场景。

2.4 具体代码通俗解释

match_and_clustering
假设与上面相同的初始条件,但是这次我们有一个额外的聚类,并且误差矩阵稍有变化:

  • 当前行的聚类(clusters): [[4, 5, 6], [11, 12, 13], [20, 21, 22]]
    • 第一组和第二组聚类与前面相同,但新增了第三组在列 20、21、22的聚类。
  • 误差矩阵(err_matrix)(考虑了新聚类):
    • 假设为:
      • 车道 1 对第一组的误差:0.5
      • 车道 1 对第二组的误差:3.0
      • 车道 1 对第三组的误差:2.8
      • 车道 2 对第一组的误差:2.5
      • 车道 2 对第二组的误差:0.3
      • 车道 2 对第三组的误差:2.2
        处理过程
  1. 分配现有的车道线:
    • 与之前一样,车道 2 与第二组聚类(列 11、12、13)的误差最小(0.3),因此先将第二组聚类分配给车道 2。
    • 接着,车道 1 与第一组聚类(列 4、5、6)的误差为 0.5,将第一组聚类分配给车道 1。
  2. 更新输出和车道线端点:
    • 更新output数组和lane_end_pts,与之前一样。
  3. 处理未分配的聚类:
    • 现在,第三组聚类(列 20、21、22)未被分配给任何现有车道线。
    • 我们检查每个未分配的聚类。由于第三组聚类未被分配,我们将它标记为新的车道线(例如,车道 3),并将next_lane_id增加 1。
    • 更新output数组,标记列 20、21、22 为属于新的车道 3。
    • 在lane_end_pts中新增车道 3 的端点 [[20, 160]]。
      结果
  • 第 160 行处理后的结果是:
    • 车道 1 包括了列 4、5、6 的点。
    • 车道 2 包括了列 11、12、13 的点。
    • 新创建的车道 3 包括了列 20、21、22 的点。

三 3. LaneAF接入MMDetection3D

3.1 代码结构

在这里插入图片描述

3.2 具体代码实现

3.2.1 train.py

在这里插入代码片

3.2.2 openlane_dataset_AF

      
      
import json
# from utils.utils import *
import os.path as ops
import glob
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset
from mmdet3d.datasets.pipelines import Compose
from lane.mmdet3d.datasets.multiview_datasets.instance_bevlane.openlane_extract_AF import OpenLaneSegMask
from mmdet3d.datasets.multiview_datasets.image import img_transform, normalize_img
from mmdet.datasets import DATASETS
import cv2


# 使用@DATASETS.register_module()装饰器注册了一个新的数据集模块。
@DATASETS.register_module()
class OpenLane_Dataset_AF(Dataset):
    def __init__(self, images_dir, json_file_dir, data_config=None, grid_config=None,
                 test_mode=False, pipeline=None, CLASSES=None, use_valid_flag=False):
        # 初始化父类
        super(OpenLane_Dataset_AF, self).__init__()
        
        # 从grid_config中获取宽度和深度范围
        width_range = (grid_config['x'][0], grid_config['x'][1])  # (-19.2, 19.2)
        depth_range = (grid_config['y'][0], grid_config['y'][1])  # (0, 96)
        # 从grid_config中获取宽度和深度的分辨率
        self.width_res = grid_config['x'][2]  # 0.2
        self.depth_res = grid_config['y'][2]  # 0.3
        
        # 从data_config中获取源图像的宽度和高度
        self.IMG_ORIGIN_W, self.IMG_ORIGIN_H = data_config['src_size']  # (1920, 1280)
        # 从data_config中获取输入图像的宽度和高度
        self.input_w, self.input_h = data_config['input_size']  # (960, 1280)
        
        # 获取x和y的最小和最大范围
        self.x_min = grid_config['x'][0]
        self.x_max = grid_config['x'][1]
        self.y_min = grid_config['y'][0]
        self.y_max = grid_config['y'][1]
        
        # 在y范围内均匀采样
        self.y_samples = np.linspace(self.y_min, self.y_max, num=100, endpoint=False)
        
        # 定义z的偏移量
        self.zoff = 1.08
        self.use_valid_flag = use_valid_flag
        self.CLASSES = CLASSES
        # 根据test_mode设置是否为训练模式
        self.is_train = not test_mode  
        self.data_config = data_config
        # 创建网格
        self.grid = self.make_grid()
        
        # 定义图像目录和json文件目录
        self.images_dir = images_dir
        self.json_file_dir = json_file_dir
        # 初始化数据集
        self.samples = self.init_dataset(json_file_dir)
        # 初始化OpenLaneSegMask对象
        self.mask_extract = OpenLaneSegMask(width_range=width_range,
            depth_range=depth_range,
            width_res=self.width_res,
            depth_res=self.depth_res,
            data_config=self.data_config)
        
        self.downsample = 4
        
        # 如果提供了pipeline,则初始化Compose对象
        if pipeline is not None:
            self.pipeline = Compose(pipeline)
        
        # 初始化标志数组
        self.flag = np.zeros(len(self.samples), dtype=np.uint8)

    # 定义返回数据集长度的方法
    def __len__(self):
        return len(self.samples) 

    # 创建一个3D网格的方法
    def make_grid(self):
        xcoords = torch.linspace(self.x_min, self.x_max, int((self.x_max - self.x_min) / self.width_res))  # 191
        ycoords = torch.linspace(self.y_min, self.y_max, int((self.y_max - self.y_min) / self.depth_res))  # 320
        yy, xx = torch.meshgrid(ycoords, xcoords)
        return torch.stack([xx, yy, torch.full_like(xx, self.zoff)], dim=-1)

    # 根据可见性修剪3D车道的方法
    def prune_3d_lane_by_visibility(self, lane_3d, visibility):
        lane_3d = lane_3d[visibility > 0, ...]
        return lane_3d

    def prune_3d_lane_by_range(self, lane_3d, x_min, x_max):
        # 从3D车道线数据中筛选出y坐标在 (0, 200) 范围内的数据点
        lane_3d = lane_3d[np.logical_and(lane_3d[:, 1] > 0, lane_3d[:, 1] < 200), ...]
        # 从筛选后的数据中再次筛选出x坐标在 (x_min, x_max) 范围内的数据点
        lane_3d = lane_3d[np.logical_and(lane_3d[:, 0] > x_min, lane_3d[:, 0] < x_max), ...]
        # 返回剪裁后的3D车道线数据
        return lane_3d


    def data_filter(self, gt_lanes, gt_visibility, gt_category):
        # 根据可视性信息,对每条3D车道线进行修剪
        gt_lanes = [self.prune_3d_lane_by_visibility(np.array(gt_lane), np.array(gt_visibility[k])) for k, gt_lane in
                    enumerate(gt_lanes)]
        
        # 保留至少包含两个点的车道线和其对应的类别
        gt_category = [gt_category[k] for k, lane in enumerate(gt_lanes) if lane.shape[0] > 1]
        gt_lanes = [lane for lane in gt_lanes if lane.shape[0] > 1]

        # 只保留在特定高度范围内的车道线及其对应的类别
        gt_category = [gt_category[k] for k, lane in enumerate(gt_lanes)
                    if lane[0, 1] < self.y_samples[-1] and lane[-1, 1] > self.y_samples[0]]
        gt_lanes = [lane for lane in gt_lanes if lane[0, 1] < self.y_samples[-1] and lane[-1, 1] > self.y_samples[0]]

        # 根据x轴的范围对车道线进行修剪
        gt_lanes = [self.prune_3d_lane_by_range(np.array(lane), self.x_min, self.x_max) for lane in gt_lanes]

        # 再次保留至少包含两个点的车道线及其对应的类别
        gt_category = [gt_category[k] for k, lane in enumerate(gt_lanes) if lane.shape[0] > 1]
        gt_lanes = [lane for lane in gt_lanes if lane.shape[0] > 1]

        # 返回处理过的车道线类别和车道线数据
        return gt_category, gt_lanes

    def sample_augmentation(self):
        """
        对样本进行尺寸调整或增强。
        返回:
            resize (tuple): 调整后的宽度和高度与原始宽度和高度之比的比例因子。
            resize_dims (tuple): 调整后的宽度和高度的尺寸。
        说明:
            fW 和 fH 是期望的输入图像的宽度和高度。
            self.input_w 和 self.input_h 是从配置中获得的输入尺寸。
            self.IMG_ORIGIN_W 和 self.IMG_ORIGIN_H 是原始图像的宽度和高度。
        """

        # 获取期望的输入图像的宽度和高度
        fW, fH = self.input_w, self.input_h
        # 计算调整后的宽度和高度与原始宽度和高度之比的比例因子
        resize = (fW / self.IMG_ORIGIN_W, fH / self.IMG_ORIGIN_H)
        # 获取调整后的宽度和高度的尺寸
        resize_dims = (fW, fH)
        
        return resize, resize_dims

    def get_seg_mask(self, gt_lanes_3d, gt_laned_2d, gt_category, gt_visibility):

        mask_bev, mask_2d = self.mask_extract(gt_lanes_3d, gt_laned_2d, gt_category, gt_visibility)
        return mask_bev, mask_2d

    def perspective(self, matrix, vector):
        """
        使用投影矩阵对向量应用透视投影。
        参数:
            matrix (torch.Tensor): 用于透视投影的矩阵。
            vector (torch.Tensor): 需要进行透视投影的3D向量。
        返回:
            tuple: 包含两个元素的元组:
                - 投影后的2D向量
                - 一个表示向量是否在有效的可见范围内的mask(即是否在摄像机前面)。
        说明:
            1. 首先,我们对3D向量进行操作,使其可以和4x4的投影矩阵进行矩阵乘法。
            2. 接着,我们将3D向量通过矩阵乘法转换为齐次坐标。
            3. 最后,我们将齐次坐标转换为普通的2D坐标。
        """

        # 为向量增加一个维度以进行矩阵乘法操作
        vector = vector.unsqueeze(-1)
        
        # 通过矩阵乘法计算齐次坐标
        homogeneous = torch.matmul(matrix[..., :-1], vector) + matrix[..., [-1]]
        # 齐次坐标来实现透视投影
        """
        在二维和三维空间中,齐次坐标的概念尤其重要。
        考虑二维空间:一个普通的二维点由(x, y)表示。而在齐次坐标中,该点可表示为(X, Y, W),其中x = X/W且y = Y/W。W是齐次坐标的缩放因子或齐次分量。
        同理,在三维空间中,点(x, y, z)在齐次坐标中可以表示为(X, Y, Z, W)。
        """
        homogeneous = homogeneous.squeeze(-1)
        
        # 判断向量是否在摄像机前面(即z坐标大于0)
        b = (homogeneous[..., -1] > 0).unsqueeze(-1)
        b = torch.cat((b, b, b), -1)
        b[..., -1] = True

        # 乘以mask来处理不在摄像机前面的点
        homogeneous = homogeneous * b.float()

        # 将齐次坐标转换为普通的2D坐标
        return homogeneous[..., :-1] / homogeneous[..., [-1]], b.float()

    def get_data_info(self, index, debug=True):
        # 从提供的索引位置获取样本的JSON文件名
        label_json = self.samples[index]

        # 使用os.path.join来构建JSON文件的完整路径
        label_file_path = ops.join(self.json_file_dir, label_json)

        # 初始化列表来存储图像、变换矩阵等相关数据
        # 用于存储图像的列表
        imgs = []
        # 用于存储摄像机外部变换矩阵的列表
        trans = []
        # 用于存储摄像机旋转矩阵的列表
        rots = []
        # 用于存储摄像机内部参数矩阵的列表
        intrins = []
        # 用于存储预处理后的平移矩阵的列表
        post_trans = []
        # 用于存储预处理后的旋转矩阵的列表
        post_rots = []
        # 用于存储摄像机外部参数矩阵的列表
        extrinsics = []
        # 用于存储相机镜头畸变参数的列表
        undists = []

        # 打开并读取指定的JSON文件内容
        with open(label_file_path, 'r') as fr:
            info_dict = json.loads(fr.read())

        # 使用os.path.join来构建与JSON中记录的相对路径对应的图像文件的完整路径
        image_path = ops.join(self.images_dir, info_dict['file_path'])
        # '/workspace/openlane_all/images/training/segment-10017090168044687777_6380_000_6400_000_with_camera_labels/155008346754599000.jpg'
        # 检查图像文件是否存在
        assert ops.exists(image_path), '{:s} not exist'.format(image_path)

        # 使用Python Imaging Library(PIL)库来打开图像
        img = Image.open(image_path)  # (1920, 1280)
        # 使用OpenCV库读取相同的图像文件
        image = cv2.imread(image_path)      
        # 调整OpenCV图像的大小以适配预定的输入尺寸
        image = cv2.resize(image, (self.input_w, self.input_h))  # (960, 640)
        # 从JSON数据中提取摄像机的外部参数并转化为NumPy数组
        extrinsic = np.array(info_dict['extrinsic'])
        # 从JSON数据中提取摄像机的内部参数并转化为NumPy数组
        intrinsic = np.array(info_dict['intrinsic'])
        # 从JSON数据中提取地面真实的车道线信息
        gt_lanes_packeds = info_dict['lane_lines']
        # 使用预定义的方法进行样本增强,并返回resize参数和其维度
        resize, resize_dims = self.sample_augmentation()
        
        # 根据上述参数对PIL图像进行尺寸和方向的变换
        img, post_rot, post_tran = img_transform(img, resize, resize_dims)

        # 定义两个旋转矩阵,它们用于在不同坐标系间转换
        R_vg = np.array([[0, 1, 0],
                        [-1, 0, 0],
                        [0, 0, 1]], dtype=float)
        R_gc = np.array([[1, 0, 0],
                        [0, 0, 1],
                        [0, -1, 0]], dtype=float)

        # 对摄像机的外部参数矩阵进行坐标系转换
        """
        将相机坐标系到全球坐标系的旋转变换、相机坐标系到相机中心的变换和全球坐标系到栅格地面坐标系的旋转变换进行连续相乘
        R_vg 是相机坐标系(Camera Coordinate System)到全球坐标系(Global Coordinate System)的旋转变换矩阵。
        extrinsic[:3, :3] 是 extrinsic 矩阵的前3行前3列,表示相机坐标系到相机中心的变换,即外参。
        R_gc 是全球坐标系到栅格地面坐标系(Grid Ground Coordinate System)的旋转变换矩阵。
        np.linalg.inv(R_vg) 表示 R_vg 的逆矩阵。
        np.matmul(np.matmul(np.matmul(np.linalg.inv(R_vg), extrinsic[:3, :3]), R_vg), R_gc) 表示将旋转矩阵的逆矩阵与外参相乘,然后再与 R_vg 和 R_gc 相乘。
        最后,将乘积结果赋值给 extrinsic[:3, :3],即更新了外参的旋转部分。
        """
        extrinsic[:3, :3] = np.matmul(np.matmul(
            np.matmul(np.linalg.inv(R_vg), extrinsic[:3, :3]),R_vg), R_gc)
        
        # 更新外部参数矩阵的平移部分
        extrinsic[0:2, 3] = 0.0

        gt_lanes_2d, gt_lanes_3d, gt_visibility, gt_category = [], [], [], []

        for j, gt_lane_packed in enumerate(gt_lanes_packeds):
            # A GT lane can be either 2D or 3D
            # if a GT lane is 3D, the height is intact from 3D GT, so keep it intact here too
            lane2d = np.array(gt_lane_packed['uv'], dtype=np.float32)
            lane3d = np.array(gt_lane_packed['xyz'], dtype=np.float32)
            lane_visibility = np.array(gt_lane_packed['visibility'])

            lane3d = np.vstack((lane3d, np.ones((1, lane3d.shape[1]))))
            cam_representation = np.linalg.inv(
            np.array([[0, 0, 1, 0],
                    [-1, 0, 0, 0],
                    [0, -1, 0, 0],
                    [0, 0, 0, 1]], dtype=float))     # 定义相机表示矩阵的逆矩阵

        # 进行逆透视变换:将3D车道线坐标从相机坐标系转换到全局坐标系
        lane3d = np.matmul(extrinsic, np.matmul(cam_representation, lane3d))
        # 提取3D车道线的前三维坐标,并转置为行优先形式
        lane3d = lane3d[0:3, :].T
        # 将2D车道线坐标转置为行优先形式
        lane2d = lane2d.T

        gt_lanes_3d.append(lane3d)
        gt_lanes_2d.append(lane2d)
        gt_visibility.append(lane_visibility)
        gt_category.append(gt_lane_packed['category'])

        gt_category, gt_lanes_3d = self.data_filter(gt_lanes_3d, gt_visibility, gt_category)
        
        img = normalize_img(img)   # 对图像进行归一化处理
        # 将相机的平移向量、旋转矩阵、外参矩阵、内参矩阵、后处理平移向量、后处理旋转矩阵、图像数据和失真系数依次添加到对应的列表中
        trans.append(torch.Tensor(extrinsic[:3, 3]))
        rots.append(torch.Tensor(extrinsic[:3, :3]))
        extrinsics.append(torch.tensor(extrinsic).float())
        intrins.append(torch.cat((torch.Tensor(intrinsic), torch.zeros((3, 1))), dim=1).float())
        post_trans.append(post_tran)
        post_rots.append(post_rot)
        imgs.append(img)
        # 失真系数被定义为一个长度为7的零向量 torch.zeros(7),表示不考虑任何畸变
        undists.append(torch.zeros(7))

        # 将列表转化为张量,对应维度上的元素进行堆叠
        imgs, trans, rots, intrins, post_trans, post_rots, undists, extrinsics = torch.stack(imgs), torch.stack(trans), torch.stack(rots), torch.stack(intrins), torch.stack(
            post_trans), torch.stack(post_rots), torch.stack(undists), torch.stack(extrinsics)

        extrinsics = torch.linalg.inv(extrinsics)   # 对外参矩阵取逆,由相机坐标系到全球坐标系的变换转为全球坐标系到相机坐标系的变换

        # 获取车道线的语义分割遮罩
        mask_bev, mask_2d = self.get_seg_mask(gt_lanes_3d, gt_lanes_2d, gt_category, gt_visibility)
        # # (mask_seg_bev, mask_haf_bev, mask_vaf_bev, mask_offset_bev, mask_z_bev),\
        # (mask_seg_2d,  mask_haf_2d,  mask_vaf_2d)

        # 将车道线遮罩转化为张量
        mask_bev, mask_2d = self.mask_toTensor(mask_bev, mask_2d)

        
        '''
        # if debug:
        #     visu_path = './vis_pic'
        #     calib = np.matmul(intrins, extrinsics)
        #     for gt_lane in gt_lanes:
        #         gt_lane = torch.tensor(gt_lane).float()
        #         img_points, _ = self.perspective(calib, gt_lane)

        #         post_img_points = []
        #         for img_point in img_points:
        #             img_point = torch.matmul(post_rots[0, :2, :2], img_point) + post_trans[0, :2]
        #             post_img_points.append(img_point.detach().cpu().numpy())
        #         post_img_points = np.array(post_img_points)
        #         x_2d, y_2d = post_img_points[:, 0].astype(np.int32), post_img_points[:, 1].astype(np.int32)
        #         for k in range(1, img_points.shape[0]):
        #             image = cv2.line(image, (x_2d[k - 1], y_2d[k - 1]),
        #                              (x_2d[k], y_2d[k]), (0, 0, 255), 4)
            # cv2.imwrite(visu_path + "/img.jpg", image)

        '''
        
        input_dict = dict(
            imgs=imgs,
            trans=trans,
            rots=rots,
            extrinsics=extrinsics,
            intrins=intrins,
            undists=undists,
            post_trans=post_trans,
            post_rots=post_rots,
            mask_bev=mask_bev,
            mask_2d=mask_2d,
            gt_lanes_3d=gt_lanes_3d,
            gt_lanes_2d=gt_lanes_2d,
            grid=self.grid,
            drop_idx=torch.tensor([]),
            file_path=info_dict['file_path'],
        )

        return input_dict

    def mask_toTensor(self, mask_bev, mask_2d):

        mask_seg_bev, mask_haf_bev, mask_vaf_bev, mask_offset_bev, mask_z_bev = mask_bev
        mask_seg_bev[mask_seg_bev > 0] = 1
        mask_seg_bev = torch.from_numpy(mask_seg_bev).contiguous().float().unsqueeze(0)
        mask_haf_bev = torch.from_numpy(mask_haf_bev).contiguous().float()
        mask_vaf_bev = torch.from_numpy(mask_vaf_bev).permute(2, 0, 1).contiguous().float()
        mask_offset_bev  = torch.from_numpy(mask_offset_bev).permute(2, 0, 1).contiguous().float()
        mask_z_bev  = torch.from_numpy(mask_z_bev).permute(2, 0, 1).contiguous().float()

        if mask_2d is not None:
            mask_seg_2d,  mask_haf_2d,  mask_vaf_2d = mask_2d
            mask_seg_2d[mask_seg_2d > 0] = 1
            mask_seg_2d = torch.from_numpy(mask_seg_2d).contiguous().float().unsqueeze(0)
            mask_haf_2d = torch.from_numpy(mask_haf_2d).contiguous().float()
            mask_vaf_2d = torch.from_numpy(mask_vaf_2d).permute(2, 0, 1).contiguous().float()

            return (mask_seg_bev, mask_haf_bev, mask_vaf_bev, mask_offset_bev, mask_z_bev),\
                (mask_seg_2d,  mask_haf_2d,  mask_vaf_2d)
        
        return (mask_seg_bev, mask_haf_bev, mask_vaf_bev, mask_offset_bev, mask_z_bev), None

    def init_dataset(self, json_file_dir):
        filter_samples = []  # '/workspace/openlane_all/images/validation/segment-11048712972908676520_545_000_565_000_with_camera_labels/152268469123823000.jpg'
        samples = glob.glob(json_file_dir + '**/*.json', recursive=True)
        print("[INFO] init datasets...")
        for i, sample in tqdm(enumerate(samples)):
            label_file_path = ops.join(self.json_file_dir, sample)
            with open(label_file_path, 'r') as fr:
                info_dict = json.loads(fr.read())
            image_path = ops.join(self.images_dir, info_dict['file_path'])
            if not ops.exists(image_path):
                # print('{:s} not exist'.format(image_path))
                continue
            # if i < 1014:
            #     continue
            filter_samples.append(sample)
            if len(filter_samples) > 63:
                break
            # print("image_path:", image_path)

        # return samples
        return filter_samples

    def __getitem__(self, idx):
        input_dict = self.get_data_info(idx, debug=False)
        data = self.pipeline(input_dict)
        return data
        # return input_dict

if __name__ == '__main__':
    data_config = {
        'cams': [],
        'Ncams': 1,
        'input_size': (960, 640),
        'src_size': (1920, 1280),
        'thickness': 5,
        'angle_class': 36,

        # Augmentation
        'resize': (-0.06, 0.11),
        'rot': (-5.4, 5.4),
        'flip': True,
        'crop_h': (0.0, 0.0),
        'resize_test': 0.00,

    }

    grid_config = {
        'x': [-10.0, 10.0, 0.15],
        'y': [3.0, 103.0, 0.5],
        'z': [-5, 3, 8],
        'depth': [1.0, 60.0, 1.0],
    }
    # images_dir = '/home/slj/Documents/workspace/mmdet3d/data/openlane/example/image'
    # json_file_dir = '/home/slj/Documents/workspace/mmdet3d/data/openlane/example/annotations/segment-260994483494315994_2797_545_2817_545_with_camera_labels'

    images_dir = '/home/slj/data/openlane/openlane_all/images'
    json_file_dir = '/home/slj/data/openlane/openlane_all/lane3d_300/training/'

    dataset = OpenLane_Dataset_AF(images_dir, json_file_dir, data_config=data_config, grid_config=grid_config,
                 test_mode=False, pipeline=None, CLASSES=None, use_valid_flag=True)

    for idx in tqdm(range(dataset.__len__())):
        input_dict = dataset.__getitem__(idx)
        print(idx)

    

    

3.2.3 openlane_af2D.py

      
      
# Copyright (c) Phigent Robotics. All rights reserved.

dist_params = dict(backend='nccl')
# 设置分布式参数,指定使用的后端为'nccl'
log_level = 'INFO'
# 设置日志级别为'INFO'
work_dir = '../.result/work_AF1022_test'
# 设置工作目录为'./result/work_AF_test'
load_from =  None
# 设置加载模型的路径为 None,即不加载已经训练好的模型
resume_from = None
# 设置从断点恢复的路径为 None,即不从任何断点处恢复
no_validate = True
# 设置为 True,表示不进行验证步骤

#for IterBasedRunner
# runner = dict(type='IterBasedRunner', max_epochs=24) #IterBasedRunner
# workflow = [('train', 20), ('val', 5)] #for IterBasedRunner
# max_loops = 50 #for IterBasedRunner

#for EpochBasedRunner
runner = dict(type='EpochBasedRunner', max_epochs=5)
workflow = [('train', 1), ('val', 1)]#, ('test', 1)]
max_loops = 5

checkpoint_config = dict(interval=500)
# 检查点保存的配置,设置间隔为500步保存一次检查点
# 模型的权重以及一些其他相关的训练参数,可以用于恢复训练、评估模型或在未来进行推理。
opencv_num_threads = 0
# 禁用OpenCV的多线程以避免系统负载过重
mp_start_method = 'fork'
# 将多进程的启动方法设置为'fork'以加快训练速度
# windows一般为'spawn'
class_names = []
# 类别名称,这里初始化为空列表

data_config = {
    'cams': [],             # 这是一个相机列表,当前为空。它可能用于存储有关每个相机的信息或参数。
    'Ncams': 1,             # 这是相机的数量。目前配置只有一个相机。
    'input_size': (960, 640),   # 这是模型的输入图像大小。宽度是960像素,高度是640像素。
    'src_size': (1920, 1280),   # 这是原始图像的大小或解析度,可能在处理之前。宽度是1920像素,高度是1280像素。
    'thickness': 2,         # 这可能是一些图像处理的参数,但具体的上下文不明。
    'angle_class': 36,      # 这可能指的是角度的分类或离散化的数量。

    # 数据增强参数
    'resize': (-0.06, 0.11),   # 调整图像大小的范围。这可以是一种数据增强方法,允许图像在给定的范围内缩小或放大。
    'rot': (-5.4, 5.4),       # 旋转图像的角度范围,为了数据增强。
    'flip': False,            # 是否进行图像翻转的标志。
    'crop_h': (0.0, 0.0),     # 图像垂直裁剪的范围。目前它是0,意味着没有裁剪。
    'resize_test': 0.00,      # 这可能是在测试阶段调整图像大小的参数。
}

#
grid_config = {
    'x': [-19.2, 19.2, 0.2],    # x轴的范围和间距。从-19.2到19.2,每0.2一个间隔。
    'y': [0.0, 96.0, 0.3],     # y轴的范围和间距。从0.0到96.0,每0.3一个间隔。
    'z': [-5, 3, 8],           # z轴的范围和间距。这个配置有些奇怪,因为从-5到3的范围与8的间隔不太匹配。
    'grid_height': 6,          # 网格的高度。
    'depth': [1.0, 60.0, 1.0], # 深度的范围和间距。从1.0到60.0,每1.0一个间隔。
    'grid_res': [0.2, 0.2, 0.6], # 3D网格的分辨率,分别为x、y、z方向。
    'offset': [-19.2, 0.0, 1.08], # 网格的起始偏移量。
    'grid_size': [38.4, 96],   # 网格的大小,这可能是x和y方向上的总长度。
    'depth_discretization': 'LID', # 深度的离散化方法。
    'dbound': [4.0, 70.0, 0.6]  # 深度边界的范围和间隔。
}

# model
model = dict(
    type='LaneDetector2D',  # 模型的类型或名称,它叫做"LaneAF2DDetector",可能是一个2D车道检测器。
        img_backbone=dict(
        pretrained='torchvision://resnet34',  # 使用预训练的resnet34模型。
        type='ResNet',  # 主干网络的类型是ResNet。
        depth=34,  # ResNet的深度,这里使用的是ResNet-34。
        num_stages=4,  # ResNet的阶段数量。
        out_indices=(0, 1, 2, 3),  # 输出的阶段索引。
        frozen_stages=-1,  # 冻结的阶段,-1意味着不冻结任何阶段。
        norm_cfg=dict(type='BN', requires_grad=True),  # 使用批量归一化(Batch Normalization)。
        norm_eval=False,  # 在评估模式下不使用BN。
        with_cp=True,  # 是否使用checkpoint,有助于节省显存。
        style='pytorch'  # 使用PyTorch风格的ResNet。
    ),
        img_neck=dict(
        type='CustomFPN',  # 使用自定义的特征金字塔网络(Feature Pyramid Network, FPN)。
        in_channels=[64, 128, 256, 512],  # 输入通道数,与ResNet-34的输出匹配。
        out_channels=256,  # FPN的输出通道数。
        num_outs=1,  # 输出的数量。
        start_level=0,  # 开始的层级。
        out_ids=[0]  # 输出的id。
    ),
        lane_head=dict(
        type='LaneAF_Head',  # 车道头部的类型。
        in_channel=256,  # 输入通道数,与FPN的输出匹配。
        num_classes=1,  # 类别数。这里设置为1,可能是二进制分类(车道/非车道)。
        seg_loss=dict(type='Lane_FocalLoss'),  # 用于分割的损失函数,这里使用特定的"FocalLoss"。
        haf_loss=dict(type='RegL1Loss'),  # 可能与车道的水平对齐有关的损失函数。
        vaf_loss=dict(type='RegL1Loss'),  # 可能与车道的垂直对齐有关的损失函数。
    ))

# Data
dataset_type = 'OpenLane_Dataset_AF' #'IterBasedRunner', EpochBasedRunner
images_dir = '/workspace/openlane_all/images'
json_file_dir_train = '/workspace/openlane_all/lane3d_300/training/'
json_file_dir_val = '/workspace/openlane_all/lane3d_300/validation/'
file_client_args = dict(backend='disk')

train_pipeline = [
    dict(
        type='LoadAnnotationsSeg',
        bda_aug_conf=None,
        classes=class_names,
        is_train=False),
    # 返回 img_inputs=(imgs, intrins, extrinsics, post_rots, post_trans, undists,
                                # bda_rot, rots, trans, grid, drop_idx)
    # maps_bev,maps_2d,gt_lanes_3d,gt_lanes_2d,file_path
    # """
    # LoadAnnotationsSeg类型的数据加载步骤用于加载标注数据,其中包括了类别信息、语义分割标签等。bda_aug_conf参数为None,表示没有使用数据增强。classes参数指定了类别名称。is_train参数为False,表示这个步骤用于测试和评估。
    # Collect3D类型的数据收集步骤用于将处理后的数据组合成一个字典,其中包括了以下键值对:
    # 'img_inputs':用于输入网络的图像数据。
    # 'maps_bev':Bird's Eye View(BEV)地图数据。
    # 'maps_2d':2D地图数据。
    # 'gt_lanes_3d':3D车道线标签数据。
    # 'gt_lanes_2d':2D车道线标签数据。
    # 'file_path':文件路径信息。
    # """
    dict(
        type='Collect3D', 
        keys=['img_inputs', 'maps_bev', 'maps_2d'], 
        meta_keys=['gt_lanes_3d', 'gt_lanes_2d', 'file_path'])
]
test_pipeline = [
    dict(
        type='LoadAnnotationsSeg',
        bda_aug_conf=None,
        classes=class_names,
        is_train=False),
    dict(
        type='Collect3D', 
        keys=['img_inputs', 'maps_bev', 'maps_2d'], 
        meta_keys=['gt_lanes_3d', 'gt_lanes_2d', 'file_path'])
]
eval_pipeline = [
    dict(
        type='LoadAnnotationsSeg',
        bda_aug_conf=None,
        classes=class_names,
        is_train=False),
    dict(
        type='Collect3D', 
        keys=['img_inputs', 'maps_bev', 'maps_2d'], 
        meta_keys=['gt_lanes_3d', 'gt_lanes_2d', 'file_path'])
]

# 定义输入模态性(Input Modality),用于指定哪些传感器数据将被使用
input_modality = dict(
    use_lidar=False,     # 是否使用激光雷达数据
    use_camera=True,     # 是否使用摄像头数据
    use_radar=False,     # 是否使用雷达数据
    use_map=False,       # 是否使用地图数据
    use_external=False   # 是否使用外部数据
)

# 定义共享数据配置,其中包括数据集类型和模态性配置
share_data_config = dict(
    type=dataset_type,    # 数据集的类型
    # modality=input_modality,  # 输入模态性配置(可选择是否启用)
)

# 定义测试数据配置,包括图像目录、JSON文件目录、数据配置、网格配置、数据处理流程等
test_data_config = dict(
    images_dir=images_dir,            # 图像数据的目录
    json_file_dir=json_file_dir_val,  # JSON文件数据的目录
    data_config=data_config,          # 数据配置
    grid_config=grid_config,          # 网格配置
    pipeline=test_pipeline,           # 数据处理流程
    CLASSES=None,                     # 类别信息(在测试模式下一般不需要)
    test_mode=True,                   # 是否处于测试模式
    use_valid_flag=False,             # 是否使用验证标志(一般在测试模式下不使用)
)

# 定义数据配置字典,包括每个GPU的样本数、每个GPU的工作进程数、是否洗牌等
data = dict(
    samples_per_gpu=16,         # 每个GPU的样本数
    workers_per_gpu=0,          # 每个GPU的工作进程数
    shuffle=True,               # 是否在数据加载时洗牌

    # 训练数据配置,包括图像目录、JSON文件目录、数据配置、网格配置、数据处理流程等
    train=dict(
        images_dir=images_dir,             # 图像数据的目录
        json_file_dir=json_file_dir_train, # JSON文件数据的目录
        data_config=data_config,           # 数据配置
        grid_config=grid_config,           # 网格配置
        pipeline=train_pipeline,           # 数据处理流程
        CLASSES=None,                      # 类别信息(在训练模式下一般不需要)
        test_mode=False,                   # 是否处于测试模式
        use_valid_flag=False,              # 是否使用验证标志(一般在训练模式下不使用)
    ),

    # 验证数据配置,使用了之前定义的测试数据配置
    val=test_data_config,
    # 测试数据配置,也使用了之前定义的测试数据配置
    test=test_data_config
)

# evaluation = dict(interval=1, pipeline=eval_pipeline)

for key in ['train', 'val', 'test']:
    data[key].update(share_data_config)

# 定义优化器配置字典,包括优化器的类型、学习率和权重衰减等参数
optimizer = dict(type='AdamW', lr=2e-4, weight_decay=1e-07)
# 定义优化器配置字典,包括梯度裁剪的配置,这里设置了梯度裁剪的最大范数和裁剪类型
optimizer_config = dict(grad_clip=dict(max_norm=5, norm_type=2))

# 定义学习率配置字典,包括学习率策略、预热策略和预热参数等
lr_config = dict(
    policy='step',                # 学习率策略,这里是按步骤调整学习率
    warmup='linear',              # 预热策略,这里是线性预热
    warmup_iters=200,             # 预热迭代次数
    warmup_ratio=0.001,           # 预热比例
    step=[24,]                    # 学习率调整的步骤,这里在第24个迭代时调整学习率
)

log_config = dict(
    interval=10,
    hooks=[
        dict(type='TextLoggerHook'),
        dict(type='TensorboardLoggerHook')
    ])

custom_hooks = [
    # dict(
    #     type='MEGVIIEMAHook',
    #     init_updates=10560,
    #     priority='NORMAL',
    # ),
    dict(
        type='BevLaneVisAllHook',
    ),
]

# unstable
# fp16 = dict(loss_scale='dynamic')

    

3.2.4 LaneDetector2D.py

      
# Copyright (c) Phigent Robotics. All rights reserved.
import cv2
import numpy as np
import torch
import torch.nn.functional as F
from mmcv.runner import force_fp32

from mmdet.models import DETECTORS
from .. import builder
from mmdet3d.models.detectors.base import Base3DDetector


# 使用装饰器注册为目标检测
@DETECTORS.register_module()
class LaneDetector2D(Base3DDetector):
    def __init__(self, img_backbone, img_neck, lane_head, train_cfg, test_cfg, pretrained=None, **kwargs):
        super(LaneDetector2D, self).__init__(**kwargs)  # 调用父类的初始化方法
        if img_backbone:
            self.img_backbone = builder.build_backbone(img_backbone)
        if img_neck is not None:
            self.img_neck = builder.build_neck(img_neck)
        self.lane_head = builder.build_head(lane_head)

        self.train_cfg = train_cfg
        self.test_cfg = test_cfg
        self.pretrained = pretrained  # 预训练模型的路径

    def image_encoder(self, img):
        # 获取输入图像
        imgs = img  #(640, 960)
        
        # 获取图像的形状,其中:
        # B 为 batch size(批次大小)
        # N 为图像数量
        # C 为通道数量(例如RGB图像C=3)
        # imH 为图像高度
        # imW 为图像宽度
        B, N, C, imH, imW = imgs.shape
        
        # 这样可以同时处理所有图像
        imgs = imgs.view(B * N, C, imH, imW)
        
        # 使用backbone网络对图像进行特征提取,返回的可能是多尺度的特征图
        x = self.img_backbone(imgs)  # 例如:[[8, 64, 160, 240], [8, 128, 80, 120], [8, 256, 40, 60] [8, 512, 20, 30]] # [[1, 64, 160, 240], [1, 128, 80, 120], [1, 256, 40, 60] [1, 512, 20, 30]
        # 如果有定义img_neck(例如,用于特征融合的结构),则进行处理
        if self.with_img_neck:       
            x = self.img_neck(x)  # (8, 256, 160, 240)
            
            # 如果返回的是多个特征图的列表或元组,只取第一个
            if type(x) in [list, tuple]:
                x = x[0]
        
        # 获取处理后的特征图的形状
        _, output_dim, ouput_H, output_W = x.shape
        
        # 如果原始输入中每个batch包含多于1个图像,调整形状回到原始的batch形式
        if N!=1:
            x = x.view(B, N, output_dim, ouput_H, output_W)
        return x
    

    def extract_img_feat(self, img, img_metas, **kwargs):
        """
        提取图像的特征。
        Args:
            img (tuple): 包含多种图像和相关参数的元组。
            img_metas (type not specified): 图像的元信息。
            **kwargs: 其他关键字参数。
        Returns:
            torch.Tensor: 提取的图像特征。
        """
        # features, ks, imu2cs, post_rots, post_trans, undists
        # 从输入img中解包各种参数和信息
        # ks: 内参矩阵
        # imu2cs: IMU到摄像机的转换矩阵
        # post_rots: 位姿旋转
        # post_trans: 位姿平移
        # undists: 非失真参数(可能用于校正镜头畸变)
        ks, imu2cs, post_rots, post_trans, undists = img[1], img[2], img[3], img[4], img[5]
        
        # grid: 网格信息
        # drop_idx: 被舍弃的索引信息
        grid, drop_idx = img[9], img[10]
        
        # 使用image_encoder方法提取图像特征
        x = self.image_encoder(img[0])
        
        return x
    
    #   output['img_inputs'] = (imgs, rots, trans, intrins, post_rots,
    #                                  post_trans, bda_rot, extrinsics, undists, gt_lanes)
    # output['maps'] = (gt_mask, mask_haf, mask_vaf, mask_offset)
    # 被注释掉的部分提供了输入的结构信息,以帮助理解img元组中的内容:
    # img_inputs: 输入的图像和相关参数。包括:
    # imgs: 图像数据
    # rots: 旋转参数
    # trans: 平移参数
    # intrins: 内部参数(或内参)
    # post_rots: 位姿旋转
    # post_trans: 位姿平移
    # bda_rot: BDA的旋转参数
    # extrinsics: 外部参数
    # undists: 非失真参数
    # gt_lanes: 真实车道线信息

    # maps: 相关的映射信息,包括:
    # gt_mask: 真实的掩码
    # mask_haf: HAF的掩码
    # mask_vaf: VAF的掩码
    # mask_offset: 偏移掩码
    #   output['img_inputs'] = (imgs, rots, trans, intrins, post_rots,
    #                                  post_trans, bda_rot, extrinsics, undists, gt_lanes)
    # output['maps'] = (gt_mask, mask_haf, mask_vaf, mask_offset)
    
    def extract_feat(self, img, img_metas, **kwargs):
        """
        从图像和点云数据中提取特征。
        Args:
            img (type not specified): 输入图像数据。
            img_metas (type not specified): 图像的元信息。
            **kwargs: 其他关键字参数。
        Returns:
            torch.Tensor: 提取的图像特征。
        """
        
        # 使用extract_img_feat方法提取图像特征
        img_feats = self.extract_img_feat(img, img_metas, **kwargs)
        
        return img_feats

    def forward_lane_train(self, img_feats, gt_labels, **kwargs):
        """
        训练过程中的前向传播函数,专门用于处理车道线任务。
        Args:
            img_feats (torch.Tensor): 从输入图像中提取的特征。
            gt_labels (list of torch.Tensor): 真实标签,包括车道线的分割掩码、水平方向的掩码、垂直方向的掩码和偏移掩码。
            **kwargs: 其他关键字参数。
        Returns:
            dict: 包含损失信息的字典。
            torch.Tensor: 网络输出的预测值。
        """
        #NOTE: input = [seg_mask, haf_mask, vaf_mask, mask_offset]
        # array1 = maps[0].detach().cpu().numpy()
        # cv2.imwrite("./map.png", np.argmax(array1, axis=0) * 100)
        # NOTE: 输入数据的结构为:[分割掩码, 水平方向的掩码, 垂直方向的掩码, 偏移掩码]
        # 使用lane_head网络部分处理图像特征
        outs = self.lane_head(img_feats)
        # 准备输入到损失函数的数据
        loss_inputs = [outs, gt_labels]
        # 将输出和真实标签传递给损失函数以计算损失
        # 这里注释掉了一个可能用于计算损失的函数,但最终选择使用lane_head的损失函数计算损失
        # losses = self.bev_lane_loss(*loss_inputs)
        losses = self.lane_head.loss(*loss_inputs)
        
        # 返回损失和网络的输出
        return losses, outs

    # def forward_train(self,
    #                   img_metas=None,
    #                   img_inputs=None,
    #                   maps_bev=None,
    #                   maps_2d=None,
    #                   label_bev=None,
    #                   label_2d=None,
    #                   **kwargs):

    #     img_feats = self.extract_feat(img=img_inputs, img_metas=img_metas, **kwargs)
    #     losses = dict()
    #     losses_lane, out = self.forward_lane_train(img_feats, maps_2d)
    #     losses.update(losses_lane)
    #     return losses, out

    def forward_train(self,
                  img_metas=None,
                  img_inputs=None,
                  maps_bev=None,
                  maps_2d=None,
                  label_bev=None,
                  label_2d=None,
                  **kwargs):
        """
        训练过程中的前向传播函数。
        Args:
            img_metas (list): 各图像的元数据信息,例如尺寸、缩放因子等。
            img_inputs (torch.Tensor): 输入的图像数据。
            maps_bev (torch.Tensor): BEV (Bird's Eye View) 的映射数据,可能用于其他任务,这里没有用到。
            maps_2d (torch.Tensor): 2D图像的映射数据,用于车道线检测。
            label_bev (list): BEV的标签数据,可能用于其他任务,这里没有用到。
            label_2d (list): 2D图像的标签数据,用于车道线检测。
            **kwargs: 其他关键字参数。
        Returns:
            dict: 包含损失信息的字典。
            torch.Tensor: 网络输出的预测值。
        """
        
        # 从输入图像中提取特征
        img_feats = self.extract_feat(img=img_inputs, img_metas=img_metas, **kwargs)
        # 初始化损失字典
        losses = dict()
        # 使用专门的车道线训练前向传播函数处理图像特征,并获取车道线的损失和输出
        losses_lane, out = self.forward_lane_train(img_feats, maps_2d)
        # 将车道线的损失添加到总损失字典中
        losses.update(losses_lane)
        # 返回总损失和网络的输出
        return losses, out

    def forward_test(self,
                      img_metas=None,
                      img_inputs=None,
                      maps_bev=None,
                      maps_2d=None,
                      **kwargs):
        return self.simple_test(img_metas, img_inputs, maps_2d)
    
    def aug_test(self, points, img_metas, img=None, rescale=False):
        assert False

    def simple_test(self,
                img_metas=None,
                img_inputs=None,
                gt_labels=None,
                **kwargs):
        """
        简化的测试过程函数。
        Args:
            img_metas (list): 各图像的元数据信息,例如尺寸、缩放因子等。
            img_inputs (torch.Tensor): 输入的图像数据。
            gt_labels (list): 真实标签数据,用于评估或其他目的,但在此函数内部没有使用。
            **kwargs: 其他关键字参数。
        Returns:
            torch.Tensor: 车道线检测的网络输出结果。
        """
        
        # 从输入图像中提取特征
        img_feats = self.extract_feat(img=img_inputs, img_metas=img_metas, **kwargs)
        # 通过车道线头部进行前向传播,获取输出结果
        outs = self.lane_head(img_feats)
        # 返回网络的输出结果
        return outs

    def forward_dummy(self,
                  img_metas=None,
                  img_inputs=None,
                  maps_bev=None,
                  maps_2d=None,
                  **kwargs):
        """
        虚拟的前向传播函数。
        该函数通常用于模型的性能基准测试,不实际考虑模型的输出内容,只关心模型的运行速度。
        Args:
            img_metas (list, optional): 各图像的元数据信息,例如尺寸、缩放因子等。
            img_inputs (torch.Tensor, optional): 输入的图像数据。
            maps_bev (torch.Tensor, optional): BEV(Bird's Eye View,俯视图)的地图数据,本函数中未使用。
            maps_2d (torch.Tensor, optional): 2D地图数据,本函数中未使用。
            **kwargs: 其他关键字参数。
        Returns:
            torch.Tensor: 车道线检测的网络输出结果。
        """
        
        # 从输入图像中提取特征,但忽略其他返回值(例如点云特征)
        img_feats, _ = self.extract_feat(img=img_inputs, img_metas=img_metas, **kwargs)
        # 通过车道线头部进行前向传播,获取输出结果
        outs = self.lane_head(img_feats)
        # 返回网络的输出结果
        return outs
    @property
    def with_img_neck(self):
        """bool: Whether the detector has a neck in image branch."""
        return hasattr(self, 'img_neck') and self.img_neck is not None


# 使用DETECTORS装饰器进行注册,使得LaneATT2DDetector可以被自动识别和使用
@DETECTORS.register_module()
class LaneDetector2DATT(LaneDetector2D):
    def __init__(self, img_backbone, img_neck, lane_head, train_cfg, test_cfg, pretrained=None, **kwargs):
        """
        车道线ATT2D检测器初始化函数。
        Args:
            img_backbone (dict): 图像的主干网络配置。
            img_neck (dict): 图像的颈部网络配置。
            lane_head (dict): 车道线头部网络配置。
            train_cfg (dict): 训练相关的配置参数。
            test_cfg (dict): 测试相关的配置参数。
            pretrained (str, optional): 预训练模型的路径。
            **kwargs: 其他关键字参数。
        """
        # 通过父类进行初始化
        super(LaneDetector2DATT, self).__init__(img_backbone, img_neck, lane_head, train_cfg, test_cfg, pretrained=None, **kwargs)

    def forward_train(self,
                      img_metas=None,
                      img_inputs=None,
                      maps_bev=None,
                      maps_2d=None,
                      label_bev=None,
                      label_2d=None,
                      **kwargs):
        """
        训练模式下的前向传播函数。
        Args:
            img_metas (list, optional): 各图像的元数据信息,例如尺寸、缩放因子等。
            img_inputs (torch.Tensor, optional): 输入的图像数据。
            maps_bev (torch.Tensor, optional): BEV(Bird's Eye View,俯视图)的地图数据。
            maps_2d (torch.Tensor, optional): 2D地图数据。
            label_bev (torch.Tensor, optional): BEV的标签数据。
            label_2d (torch.Tensor, optional): 2D的标签数据。
            **kwargs: 其他关键字参数。
        Returns:
            dict: 各部分的损失。
            torch.Tensor: 车道线检测的网络输出结果。
        """
        # 从输入图像中提取特征
        img_feats = self.extract_feat(img=img_inputs, img_metas=img_metas, **kwargs)
        losses = dict()
        # 使用特征进行车道线的前向训练,并获取损失及输出
        losses_lane, out = self.forward_lane_train(img_feats, label_2d)
        # 更新损失字典
        losses.update(losses_lane)
        
        return losses, out

    def forward_test(self,
                      img_metas=None,
                      img_inputs=None,
                      maps_bev=None,
                      maps_2d=None,
                      label_bev=None,
                      label_2d=None,
                      **kwargs):
        """
        测试模式下的前向传播函数。
        Args:
            img_metas (list, optional): 各图像的元数据信息,例如尺寸、缩放因子等。
            img_inputs (torch.Tensor, optional): 输入的图像数据。
            maps_bev (torch.Tensor, optional): BEV(Bird's Eye View,俯视图)的地图数据。
            maps_2d (torch.Tensor, optional): 2D地图数据。
            label_bev (torch.Tensor, optional): BEV的标签数据。
            label_2d (torch.Tensor, optional): 2D的标签数据。
            **kwargs: 其他关键字参数。
        Returns:
            torch.Tensor: 车道线检测的网络输出结果。
        """
        return self.simple_test(img_metas, img_inputs, label_2d)

    

3.2.5 LaneAF2Dhead

      
      
# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Author  :   wangjing
@Version :   0.1
@License :   (C)Copyright 2019-2035
@Desc    :   None
"""
import torch
import torch.nn as nn
from mmdet3d.models.builder import HEADS, build_loss
import numpy as np
import cv2

@HEADS.register_module()
class LaneAF2DHead(nn.Module):
    """
    一个用于车道检测的2D头部网络。它预测了车道的二值分割、水平透视场 (HAF) 和垂直透视场 (VAF)。
    
    属性:
    - num_classes: 分割头的类别数。
    - inner_channel: 中间卷积层的通道数。
    - seg_loss: 用于分割的损失函数。
    - haf_loss: 用于HAF的损失函数。
    - vaf_loss: 用于VAF的损失函数。
    - binary_seg: 用于预测车道分割的网络层。
    - haf_head: 用于预测HAF的网络层。
    - vaf_head: 用于预测VAF的网络层。
    - debug: 如果为True,则会在debug模式下运行,可能包括一些额外的日志或检查。
    """

    def __init__(
        self,
        num_classes=1,
        in_channel=64,
        debug=False,
        seg_loss=dict(type='Lane_FocalLoss'),
        haf_loss=dict(type='RegL1Loss'),
        vaf_loss=dict(type='RegL1Loss'),
    ):
        """
        初始化函数。
        
        参数:
        - num_classes: 分割任务的类别数,默认为1。
        - in_channel: 输入的通道数,默认为64。
        - debug: 是否开启调试模式,默认为False。
        - seg_loss: 用于分割任务的损失函数的配置。
        - haf_loss: 用于HAF任务的损失函数的配置。
        - vaf_loss: 用于VAF任务的损失函数的配置。
        """

        # 初始化父类的构造函数
        super().__init__()
        
        # 定义类属性
        self.num_classes = num_classes
        self.inner_channel = in_channel

        # 构建损失函数
        self.seg_loss = build_loss(seg_loss)
        self.haf_loss = build_loss(haf_loss)
        self.vaf_loss = build_loss(vaf_loss)

        # 定义二值分割头
        self.binary_seg = nn.Sequential(
            nn.Conv2d(in_channel, self.inner_channel, 1),
            nn.BatchNorm2d(self.inner_channel),
            nn.ReLU(),
            nn.Conv2d(self.inner_channel, self.num_classes, 1),
            # nn.Sigmoid(),
        )

        # 定义HAF头
        self.haf_head = nn.Sequential(
            nn.Conv2d(in_channel, self.inner_channel, 1),
            nn.BatchNorm2d(self.inner_channel),
            nn.ReLU(),
            nn.Conv2d(self.inner_channel, 1, 1),
        )
        
        # 定义VAF头
        self.vaf_head = nn.Sequential(
            nn.Conv2d(in_channel, self.inner_channel, 1),
            nn.BatchNorm2d(self.inner_channel),
            nn.ReLU(),
            nn.Conv2d(self.inner_channel, 2, 1),
        )

        # 如果开启调试模式,定义一个交叉熵损失函数
        self.debug = debug
        if self.debug:
            self.debug_loss = nn.CrossEntropyLoss()

    def loss(self, preds_dicts, gt_labels, **kwargs):
        """
        计算LaneAF2DHead模块的损失。
        参数:
        - preds_dicts (tuple): 一个包含模块预测的输出的元组,包括二值分割、HAF预测、VAF预测和topdown。
        - gt_labels (tuple): 一个包含真实标签的元组,包括真实的分割掩码、HAF掩码和VAF掩码。
        - **kwargs: 其他可能需要的关键字参数。
        返回:
        - loss_dict (dict): 包含各种损失的字典。键为损失的名称,值为相应的损失值。
        """
        
        # 初始化损失字典
        loss_dict = dict()
        # 解包预测和真实标签的元组
        binary_seg, haf_pred, vaf_pred, topdown = preds_dicts
        gt_mask, mask_haf, mask_vaf = gt_labels
        # 获取设备信息(例如,CPU或GPU)
        device = binary_seg.device
        # 将真实的分割掩码转移到相应的设备
        maps = gt_mask.to(device)
        # 增加HAF掩码的维度
        mask_haf = torch.unsqueeze(mask_haf, 1)
        
        # 计算HAF的损失
        # haf_loss = self.haf_loss(haf_pred, mask_haf, binary_seg)
        haf_loss = self.haf_loss(haf_pred, mask_haf, gt_mask)
        # 计算VAF的损失
        # vaf_loss = self.vaf_loss(vaf_pred, mask_vaf, binary_seg)
        vaf_loss = self.vaf_loss(vaf_pred, mask_vaf, gt_mask)
        # 计算二值分割的损失
        seg_loss = self.seg_loss(binary_seg, maps)

        # 将各个损失乘以10并存储到损失字典中
        loss_dict['haf_loss'] = haf_loss * 10.0
        loss_dict['vaf_loss'] = vaf_loss * 10.0
        loss_dict['seg_loss'] = seg_loss * 10.0

        # 计算总损失并存储到损失字典中
        loss_dict['loss'] = (2 * haf_loss + 2 * vaf_loss + 8 * seg_loss) * 10.0

        return loss_dict

    def forward(self, topdown):
        """
        前向传播函数。当我们向模型提供一个输入时,它将通过这个函数进行处理。
        参数:
        - topdown (Tensor): 上下文特征映射。
        返回:
        - lane_head_output (tuple): 包含三个部分的输出元组: 二值分割, HAF和VAF,以及输入的上下文特征映射。
        """
        
        # 通过二值分割头部进行处理,获得二值分割的结果
        binary_seg = self.binary_seg(topdown)
        # 通过HAF头部进行处理,获得HAF的结果
        haf = self.haf_head(topdown)
        # 通过VAF头部进行处理,获得VAF的结果
        vaf = self.vaf_head(topdown)
        # 将所有结果整合成一个输出元组
        lane_head_output = binary_seg, haf, vaf, topdown

        return lane_head_output

    def tensor2image(self, tensor, mean, std):
        """
        将给定的Tensor转换为图片格式。
        参数:
        - tensor (Tensor): 需要转换的张量。
        - mean (array): 均值,用于逆归一化。
        - std (array): 标准差,用于逆归一化。
        返回:
        - image (array): 从张量转换而来的图片。
        """

        # 为均值和标准差增加维度,使它们能与张量匹配
        mean = mean[..., np.newaxis, np.newaxis] # (nc, 1, 1)
        mean = np.tile(mean, (1, tensor.size()[2], tensor.size()[3])) # (nc, H, W)
        std = std[..., np.newaxis, np.newaxis] # (nc, 1, 1)
        std = np.tile(std, (1, tensor.size()[2], tensor.size()[3])) # (nc, H, W)

        # 逆归一化
        image = 255.0*(std*tensor[0].cpu().float().numpy() + mean)

        # 如果图像是单通道的,复制通道以形成三通道图像
        if image.shape[0] == 1:
            image = np.tile(image, (3, 1, 1))

        # 调整通道的顺序,从(CHW)转换为(HWC)
        image = np.transpose(image, (1, 2, 0))
        # 将RGB格式转换为BGR格式
        image = image[:, :, ::-1]

        # 返回uint8格式的图像
        return image.astype(np.uint8)

    def decodeAFs(BW, VAF, HAF, fg_thresh=128, err_thresh=5, viz=True):
        """
        从给定的二值分割图(BW)和透视场(VAF and HAF)解码车道线。
        参数:
        - BW: 二值分割图,其中车道线像素为前景。
        - VAF: 垂直透视场
        - HAF: 水平透视场
        - fg_thresh: 用于确定BW中的前景像素的阈值。
        - err_thresh: 确定车道线聚类的阈值。
        - viz: 是否可视化解码过程。
        返回:
        - 输出图,其中每个像素都标有一个车道ID。
        """
        # 初始化输出数组为0,其大小与 BW 相同  # BW分割
        output = np.zeros_like(BW, dtype=np.uint8)
        # 用于存储每条车道的末端点的列表
        lane_end_pts = [] 
        # 定义下一个可用的车道ID
        next_lane_id = 1
        
        # 可视化给定的二值图像
        if viz:
            im_color = cv2.applyColorMap(BW, cv2.COLORMAP_JET)
            cv2.imshow('BW', im_color)
            ret = cv2.waitKey(0)
    
        # 从最后一行开始解码到第一行
        # 求解每一行的中心点
        for row in range(BW.shape[0]-1, -1, -1):
            # 获取当前行中的前景像素列,即车道线像素
            cols = np.where(BW[row, :] > fg_thresh)[0]
            # 初始化簇/集群
            clusters = [[]]     
            
            # 如果存在前景像素,则初始化 prev_col 为第一个前景像素列的位置
            if cols.size > 0:
                prev_col = cols[0]
                
            # 水平地解析像素
            for col in cols:
                # 如果当前列与上一个列之间的差值大于给定的阈值,则视为新的集群开始
                if col - prev_col > err_thresh:
                    clusters.append([])  # 新开一个聚类
                    clusters[-1].append(col)
                    prev_col = col
                    continue
                
                # 根据水平透视场(HAF)的值,确定像素点是如何与其它像素相关联的
                if HAF[row, prev_col] >= 0 and HAF[row, col] >= 0: 
                    # 继续向右移动
                    clusters[-1].append(col)
                    prev_col = col
                    continue
                elif HAF[row, prev_col] >= 0 and HAF[row, col] < 0: 
                    # 找到了车道的中心,处理垂直透视场(VAF)
                    clusters[-1].append(col)
                    prev_col = col
                elif HAF[row, prev_col] < 0 and HAF[row, col] >= 0: 
                    # 找到车道的末端,生成新的车道
                    clusters.append([])
                    clusters[-1].append(col)
                    prev_col = col
                    continue
                elif HAF[row, prev_col] < 0 and HAF[row, col] < 0: 
                    # 继续向右移动
                    clusters[-1].append(col)
                    prev_col = col
                    continue
                
            # vaf与haf中心点差距
            # 上一行指向的有一个值和本行估计的值进行就差距,在一定范围内则连成一条线
            # 行列嵌套循环  
            # 分配线的lane id
            # 建立每条线与头坐标与当前行聚类点之间的cost矩阵(线头来源于上一行的end_point)
            # cost前向infer做,模型里面不做
            assigned = [False for _ in clusters]
            C = np.Inf*np.ones((len(lane_end_pts), len(clusters)), dtype=np.float64)
            #计算每一个线头坐标点与当前行聚类点之间的dist_error
            for r, pts in enumerate(lane_end_pts): # for each end point in an active lane
                for c, cluster in enumerate(clusters):
                    if len(cluster) == 0:
                        continue
                    # 计算每一个聚类簇的中心点
                    cluster_mean = np.array([[np.mean(cluster), row]], dtype=np.float32)
                    # 获取每一个线头的坐标在vaf map上的单位向量
                    vafs = np.array([VAF[int(round(x[1])), int(round(x[0])), :] for x in pts], dtype=np.float32)
                    vafs = vafs / np.linalg.norm(vafs, axis=1, keepdims=True) 
                    # 用计算出来的线头坐标结合vaf计算的单位向量来推算下一行的聚类中心
                    pred_points = pts + vafs*np.linalg.norm(pts - cluster_mean, axis=1, keepdims=True)
                    # 计算真正的聚类中心与vaf预测的聚类中心之间的error 
                    error = np.mean(np.linalg.norm(pred_points - cluster_mean, axis=1))
                    # 赋值给线头与当前行的error给cost矩阵
                    C[r, c] = error
            # 获取线头点与当前行聚类点在C.shape下的坐标
            row_ind, col_ind = np.unravel_index(np.argsort(C, axis=None), C.shape)
            for r, c in zip(row_ind, col_ind):
                if C[r, c] >= err_thresh:
                    break
                if assigned[c]:
                    continue
                assigned[c] = True
                # 给当前像素点更新最匹配的lane_id
                output[row, clusters[c]] = r+1
                # 根据当前行匹配好的像素点更新线头坐标列表
                lane_end_pts[r] = np.stack((np.array(clusters[c], dtype=np.float32), row*np.ones_like(clusters[c])), axis=1)
            # 没被分配的线分配新的lane_id
            for c, cluster in enumerate(clusters):
                if len(cluster) == 0:
                    continue
                if not assigned[c]:
                    output[row, cluster] = next_lane_id
                    lane_end_pts.append(np.stack((np.array(cluster, dtype=np.float32), row*np.ones_like(cluster)), axis=1))
                    next_lane_id += 1
        
        # 可视化最终解码的车道线
        if viz:
            im_color = cv2.applyColorMap(40*output, cv2.COLORMAP_JET)
            cv2.imshow('Output', im_color)
            ret = cv2.waitKey(0)

        return output

    # 定义get_lane函数,从预测的字典中获取车道信息
    def get_lane(self, preds_dicts):
        
        # 从预测字典中提取二值化分割、水平注意力场、垂直注意力场和topdown(可能是俯视图或其他相关数据)
        binary_seg, haf_pred, vaf_pred, topdown = preds_dicts

        # 转换为数组
        # 对二值化分割应用sigmoid激活函数,重复三次以匹配三个通道,然后将其从tensor转换为numpy数组
        mask_out = self.tensor2image(torch.sigmoid(binary_seg).repeat(1, 3, 1, 1).detach(), 
            np.array([0.0 for _ in range(3)], dtype='float32'), np.array([1.0 for _ in range(3)], dtype='float32'))
        
        # 转换vaf_pred(垂直注意力场)从PyTorch tensor到numpy数组,并进行适当的维度重排
        vaf_out = np.transpose(vaf_pred[0, :, :, :].detach().cpu().float().numpy(), (1, 2, 0))
        # 转换haf_pred(水平注意力场)从PyTorch tensor到numpy数组,并进行适当的维度重排
        haf_out = np.transpose(haf_pred[0, :, :, :].detach().cpu().float().numpy(), (1, 2, 0))
        # 使用先前定义的decodeAFs方法解码注意力场,以获取车道实例
        seg_out = self.decodeAFs(mask_out[:, :, 0], vaf_out, haf_out, fg_thresh=128, err_thresh=5)

        return seg_out  # 返回解码后的车道实例

    # 定义create_viz函数,用于将车道线的预测可视化到输入图像上
    def create_viz(img, seg, mask, vaf, haf):
        # 设置缩放因子
        scale = 8

        # 对输入图像进行2倍放大
        img = cv2.resize(img, None, fx=2, fy=2, interpolation=cv2.INTER_LINEAR)
        # 将图像转换为连续数组并设置数据类型为uint8
        img = np.ascontiguousarray(img, dtype=np.uint8)
        # 对车道分割应用颜色映射以生成彩色版本
        seg_color = cv2.applyColorMap(40*seg, cv2.COLORMAP_JET)
        # 获取非零的行和列的索引,即车道的位置
        rows, cols = np.nonzero(seg)

        # 对于每一个非零的位置,即车道的位置
        for r, c in zip(rows, cols):
            # 在图像上绘制表示车道线方向的箭头
            img = cv2.arrowedLine(
                img,  # 输入图像
                (c*scale, r*scale),  # 箭头的起始位置
                (int(c*scale+vaf[r, c, 0]*scale*0.75), int(r*scale+vaf[r, c, 1]*scale*0.5)),  # 箭头的结束位置
                seg_color[r, c, :].tolist(),  # 箭头的颜色
                1,  # 箭头的厚度
                tipLength=0.4  # 箭头尖的长度相对于箭头长度的比率
            )

        # 返回可视化后的图像
        return img

    

http://www.niftyadmin.cn/n/5144222.html

相关文章

【监控指标】监控系统-prometheus、grafana。容器化部署。go语言 gin框架、gRPC框架的集成

文章目录 一、监控有哪些指标二、prometheus、grafana架构Prometheus 组件Grafana 组件架构优点 三、安装prometheus和node-exporter1. docker pull镜像2. 启动node-exporter3. 启动prometheus 四、promql基本语法五、grafana的安装和使用1. 新建空文件夹grafana-storage&#…

Asterisk Ubuntu 安装

更新环境 sudo apt update sudo apt install wget build-essential git autoconf subversion pkg-config libtool sudo contrib/scripts/get_mp3_source.sh A addons/mp3 A addons/mp3/common.c A addons/mp3/huffman.h A addons/mp3/tabinit.c A addons/mp3/Ma…

主键、外键约束

一、主键 Primary Key中文又叫做主键&#xff0c;它的作用是用来唯一确定表中的某一行&#xff0c;相当于一个人的身份证号码。 二、外键约束 A表&#xff08;EmployeeTable&#xff09;的外键&#xff08;GenderID&#xff09;&#xff0c;指向B表&#xff08;GenderTable&…

学习c++的第三天

目录 常量 字面常量 整数常量 浮点常量 布尔常量 字符常量 符号常量 const关键字 宏定义 修饰符类型 类型限定符 存储类 auto 存储类 register 存储类&#xff08;被弃用&#xff09; static 存储类 extern 存储类 mutable 存储类 thread_local 存储类 常量 …

C++ 类的设计

一、c类的设计 类 是一种将抽象转换为用户定义类型的 C工具&#xff0c; 它将数据表示操纵数据的方法组合成一个整洁的包。 语法&#xff1a; 其中"class类名"称为类头。花括号中的部分称为类体&#xff0c;类体中定义了类成员表。 在C中&#xff0c;类是一种数据类型…

RDKit(2023.09.1 )环系统模板

环境 Python 3.9RDKit 2023.09.1from rdkit import Chem from rdkit.Chem import rdDepictor from rdkit.Chem import Draw from rdkit.Chem.Draw import IPythonConsoleIPythonConsole.ipython_useSVG=True %matplotlib inline import rdkit print(rdkit.__version__) 2023.09…

并查集易错点

并查集就俩核心点,1是找父节点,2是合并 1: return fa[x] x ? x : fa[x] find(fa[x]); 2. fa[find(a)] find(b) 第二步还挺容易写错的,左边是find(a)的根,而不是fa[a]

OpenSearch使用scroll滚动搜索实战

文章目录 前言当前环境实战遇到的问题 前言 起源和开源性质: Elasticsearch&#xff1a;Elasticsearch最初由Elastic公司开发&#xff0c;后来开源&#xff0c;但采用了Elastic License&#xff0c;这是一种有限制的许可协议。 OpenSearch&#xff1a;OpenSearch是从Elasticsea…