乐趣区

关于机器学习:使用Paddle进行图像分类教程

最近学习简略钻研了一下 python 机器学习相干常识,写一个博客也是对本人近期学习的总结吧,内容比拟通俗大佬勿喷

在本教程中,咱们将应用 PaddlePaddle 框架进行图像分类。咱们将应用 CIFAR-10 数据集,该数据集蕴含 10 个不同类别的图像。咱们将通过构建一个卷积神经网络(CNN)模型来对这些图像进行分类。让咱们一步一步地来实现这个工作。

步骤 1:导入所需的包

首先,咱们须要导入所需的包,这些包包含 paddle、paddle.fluid、numpy、PIL 和 matplotlib.pyplot。请确保您曾经装置了这些包,如果没有,请先进行装置。

import paddle as paddle
import paddle.fluid as fluid
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import os

步骤 2:下载数据集

接下来,咱们须要下载 CIFAR-10 数据集,以便用于训练和测试。请留神,这段代码在正文块中,因而须要勾销正文能力执行。如果您曾经下载了数据集,能够跳过这一步。

'''
!mkdir -p  /home/aistudio/.cache/paddle/dataset/cifar/
!wget "http://ai-atest.bj.bcebos.com/cifar-10-python.tar.gz" -O cifar-10-python.tar.gz
!mv cifar-10-python.tar.gz  /home/aistudio/.cache/paddle/dataset/cifar/
!ls -a /home/aistudio/.cache/paddle/dataset/cifar/
'''

步骤 3:定义数据提供器

咱们须要定义数据提供器来加载训练和测试数据。在这里,咱们应用 paddle.batch 函数对数据进行批处理,并应用 paddle.reader.shuffle 函数对训练数据进行随机打乱。

BATCH_SIZE = 128

# 用于训练的数据提供器
train_reader = paddle.batch(paddle.reader.shuffle(paddle.dataset.cifar.train10(), buf_size=128*100),           
    batch_size=BATCH_SIZE)                                

# 用于测试的数据提供器
test_reader = paddle.batch(paddle.dataset.cifar.test10(),                            
    batch_size=BATCH_SIZE) 

步骤 4:定义卷积神经网络模型

咱们将应用一个卷积神经网络(CNN)模型对图像进行分类。在这个示例中,咱们应用了三个卷积 - 池化层和一个全连贯层。每个卷积 - 池化层都应用 ReLU 激活函数和批归一化(Batch Normalization)。

def convolutional_neural_network(img):
    # 第一个卷积 - 池化层
    conv_pool_1 = fluid.nets.simple_img_conv_pool(
        input=img,
        filter_size=5,
        num_filters=20,
        pool_size=2,
        pool_stride=2,
        act="relu")
        conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
    # 第二个卷积 - 池化层
    conv_pool_2 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)

    # 第三个卷积 - 池化层
    conv_pool_3 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_2,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")

    # 全连贯层,输入 10 个类别
    prediction = fluid.layers.fc(input=conv_pool_3, size=10, act='softmax')
    return prediction

步骤 5:定义输出数据和模型预测

咱们须要定义输出数据和模型预测。在这里,咱们应用 fluid.layers.data 定义图像数据和标签的输出。而后,咱们调用之前定义的卷积神经网络模型 convolutional_neural_network 进行预测。

