关于pytorch:基于梅尔频谱的音频信号分类识别Pytorch

48次阅读

共计 13414 个字符,预计需要花费 34 分钟才能阅读完成。

本我的项目将应用 Pytorch,实现一个简略的的音频信号分类器,可利用于机械信号分类辨认,鸟叫声信号辨认等利用场景。

我的项目应用 librosa 进行音频信号处理,backbone 应用 mobilenet_v2,在 Urbansound8K 数据上,最终收敛的准确率在训练集 99%,测试集 82%,如果想进一步提高辨认准确率能够应用更重的 backbone 和更多的数据加强办法。

残缺的我的项目代码:https://download.csdn.net/dow…

目录

  1. 我的项目构造
  2. 环境配置

3. 数据处理

(1)数据集 Urbansound8K

(2)自定义数据集

(3)音频特征提取:

4. 训练 Pipeline

5. 预测 demo.py

  1. 我的项目构造
  2. 环境配置
    应用 pip 命令装置 libsora 和 pyaudio,pydub 等库

    3. 数据处理

    (1)数据集 Urbansound8K
    Urbansound8K 是目前利用较为宽泛的用于主动城市环境声分类钻研的公共数据集,
    蕴含 10 个分类:空调声、汽车鸣笛声、儿童游玩声、狗叫声、钻孔声、引擎空转声、枪声、手提钻、警笛声和街道音乐声。

数据集下载:https://www.ctocio.com/?s=%E9…

(2)自定义数据集
能够本人录制音频信号,制作本人的数据集,参考
每个文件夹寄存一个类别的音频数据,每条音频数据长度在 3 秒左右, 倡议每类的音频数据平衡
生产 train 和 test 数据列表:参考

(3)音频特征提取:
音频信号是一维的语音信号,不能间接用于模型训练,须要应用 librosa 将音频转为梅尔频谱(Mel Spectrogram)。

librosa 提供 python 接口,在音频、噪音信号的剖析中常常用到

wav, sr = librosa.load(data_path, sr=16000)

应用 librosa 取得音频的梅尔频谱

spec_image = librosa.feature.melspectrogram(y=wav, sr=sr, hop_length=256)
对于 librosa 的应用办法,请参考:

音频特征提取——librosa 工具包应用
梅尔频谱 (mel spectrogram) 原理与应用
4. 训练 Pipeline
(1)构建训练和测试数据

def build_dataset(self, cfg):
    """构建训练数据和测试数据"""
    input_shape = eval(cfg.input_shape)
    # 获取数据
    train_dataset = AudioDataset(cfg.train_data, data_dir=cfg.data_dir, mode='train', spec_len=input_shape[3])
    train_loader = DataLoader(dataset=train_dataset, batch_size=cfg.batch_size, shuffle=True,
                              num_workers=cfg.num_workers)

    test_dataset = AudioDataset(cfg.test_data, data_dir=cfg.data_dir, mode='test', spec_len=input_shape[3])
    test_loader = DataLoader(dataset=test_dataset, batch_size=cfg.batch_size, shuffle=False,
                             num_workers=cfg.num_workers)
    print("train nums:{}".format(len(train_dataset)))
    print("test  nums:{}".format(len(test_dataset)))
    return train_loader, test_loader

因为 librosa.load 加载音频数据特地慢,倡议应用 cache 先进行缓存,不便减速

def load_audio(audio_file, cache=False):

"""
加载并预处理音频
:param audio_file:
:param cache: librosa.load 加载音频数据特地慢,倡议应用进行缓存进行减速
:return:
"""
# 读取音频数据
cache_path = audio_file + ".pk"
# t = librosa.get_duration(filename=audio_file)
if cache and os.path.exists(cache_path):
    tmp = open(cache_path, 'rb')
    wav, sr = pickle.load(tmp)
else:
    wav, sr = librosa.load(audio_file, sr=16000)
    if cache:
        f = open(cache_path, 'wb')
        pickle.dump([wav, sr], f)
        f.close()
# Compute a mel-scaled spectrogram: 梅尔频谱图
spec_image = librosa.feature.melspectrogram(y=wav, sr=sr, hop_length=256)
return spec_image

(2)构建 backbone 模型

backbone 是一个基于 CNN+FC 的网络结构,与图像 CNN 分类模型不同的是,图像 CNN 分类模型的输出维度 (batch,3,H,W) 输出数据 depth=3,而音频信号的梅尔频谱图是深度为 depth=1,能够认为是灰度图,输出维度(batch,1,H,W),因而理论应用中,只须要将传统的 CNN 图像分类的 backbone 的第一层卷积层的 in_channels= 1 即可。须要留神的是,因为维度不统一,导致不能应用 imagenet 的 pretrained 模型。

