我正在讀取一堆檔案(超過七千個)作為資料框。所有檔案都在同一個父檔案夾中,子目錄有組織且一致。這些檔案目前是按時間戳組織的。我想讀入檔案,然后將檔案匯出到不同的檔案夾,其中每個檔案都是一個播放器 ID。同一輸入資料幀中的單個玩家可能有多個時間戳。有時玩家 ID 根本不會出現在輸入資料框中。我已經弄清楚了資料整理(非常簡單),但是由于每個檔案大約有 150 萬行,因此單個檔案需要大約 5 個小時。所以我不能簡單地遍歷所有七千個檔案。我想通過輸入檔案并行化(盡管通過輸出檔案并行化可能更好?)。我將在具有足夠 CPU 的 HPC 上運行它,但我不會 在使用 HPC 之前不需要指定我的 CPU 要求。我知道 doParallel 包存在,但是教程介紹vignette("gettingstartedParallel")沒有用,我不明白其他 doParallel 帖子。(請不要只是在沒有相關代碼的情況下向我推薦 doParallel 包。)我還擔心代碼會在嘗試多次寫入同一個 csv 時崩潰。CSV 無法并行寫入,即使我設定了append = TRUE. 這是我如何讀入檔案以及如何將檔案寫入新檔案夾的代碼。
# Example input data frames (in the real code I create a vector of Alltimes using list.files() )
times1 <- data.frame(
ID = c('PL1', 'PL2', 'PL3', 'PL2','PL1'),
times = c(42.6, 41.5, 42.9, 47.0, 44.3),
speed = c(64, 66, 43, 39, 55)
)
times2 <- data.frame(
ID = c('PL3', 'PL3', 'PL3', 'PL1','PL1'),
times = c(62.1, 51.7, 65.9, 62.1, 55.3),
speed = c(71, 73, 45, 64, 66)
)
# Create vector of all parquets filepaths
Alltimes <- list.files(path = 'Input_Folder_Path)',
pattern = '*.snappy.parquet$',
recursive = TRUE,
full.names = TRUE)
# Iterate through timestamp input files (I want this part parallelized instead of a loop)
# for( i in 1:length(Alltimes)){
# Read in the individual file
# when the Alltimes vector is the file path I use read_parquet( Alltimes[i] ), but
# times1 is a substitute for this example.
df = times1
# df = times2
# df = read_parquet( Alltimes[i] )
# get vector of all player ids in this data frame
all_ids_vec <- unique(x = df$ID)
# write out individual csv for each player ID
for(j in 1:length(all_ids_vec)){
# Subset the df by that specific player ID
one_player <- df %>% filter(ID == all_ids_vec[j])
write.table(x = one_player,
file = "C:/Users/Juliet/Desktop/", all_ids_vec[j],".csv",
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# }
uj5u.com熱心網友回復:
要將檔案并行寫入磁盤,以下代碼適用于 Windows。請注意,該軟體包doParallel不適用于 Windows。
library(parallel)
dirname <- "C:/Users/611913/Desktop"
#dirname <- path.expand("~/tmp/so")
sp <- split(df, df$ID)
id_vec <- names(sp)
f <- function(X, filename, path){
filename <- file.path(path, filename)
write.table(x = X,
file = filename,
append = TRUE,
quote = FALSE,
sep = ",",
row.names = FALSE,
col.names = FALSE)
}
# Windows
ncores <- detectCores()
cl <- makeCluster(ncores - 1L)
clusterExport(cl, "sp")
clusterExport(cl, "id_vec")
clusterExport(cl, "dirname")
clusterExport(cl, "f")
clusterEvalQ(cl, "sp")
clusterEvalQ(cl, "id_vec")
clusterEvalQ(cl, "dirname")
clusterEvalQ(cl, "f")
res <- parLapply(cl, seq_along(sp), function(j){
f(sp[[j]], id_vec[j], dirname)
})
stopCluster(cl)
測驗資料
此代碼創建測驗資料。
n <- 1000L
ID <- sprintf("idd", seq_len(n))
ID <- rep(ID, 3)
df <- data.frame(ID, x = rnorm(3*n), y = sample(10, 3*n, TRUE))
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/329525.html
