摘要:Goreplay 前稱是 Gor,一個簡單的 TCP/HTTP 流量錄制及重放的工具,主要用 Go 語言撰寫,
本文分享自華為云社區《流量回放工具之 goreplay 核心原始碼分析》,作者:zuozewei,
一、前言
Goreplay 前稱是 Gor,一個簡單的 TCP/HTTP 流量錄制及重放的工具,主要用 Go 語言撰寫,
Github地址:https://github.com/buger/goreplay
二、工程結構
這里以最新的 v1.3 版本為例,與 v1.0 的代碼存在較大差異,
~/GoProjects/gor_org/goreplay release-1.3 ±? tree -L 1
.
├── COMM-LICENSE
├── Dockerfile
├── Dockerfile.dev
├── ELASTICSEARCH.md
├── LICENSE.txt
├── Makefile
├── Procfile
├── README.md
├── byteutils
├── capture
├── circle.yml
├── docs
├── elasticsearch.go
├── emitter.go
├── emitter_test.go
├── examples
├── go.mod
├── go.sum
├── gor.go
├── gor_stat.go
├── homebrew
├── http_modifier.go
├── http_modifier_settings.go
├── http_modifier_settings_test.go
├── http_modifier_test.go
├── http_prettifier.go
├── http_prettifier_test.go
├── input_dummy.go
├── input_file.go
├── input_file_test.go
├── input_http.go
├── input_http_test.go
├── input_kafka.go
├── input_kafka_test.go
├── input_raw.go
├── input_raw_test.go
├── input_tcp.go
├── input_tcp_test.go
├── kafka.go
├── limiter.go
├── limiter_test.go
├── middleware
├── middleware.go
├── middleware_test.go
├── mkdocs.yml
├── output_binary.go
├── output_dummy.go
├── output_file.go
├── output_file_test.go
├── output_http.go
├── output_http_test.go
├── output_kafka.go
├── output_kafka_test.go
├── output_null.go
├── output_s3.go
├── output_tcp.go
├── output_tcp_test.go
├── plugins.go
├── plugins_test.go
├── pro.go
├── proto
├── protocol.go
├── ring
├── s3
├── s3_reader.go
├── s3_test.go
├── settings.go
├── settings_test.go
├── sidenav.css
├── simpletime
├── site
├── size
├── snapcraft.yaml
├── tcp
├── tcp_client.go
├── test_input.go
├── test_output.go
├── vendor
└── version.go
工程目錄比較扁平,主要看 plugin.go,settings.go,emitter.go 幾個主要檔案,其它分 input_xxx ,output_xxx 都是適配具體協議的輸入輸出插件,程式入口是 gor.go 的 main 函式,
主要檔案說明:
- settings.go:實作對于啟動命令引數的決議,決定其注冊那些插件到 Plugin.Inputs,Plugin.Outputs兩個串列里,
- plugin.go:主要是所有輸入輸出插件的管理,
- emitter.go:程式核心事件處理,實作對于 Plugin.Inputs 輸入流的讀取、判斷是否需要進行 middlewear 的處理、http修改等,然后異步復制流量到所有 Plugin.outputs,同時將所有 Plugin.outputs 中有 response 的資料,復制到所有 outputs 中,
- input_xxx.go:主要是輸入的插件,實作 tcp/http/raw/kafka等協議, 實作 io.Reader 介面,最后根據配置注冊到 Plugin.inputs佇列里,
- output_xxx.go:主要是輸出的插件,實作 tcp/http/raw/kafka 等協議, 實作 io.Writer 介面,最后根據配置注冊到 Plugin.outputs 佇列里,
三、主要核心流程

goreplay 只有 input 和 output 兩個概念,是 goreplay 對資料流的抽象,統稱為 plugin,
gor.go 中 main 函式,它主要做了以下事情:

1、決議命令列引數:
// Parse parses the command-line flags from os.Args[1:]. Must be called
// after all flags are defined and before flags are accessed by the program.
func Parse() {
// Ignore errors; CommandLine is set for ExitOnError.
CommandLine.Parse(os.Args[1:])
}