当然能够将梅尔频谱图 (灰度图) 是转为 3 通道 RGB 图,这样就跟一般的 RGB 图像没有什么区别了,也能够 imagenet 的 pretrained 模型,如

将梅尔频谱图 (灰度图) 是转为为 3 通道 RGB 图

spec_image = cv2.cvtColor(spec_image, cv2.COLOR_GRAY2RGB)

def build_model(self, cfg):
    if cfg.net_type == "mbv2":
        model = mobilenet_v2.mobilenet_v2(num_classes=cfg.num_classes)
    elif cfg.net_type == "resnet34":
        model = resnet.resnet34(num_classes=args.num_classes)
    elif cfg.net_type == "resnet18":
        model = resnet.resnet18(num_classes=args.num_classes)
    else:
        raise Exception("Error:{}".format(cfg.net_type))
    model.to(self.device)
    return model

(3)训练参数配置

相干的命令行参数,可参考:

def get_parser():

data_dir = "/media/pan/ 新加卷 /dataset/UrbanSound8K"
# data_dir = "E:/dataset/UrbanSound8K"
train_data = 'data/UrbanSound8K/train.txt'
test_data = 'data/UrbanSound8K/test.txt'
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--batch_size', type=int, default=32, help='训练的批量大小')
parser.add_argument('--num_workers', type=int, default=4, help='读取数据的线程数量')
parser.add_argument('--num_epoch', type=int, default=100, help='训练的轮数')
parser.add_argument('--num_classes', type=int, default=10, help='分类的类别数量')
parser.add_argument('--learning_rate', type=float, default=1e-3, help='初始学习率的大小')
parser.add_argument('--input_shape', type=str, default='(None, 1, 128, 128)', help='数据输出的形态')
parser.add_argument('--gpu_id', type=int, default=0, help='GPU ID')
parser.add_argument('--net_type', type=str, default="mbv2", help='backbone')
parser.add_argument('--data_dir', type=str, default=data_dir, help='数据门路')
parser.add_argument('--train_data', type=str, default=train_data, help='训练数据的数据列表门路')
parser.add_argument('--test_data', type=str, default=test_data, help='测试数据的数据列表门路')
parser.add_argument('--work_dir', type=str, default='work_space/', help='模型保留的门路')
return parser

配置好数据门路,其余参数默认设置,即能够开始训练了:

python train.py

训练实现,应用 mobilenet_v2, 最终训练集准确率 99% 左右,测试集 81% 左右,看起来有点过拟合了。

如果想进一步提高辨认准确率能够应用更重的 backbone,如 resnet34, 采纳更多的数据加强办法,进步模型的泛发性。

残缺的训练代码 train.py:

-coding: utf-8 –

“””

@Author : panjq
@E-mail : pan_jinquan@163.com
@Date   : 2021-07-28 09:09:32

“””

import argparse
import os
import numpy as np
import torch
import tensorboardX as tensorboard
from datetime import datetime
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import StepLR, MultiStepLR
from audio.dataloader.audio_dataset import AudioDataset
from audio.utils.utility import print_arguments
from audio.utils import file_utils
from audio.models import mobilenet_v2, resnet

class Train(object):

"""Training  Pipeline"""

def __init__(self, cfg):
    self.device = "cuda:{}".format(cfg.gpu_id) if torch.cuda.is_available() else "cpu"
    self.num_epoch = cfg.num_epoch
    self.net_type = cfg.net_type
    self.work_dir = os.path.join(cfg.work_dir, self.net_type)
    self.model_dir = os.path.join(self.work_dir, "model")
    self.log_dir = os.path.join(self.work_dir, "log")
    file_utils.create_dir(self.model_dir)
    file_utils.create_dir(self.log_dir)

    self.tensorboard = tensorboard.SummaryWriter(self.log_dir)
    self.train_loader, self.test_loader = self.build_dataset(cfg)
    # 获取模型
    self.model = self.build_model(cfg)
    # 获取优化办法
    self.optimizer = torch.optim.Adam(params=self.model.parameters(),
                                      lr=cfg.learning_rate,
                                      weight_decay=5e-4)
    # 获取学习率衰减函数
    self.scheduler = MultiStepLR(self.optimizer, milestones=[50, 80], gamma=0.1)
    # 获取损失函数
    self.losses = torch.nn.CrossEntropyLoss()

def build_dataset(self, cfg):
    """构建训练数据和测试数据"""
    input_shape = eval(cfg.input_shape)
    # 获取数据
    train_dataset = AudioDataset(cfg.train_data, data_dir=cfg.data_dir, mode='train', spec_len=input_shape[3])
    train_loader = DataLoader(dataset=train_dataset, batch_size=cfg.batch_size, shuffle=True,
                              num_workers=cfg.num_workers)

    test_dataset = AudioDataset(cfg.test_data, data_dir=cfg.data_dir, mode='test', spec_len=input_shape[3])
    test_loader = DataLoader(dataset=test_dataset, batch_size=cfg.batch_size, shuffle=False,
                             num_workers=cfg.num_workers)
    print("train nums:{}".format(len(train_dataset)))
    print("test  nums:{}".format(len(test_dataset)))
    return train_loader, test_loader

