一般社団法人 全国個人事業主支援協会

COLUMN コラム

  • Go言語の並行処理パターン:goroutineとchannelを使いこなす

Go言語における並行処理の哲学

Go言語の設計思想の中でも、並行処理のモデルは特に優れています。「共有メモリで通信するのではなく、通信によってメモリを共有せよ」というGoの格言は、goroutineとchannelを使った並行処理パターンの本質を端的に表しています。筆者はGoで複数のマイクロサービスやCLIツールを開発してきましたが、Goの並行処理モデルに慣れると、他の言語でスレッドやロックを扱うのがとても煩雑に感じるようになります。本記事では、実務で頻出するgoroutineとchannelのパターンを、実装例とともに解説します。

goroutineの基礎

goroutineは、Goのランタイムが管理する軽量スレッドです。OSスレッドと比較して、初期スタックサイズがわずか数KBと小さく、数十万のgoroutineを同時に実行することも現実的です。

package main

import (
"fmt"
"sync"
)

func main() {
var wg sync.WaitGroup

for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("goroutine %d が実行中\n", id)
}(i)
}

wg.Wait()
fmt.Println("全てのgoroutineが完了")
}

sync.WaitGroupはgoroutineの完了を待つ最も基本的な同期プリミティブです。Addでカウンタを増やし、Doneで減らし、Waitでカウンタがゼロになるまでブロックします。注意すべきは、ループ変数iをgoroutineに引数として渡しています点です。クロージャで直接参照するとレースコンディションの原因になります。

channelの基礎と使い分け

channelはgoroutine間でデータを安全に受け渡すための仕組みです。バッファなしchannelとバッファ付きchannelの2種類があり、用途に応じて使い分ける。

// バッファなしchannel:送信と受信が同期する
ch := make(chan int)

// バッファ付きchannel:バッファが満杯になるまで送信がブロックされない
buffered := make(chan int, 10)

バッファなしchannelは、送信側と受信側が同時にchannelに到達するまでブロックします。これにより、goroutine間の同期ポイントとして機能します。バッファ付きchannelは、生産者と消費者の速度差を吸収するバッファとして働く。

パターン1:Fan-Out / Fan-In

複数のgoroutineに処理を分散(Fan-Out)し、結果を1つのchannelに集約(Fan-In)するパターンです。大量のデータを並列処理する場面で頻繁に使う。

package main

import (
"fmt"
"sync"
)

func producer(items []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, item := range items {
out <- item
}
}()
return out
}

func worker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
result := n * n // 何らかの処理
fmt.Printf("worker %d: %d -> %d\n", id, n, result)
out <- result
}
}()
return out
}

func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}

func main() {
items := []int{1, 2, 3, 4, 5, 6, 7, 8}
in := producer(items)

// Fan-Out: 3つのworkerに分散
w1 := worker(1, in)
w2 := worker(2, in)
w3 := worker(3, in)

// Fan-In: 結果を集約
for result := range merge(w1, w2, w3) {
fmt.Println("結果:", result)
}
}

このパターンのポイントは、channelのcloseを確実に行うことです。rangeによるchannelの読み取りは、channelがcloseされるまで継続します。closeを忘れるとgoroutineがリークする原因になります。

パターン2:selectによる多重化

select文は、複数のchannel操作を同時に待ち受けるための構文です。タイムアウトやキャンセル処理と組み合わせることで、堅牢な並行処理を実現できます。

package main

import (
"context"
"fmt"
"time"
)

func fetchData(ctx context.Context, url string) (string, error) {
resultCh := make(chan string, 1)
errCh := make(chan error, 1)

go func() {
// 模擬的な外部API呼び出し
time.Sleep(2 * time.Second)
resultCh <- fmt.Sprintf("%s のデータ", url)
}()

select {
case result := <-resultCh:
return result, nil
case err := <-errCh:
return "", err
case <-ctx.Done():
return "", ctx.Err()
}
}

func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

result, err := fetchData(ctx, "https://api.example.com")
if err != nil {
fmt.Println("エラー:", err)
return
}
fmt.Println("取得:", result)
}

context.Contextとの組み合わせは、Goの並行処理において最も重要なパターンの一つです。タイムアウト、キャンセル、デッドラインを統一的に扱えるため、HTTPサーバーのリクエストハンドラからバッチ処理まで幅広く活用します。

パターン3:セマフォによる並行数制御

バッファ付きchannelをセマフォとして使い、同時実行数を制限するパターンです。外部APIへのレート制限やリソース保護に有効です。

package main

import (
"fmt"
"sync"
"time"
)

func main() {
sem := make(chan struct{}, 3) // 最大3並行
var wg sync.WaitGroup

tasks := []string{"A", "B", "C", "D", "E", "F", "G", "H"}

for _, task := range tasks {
wg.Add(1)
go func(t string) {
defer wg.Done()
sem <- struct{}{} // セマフォを取得
defer func() { <-sem }() // セマフォを解放

fmt.Printf("タスク %s を開始\n", t)
time.Sleep(1 * time.Second) // 処理を模擬
fmt.Printf("タスク %s を完了\n", t)
}(task)
}

wg.Wait()
fmt.Println("全タスク完了")
}

空の構造体struct{}{}をchannelに送信していますのは、メモリを一切消費しないためです。セマフォとしての用途ではデータの中身は不要なので、この書き方がGoのイディオムとなっています。

パターン4:パイプライン

複数の処理ステージをchannelで繋ぎ、データをパイプライン的に流すパターンです。各ステージが独立したgoroutineで動作するため、ステージ間の並行処理が自然に実現されます。

func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}

func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}

func transform(in <-chan int, fn func(int) int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- fn(n)
}
}()
return out
}

このパイプラインは、UNIXのパイプと同じ発想です。各ステージは入力channelから読み、処理結果を出力channelに書く。ステージの追加や入れ替えが容易で、テストも書きやすい。

goroutineリークを防ぐ

goroutineリークは、Goの並行処理で最も注意すべき問題です。channelへの送信や受信でブロックされたまま解放されないgoroutineが蓄積すると、メモリリークの原因になります。防止策として以下を徹底します。

  • contextによるキャンセル伝播:すべての長時間実行goroutineにcontextを渡し、キャンセル時に終了させる
  • channelのclose:送信側は処理完了時に必ずchannelをcloseします
  • selectでのデフォルトケース:ブロックを避けたい場合にdefaultケースを使う
  • runtime.NumGoroutine()での監視:テストやメトリクスでgoroutine数を監視します

まとめ

Goの並行処理は、goroutineとchannelというシンプルなプリミティブの組み合わせで、複雑な並行処理パターンを表現できる点が魅力です。Fan-Out/Fan-In、selectによる多重化、セマフォ、パイプラインといったパターンを理解しておけば、実務で遭遇するほとんどの並行処理の要件に対応できます。重要なのは、goroutineのライフサイクルを常に意識し、リークを防ぐことです。channelのcloseとcontextの伝播を習慣づけることが、Goで安全な並行処理を書くための鍵となります。

この記事をシェアする

  • Twitterでシェア
  • Facebookでシェア
  • LINEでシェア