主頁 > 後端開發 > Go中回應式編程庫RxGo詳細介紹

Go中回應式編程庫RxGo詳細介紹

2023-04-23 07:32:39 後端開發

最近的專案用到了 RxGo ,因為之前從沒有接觸過,特意去學了學,特此記錄下,文章很多內容是復制了參考資料或者官方檔案,如果涉及侵權,請聯系洗掉,謝謝,

1、RxGo簡介

1.1 基礎介紹

RxGo是一個基于Go語言的回應式編程庫,它提供了一種簡單而強大的方式來處理異步事件流和資料流,RxGo的設計靈感來自于ReactiveX,它提供了類似于ReactiveX的運算子和概念,如Observable、Observer、Subject、Scheduler等,

RxGo的目標是提供一種簡單而強大的方式來處理異步事件流和資料流,使得開發人員可以更容易地撰寫高效、可維護和可擴展的代碼,RxGo的特點包括:

  1. 回應式編程:RxGo提供了Observable和Observer兩個核心概念,使得開發人員可以更容易地處理異步事件流和資料流,
  2. 運算子:RxGo提供了類似于ReactiveX的運算子,如map、filter、reduce等,使得開發人員可以更容易地對事件流進行轉換、過濾和聚合等操作,
  3. 調度器:RxGo提供了調度器,使得開發人員可以更容易地控制事件流的執行執行緒和順序,
  4. 可組合性:RxGo的運算子具有可組合性,使得開發人員可以更容易地組合多個運算子來實作復雜的操作,
  5. 高效性:RxGo的設計和實作都非常高效,可以處理大量的事件流和資料流,

總之,RxGo是一個非常強大和實用的回應式編程庫,它可以幫助開發人員更容易地處理異步事件流和資料流,提高代碼的可維護性和可擴展性,

1.2 RxGo 資料流程圖

RxGo的實作基于管道的概念,管道是由通道連接的一系列階段,其中每個階段是運行相同功能的一組goroutine,

  • 使用Just運算子創建一個基于固定串列的靜態可觀測資料,
  • 使用Map運算子定義了一個轉換函式(把圓形變成方形),
  • Filter運算子過濾掉黃色方形,

從上面的例子中可以看出來,最終生成的資料被發送到一個通道中,消費者讀取資料進行消費,RxGo中有很多種消費和生成資料的方式,發布結果到通道中只是其中一種方式,

2、快速入門

2.1 安裝 RxGo v2

go get -u github.com/reactivex/rxgo/v2

2.2 簡單案例

我們先寫一個簡單的案例,來學習RxGo的簡單使用,

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    fmt.Println(item.V)
  }
}

使用 RxGo 的一般流程如下:

  • 使用相關的 Operator 創建 ObservableOperator 就是用來創建 Observable 的,
  • 中間各個階段可以使用過濾操作篩選出我們想要的資料,使用轉換操作對資料進行轉換;
  • 呼叫 ObservableObserve()方法,該方法回傳一個<- chan rxgo.Item,然后for range遍歷即可,

結合上面的這張圖,我們就比較容易理解RxGo的資料處理流程,因為例子比較簡單,沒有用到Map、Filter操作,

執行結果:

$ go run main.go 
1
2
3
4
5

Just使用到柯里化的編程思想,
柯里化(Currying)是一種函式式編程的技術,它將一個接受多個引數的函式轉換成一系列接受單個引數的函式,這些單引數函式可以被組合起來,以便在后續的計算中使用,

柯里化的主要優點是它可以使函式更加靈活和可復用,通過將函式分解為一系列單引數函式,我們可以更容易地組合和重用這些函式,從而減少代碼的重復性和冗余性,

例如:

//柯里化的例子
func addCurried(x int) func(int) int {
	return func(y int) int {
		return x + y
	}
}

func main()  {
	add5 := addCurried(5)
	fmt.Println(add5(10))
}

