5-1,数据管道Dataset

如果需要训练的数据大小不大,例如不到1G,那么可以直接全部读入内存中进行训练,这样一般效率最高。

但如果需要训练的数据很大,例如超过10G,无法一次载入内存,那么通常需要在训练的过程中分批逐渐读入。

使用 tf.data API 可以构建数据输入管道,轻松处理大量的数据,不同的数据格式,以及不同的数据转换。

一,构建数据管道

可以从 Numpy array, Pandas DataFrame, Python generator, csv文件, 文本文件, 文件路径, tfrecords文件等方式构建数据管道。

其中通过Numpy array, Pandas DataFrame, 文件路径构建数据管道是最常用的方法。

通过tfrecords文件方式构建数据管道较为复杂,需要对样本构建tf.Example后压缩成字符串写到tfrecords文件,读取后再解析成tf.Example。

但tfrecords文件的优点是压缩后文件较小,便于网络传播,加载速度较快。

1,从Numpy array构建数据管道

  1. # 从Numpy array构建数据管道
  2. import tensorflow as tf
  3. import numpy as np
  4. from sklearn import datasets
  5. iris = datasets.load_iris()
  6. ds1 = tf.data.Dataset.from_tensor_slices((iris["data"],iris["target"]))
  7. for features,label in ds1.take(5):
  8. print(features,label)
  1. tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
  2. tf.Tensor([4.9 3. 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
  3. tf.Tensor([4.7 3.2 1.3 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
  4. tf.Tensor([4.6 3.1 1.5 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
  5. tf.Tensor([5. 3.6 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)

2,从 Pandas DataFrame构建数据管道

  1. # 从 Pandas DataFrame构建数据管道
  2. import tensorflow as tf
  3. from sklearn import datasets
  4. import pandas as pd
  5. iris = datasets.load_iris()
  6. dfiris = pd.DataFrame(iris["data"],columns = iris.feature_names)
  7. ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict("list"),iris["target"]))
  8. for features,label in ds2.take(3):
  9. print(features,label)
  1. {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=5.1>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.5>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
  2. {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.9>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
  3. {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.7>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.2>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.3>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)

3,从Python generator构建数据管道

  1. # 从Python generator构建数据管道
  2. import tensorflow as tf
  3. from matplotlib import pyplot as plt
  4. from tensorflow.keras.preprocessing.image import ImageDataGenerator
  5. # 定义一个从文件中读取图片的generator
  6. image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory(
  7. "./data/cifar2/test/",
  8. target_size=(32, 32),
  9. batch_size=20,
  10. class_mode='binary')
  11. classdict = image_generator.class_indices
  12. print(classdict)
  13. def generator():
  14. for features,label in image_generator:
  15. yield (features,label)
  16. ds3 = tf.data.Dataset.from_generator(generator,output_types=(tf.float32,tf.int32))
  1. %matplotlib inline
  2. %config InlineBackend.figure_format = 'svg'
  3. plt.figure(figsize=(6,6))
  4. for i,(img,label) in enumerate(ds3.unbatch().take(9)):
  5. ax=plt.subplot(3,3,i+1)
  6. ax.imshow(img.numpy())
  7. ax.set_title("label = %d"%label)
  8. ax.set_xticks([])
  9. ax.set_yticks([])
  10. plt.show()

5-1,数据管道Dataset - 图1

4,从csv文件构建数据管道

  1. # 从csv文件构建数据管道
  2. ds4 = tf.data.experimental.make_csv_dataset(
  3. file_pattern = ["./data/titanic/train.csv","./data/titanic/test.csv"],
  4. batch_size=3,
  5. label_name="Survived",
  6. na_value="",
  7. num_epochs=1,
  8. ignore_errors=True)
  9. for data,label in ds4.take(2):
  10. print(data,label)
  1. OrderedDict([('PassengerId', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([540, 58, 764], dtype=int32)>), ('Pclass', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([1, 3, 1], dtype=int32)>), ('Name', <tf.Tensor: shape=(3,), dtype=string, numpy=
  2. array([b'Frolicher, Miss. Hedwig Margaritha', b'Novel, Mr. Mansouer',
  3. b'Carter, Mrs. William Ernest (Lucile Polk)'], dtype=object)>), ('Sex', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'female', b'male', b'female'], dtype=object)>), ('Age', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([22. , 28.5, 36. ], dtype=float32)>), ('SibSp', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 0, 1], dtype=int32)>), ('Parch', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([2, 0, 2], dtype=int32)>), ('Ticket', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'13568', b'2697', b'113760'], dtype=object)>), ('Fare', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ 49.5 , 7.2292, 120. ], dtype=float32)>), ('Cabin', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'B39', b'', b'B96 B98'], dtype=object)>), ('Embarked', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'C', b'C', b'S'], dtype=object)>)]) tf.Tensor([1 0 1], shape=(3,), dtype=int32)
  4. OrderedDict([('PassengerId', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([845, 66, 390], dtype=int32)>), ('Pclass', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([3, 3, 2], dtype=int32)>), ('Name', <tf.Tensor: shape=(3,), dtype=string, numpy=
  5. array([b'Culumovic, Mr. Jeso', b'Moubarek, Master. Gerios',
  6. b'Lehmann, Miss. Bertha'], dtype=object)>), ('Sex', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'male', b'male', b'female'], dtype=object)>), ('Age', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([17., 0., 17.], dtype=float32)>), ('SibSp', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 1, 0], dtype=int32)>), ('Parch', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 1, 0], dtype=int32)>), ('Ticket', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'315090', b'2661', b'SC 1748'], dtype=object)>), ('Fare', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ 8.6625, 15.2458, 12. ], dtype=float32)>), ('Cabin', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'', b'', b''], dtype=object)>), ('Embarked', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'S', b'C', b'C'], dtype=object)>)]) tf.Tensor([0 1 1], shape=(3,), dtype=int32)

