乐趣区

关于机器学习:MindSpore易点通精讲系列数据集加载之MindDataset

Dive Into MindSpore – MindDataset For Dataset LoadMindSpore 易点通·精讲系列–数据集加载之 MindDataset 本文开发环境 Ubuntu 20.04Python 3.8MindSpore 1.7.0 本文内容摘要背景介绍先看文档数据生成数据加载问题解答本文总结本文参考 1. 背景介绍在后面的文章中,咱们介绍了 ImageFolderDataset、CSVDataset 及 TFRecordDataset 三个数据集加载 API。本文为数据集加载局部的最初一篇文章(当然,如果后续读者有须要,再思考补充其余 API 精讲),咱们将介绍 MindSpore 中官网数据格式 MindRecord 加载所波及的 API 的 MindDataset。一个残缺的机器学习工作流包含数据集读取(可能蕴含数据处理)、模型定义、模型训练、模型评估。如何在工作流中更好的读取数据,是各个深度学习框架须要解决的一个重要问题。为此,TensorFlow 推出了 TFRecord 数据格式,而 MindSpore 给出的解决方案就是 MindRecord。在正式开始本文的解说之前,先来看看 MindRecord 数据格式的特点:实现数据对立存储、拜访,使得训练时数据读取更加简便。数据聚合存储、高效读取,使得训练时数据方便管理和挪动。高效的数据编解码操作,使得用户能够对数据操作无感知。能够灵便控制数据切分的分区大小,实现分布式数据处理。2. 先看文档老传统,先看官网文档。

上面对官网文档中的参数,做简略解读:dataset_files – 类型为字符串或者列表。如果为字符串则依照匹配规定主动寻找并加载相应前缀的 MindRecord 文件;如果为列表,则读取列表内的 MindRecord 文件,即列表内要为具体的文件名。columns_list – 指定从 MindRecord 数据文件中读取的数据字段,或者说数据列。默认值为 None,即读取全副字段或数据列。其余参数参见之前文章中的相干解读。3. 数据生成本文应用的是 THUCNews 数据集,如果须要将该数据集用于商业用途,请分割数据集作者。数据集启智社区下载地址在下面 API 解读中,咱们讲到 MindDasetset 读取的是 MindRecord 文件,上面就来介绍一下如何生成 MindRecord 数据文件。MindRecord 数据文件生成能够简略蕴含以下几个局部(非程序):读取及解决原始数据申明 MindRecord 文件格式定义 MindRecord 数据字段增加 MindRecord 索引字段写入 MindRecord 数据内容 3.1 生成代码上面咱们基于 THUCNews 数据集,来生成 MindRecord 数据。3.1.1 代码局部 import codecs
import os
import re

import numpy as np

from collections import Counter
from mindspore.mindrecord import FileWriter

def get_txt_files(data_dir):

cls_txt_dict = {}
txt_file_list = []

# get files list and class files list.
sub_data_name_list = next(os.walk(data_dir))[1]
sub_data_name_list = sorted(sub_data_name_list)
for sub_data_name in sub_data_name_list:
    sub_data_dir = os.path.join(data_dir, sub_data_name)
    data_name_list = next(os.walk(sub_data_dir))[2]
    data_file_list = [os.path.join(sub_data_dir, data_name) for data_name in data_name_list]
    cls_txt_dict[sub_data_name] = data_file_list
    txt_file_list.extend(data_file_list)
    num_data_files = len(data_file_list)
    print("{}: {}".format(sub_data_name, num_data_files), flush=True)
num_txt_files = len(txt_file_list)
print("total: {}".format(num_txt_files), flush=True)

return cls_txt_dict, txt_file_list

def get_txt_data(txt_file):

with codecs.open(txt_file, "r", "UTF8") as fp:
    txt_content = fp.read()
txt_data = re.sub("\s+", " ", txt_content)

return txt_data

def build_vocab(txt_file_list, vocab_size=7000):

counter = Counter()
for txt_file in txt_file_list:
    txt_data = get_txt_data(txt_file)
    counter.update(txt_data)

num_vocab = len(counter)
if num_vocab < vocab_size - 1:
    real_vocab_size = num_vocab + 2
else:
    real_vocab_size = vocab_size

# pad_id is 0, unk_id is 1
vocab_dict = {word_freq[0]: ix + 1 for ix, word_freq in enumerate(counter.most_common(real_vocab_size - 2))}

print("real vocab size: {}".format(real_vocab_size), flush=True)
print("vocab dict:\n{}".format(vocab_dict), flush=True)

return vocab_dict

def make_mindrecord_files(

    data_dir, mindrecord_dir, vocab_size=7000, min_seq_length=10, max_seq_length=800,
    num_train_shard=16, num_test_shard=4):
