乐趣区

关于python:label-studio-结合-MMDetection-实现数据集自动标记模型迭代训练的闭环

前言

一个 AI 方向的敌人因为标数据集发了篇 SCI 论文,看着他标了两个多月的数据集这么辛苦,就想着人工智能都能站在围棋巅峰了,难道不能动动小手为本人标数据吗?查了一下还真有一些可能满足此需要的框架,比方 cvat、doccano、label studio 等,通过简略的比照后发现还是 label studio 最好用。本文首先介绍了 label studio 的装置过程;而后应用 MMDetection 作为后端人脸检测标记框架,并通过 label studio ml 将 MMDetection 模型封装成 label studio 后端服务,实现数据集的主动标记 1;最初参考 label studio ml 示例,为本人的 MMDetection 人脸标记模型设计了一种迭代训练方法,使之可能一直随着标记数据的减少而跟进训练,最终实现了模型主动标记数据集、数据集更新迭代训练模型的闭环。

依赖装置

本我的项目波及的源码已开源在 label-studio-demo 中,所应用的软件版本如下,其中 MMDetection 的版本及配置参考 MMDetection 应用示例:从入门到出门:

软件 版本
label-studio 1.6.0
label-studio-ml 1.0.8
label-studio-tools 0.0.1

本文最终我的项目目录构造如下:

LabelStudio
├── backend         // 后端性能
│   ├── examples    // label studio ml 官网示例(非必须)│   ├── mmdetection // mmdetection 人脸检测模型
│   ├── model       // label studio ml 生成的后端服务(主动生成)│   ├── workdir     // 模型训练时工作目录
│   |   ├── fcos_common_base.pth    // 后端模型根底权重文件
│   |   └── latest.pth              // 后端模型最新权重文件
│   └── runbackend.bat  // 生成并启动后端服务的脚本文件
├── dataset         // 试验所用数据集(非必须)├── label_studio.sqlite3    // label studio 数据库文件
├── media      
│   ├── export
│   └── upload  // 上传的待标记数据集
└── run.bat     // 启动 label studio 的脚本文件(非必须)

label studio 装置启动

label-studio 是一个开源的多媒体数据标注工具(用来提供根本标注性能的 GUI),并且能够很不便的将标注后果导出为多种常见的数据格式。其装置办法次要有以下几种:

  1. Docker

    docker pull heartexlabs/label-studio:latest
  2. pip

    pip install label-studio

    倡议是通过 pip 装置,其配置更清晰不便。环境装置实现后在任意地位关上命令行,应用以下命令启动 label studio:

    label-studio --data-dir LabelStudio -p 80

    其中 --data-dir 用于指定工作目录,-p 用来指定运行端口,运行胜利后会当前目录会生成 LabelStudio 目录:

    并弹出浏览器关上 label studio 工作界面,创立用户后即可登录应用:

label studio ml 装置

label studio ml 是 label studio 的后端配置,其次要提供了一种可能疾速将 AI 模型封装为 label studio 可应用的预标记服务(提供模型预测服务)。其装置办法有以下几种:

  1. GitHub 装置

    git clone https://github.com/heartexlabs/label-studio-ml-backend 
    cd label-studio-ml-backend
    pip install -U -e .
  2. pip 装置:

    pip install label-studio-ml

    依然倡议通过 pip 装置,GitHub 装置可能会有依赖问题。装置实现后应用 label-studio-ml -h 命令查看是否装置胜利。

前端配置

在 label studio 前端主页中抉择创立我的项目:

  1. 我的项目根本信息
  2. 导入数据
    间接将图片选中拖入数据框即可。
  3. 抉择标记模板
    label studio 内置了很多常见的深度学习标记模板,本示例是人脸识别,所以抉择 Object Detection with Bounding Boxes 模板,确定后将模板内自带的 Airplane、Car 标签删除,而后增加自定义的标签 face(标签的类别数量能够比后端反对的类别多,也能够更少,然而同类别的标签名必须统一)。