5,从文本文件构建数据管道

  1. # 从文本文件构建数据管道
  2. ds5 = tf.data.TextLineDataset(
  3. filenames = ["./data/titanic/train.csv","./data/titanic/test.csv"]
  4. ).skip(1) #略去第一行header
  5. for line in ds5.take(5):
  6. print(line)
  1. tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
  2. tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
  3. tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
  4. tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)
  5. tf.Tensor(b'687,0,3,"Panula, Mr. Jaako Arnold",male,14.0,4,1,3101295,39.6875,,S', shape=(), dtype=string)

6,从文件路径构建数据管道

  1. ds6 = tf.data.Dataset.list_files("./data/cifar2/train/*/*.jpg")
  2. for file in ds6.take(5):
  3. print(file)
  1. tf.Tensor(b'./data/cifar2/train/automobile/1263.jpg', shape=(), dtype=string)
  2. tf.Tensor(b'./data/cifar2/train/airplane/2837.jpg', shape=(), dtype=string)
  3. tf.Tensor(b'./data/cifar2/train/airplane/4264.jpg', shape=(), dtype=string)
  4. tf.Tensor(b'./data/cifar2/train/automobile/4241.jpg', shape=(), dtype=string)
  5. tf.Tensor(b'./data/cifar2/train/automobile/192.jpg', shape=(), dtype=string)
  1. from matplotlib import pyplot as plt
  2. def load_image(img_path,size = (32,32)):
  3. label = 1 if tf.strings.regex_full_match(img_path,".*/automobile/.*") else 0
  4. img = tf.io.read_file(img_path)
  5. img = tf.image.decode_jpeg(img) #注意此处为jpeg格式
  6. img = tf.image.resize(img,size)
  7. return(img,label)
  8. %matplotlib inline
  9. %config InlineBackend.figure_format = 'svg'
  10. for i,(img,label) in enumerate(ds6.map(load_image).take(2)):
  11. plt.figure(i)
  12. plt.imshow((img/255.0).numpy())
  13. plt.title("label = %d"%label)
  14. plt.xticks([])
  15. plt.yticks([])

