Go言語の設計思想の中でも、並行処理のモデルは特に優れています。「共有メモリで通信するのではなく、通信によってメモリを共有せよ」というGoの格言は、goroutineとchannelを使った並行処理パターンの本質を端的に表しています。筆者はGoで複数のマイクロサービスやCLIツールを開発してきましたが、Goの並行処理モデルに慣れると、他の言語でスレッドやロックを扱うのがとても煩雑に感じるようになります。本記事では、実務で頻出するgoroutineとchannelのパターンを、実装例とともに解説します。
goroutineは、Goのランタイムが管理する軽量スレッドです。OSスレッドと比較して、初期スタックサイズがわずか数KBと小さく、数十万のgoroutineを同時に実行することも現実的です。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("goroutine %d が実行中\n", id)
}(i)
}
wg.Wait()
fmt.Println("全てのgoroutineが完了")
}
sync.WaitGroupはgoroutineの完了を待つ最も基本的な同期プリミティブです。Addでカウンタを増やし、Doneで減らし、Waitでカウンタがゼロになるまでブロックします。注意すべきは、ループ変数iをgoroutineに引数として渡しています点です。クロージャで直接参照するとレースコンディションの原因になります。
channelはgoroutine間でデータを安全に受け渡すための仕組みです。バッファなしchannelとバッファ付きchannelの2種類があり、用途に応じて使い分ける。
// バッファなしchannel:送信と受信が同期する
ch := make(chan int)
// バッファ付きchannel:バッファが満杯になるまで送信がブロックされない
buffered := make(chan int, 10)
バッファなしchannelは、送信側と受信側が同時にchannelに到達するまでブロックします。これにより、goroutine間の同期ポイントとして機能します。バッファ付きchannelは、生産者と消費者の速度差を吸収するバッファとして働く。
複数のgoroutineに処理を分散(Fan-Out)し、結果を1つのchannelに集約(Fan-In)するパターンです。大量のデータを並列処理する場面で頻繁に使う。
package main
import (
"fmt"
"sync"
)
func producer(items []int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, item := range items {
out <- item
}
}()
return out
}
func worker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
result := n * n // 何らかの処理
fmt.Printf("worker %d: %d -> %d\n", id, n, result)
out <- result
}
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
items := []int{1, 2, 3, 4, 5, 6, 7, 8}
in := producer(items)
// Fan-Out: 3つのworkerに分散
w1 := worker(1, in)
w2 := worker(2, in)
w3 := worker(3, in)
// Fan-In: 結果を集約
for result := range merge(w1, w2, w3) {
fmt.Println("結果:", result)
}
}
このパターンのポイントは、channelのcloseを確実に行うことです。rangeによるchannelの読み取りは、channelがcloseされるまで継続します。closeを忘れるとgoroutineがリークする原因になります。
select文は、複数のchannel操作を同時に待ち受けるための構文です。タイムアウトやキャンセル処理と組み合わせることで、堅牢な並行処理を実現できます。
package main
import (
"context"
"fmt"
"time"
)
func fetchData(ctx context.Context, url string) (string, error) {
resultCh := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
// 模擬的な外部API呼び出し
time.Sleep(2 * time.Second)
resultCh <- fmt.Sprintf("%s のデータ", url)
}()
select {
case result := <-resultCh:
return result, nil
case err := <-errCh:
return "", err
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
result, err := fetchData(ctx, "https://api.example.com")
if err != nil {
fmt.Println("エラー:", err)
return
}
fmt.Println("取得:", result)
}
context.Contextとの組み合わせは、Goの並行処理において最も重要なパターンの一つです。タイムアウト、キャンセル、デッドラインを統一的に扱えるため、HTTPサーバーのリクエストハンドラからバッチ処理まで幅広く活用します。
バッファ付きchannelをセマフォとして使い、同時実行数を制限するパターンです。外部APIへのレート制限やリソース保護に有効です。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
sem := make(chan struct{}, 3) // 最大3並行
var wg sync.WaitGroup
tasks := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
for _, task := range tasks {
wg.Add(1)
go func(t string) {
defer wg.Done()
sem <- struct{}{} // セマフォを取得
defer func() { <-sem }() // セマフォを解放
fmt.Printf("タスク %s を開始\n", t)
time.Sleep(1 * time.Second) // 処理を模擬
fmt.Printf("タスク %s を完了\n", t)
}(task)
}
wg.Wait()
fmt.Println("全タスク完了")
}
空の構造体struct{}{}をchannelに送信していますのは、メモリを一切消費しないためです。セマフォとしての用途ではデータの中身は不要なので、この書き方がGoのイディオムとなっています。
複数の処理ステージをchannelで繋ぎ、データをパイプライン的に流すパターンです。各ステージが独立したgoroutineで動作するため、ステージ間の並行処理が自然に実現されます。
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if predicate(n) {
out <- n
}
}
}()
return out
}
func transform(in <-chan int, fn func(int) int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- fn(n)
}
}()
return out
}
このパイプラインは、UNIXのパイプと同じ発想です。各ステージは入力channelから読み、処理結果を出力channelに書く。ステージの追加や入れ替えが容易で、テストも書きやすい。
goroutineリークは、Goの並行処理で最も注意すべき問題です。channelへの送信や受信でブロックされたまま解放されないgoroutineが蓄積すると、メモリリークの原因になります。防止策として以下を徹底します。
Goの並行処理は、goroutineとchannelというシンプルなプリミティブの組み合わせで、複雑な並行処理パターンを表現できる点が魅力です。Fan-Out/Fan-In、selectによる多重化、セマフォ、パイプラインといったパターンを理解しておけば、実務で遭遇するほとんどの並行処理の要件に対応できます。重要なのは、goroutineのライフサイクルを常に意識し、リークを防ぐことです。channelのcloseとcontextの伝播を習慣づけることが、Goで安全な並行処理を書くための鍵となります。