由于 Go 不支持多個可變引數,Just通過柯里化迂回地實作了這個功能:

//Just creates an Observable with the provided items.
func Just(items ...interface{}) func(opts ...Option) Observable {
  return func(opts ...Option) Observable {
    return &ObservableImpl{
      iterable: newJustIterable(items...)(opts...),
    }
  }
}

Observe()回傳一個 Item 的chan ,Item的結構如下:

// Item is a wrapper having either a value or an error.
type	Item struct {
		V interface{}
		E error
	}

所以通過Just生成observable物件時,傳入的資料可以包含錯誤,在使用時通過 item.Error() 來區分,

func main() {
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

我們使用item.Error()檢查是否出現錯誤,然后使用item.V訪問資料,item.E訪問錯誤,

除了使用for range之外,我們還可以呼叫 ObservableForEach()方法來實作遍歷,ForEach()接受 3 個回呼函式:

  • NextFunc:型別為func (v interface {}),傳入的資料不包含錯誤型別時走此函式處理,
  • ErrFunc:型別為func (err error),當傳入的資料包含錯誤時走此函式;
  • CompletedFunc:型別為func ()Observable 完成時呼叫,

有點Promise那味了,使用ForEach(),可以將上面的示例改寫為:

func main() {
  observable := rxgo.Just(1, 2, errors.New("這是一個測驗錯誤!"), 4, 5)()
  <-observable.ForEach(func(v interface{}) {
    fmt.Println("received:", v)
  }, func(err error) {
    fmt.Println("error:", err)
  }, func() {
    fmt.Println("completed")
  })
}
$ go run main.go 
received: 1
received: 2
error: 這是一個測驗錯誤!
received: 4
received: 5
completed

ForEach()回傳的是一個 chan,用于當 observable 關閉時會向此chan發送資料,所以在 observable前面加了 <-來阻塞等待 ForEach()處理完資料,

3、RxGo 深入學習

上面的簡單案例,我們是使用Just來創建observable,其實還有其他的方式創建observable,一起來看一看,

3.1 rxgo.Create

傳入一個[]rxgo.Producer的切片,其中rxgo.Producer的型別為func(ctx context.Context, next chan<- Item),我們可以在代碼中呼叫rxgo.Of(value)生成資料,rxgo.Error(err)生成錯誤,然后發送到next通道中:

package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main()  {
	observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
		next <- rxgo.Of(1)
		next <- rxgo.Of("aaa")
		next <- rxgo.Of(errors.New("test"))
	}})

	ch := observable.Observe()
	for item := range ch {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

因為rxgo.Create中的引數是[]rxgo.Producer,所以分成兩個rxgo.Producer也是一樣的效果:

observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(1)
  next <- rxgo.Of(2)
  next <- rxgo.Of(3)
  next <- rxgo.Error(errors.New("unknown"))
  }, func(ctx context.Context, next chan<- rxgo.Item) {
  next <- rxgo.Of(4)
  next <- rxgo.Of(5)
}})

3.2 rxgo.FromChannel

FromChannel可以直接從一個已存在的<-chan rxgo.Item物件中創建 Observable

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)