5-1,数据管道Dataset - 图2

7,从tfrecords文件构建数据管道

  1. import os
  2. import numpy as np
  3. # inpath:原始数据路径 outpath:TFRecord文件输出路径
  4. def create_tfrecords(inpath,outpath):
  5. writer = tf.io.TFRecordWriter(outpath)
  6. dirs = os.listdir(inpath)
  7. for index, name in enumerate(dirs):
  8. class_path = inpath +"/"+ name+"/"
  9. for img_name in os.listdir(class_path):
  10. img_path = class_path + img_name
  11. img = tf.io.read_file(img_path)
  12. #img = tf.image.decode_image(img)
  13. #img = tf.image.encode_jpeg(img) #统一成jpeg格式压缩
  14. example = tf.train.Example(
  15. features=tf.train.Features(feature={
  16. 'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),
  17. 'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.numpy()]))
  18. }))
  19. writer.write(example.SerializeToString())
  20. writer.close()
  21. create_tfrecords("./data/cifar2/test/","./data/cifar2_test.tfrecords/")
  1. from matplotlib import pyplot as plt
  2. def parse_example(proto):
  3. description ={ 'img_raw' : tf.io.FixedLenFeature([], tf.string),
  4. 'label': tf.io.FixedLenFeature([], tf.int64)}
  5. example = tf.io.parse_single_example(proto, description)
  6. img = tf.image.decode_jpeg(example["img_raw"]) #注意此处为jpeg格式
  7. img = tf.image.resize(img, (32,32))
  8. label = example["label"]
  9. return(img,label)
  10. ds7 = tf.data.TFRecordDataset("./data/cifar2_test.tfrecords").map(parse_example).shuffle(3000)
  11. %matplotlib inline
  12. %config InlineBackend.figure_format = 'svg'
  13. plt.figure(figsize=(6,6))
  14. for i,(img,label) in enumerate(ds7.take(9)):
  15. ax=plt.subplot(3,3,i+1)
  16. ax.imshow((img/255.0).numpy())
  17. ax.set_title("label = %d"%label)
  18. ax.set_xticks([])
  19. ax.set_yticks([])
  20. plt.show()

5-1,数据管道Dataset - 图3

二,应用数据转换

Dataset数据结构应用非常灵活,因为它本质上是一个Sequece序列,其每个元素可以是各种类型,例如可以是张量,列表,字典,也可以是Dataset。

