1. tf.data API Introduction
The tf.data API is TensorFlow 2.x's tool for building input pipelines. It provides efficient, flexible data-processing capabilities. With the tf.data API you can:
- Load large datasets efficiently
- Apply all kinds of preprocessing operations to your data
- Process data in parallel to speed up training
- Build complex input pipelines
- Work with many data formats
The core component of the tf.data API is the tf.data.Dataset class, which represents a sequence of elements, where each element consists of one or more tensors.
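As a minimal illustration, a Dataset can be iterated like any Python sequence (the values here are arbitrary):

```python
import tensorflow as tf

# A Dataset is an iterable sequence of tensor elements
ds = tf.data.Dataset.from_tensor_slices([10, 20, 30])
print([x.numpy() for x in ds])  # → [10, 20, 30]
```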
2. Creating Datasets
We can create a tf.data.Dataset from several kinds of sources:
2.1 Creating from In-Memory Data
Use tf.data.Dataset.from_tensor_slices() to create a dataset from in-memory tensors:
import tensorflow as tf
import numpy as np
# Create a dataset from NumPy arrays
X = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
y = np.array([2, 4, 6, 8, 10, 12, 14, 16, 18, 20])
# Create the dataset
dataset = tf.data.Dataset.from_tensor_slices((X, y))
# Print the dataset elements
for x, y in dataset:
    print(f"x: {x.numpy()}, y: {y.numpy()}")
2.2 Creating from Files
We can create datasets from files; many formats are supported, such as CSV, images, and text.
2.2.1 Creating from a CSV File
# Create a dataset from a CSV file
csv_dataset = tf.data.experimental.make_csv_dataset(
    'data.csv',                                      # path to the CSV file
    batch_size=2,                                    # batch size
    column_names=['feature1', 'feature2', 'label'],  # column names
    label_name='label',                              # name of the label column
    num_epochs=1,                                    # number of passes over the data
    shuffle=True                                     # whether to shuffle the data
)
# Print the dataset elements
for features, label in csv_dataset:
    print(f"Features: {features}")
    print(f"Label: {label}")
    print()
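For finer control, the same file can also be parsed line by line with TextLineDataset and tf.io.decode_csv. A sketch, assuming the same hypothetical data.csv layout (the file is written here just to keep the example self-contained):

```python
import tensorflow as tf

# Write a tiny CSV so the example is self-contained
with open('data.csv', 'w') as f:
    f.write('feature1,feature2,label\n1.0,2.0,0\n3.0,4.0,1\n')

lines = tf.data.TextLineDataset('data.csv').skip(1)  # skip the header row

def parse_line(line):
    # record_defaults also fixes each column's dtype
    f1, f2, label = tf.io.decode_csv(line, record_defaults=[0.0, 0.0, 0])
    return tf.stack([f1, f2]), label

parsed = lines.map(parse_line)
for features, label in parsed:
    print(features.numpy(), label.numpy())
```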
2.2.2 Creating from Image Files
# Create a dataset from image files
# First collect all image file paths
import os
# Assume we have a directory of images with the following structure:
# images/
#     cat/
#         cat1.jpg
#         cat2.jpg
#     dog/
#         dog1.jpg
#         dog2.jpg
image_dir = 'images/'
class_names = ['cat', 'dog']
# Collect all image file paths and labels
file_paths = []
labels = []
for class_name in class_names:
    class_dir = os.path.join(image_dir, class_name)
    for filename in os.listdir(class_dir):
        if filename.endswith('.jpg') or filename.endswith('.png'):
            file_paths.append(os.path.join(class_dir, filename))
            labels.append(class_names.index(class_name))
# Create the datasets
file_dataset = tf.data.Dataset.from_tensor_slices(file_paths)
label_dataset = tf.data.Dataset.from_tensor_slices(labels)
# Merge the datasets
dataset = tf.data.Dataset.zip((file_dataset, label_dataset))
# Define an image-loading function
def load_image(file_path, label):
    # Read the image file
    image = tf.io.read_file(file_path)
    # Decode the JPEG image
    image = tf.image.decode_jpeg(image, channels=3)
    # Resize the image (tf.image.resize also converts it to float32)
    image = tf.image.resize(image, [224, 224])
    # Normalize the image to [0, 1]
    image = image / 255.0
    return image, label
# Map the image-loading function over the dataset
dataset = dataset.map(load_image)
# Print one dataset element
for image, label in dataset:
    print(f"Image shape: {image.shape}, label: {label.numpy()}")
    break
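The Python loop above collects paths eagerly. As an alternative sketch, the label can also be derived inside the pipeline with tf.strings ops (the paths below are hypothetical and no files are read):

```python
import tensorflow as tf

paths = tf.data.Dataset.from_tensor_slices(
    ['images/cat/cat1.jpg', 'images/dog/dog1.jpg'])
class_names = tf.constant(['cat', 'dog'])

def path_to_label(path):
    parts = tf.strings.split(path, '/')   # e.g. ['images', 'cat', 'cat1.jpg']
    class_name = parts[-2]                # the class directory name
    # Index of the matching class name
    label = tf.argmax(tf.cast(class_names == class_name, tf.int32))
    return path, label

labeled = paths.map(path_to_label)
for path, label in labeled:
    print(path.numpy().decode(), label.numpy())
```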
3. Dataset Operations
The tf.data API provides many operations for transforming datasets:
3.1 Shuffling Data
Use the shuffle() method to shuffle the data:
# Shuffle the data
dataset = dataset.shuffle(buffer_size=100)  # buffer_size is the size of the shuffle buffer
Tip
The buffer_size parameter sets the size of the buffer used for shuffling. For large datasets, a larger buffer gives a better shuffle, but it uses more memory.
3.2 Batching
Use the batch() method to split the data into batches:
# Batch the data
dataset = dataset.batch(batch_size=32)
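On a small deterministic dataset the effect is easy to see; note the last batch is smaller unless drop_remainder=True is passed:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(5).batch(2)
print([b.tolist() for b in ds.as_numpy_iterator()])  # → [[0, 1], [2, 3], [4]]
```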
3.3 Repeating the Dataset
Use the repeat() method to repeat the dataset for multi-epoch training:
# Repeat the dataset
dataset = dataset.repeat(num_epochs)  # num_epochs is the repeat count; None means repeat indefinitely
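For example, repeating a small deterministic dataset twice:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(3).repeat(2)
print(list(ds.as_numpy_iterator()))  # → [0, 1, 2, 0, 1, 2]
```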
3.4 Mapping Functions
Use the map() method to apply a function to every element of the dataset:
# Define a preprocessing function
def preprocess(x, y):
    x = tf.cast(x, tf.float32)  # convert to float32
    x = x / 255.0               # normalize
    y = tf.cast(y, tf.float32)  # convert to float32
    return x, y
# Apply the preprocessing function
dataset = dataset.map(preprocess)
3.5 Caching Data
Use the cache() method to cache data and avoid recomputation:
# Cache the data
dataset = dataset.cache()  # cache in memory
# Or cache to a file
dataset = dataset.cache('cache_file')
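One way to see the cache at work is to count how many times a (deliberately instrumented) map function actually executes across two passes over the data; this sketch uses tf.py_function purely for the counting side effect:

```python
import tensorflow as tf

calls = []  # records one entry per actual invocation

def counted_identity(x):
    def _count(v):
        calls.append(1)
        return v
    return tf.py_function(_count, [x], Tout=tf.int64)

ds = tf.data.Dataset.range(3).map(counted_identity).cache()
for _ in range(2):          # two full passes over the dataset
    for _ in ds:
        pass
print(len(calls))  # the map ran only on the first pass → 3
```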
3.6 Skipping and Taking Elements
Use the skip() and take() methods to skip or take a given number of elements:
# Skip the first 2 elements
dataset_skip = dataset.skip(2)
# Take the first 5 elements
dataset_take = dataset.take(5)
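Combined on a deterministic dataset, they behave like slicing:

```python
import tensorflow as tf

ds = tf.data.Dataset.range(10)
print(list(ds.skip(2).take(5).as_numpy_iterator()))  # → [2, 3, 4, 5, 6]
```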
3.7 Parallel Processing
Use the num_parallel_calls parameter to process data in parallel:
# Parallel map
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
# Parallel batching
dataset = dataset.batch(batch_size=32, num_parallel_calls=tf.data.AUTOTUNE)
tf.data.AUTOTUNE tells TensorFlow to pick the best degree of parallelism automatically.
4. Building Efficient Input Pipelines
Building an efficient input pipeline is crucial for fast model training. Here are some best practices:
4.1 Pipeline Order
A good pipeline order is:
# Recommended pipeline order
dataset = tf.data.Dataset.from_tensor_slices((X, y))
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.cache()
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(batch_size=32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
Note that shuffle() comes after cache(): if shuffling happened before the cache, the shuffled order of the first epoch would be frozen into the cache and later epochs would not be reshuffled.
4.2 Using prefetch
Use the prefetch() method to fetch the next batch of data while the model is training on the current one, reducing I/O wait time:
# Prefetch data
dataset = dataset.prefetch(tf.data.AUTOTUNE)
4.3 Using tf.data.AUTOTUNE
Use tf.data.AUTOTUNE to tune the degree of parallelism and the prefetch buffer size automatically:
# Use AUTOTUNE
dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
dataset = dataset.batch(batch_size=32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
5. Handling Large Datasets
For large datasets, the following techniques can improve processing efficiency:
5.1 Memory Mapping
For large files, memory mapping can reduce I/O operations:
# Use memory mapping
def load_large_file(file_path):
    # Memory-map the file
    mmapped_file = np.memmap(file_path, dtype=np.float32, mode='r')
    # Convert to a TensorFlow tensor
    # (note: tf.convert_to_tensor materializes the mapped data in memory,
    # so for truly huge files, read and convert the data in chunks instead)
    tensor = tf.convert_to_tensor(mmapped_file)
    return tensor
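A small self-contained run of the idea, writing a tiny raw float32 file first (the file name big.bin is hypothetical and stands in for a genuinely large file):

```python
import numpy as np
import tensorflow as tf

# Write a small raw float32 file to stand in for a large one
np.arange(6, dtype=np.float32).tofile('big.bin')

# Memory-map it and feed slices into a pipeline
mm = np.memmap('big.bin', dtype=np.float32, mode='r')
ds = tf.data.Dataset.from_tensor_slices(np.asarray(mm)).batch(2)
print(next(iter(ds)).numpy().tolist())  # → [0.0, 1.0]
```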
5.2 Incremental Loading
For datasets that cannot be loaded into memory all at once, use incremental loading via a generator:
# Incremental loading example
def generate_data():
    # Data-generation logic
    for i in range(10000):
        # Cast explicitly so the dtypes match the output signature below
        yield np.random.rand(10).astype(np.float32), np.random.randint(0, 2, dtype=np.int32)
# Create a dataset from the generator
dataset = tf.data.Dataset.from_generator(
    generate_data,              # generator function
    output_signature=(          # output signature
        tf.TensorSpec(shape=(10,), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
6. Data Augmentation
Data augmentation is a technique for increasing the diversity of the training data, which can improve a model's ability to generalize. The tf.data API can be combined with the tf.image module to implement image data augmentation.
6.1 Image Data Augmentation
# Image data augmentation function
# (assumes the input image is larger than 224x224 and not yet normalized)
def augment_image(image, label):
    # Randomly flip the image
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    # Randomly rotate the image by a multiple of 90 degrees
    image = tf.image.rot90(image, k=tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32))
    # Randomly adjust brightness, contrast, and saturation
    image = tf.image.random_brightness(image, max_delta=0.1)
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    image = tf.image.random_saturation(image, lower=0.8, upper=1.2)
    # Randomly crop the image
    image = tf.image.random_crop(image, size=[224, 224, 3])
    # Normalize the image
    image = image / 255.0
    return image, label
# Apply the data augmentation
dataset = dataset.map(augment_image, num_parallel_calls=tf.data.AUTOTUNE)
6.2 Text Data Augmentation
# Text data augmentation example
def augment_text(text, label):
    # Insert a space after every character
    def insert_spaces(t):
        t = tf.strings.regex_replace(t, r'(.)', r'\1 ')
        t = tf.strings.strip(t)
        return t
    # Replace characters
    def replace_chars(t):
        # A simple replacement example; real applications can use more elaborate strategies
        t = tf.strings.regex_replace(t, r'a', r'@')
        t = tf.strings.regex_replace(t, r'e', r'3')
        return t
    # Randomly choose the augmentations; tf.cond is used so the random
    # choice also works when map() traces this function into a graph
    text = tf.cond(tf.random.uniform(shape=()) > 0.5, lambda: insert_spaces(text), lambda: text)
    text = tf.cond(tf.random.uniform(shape=()) > 0.5, lambda: replace_chars(text), lambda: text)
    return text, label
# Apply the text data augmentation
text_dataset = text_dataset.map(augment_text, num_parallel_calls=tf.data.AUTOTUNE)
7. Handling Different Data Types
The tf.data API supports various types of data:
7.1 Numeric Data
# Numeric data processing
# Create numeric data
X = tf.random.normal(shape=(100, 10))
y = tf.random.uniform(shape=(100,), minval=0, maxval=2, dtype=tf.int32)
# Create the dataset
dataset = tf.data.Dataset.from_tensor_slices((X, y))
# Preprocessing
def preprocess_numeric(x, y):
    x = tf.cast(x, tf.float32)
    y = tf.one_hot(y, depth=2)  # one-hot encoding
    return x, y
dataset = dataset.map(preprocess_numeric)
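The one-hot encoding step in isolation, on a few hand-picked labels:

```python
import tensorflow as tf

y = tf.constant([0, 1, 1])
print(tf.one_hot(y, depth=2).numpy().tolist())  # → [[1.0, 0.0], [0.0, 1.0], [0.0, 1.0]]
```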
7.2 Text Data
# Text data processing
# Assume we have a list of texts and their corresponding labels
texts = ["This is a good message", "The weather is nice today", "I like programming", "TensorFlow is powerful"]
labels = [1, 1, 0, 0]
# Create the dataset
dataset = tf.data.Dataset.from_tensor_slices((texts, labels))
# Text preprocessing
def preprocess_text(text, label):
    # Tokenize (simple whitespace tokenization; real applications can use more sophisticated tokenizers)
    words = tf.strings.split(text)
    # Convert words to word IDs (simple hashing here; real applications would use a vocabulary or pretrained embeddings)
    word_ids = tf.strings.to_hash_bucket_fast(words, num_buckets=1000)
    # Truncate to length 10, then pad up to length 10
    word_ids = word_ids[:10]
    word_ids = tf.pad(word_ids, [[0, 10 - tf.shape(word_ids)[0]]])
    return word_ids, label
dataset = dataset.map(preprocess_text)
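In practice, tf.keras.layers.TextVectorization bundles tokenization, vocabulary lookup, and padding/truncation into a single layer. A brief sketch (the sample sentences are made up):

```python
import tensorflow as tf

vectorize = tf.keras.layers.TextVectorization(
    max_tokens=1000,            # vocabulary size cap
    output_sequence_length=10)  # pad/truncate every sequence to length 10
vectorize.adapt(['a good message', 'nice weather today'])  # build the vocabulary
ids = vectorize(tf.constant(['a good message']))
print(ids.shape)  # (1, 10)
```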
7.3 Time-Series Data
# Time-series data processing
# Create time-series data
# Assume we have 100 time steps with 3 features each
time_series_data = tf.random.normal(shape=(100, 3))
# Create a sliding-window dataset
def create_sliding_window_dataset(data, window_size=10, batch_size=32):
    # Create sliding windows
    dataset = tf.data.Dataset.from_tensor_slices(data)
    dataset = dataset.window(window_size + 1, shift=1, drop_remainder=True)
    # Flatten the windows
    dataset = dataset.flat_map(lambda window: window.batch(window_size + 1))
    # Split into features and label
    dataset = dataset.map(lambda window: (window[:-1], window[-1]))
    # Shuffle and batch
    dataset = dataset.shuffle(buffer_size=1000)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    return dataset
# Create the sliding-window dataset
dataset = create_sliding_window_dataset(time_series_data, window_size=10, batch_size=32)
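A quick check of the windowing logic on a tiny deterministic series, with the shuffle and batch steps omitted so the shapes and values are easy to inspect:

```python
import tensorflow as tf

# 6 time steps with 2 features each: [[0,1],[2,3],...,[10,11]]
data = tf.reshape(tf.range(12, dtype=tf.float32), (6, 2))
ds = tf.data.Dataset.from_tensor_slices(data)
ds = ds.window(4, shift=1, drop_remainder=True)
ds = ds.flat_map(lambda w: w.batch(4))
ds = ds.map(lambda w: (w[:-1], w[-1]))  # first 3 steps → features, 4th → label
x, y = next(iter(ds))
print(x.shape, y.shape)  # (3, 2) (2,)
```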
8. Performance Optimization
Here are some techniques for optimizing tf.data performance:
8.1 Using tf.data.AUTOTUNE
Use tf.data.AUTOTUNE to let TensorFlow tune the degree of parallelism and the prefetch buffer size automatically, based on available system resources.
8.2 Caching Data
For datasets that are iterated over repeatedly, use the cache() method to avoid redundant I/O and recomputation.
8.3 Parallel Processing
Use the num_parallel_calls parameter to process data in parallel and make full use of multi-core CPUs.
8.4 Prefetching Data
Use the prefetch() method to fetch the next batch in advance, hiding I/O latency and speeding up training.
8.5 Tuning the Batch Size
The batch size affects both training speed and model performance. Larger batches improve GPU utilization but use more memory, and may hurt the model's ability to generalize.
8.6 Reducing Data Copies
Minimize unnecessary data copies. Note that both from_tensors() and from_tensor_slices() embed the data in the TensorFlow graph as constants; for very large arrays, prefer from_generator() or the TFRecord format.
8.7 Using tf.function
Use the @tf.function decorator to compile a Python function into a TensorFlow graph, which can speed up its execution. (Functions passed to map() are traced into graphs automatically, so the decorator matters mainly for code outside the pipeline.)
# Use tf.function to optimize a preprocessing function
@tf.function
def preprocess(x, y):
    x = tf.cast(x, tf.float32)
    x = x / 255.0
    y = tf.one_hot(y, depth=2)
    return x, y
9. Exercises
Exercise 1: Build an Image-Classification Input Pipeline
- Create a directory structure containing cat and dog images.
- Use the tf.data API to create an image-classification dataset.
- Implement data-loading and preprocessing functions.
- Add data-augmentation operations.
- Build an efficient input pipeline, including shuffling, batching, caching, and prefetching.
- Test the performance of the input pipeline.
Exercise 2: Time-Series Prediction
- Generate or load time-series data.
- Use the tf.data API to create a sliding-window dataset.
- Build a time-series prediction model.
- Train the model and evaluate its performance.
Exercise 3: Text Classification
- Load a text-classification dataset.
- Implement text preprocessing functions, including tokenization and word embedding.
- Use the tf.data API to build an input pipeline.
- Build and train a text-classification model.