Dive Into MindSpore – MindDataset For Dataset Load

MindSpore易点通·精讲系列 – Dataset Loading with MindDataset

Development environment:
Ubuntu 20.04
Python 3.8
MindSpore 1.7.0

Contents:
Background
A Look at the Documentation
Data Generation
Data Loading
Q&A
Summary
References

1. Background

In previous articles we covered three dataset-loading APIs: ImageFolderDataset, CSVDataset, and TFRecordDataset. This is the final article on dataset loading (if readers ask for it, other APIs may be covered later). It introduces MindDataset, the API for loading MindRecord, MindSpore's official data format.

A complete machine learning workflow consists of dataset reading (possibly including data processing), model definition, model training, and model evaluation. How to read data efficiently within this workflow is an important problem that every deep learning framework has to solve. TensorFlow's answer is the TFRecord format; MindSpore's is MindRecord.

Before diving in, here are the main properties of the MindRecord format:
- Unified storage and access, making data reading during training simpler.
- Aggregated storage and efficient reading, making training data easy to manage and move.
- Efficient encoding and decoding, transparent to the user.
- Flexible control over partition sizes when splitting data, enabling distributed processing.

2. A Look at the Documentation

As usual, we start with the official documentation.
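Before unpacking the parameters, here is a rough sketch of the constructor as documented for MindSpore 1.7 (written from memory and showing only the commonly used parameters; the file name is an assumption, and the official API page linked in the references is authoritative for the signature and defaults):

from mindspore.dataset import MindDataset

# Sketch of a typical call; only dataset_files is required.
dataset = MindDataset(
    dataset_files="train.mindrecord00",  # str prefix or list of explicit file names
    columns_list=None,                   # None reads every column
    num_parallel_workers=None,           # number of reader workers
    shuffle=None,                        # None performs a global shuffle by default
    num_shards=None,                     # with shard_id: distributed, sharded reading
    shard_id=None)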

Below is a brief interpretation of the parameters from the official documentation:

dataset_files – Either a string or a list. If a string, files with the matching prefix are located and loaded automatically according to the matching rule; if a list, exactly the MindRecord files in the list are read, so the list must contain concrete file names.
columns_list – The data fields (columns) to read from the MindRecord files. Defaults to None, i.e. all fields are read.
For the remaining parameters, see the interpretations in the previous articles.

3. Data Generation

This article uses the THUCNews dataset. If you need the dataset for commercial purposes, please contact its authors. It can be downloaded from the OpenI (启智) community.

As noted in the API interpretation above, MindDataset reads MindRecord files, so we first look at how to generate them. Generating MindRecord files roughly involves the following parts (not a strict order):
- Read and process the raw data
- Declare the MindRecord file format
- Define the MindRecord schema fields
- Add MindRecord index fields
- Write the data content into the MindRecord files

3.1 Generation Code

Below we generate MindRecord data from the THUCNews dataset.

3.1.1 Code

import codecs
import os
import re

import numpy as np

from collections import Counter
from mindspore.mindrecord import FileWriter

def get_txt_files(data_dir):
    cls_txt_dict = {}
    txt_file_list = []
    # get files list and class files list.
    sub_data_name_list = next(os.walk(data_dir))[1]
    sub_data_name_list = sorted(sub_data_name_list)
    for sub_data_name in sub_data_name_list:
        sub_data_dir = os.path.join(data_dir, sub_data_name)
        data_name_list = next(os.walk(sub_data_dir))[2]
        data_file_list = [os.path.join(sub_data_dir, data_name) for data_name in data_name_list]
        cls_txt_dict[sub_data_name] = data_file_list
        txt_file_list.extend(data_file_list)
        num_data_files = len(data_file_list)
        print("{}: {}".format(sub_data_name, num_data_files), flush=True)
    num_txt_files = len(txt_file_list)
    print("total: {}".format(num_txt_files), flush=True)

    return cls_txt_dict, txt_file_list

def get_txt_data(txt_file):
    with codecs.open(txt_file, "r", "UTF8") as fp:
        txt_content = fp.read()
    txt_data = re.sub(r"\s+", " ", txt_content)

    return txt_data

def build_vocab(txt_file_list, vocab_size=7000):
    counter = Counter()
    for txt_file in txt_file_list:
        txt_data = get_txt_data(txt_file)
        counter.update(txt_data)
    num_vocab = len(counter)
    if num_vocab < vocab_size - 1:
        real_vocab_size = num_vocab + 2
    else:
        real_vocab_size = vocab_size
    # pad_id is 0, unk_id is 1, so real word ids start at 2
    vocab_dict = {word_freq[0]: ix + 2 for ix, word_freq in enumerate(counter.most_common(real_vocab_size - 2))}
    print("real vocab size: {}".format(real_vocab_size), flush=True)
    print("vocab dict:\n{}".format(vocab_dict), flush=True)

    return vocab_dict

def make_mindrecord_files(
        data_dir, mindrecord_dir, vocab_size=7000, min_seq_length=10, max_seq_length=800,
        num_train_shard=16, num_test_shard=4):
    # get txt files
    cls_txt_dict, txt_file_list = get_txt_files(data_dir=data_dir)
    # map word to id
    vocab_dict = build_vocab(txt_file_list=txt_file_list, vocab_size=vocab_size)
    # map class to id
    class_dict = {class_name: ix for ix, class_name in enumerate(cls_txt_dict.keys())}

    data_schema = {
        "seq_ids": {"type": "int32", "shape": [-1]},
        "seq_len": {"type": "int32", "shape": [-1]},
        "seq_cls": {"type": "int32", "shape": [-1]}
    }

    train_file = os.path.join(mindrecord_dir, "train.mindrecord")
    test_file = os.path.join(mindrecord_dir, "test.mindrecord")
    train_writer = FileWriter(train_file, shard_num=num_train_shard, overwrite=True)
    test_writer = FileWriter(test_file, shard_num=num_test_shard, overwrite=True)
    train_writer.add_schema(data_schema, "train")
    test_writer.add_schema(data_schema, "test")
    # indexes = ["seq_ids", "seq_len", "seq_cls"]
    # train_writer.add_index(indexes)
    # test_writer.add_index(indexes)

    pad_id = 0
    unk_id = 1
    num_samples = 0
    num_train_samples = 0
    num_test_samples = 0
    train_samples = []
    test_samples = []
    for class_name, class_file_list in cls_txt_dict.items():
        class_id = class_dict[class_name]
        num_class_pass = 0
        for txt_file in class_file_list:
            txt_data = get_txt_data(txt_file=txt_file)
            txt_len = len(txt_data)
            if txt_len < min_seq_length:
                num_class_pass += 1
                continue
            if txt_len > max_seq_length:
                txt_data = txt_data[:max_seq_length]
                txt_len = max_seq_length
            word_ids = []
            for word in txt_data:
                word_id = vocab_dict.get(word, unk_id)
                word_ids.append(word_id)
            for _ in range(max_seq_length - txt_len):
                word_ids.append(pad_id)
            num_samples += 1
            sample = {
                "seq_ids": np.array(word_ids, dtype=np.int32),
                "seq_len": np.array(txt_len, dtype=np.int32),
                "seq_cls": np.array(class_id, dtype=np.int32)}
            if num_samples % 10 == 0:
                train_samples.append(sample)
                num_train_samples += 1
                if num_train_samples % 10000 == 0:
                    train_writer.write_raw_data(train_samples)
                    train_samples = []
            else:
                test_samples.append(sample)
                num_test_samples += 1
                if num_test_samples % 10000 == 0:
                    test_writer.write_raw_data(test_samples)
                    test_samples = []

    if train_samples:
        train_writer.write_raw_data(train_samples)
    if test_samples:
        test_writer.write_raw_data(test_samples)
    train_writer.commit()
    test_writer.commit()

    print("num samples: {}".format(num_samples), flush=True)
    print("num train samples: {}".format(num_train_samples), flush=True)
    print("num test samples: {}".format(num_test_samples), flush=True)

def main():
    data_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/THUCNews"
    mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"
    make_mindrecord_files(data_dir=data_dir, mindrecord_dir=mindrecord_dir)

if __name__ == "__main__":
    main()

3.1.2 Code Walkthrough

get_txt_files, get_txt_data, and build_vocab are not expanded on here; the focus is make_mindrecord_files.

Declare the MindRecord file format

train_writer = FileWriter(train_file, shard_num=num_train_shard, overwrite=True)
test_writer = FileWriter(test_file, shard_num=num_test_shard, overwrite=True)

Interpretation: after importing the writer class, create the FileWriter instances. Three arguments are passed: train_file (the file_name parameter), shard_num, and overwrite. train_file is not strictly the concrete file that data is written to; it is better understood as a file prefix. shard_num is the number of data files to write.

Define the MindRecord schema fields

data_schema = {
    "seq_ids": {"type": "int32", "shape": [-1]},
    "seq_len": {"type": "int32", "shape": [-1]},
    "seq_cls": {"type": "int32", "shape": [-1]}
}
train_writer.add_schema(data_schema, "train")
test_writer.add_schema(data_schema, "test")

Interpretation: first define the schema describing the dataset structure, then attach it to the FileWriter instance via add_schema. A schema entry contains the field name, the field data type (type), and the field shape (shape); shape is optional. If a field declares a shape, the data passed for it to the write_raw_data interface must be a numpy.ndarray with dtype int32, int64, float32, or float64.

Field name: the reference name of the field; may contain letters, digits, and underscores.
Field data type: one of int32, int64, float32, float64, string, bytes.
Field shape: a one-dimensional array is written as [-1]; higher dimensions can be written as [m, n, …], where m and n are the sizes of the respective dimensions.

Add MindRecord index fields (optional)

Index fields can be added to accelerate data retrieval. Note, however, that an index field must be of a primitive type, i.e. int/float/str; other types raise an error (see Problem 1 in section 5.1 for the exact message, and the sketch at the end of this subsection for a valid setup).

Write the MindRecord data content

train_samples = []
sample = {
"seq_ids": np.array(word_ids, dtype=np.int32),
"seq_len": np.array(txt_len, dtype=np.int32),
"seq_cls": np.array(class_id, dtype=np.int32)}
train_samples.append(sample)
train_writer.write_raw_data(train_samples)
train_writer.commit()
Interpretation: a FileWriter instance writes a list whose elements are dicts, and each dict must follow the schema defined earlier. Data is written by calling the write_raw_data method; after all data has been written, call the commit method to finalize. Note: a list of length 1 is already enough for a write, but to speed things up it is common to buffer samples until the list reaches a certain size (chosen according to sample size and device memory) before writing; this article uses 10000.
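As a complement to the index-field discussion above, here is a minimal, hypothetical sketch of an indexable schema (the file name and the fields text/label are illustrative, not from this article's dataset): an index field must be a scalar primitive declared without a shape attribute.

from mindspore.mindrecord import FileWriter

# "label" is a scalar int32 with no "shape" attribute, so it is a primitive
# field and a valid index candidate. This article's own fields (seq_ids etc.)
# declare shape [-1] and therefore cannot be indexed, which is why the
# add_index calls are commented out in the generation code above.
writer = FileWriter("demo.mindrecord", shard_num=1, overwrite=True)
schema = {
    "text": {"type": "string"},
    "label": {"type": "int32"}
}
writer.add_schema(schema, "demo")
writer.add_index(["label"])  # OK: int is a primitive type
writer.write_raw_data([{"text": "hello mindrecord", "label": 0}])
writer.commit()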
3.2 Generating the Data

Save the code from 3.1.1 to a file generate_mindrecord.py (remember to replace data_dir and mindrecord_dir in the code), then run:

python3 generate_mindrecord.py

In the mindrecord data directory, run tree . to inspect the generated files. The output is shown below. Notes: 16 MindRecord training data files are generated, matching the parameter num_train_shard in the code, and 4 test data files, matching num_test_shard. The file prefixes come from train_file and test_file in the code. This confirms again that the file_name parameter of FileWriter is not a concrete data file name.

.
├── test.mindrecord0
├── test.mindrecord0.db
├── test.mindrecord1
├── test.mindrecord1.db
├── test.mindrecord2
├── test.mindrecord2.db
├── test.mindrecord3
├── test.mindrecord3.db
├── train.mindrecord00
├── train.mindrecord00.db
├── train.mindrecord01
├── train.mindrecord01.db
├── train.mindrecord02
├── train.mindrecord02.db
├── train.mindrecord03
├── train.mindrecord03.db
├── train.mindrecord04
├── train.mindrecord04.db
├── train.mindrecord05
├── train.mindrecord05.db
├── train.mindrecord06
├── train.mindrecord06.db
├── train.mindrecord07
├── train.mindrecord07.db
├── train.mindrecord08
├── train.mindrecord08.db
├── train.mindrecord09
├── train.mindrecord09.db
├── train.mindrecord10
├── train.mindrecord10.db
├── train.mindrecord11
├── train.mindrecord11.db
├── train.mindrecord12
├── train.mindrecord12.db
├── train.mindrecord13
├── train.mindrecord13.db
├── train.mindrecord14
├── train.mindrecord14.db
├── train.mindrecord15
└── train.mindrecord15.db

0 directories, 40 files

4. Data Loading

Section 3 explained how to generate MindRecord data; this section explains how to load it.

4.1 Loading Code

Loading MindRecord data uses the MindDataset interface introduced in section 2.

4.1.1 Code

To keep the results reproducible, shuffle is set to False.

import os

from mindspore.dataset import MindDataset

def create_mindrecord_dataset(mindrecord_dir, train_mode=True):
    if train_mode:
        file_prefix = os.path.join(mindrecord_dir, "train.mindrecord00")
    else:
        file_prefix = os.path.join(mindrecord_dir, "test.mindrecord0")

    dataset = MindDataset(dataset_files=file_prefix, columns_list=None, shuffle=False)

    for item in dataset.create_dict_iterator():
        print(item, flush=True)
        break

def main():
    mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"
    create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)

if __name__ == "__main__":
    main()

4.1.2 Code Walkthrough

The key point here is the value passed to dataset_files in MindDataset. In section 3.1.1 we set num_train_shard and num_test_shard to 16 and 4 respectively. Attentive readers may have noticed in the generated files of section 3.2 that the numeric suffixes differ: the test files end in 0, 1, 2, 3, while the train files end in 00, 01, and so on. Consequently the dataset_files value differs between the train and test data, as the code above shows (see also the sketch below). If you force train.mindrecord0 for the train data, an error is raised; see Problem 2 in section 5.2.
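A small sketch of the two accepted forms of dataset_files, reusing this article's paths (the columns_list example at the end is an illustrative assumption):

import os

from mindspore.dataset import MindDataset

mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"

# String form: pass one shard file name; sibling shards sharing the prefix
# are located and loaded automatically.
train_set = MindDataset(
    dataset_files=os.path.join(mindrecord_dir, "train.mindrecord00"),
    shuffle=False)

# List form: name every shard file explicitly.
test_files = [os.path.join(mindrecord_dir, "test.mindrecord{}".format(i)) for i in range(4)]
test_set = MindDataset(dataset_files=test_files, shuffle=False)

# columns_list restricts which fields are read, e.g. only ids and lengths.
partial_set = MindDataset(
    dataset_files=test_files,
    columns_list=["seq_ids", "seq_len"],
    shuffle=False)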
4.2 Loading Test

Save the code from 4.1.1 to load_mindrecord.py and run:

python3 load_mindrecord.py

The output is shown below. Note: the data is read successfully and contains the three fields seq_cls, seq_ids, and seq_len, and the shapes match those defined at generation time.

{'seq_cls': Tensor(shape=[1], dtype=Int32, value= [0]), 'seq_ids': Tensor(shape=[800], dtype=Int32, value= [ 40, 80, 289, 400, 80, 163, 2239, 288, 413, 94, 309, 429, 3, 890, 664, 2941, 582, 539, 14,
......
55, 7, 5, 65, 7, 24, 40, 8, 40, 80, 1254, 396, 566, 276, 96, 42, 4, 73, 803, 857, 72, 3, 0, 0,

0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0, 0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0, 0,    0,    0,    0,    0,    0,    0,    0]), 'seq_len': Tensor(shape=[1], dtype=Int32, value= [742])}
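A quick way to verify this programmatically (a sketch using the same path as above; get_col_names and create_dict_iterator are the standard dataset methods already used in this article):

import os

from mindspore.dataset import MindDataset

mindrecord_dir = "/Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords"
dataset = MindDataset(
    dataset_files=os.path.join(mindrecord_dir, "train.mindrecord00"),
    shuffle=False)

print(dataset.get_col_names())  # expected: ['seq_cls', 'seq_ids', 'seq_len']
for item in dataset.create_dict_iterator(output_numpy=True):
    # shapes should match the generation step: 800 padded ids, scalar len/cls
    assert item["seq_ids"].shape == (800,)
    assert item["seq_len"].shape == (1,)
    assert item["seq_cls"].shape == (1,)
    break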

Supplement: if too many MindRecord data files are opened during loading, an error may occur; see Problem 3 in section 5.3. In that case use the command:

# ulimit -n ${num}
ulimit -n 1024
This temporarily raises the open-files limit to a value at which loading succeeds.

5. Q&A

5.1 Problem 1

Traceback (most recent call last):
File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py", line 167, in

main()

File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py", line 163, in main

make_mindrecord(data_dir=data_dir, mindrecord_dir=mindrecord_dir)

File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_make.py", line 98, in make_mindrecord

train_writer.add_index(indexes)

File "/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/mindrecord/filewriter.py", line 223, in add_index

raise MRMDefineIndexError("Failed to set field {} since it's not primitive type.".format(field))

mindspore.mindrecord.common.exceptions.MRMDefineIndexError: [MRMDefineIndexError]: Failed to define index field. Detail: Failed to set field seq_ids since it's not primitive type.
Answer: index fields must be of a primitive type, e.g. int/float/str. The fields in this article are numpy arrays declared with a shape attribute, so they cannot be indexed; that is why the add_index calls are commented out in the generation code (see the sketch in section 3.1.2 for a valid setup).
5.2 Problem 2

Traceback (most recent call last):
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 36, in <module>
    main()
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 32, in main
    create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 23, in create_mindrecord_dataset
    dataset = MindDataset(dataset_files=file_prefix, columns_list=None, shuffle=False)
  File "/Users/kaierlong/Pyenvs/env_mix_dl/lib/python3.9/site-packages/mindspore/dataset/engine/validators.py", line 994, in new_method
    check_file(dataset_file)
  File "/Users/kaierlong/Pyenvs/env_mix_dl/lib/python3.9/site-packages/mindspore/dataset/core/validator_helpers.py", line 578, in check_file
    raise ValueError("The file {} does not exist or permission denied!".format(dataset_file))
ValueError: The file /Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords/train.mindrecord0 does not exist or permission denied!
Answer: see section 4.1.2.

5.3 Problem 3
(env_ms_1.7.0) [kaierlong@Long-De-MacBook-Pro-16]: ~/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01$ python3 04_mindrecord_load.py
Traceback (most recent call last):
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 36, in <module>
    main()
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 32, in main
    create_mindrecord_dataset(mindrecord_dir=mindrecord_dir, train_mode=True)
  File "/Users/kaierlong/Codes/OpenI/kaierlong/Dive_Into_MindSpore/code/chapter_01/04_mindrecord_load.py", line 25, in create_mindrecord_dataset
    for item in dataset.create_dict_iterator():
  File "/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/validators.py", line 971, in new_method
    return method(self, *args, **kwargs)
  File "/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/datasets.py", line 1478, in create_dict_iterator
    return DictIterator(self, num_epochs, output_numpy)
  File "/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/iterators.py", line 95, in __init__
    offload_model = offload.GetOffloadModel(consumer, self.__ori_dataset.get_col_names())
  File "/Users/kaierlong/Pyenvs/env_ms_1.7.0/lib/python3.9/site-packages/mindspore/dataset/engine/datasets.py", line 1559, in get_col_names
    self._col_names = runtime_getter[0].GetColumnNames()
RuntimeError: Unexpected error. Invalid file, failed to open files for reading mindrecord files. Please check file path, permission and open files limit(ulimit -a): /Users/kaierlong/Documents/DownFiles/tmp/009_resources/mindrecords/train.mindrecord11
Line of code : 247
File : /Users/jenkins/agent-working-dir/workspace/Compile_CPU_ARM_MacOS_PY39/mindspore/mindspore/ccsrc/minddata/mindrecord/io/shard_reader.cc
解答:留神:依据设施具体情况确定${num}值。# ulimit -n ${num}
ulimit -n 1024
Before the change, ulimit -a shows:

core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
file size (blocks, -f) unlimited
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 256
pipe size (512 bytes, -p) 1
stack size (kbytes, -s) 8176
cpu time (seconds, -t) unlimited
max user processes (-u) 5333
virtual memory (kbytes, -v) unlimited
After the change, ulimit -a shows:

core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
file size (blocks, -f) unlimited
max locked memory (kbytes, -l) unlimited
max memory size (kbytes, -m) unlimited
open files (-n) 1024
pipe size (512 bytes, -p) 1
stack size (kbytes, -s) 8176
cpu time (seconds, -t) unlimited
max user processes (-u) 5333
virtual memory (kbytes, -v) unlimited

6. Summary

This article covered generating data in MindRecord, MindSpore's official data format, and loading it with MindDataset. For data generation, the author distilled his experience into a set of simple steps for reference; for data loading, he likewise summarized several common errors so that readers can avoid the same pitfalls.

7. References

MindDataset API
Converting a Dataset to MindRecord Format

This is an original article and the copyright belongs to the author; reproduction without permission is prohibited!