服務計算Homework06
- 專案地址
- 使用說明:直接運行
hw06/main.go即可
課程任務
- 閱讀 ReactiveX 檔案,請在pmlpml/RxGo基礎上,
- 修改、改進它的實作
- 或添加一組新的操作,如filtering
- 該庫的基本組成:
rxgo.go給出了基礎型別、抽象定義、框架實作、Debug工具等generators.go給出了 sourceOperater 的通用實作和具體函式實作transforms.go給出了 transOperater 的通用實作和具體函式實作
RxGo的簡單使用
在pmlpml/RxGo下載對應包并匯入本地goworks路徑
- 呼叫
generators.go中定義的Just方法:使用,分行列印hello、world、!
package main
import (
"fmt"
RxGo "hw06/rxgo-master"
)
func main() {
RxGo.Just("Hello", "World", "!").Subscribe(func(x string) {
fmt.Println(x)
})
}
// output:
// Hello
// World
// !
- 函式鏈呼叫:呼叫
generators.go中的start方法,通過source.operator1().operator2().operator3().subscribe(observer)方式進行鏈呼叫- 呼叫
fibonacci(10)實作求出10以內的斐波那契數列 - 呼叫
map將結果×2 - 呼叫
Subscribe輸出
- 呼叫
package main
import (
"fmt"
RxGo "hw06/rxgo-master"
)
func fibonacci(max int) func() (int, bool) {
a, b := 0, 1
return func() (r int, end bool) {
r = a
a, b = b, a + b
if r > max {
end = true
}
return
}
}
func main() {
RxGo.Start(fibonacci(10)).Map(func(x int) int {
return 2 * x
}).Subscribe(func(x int) {
fmt.Print(x)
})
}
// output
// 022461016
- 連接的observables,與管道類似,不同的是,它在訂閱時不會開始發送專案,而只有在呼叫其connect()方法時才會開始發送
package main
import (
"fmt"
RxGo "hw06/rxgo-master"
)
func main() {
//define pipeline
source := RxGo.Just("Hello", "World", "!")
next := source.Map(func(s string) string {
return s + " "
})
//run pipeline
next.Subscribe(func(x string) {
fmt.Print("next")
fmt.Print(x)
})
fmt.Println()
source.Subscribe(func(x string) {
fmt.Print("source")
fmt.Print(x)
})
}
// output
// next Hello next World next !
// source Hello source World source !
實作程序
新增
filter.go檔案,實作了部分方法
- Distinct - 抑制可觀察物件發出的重復項
- ElementAt - 只發射由可觀察物件發射的項n
- First - 只發出來自可觀察物件的第一項,或滿足條件的第一項
- Last - 只發出被觀察物件發出的最后一項
- Skip - 抑制被觀察物件發出的前n個項
- SkipLast - 抑制被觀察物件發出的最后n個專案
- Take - 只發出一個可觀察物件發出的前n個項
- TakeLast - 只發出可觀察物件發出的最后n個項
具體實作如下
rxgo.go是整個專案的基本框架,在其Observable結構體中新增filter方法中對應需要用到的成員
distinct bool // 是否只選擇不同的值
elementAt int // 選擇指定的索引的元素
first bool // 是否選擇第一資料
last bool // 是否選擇最后一個資料
skip int // 正值跳過前幾項,負值跳過后幾項
take int // 正值選擇前幾項,負值選擇后幾項
takeOrLast bool // 判斷是哪種take操作
-
自定義filter結構體
filterOperator、并初始化一個filter對應的Observable-newFilterObservable()方法仿照transforms.go實作,只是修改對應方法名即可,此處不再贅述,詳見專案
-
方法實作框架也是與
transforms.go相同,先定義方法,初始化observable對應變數,然后再定義一個對應的operator即可,展示Distinct()方法實作如下,其他方法實作思路相同,詳見專案代碼
// 抑制可觀察物件發出的重復項
func (parent *Observable) Distinct() (o *Observable) {
o = parent.newFilterObservable("distinct")
o.first, o.last, o.distinct = false, false, true
o.take, o.skip = 0, 0
o.operator = distinctOperator
return o
}
var distinctOperator = filterOperator{opFunc: func(ctx context.Context, o *Observable, x reflect.Value, out chan interface{}) (end bool) {
var params = []reflect.Value{x}
x = params[0]
if !end {
end = o.sendToFlow(ctx, x.Interface(), out)
}
return
},
}
- 實作
takeOrSkip()方法
func takeOrSkip(_op bool, div int, in []interface{}) ([]interface{}, error) {
if (_op && div > 0) || (!_op && div < 0) {
if !_op {
div = len(in) + div
}
if div >= len(in) || div <= 0 {
return nil, errors.New("Out Of Bound!")
}
return in[:div], nil
}
if (_op && div < 0) || (!_op && div > 0) {
if _op {
div = len(in) + div
}
if div >= len(in) || div <= 0 {
return nil, errors.New("Out Of Bound!")
}
return in[div:], nil
}
return nil, errors.New("Out Of Bound!")
}
- 實作
op(),用來獲取資料流以及對上述操作進行初始化和資訊報錯處理,具體實作也是參考了transforms.go中transOperater的實作
func (tsop filterOperator) op(ctx context.Context, o *Observable) {
// must hold defintion of flow resourcs here, such as chan etc., that is allocated when connected
// this resurces may be changed when operation routine is running.
in := o.pred.outflow
out := o.outflow
//fmt.Println(o.name, "operator in/out chan ", in, out)
var wg sync.WaitGroup
var _out []interface{}
go func() {
end := false
flag := make(map[interface{}]bool)
for x := range in {
if end {
continue
}
// can not pass a interface as parameter (pointer) to gorountion for it may change its value outside!
xv := reflect.ValueOf(x)
// send an error to stream if the flip not accept error
if e, ok := x.(error); ok && !o.flip_accept_error {
o.sendToFlow(ctx, e, out)
continue
}
o.mu.Lock()
_out = append(_out, x)
o.mu.Unlock()
if o.elementAt > 0 {
continue
}
if o.take != 0 || o.skip != 0 {
continue
}
if o.last {
continue
}
if o.distinct && flag[xv.Interface()] {
continue
}
o.mu.Lock()
flag[xv.Interface()] = true
o.mu.Unlock()
// scheduler
switch threading := o.threading; threading {
case ThreadingDefault:
if tsop.opFunc(ctx, o, xv, out) {
end = true
}
case ThreadingIO:
fallthrough
case ThreadingComputing:
wg.Add(1)
go func() {
defer wg.Done()
if tsop.opFunc(ctx, o, xv, out) {
end = true
}
}()
default:
}
if o.first {
break
}
}
if o.last && len(_out) > 0 {
wg.Add(1)
go func() {
defer wg.Done()
xv := reflect.ValueOf(_out[len(_out)-1])
tsop.opFunc(ctx, o, xv, out)
}()
}
if o.take != 0 || o.skip != 0 {
wg.Add(1)
go func() {
defer wg.Done()
var div int
if o.takeOrLast {
div = o.take
} else {
div = o.skip
}
new_in, err := takeOrSkip(o.takeOrLast, div, _out)
if err != nil {
o.sendToFlow(ctx, err, out)
} else {
xv := new_in
for _, val := range xv {
tsop.opFunc(ctx, o, reflect.ValueOf(val), out)
}
}
}()
}
if o.elementAt != 0 {
if o.elementAt < 0 || o.elementAt > len(_out) {
o.sendToFlow(ctx, errors.New("Out Of Bound!"), out)
} else {
xv := reflect.ValueOf(_out[o.elementAt-1])
tsop.opFunc(ctx, o, xv, out)
}
}
wg.Wait() //waiting all go-routines completed
if (o.last || o.first) && len(_out) == 0 && !o.flip_accept_error {
o.sendToFlow(ctx, errors.New("No Input!"), out)
}
o.closeFlow(out)
}()
}
結果展示
-
測驗輸出為

-
gotest測驗結果如下,測驗檔案見專案

-
自動生成的API檔案,詳見專案

參考資料
- RxGo決議
- Filtering Observables
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/209973.html
標籤:其他
上一篇:瑞波新的網路協議
下一篇:區塊鏈的監管性質怎么樣?
