我正在為一個專案訓練一個多任務轉換器,并且想將我的資料結構切換到 TFRecords,因為我的訓練受到動態資料生成的瓶頸。我目前正在將單個資料樣本構建為張量字典,如下所示:
{'continuous_input': tf.Tensor(), 'categorical_input': tf.Tensor(), 'continuous_output': tf.Tensor(), 'categorical_output': tf.Tensor()}
在一個樣本中,這 4 個張量的長度相同,但在樣本之間,這些張量的長度不同。這兩個continuous_張量是 tf.float32,而這兩個categorical_張量是 tf.int32。這些張量的更明確的細節在下面的代碼中。
我認為我已經以正確的格式(位元組字串)成功地將我的資料寫入了 TFRecords。
問題陳述:我無法弄清楚如何將這些 TFRecords 讀回記憶體并將位元組字串決議到上面的張量結構字典中。我在下面包含一個完全可重現的問題示例,它使用 Numpy v1.23.4 和 Tensorflow v2.10.0。它使用上述字典結構創建假資料,將 TFRecords 保存到您的作業目錄,重新加載這些 TFRecords 并嘗試使用我的函式決議它們parse_tfrecord_fn()。我知道問題出在parse_tfrecord_fn()但我不知道tf.io解決此問題的適當工具。
可重現的例子:
import os
import os.path as op
import numpy as np
import tensorflow as tf
# Helper functions for writing TFRecords
def _tensor_feature(value):
serialized_nonscalar = tf.io.serialize_tensor(value)
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[serialized_nonscalar.numpy()]))
def create_example(sample):
feature = {
"continuous_input": _tensor_feature(sample['continuous_input']),
"categorical_input": _tensor_feature(sample['categorical_input']),
"continuous_output": _tensor_feature(sample['continuous_output']),
"categorical_output": _tensor_feature(sample['categorical_output']),
}
return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()
# Helper functions for reading/preparing TFRecord data
def parse_tfrecord_fn(example):
feature_description = {
"continuous_input": tf.io.VarLenFeature(tf.string),
"categorical_input": tf.io.VarLenFeature(tf.string),
"continuous_output": tf.io.VarLenFeature(tf.string),
"categorical_output": tf.io.VarLenFeature(tf.string)
}
example = tf.io.parse_single_example(example, feature_description)
# TODO: WHAT GOES HERE?
return example
def get_dataset(filenames, batch_size):
dataset = (
tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
.shuffle(batch_size * 10)
.batch(batch_size)
.prefetch(tf.data.AUTOTUNE)
)
return dataset
# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
seq_len = fake_sequence_lengths[i]
fake_data.append({'continuous_input': tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
'categorical_input': tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
'continuous_output': tf.fill(seq_len, -1.0),
'categorical_output': tf.fill(seq_len, -1)})
tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
os.makedirs(tfrecords_dir) # create TFRecords output folder
# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num 1) * num_samples_per_tfrecord)]
with tf.io.TFRecordWriter(tfrecords_dir "/file_%.2i.tfrec" % tfrec_num) as writer:
for sample in samples:
example = create_example(sample)
writer.write(example)
# (Try to) Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")
# Problem: the line below doesn't return the original tensors of fake_data, because my parse_tfrecord_fn is wrong
# Question: What must I add to parse_tfrecord_fn to give this the desired behavior?
dataset = get_dataset(train_filenames, batch_size=32)
# For ease of debugging parse_tfrecord_fn():
dataset = tf.data.TFRecordDataset(train_filenames, num_parallel_reads=tf.data.AUTOTUNE)
element = dataset.take(1).get_single_element()
parse_tfrecord_fn(element) # set your breakpoint here, then can step through parse_tfrecord_fn()
該函式parse_tfrecord_fn()接受一個位元組串作為輸入,如下所示:
示例 = "b'\n\xb4\x03\nj\n\x10continuous_input\x12V\nT\nR\x08\x01\x12\x04\x12\x02\x08\x12"H..."
命令example = tf.io.parse_single_example(example, feature_description),其中引數在我的可重現示例中定義,回傳SparseTensors具有所需 4 個鍵('continuous_input'、'categorical_input'等)的字典。但是,這些 SparseTensor 的值要么不存在,要么我無法訪問,因此我無法提取和決議它們,例如 with tf.io.parse_tensor(example['continuous_input'].values.numpy().tolist()[0], out_type=tf.float32)。
uj5u.com熱心網友回復:
我解決了這個問題,我最初的懷疑是正確的——這是決議器函式中需要的一個簡單更改,parse_tfrecord_fn. 我在下面包含了完整的作業代碼,對于任何可能有助于前進的人。我對用于撰寫 TFRecords 的輔助函式進行了微小的修改,以匹配常見的設計模式。實質性變化發生在parse_tfrecord_fn.
主要見解:
在
tf.io.FixedLenFeature([], tf.string)決議任何最初序列化為bytes_list. 這里的直覺是,盡管bytes_list字串的長度可能因物件而異,但它仍然只是 1 string,并且“1”是使其成為固定長度特征的原因。使用 撤消
bytes_list張量的序列化,使用引數tf.io.parse_tensor()指定張量的原始 dtypeout_type。
- 請注意,如果您用于
tf.io.VarLenFeature決議 TFRecord,這將不起作用,因為這將回傳某種SparseTensor我無法反序列化/決議的內容。
結合這兩個見解,正確的流程如下:
- 將 TFRecord 決議回其字典形式,將原始鍵和序列化(即未決議)張量作為值。
- 然后決議該字典中的各個張量。
import os
import os.path as op
import numpy as np
import tensorflow as tf
# Helper functions for writing TFRecords
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
# If the value is an eager tensor BytesList won't unpack a string from an EagerTensor.
if isinstance(value, type(tf.constant(0))):
value = value.numpy()
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def create_example(sample):
feature = {
"continuous_input": _bytes_feature(tf.io.serialize_tensor(sample['continuous_input'])),
"categorical_input": _bytes_feature(tf.io.serialize_tensor(sample['categorical_input'])),
"continuous_output": _bytes_feature(tf.io.serialize_tensor(sample['continuous_output'])),
"categorical_output": _bytes_feature(tf.io.serialize_tensor(sample['categorical_output'])),
}
return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()
# Helper functions for reading/preparing TFRecord data
def parse_tfrecord_fn(example_to_parse):
feature_description = {
"continuous_input": tf.io.FixedLenFeature([], tf.string),
"categorical_input": tf.io.FixedLenFeature([], tf.string),
"continuous_output": tf.io.FixedLenFeature([], tf.string),
"categorical_output": tf.io.FixedLenFeature([], tf.string)
}
parsed_example = tf.io.parse_single_example(example_to_parse, feature_description)
return {'continuous_input': tf.io.parse_tensor(parsed_example['continuous_input'], out_type=tf.float32),
'categorical_input': tf.io.parse_tensor(parsed_example['categorical_input'], out_type=tf.int32),
'continuous_output': tf.io.parse_tensor(parsed_example['continuous_output'], out_type=tf.float32),
'categorical_output': tf.io.parse_tensor(parsed_example['categorical_output'], out_type=tf.int32)}
def get_dataset(filenames, batch_size):
dataset = (
tf.data.TFRecordDataset(filenames, num_parallel_reads=tf.data.AUTOTUNE)
.map(parse_tfrecord_fn, num_parallel_calls=tf.data.AUTOTUNE)
.shuffle(batch_size * 10)
.padded_batch(batch_size=batch_size,
padding_values={'categorical_input': 0, 'continuous_input': 0.0,
'categorical_output': -1,
'continuous_output': -1.0},
padded_shapes={'categorical_input': [None], 'continuous_input': [None],
'categorical_output': [None],
'continuous_output': [None]},
drop_remainder=True)
.prefetch(tf.data.AUTOTUNE)
)
return dataset
# Make fake data
num_samples_per_tfrecord = 100
num_train_samples = 1600
num_tfrecords = num_train_samples // num_samples_per_tfrecord
fake_sequence_lengths = np.random.randint(3, 35, num_train_samples)
fake_data = []
for i in range(num_train_samples):
seq_len = fake_sequence_lengths[i]
fake_data.append({"continuous_input": tf.random.uniform([seq_len], minval=0, maxval=1, dtype=tf.float32),
"categorical_input": tf.random.uniform([seq_len], minval=0, maxval=530, dtype=tf.int32),
"continuous_output": tf.fill(seq_len, -1.0),
"categorical_output": tf.fill(seq_len, -1)})
tfrecords_dir = './tfrecords'
if not op.exists(tfrecords_dir):
os.makedirs(tfrecords_dir) # create TFRecords output folder
# Write fake data to tfrecord files
for tfrec_num in range(num_tfrecords):
samples = fake_data[(tfrec_num * num_samples_per_tfrecord): ((tfrec_num 1) * num_samples_per_tfrecord)]
with tf.io.TFRecordWriter(tfrecords_dir "/file_%.2i.tfrec" % tfrec_num) as writer:
for sample in samples:
example = create_example(sample)
writer.write(example)
# Load all the TFRecord data into a (parsed) tf dataset
train_filenames = tf.io.gfile.glob(f"{tfrecords_dir}/*.tfrec")
# The line below works now!
dataset = get_dataset(train_filenames, batch_size=32)
for el in dataset:
successful_element = el
break
print(successful_element)
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/531700.html
標籤:张量流记录