Dataset包含了非常丰富的数据转换功能。

  • map: 将转换函数映射到数据集每一个元素。

  • flat_map: 将转换函数映射到数据集的每一个元素,并将嵌套的Dataset压平。

  • interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。

  • filter: 过滤掉某些元素。

  • zip: 将两个长度相同的Dataset横向铰合。

  • concatenate: 将两个Dataset纵向连接。

  • reduce: 执行归并操作。

  • batch : 构建批次,每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。

  • padded_batch: 构建批次,类似batch, 但可以填充到相同的形状。

  • window :构建滑动窗口,返回Dataset of Dataset.

  • shuffle: 数据顺序洗牌。

  • repeat: 重复数据若干次,不带参数时,重复无数次。

  • shard: 采样,从某个位置开始隔固定距离采样一个元素。

  • take: 采样,从开始位置取前几个元素。

  1. #map:将转换函数映射到数据集每一个元素
  2. ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
  3. ds_map = ds.map(lambda x:tf.strings.split(x," "))
  4. for x in ds_map:
  5. print(x)
  1. tf.Tensor([b'hello' b'world'], shape=(2,), dtype=string)
  2. tf.Tensor([b'hello' b'China'], shape=(2,), dtype=string)
  3. tf.Tensor([b'hello' b'Beijing'], shape=(2,), dtype=string)
  1. #flat_map:将转换函数映射到数据集的每一个元素,并将嵌套的Dataset压平。
  2. ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
  3. ds_flatmap = ds.flat_map(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
  4. for x in ds_flatmap:
  5. print(x)
  1. tf.Tensor(b'hello', shape=(), dtype=string)
  2. tf.Tensor(b'world', shape=(), dtype=string)
  3. tf.Tensor(b'hello', shape=(), dtype=string)
  4. tf.Tensor(b'China', shape=(), dtype=string)
  5. tf.Tensor(b'hello', shape=(), dtype=string)
  6. tf.Tensor(b'Beijing', shape=(), dtype=string)
  1. # interleave: 效果类似flat_map,但可以将不同来源的数据夹在一起。
  2. ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
  3. ds_interleave = ds.interleave(lambda x:tf.data.Dataset.from_tensor_slices(tf.strings.split(x," ")))
  4. for x in ds_interleave:
  5. print(x)
  1. tf.Tensor(b'hello', shape=(), dtype=string)
  2. tf.Tensor(b'hello', shape=(), dtype=string)
  3. tf.Tensor(b'hello', shape=(), dtype=string)
  4. tf.Tensor(b'world', shape=(), dtype=string)
  5. tf.Tensor(b'China', shape=(), dtype=string)
  6. tf.Tensor(b'Beijing', shape=(), dtype=string)
  1. #filter:过滤掉某些元素。
  2. ds = tf.data.Dataset.from_tensor_slices(["hello world","hello China","hello Beijing"])
  3. #找出含有字母a或B的元素
  4. ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x, ".*[a|B].*"))
  5. for x in ds_filter:
  6. print(x)
  1. tf.Tensor(b'hello China', shape=(), dtype=string)
  2. tf.Tensor(b'hello Beijing', shape=(), dtype=string)
  1. #zip:将两个长度相同的Dataset横向铰合。
  2. ds1 = tf.data.Dataset.range(0,3)
  3. ds2 = tf.data.Dataset.range(3,6)
  4. ds3 = tf.data.Dataset.range(6,9)
  5. ds_zip = tf.data.Dataset.zip((ds1,ds2,ds3))
  6. for x,y,z in ds_zip:
  7. print(x.numpy(),y.numpy(),z.numpy())
  1. 0 3 6
  2. 1 4 7
  3. 2 5 8
  1. #condatenate:将两个Dataset纵向连接。
  2. ds1 = tf.data.Dataset.range(0,3)
  3. ds2 = tf.data.Dataset.range(3,6)
  4. ds_concat = tf.data.Dataset.concatenate(ds1,ds2)
  5. for x in ds_concat:
  6. print(x)
  1. tf.Tensor(0, shape=(), dtype=int64)
  2. tf.Tensor(1, shape=(), dtype=int64)
  3. tf.Tensor(2, shape=(), dtype=int64)
  4. tf.Tensor(3, shape=(), dtype=int64)
  5. tf.Tensor(4, shape=(), dtype=int64)
  6. tf.Tensor(5, shape=(), dtype=int64)
  1. #reduce:执行归并操作。
  2. ds = tf.data.Dataset.from_tensor_slices([1,2,3,4,5.0])
  3. result = ds.reduce(0.0,lambda x,y:tf.add(x,y))
  4. result
  1. <tf.Tensor: shape=(), dtype=float32, numpy=15.0>
  1. #batch:构建批次,每次放一个批次。比原始数据增加一个维度。 其逆操作为unbatch。
  2. ds = tf.data.Dataset.range(12)
  3. ds_batch = ds.batch(4)
  4. for x in ds_batch:
  5. print(x)
  1. tf.Tensor([0 1 2 3], shape=(4,), dtype=int64)
  2. tf.Tensor([4 5 6 7], shape=(4,), dtype=int64)
  3. tf.Tensor([ 8 9 10 11], shape=(4,), dtype=int64)
  1. #padded_batch:构建批次,类似batch, 但可以填充到相同的形状。
  2. elements = [[1, 2],[3, 4, 5],[6, 7],[8]]
  3. ds = tf.data.Dataset.from_generator(lambda: iter(elements), tf.int32)
  4. ds_padded_batch = ds.padded_batch(2,padded_shapes = [4,])
  5. for x in ds_padded_batch:
  6. print(x)
  1. tf.Tensor(
  2. [[1 2 0 0]
  3. [3 4 5 0]], shape=(2, 4), dtype=int32)
  4. tf.Tensor(
  5. [[6 7 0 0]
  6. [8 0 0 0]], shape=(2, 4), dtype=int32)
  1. #window:构建滑动窗口,返回Dataset of Dataset.
  2. ds = tf.data.Dataset.range(12)
  3. #window返回的是Dataset of Dataset,可以用flat_map压平
  4. ds_window = ds.window(3, shift=1).flat_map(lambda x: x.batch(3,drop_remainder=True))
  5. for x in ds_window:
  6. print(x)
  1. tf.Tensor([0 1 2], shape=(3,), dtype=int64)
  2. tf.Tensor([1 2 3], shape=(3,), dtype=int64)
  3. tf.Tensor([2 3 4], shape=(3,), dtype=int64)
  4. tf.Tensor([3 4 5], shape=(3,), dtype=int64)
  5. tf.Tensor([4 5 6], shape=(3,), dtype=int64)
  6. tf.Tensor([5 6 7], shape=(3,), dtype=int64)
  7. tf.Tensor([6 7 8], shape=(3,), dtype=int64)
  8. tf.Tensor([7 8 9], shape=(3,), dtype=int64)
  9. tf.Tensor([ 8 9 10], shape=(3,), dtype=int64)
  10. tf.Tensor([ 9 10 11], shape=(3,), dtype=int64)
  1. #shuffle:数据顺序洗牌。
  2. ds = tf.data.Dataset.range(12)
  3. ds_shuffle = ds.shuffle(buffer_size = 5)
  4. for x in ds_shuffle:
  5. print(x)
  1. tf.Tensor(1, shape=(), dtype=int64)
  2. tf.Tensor(4, shape=(), dtype=int64)
  3. tf.Tensor(0, shape=(), dtype=int64)
  4. tf.Tensor(6, shape=(), dtype=int64)
  5. tf.Tensor(5, shape=(), dtype=int64)
  6. tf.Tensor(2, shape=(), dtype=int64)
  7. tf.Tensor(7, shape=(), dtype=int64)
  8. tf.Tensor(11, shape=(), dtype=int64)
  9. tf.Tensor(3, shape=(), dtype=int64)
  10. tf.Tensor(9, shape=(), dtype=int64)
  11. tf.Tensor(10, shape=(), dtype=int64)
  12. tf.Tensor(8, shape=(), dtype=int64)
  1. #repeat:重复数据若干次,不带参数时,重复无数次。
  2. ds = tf.data.Dataset.range(3)
  3. ds_repeat = ds.repeat(3)
  4. for x in ds_repeat:
  5. print(x)
  1. tf.Tensor(0, shape=(), dtype=int64)
  2. tf.Tensor(1, shape=(), dtype=int64)
  3. tf.Tensor(2, shape=(), dtype=int64)
  4. tf.Tensor(0, shape=(), dtype=int64)
  5. tf.Tensor(1, shape=(), dtype=int64)
  6. tf.Tensor(2, shape=(), dtype=int64)
  7. tf.Tensor(0, shape=(), dtype=int64)
  8. tf.Tensor(1, shape=(), dtype=int64)
  9. tf.Tensor(2, shape=(), dtype=int64)
  1. #shard:采样,从某个位置开始隔固定距离采样一个元素。
  2. ds = tf.data.Dataset.range(12)
  3. ds_shard = ds.shard(3,index = 1)
  4. for x in ds_shard:
  5. print(x)
  1. tf.Tensor(1, shape=(), dtype=int64)
  2. tf.Tensor(4, shape=(), dtype=int64)
  3. tf.Tensor(7, shape=(), dtype=int64)
  4. tf.Tensor(10, shape=(), dtype=int64)
  1. #take:采样,从开始位置取前几个元素。
  2. ds = tf.data.Dataset.range(12)
  3. ds_take = ds.take(3)
  4. list(ds_take.as_numpy_iterator())
  1. [0, 1, 2]