# get txt files
cls_txt_dict, txt_file_list = get_txt_files(data_dir=data_dir)
# map word to id
vocab_dict = build_vocab(txt_file_list=txt_file_list, vocab_size=vocab_size)
# map class to id
class_dict = {class_name: ix for ix, class_name in enumerate(cls_txt_dict.keys())}

data_schema = {"seq_ids": {"type": "int32", "shape": [-1]},
    "seq_len": {"type": "int32", "shape": [-1]},
    "seq_cls": {"type": "int32", "shape": [-1]}
}

train_file = os.path.join(mindrecord_dir, "train.mindrecord")
test_file = os.path.join(mindrecord_dir, "test.mindrecord")
train_writer = FileWriter(train_file, shard_num=num_train_shard, overwrite=True)
test_writer = FileWriter(test_file, shard_num=num_test_shard, overwrite=True)

train_writer.add_schema(data_schema, "train")
test_writer.add_schema(data_schema, "test")

# indexes = ["seq_ids", "seq_len", "seq_cls"]
# train_writer.add_index(indexes)
# test_writer.add_index(indexes)

pad_id = 0
unk_id = 1
num_samples = 0
num_train_samples = 0
num_test_samples = 0

train_samples = []
test_samples = []
for class_name, class_file_list in cls_txt_dict.items():
    class_id = class_dict[class_name]
    num_class_pass = 0
    for txt_file in class_file_list:
        txt_data = get_txt_data(txt_file=txt_file)
        txt_len = len(txt_data)
        if txt_len < min_seq_length:
            num_class_pass += 1
            continue
        if txt_len > max_seq_length:
            txt_data = txt_data[:max_seq_length]
            txt_len = max_seq_length
        word_ids = []
        for word in txt_data:
            word_id = vocab_dict.get(word, unk_id)
            word_ids.append(word_id)
        for _ in range(max_seq_length - txt_len):
            word_ids.append(pad_id)

        num_samples += 1
        sample = {"seq_ids": np.array(word_ids, dtype=np.int32),
            "seq_len": np.array(txt_len, dtype=np.int32),
            "seq_cls": np.array(class_id, dtype=np.int32)}
        if num_samples % 10 == 0:
            train_samples.append(sample)
            num_train_samples += 1
            if num_train_samples % 10000 == 0:
                train_writer.write_raw_data(train_samples)
                train_samples = []
        else:
            test_samples.append(sample)
            num_test_samples += 1
            if num_test_samples % 10000 == 0:
                test_writer.write_raw_data(test_samples)
                test_samples = []

if train_samples:
    train_writer.write_raw_data(train_samples)
if test_samples:
    test_writer.write_raw_data(test_samples)

train_writer.commit()
test_writer.commit()

print("num samples: {}".format(num_samples), flush=True)
print("num train samples: {}".format(num_train_samples), flush=True)
print("num test samples: {}".format(num_test_samples), flush=True)

def main():

data_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/THUCNews"
mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"

make_mindrecord_files(data_dir=data_dir, mindrecord_dir=mindrecord_dir)

if name == “__main__”:

main()

3.1.2 代码解读 get_txt_files、get_txt_data 和 build_vocab 不再开展,这里重点介绍 make_mindrecord_files。申明 MindRecord 文件格式 train_writer = FileWriter(train_file, shard_num=num_train_shard, overwrite=True)
test_writer = FileWriter(test_file, shard_num=num_test_shard, overwrite=True)
解读:导入写入工具类之后,创立 FileWriter 对象实例,有三个参数,train_file、share_num 和 overwrite。train_file 并非严格的具体数据写入文件,能够了解为文件前缀。num_shard 为写入的数据文件数量。定义 MindRecord 数据字段 data_schema = {

    "seq_ids": {"type": "int32", "shape": [-1]},
    "seq_len": {"type": "int32", "shape": [-1]},
    "seq_cls": {"type": "int32", "shape": [-1]}
}

train_writer.add_schema(data_schema, "train")
test_writer.add_schema(data_schema, "test")