def build_model(self, cfg):
    """构建模型"""
    if cfg.net_type == "mbv2":
        model = mobilenet_v2.mobilenet_v2(num_classes=cfg.num_classes)
    elif cfg.net_type == "resnet34":
        model = resnet.resnet34(num_classes=args.num_classes)
    elif cfg.net_type == "resnet18":
        model = resnet.resnet18(num_classes=args.num_classes)
    else:
        raise Exception("Error:{}".format(cfg.net_type))
    model.to(self.device)
    return model

def epoch_test(self, epoch):
    """模型测试"""
    loss_sum = []
    accuracies = []
    self.model.eval()
    with torch.no_grad():
        for step, (inputs, labels) in enumerate(tqdm(self.test_loader)):
            inputs = inputs.to(self.device)
            labels = labels.to(self.device).long()
            output = self.model(inputs)
            # 计算损失值
            loss = self.losses(output, labels)
            # 计算准确率
            output = torch.nn.functional.softmax(output, dim=1)
            output = output.data.cpu().numpy()
            output = np.argmax(output, axis=1)
            labels = labels.data.cpu().numpy()
            acc = np.mean((output == labels).astype(int))
            accuracies.append(acc)
            loss_sum.append(loss)
    acc = sum(accuracies) / len(accuracies)
    loss = sum(loss_sum) / len(loss_sum)
    print("Test epoch:{:3.3f},Acc:{:3.3f},loss:{:3.3f}".format(epoch, acc, loss))
    print('=' * 70)
    return acc, loss

def epoch_train(self, epoch):
    """模型训练"""
    loss_sum = []
    accuracies = []
    self.model.train()
    for step, (inputs, labels) in enumerate(tqdm(self.train_loader)):
        inputs = inputs.to(self.device)
        labels = labels.to(self.device).long()
        output = self.model(inputs)
        # 计算损失值
        loss = self.losses(output, labels)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # 计算准确率
        output = torch.nn.functional.softmax(output, dim=1)
        output = output.data.cpu().numpy()
        output = np.argmax(output, axis=1)
        labels = labels.data.cpu().numpy()
        acc = np.mean((output == labels).astype(int))
        accuracies.append(acc)
        loss_sum.append(loss)
        if step % 50 == 0:
            lr = self.optimizer.state_dict()['param_groups'][0]['lr']
            print('[%s] Train epoch %d, batch: %d/%d, loss: %f, accuracy: %f,lr:%f' % (datetime.now(), epoch, step, len(self.train_loader), sum(loss_sum) / len(loss_sum),
                sum(accuracies) / len(accuracies), lr))
    acc = sum(accuracies) / len(accuracies)
    loss = sum(loss_sum) / len(loss_sum)
    print("Train epoch:{:3.3f},Acc:{:3.3f},loss:{:3.3f}".format(epoch, acc, loss))
    print('=' * 70)
    return acc, loss

def run(self):
    # 开始训练
    for epoch in range(self.num_epoch):
        train_acc, train_loss = self.epoch_train(epoch)
        test_acc, test_loss = self.epoch_test(epoch)
        self.tensorboard.add_scalar("train_acc", train_acc, epoch)
        self.tensorboard.add_scalar("train_loss", train_loss, epoch)
        self.tensorboard.add_scalar("test_acc", test_acc, epoch)
        self.tensorboard.add_scalar("test_loss", test_loss, epoch)
        self.scheduler.step()
        self.save_model(epoch, test_acc)

def save_model(self, epoch, acc):
    """放弃模型"""
    model_path = os.path.join(self.model_dir, 'model_{:0=3d}_{:.3f}.pth'.format(epoch, acc))
    if not os.path.exists(os.path.dirname(model_path)):
        os.makedirs(os.path.dirname(model_path))
    torch.jit.save(torch.jit.script(self.model), model_path)

def get_parser():