三,提升管道性能

训练深度学习模型常常会非常耗时。

模型训练的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数迭代

参数迭代过程的耗时通常依赖于GPU来提升。

而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。

以下是一些构建高效数据管道的建议。

  • 1,使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。

  • 2,使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。

  • 3,使用 map 时设置num_parallel_calls 让数据转换过程多进程执行。

  • 4,使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。

  • 5,使用 map转换时,先batch, 然后采用向量化的转换方法对每个batch进行转换。

1,使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。

  1. import tensorflow as tf
  2. #打印时间分割线
  3. @tf.function
  4. def printbar():
  5. ts = tf.timestamp()
  6. today_ts = ts%(24*60*60)
  7. hour = tf.cast(today_ts//3600+8,tf.int32)%tf.constant(24)
  8. minite = tf.cast((today_ts%3600)//60,tf.int32)
  9. second = tf.cast(tf.floor(today_ts%60),tf.int32)
  10. def timeformat(m):
  11. if tf.strings.length(tf.strings.format("{}",m))==1:
  12. return(tf.strings.format("0{}",m))
  13. else:
  14. return(tf.strings.format("{}",m))
  15. timestring = tf.strings.join([timeformat(hour),timeformat(minite),
  16. timeformat(second)],separator = ":")
  17. tf.print("=========="*8,end = "")
  18. tf.print(timestring)
  1. import time
  2. # 数据准备和参数迭代两个过程默认情况下是串行的。
  3. # 模拟数据准备
  4. def generator():
  5. for i in range(10):
  6. #假设每次准备数据需要2s
  7. time.sleep(2)
  8. yield i
  9. ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32))
  10. # 模拟参数迭代
  11. def train_step():
  12. #假设每一步训练需要1s
  13. time.sleep(1)
  1. # 训练过程预计耗时 10*2+10*1 = 30s
  2. printbar()
  3. tf.print(tf.constant("start training..."))
  4. for x in ds:
  5. train_step()
  6. printbar()
  7. tf.print(tf.constant("end training..."))
  1. # 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。
  2. # 训练过程预计耗时 max(10*2,10*1) = 20s
  3. printbar()
  4. tf.print(tf.constant("start training with prefetch..."))
  5. # tf.data.experimental.AUTOTUNE 可以让程序自动选择合适的参数
  6. for x in ds.prefetch(buffer_size = tf.data.experimental.AUTOTUNE):
  7. train_step()
  8. printbar()
  9. tf.print(tf.constant("end training..."))

