我正在創建一個 RNN 模型來處理一定長度(10 幀)的視頻。每個視頻在其各自的檔案夾中存盤為多個影像(長度不同)。然而,在將這批幀傳遞給 RNN 模型之前,我正在使用 ResNet 特征提取器對每個幀的影像進行預處理。我正在使用自定義資料生成器來獲取包含影像的檔案夾路徑,預處理影像,然后將其傳遞給模型。
在沒有資料生成器的情況下,我一直很笨拙地這樣做,但這并不實用,因為我有一個超過 10,000 個視頻的訓練集,并且后來還希望執行資料增強。
這是我的自定義資料生成器的代碼
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, video_paths,
batch_size=32, video_length=10, dim=(224,224),
n_channels=3, n_classes=4, IMG_SIZE = 224, MAX_SEQ_LENGTH = 10,
NUM_FEATURES = 2048, shuffle=True):
'Initialization'
self.list_IDs = list_IDs
self.labels = labels
self.video_paths = video_paths
self.batch_size = batch_size
self.dim = dim
self.video_length = video_length
self.n_channels = n_channels
self.n_classes = n_classes
self.IMG_SIZE = IMG_SIZE
self.MAX_SEQ_LENGTH = MAX_SEQ_LENGTH
self.NUM_FEATURES = NUM_FEATURES
self.shuffle = shuffle
self.on_epoch_end()
def crop_center_square(frame):
y, x = frame.shape[0:2]
min_dim = min(y, x)
start_x = (x // 2) - (min_dim // 2)
start_y = (y // 2) - (min_dim // 2)
return frame[start_y : start_y min_dim, start_x : start_x min_dim]
def load_series(self, videopath):
frames = []
image_paths = [os.path.join(videopath, o) for o in os.listdir(videopath)]
frame_num = np.linspace(0,len(image_paths)-1, num=10)
frame_num = frame_num.astype(int)
resize=(self.IMG_SIZE, self.IMG_SIZE)
# resize=(IMG_SIZE, IMG_SIZE)
for ix in frame_num:
image = Image.open(image_paths[ix])
im_array = np.asarray(image)
im_array = self.crop_center_square(im_array)
# im_array = crop_center_square(im_array)
im_array = cv2.resize(im_array, resize)
stacked_im_array = np.stack((im_array,)*3, axis=-1)
frames.append(stacked_im_array)
# plt.imshow(stacked_im_array)
# plt.show()
return np.array(frames)
def build_feature_extractor(self):
feature_extractor = keras.applications.resnet_v2.ResNet152V2(
weights="imagenet",
include_top=False,
pooling="avg",
input_shape=(self.IMG_SIZE, self.IMG_SIZE, 3),
)
preprocess_input = keras.applications.resnet_v2.preprocess_input
inputs = keras.Input((self.IMG_SIZE, self.IMG_SIZE, 3))
preprocessed = preprocess_input(inputs)
outputs = feature_extractor(preprocessed)
return keras.Model(inputs, outputs, name="feature_extractor")
def __getitem__(self, index):
'Generate one batch of data'
# Generate indexes of the batch
indexes = self.indexes[index*self.batch_size: (index 1)*self.batch_size]
# Find list of IDs
list_IDs_temp = [self.list_IDs[k] for k in indexes]
# Generate data
[frame_features, frame_masks], frame_labels = self._generate_X(list_IDs_temp)
return [frame_features, frame_masks], frame_labels
def _generate_X(self, list_IDs_temp):
'Generates data containing batch_size videos'
# Initialization
frame_masks = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH), dtype="bool")
frame_features = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
frame_labels = np.zeros(shape=(self.batch_size), dtype="int")
feature_extractor = self.build_feature_extractor()
tt = time.time()
# frame_masks = np.zeros(shape=(batch_size, MAX_SEQ_LENGTH), dtype="bool")
# frame_features = np.zeros(shape=(batch_size, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")
# frame_labels = np.zeros(shape=(batch_size), dtype="int")
for idx, ID in enumerate(list_IDs_temp):
videopath = self.video_paths[ID]
# videopath = video_paths[ID]
video_frame_label = self.labels[ID]
# Gather all its frames and add a batch dimension.
frames = self.load_series(Path(videopath))
# frames = load_series(Path(videopath))
# At this point frames.shape = (10, 224, 224, 3)
frames = frames[None, ...]
# After this, frames.shape = (1, 10, 224, 224, 3)
# Initialize placeholders to store the masks and features of the current video.
temp_frame_mask = np.zeros(shape=(1, self.MAX_SEQ_LENGTH,), dtype="bool")
# temp_frame_mask = np.zeros(shape=(1, MAX_SEQ_LENGTH,), dtype="bool")
# temp_frame_mask.shape = (1,60)
temp_frame_features = np.zeros(shape=(1, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
# temp_frame_features = np.zeros(shape=(1, MAX_SEQ_LENGTH, NUM_FEATURES), dtype="float32")
# temp_frame_features.shape = (1, 60, 2048)
# Extract features from the frames of the current video.
for i, batch in enumerate(frames):
video_length = batch.shape[0]
length = min(self.MAX_SEQ_LENGTH, video_length)
# length = min(MAX_SEQ_LENGTH, video_length)
for j in range(length):
temp_frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
# temp_frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
temp_frame_mask[i, :length] = 1 # 1 = not masked, 0 = masked
frame_features[idx,] = temp_frame_features.squeeze()
frame_masks[idx,] = temp_frame_mask.squeeze()
frame_labels[idx] = video_frame_label
tf = time.time() - tt
print(f'Pre-process length: {tf}')
return [frame_features, frame_masks], frame_labels
def on_epoch_end(self):
'Updates indexes after each epoch'
self.indexes = np.arange(len(self.list_IDs))
if self.shuffle == True:
np.random.shuffle(self.indexes)
這是RNN模型的代碼
label_processor = keras.layers.StringLookup(num_oov_indices=0, vocabulary=np.unique(train_df["view"]))
print(label_processor.get_vocabulary())
train_list_IDs = train_df.index
train_labels = train_df["view"].values
train_labels = label_processor(train_labels[..., None]).numpy()
train_video_paths = train_df['series']
training_generator = DataGenerator(train_list_IDs, train_labels, train_video_paths)
test_list_IDs = test_df.index
test_labels = test_df["view"].values
test_labels = label_processor(test_labels[..., None]).numpy()
test_video_paths = test_df['series']
testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths)
# Utility for our sequence model.
def get_sequence_model():
class_vocab = label_processor.get_vocabulary()
frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")
# Refer to the following tutorial to understand the significance of using `mask`:
# https://keras.io/api/layers/recurrent_layers/gru/
x = keras.layers.GRU(16, return_sequences=True)(
frame_features_input, mask=mask_input
)
x = keras.layers.GRU(8)(x)
x = keras.layers.Dropout(0.4)(x)
x = keras.layers.Dense(8, activation="relu")(x)
output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)
rnn_model = keras.Model([frame_features_input, mask_input], output)
rnn_model.compile(
loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
)
return rnn_model
# Utility for running experiments.
def run_experiment():
now = datetime.now()
current_time = now.strftime("%d_%m_%Y_%H_%M_%S")
filepath = os.path.join(Path('F:/RNN'), f'RNN_ResNet_Model_{current_time}')
checkpoint = keras.callbacks.ModelCheckpoint(
filepath, save_weights_only=True, save_best_only=True, verbose=1
)
seq_model = get_sequence_model()
history = seq_model.fit(training_generator,
epochs=EPOCHS,
callbacks=[checkpoint],
)
seq_model.load_weights(filepath)
_, accuracy = seq_model.evaluate(testing_generator)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
return history, accuracy, seq_model
_, accuracy, sequence_model = run_experiment()
我正在努力弄清楚如何將自定義資料生成器的結果傳遞給我的 RNN 模型?如何最好地重寫我的代碼以使用 model.fit() 或 model.fit_generator()?
先感謝您!
uj5u.com熱心網友回復:
請在您的問題中具體說明您正在努力解決的問題。你期待不同的結果,你的代碼是慢還是你得到錯誤?根據您的代碼,我發現了一些問題,并建議進行以下調整:
__getitem__()每次從生成器中檢索一批資料時,都會呼叫 DataGenerator 中的函式。在您呼叫_generate_X()的該函式中,該函式還會在每次批量生成時再次初始化預訓練的 ResNet 特征提取器feature_extractor = self.build_feature_extractor()。這是非常低效的。
作為替代方案,我建議洗掉生成器類中的模型創建,而是在主筆記本中創建特征提取器并將其作為 DataGenerator 實體的引數:
在您的主檔案中:
def build_feature_extractor(self): [...]
feature_extractor = build_feature_extractor()
testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths, feature_extractor)
對于生成器類:
class DataGenerator(keras.utils.Sequence):
'Generates data for Keras'
def __init__(self, list_IDs, labels, video_paths, feature_extractor,
batch_size=32, video_length=10, dim=(224,224),
n_channels=3, n_classes=4, IMG_SIZE = 224, MAX_SEQ_LENGTH = 10,
NUM_FEATURES = 2048, shuffle=True):
'Initialization'
self.list_IDs = list_IDs
[...]
self.feature_extractor = feature_extractor [...]
然后調整到這個:
temp_frame_features[i, j, :] = self.feature_extractor.predict(batch[None, j, :])
您已在您的 中正確使用了生成器.fit call,使用model.fit(training_generator, ...)將為您的模型提供創建的批次__getitem__()。
uj5u.com熱心網友回復:
我得到的錯誤是
raise NotImplementedError keras
相當愚蠢的是,我忘記將以下函式放在 DataGenerator 函式中
def __len__(self):
'Denotes the number of batches per epoch'
return int(np.floor(len(self.list_IDs) / self.batch_size))
之后錯誤就消失了。
obsolete_hegemony 確實給了我一個很好的建議來優化我的代碼并分離特征提取預處理!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/459067.html
