微軟支持并發的Key-Value 存盤庫有C++與C#兩個版本,號稱迄今為止最快的并發鍵值存盤,下面是C#版本翻譯:
FASTER C#可在.NET Framework和.NET Core中運行,并且可以在單執行緒和并發設定中使用,經過測驗,可以在Windows和Linux上使用,它公開了一種API,該API可以執行讀取,盲更新(Upserts)和讀取-修改-寫入(RMW)操作的混合,它支持大于記憶體的資料,并接受IDevice將日志存盤在檔案中的實作,提供了IDevice本地檔案系統的實作,也可以寫入遠程檔案系統,或者將遠程存盤映射到本地檔案系統中,FASTER可以用作傳統并發資料結構類似ConcurrentDictionary的高性能替代品,并且還支持大于記憶體的資料,它支持增量或非增量資料結構型別的檢查點,
FASTER支持三種基本操作:
- Read:從鍵值存盤中讀取資料
- Upsert:將值盲目向上插入到存盤中(不檢查先前的值)
- Read-Modify-Write:更新存盤區中的值,用于實作“求和”和“計數”之類的操作,
構建
在實體化FASTER之前,您需要創建FASTER將使用的存盤設備,如果使用的是可移植型別(byte、int、double)型別,則僅需要混合日志設備,如果使用物件,則需要創建一個單獨的物件日志設備,
IDevice log = Devices.CreateLogDevice("C:\\Temp\\hybridlog_native.log");
然后,按如下方式創建一個FASTER實體:
fht = new FasterKV<Key, Value, Input, Output, Empty, Functions>
(1L << 20, new Functions(), new LogSettings { LogDevice = log });
建構式的型別引數
有六個基本概念,在實體化FASTER時作為通用型別引數提供:
- Key:這是鍵的型別,例如long,
- Value:這是存盤在FASTER中的值的型別,
- Input:這是呼叫Read或RMW時提供給FASTER的輸入型別,它可以被視為讀取或RMW操作的引數,例如,對于RMW,可是增量累加到值,
- Output:這是讀操作的輸出型別,將值的相關部分復制到輸出,
- Context:操作的用戶定義背景關系,如果沒有必要使用Empty,
- Functions:需要回呼時,使用IFunctions<>呼叫,
回呼函式
用戶提供一個實體化IFunctions<>,此型別封裝了所有回呼,下面將對其進行介紹:
- SingleReader和并發讀ConcurrentReader:這些用于讀取存盤值并將它們復制到Output,單個讀取器可以假定沒有并發操作,
- SingleWriter和ConcurrentWriter:這些用于將值從源值寫入存盤,
- Completion callbacks完成回呼:各種操作完成時呼叫,
- RMWUpdaters:用戶指定了三個更新器,InitialUpdater,InPlaceUpdater和CopyUpdater,它們一起用于實作RMW操作,
- Hash Table Siz哈希表大小:這是分配給FASTER的存盤行數,其中每個行為64位元組,
- LogSettings 日志設定:這些設定與日志的大小、設備,
- Checkpoint設定:這些是與檢查相關的設定,例如檢查型別和檔案夾,
- Serialization序列化設定:用于為鍵和值型別提供自定義序列化程式,序列化程式實作IObjectSerializer<Key>鍵和IObjectSerializer<Value>值,只有C#類物件非可移植型別才需要這些,
- Key比較器:用于為key提供更好的比較器IFasterEqualityComparer<Key>,
建構式引數
FASTER的總記憶體占用量由以下引數控制:
- 哈希表大小:此引數(第一個建構式引數)乘以64是記憶體中哈希表的大小(以位元組為單位),
- 日志大小:logSettings.MemorySizeBits表示混合日志的記憶體部分的大小(以位為單位),換句話說對于引數設定B,日志的大小為2 ^ B位元組,如果日志指向類物件,則此大小不包括物件的大小,因為FASTER無法訪問此資訊,日志的較舊部分溢位到存盤中,
Sessions (Threads)會話(執行緒)
實體化FASTER之后,執行緒可以使用Session來使用FASTER
fht.StartSession();
fht.StopSession();
當所有執行緒都在FASTER上完成操作后,您最終銷毀FASTER實體:
fht.Dispose();
示例
以下是一個簡單示例,其中所有資料都在記憶體中,因此我們不必擔心掛起的I / O操作,在此示例中也沒有檢查點,
public static void Test()
{
var log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
var fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
fht.StartSession();
long key = 1, value = https://www.cnblogs.com/knoww/p/1, input = 10, output = 0;
fht.Upsert(ref key, ref value, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.RMW(ref key, ref input, Empty.Default, 0);
fht.Read(ref key, ref input, ref output, Empty.Default, 0);
Debug.Assert(output == value + 20);
fht.StopSession();
fht.Dispose();
log.Close();
}
此示例的函式:
public class Funcs : IFunctions<long, long, long, long, Empty>
{
public void SingleReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void SingleWriter(ref long key, ref long src, ref long dst) => dst = src;
public void ConcurrentReader(ref long key, ref long input, ref long value, ref long dst) => dst = value;
public void ConcurrentWriter(ref long key, ref long src, ref long dst) => dst = src;
public void InitialUpdater(ref long key, ref long input, ref long value) => value = https://www.cnblogs.com/knoww/p/input;
public void CopyUpdater(ref long key, ref long input, ref long oldv, ref long newv) => newv = oldv + input;
public void InPlaceUpdater(ref long key, ref long input, ref long value) => value += input;
public void UpsertCompletionCallback(ref long key, ref long value, Empty ctx) { }
public void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status s) { }
public void RMWCompletionCallback(ref long key, ref long input, Empty ctx, Status s) { }
public void CheckpointCompletionCallback(Guid sessionId, long serialNum) { }
}
更多例子
檢查點和恢復
FASTER支持基于檢查點的恢復,每個新的檢查點都會保留(或使之持久)其他用戶操作(讀取,更新或RMW),FASTER允許客戶端執行緒跟蹤已持久的操作和未使用基于會話的API的操作,
回想一下,每個FASTER執行緒都會啟動一個與唯一的Guid相關聯的會話,所有FASTER執行緒操作(讀取,Upsert,RMW)都帶有單調序列號,在任何時間點,都可以呼叫Checkpoint以啟動FASTER的異步檢查點,在呼叫之后Checkpoint,(最終)向每個FASTER執行緒通知一個序列號,這樣可以確保直到該序列號之前的所有操作以及在該序列號之后沒有任何操作被保留為該檢查點的一部分,FASTER執行緒可以使用此序列號來清除等待執行的操作的任何記憶體緩沖區,
在恢復期間,執行緒可以使用繼續使用相同的Guid進行會話ContinueSession,該函式回傳執行緒本地序列號,直到恢復該會話哈希為止,從那時起,新執行緒可以使用此資訊來重播所有未提交的操作,
下面一個單執行緒的簡單恢復示例,
public class PersistenceExample
{
private FasterKV<long, long, long, long, Empty, Funcs> fht;
private IDevice log;
public PersistenceExample()
{
log = Devices.CreateLogDevice("C:\\Temp\\hlog.log");
fht = new FasterKV<long, long, long, long, Empty, Funcs>
(1L << 20, new Funcs(), new LogSettings { LogDevice = log });
}
public void Run()
{
IssuePeriodicCheckpoints();
RunSession();
}
public void Continue()
{
fht.Recover();
IssuePeriodicCheckpoints();
ContinueSession();
}
/* Helper Functions */
private void RunSession()
{
Guid guid = fht.StartSession();
System.IO.File.WriteAllText(@"C:\\Temp\\session1.txt", guid.ToString());
long seq = 0; // sequence identifier
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
// fht.StopSession() - outside infinite loop
}
private void ContinueSession()
{
string guidText = System.IO.File.ReadAllText(@"C:\\Temp\session1.txt");
Guid sessionGuid = Guid.Parse(guidText);
long seq = fht.ContinueSession(sessionGuid); // recovered seq identifier
seq++;
long key = 1, input = 10;
while(true)
{
key = (seq % 1L << 20);
fht.RMW(ref key, ref input, Empty.Default, seq);
seq++;
}
}
private void IssuePeriodicCheckpoints()
{
var t = new Thread(() =>
{
while(true)
{
Thread.Sleep(10000);
fht.StartSession();
fht.TakeCheckpoint(out Guid token);
fht.CompleteCheckpoint(token, true);
fht.StopSession();
}
});
t.Start();
}
}
FASTER支持兩種檢查點概念:“快照”和“折疊”,前者是將記憶體中的完整快照復制到一個單獨的快照檔案中,而后者是自上一個檢查點以來更改的增量檢查點,折疊有效地將混合日志的只讀標記移到尾部,因此所有資料都作為同一混合日志的一部分保留(沒有單獨的快照檔案),所有后續更新均寫入新的混合日志尾部位置,這使Fold-Over具有增量性質,
專案路徑:
https://github.com/Microsoft/FASTER/tree/master/cs
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/110563.html
標籤:C#
上一篇:C#決議深淺拷貝
下一篇:C# 列舉