func main()  {

	ch := make(chan rxgo.Item)
	go func() {
		for i := 0; i < 5; i++ {
			ch <- rxgo.Of(i)
		}

		//需要手動關閉 ch 通道
		close(ch)
	}()

	observable := rxgo.FromChannel(ch)
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

注意:

通道需要手動呼叫close()關閉,上面Create()方法內部rxgo自動幫我們執行了這個步驟,

func newCreateIterable(fs []Producer, opts ...Option) Iterable {
	...

	go func() {
		// Create方法內部自動關閉了 next 通道
		defer close(next)
		for _, f := range fs {
			f(ctx, next)
		}
	}()

	...
}

3.3 rxgo.Interval

Interval以傳入的時間間隔生成一個無窮的數字序列,從 0 開始:

func main()  {
	
	observable := rxgo.Interval(rxgo.WithDuration(time.Second))
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

運行后,第一秒輸出 0,第二秒輸出 1,以此類推,

3.4 rxgo.Range

func main() {
  observable := rxgo.Range(0, 3)
  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Range可以生成一個范圍內的數字:

上面代碼依次輸出 0,1,2,3,

3.5 Repeat

這個和之前的不太一樣,這個是對已經存在的 observable物件呼叫 Repeat方法,從而實作重復生成資料,

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main()  {

	observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))
	for item := range observable.Observe() {
		if item.Error() {
			fmt.Println("err:", item.E)
		}else {
			fmt.Println(item.V)
		}
	}
}

輸出:

0
1
2
0
1
2
0
1
2

注意:這里執行的次數一共是3次,Repeat中的引數是2,重復2次,一共3次,

3.6 rxgo.Start

可以給Start方法傳入[]rxgo.Supplier作為引數,它可以包含任意數量的rxgo.Supplier型別,rxgo.Supplier的底層型別為:

var Supplier func(ctx context.Context) rxgo.Item

Observable 內部會依次呼叫這些rxgo.Supplier生成rxgo.Item

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)



func Supplier1(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier1", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(1)
}

func Supplier2(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier2", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(2)
}

func Supplier3(ctx context.Context) rxgo.Item {
	deadline, ok  := ctx.Deadline()
	fmt.Println("Supplier3", deadline, ok)
	time.Sleep(time.Second)
	return rxgo.Of(3)
}

func main() {
	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
	observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

4、Observable 分類

根據資料在何處生成,Observable 被分為 HotCold 兩種型別,

  • Hot Observable:熱可觀測量,資料由可觀測量外部產生,
  • Cold Observable:冷可觀測量,資料由可觀測量內部產生,

通常不想一次性的創建所有的資料,使用 熱可觀測量,

4.1 熱可觀測量示例

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

結果:

0
1
2

上面創建的是 Hot Observable,但是有個問題,第一次Observe()消耗了所有的資料,第二個就沒有資料輸出了,(可以用可連接的觀測量來修改這一行為,后面再說),

4.2 冷可觀測量示例

Cold Observable 就不會有這個問題,因為它創建的流是獨立于每個觀察者的,即每次呼叫Observe()都創建一個新的 channel,我們使用Defer()方法創建 Cold Observable,它的引數與Create()方法一樣,

func main() {
  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Defer原始碼介紹:

// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable {
	return &ObservableImpl{
		iterable: newDeferIterable(f, opts...),
	}
}

執行結果:

$ go run main.go
0
1
2
0
1
2

4.3 可連接的 Observable

可連接的(Connectable)Observable 對普通的 Observable 進行了一層組裝,呼叫它的Observe()方法時并不會立刻產生資料,使用它,我們可以等所有的觀察者都準備就緒了(即呼叫了Observe()方法)之后,再呼叫其Connect()方法開始生成資料,我們通過兩個示例比較使用普通的 Observable 和可連接的 Observable 有何不同,

4.3.1 普通的Observable,并不是可連接的Observable
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中我們使用DoOnNext()方法來注冊觀察者,由于DoOnNext()方法是異步執行的,所以為了等待結果輸出,在最后增加了一行time.Sleep,運行結果:

First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由輸出可以看出,注冊第一個觀察者之后就開始產生資料了,第二個觀察者并不會得到資料,

4.3.2 可連接的Observable

通過在創建 Observable 的方法中指定rxgo.WithPublishStrategy()選項就可以創建可連接的 Observable

  • 重點是傳入rxgo.WithPublishStrategy()
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })
	
  //需要手動呼叫 observable.Connect 才會產生資料
  observable.Connect(context.Background())
  time.Sleep(3 * time.Second)
}

運行輸出:

$ go run main.go
before subscribe second observer
Second observer: 1
First observer: 1
First observer: 2
First observer: 3
Second observer: 2
Second observer: 3