解读:首先定义数据集文件构造 schema,而后通过 add_schema 增加到 FileWriter 实例对象中。schema 蕴含字段名、字段数据类型 type 和字段数据维度维数 shape,其中字段数据维度维数 shape 为可选的。如果字段有属性 shape,则用户传入 write_raw_data 接口的数据必须为 numpy.ndarray 类型,对应数据类型必须为 int32、int64、float32、float64。字段名:字段的援用名称,能够蕴含字母、数字和下划线。字段数据类型:蕴含 int32、int64、float32、float64、string、bytes。字段维数:一维数组用 [-1] 示意,更高维度可示意为 [m, n, …],其中 m、n 为各维度维数。增加 MindRecord 索引字段(可选)能够通过增加索引字段进行数据读取减速。然而要留神的是,索引字段的数据类型必须为主类型,即 int/float/str,其余类型的话会报错,具体报错信息参考 5.1 问题 1。写入 MindRecord 数据内容 train_samples = []
sample = {
“seq_ids”: np.array(word_ids, dtype=np.int32),
“seq_len”: np.array(txt_len, dtype=np.int32),
“seq_cls”: np.array(class_id, dtype=np.int32)}
train_samples.append(sample)
train_writer.write_raw_data(train_samples)
train_writer.commit()
解读:FileWriter 实例对象的写入内容为列表 list,列表内的数据单元为字典 dict,具体内容格局要与前文中 schema 格局雷同。通过调用 write_raw_data 办法进行数据写入,全副数据写入后,通过 commit 办法进行提交。留神:数据写入内容列表长度为 1 即可进行写入,为了放慢写入速度,通常能够等到列表长度达到肯定值(依据写入数据大小和设施内存大小确定)再进行写入,如本文设定为 10000。3.2 生成数据将 3.1.1 中的代码保留到文件 generate_mindrecord.py,应用如下命令:留神替换代码中的 data_dir 和 mindrecord_dirpython3 generate_mindrecord.py
在 mindrecord 数据目录下,应用 tree . 命令查看生成的数据状况。内容如下:阐明:生成的 Mindrecord 训练数据文件为 16 个,即代码中对应的参数 num_train_shard。生成的 Mindrecord 测试数据文件为 4 个,即代码中对应的参数 num_test_shard。数据文件的前缀如代码中 train_file 和 test_file。这里也再次阐明 FileWriter 中的 file_name 参数并非具体的数据文件名。.
├── test.mindrecord0
├── test.mindrecord0.db
├── test.mindrecord1
├── test.mindrecord1.db
├── test.mindrecord2
├── test.mindrecord2.db
├── test.mindrecord3
├── test.mindrecord3.db
├── train.mindrecord00
├── train.mindrecord00.db
├── train.mindrecord01
├── train.mindrecord01.db
├── train.mindrecord02
├── train.mindrecord02.db
├── train.mindrecord03
├── train.mindrecord03.db
├── train.mindrecord04
├── train.mindrecord04.db
├── train.mindrecord05
├── train.mindrecord05.db
├── train.mindrecord06
├── train.mindrecord06.db
├── train.mindrecord07
├── train.mindrecord07.db
├── train.mindrecord08
├── train.mindrecord08.db
├── train.mindrecord09
├── train.mindrecord09.db
├── train.mindrecord10
├── train.mindrecord10.db
├── train.mindrecord11
├── train.mindrecord11.db
├── train.mindrecord12
├── train.mindrecord12.db
├── train.mindrecord13
├── train.mindrecord13.db
├── train.mindrecord14
├── train.mindrecord14.db
├── train.mindrecord15
└── train.mindrecord15.db

0 directories, 40 files

  1. 数据加载在 3 中咱们解说了如何生成 MindRecord 数据,本节就来解说如何加载生成的 MindRecord 数据。4.1 加载代码加载 MindRecord 数据须要用 2 中提到的 MindDataset 数据加载接口。4.1.1 代码局部为保障复现后果统一,shuffle 设置为了 False。import os

from mindspore.dataset import MindDataset

def create_mindrecord_dataset(mindrecord_dir, train_mode=True):

if train_mode:
    file_prefix = os.path.join(mindrecord_dir, "train.mindrecord00")
else:
    file_prefix = os.path.join(mindrecord_dir, "test.mindrecord0")

dataset = MindDataset(dataset_files=file_prefix, columns_list=None, shuffle=False)

for item in dataset.create_dict_iterator():
    print(item, flush=True)
    break

def main():

mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"
create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)

if name == “__main__”:

main()

4.1.2 代码解读在代码解读局部,重点解说一下,MindDataset 中的 dataset_files 传入值。在 3.1.1 大节中,咱们将 num_train_shard 和 num_test_shard 别离设置为了 16 和 4。仔细的读者可能发现 3.2 生成数据局部中生成的数据文件的最初的数字局部有所不同,test 局部的数据是 0、1、2、3 结尾,而 train 局部的数据是 00、01、… 结尾。这就导致本节加载代码中 dataset_files 的传入值在针对 train 和 test 数据是不统一的,具体参考上文代码。如果 train 数据强行应用 train.mindrecord0,那么会报错,具体报错内容参见 5.2 问题 2。4.2 加载测试将 4.1.1 中的代码保留到 load_mindrecord.py 文件中,运行如下命令:python3 load_mindrecord.py
输入内容如下:阐明:读取数据胜利,蕴含三个字段:seq_cls、seq_ids、seq_len,且相应字段的 shape 与生成局部统一。{‘seq_cls’: Tensor(shape=[1], dtype=Int32, value= [0]), ‘seq_ids’: Tensor(shape=[800], dtype=Int32, value= [40, 80, 289, 400, 80, 163, 2239, 288, 413, 94, 309, 429, 3, 890, 664, 2941, 582, 539, 14,
……
55, 7, 5, 65, 7, 24, 40, 8, 40, 80, 1254, 396, 566, 276, 96, 42, 4, 73, 803, 857, 72, 3, 0, 0,

0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0, 
0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0, 
0,    0,    0,    0,    0,    0,    0,    0]), 'seq_len': Tensor(shape=[1], dtype=Int32, value= [742])}