2、初始化全域的 Settings 變數,
func checkSettings() {
if Settings.OutputFileConfig.SizeLimit < 1 {
Settings.OutputFileConfig.SizeLimit.Set("32mb")
}
if Settings.OutputFileConfig.OutputFileMaxSize < 1 {
Settings.OutputFileConfig.OutputFileMaxSize.Set("1tb")
}
if Settings.CopyBufferSize < 1 {
Settings.CopyBufferSize.Set("5mb")
}
}
3、命令列引數的定義在 settings.go 的 init 函式中,會先于 main 函式執行,
func init() {
flag.Usage = usage
flag.StringVar(&Settings.Pprof, "http-pprof", "", "Enable profiling. Starts http server on specified port, exposing special /debug/pprof endpoint. Example: `:8181`")
flag.IntVar(&Settings.Verbose, "verbose", 0, "set the level of verbosity, if greater than zero then it will turn on debug output")
flag.BoolVar(&Settings.Stats, "stats", false, "Turn on queue stats output")
if DEMO == "" {
flag.DurationVar(&Settings.ExitAfter, "exit-after", 0, "exit after specified duration")
} else {
Settings.ExitAfter = 5 * time.Minute
}
flag.BoolVar(&Settings.SplitOutput, "split-output", false, "By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.")
flag.BoolVar(&Settings.RecognizeTCPSessions, "recognize-tcp-sessions", false, "[PRO] If turned on http output will create separate worker for each TCP session. Splitting output will session based as well.")
......
// default values, using for tests
Settings.OutputFileConfig.SizeLimit = 33554432
Settings.OutputFileConfig.OutputFileMaxSize = 1099511627776
Settings.CopyBufferSize = 5242880
}

4、根據命令列傳參初始化插件,在 main 函式中呼叫 InitPlugins 函式,
// NewPlugins specify and initialize all available plugins
func NewPlugins() *InOutPlugins {
plugins := new(InOutPlugins)
for _, options := range Settings.InputDummy {
plugins.registerPlugin(NewDummyInput, options)
}
......
return plugins
}
5、呼叫 Start 函式,啟動 emitter,每個 input 插件,都啟動一個協程,讀取 input,寫 output,?
/ Start initialize loop for sending data from inputs to outputs
func (e *Emitter) Start(plugins *InOutPlugins, middlewareCmd string) {
if Settings.CopyBufferSize < 1 {
Settings.CopyBufferSize = 5 << 20
}
e.plugins = plugins
if middlewareCmd != "" {
middleware := NewMiddleware(middlewareCmd)
for _, in := range plugins.Inputs {
middleware.ReadFrom(in)
}
e.plugins.Inputs = append(e.plugins.Inputs, middleware)
e.plugins.All = append(e.plugins.All, middleware)
e.Add(1)
go func() {
defer e.Done()
if err := CopyMulty(middleware, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}()
} else {
for _, in := range plugins.Inputs {
e.Add(1)
go func(in PluginReader) {
defer e.Done()
if err := CopyMulty(in, plugins.Outputs...); err != nil {
Debug(2, fmt.Sprintf("[EMITTER] error during copy: %q", err))
}
}(in)
}
}
}
如果只有一個協程,存在性能瓶頸,默認是一個 input 復制多份,寫多個 output,如果傳了 --split-output 引數,并且有多個 output ,則使用簡單的 Round Robin 演算法來選 output,不會寫多份,多個 input 之間是并行的,但單個 input 到多個 output,是串行的,所有 input 都實作了 io.Reader 介面,output 都實作了 io.Writer 介面,所以閱讀代碼時,input 的入口是 Read() 方法,output 的入口是 Write() 方法,
// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src PluginReader, writers ...PluginWriter) error {
wIndex := 0
modifier := NewHTTPModifier(&Settings.ModifierConfig)
filteredRequests := make(map[string]int64)
filteredRequestsLastCleanTime := time.Now().UnixNano()
filteredCount := 0
for {
msg, err := src.PluginRead()
if err != nil {
if err == ErrorStopped || err == io.EOF {
return nil
}
return err
}
if msg != nil && len(msg.Data) > 0 {
if len(msg.Data) > int(Settings.CopyBufferSize) {
msg.Data = msg.Data[:Settings.CopyBufferSize]
}
meta := payloadMeta(msg.Meta)
if len(meta) < 3 {
Debug(2, fmt.Sprintf("[EMITTER] Found malformed record %q from %q", msg.Meta, src))
continue
}
requestID := byteutils.SliceToString(meta[1])
// start a subroutine only when necessary
if Settings.Verbose >= 3 {
Debug(3, "[EMITTER] input: ", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), " from: ", src)
}
if modifier != nil {
Debug(3, "[EMITTER] modifier:", requestID, "from:", src)
if isRequestPayload(msg.Meta) {
msg.Data = modifier.Rewrite(msg.Data)
// If modifier tells to skip request
if len(msg.Data) == 0 {
filteredRequests[requestID] = time.Now().UnixNano()
filteredCount++
continue
}
Debug(3, "[EMITTER] Rewritten input:", requestID, "from:", src)
} else {
if _, ok := filteredRequests[requestID]; ok {
delete(filteredRequests, requestID)
filteredCount--
continue
}
}
}
if Settings.PrettifyHTTP {
msg.Data = prettifyHTTP(msg.Data)
if len(msg.Data) == 0 {
continue
}
}
if Settings.SplitOutput {
if Settings.RecognizeTCPSessions {
if !PRO {
log.Fatal("Detailed TCP sessions work only with PRO license")
}
hasher := fnv.New32a()
hasher.Write(meta[1])
wIndex = int(hasher.Sum32()) % len(writers)
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}
} else {
// Simple round robin
if _, err := writers[wIndex].PluginWrite(msg); err != nil {
return err
}
wIndex = (wIndex + 1) % len(writers)
}
} else {
for _, dst := range writers {
if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
return err
}
}
}
}
// Run GC on each 1000 request
if filteredCount > 0 && filteredCount%1000 == 0 {
// Clean up filtered requests for which we didn't get a response to filter
now := time.Now().UnixNano()
if now-filteredRequestsLastCleanTime > int64(60*time.Second) {
for k, v := range filteredRequests {
if now-v > int64(60*time.Second) {
delete(filteredRequests, k)
filteredCount--
}
}
filteredRequestsLastCleanTime = time.Now().UnixNano()
}
}
}
}
輪詢調度演算法的原理是每一次把來自用戶的請求輪流分配給內部中的服務器,從1開始,直到 N(內部服務器個數),然后重新開始回圈,
演算法的優點是其簡潔性,它無需記錄當前所有連接的狀態,所以它是一種無狀態調度,