上面是等兩個觀察者都注冊之后,并且手動呼叫了 Observable 的Connect()方法才產生資料,而且可連接的 Observable 有一個特性:它是冷啟動的!!!,即每個觀察者都會收到一份相同的拷貝,

5、轉換 Observable

通過 RxGo 資料流程圖我們知道,我們可以對rxgo.Item進行轉換,rxgo 提供了很多轉換函式,下面一起來學一學這些轉換函式,

5.1 Map

Map()方法簡單修改它收到的rxgo.Item然后發送到下一個階段(轉換或過濾),Map()接受一個型別為func (context.Context, interface{}) (interface{}, error)的函式,第二個引數就是rxgo.Item中的資料,回傳轉換后的資料,如果出錯,則回傳錯誤,

func main() {
	observable := rxgo.Just(1, 2, 3)()

	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
		return i.(int), nil
	}).Map(func(_ context.Context, i interface{}) (interface{}, error) {
		b := i.(int)
		if b % 2 == 0 {
			return nil, errors.New("test")
		} else {
			return i, nil
		}
	})

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

上例中每個數字經過兩個Map,第一個Map邏輯是原樣輸出,第二個Map邏輯是判斷i是不是偶數,如果是偶數,就回傳錯誤,否則原樣輸出,運行結果:

1
<nil>

我們將第一個Map中的陳述句改為下面的邏輯:

return i.(int) + 1, nil

運行結果:

<nil>

我們可以知道,資料的處理是串行的,第一個資料執行完所有的Map過后,第二個資料才會執行,當其中某一個執行回傳的結果包含錯誤,就不會繼續進行轉換了,即不會資料不會進入到 Observe() 中的通道中去,

5.2 Marshal

Marshal對經過它的資料進行一次Marshal,這個Marshal可以是json.Marshal/proto.Marshal,甚至我們自己寫的Marshal函式,它接受一個型別為func(interface{}) ([]byte, error)的函式用于對資料進行處理,

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    User{
      Name: "dj",
      Age:  18,
    },
    User{
      Name: "jw",
      Age:  20,
    },
  )()

  observable = observable.Marshal(json.Marshal)

  for item := range observable.Observe() {
    fmt.Println(string(item.V.([]byte)))
  }
}

執行結果:

{"name":"dj","age":18}
{"name":"jw","age":20}

由于Marshal操作回傳的是[]byte型別,我們需要進行型別轉換之后再輸出,

5.3 Unmarshal

既然有Marshal,也就有它的相反操作UnmarshalUnmarshal用于將一個[]byte型別轉換為相應的結構體或其他型別,與Marshal不同,Unmarshal需要知道轉換的目標型別,所以需要提供一個函式用于生成該型別的物件,然后將[]byte資料Unmarshal到該物件中,Unmarshal接受兩個引數,引數一是型別為func([]byte, interface{}) error的函式,引數二是func () interface{}用于生成實際型別的物件,我們拿上面的例子中生成的 JSON 字串作為資料,將它們重新UnmarshalUser物件:

type User struct {
  Name string `json:"name"`
  Age  int    `json:"age"`
}

func main() {
  observable := rxgo.Just(
    `{"name":"dj","age":18}`,
    `{"name":"jw","age":20}`,
  )()

  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    return []byte(i.(string)), nil
  }).Unmarshal(json.Unmarshal, func() interface{} {
    return &User{}
  })

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

由于Unmarshaller接受[]byte型別的引數,我們在Unmarshal之前加了一個Map用于將string轉為[]byte,運行結果:

&{dj 18}
&{jw 20}

5.4 Buffer

Buffer按照一定的規則收集接收到的資料,然后一次性發送出去(作為切片),而不是收到一個發送一個,有 3 種型別的Buffer

  • BufferWithCount(n):每收到n個資料發送一次,最后一次可能少于n個;
  • BufferWithTime(n):發送在一個時間間隔n內收到的資料;
  • BufferWithTimeOrCount(d, n):收到n個資料,或經過d時間間隔,發送當前收到的資料,