data_shape = [3, 32, 32]
images = fluid.layers.data(name='images', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

# 获取分类器,用 CNN 进行分类
predict = convolutional_neural_network(images)

步骤 6:定义损失函数和准确率

咱们定义穿插熵损失函数和准确率来评估模型的性能。穿插熵损失函数用于掂量预测后果与实在标签之间的差别,准确率用于掂量模型在测试数据上的分类准确度。

cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(cost)
acc = fluid.layers.accuracy(input=predict, label=label)

步骤 7:定义优化办法

咱们抉择 Adam 优化器作为优化办法,并将学习率设置为 0.001。而后,应用优化器的 minimize 办法来最小化均匀损失。

optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(avg_cost)

步骤 8:设置执行环境和参数初始化

依据您的抉择,咱们能够在 CPU 或 GPU 上运行代码。依据 use_cuda 的值,咱们创立一个执行器并初始化参数。

use_cuda = False
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

步骤 9:训练和测试模型

在此步骤中,咱们开始训练和测试模型。咱们应用一个循环来遍历训练数据集,并计算损失并进行参数更新。在每个 epoch 完结时,咱们应用测试数据集评估模型的性能。

EPOCH_NUM = 10

for epoch in range(EPOCH_NUM):
    for batch_id, data in enumerate(train_reader()):
        # 筹备输出数据
        image_data = np.array([x[0].reshape(data_shape) for x in data]).astype('float32')
        label_data = np.array([x[1] for x in data]).astype('int64').reshape(-1, 1)

        # 运行训练程序
        loss, accuracy = exe.run(fluid.default_main_program(),
                                 feed={'images': image_data, 'label': label_data},
                                 fetch_list=[avg_cost, acc])

        # 每训练 100 个 batch 打印一次损失和准确率
        if batch_id % 100 == 0:
            print("Epoch {}, Batch {}, Loss {:.4f}, Accuracy {:.2f}%".format(epoch, batch_id, loss[0], accuracy[0] * 100))

    # 在每个 epoch 完结时进行测试
    test_accs = []
    test_costs = []
    for batch_id, data in enumerate(test_reader()):
        # 筹备测试数据
        image_data = np.array([x[0].reshape(data_shape) for x in data]).astype('float32')
        label_data = np.array([x[1] for x in data]).astype('int64').reshape(-1, 1)

        # 运行测试程序
        loss, accuracy = exe.run(fluid.default_main_program(),
                                 feed={'images': image_data, 'label': label_data},
                                 fetch_list=[avg_cost, acc])
        test_accs.append(accuracy[0])
        test_costs.append(loss[0])

    # 计算均匀测试准确率和损失
    test_acc = np.mean(test_accs)
    test_cost = np.mean(test_costs)
    print("Epoch {}, Test Loss {:.4f}, Test Accuracy {:.2f}%".format(epoch, test_cost, test_acc * 100))

步骤 10:保留模型

训练完结后,咱们能够将模型保留到磁盘上以供当前应用。

model_save_dir = "./models"
if not os.path.exists(model_save_dir):
    os.makedirs(model_save_dir)
fluid.io.save_inference_model(model_save_dir, ['images'], [predict], exe)

至此,咱们实现了应用 PaddlePaddle 进行图像分类的教程。您能够依据须要对代码进行批改和扩大,以适应不同的利用场景。心愿本教程对您有所帮忙!

残缺代码

# 导入须要的包
import paddle as paddle
import paddle.fluid as fluid
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import os

# 数据集下载
'''
!mkdir -p  /home/aistudio/.cache/paddle/dataset/cifar/
!wget "http://ai-atest.bj.bcebos.com/cifar-10-python.tar.gz" -O cifar-10-python.tar.gz
!mv cifar-10-python.tar.gz  /home/aistudio/.cache/paddle/dataset/cifar/
!ls -a /home/aistudio/.cache/paddle/dataset/cifar/
'''
BATCH_SIZE = 128
# 用于训练的数据提供器
train_reader = paddle.batch(paddle.reader.shuffle(paddle.dataset.cifar.train10(),
                          buf_size=128 * 100),
    batch_size=BATCH_SIZE)
# 用于测试的数据提供器
test_reader = paddle.batch(paddle.dataset.cifar.test10(),
    batch_size=BATCH_SIZE)


def convolutional_neural_network(img):
    # 第一个卷积 - 池化层
    conv_pool_1 = fluid.nets.simple_img_conv_pool(
        input=img,  # 输出图像
        filter_size=5,  # 滤波器的大小
        num_filters=20,  # filter 的数量。它与输入的通道雷同
        pool_size=2,  # 池化核大小 2 *2
        pool_stride=2,  # 池化步长
        act="relu")  # 激活类型
    conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
    # 第二个卷积 - 池化层
    conv_pool_2 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)
    # 第三个卷积 - 池化层
    conv_pool_3 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_2,
        filter_size=5,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    # 以 softmax 为激活函数的全连贯输入层,10 类数据输入 10 个数字
    prediction = fluid.layers.fc(input=conv_pool_3, size=10, act='softmax')
    return prediction