此时咱们曾经能够通过 label studio 进行一般的图片标记工作,如果要应用其提供的辅助预标记性能,则须要进行后续配置。

后端配置

选取后端模型

在 MMDetection 应用示例:从入门到出门 中,咱们曾经实现了基于 celeba100 数据集的人脸检测模型的训练,本文将间接应用其中训练的后果模型。

后端服务实现

引入后端模型

在根目录下创立 backend 目录,并将 MMDetection 应用示例:从入门到出门 中的整个我的项目文件复制其中,此时我的项目目录为:

.
├── backend
│   └── mmdetection             // 复制的 mmdetection 文件夹
│        ├── checkpoints
│        ├── completion.json
│        ├── configs
│        ├── conf.yaml
│        ├── detect.py
│        ├── label_studio_backend.py      // 须要本人实现的后端模型
│        ├── mmdet
│        ├── model
│        ├── test.py
│        ├── tools
│        └── train.py
├── dataset
├── export
├── label-studio-ml-backend
├── label_studio.sqlite3
├── media
└── run.bat

创立后端模型

label studio 的后端模型有本人固定的写法,只有继承 label_studio_ml.model.LabelStudioMLBase 类并实现其中的接口都能够作为 label studio 的后端服务。在 mmdetection 文件夹下创立 label_studio_backend.py 文件,而后在文件中引入通用配置:

ROOT = os.path.join(os.path.dirname(__file__))
print('=> ROOT =', ROOT)
# label-studio 启动的前端服务地址
os.environ['HOSTNAME'] = 'http://localhost:80'
# label-studio 中对应用户的 API_KEY
os.environ['API_KEY'] = '37edbb42f1b3a73376548ea6c4bc7b3805d63453'
HOSTNAME = get_env('HOSTNAME')
API_KEY = get_env('API_KEY')

print('=> LABEL STUDIO HOSTNAME =', HOSTNAME)
if not API_KEY:
    print('=> WARNING! API_KEY is not set')

with open(os.path.join(ROOT, "conf.yaml"), errors='ignore') as f:
    conf = yaml.safe_load(f)

这里的 API_KEY 能够在前端的 Account & Settings 中找到。

而后在 label_studio_backend.py 中创立本人预标记模型的类,使其继承 label_studio_ml.model.LabelStudioMLBase 并实现要害办法,不同办法对应不同性能,前面会陆续实现:

class MyModel(LabelStudioMLBase):
    def __init__(self, **kwargs):
        pass
    def predict(self, tasks, **kwargs):
        pass
    def fit(self, completions, batch_size=32, num_epochs=5, **kwargs):
        pass
    def gen_train_data(self, project_id):
        pass

实现其中的 __init__ 办法,以实现模型初始化性能(必须):

    def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        # 按 mmdetection 的形式加载模型及权重
        if self.train_output:
            self.detector = init_detector(conf['config_file'], self.train_output['model_path'], device=conf['device'])
        else:
            self.detector = init_detector(conf['config_file'], conf['checkpoint_file'], device=conf['device'])
        # 获取后端模型标签列表
        self.CLASSES = self.detector.CLASSES
        # 前端配置的标签列表
        self.labels_in_config = set(self.labels_in_config)  
        # 一些我的项目相干常量
        self.from_name, self.to_name, self.value, self.labels_in_config = get_single_tag_keys(self.parsed_label_config, 'RectangleLabels', 'Image')  # 前端获取工作属性 