5.4.1 BufferWithCount
func main() {
	observable := rxgo.Range(0, 5)

	observable = observable.BufferWithCount(2)

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

執行結果:

[0 1]
[2 3]
[4]

最后一組只有一個,

5.4.2 BufferWithTime
unc main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))

	}
}

執行結果是不確定的,這里需要注意:

startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))
	}
}

執行結果:

startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53

BufferWithTimeOrCount是以BufferWithCount、BufferWithTime誰先滿足條件為準,誰先滿足誰就先執行,

5.5 GroupBy

``GroupBy將一個Observable分成多個子Observable,每個子Observable`包含相同的索引值的元素,

GroupBy函式定義如下:

GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

即將一個Observable分成length個子Observable,根據distribution函式回傳的int作為分組的依據,

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// 創建一個Observable,它發出一些整數值
	source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()

	// 使用GroupBy運算子將整數值按照奇偶性進行分組
	grouped := source.GroupBy(2, func(item rxgo.Item) int {
		return item.V.(int) % 2
	}, rxgo.WithBufferedChannel(10))

	for subObservable := range grouped.Observe() {
		fmt.Println("new subObservable ------ ")
		for item := range subObservable.V.(rxgo.Observable).Observe() {
			fmt.Printf("%v\n", item.V)
		}
	}

}

上面根據每個數模 3 的余數將整個流分為 3 組,運行:

new subObservable ------ 
2
4
6
8
10
new subObservable ------ 
1
3
5
7
9

注意rxgo.WithBufferedChannel(10)的使用,由于我們的數字是連續生成的,依次為 0->1->2->…->9->10,而 Observable 默認是惰性的,即由Observe()驅動,內層的Observe()在回傳一個 0 之后就等待下一個數,但是下一個數 1 不在此 Observable 中,所以會陷入死鎖,使用rxgo.WithBufferedChannel(10),設定它們之間的連接 channel 緩沖區大小為 10,這樣即使我們未取出 channel 里面的數字,上游還是能發送數字進來,

6、并行操作

默認情況下,這些轉換操作都是串行的,即只有一個 goroutine 負責執行轉換函式,從上面的Map操作也可以得知默認是串行執行的,可以改變這一默認行為,使用rxgo.WithPool(n)選項設定運行n個 goroutine,或者rxgo.WitCPUPool()選項設定運行與邏輯 CPU 數量相等的 goroutine,

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"math/rand"
	"time"
)

func main() {
	observable := rxgo.Range(1, 10)

	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
		time.Sleep(time.Duration(rand.Int31()))
		return i.(int) + 1, nil
	}, rxgo.WithCPUPool())

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

8
9
10
6
5
11
2
4
7
3

由于是并行運算,所以結果是不固定的,

我們可以直接看官網的介紹:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

7、過濾 Observable

我們可以對Observable 中發送過來的資料進行過濾,過濾掉不需要的資料,有以下方式:

  • Filter

  • ElementAt

  • Debounce

  • Distinct

  • Skip

  • Take

下面的內容大多來自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc

7.1 Filter

Filter()接受一個型別為func (i interface{}) bool的引數,通過的資料使用這個函式斷言,回傳true的將發送給下一個階段,否則,丟棄,

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3)().
		Filter(func(i interface{}) bool {
			return i != 2
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
3

7.2 ElementAt

ElementAt()只發送指定索引的資料,如ElementAt(2)只發送索引為 2 的資料,即第 3 個資料,

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

2

7.3 Debounce

只有當特定的時間跨度已經過去而沒有發出另一個Item時,才從Observable發出一個Item

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main() {
	ch := make(chan rxgo.Item)

	go func() {
		ch <- rxgo.Of(1)
		time.Sleep(2 * time.Second)
		ch <- rxgo.Of(2)
		ch <- rxgo.Of(3)
		time.Sleep(2 * time.Second)
		close(ch)
	}()

	observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
3

上面示例,先收到 1,然后 2s 內沒收到資料,所以發送 1,接著收到了資料 2,由于馬上又收到了 3,所以 2 不會發送,收到 3 之后 2s 內沒有收到資料,發送了 3,所以最后輸出為 1,3,

7.4 Distinct

Distinct()會記錄它發送的所有資料,它不會發送重復的資料,由于資料格式多樣,Distinct()要求我們提供一個函式,根據原資料回傳一個唯一標識碼(有點類似哈希值),基于這個標識碼去重,

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
		Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
			return i, nil
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
2
3
4
5

7.5 Skip

Skip可以跳過前若干個資料,

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

3
4
5

7.6 Take

Take只取前若干個資料,

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

結果:

1
2

8、選項

因為golang中不支持默認引數,所以我們經常會用到選項設計模式,rxgo中也大量使用到了此模式,

  • rxgo.WithBufferedChannel(10):設定 channel 的快取大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多個 goroutine 執行轉換操作;
  • rxgo.WithPublishStrategy():使用發布策略,即創建可連接的 Observable

rxgo還有很多其他選項,具體看官方檔案,地址:

https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

9、簡化的真實案例

假設現在有一個定時處理任務,結構如下:

type ScheduledTask struct {
	RecordId int
	HandleStartTime time.Time
	Status bool
}

在執行具體的任務時,需要去資料庫查詢下是否已經被取消了,如果已經被取消掉的,則不再執行,

完整代碼如下:

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

type ScheduledTask struct {
	RecordId int
	HandleStartTime string
	Status bool
}

func main() {
	ch := make(chan rxgo.Item)
	go producer(ch)

	time.Sleep(time.Second*3)
	observable := rxgo.FromChannel(ch)
	observable = observable.Filter(func(i interface{}) bool {
		st := i.(*ScheduledTask)
		return st.Status
	}, rxgo.WithBufferedChannel(1))

	// 消費可觀測量
	for customer := range observable.Observe() {
		st := customer.V.(*ScheduledTask)
		fmt.Printf("resutl: --> %+v\n", st)
	}
}

func producer(ch chan <- rxgo.Item)  {
	for i := 0; i < 10; i++ {
		status := false
		if i % 2 == 0 {
			status = true
		}
		st := &ScheduledTask{
			RecordId: i,
			HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),
			Status: status,
		}
		ch <- rxgo.Of(st)
	}
	
  // 這里千萬不要忘記了
	close(ch)
}

結果:

resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}
resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}

參考鏈接

Go 每日一庫之 rxgo

[官方例子](

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550847.html

標籤:其他

上一篇:Midjourney 提示詞工具(10 個國內外最好最推薦的)

下一篇:返回列表

標籤雲
其他(157844) Python(38092) JavaScript(25381) Java(17985) C(15215) 區塊鏈(8256) C#(7972) AI(7469) 爪哇(7425) MySQL(7137) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4557) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2430) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1959) Web開發(1951) HtmlCss(1919) python-3.x(1918) 弹簧靴(1913) C++(1910) xml(1889) PostgreSQL(1872) .NETCore(1854) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Go中回應式編程庫RxGo詳細介紹

    最近的專案用到了 RxGo ,因為之前從沒有接觸過,特意去學了學,特此記錄下。文章很多內容是復制了參考資料或者官方檔案。如果涉及侵權,請聯系洗掉,謝謝。 1、RxGo簡介 1.1 基礎介紹 RxGo是一個基于Go語言的回應式編程庫,它提供了一種簡單而強大的方式來處理異步事件流和資料流。RxGo的設計 ......

    uj5u.com 2023-04-23 07:32:39 more
  • Midjourney 提示詞工具(10 個國內外最好最推薦的)

    Midjourney,是一個革命性的基于人工智能的藝術生成器,可以從被稱為提示的簡單文本描述中生成令人驚嘆的影像。Midjourney已經迅速成為藝術家、設計師和營銷人員的首選工具(包括像我這樣根本不會設計任何東西的無能之輩)。 為了幫助你開始使用這個強大的工具,我們匯編了一份15個資源的清單,可以 ......

    uj5u.com 2023-04-23 07:32:23 more
  • C++的拓撲排序實作

    template<typename T = CString, typename _Data = https://www.cnblogs.com/shizhimofa/archive/2023/04/22/CString> struct Union_node//!< 節點 { Union_node() :nColor(0) {} std::vector<Union_node*> vecNodeSon; T ......

    uj5u.com 2023-04-23 07:32:18 more
  • Python基礎—conda使用筆記

    Python基礎—conda使用筆記 1. 環境配置 由于用conda管理虛擬環境真滴很方便,所以主要使用conda,就不單獨去裝Python了。 1.1. Miniconda3安裝 Miniconda3官網下載地址:Miniconda Miniconda3清華鏡像下載:清華鏡像-Miniconda ......

    uj5u.com 2023-04-23 07:32:02 more
  • python學習-學生資訊管理系統并打包exe

    在B站自學Python 站主:Python_子木 授課:楊淑娟 平臺: 馬士兵教育 python: 3.9.9 #python打包exe檔案 #安裝PyInstaller pip install PyInstaller #-F打包exe檔案,stusystem\stusystem.py到py的路徑, ......

    uj5u.com 2023-04-23 07:31:51 more
  • windows10下golang使用protobuf前奏

    1.更改代理(方便步驟3) 方法一: go env -w GOPROXY="https://goproxy.cn" 方法二:(非永久性,該方法對我有效) $env:GOPROXY="https://goproxy.cn" 注: http://mirrors.aliyun.com/goproxy/ 阿 ......

    uj5u.com 2023-04-23 07:31:46 more
  • 影像梯度

    影像梯度影像梯度計算的是影像變化的速度 對于影像的邊緣部分,其灰度值變化較大,梯度值也較大相反,對于影像中比較平滑的部分,其灰度值變化較小,相應的梯度值也較小。影像梯度計算需要求導數,但是影像梯度一般通過計算像素值的差來得到梯度的近似值(近似導數值)。(差分,離散) Sobel算子 1 #Sobel ......

    uj5u.com 2023-04-23 07:31:41 more
  • Rust編程語言入門之模式匹配

    模式匹配 模式 模式是Rust中的一種特殊語法,用于匹配復雜和簡單型別的結構 將模式與匹配運算式和其他構造結合使用,可以更好地控制程式的控制流 模式由以下元素(的一些組合)組成: 字面值 解構的陣列、enum、struct 和 tuple 變數 通配符 占位符 想要使用模式,需要將其與某個值進行比較 ......

    uj5u.com 2023-04-23 07:31:37 more
  • 影像金字塔

    影像金字塔 簡單來說就是 自下而上影像一步一步縮小 1 高斯金字塔(涉及高斯分布) 向下采樣(縮小,對金字塔來說是自下向上) 第一步: 高斯濾波去噪 第二部:將偶數行和列去掉 向上采樣(放大,對金字塔來說是自上向下) 第一步:在每個方向上擴大兩倍,新增的行和列填充0 第二步:利用之前同樣的內核進行卷 ......

    uj5u.com 2023-04-23 07:25:47 more
  • 影像邊緣檢測(Canny)

    Canny檢測的流程 Canny檢測主要是用于邊緣檢測 1)使用高斯濾波器,以平滑影像,濾除噪聲。 2)計算影像中每個像素點的梯度強度和方向。 3)應用非極大值(Non-Maximum Suppression)抑制,以消除邊緣檢測帶來的雜散回應 4)應用雙閾值(Double-Threshold)檢測 ......

    uj5u.com 2023-04-23 07:20:41 more