Object Detection with RetinaNet and the TensorFlow Object Detection API (Source Code Included)

Contents

  • I. RetinaNet Principles
  • II. RetinaNet Implementation
    • 1. A Brief Introduction to tf.train.Checkpoint
    • 2. RetinaNet Source Code in TensorFlow

I. RetinaNet Principles

To be completed.
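In the meantime, a brief sketch: RetinaNet (Lin et al., 2017) is a one-stage detector consisting of a ResNet backbone, a Feature Pyramid Network (FPN) neck, and shared convolutional classification and box-regression heads applied to a dense set of anchors. Its key contribution is the focal loss, which down-weights well-classified (mostly background) anchors so that training is not dominated by the huge number of easy negatives:

$$FL(p_t) = -\alpha_t (1 - p_t)^{\gamma} \log(p_t)$$

where $p_t$ is the predicted probability of the true class, the focusing parameter $\gamma$ (typically 2) suppresses the loss contribution of easy examples, and $\alpha_t$ balances positive and negative anchors.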

II. RetinaNet Implementation

1. A Brief Introduction to tf.train.Checkpoint

To be completed.
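For now, the one property that matters for the code below: tf.train.Checkpoint saves and restores variables by object path (the chain of attribute names you attach objects under), not by variable name, and a restore is deferred until the matching variables are actually created. A minimal sketch, with an arbitrary file path:

import tensorflow as tf

# Any trackable object (layer, model, optimizer, variable) can be attached
# as an attribute of the Checkpoint.
net = tf.keras.layers.Dense(4)
net(tf.zeros([1, 3]))  # build the layer so its variables exist

ckpt = tf.train.Checkpoint(dense_layer=net)
save_path = ckpt.save('/tmp/demo_ckpt')  # e.g. '/tmp/demo_ckpt-1'

# A new Checkpoint with the same attribute structure picks the variables up.
# The restore is lazy: it completes once the new layer's variables are built.
restored_net = tf.keras.layers.Dense(4)
tf.train.Checkpoint(dense_layer=restored_net).restore(save_path)
restored_net(tf.zeros([1, 3]))  # variable creation triggers the deferred restore

Step 11 below exploits exactly this object-path matching to restore only part of a pre-trained detection model.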

2. RetinaNet的TensorFlow源码

  Step 1: Install the TensorFlow 2 Object Detection API and related packages

# remove any existing models folder
!rm -rf ./models/
# clone the TensorFlow Model Garden
!git clone --depth 1 https://github.com/tensorflow/models/
# compile the Object Detection API protocol buffers
!cd models/research/ && protoc object_detection/protos/*.proto --python_out=.

%%writefile models/research/setup.py
import os
from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = [
    'tf-models-official==2.8.0',
    'tensorflow_io==0.24.0',
    'numpy==1.21.5'
]

setup(
    name='object_detection',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    include_package_data=True,
    packages=(
        [p for p in find_packages() if p.startswith('object_detection')] +
        find_packages(where=os.path.join('.', 'slim'))),
    package_dir={
        'datasets': os.path.join('slim', 'datasets'),
        'nets': os.path.join('slim', 'nets'),
        'preprocessing': os.path.join('slim', 'preprocessing'),
        'deployment': os.path.join('slim', 'deployment'),
        'scripts': os.path.join('slim', 'scripts'),
    },
    description='Tensorflow Object Detection Library',
    python_requires='>3.6',
)

# Run the setup script you just wrote
!python -m pip install models/research

  Step 2: Import packages

import matplotlib
import matplotlib.pyplot as plt

import os
import random
import io
import imageio
import glob
import scipy.misc
import numpy as np
from six import BytesIO
from PIL import Image, ImageDraw, ImageFont
from IPython.display import display, Javascript
from IPython.display import Image as IPyImage

import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import config_util
from object_detection.utils import visualization_utils as viz_utils
from object_detection.utils import colab_utils
from object_detection.builders import model_builder

%matplotlib inline

  Step 3: Define utility functions for loading images and plotting detections

def load_image_into_numpy_array(path):
  """Load an image from file into a numpy array.

  Puts image into numpy array to feed into tensorflow graph.
  Note that by convention we put it into a numpy array with shape
  (height, width, channels), where channels=3 for RGB.

  Args:
    path: a file path.

  Returns:
    uint8 numpy array with shape (img_height, img_width, 3)
  """
  img_data = tf.io.gfile.GFile(path, 'rb').read()
  image = Image.open(BytesIO(img_data))
  (im_width, im_height) = image.size
  return np.array(image.getdata()).reshape(
      (im_height, im_width, 3)).astype(np.uint8)

def plot_detections(image_np,
                    boxes,
                    classes,
                    scores,
                    category_index,
                    figsize=(12, 16),
                    image_name=None):
  """Wrapper function to visualize detections.

  Args:
    image_np: uint8 numpy array with shape (img_height, img_width, 3)
    boxes: a numpy array of shape [N, 4]
    classes: a numpy array of shape [N]. Note that class indices are 1-based,
      and match the keys in the label map.
    scores: a numpy array of shape [N] or None.  If scores=None, then
      this function assumes that the boxes to be plotted are groundtruth
      boxes and plot all boxes as black with no classes or scores.
    category_index: a dict containing category dictionaries (each holding
      category index `id` and category name `name`) keyed by category indices.
    figsize: size for the figure.
    image_name: a name for the image file.
  """
  image_np_with_annotations = image_np.copy()
  viz_utils.visualize_boxes_and_labels_on_image_array(
      image_np_with_annotations,
      boxes,
      classes,
      scores,
      category_index,
      use_normalized_coordinates=True,
      min_score_thresh=0.8)
  if image_name:
    plt.imsave(image_name, image_np_with_annotations)
  else:
    plt.imshow(image_np_with_annotations)

  Step 4: Download the training images (the training-zombie set is used here)

# download the images
!wget --no-check-certificate \
    https://storage.googleapis.com/tensorflow-3-public/datasets/training-zombie.zip \
    -O ./training-zombie.zip

import zipfile
# unzip to a local directory
local_zip = './training-zombie.zip'
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall('./training')
zip_ref.close()

  Step 5: Set the training image directory, load the images into a list, and display samples

train_image_dir = './training'
train_image_name = 'training-zombie'

# Load images and visualize
train_images_np = []
for i in range(1, 6):
  image_path = os.path.join(train_image_dir, train_image_name + str(i) + '.jpg')
  train_images_np.append(load_image_into_numpy_array(image_path))

plt.rcParams['axes.grid'] = False
plt.rcParams['xtick.labelsize'] = False
plt.rcParams['ytick.labelsize'] = False
plt.rcParams['xtick.top'] = False
plt.rcParams['xtick.bottom'] = False
plt.rcParams['ytick.left'] = False
plt.rcParams['ytick.right'] = False
plt.rcParams['figure.figsize'] = [14, 7]

for idx, train_image_np in enumerate(train_images_np):
  plt.subplot(2, 3, idx+1)  # lay the 5 samples out on a 2x3 grid
  plt.imshow(train_image_np)
plt.show()  # display the samples

  Sample output (figure): the five zombie training images displayed in a 2x3 grid.
  Step 6: Define the ground-truth boxes (manually annotated box coordinates used for training)

gt_boxes = [
        np.array([[0.27333333, 0.41500586, 0.74333333, 0.57678781]], dtype=np.float32),
        np.array([[0.29833333, 0.45955451, 0.75666667, 0.61078546]], dtype=np.float32),
        np.array([[0.40833333, 0.18288394, 0.945, 0.34818288]], dtype=np.float32),
        np.array([[0.16166667, 0.61899179, 0.8, 0.91910903]], dtype=np.float32),
        np.array([[0.28833333, 0.12543962, 0.835, 0.35052755]], dtype=np.float32),
]
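Each row follows the Object Detection API convention [ymin, xmin, ymax, xmax], normalized to [0, 1] by image height and width. A tiny helper (our own, purely illustrative) makes the convention concrete:

def to_pixel_coords(box, height, width):
  """Convert a normalized [ymin, xmin, ymax, xmax] box to pixel coordinates."""
  ymin, xmin, ymax, xmax = box
  return ymin * height, xmin * width, ymax * height, xmax * width

# e.g. to_pixel_coords(gt_boxes[0][0], *train_images_np[0].shape[:2])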

  Step 7: Define the label and class ID of the detection target; since we only detect one kind of object, there is a single class

zombie_class_id = 1
num_classes = 1

category_index = {zombie_class_id: {'id': zombie_class_id, 'name': 'zombie'}}

  Step 8: Convert the training data to tensors (the format TensorFlow consumes)

label_id_offset = 1
train_image_tensors = []
gt_classes_one_hot_tensors = []
gt_box_tensors = []
for (train_image_np, gt_box_np) in zip(
    train_images_np, gt_boxes):
  train_image_tensors.append(tf.expand_dims(tf.convert_to_tensor(
      train_image_np, dtype=tf.float32), axis=0))
  gt_box_tensors.append(tf.convert_to_tensor(gt_box_np, dtype=tf.float32))
  zero_indexed_groundtruth_classes = tf.convert_to_tensor(
      np.ones(shape=[gt_box_np.shape[0]], dtype=np.int32) - label_id_offset)
  gt_classes_one_hot_tensors.append(tf.one_hot(
      zero_indexed_groundtruth_classes, num_classes))
print('Done prepping data.')
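A quick sanity check (an optional snippet of our own): each image now has a [1, height, width, 3] float image tensor, an [N, 4] box tensor, and an [N, num_classes] one-hot class tensor; with a single class, every one-hot row is simply [1.]:

print(train_image_tensors[0].shape)   # (1, height, width, 3)
print(gt_box_tensors[0].shape)        # (1, 4): one box per image here
print(gt_classes_one_hot_tensors[0])  # tf.Tensor([[1.]], shape=(1, 1), dtype=float32)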

  Step 9: Visualize the prepared training tensors and boxes (during preprocessing, keep checking that the data is correct)

dummy_scores = np.array([1.0], dtype=np.float32)  # give boxes a score of 100%

plt.figure(figsize=(30, 15))
for idx in range(5):
  plt.subplot(2, 3, idx+1)
  plot_detections(
      train_images_np[idx],
      gt_boxes[idx],
      np.ones(shape=[gt_boxes[idx].shape[0]], dtype=np.int32),
      dummy_scores, category_index)
plt.show()

  Visualization output (figure): each training image with its ground-truth box drawn on it.
  Step 10: Download the RetinaNet model

!wget http://download.tensorflow.org/models/object_detection/tf2/20200711/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!tar -xf ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.tar.gz
!mv ssd_resnet50_v1_fpn_640x640_coco17_tpu-8/checkpoint models/research/object_detection/test_data/
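Despite the ssd_ prefix in the file name, this checkpoint is the RetinaNet architecture: the TF2 Detection Model Zoo lists it as "SSD ResNet50 V1 FPN 640x640 (RetinaNet50)", i.e. a ResNet50+FPN backbone with focal-loss classification and box-regression heads, pre-trained on COCO 2017.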

  Step 11: Load the model, modify it (mainly the number of detected classes), and initialize the weights (by running a prediction on dummy data)

tf.keras.backend.clear_session()

print('Building model and restoring weights for fine-tuning...', flush=True)
num_classes = 1
pipeline_config = 'models/research/object_detection/configs/tf2/ssd_resnet50_v1_fpn_640x640_coco17_tpu-8.config'
checkpoint_path = 'models/research/object_detection/test_data/checkpoint/ckpt-0'

# Load pipeline config and build a detection model.
configs = config_util.get_configs_from_pipeline_file(pipeline_config)
model_config = configs['model']
model_config.ssd.num_classes = num_classes
model_config.ssd.freeze_batchnorm = True
detection_model = model_builder.build(
      model_config=model_config, is_training=True)

fake_box_predictor = tf.train.Checkpoint(
    _base_tower_layers_for_heads=detection_model._box_predictor._base_tower_layers_for_heads,
    _box_prediction_head=detection_model._box_predictor._box_prediction_head,
    )
fake_model = tf.train.Checkpoint(
          _feature_extractor=detection_model._feature_extractor,
          _box_predictor=fake_box_predictor)
ckpt = tf.train.Checkpoint(model=fake_model)
ckpt.restore(checkpoint_path)

# Run model through a dummy image so that variables are created
image, shapes = detection_model.preprocess(tf.zeros([1, 640, 640, 3]))
prediction_dict = detection_model.predict(image, shapes)
_ = detection_model.postprocess(prediction_dict, shapes)
print('Weights restored!')
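This is the tf.train.Checkpoint trick from Section 1 at work: fake_box_predictor and fake_model mirror the object paths of the original checkpoint but deliberately leave out the classification head, whose variable shapes no longer match after num_classes was changed from COCO's 90 classes to 1. The restore is therefore partial: the backbone, FPN, and box-regression head receive the pre-trained COCO weights, while the new classification head stays randomly initialized for fine-tuning. The forward pass on the dummy image forces variable creation so the deferred restore actually takes effect.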

  Step 12: Define the train_step and the training loop

tf.keras.backend.set_learning_phase(True)

# training hyperparameters
batch_size = 4
learning_rate = 0.01
num_batches = 100

# select the variables to fine-tune: only the box and class prediction heads
trainable_variables = detection_model.trainable_variables
to_fine_tune = []
prefixes_to_train = [
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalBoxHead',
  'WeightSharedConvolutionalBoxPredictor/WeightSharedConvolutionalClassHead']
for var in trainable_variables:
  if any([var.name.startswith(prefix) for prefix in prefixes_to_train]):
    to_fine_tune.append(var)

# train_step.
def get_model_train_step_function(model, optimizer, vars_to_fine_tune):
  """Get a tf.function for training step."""
  @tf.function
  def train_step_fn(image_tensors,
                    groundtruth_boxes_list,
                    groundtruth_classes_list):
    """A single training iteration.
    Args:
      image_tensors: A list of [1, height, width, 3] Tensor of type tf.float32.
        Note that the height and width can vary across images, as they are
        reshaped within this function to be 640x640.
      groundtruth_boxes_list: A list of Tensors of shape [N_i, 4] with type
        tf.float32 representing groundtruth boxes for each image in the batch.
      groundtruth_classes_list: A list of Tensors of shape [N_i, num_classes]
        with type tf.float32 representing one-hot groundtruth classes for each
        image in the batch.

    Returns:
      A scalar tensor representing the total loss for the input batch.
    """
    shapes = tf.constant(batch_size * [[640, 640, 3]], dtype=tf.int32)
    model.provide_groundtruth(
        groundtruth_boxes_list=groundtruth_boxes_list,
        groundtruth_classes_list=groundtruth_classes_list)
    with tf.GradientTape() as tape:
      preprocessed_images = tf.concat(
          [detection_model.preprocess(image_tensor)[0]
           for image_tensor in image_tensors], axis=0)
      prediction_dict = model.predict(preprocessed_images, shapes)
      losses_dict = model.loss(prediction_dict, shapes)
      total_loss = losses_dict['Loss/localization_loss'] + losses_dict['Loss/classification_loss']
    # compute and apply gradients outside the tape context; the tape only
    # needs to record the forward pass
    gradients = tape.gradient(total_loss, vars_to_fine_tune)
    optimizer.apply_gradients(zip(gradients, vars_to_fine_tune))
    return total_loss

  return train_step_fn
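A note on the design: get_model_train_step_function closes over the model and optimizer and returns a @tf.function, so the step is traced once into a graph and the 100 fine-tuning batches run noticeably faster than they would in eager mode.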

# optimizer: SGD with momentum
optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=0.9)
train_step_fn = get_model_train_step_function(
    detection_model, optimizer, to_fine_tune)

print('Start fine-tuning!', flush=True)

# training loop
for idx in range(num_batches):
  # Grab keys for a random subset of examples
  all_keys = list(range(len(train_images_np)))
  random.shuffle(all_keys)
  example_keys = all_keys[:batch_size]

  gt_boxes_list = [gt_box_tensors[key] for key in example_keys]
  gt_classes_list = [gt_classes_one_hot_tensors[key] for key in example_keys]
  image_tensors = [train_image_tensors[key] for key in example_keys]

  # Training step (forward pass + backwards pass)
  total_loss = train_step_fn(image_tensors, gt_boxes_list, gt_classes_list)

  if idx % 10 == 0:
    print('batch ' + str(idx) + ' of ' + str(num_batches)
    + ', loss=' +  str(total_loss.numpy()), flush=True)

print('Done fine-tuning!')

  Step 13: Download test images to try out the model fine-tuned above

# delete leftovers from previous runs, if any
!rm -f zombie-walk-frames.zip
!rm -rf ./zombie-walk
!rm -rf ./results

# download test images
!wget --no-check-certificate \
    https://storage.googleapis.com/tensorflow-3-public/datasets/zombie-walk-frames.zip \
    -O zombie-walk-frames.zip

# unzip test images
local_zip = './zombie-walk-frames.zip'
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall('./results')
zip_ref.close()

  Step 14: Load the test images and convert them to numpy arrays

test_image_dir = './results/'
test_images_np = []

# load images into a numpy array. this will take a few minutes to complete.
for i in range(0, 237):
    image_path = os.path.join(test_image_dir, 'zombie-walk' + "{0:04}".format(i) + '.jpg')
    print(image_path)
    test_images_np.append(np.expand_dims(
      load_image_into_numpy_array(image_path), axis=0))

  Step 15: Define the detection function

@tf.function
def detect(input_tensor):
    """Run detection on an input image.
    Args:
    input_tensor: A [1, height, width, 3] Tensor of type tf.float32.
      Note that height and width can be anything since the image will be
      immediately resized according to the needs of the model within this
      function.

    Returns:
    A dict containing 3 Tensors (`detection_boxes`, `detection_classes`,
      and `detection_scores`).
    """
    preprocessed_image, shapes = detection_model.preprocess(input_tensor)
    prediction_dict = detection_model.predict(preprocessed_image, shapes)
    
    detections = detection_model.postprocess(prediction_dict, shapes)
    
    return detections

  Step 16: Call the detection function and check the model's accuracy

label_id_offset = 1
results = {'boxes': [], 'scores': []}

i = 150
images_np = test_images_np
# input_tensor = train_image_tensors[i]
input_tensor = tf.convert_to_tensor(images_np[i], dtype=tf.float32)
detections = detect(input_tensor)

print(detections['detection_boxes'][0].shape)
print(detections['detection_classes'][0].shape)
plot_detections(
  images_np[i][0],
  detections['detection_boxes'][0].numpy(),
  detections['detection_classes'][0].numpy().astype(np.uint32)
  + label_id_offset,
  detections['detection_scores'][0].numpy(),
  category_index, figsize=(15, 20))

  Detection result (figure): the zombie in the test frame is localized with a high-confidence bounding box. As the result shows, the model's detections meet expectations.
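To watch the model over the whole clip, a sketch in the spirit of the original few-shot training Colab (this is what imageio and glob were imported for in Step 2; the frame and GIF file names are our choice) runs detect on every frame, saves the annotated frames, and stitches them into an animated GIF:

# render every annotated test frame to disk
for i in range(len(test_images_np)):
  input_tensor = tf.convert_to_tensor(test_images_np[i], dtype=tf.float32)
  detections = detect(input_tensor)
  plot_detections(
      test_images_np[i][0],
      detections['detection_boxes'][0].numpy(),
      detections['detection_classes'][0].numpy().astype(np.uint32)
      + label_id_offset,
      detections['detection_scores'][0].numpy(),
      category_index,
      figsize=(15, 20),
      image_name="./results/gif_frame_" + ('%03d' % i) + ".jpg")

# stitch the frames into a GIF
imageio.plugins.freeimage.download()  # the 'GIF-FI' writer needs the freeimage backend
anim_file = './zombie-anim.gif'
filenames = sorted(glob.glob('./results/gif_frame_*.jpg'))
images = [imageio.imread(filename) for filename in filenames]
imageio.mimsave(anim_file, images, 'GIF-FI', fps=10)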

