我有一個.csv包含約 3GB 資料的檔案。我想讀取所有這些資料并進行處理。以下程式從檔案中讀取資料并將其存盤到std::vector<std::vector<std::string>>. 但是,程式運行時間過長,應用程式(vscode)凍結,需要重新啟動。我做錯了什么?
#include <algorithm>
#include <iostream>
#include <fstream>
#include "sheet.hpp"
extern std::vector<std::string> split(const std::string& str, const std::string& delim);
int main() {
Sheet sheet;
std::ifstream inputFile;
inputFile.open("C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv");
std::string line;
while(inputFile >> line) {
sheet.addRow(split(line, ","));
}
return 0;
}
// splitandSheet的成員函式已經過徹底測驗并且作業正常。split雖然復雜度為 N^2...
EDIT1:已根據評論中的建議修復了讀取的檔案。
拆分功能:
std::vector<std::string> split(const std::string& str, const std::string& delim) {
std::vector<std::string> vec_of_tokens;
std::string token;
for (auto character : str) {
if (std::find(delim.begin(), delim.end(), character) != delim.end()) {
vec_of_tokens.push_back(token);
token = "";
continue;
}
token = character;
}
vec_of_tokens.push_back(token);
return vec_of_tokens;
}
EDIT2:虛擬 csv 行:
5612000000,5700000000,4665712499,798,3349189123,0.02698,0.06714,0.07715,0.004219,0.004868,0.06726,7.915e-05,0.0003681,0.27,0.00293,3.285,0.008261,0,0,0.01608
limits:
field1: starting timestamp (nanosecs)
field2: ending timestamp (nanosecs)
field3: job id (<= 1,000,000)
field4: task id (<= 10,000)
field5: machine id (<= 30,000,000)
field6: CPU time (sorry, no clue)
field7-20: no idea, unused for the current stage, but needed for later stages.
EDIT3:所需的輸出
還記得 Excel 中的 .thenby 函式嗎?
這里的排序順序是首先在第 5 列(基于 1 的索引)排序,然后在第 3 列,最后在第 4 列;都在上升。
uj5u.com熱心網友回復:
我將首先定義一個類來攜帶有關一條記錄的資訊,并為流添加多載operator>>并operator<<幫助從流中讀取/寫入記錄。我也可能會添加一個助手來處理逗號分隔符。
首先,我使用的標題集:
#include <algorithm> // sort
#include <array> // array
#include <cstdint> // integer types
#include <filesystem> // filesystem
#include <fstream> // ifstream
#include <iostream> // cout
#include <iterator> // istream_iterator
#include <tuple> // tie
#include <vector> // vector
一個簡單的分隔符助手可能如下所示。如果分隔符在流中,它會丟棄 ( ignore()) 分隔符,failbit如果分隔符不存在,則在流上設定 。
template <char Char> struct delimiter {};
template <char Char> // read a delimiter
std::istream& operator>>(std::istream& is, const delimiter<Char>) {
if (is.peek() == Char) is.ignore();
else is.setstate(std::ios::failbit);
return is;
}
template <char Char> // write a delimiter
std::ostream& operator<<(std::ostream& os, const delimiter<Char>) {
return os.put(Char);
}
record使用您提供的資訊,實際的類可以如下所示:
struct record {
uint64_t start; // ns
uint64_t end; // ns
uint32_t job_id; // [0,1000000]
uint16_t task_id; // [0,10000]
uint32_t machine_id; // [0,30000000]
double cpu_time;
std::array<double, 20 - 6> unknown;
};
然后可以像這樣使用delimiter類模板從流中讀取這樣的記錄(實體化為使用逗號和換行符作為分隔符):
std::istream& operator>>(std::istream& is, record& r) {
delimiter<','> del;
delimiter<'\n'> nl;
// first read the named fields
if (is >> r.start >> del >> r.end >> del >> r.job_id >> del >>
r.task_id >> del >> r.machine_id >> del >> r.cpu_time)
{
// then read the unnamed fields:
for (auto& unk : r.unknown) is >> del >> unk;
}
return is >> nl;
}
寫入記錄類似地通過以下方式完成:
std::ostream& operator<<(std::ostream& os, const record& r) {
delimiter<','> del;
delimiter<'\n'> nl;
os <<
r.start << del <<
r.end << del <<
r.job_id << del <<
r.task_id << del <<
r.machine_id << del <<
r.cpu_time;
for(auto&& unk : r.unknown) os << del << unk;
return os << nl;
}
將整個檔案讀入記憶體,對其進行排序,然后列印結果:
int main() {
std::filesystem::path filename = "C:/Users/1032359/cpp-projects/"
"Straggler Job Analyzer/src/part-00001-of-00500.csv";
std::vector<record> records;
// Reserve space for "3GB" / 158 (the length of a record some extra bytes)
// records. Increase the 160 below if your records are actually longer on average:
records.reserve(std::filesystem::file_size(filename) / 160);
// open the file
std::ifstream inputFile(filename);
// copy everything from the file into `records`
std::copy(std::istream_iterator<record>(inputFile),
std::istream_iterator<record>{},
std::back_inserter(records));
// sort on columns 5-3-4 (ascending)
auto sorter = [](const record& lhs, const record& rhs) {
return std::tie(lhs.machine_id, lhs.job_id, lhs.task_id) <
std::tie(rhs.machine_id, rhs.job_id, rhs.task_id);
};
std::sort(records.begin(), records.end(), sorter);
// display the result
for(auto& r : records) std::cout << r;
}
在我的帶有旋轉磁盤的舊計算機上,上述程序大約需要 2 分鐘。如果這太慢,我會測量長時間運行的部分的時間:
reservecopysort
然后,您可能可以使用該資訊來嘗試找出需要改進的地方。例如,如果排序有點慢,使用 astd::vector<double>而不是 astd::array<double, 20-6>來存盤未命名的欄位可能會有所幫助:
struct record {
record() : unknown(20-6) {}
uint64_t start; // ns
uint64_t end; // ns
uint32_t job_id; // [0,1000000]
uint16_t task_id; // [0,10000]
uint32_t machine_id; // [0,30000000]
double cpu_time;
std::vector<double> unknown;
};
uj5u.com熱心網友回復:
我會建議一種稍微不同的方法:
- 不要決議整行,只提取用于排序的欄位
- 請注意,您宣告的范圍需要少量位,它們一起適合一個 64 位值:
30,000,000 - 25 位
10,000 - 14 位
1,000,000 - 20 位
- 在您的矢量中保存一個“原始”源,以便您可以根據需要將其寫出來。
這是我得到的:
#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include <chrono>
#include <algorithm>
struct Record {
uint64_t key;
std::string str;
Record(uint64_t key, std::string&& str)
: key(key)
, str(std::move(str))
{}
};
int main()
{
auto t1 = std::chrono::high_resolution_clock::now();
std::ifstream src("data.csv");
std::vector<Record> v;
std::string str;
uint64_t key(0);
while (src >> str)
{
size_t pos = str.find(',') 1;
pos = str.find(',', pos) 1;
char* p(nullptr);
uint64_t f3 = strtoull(&str[pos], &p, 10);
uint64_t f4 = strtoull( p, &p, 10);
uint64_t f5 = strtoull( p, &p, 10);
key = f5 << 34;
key |= f3 << 14;
key |= f4;
v.emplace_back(key, std::move(str));
}
std::sort(v.begin(), v.end(), [](const Record& a, const Record& b) {
return a.key < b.key;
});
auto t2 = std::chrono::high_resolution_clock::now();
std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << std::endl;
std::ofstream out("out.csv");
for (const auto& r : v) {
out.write(r.str.c_str(), r.str.length());
out.write("\n", 1);
}
auto t3 = std::chrono::high_resolution_clock::now();
std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t3 - t2).count() << std::endl;
}
當然,您可以reserve預先在向量中留出空間以避免重新分配。
我已經生成了一個包含 18,000,000 條記錄的檔案。我的時間顯示讀取/排序檔案約 30 秒,寫入輸出約 200 秒。
更新:
用 替換流式傳輸out.write(),將寫入時間從 200 秒減少到 17 秒!
uj5u.com熱心網友回復:
作為解決此問題的另一種方法,我建議不要讀取記憶體中的所有資料,而是使用最少的 RAM 來對巨大的 CSV 檔案進行排序:a std::vectorof line offsets。
重要的是理解概念,而不是精確的實作。
由于實作只需要每行 8 個位元組(在 64 位模式下),要對 3 GB 的資料檔案進行排序,我們只需要大約 150 MB 的 RAM。缺點是同一行需要多次決議數字,大致log2(17e6)= 24次。但是,我認為這種開銷可以通過使用更少的記憶體來部分補償,并且不需要決議行的所有數字。
#include <Windows.h>
#include <cstdint>
#include <vector>
#include <algorithm>
#include <array>
#include <fstream>
std::array<uint64_t, 5> readFirst5Numbers(const char* line)
{
std::array<uint64_t, 5> nbr;
for (int i = 0; i < 5; i )
{
nbr[i] = atoll(line);
line = strchr(line, ',') 1;
}
return nbr;
}
int main()
{
// 1. Map the input file in memory
const char* inputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv";
HANDLE fileHandle = CreateFileA(inputPath, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL);
DWORD highsize;
DWORD lowsize = GetFileSize(fileHandle, &highsize);
HANDLE mappingHandle = CreateFileMapping(fileHandle, NULL, PAGE_READONLY, highsize, lowsize, NULL);
size_t fileSize = (size_t)lowsize | (size_t)highsize << 32;
const char* memoryAddr = (const char*)MapViewOfFile(mappingHandle, FILE_MAP_READ, 0, 0, fileSize);
// 2. Find the offset of the start of lines
std::vector<size_t> linesOffset;
linesOffset.push_back(0);
for (size_t i = 0; i < fileSize; i )
if (memoryAddr[i] == '\n')
linesOffset.push_back(i 1);
linesOffset.pop_back();
// 3. sort the offset according to some logic
std::sort(linesOffset.begin(), linesOffset.end(), [memoryAddr](const size_t& offset1, const size_t& offset2) {
std::array<uint64_t, 5> nbr1 = readFirst5Numbers(memoryAddr offset1);
std::array<uint64_t, 5> nbr2 = readFirst5Numbers(memoryAddr offset2);
if (nbr1[4] != nbr2[4])
return nbr1[4] < nbr2[4];
if (nbr1[2] != nbr2[2])
return nbr1[2] < nbr2[2];
return nbr1[4] < nbr2[4];
});
// 4. output sorted array
const char* outputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/output/part-00001-of-00500.csv";
std::ofstream outputFile;
outputFile.open(outputPath);
for (size_t offset : linesOffset)
{
const char* line = memoryAddr offset;
size_t len = strchr(line, '\n') 1 - line;
outputFile.write(line, len);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/471464.html