四、其它的小知識
1、goreplay 抓包呼叫 google/gopacket 來實作,后者通過 cgo 來呼叫 libpcap,整體工具小巧而實用,既可以實作 rawsocket 的抓包,也可以實作 http 的錄制、回放,也支持多實體之間的級聯,RAW_SOCKET 允許監聽任何埠上的流量,因為它們是在IP級別上操作的,埠是 TCP 的特性,具有流量控制、傳輸可靠等優點,這個包實作了自己的TCP層: 使用tcp_packet 決議TCP包,流控制由 tcp_message.go管理
參考地址:http://en.wikipedia.org/wiki/Raw_socket

2、用三個猴頭 emoji 字符作為請求分隔符,第一眼看到感覺挺搞笑的,
比如:

3、配置資訊全靠啟動命令引數,
比如:
/usr/local/bin/gor --input-raw :80 --input-raw-track-response --input-raw-bpf-filter "host ! 167.xxx.xxx.xx" --input-raw-override-snaplen --prettify-http --output-http http://192.168.3.110:80 --output-http-timeout 10s --output-http-workers 1000 --output-http-workers-min 100 --http-allow-header "Aww-Csid: xxxxx" --output-http-track-response --http-allow-method POST --middleware "/production/www/go_replay/client/middleware/sync --project {project_name}" --output-http-compatibility-mode --http-allow-url /article/detail
4、goreplay 支持 Java 程式配合作業的,支持開啟插件模式:
gor --input-raw :80 --middleware "java -jar xxx.jar" --output-file request.gor
通過 middleware 引數可以傳遞一條命令給 gor ,gor 會拉起一個行程執行這個命令,在錄制程序中,gory 通過獲取行程的標準輸入和輸出與插件行程進行通信,
資料流向大致如下:
+-------------+ Original request +--------------+ Modified request +-------------+
| Gor input |----------STDIN---------->| Middleware |----------STDOUT---------->| Gor output |
+-------------+ +--------------+ +-------------+
input-raw java -jar xxx.jar output-file
5、攔截器的設定
參考地址:https://github.com/buger/goreplay/wiki/Dealing-with-missing-requests-and-responses
實際使用程序中,發現錄制流量并發達到一定量級會丟失很多請求,經過閱讀官方檔案和測驗,發現最相關的一個關鍵引數是 –input-raw-buffer-size,
其主要原因四由于 gor 本身需要對資料包進行讀取,協議決議等,借助于 pcap 及 os 緩沖區,當緩沖區不足,到達的資料包不足以組裝 Http 請求則出現丟失或失效請求,無法正確處理,
listener.go 該引數是作用在底層錄制上:
inactive.SetTimeout(t.messageExpire)
inactive.SetPromisc(true)
inactive.SetImmediateMode(t.immediateMode)
if t.immediateMode {
log.Println("Setting immediate mode")
}
if t.bufferSize > 0 {
inactive.SetBufferSize(int(t.bufferSize))
}
handle, herr := inactive.Activate()
if herr != nil {
log.Println("PCAP Activate error:", herr)
wg.Done()
return
}
在具體復制動作定義bufferSize:
// CopyMulty copies from 1 reader to multiple writers
func CopyMulty(src io.Reader, writers ...io.Writer) (err error) {
buf := make([]byte, Settings.copyBufferSize)
wIndex := 0
modifier := NewHTTPModifier(&Settings.modifierConfig)
filteredRequests := make(map[string]time.Time)
filteredRequestsLastCleanTime := time.Now()
......
}
五、代碼呼叫鏈路圖
最后附送一張 gor 代碼呼叫鏈路圖,
原圖地址:
- https://github.com/zuozewei/blog-example/tree/master/Performance-testing/04-full-link/gor-code
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/428567.html
標籤:其他
上一篇:Pinpoint【環境搭建 02】Pinpoint Agent 安裝啟動及監控 SpringBoot 專案案例分享
