TensorFlow data加载 and 预processing

Learningusingtf.data API加载 and 预processingdata, including批processing, 打乱, map and cacheetc.operation, improvingmodel训练efficiency

1. tf.data API Introduction

tf.data API is TensorFlow 2.xin用于构建输入pipeline 强 big tool, 它providing了 high 效, flexible dataprocessingcapacity. usingtf.data API可以:

  • high 效加载 big 型data集
  • for datafor各种预processingoperation
  • parallelprocessingdata, improving训练efficiency
  • 构建 complex 输入pipeline
  • support many 种data格式

tf.data API corecomponent is tf.data.Datasetclass, 它表示一个元素序列, 每个元素可以 is a or many 个张量.

2. creationdata集

我们可以 from many 种来sourcescreationtf.data.Dataset:

2.1 from memoryin datacreation

usingtf.data.Dataset.from_tensor_slices() from memoryin 张量creationdata集:

import tensorflow as tf
import numpy as np

#  from numpyarraycreationdata集
X = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
y = np.array([2, 4, 6, 8, 10, 12, 14, 16, 18, 20])

# creationdata集
dataset = tf.data.Dataset.from_tensor_slices((X, y))

# 打印data集元素
for x, y in dataset:
    print(f"x: {x.numpy()}, y: {y.numpy()}")

2.2 from filecreation

我们可以 from filecreationdata集, support many 种file格式, such asCSV, graph像, 文本etc..

2.2.1 from CSVfilecreation

#  from CSVfilecreationdata集
csv_dataset = tf.data.experimental.make_csv_dataset(
    'data.csv',  # CSVfilepath
    batch_size=2,  # 批次 big  small 
    column_names=['feature1', 'feature2', 'label'],  # 列名
    label_name='label',  # tag列名
    num_epochs=1,  # iteration次数
    shuffle=True  #  is 否打乱data
)

# 打印data集元素
for features, label in csv_dataset:
    print(f"特征: {features}")
    print(f"tag: {label}")
    print()

2.2.2 from graph像filecreation

#  from graph像filecreationdata集
# 首先获取所 has graph像filepath
import os

# fake设我们 has 一个package含graph像 Table of Contents, Table of Contentsstructure for : 
# images/
#   cat/
#     cat1.jpg
#     cat2.jpg
#   dog/
#     dog1.jpg
#     dog2.jpg

image_dir = 'images/'
class_names = ['cat', 'dog']

# 获取所 has graph像filepath and tag
file_paths = []
labels = []

for class_name in class_names:
    class_dir = os.path.join(image_dir, class_name)
    for filename in os.listdir(class_dir):
        if filename.endswith('.jpg') or filename.endswith('.png'):
            file_paths.append(os.path.join(class_dir, filename))
            labels.append(class_names.index(class_name))

# creationdata集
file_dataset = tf.data.Dataset.from_tensor_slices(file_paths)
label_dataset = tf.data.Dataset.from_tensor_slices(labels)

# mergedata集
dataset = tf.data.Dataset.zip((file_dataset, label_dataset))

# 定义graph像加载function
def load_image(file_path, label):
    # 读取graph像file
    image = tf.io.read_file(file_path)
    # 解码JPEGgraph像
    image = tf.image.decode_jpeg(image, channels=3)
    # 调整graph像 big  small 
    image = tf.image.resize(image, [224, 224])
    # 归一化graph像
    image = image / 255.0
    return image, label

# mapgraph像加载function
dataset = dataset.map(load_image)

# 打印data集元素
for image, label in dataset:
    print(f"graph像形状: {image.shape}, tag: {label.numpy()}")
    break

3. data集operation

tf.data APIproviding了 many 种operation来processingdata集:

3.1 打乱data

usingshuffle()method打乱data:

# 打乱data
dataset = dataset.shuffle(buffer_size=100)  # buffer_size is 缓冲区 big  small 

提示

buffer_sizeparameter表示用于打乱 缓冲区 big small . for 于 big 型data集, 设置较 big 缓冲区 big small 可以获得更 good 打乱效果, 但会占用更 many memory.

3.2 批processing

usingbatch()method将data分成批次:

# 批processing
dataset = dataset.batch(batch_size=32)

3.3 重复data集

usingrepeat()method重复data集, 用于 many 轮训练:

# 重复data集
dataset = dataset.repeat(num_epochs)  # num_epochs is 重复次数, None表示无限重复

3.4 mapfunction

usingmap()method将functionapplication于data集 每个元素:

# 定义预processingfunction
def preprocess(x, y):
    x = tf.cast(x, tf.float32)  # 转换 for float32class型
    x = x / 255.0  # 归一化
    y = tf.cast(y, tf.float32)  # 转换 for float32class型
    return x, y

# application预processingfunction
dataset = dataset.map(preprocess)

3.5 cachedata

usingcache()methodcachedata, reducing重复计算:

# cachedata
dataset = dataset.cache()  # cache to memory
#  or cache to file
dataset = dataset.cache('cache_file')

3.6 跳过 and 获取data

usingskip() and take()method跳过 or 获取指定数量 元素:

# 跳过 before 2个元素
dataset_skip = dataset.skip(2)

# 获取 before 5个元素
dataset_take = dataset.take(5)

3.7 parallelprocessing

usingnum_parallel_callsparameterimplementationparallelprocessing:

# parallelmap
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

# parallel批processing
dataset = dataset.batch(batch_size=32, num_parallel_calls=tf.data.AUTOTUNE)

tf.data.AUTOTUNE表示自动选择最佳parallel度.

4. 构建 high 效 输入pipeline

构建 high 效 输入pipeline for 于improvingmodel训练速度至关 important . 以 under is 一些best practices:

4.1 pipeline顺序

正确 pipeline顺序应该 is :

