Goの並行モデルは見た目ほど貧弱ではない
これを読んだ。
言語仕様や標準ライブラリと、サードパーティライブラリを比べているけれど、異なる責任のものを同列に比較して片方を下げるのは誠実じゃない態度にみえるのでやめたほうがいいと思う。
そのうえで、元記事と同じことをやってみる。
軽量プロセスをスポーンする
Section titled “軽量プロセスをスポーンする”元コード。
supervised { fork { println("I am forked...") }}Go
go func() { fmt.Println("I am forked...")}()次、特定の処理が終わるまで待つ。
supervised { fork { scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) { println("repeat 1") } } fork { sleep(50.milliseconds) scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) { println("repeat 2") } } forkUser { sleep(2.seconds) println("timeout") }}Goの標準ライブラリだけでもこのくらいは書ける。
func main() { ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup wg.Go(func() { repeat(ctx, 100*time.Millisecond, func() { fmt.Println("repeat 1") }) }) wg.Go(func() { time.Sleep(50 * time.Millisecond) repeat(ctx, 100*time.Millisecond, func() { fmt.Println("repeat 2") }) }) time.Sleep(2 * time.Second) fmt.Println("timeout") cancel() wg.Wait()}
func repeat(ctx context.Context, interval time.Duration, fn func()) { tick := time.NewTicker(interval) defer tick.Stop() for { select { case <-ctx.Done(): return case <-tick.C: fn() } }}しかしイディオムとしてはこちらの方がよく見る。
func main() { ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) t1 := time.NewTicker(100 * time.Millisecond) var t2 *time.Ticker go func() { // 50ミリ秒待ってから動作させるのが地味に面倒 time.Sleep(50 * time.Millisecond) t2 = time.NewTicker(100 * time.Millisecond) }() for { select { case <-ctx.Done(): fmt.Println("timeout") t1.Stop() t2.Stop() return case <-tickC(t1): fmt.Println("repeat 1") case <-tickC(t2): fmt.Println("repeat 2") } }}
func tickC(t *time.Ticker) <-chan time.Time { if t == nil { return nil } return t.C}処理を並行実行する
Section titled “処理を並行実行する”元コード。
par(func1(), func2())Goは可変長の型パラメータを持っていないのでこれはうまく書けない。
var ( v1 string v2 int wg sync.WaitGroup)wg.Go(func() { v1 = func1()})wg.Go(func() { v2 = func2()})wg.Wait()処理を先勝ちで並行実行する
Section titled “処理を先勝ちで並行実行する”元コード
def f1: String = { val howLong = math.random() * 1000 sleep(howLong.milliseconds) "f1 won!"}def f2: String = { val howLong = math.random() * 1000 sleep(howLong.milliseconds) "f2 won!"}
println(raceSuccess(f1, f2)) // => f2 won!これも par と同じだけど、返す値の型が同じでいいなら
c := make(chan string, 2)go func() { howLong := rand.N[time.Duration](1000) time.Sleep(howLong * time.Millisecond) c <- "f1 won"}()go func() { howLong := rand.N[time.Duration](1000) time.Sleep(howLong * time.Millisecond) c <- "f2 won"}()fmt.Println(<-c)処理をタイムアウトさせる
Section titled “処理をタイムアウトさせる”これは context.Context で一瞬なので省略。
中断しても必ず処理を実行する
Section titled “中断しても必ず処理を実行する”これも単に何もしなければ動き続けるので省略。
リソースを必ず解放したい
Section titled “リソースを必ず解放したい”これも defer でいいので省略。
コレクションに対する並行処理
Section titled “コレクションに対する並行処理”元コード
val xs = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)println(xs.mapPar(4)(_ * 2))// => Vector(0, 2, 4, 6, 8, 10, 12, 14, 16, 18)これも特筆するべきものはないけどiterする。
func main() { xs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} fmt.Println(slices.Collect(mapPar(4, xs, func(n int) int { return n * 2 })))}
func mapPar(n int, a []int, fn func(int) int) iter.Seq[int] { return func(yield func(int) bool) { var g errgroup.Group g.SetLimit(n) for _, v := range a { g.Go(func() error { yield(fn(v)) // 本当は戻り値をみるべき return nil }) } g.Wait() }}スケジューリング
Section titled “スケジューリング”Goでは選ぶものがないため省略。
ox.scheduling.repeat
Section titled “ox.scheduling.repeat”time.Ticker
リトライ処理を行う
Section titled “リトライ処理を行う”元コード
def failableFunc() = ??? // たまに失敗してしまう
retry( // Exponential Backoff を10msから開始し、最大幅は1秒にし、ジッタを設定する Schedule .exponentialBackoff(10.milliseconds) .maxInterval(1.seconds) .jitter(Jitter.Equal))(failableFunc())指数バックオフのライブラリは無数に存在しているので好みのライブラリを選べばいい。
w := backoff.Backoff{ Initial: 10 * time.Millisecond, Peak: 1 * time.Second,}for { if n, err := failableFunc(); err == nil { return n } if err := w.Wait(context.Background()); err != nil { log.Fatalln(err) }}レートリミットを行う
Section titled “レートリミットを行う”元コード
supervised { val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second) def callApi(): String = "200" println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi())) println(rateLimiter.runBlocking(callApi()))}Go
func main() { const N = 2 n := rate.Every(time.Second / N) l := rate.NewLimiter(n, N) callApi := func() string { return "200" } runBlocking(l, callApi) runBlocking(l, callApi) runBlocking(l, callApi) runBlocking(l, callApi) runBlocking(l, callApi) runBlocking(l, callApi)}
func runBlocking(l *rate.Limiter, fn func() string) { l.Wait(context.Background()) fmt.Println(fn())}チャネルは言語組み込みで存在しているので省略。