2,使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。

  1. ds_files = tf.data.Dataset.list_files("./data/titanic/*.csv")
  2. ds = ds_files.flat_map(lambda x:tf.data.TextLineDataset(x).skip(1))
  3. for line in ds.take(4):
  4. print(line)
  1. tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
  2. tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
  3. tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
  4. tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)
  1. ds_files = tf.data.Dataset.list_files("./data/titanic/*.csv")
  2. ds = ds_files.interleave(lambda x:tf.data.TextLineDataset(x).skip(1))
  3. for line in ds.take(8):
  4. print(line)
  1. tf.Tensor(b'181,0,3,"Sage, Miss. Constance Gladys",female,,8,2,CA. 2343,69.55,,S', shape=(), dtype=string)
  2. tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
  3. tf.Tensor(b'405,0,3,"Oreskovic, Miss. Marija",female,20.0,0,0,315096,8.6625,,S', shape=(), dtype=string)
  4. tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
  5. tf.Tensor(b'635,0,3,"Skoog, Miss. Mabel",female,9.0,3,2,347088,27.9,,S', shape=(), dtype=string)
  6. tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
  7. tf.Tensor(b'701,1,1,"Astor, Mrs. John Jacob (Madeleine Talmadge Force)",female,18.0,1,0,PC 17757,227.525,C62 C64,C', shape=(), dtype=string)
  8. tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)