实现其中的 predict 办法,以实现预标记模型的标记性能(必须):

    def predict(self, tasks, **kwargs):
        # 获取待标记图片
        images = [get_local_path(task['data'][self.value], hostname=HOSTNAME, access_token=API_KEY) for task in tasks]
        for image_path in images:
            w, h = get_image_size(image_path)
            # 推理演示图像
            img = mmcv.imread(image_path)
            # 以 mmdetection 的办法进行推理
            result = inference_detector(self.detector, img)
            # 手动获取标记框地位
            bboxes = np.vstack(result)
            # 手动获取推理后果标签
            labels = [np.full(bbox.shape[0], i, dtype=np.int32) for i, bbox in enumerate(result)]
            labels = np.concatenate(labels)
            # 推理分数  FCOS 算法后果会多进去两个分数极低的检测框,须要将其过滤掉
            scores = bboxes[:, -1]
            score_thr = 0.3
            inds = scores > score_thr
            bboxes = bboxes[inds, :]
            labels = labels[inds]
            results = []  # results 须要放在 list 中再返回
            for id, bbox in enumerate(bboxes):
                label = self.CLASSES[labels[id]]
                if label not in self.labels_in_config:
                    print(label + 'label not found in project config.')
                    continue
                results.append({'id': str(id),                                      # 必须为 str,否则前端不显示
                    'from_name': self.from_name,
                    'to_name': self.to_name,
                    'type': 'rectanglelabels',
                    'value': {'rectanglelabels': [label],
                        'x': bbox[0] / w * 100,                         # xy 为左上角坐标点
                        'y': bbox[1] / h * 100,
                        'width': (bbox[2] - bbox[0]) / w * 100,         # width,height 为宽高
                        'height': (bbox[3] - bbox[1]) / h * 100
                    },
                    'score': float(bbox[4] * 100)
                })
            avgs = bboxes[:, -1]
            results = [{'result': results, 'score': np.average(avgs) * 100}]
            return results

实现其中的 gen_train_data 办法,以获取标记实现的数据用来训练(非必须,其实 label studio 自带此类办法,但在实际过程中有各种问题,所以本人写了一遍):

    def gen_train_data(self, project_id):
        import zipfile
        import glob
        download_url = f'{HOSTNAME.rstrip("/")}/api/projects/{project_id}/export?export_type=COCO&download_all_tasks=false&download_resources=true'
        response = requests.get(download_url, headers={'Authorization': f'Token {API_KEY}'})
        zip_path = os.path.join(conf['workdir'], "train.zip")
        train_path = os.path.join(conf['workdir'], "train")

        with open(zip_path, 'wb') as file:
            file.write(response.content)  # 通过二进制写文件的形式保留获取的内容
            file.flush()
        f = zipfile.ZipFile(zip_path)  # 创立压缩包对象
        f.extractall(train_path)  # 压缩包解压缩
        f.close()
        os.remove(zip_path)
        if not os.path.exists(os.path.join(train_path, "images", str(project_id))):
            os.makedirs(os.path.join(train_path, "images", str(project_id)))
        for img in glob.glob(os.path.join(train_path, "images", "*.jpg")):
            basename = os.path.basename(img)
            shutil.move(img, os.path.join(train_path, "images", str(project_id), basename))
        return True

实现其中的 fit 办法,以实现预标记模型的自训练性能(非必须):

    def fit(self, completions, num_epochs=5, **kwargs):
        if completions:     # 应用办法 1 获取 project_id
            image_urls, image_labels = [], []
            for completion in completions:
                project_id = completion['project']
                u = completion['data'][self.value]
                image_urls.append(get_local_path(u, hostname=HOSTNAME, access_token=API_KEY))
                image_labels.append(completion['annotations'][0]['result'][0]['value'])
        elif kwargs.get('data'):    # 应用办法 2 获取 project_id
            project_id = kwargs['data']['project']['id']
            if not self.parsed_label_config:
                self.load_config(kwargs['data']['project']['label_config'])
        if self.gen_train_data(project_id):
            # 应用 mmdetection 的办法训练模型
            from tools.mytrain import MyDict, train
            args = MyDict()
            args.config = conf['config_file']
            data_root = os.path.join(conf['workdir'], "train")
            args.cfg_options = {}
            args.cfg_options['data_root'] = data_root
            args.cfg_options['runner'] = dict(type='EpochBasedRunner', max_epochs=num_epochs)
            args.cfg_options['data'] = dict(train=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
                val=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
                test=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
            )
            args.cfg_options['load_from'] = conf['checkpoint_file']
            args.work_dir = os.path.join(data_root, "work_dir")
            train(args)
            checkpoint_name = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) + ".pth"
            shutil.copy(os.path.join(args.work_dir, "latest.pth"), os.path.join(conf['workdir'], checkpoint_name))
            print("model train complete!")
            # 权重文件保留至运行环境,将在下次运行 init 初始化时加载
            return {'model_path': os.path.join(conf['workdir'], checkpoint_name)}
        else:
            raise "gen_train_data error"