# 定义输出数据
data_shape = [3, 32, 32]
images = fluid.layers.data(name='images', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')

# 获取分类器,用 cnn 进行分类
predict = convolutional_neural_network(images)

# 获取损失函数和准确率
cost = fluid.layers.cross_entropy(input=predict, label=label)  # 穿插熵
avg_cost = fluid.layers.mean(cost)  # 计算 cost 中所有元素的平均值
acc = fluid.layers.accuracy(input=predict, label=label)  # 应用输出和标签计算准确率

# 获取测试程序
test_program = fluid.default_main_program().clone(for_test=True)

# 定义优化办法
optimizer = fluid.optimizer.Adam(learning_rate=0.001)
optimizer.minimize(avg_cost)

# 定义应用 CPU 还是 GPU,应用 CPU 时 use_cuda = False, 应用 GPU 时 use_cuda = True
use_cuda = False
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()

# 创立执行器,初始化参数

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

feeder = fluid.DataFeeder(feed_list=[images, label], place=place)

all_train_iter = 0
all_train_iters = []
all_train_costs = []
all_train_accs = []


def draw_train_process(title, iters, costs, accs, label_cost, lable_acc):
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel("cost/acc", fontsize=20)
    plt.plot(iters, costs, color='red', label=label_cost)
    plt.plot(iters, accs, color='green', label=lable_acc)
    plt.legend()
    plt.grid()
    plt.show()


EPOCH_NUM = 20
model_save_dir = "/home/aistudio/work/catdog.inference.model"

for pass_id in range(EPOCH_NUM):
    # 开始训练
    for batch_id, data in enumerate(train_reader()):  # 遍历 train_reader 的迭代器,并为数据加上索引 batch_id
        train_cost, train_acc = exe.run(program=fluid.default_main_program(),  # 运行主程序
                                        feed=feeder.feed(data),  # 喂入一个 batch 的数据
                                        fetch_list=[avg_cost, acc])  # fetch 均方误差和准确率

        all_train_iter = all_train_iter + BATCH_SIZE
        all_train_iters.append(all_train_iter)
        all_train_costs.append(train_cost[0])
        all_train_accs.append(train_acc[0])

        # 每 100 次 batch 打印一次训练、进行一次测试
        if batch_id % 100 == 0:
            print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' %
                  (pass_id, batch_id, train_cost[0], train_acc[0]))

    # 开始测试
    test_costs = []  # 测试的损失值
    test_accs = []  # 测试的准确率
    for batch_id, data in enumerate(test_reader()):
        test_cost, test_acc = exe.run(program=test_program,  # 执行测试程序
                                      feed=feeder.feed(data),  # 喂入数据
                                      fetch_list=[avg_cost, acc])  # fetch 误差、准确率
        test_costs.append(test_cost[0])  # 记录每个 batch 的误差
        test_accs.append(test_acc[0])  # 记录每个 batch 的准确率

    # 求测试后果的平均值
    test_cost = (sum(test_costs) / len(test_costs))  # 计算误差平均值(误差和 / 误差的个数)test_acc = (sum(test_accs) / len(test_accs))  # 计算准确率平均值(准确率的和 / 准确率的个数)print('Test:%d, Cost:%0.5f, ACC:%0.5f' % (pass_id, test_cost, test_acc))

# 保留模型
# 如果保留门路不存在就创立
if not os.path.exists(model_save_dir):
    os.makedirs(model_save_dir)
print('save models to %s' % (model_save_dir))
fluid.io.save_inference_model(model_save_dir,
                              ['images'],
                              [predict],
                              exe)
print('训练模型保留实现!')
draw_train_process("training", all_train_iters, all_train_costs, all_train_accs, "trainning cost", "trainning acc")

infer_exe = fluid.Executor(place)
inference_scope = fluid.core.Scope()


def load_image(file):
    # 关上图片
    im = Image.open(file)
    # 将图片调整为跟训练数据一样的大小  32*32,设定 ANTIALIAS,即抗锯齿.resize 是缩放
    im = im.resize((32, 32), Image.ANTIALIAS)
    # 建设图片矩阵 类型为 float32
    im = np.array(im).astype(np.float32)
    # 矩阵转置
    im = im.transpose((2, 0, 1))
    # 将像素值从【0-255】转换为【0-1】im = im / 255.0
    # print(im)
    im = np.expand_dims(im, axis=0)
    # 放弃和之前输出 image 维度统一
    print('im_shape 的维度:', im.shape)
    return im


with fluid.scope_guard(inference_scope):
    # 从指定目录中加载 推理 model(inference model)
    [inference_program,  # 预测用的 program
     feed_target_names,  # 是一个 str 列表,它蕴含须要在推理 Program 中提供数据的变量的名称。fetch_targets] = fluid.io.load_inference_model(model_save_dir,  # fetch_targets:是一个 Variable 列表,从中咱们能够失去推断后果。infer_exe)  # infer_exe: 运行 inference model 的 executor

    infer_path = 'dog2.jpg'
    img = Image.open(infer_path)
    plt.imshow(img)
    plt.show()

    img = load_image(infer_path)

    results = infer_exe.run(inference_program,  # 运行预测程序
                            feed={feed_target_names[0]: img},  # 喂入要预测的 img
                            fetch_list=fetch_targets)  # 失去揣测后果
    print('results', results)
    label_list = [
        "airplane", "automobile", "bird", "cat", "deer", "dog", "frog", "horse",
        "ship", "truck"
    ]
    print("infer results: %s" % label_list[np.argmax(results[0])])
退出移动版