补充:如果加载中 MIndRecord 数据文件过多,可能会导致报错,报错内容参见 5.3 问题 3。这时可应用命令:# ulimit -n ${num}
ulimit -n 1024
长期批改到能失常加载的值即可。5. 问题解答 5.1 问题 1Traceback (most recent call last):
File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py”, line 167, in

main()

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py”, line 163, in main

make_mindrecord(data_dir=data_dir, mindrecord_dir=mindrecord_dir)

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py”, line 98, in make_mindrecord

train_writer.add_index(indexes)

File “/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/mindrecord/filewriter.py”, line 223, in add_index

raise MRMDefineIndexError("Failed to set field {} since it's not primitive type.".format(field))

mindspore.mindrecord.common.exceptions.MRMDefineIndexError: [MRMDefineIndexError]: Failed to define index field. Detail: Failed to set field seq_ids since it’s not primitive type.
解答:The index fields should be primitive type. e.g. int/float/str.
5.2 问题 2Traceback (most recent call last):
File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 36, in

main()

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 32, in main

create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 23, in create_mindrecord_dataset

dataset = MindDataset(dataset_files=file_prefix, columns_list=None, shuffle=False)

File “/Users/kaierlong/Pyenvs/env_mix_dl/lib/python3.9/site-packages/mindspore/dataset/engine/validators.py”, line 994, in new_method

check_file(dataset_file)

File “/Users/kaierlong/Pyenvs/env_mix_dl/lib/python3.9/site-packages/mindspore/dataset/core/validator_helpers.py”, line 578, in check_file

raise ValueError("The file {} does not exist or permission denied!".format(dataset_file))

ValueError: The file /Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords/train.mindrecord0 does not exist or permission denied!
解答:参见 4.1.2 局部 5.3 问题 3Line of code : 247
File : /Users/jenkins/agent-working-dir/workspace/Compile_CPU_ARM_MacOS_PY39/mindspore/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc

(env_ms_1.7.0) [kaierlong@Long-De-MacBook-Pro-16]: ~/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01$ python3 04_mindrecord_load.py
Traceback (most recent call last):
File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 36, in

main()

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 32, in main

create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)

File “/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py”, line 25, in create_mindrecord_dataset

for item in dataset.create_dict_iterator():

File “/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/validators.py”, line 971, in new_method

return method(self, *args, **kwargs)

File “/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/datasets.py”, line 1478, in create_dict_iterator

return DictIterator(self, num_epochs, output_numpy)

File “/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/iterators.py”, line 95, in init

offload_model = offload.GetOffloadModel(consumer, self.__ori_dataset.get_col_names())

File “/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/datasets.py”, line 1559, in get_col_names

self._col_names = runtime_getter[0].GetColumnNames()

RuntimeError: Unexpected error. Invalid file, failed to open files for reading mindrecord files. Please check file path, permission and open files limit(ulimit -a): /Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords/train.mindrecord11
Line of code : 247
File : /Users/jenkins/agent-working-dir/workspace/Compile_CPU_ARM_MacOS_PY39/mindspore/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc
解答:留神:依据设施具体情况确定 ${num}值。# ulimit -n ${num}
ulimit -n 1024
批改前,应用 ulimit - a 查看,内容如下:core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
file size (blocks, -f) unlimited
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 256
pipe size (512 bytes, -p) 1
stack size (kbytes, -s) 8176
cpu time (seconds, -t) unlimited
max user processes (-u) 5333
virtual memory (kbytes, -v) unlimited
批改后,应用 ulimit - a 查看,内容如下:core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
file size (blocks, -f) unlimited
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 1
stack size (kbytes, -s) 8176
cpu time (seconds, -t) unlimited
max user processes (-u) 5333
virtual memory (kbytes, -v) unlimited

  1. 本文总结本文解说了 MindSpore 官网数据格式 MindRecord 的生成及数据集加载波及的 MindDataset 的应用。对于数据生成,笔者依据本人的教训,给出了简略的步骤供读者参考;对于数据读取,笔者同样依据本人的经验总结了几个常见的谬误以供读者避坑。7. 本文参考 MindDataset API 转换数据集为 MindRecord 格局转换本文为原创文章,版权归作者所有,未经受权不得转载!
退出移动版