上述残缺代码如下:

import os
import yaml
import time
import shutil
import requests
import numpy as np
from label_studio_ml.model import LabelStudioMLBase
from label_studio_ml.utils import get_image_size, get_single_tag_keys
from label_studio_tools.core.utils.io import get_local_path
from label_studio_ml.utils import get_env

from mmdet.apis import init_detector, inference_detector
import mmcv

ROOT = os.path.join(os.path.dirname(__file__))
print('=> ROOT =', ROOT)
os.environ['HOSTNAME'] = 'http://localhost:80'
os.environ['API_KEY'] = '37edbb42f1b3a73376548ea6c4bc7b3805d63453'
HOSTNAME = get_env('HOSTNAME')
API_KEY = get_env('API_KEY')

print('=> LABEL STUDIO HOSTNAME =', HOSTNAME)
if not API_KEY:
    print('=> WARNING! API_KEY is not set')

with open(os.path.join(ROOT, "conf.yaml"), errors='ignore') as f:
    conf = yaml.safe_load(f)


class MyModel(LabelStudioMLBase):

    def __init__(self, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        # 按 mmdetection 的形式加载模型及权重
        if self.train_output:
            self.detector = init_detector(conf['config_file'], self.train_output['model_path'], device=conf['device'])
        else:
            self.detector = init_detector(conf['config_file'], conf['checkpoint_file'], device=conf['device'])
        # 获取后端模型标签列表
        self.CLASSES = self.detector.CLASSES
        # 前端配置的标签列表
        self.labels_in_config = set(self.labels_in_config)  
        # 一些我的项目相干常量
        self.from_name, self.to_name, self.value, self.labels_in_config = get_single_tag_keys(self.parsed_label_config, 'RectangleLabels', 'Image')  # 前端获取工作属性

    def predict(self, tasks, **kwargs):
        # 获取待标记图片
        images = [get_local_path(task['data'][self.value], hostname=HOSTNAME, access_token=API_KEY) for task in tasks]
        for image_path in images:
            w, h = get_image_size(image_path)
            # 推理演示图像
            img = mmcv.imread(image_path)
            # 以 mmdetection 的办法进行推理
            result = inference_detector(self.detector, img)
            # 手动获取标记框地位
            bboxes = np.vstack(result)
            # 手动获取推理后果标签
            labels = [np.full(bbox.shape[0], i, dtype=np.int32) for i, bbox in enumerate(result)]
            labels = np.concatenate(labels)
            # 推理分数  FCOS 算法后果会多进去两个分数极低的检测框,须要将其过滤掉
            scores = bboxes[:, -1]
            score_thr = 0.3
            inds = scores > score_thr
            bboxes = bboxes[inds, :]
            labels = labels[inds]
            results = []  # results 须要放在 list 中再返回
            for id, bbox in enumerate(bboxes):
                label = self.CLASSES[labels[id]]
                if label not in self.labels_in_config:
                    print(label + 'label not found in project config.')
                    continue
                results.append({'id': str(id),                                      # 必须为 str,否则前端不显示
                    'from_name': self.from_name,
                    'to_name': self.to_name,
                    'type': 'rectanglelabels',
                    'value': {'rectanglelabels': [label],
                        'x': bbox[0] / w * 100,                         # xy 为左上角坐标点
                        'y': bbox[1] / h * 100,
                        'width': (bbox[2] - bbox[0]) / w * 100,         # width,height 为宽高
                        'height': (bbox[3] - bbox[1]) / h * 100
                    },
                    'score': float(bbox[4] * 100)
                })
            avgs = bboxes[:, -1]
            results = [{'result': results, 'score': np.average(avgs) * 100}]
            return results

    def fit(self, completions, num_epochs=5, **kwargs):
        if completions:     # 应用办法 1 获取 project_id
            image_urls, image_labels = [], []
            for completion in completions:
                project_id = completion['project']
                u = completion['data'][self.value]
                image_urls.append(get_local_path(u, hostname=HOSTNAME, access_token=API_KEY))
                image_labels.append(completion['annotations'][0]['result'][0]['value'])
        elif kwargs.get('data'):    # 应用办法 2 获取 project_id
            project_id = kwargs['data']['project']['id']
            if not self.parsed_label_config:
                self.load_config(kwargs['data']['project']['label_config'])
        if self.gen_train_data(project_id):
            # 应用 mmdetection 的办法训练模型
            from tools.mytrain import MyDict, train
            args = MyDict()
            args.config = conf['config_file']
            data_root = os.path.join(conf['workdir'], "train")
            args.cfg_options = {}
            args.cfg_options['data_root'] = data_root
            args.cfg_options['runner'] = dict(type='EpochBasedRunner', max_epochs=num_epochs)
            args.cfg_options['data'] = dict(train=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
                val=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
                test=dict(img_prefix=data_root, ann_file=data_root + '/result.json'),
            )
            args.cfg_options['load_from'] = conf['checkpoint_file']
            args.work_dir = os.path.join(data_root, "work_dir")
            train(args)
            checkpoint_name = time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())) + ".pth"
            shutil.copy(os.path.join(args.work_dir, "latest.pth"), os.path.join(conf['workdir'], checkpoint_name))
            print("model train complete!")
            # 权重文件保留至运行环境,将在下次运行 init 初始化时加载
            return {'model_path': os.path.join(conf['workdir'], checkpoint_name)}
        else:
            raise "gen_train_data error"

    def gen_train_data(self, project_id):
        import zipfile
        import glob
        download_url = f'{HOSTNAME.rstrip("/")}/api/projects/{project_id}/export?export_type=COCO&download_all_tasks=false&download_resources=true'
        response = requests.get(download_url, headers={'Authorization': f'Token {API_KEY}'})
        zip_path = os.path.join(conf['workdir'], "train.zip")
        train_path = os.path.join(conf['workdir'], "train")

        with open(zip_path, 'wb') as file:
            file.write(response.content)  # 通过二进制写文件的形式保留获取的内容
            file.flush()
        f = zipfile.ZipFile(zip_path)  # 创立压缩包对象
        f.extractall(train_path)  # 压缩包解压缩
        f.close()
        os.remove(zip_path)
        if not os.path.exists(os.path.join(train_path, "images", str(project_id))):
            os.makedirs(os.path.join(train_path, "images", str(project_id)))
        for img in glob.glob(os.path.join(train_path, "images", "*.jpg")):
            basename = os.path.basename(img)
            shutil.move(img, os.path.join(train_path, "images", str(project_id), basename))
        return True

