Skip to content

Goの並行モデルは見た目ほど貧弱ではない

これを読んだ。

言語仕様や標準ライブラリと、サードパーティライブラリを比べているけれど、異なる責任のものを同列に比較して片方を下げるのは誠実じゃない態度にみえるのでやめたほうがいいと思う。

そのうえで、元記事と同じことをやってみる。

元コード。

supervised {
fork {
println("I am forked...")
}
}

Go

go func() {
fmt.Println("I am forked...")
}()

次、特定の処理が終わるまで待つ。

supervised {
fork {
scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) {
println("repeat 1")
}
}
fork {
sleep(50.milliseconds)
scheduling.repeat(Schedule.fixedInterval(100.milliseconds)) {
println("repeat 2")
}
}
forkUser {
sleep(2.seconds)
println("timeout")
}
}

Goの標準ライブラリだけでもこのくらいは書ける。

func main() {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Go(func() {
repeat(ctx, 100*time.Millisecond, func() {
fmt.Println("repeat 1")
})
})
wg.Go(func() {
time.Sleep(50 * time.Millisecond)
repeat(ctx, 100*time.Millisecond, func() {
fmt.Println("repeat 2")
})
})
time.Sleep(2 * time.Second)
fmt.Println("timeout")
cancel()
wg.Wait()
}
func repeat(ctx context.Context, interval time.Duration, fn func()) {
tick := time.NewTicker(interval)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
fn()
}
}
}

しかしイディオムとしてはこちらの方がよく見る。

func main() {
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
t1 := time.NewTicker(100 * time.Millisecond)
var t2 *time.Ticker
go func() {
// 50ミリ秒待ってから動作させるのが地味に面倒
time.Sleep(50 * time.Millisecond)
t2 = time.NewTicker(100 * time.Millisecond)
}()
for {
select {
case <-ctx.Done():
fmt.Println("timeout")
t1.Stop()
t2.Stop()
return
case <-tickC(t1):
fmt.Println("repeat 1")
case <-tickC(t2):
fmt.Println("repeat 2")
}
}
}
func tickC(t *time.Ticker) <-chan time.Time {
if t == nil {
return nil
}
return t.C
}

元コード。

par(func1(), func2())

Goは可変長の型パラメータを持っていないのでこれはうまく書けない。

var (
v1 string
v2 int
wg sync.WaitGroup
)
wg.Go(func() {
v1 = func1()
})
wg.Go(func() {
v2 = func2()
})
wg.Wait()

元コード

def f1: String = {
val howLong = math.random() * 1000
sleep(howLong.milliseconds)
"f1 won!"
}
def f2: String = {
val howLong = math.random() * 1000
sleep(howLong.milliseconds)
"f2 won!"
}
println(raceSuccess(f1, f2)) // => f2 won!

これも par と同じだけど、返す値の型が同じでいいなら

c := make(chan string, 2)
go func() {
howLong := rand.N[time.Duration](1000)
time.Sleep(howLong * time.Millisecond)
c <- "f1 won"
}()
go func() {
howLong := rand.N[time.Duration](1000)
time.Sleep(howLong * time.Millisecond)
c <- "f2 won"
}()
fmt.Println(<-c)

これは context.Context で一瞬なので省略。

中断しても必ず処理を実行する

Section titled “中断しても必ず処理を実行する”

これも単に何もしなければ動き続けるので省略。

これも defer でいいので省略。

コレクションに対する並行処理

Section titled “コレクションに対する並行処理”

元コード

val xs = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
println(xs.mapPar(4)(_ * 2))
// => Vector(0, 2, 4, 6, 8, 10, 12, 14, 16, 18)

これも特筆するべきものはないけどiterする。

func main() {
xs := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
fmt.Println(slices.Collect(mapPar(4, xs, func(n int) int {
return n * 2
})))
}
func mapPar(n int, a []int, fn func(int) int) iter.Seq[int] {
return func(yield func(int) bool) {
var g errgroup.Group
g.SetLimit(n)
for _, v := range a {
g.Go(func() error {
yield(fn(v)) // 本当は戻り値をみるべき
return nil
})
}
g.Wait()
}
}

Goでは選ぶものがないため省略。

time.Ticker

元コード

def failableFunc() = ??? // たまに失敗してしまう
retry(
// Exponential Backoff を10msから開始し、最大幅は1秒にし、ジッタを設定する
Schedule
.exponentialBackoff(10.milliseconds)
.maxInterval(1.seconds)
.jitter(Jitter.Equal)
)(failableFunc())

指数バックオフのライブラリは無数に存在しているので好みのライブラリを選べばいい。

w := backoff.Backoff{
Initial: 10 * time.Millisecond,
Peak: 1 * time.Second,
}
for {
if n, err := failableFunc(); err == nil {
return n
}
if err := w.Wait(context.Background()); err != nil {
log.Fatalln(err)
}
}

元コード

supervised {
val rateLimiter = RateLimiter.fixedWindowWithStartTime(2, 1.second)
def callApi(): String = "200"
println(rateLimiter.runBlocking(callApi()))
println(rateLimiter.runBlocking(callApi()))
println(rateLimiter.runBlocking(callApi()))
println(rateLimiter.runBlocking(callApi()))
println(rateLimiter.runBlocking(callApi()))
println(rateLimiter.runBlocking(callApi()))
}

Go

func main() {
const N = 2
n := rate.Every(time.Second / N)
l := rate.NewLimiter(n, N)
callApi := func() string { return "200" }
runBlocking(l, callApi)
runBlocking(l, callApi)
runBlocking(l, callApi)
runBlocking(l, callApi)
runBlocking(l, callApi)
runBlocking(l, callApi)
}
func runBlocking(l *rate.Limiter, fn func() string) {
l.Wait(context.Background())
fmt.Println(fn())
}

チャネルは言語組み込みで存在しているので省略。