3,使用 map 时设置num_parallel_calls 让数据转换过程多进行执行。

  1. ds = tf.data.Dataset.list_files("./data/cifar2/train/*/*.jpg")
  2. def load_image(img_path,size = (32,32)):
  3. label = 1 if tf.strings.regex_full_match(img_path,".*/automobile/.*") else 0
  4. img = tf.io.read_file(img_path)
  5. img = tf.image.decode_jpeg(img) #注意此处为jpeg格式
  6. img = tf.image.resize(img,size)
  7. return(img,label)
  1. #单进程转换
  2. printbar()
  3. tf.print(tf.constant("start transformation..."))
  4. ds_map = ds.map(load_image)
  5. for _ in ds_map:
  6. pass
  7. printbar()
  8. tf.print(tf.constant("end transformation..."))
  1. #多进程转换
  2. printbar()
  3. tf.print(tf.constant("start parallel transformation..."))
  4. ds_map_parallel = ds.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE)
  5. for _ in ds_map_parallel:
  6. pass
  7. printbar()
  8. tf.print(tf.constant("end parallel transformation..."))

4,使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。

  1. import time
  2. # 模拟数据准备
  3. def generator():
  4. for i in range(5):
  5. #假设每次准备数据需要2s
  6. time.sleep(2)
  7. yield i
  8. ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32))
  9. # 模拟参数迭代
  10. def train_step():
  11. #假设每一步训练需要0s
  12. pass
  13. # 训练过程预计耗时 (5*2+5*0)*3 = 30s
  14. printbar()
  15. tf.print(tf.constant("start training..."))
  16. for epoch in tf.range(3):
  17. for x in ds:
  18. train_step()
  19. printbar()
  20. tf.print("epoch =",epoch," ended")
  21. printbar()
  22. tf.print(tf.constant("end training..."))
  1. import time
  2. # 模拟数据准备
  3. def generator():
  4. for i in range(5):
  5. #假设每次准备数据需要2s
  6. time.sleep(2)
  7. yield i
  8. # 使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。
  9. ds = tf.data.Dataset.from_generator(generator,output_types = (tf.int32)).cache()
  10. # 模拟参数迭代
  11. def train_step():
  12. #假设每一步训练需要0s
  13. time.sleep(0)
  14. # 训练过程预计耗时 (5*2+5*0)+(5*0+5*0)*2 = 10s
  15. printbar()
  16. tf.print(tf.constant("start training..."))
  17. for epoch in tf.range(3):
  18. for x in ds:
  19. train_step()
  20. printbar()
  21. tf.print("epoch =",epoch," ended")
  22. printbar()
  23. tf.print(tf.constant("end training..."))

5,使用 map转换时,先batch, 然后采用向量化的转换方法对每个batch进行转换。

  1. #先map后batch
  2. ds = tf.data.Dataset.range(100000)
  3. ds_map_batch = ds.map(lambda x:x**2).batch(20)
  4. printbar()
  5. tf.print(tf.constant("start scalar transformation..."))
  6. for x in ds_map_batch:
  7. pass
  8. printbar()
  9. tf.print(tf.constant("end scalar transformation..."))
  1. #先batch后map
  2. ds = tf.data.Dataset.range(100000)
  3. ds_batch_map = ds.batch(20).map(lambda x:x**2)
  4. printbar()
  5. tf.print(tf.constant("start vector transformation..."))
  6. for x in ds_batch_map:
  7. pass
  8. printbar()
  9. tf.print(tf.constant("end vector transformation..."))

如果对本书内容理解上有需要进一步和作者交流的地方,欢迎在公众号”Python与算法之美”下留言。作者时间和精力有限,会酌情予以回复。

也可以在公众号后台回复关键字:加群,加入读者交流群和大家讨论。

image.png