启动后端服务

以下命令为 window 脚本,皆在 backend 根目录下执行。

  1. 依据后端模型生成服务代码

    label-studio-ml init model --script mmdetection/label_studio_backend.py --force

    label-studio-ml init 命令提供了一种依据后端模型主动生成后端服务代码的性能,model 为输入目录,--script 指定后端模型门路,--force 示意笼罩生成。该命令执行胜利后会在 backend 目录下生成 model 目录。

  2. 复制 mmdetection 依赖文件
    因为 label-studio-ml 生成的后端服务代码只蕴含根本的 label_studio_backend.py 中的内容,而咱们所用的 mmdetection 框架的执行须要大量额定的依赖,所以须要手动将这些依赖复制到生成的 model 目录中。应用以下命令实现主动复制依赖:

    md .\model\mmdet
    md .\model\model
    md .\model\configs
    md .\model\checkpoints
    md .\model\tools
    md .\model\workdir
    xcopy .\mmdetection\mmdet .\model\mmdet /S /Y /Q
    xcopy .\mmdetection\model .\model\model /S /Y /Q
    xcopy .\mmdetection\configs .\model\configs  /S /Y /Q
    xcopy .\mmdetection\checkpoints .\model\checkpoints  /S /Y /Q
    xcopy .\mmdetection\tools .\model\tools  /S /Y /Q
    copy .\mmdetection\conf.yaml .\model\conf.yaml
  3. 启动后端服务

    label-studio-ml start model --host 0.0.0.0 -p 8888

    启动胜利后成果如下:

    前端主动标注

    后面咱们曾经可能从 label studio 前端失常手动标注图片,要想实现主动标注,则须要在前端引入后端服务。在咱们创立的我的项目中顺次抉择 Settings ->
    Machine Learning -> Add model,而后输出后端地址 http://10.100.143.125:8888/ 点击保留(此地址为命令行打印地址,而非 http://127.0.0.1:8888/):

    此时咱们从前端我的项目中关上待标记图片,前端会主动申请后端对其进行标记(调用后端的 predict 办法),期待片刻后即可看见预标记后果,咱们只须要大抵核查无误后点击 submit 即可:

    如果感觉每次关上图片都须要期待片刻才会收到后端预测后果比拟费时,能够在 Settings -> Machine Learning 设置中抉择关上 Retrieve predictions when loading a task automatically,尔后前端会在咱们每次关上我的项目时主动对所有工作进行主动预测,根本可能做到无期待:

后端主动训练

当初所有的图片都曾经有了与标注信息,咱们先查看所有图片,查看并改良所有标注信息而后点击 submit 提交:

在 Settings -> Machine Learning 中点击后端服务的 Start Training 按钮,即可调用后端模型应用已标记信息进行训练:

该操作会调用后端模型的 fit 办法对模型进行训练,能够在后端命令行界面看见训练过程,训练实现后的所有新数据集都会应用新的模型进行预测:

也能够 Settings -> Machine Learning 中容许模型主动训练,但训练频率过高会影响程序效率。

局部常见问题

Q: 一种拜访权限不容许的形式做了一个拜访套接字的尝试。
A: label-studio-ml start 启动时指定端口 -p 8888

Q: Can’t connect to ML backend http://127.0.0.1:8888/, health check failed. Make sure it is up and your firewall is properly configured.
A: label-studio-ml start 启动后会打印一个监听地址,label studio 前端增加该地址而非 http://127.0.0.1:8888/。

Q: FileNotFoundError: Can’t resolve url, neither hostname or project_dir passed: /data/upload/1/db8f065a-000001.jpg
A: 接口返回的是我的项目的绝对地址,无奈通过该地址间接读取到图片原件,须要配合 get_local_path 函数应用。

Q: UnicodeEncodeError: ‘gbk’ codec can’t encode character ‘\xa0’ in position 2: illegal multibyte sequence
A: 批改 C:\Users\Fantasy.conda\envs\labelstudio\lib\json\__init__.py#line 179 为:

    for chunk in iterable:
        fp.write(chunk.replace(u'\xa0', u''))

参考

<!– 1: 作者. 文章题目. 发表地. [发表或更新日期] –>


  1. Cai Yichao. label_studio 主动预标注性能. CSDN. [2022-01-19] ↩
退出移动版