# 正确 pipeline顺序
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.cache()
dataset = dataset.batch(batch_size=32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

4.2 usingprefetch

usingprefetch()method in 训练model 同时预取 under 一批data, reducingI/Oetc.待时间:

# 预取data
dataset = dataset.prefetch(tf.data.AUTOTUNE)

4.3 usingtf.data.AUTOTUNE

usingtf.data.AUTOTUNE自动调整parallel度 and 预取缓冲区 big small :

# usingAUTOTUNE
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(batch_size=32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

5. processing big 型data集

for 于 big 型data集, 我们可以using以 under techniques来improvingprocessingefficiency:

5.1 memorymap

for 于 big 型file, 可以usingmemorymap来reducingI/Ooperation:

# usingmemorymap
def load_large_file(file_path):
    # memorymapfile
    mmapped_file = np.memmap(file_path, dtype=np.float32, mode='r')
    # 转换 for TensorFlow张量
    tensor = tf.convert_to_tensor(mmapped_file)
    return tensor

5.2 增量加载

for 于无法一次性加载 to memory big 型data集, 可以using增量加载:

# 增量加载example
def generate_data():
    # 生成data 逻辑
    for i in range(10000):
        yield np.random.rand(10), np.random.randint(0, 2)

#  from 生成器creationdata集
dataset = tf.data.Dataset.from_generator(
    generate_data,  # 生成器function
    output_signature=(  # 输出signature
        tf.TensorSpec(shape=(10,), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)

6. data增强

data增强 is a用于增加训练data many 样性 techniques, 可以improvingmodel 泛化capacity. tf.data API可以 and tf.imagemodule结合using, implementationgraph像data增强.

6.1 graph像data增强

# graph像data增强function
def augment_image(image, label):
    # 随机翻转graph像
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    
    # 随机旋转graph像
    image = tf.image.rot90(image, k=tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
    
    # 随机调整亮度,  for 比度, 饱 and 度
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    image = tf.image.random_saturation(image, lower=0.8, upper=1.2)
    
    # 随机裁剪graph像
    image = tf.image.random_crop(image, size=[224, 224, 3])
    
    # 归一化graph像
    image = image / 255.0
    
    return image, label

# applicationdata增强
dataset = dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)

6.2 文本data增强

# 文本data增强example
def augment_text(text, label):
    # 随机插入空格
    def insert_spaces(text):
        text = tf.strings.regex_replace(text, r'(.)', r'\1 ')
        text = tf.strings.strip(text)
        return text
    
    # 随机replace字符
    def replace_chars(text):
        #  simple  replaceexample, practicalapplicationin可以using更 complex  replace策略
        text = tf.strings.regex_replace(text, r'a', r'@')
        text = tf.strings.regex_replace(text, r'e', r'3')
        return text
    
    # 随机选择增强方式
    if tf.random.uniform(shape=()) > 0.5:
        text = insert_spaces(text)
    if tf.random.uniform(shape=()) > 0.5:
        text = replace_chars(text)
    
    return text, label

# application文本data增强
text_dataset = text_dataset.map(augment_text, num_parallel_calls=tf.data.AUTOTUNE)

7. processing不同class型 data

tf.data APIsupportprocessingvarious types ofdata:

7.1 数值data

# 数值dataprocessing
# creation数值data集
X = tf.random.normal(shape=(100, 10))
y = tf.random.uniform(shape=(100,), minval=0, maxval=2, dtype=tf.int32)

# creationdata集
dataset = tf.data.Dataset.from_tensor_slices((X, y))

# 预processing
def preprocess_numeric(x, y):
    x = tf.cast(x, tf.float32)
    y = tf.one_hot(y, depth=2)  # 独热编码
    return x, y

dataset = dataset.map(preprocess_numeric)

7.2 文本data

# 文本dataprocessing
# fake设我们 has 一个文本list and  for 应 tag
texts = ["这 is a  good message", "今天天气很 good ", "我喜欢programming", "TensorFlow很强 big "]
labels = [1, 1, 0, 0]

# creationdata集
dataset = tf.data.Dataset.from_tensor_slices((texts, labels))

# 文本预processing
def preprocess_text(text, label):
    # 分词 (这里using simple  空格分词, practicalapplicationin可以using更 complex  分词method) 
    words = tf.strings.split(text)
    
    # 将词转换 for 词ID (这里using simple  map, practicalapplicationin应该using预训练 词嵌入) 
    word_ids = tf.map_fn(lambda word: tf.strings.to_hash_bucket(word, num_buckets=1000), words, dtype=tf.int64)
    
    # 填充 or 截断序列
    word_ids = tf.pad(word_ids, [[0, 10 - tf.shape(word_ids)[0]]])  # 填充 to  long 度10
    word_ids = word_ids[:10]  # 截断 to  long 度10
    
    return word_ids, label

dataset = dataset.map(preprocess_text)

7.3 时间序列data

# 时间序列dataprocessing
# creation时间序列data
# fake设我们 has 100个时间步, 每个时间步 has 3个特征
time_series_data = tf.random.normal(shape=(100, 3))

# creation滑动窗口data集
def create_sliding_window_dataset(data, window_size=10, batch_size=32):
    # creation滑动窗口
    dataset = tf.data.Dataset.from_tensor_slices(data)
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    
    # 展平窗口
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    
    # 分割特征 and tag
    dataset = dataset.map(lambda window: (window[:-1], window[-1]))
    
    # 打乱 and 批processing
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    
    return dataset

# creation滑动窗口data集
dataset = create_sliding_window_dataset(time_series_data, window_size=10, batch_size=32)

8. performanceoptimization

以 under is 一些optimizationtf.dataperformance techniques:

8.1 usingtf.data.AUTOTUNE

usingtf.data.AUTOTUNE自动调整parallel度 and 预取缓冲区 big small , 可以根据systemresource自动optimizationperformance.

8.2 cachedata

for 于重复using data集, usingcache()methodcachedata, 可以reducingI/Ooperation and 重复计算.

8.3 parallelprocessing

usingnum_parallel_callsparameterparallelprocessingdata, 可以充分利用 many 核CPU.

8.4 预取data

usingprefetch()method预取 under 一批data, 可以隐藏I/Olatency, improving训练速度.

8.5 批processing big small 调优

调整批processing big small 可以影响训练速度 and modelperformance. 较 big 批processing big small 可以improvingGPU利用率, 但会占用更 many memory, 并且可能导致model泛化capacity under 降.

8.6 reducingdatacopy

尽量reducingdatacopy, usingfrom_tensor_slices()而不 is from_tensors(), 因 for from_tensor_slices()不会copydata.

8.7 usingtf.function

using@tf.function装饰器将Pythonfunction转换 for TensorFlowgraph, 可以improvingfunction执行efficiency.

# usingtf.functionoptimization预processingfunction
@tf.function
def preprocess(x, y):
    x = tf.cast(x, tf.float32)
    x = x / 255.0
    y = tf.one_hot(y, depth=2)
    return x, y

9. 练习

练习 1: 构建graph像classification输入pipeline

  1. creation一个package含猫 and 狗graph像 Table of Contentsstructure.
  2. usingtf.data APIcreationgraph像classificationdata集.
  3. implementationdata加载 and 预processingfunction.
  4. 添加data增强operation.
  5. 构建 high 效 输入pipeline, including打乱, 批processing, cache and 预取.
  6. test输入pipeline performance.

练习 2: 时间序列预测

  1. 生成 or 加载时间序列data.
  2. usingtf.data APIcreation滑动窗口data集.
  3. 构建时间序列预测model.
  4. 训练model并assessmentperformance.

练习 3: 文本classification

  1. 加载文本classificationdata集.
  2. implementation文本预processingfunction, including分词, 词嵌入etc..
  3. usingtf.data API构建输入pipeline.
  4. 构建 and 训练文本classificationmodel.
on 一节: TensorFlow 构建 simple model under 一节: TensorFlow model训练 and optimization