data_dir = "/media/pan/ 新加卷 /dataset/UrbanSound8K"
# data_dir = "E:/dataset/UrbanSound8K"
train_data = 'data/UrbanSound8K/train.txt'
test_data = 'data/UrbanSound8K/test.txt'
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--batch_size', type=int, default=32, help='训练的批量大小')
parser.add_argument('--num_workers', type=int, default=4, help='读取数据的线程数量')
parser.add_argument('--num_epoch', type=int, default=100, help='训练的轮数')
parser.add_argument('--num_classes', type=int, default=10, help='分类的类别数量')
parser.add_argument('--learning_rate', type=float, default=1e-3, help='初始学习率的大小')
parser.add_argument('--input_shape', type=str, default='(None, 1, 128, 128)', help='数据输出的形态')
parser.add_argument('--gpu_id', type=int, default=0, help='GPU ID')
parser.add_argument('--net_type', type=str, default="mbv2", help='backbone')
parser.add_argument('--data_dir', type=str, default=data_dir, help='数据门路')
parser.add_argument('--train_data', type=str, default=train_data, help='训练数据的数据列表门路')
parser.add_argument('--test_data', type=str, default=test_data, help='测试数据的数据列表门路')
parser.add_argument('--work_dir', type=str, default='work_space/', help='模型保留的门路')
return parser

if name == ‘__main__’:

parser = get_parser()
args = parser.parse_args()
print_arguments(args)
t = Train(args)
t.run()

5. 预测 demo.py

-coding: utf-8 –

“””

@Author : panjq
@E-mail : pan_jinquan@163.com
@Date   : 2021-07-28 09:09:32

“””

import os
import cv2
import argparse
import librosa
import torch
import numpy as np
from audio.dataloader.audio_dataset import load_audio, normalization
from audio.dataloader.record_audio import record_audio
from audio.utils import file_utils, image_utils
https://www.ctocio.com/?s=%E9…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E8…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…
https://www.ctocio.com/?s=%E5…

class Predictor(object):

def __init__(self, cfg):
    # self.device = "cuda:{}".format(cfg.gpu_id) if torch.cuda.is_available() else "cpu"
    self.device = "cpu"
    self.input_shape = eval(cfg.input_shape)
    self.spec_len = self.input_shape[3]
    self.model = self.build_model(cfg.model_file)

def build_model(self, model_file):
    # 加载模型
    model = torch.jit.load(model_file, map_location="cpu")
    model.to(self.device)
    model.eval()
    return model

def inference(self, input_tensors):
    with torch.no_grad():
        input_tensors = input_tensors.to(self.device)
        output = self.model(input_tensors)
    return output

def pre_process(self, spec_image):
    """音频数据预处理"""
    if spec_image.shape[1] > self.spec_len:
        input = spec_image[:, 0:self.spec_len]
    else:
        input = np.zeros(shape=(self.spec_len, self.spec_len), dtype=np.float32)
        input[:, 0:spec_image.shape[1]] = spec_image
    input = normalization(input)
    input = input[np.newaxis, np.newaxis, :]
    input_tensors = np.concatenate([input])
    input_tensors = torch.tensor(input_tensors, dtype=torch.float32)
    return input_tensors

def post_process(self, output):
    """输入后果后处理"""
    scores = torch.nn.functional.softmax(output, dim=1)
    scores = scores.data.cpu().numpy()
    # 显示图片并输入后果最大的 label
    label = np.argmax(scores, axis=1)
    score = scores[:, label]
    return label, score

def detect(self, audio_file):
    """
    :param audio_file: 音频文件
    :return: label: 预测音频的 label
             score: 预测音频的置信度
    """
    spec_image = load_audio(audio_file)
    input_tensors = self.pre_process(spec_image)
    # 执行预测
    output = self.inference(input_tensors)
    label, score = self.post_process(output)
    return label, score

def detect_file_dir(self, file_dir):
    """
    :param file_dir: 音频文件目录
    :return:
    """file_list = file_utils.get_files_lists(file_dir, postfix=["*.wav"])
    for file in file_list:
        print(file)
        label, score = self.detect(file)
        print(label, score)

def detect_record_audio(self, audio_dir):
    """
    :param audio_dir: 录制音频并进行辨认
    :return:
    """
    time = file_utils.get_time()
    file = os.path.join(audio_dir, time + ".wav")
    record_audio(file)
    label, score = self.detect(file)
    print(file)
    print(label, score)

def get_parser():

model_file = 'data/pretrained/model_060_0.827.pth'
file_dir = 'data/audio'
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--num_classes', type=int, default=10, help='分类的类别数量')
parser.add_argument('--input_shape', type=str, default='(None, 1, 128, 128)', help='数据输出的形态')
parser.add_argument('--net_type', type=str, default="mbv2", help='backbone')
parser.add_argument('--gpu_id', type=int, default=0, help='GPU ID')
parser.add_argument('--model_file', type=str, default=model_file, help='模型文件')
parser.add_argument('--file_dir', type=str, default=file_dir, help='音频文件的目录')
return parser

if name == ‘__main__’:

parser = get_parser()
args = parser.parse_args()
p = Predictor(args)
p.detect_file_dir(file_dir=args.file_dir)
# audio_dir = 'data/record_audio'

正文完
 0