(以下はgo 1.2.x時点での話です。将来的に仕様がかわるかどうかはわかりません)

これを読んでいて、こういうの気にしてない人多いんだろうなーと思って、書いてみます。元のポストはdeferの挙動について語っているように見受けられるけれども、これは要は複数スレッドで実行されるコードについて、プログラム終了時に同期とか取りたくない、という話だと思ったので、このポストのdeferを正しく動かすには…というところからどういう形でgoroutine同士で同期を取る方法があるのか、一例を書き出していきます。

TL;DR;

goでいくらgoroutineが気軽にかけるからと言って、複数スレッドで処理が行われているので同期はキチンとやらないとダメですよ。

deferの基本

goではLLのノリでコードを書けるのが売りの一つですが、メモリ管理はしてくれるものの、様々なリソース解放も全て自動というわけではありません。リソースを解放する処理はdeferを使って解放用の関数を登録しておくのがgoでの基本的なやりかたです。

まずgoroutine無しで普通にファイル操作をする場合の事を考えてみましょう。ファイルハンドルを作り、操作をしたい場合、最後にはこのファイルハンドルを解放してやらなければいけません。goではこのような場合にはdeferを使って、関数終了時に呼び出されるコールバックを指定します。以下のような感じになります:

package main

import (
	"log"
	"os"
)

func main() {
	file, err := os.Open("foo")
	if err != nil {
		log.Fatalf("Failed to open file: %s", err)
	}
	defer file.Close()
... // fileに対してなんらかの操作 }

これでmain()終了時には確実にfile.Close()が呼ばれるので、安心ですね!逆に言えばgoではメモリ以外の外部リソースは自前で管理する必要がある、という認識が必要です。

さて、そこでgoroutineが絡んでくると、deferの挙動についてはもう少し気をつけないといけません。まず以下のてきとーなプログラムを見てください:

package main

import (
        "log"
)

func main() {
	started := make(chan struct{})
	stop := make(chan struct{}) 
 
 	go func() {
		started <-struct{}{} // tell main thread we started 
		log.Println("Goroutine starts")
		defer func() {
		        log.Println("Defered func called!")
		}()

		<-stop // wait for main thread to tell us we should exit
	}()

	<-started  // wait for the goroutine to start
	log.Println("Main thread exits")
}
goroutineはstopに知らせを受け取るまでブロックするようになっています。deferも設定されていますが、メインスレッドは`stop`にお知らせを発行する事なく、main()から抜けて終了してしまいます。このプログラムを走らせると以下のような出力が得られます

Goroutine starts
Main thread exits

ぱっと見たところ、`go func()`部分で指定したdeferがプログラム終了時に実行されそうなものですが、deferは実行されていません。これはなぜかというと、deferは宣言された関数終了時に呼び出されるからです。先にメインスレッドが終了してしまったため、<-doneでブロック状態になっていた`go func()` 部分の関数が終了することがなくなってしまったのです。

ということは、当然このdeferがもっと複雑なリソース解放をしないといけない場合もこのやり方ではリソース解放がされない事になります。

これからわかる事は、いくらgoがgoroutineを色々気軽に扱えるようにしてくれていても、やはりリソース解放等を含めたgoroutineの状態同期はプログラマー側でキチンと考えないといけない、ということです。必要そうなところではちゃんとgoroutineにクリーンな形で終了をさせ(ブロックしたままの形で終わらせず、必ず関数からreturnして終わるようにする)、なおかつマルチプロセス/スレッドモデルのように wait(pid)ないしthread.join() 相当の事をする必要があります。

それでは同期してみましょう

基本的な実装パターンはいくつかあります。まず、先ほどの簡単なケースの場合は単純にstopに何かを送るという事で動きますが、本当に本当にgoroutineが終わってdeferが動いた事を条件としたいならもう一つchannelがあるととりあえず期待する動作になります:

package main

import (
        "log"
)

func main() {
	started := make(chan struct{})
	stop := make(chan struct{}) 
	done := make(chan struct{})
 
 	go func() {
		started <-struct{}{} // tell main thread we started 
		log.Println("Goroutine starts")

		defer func() {
		        log.Println("Defered func called!")
			done<-struct{}{} // tell main thread we're done 
		}()

		<-stop // wait for main thread to tell us we should exit
	}()

	<-started  // wait for the goroutine to start
	stop<-struct{}{} // tell goroutine to stop
	<-done // wait for goroutine to be really done 

	log.Println("Main thread exits")
}
ですがこれが複数のgoroutineになってくると、チャンネルにメッセージを送るのはちょっと面倒くさくなります。なぜなら、何個のgoroutineを作ったのかを管理しなくてはならなくなるからです。

とは言え、まずは固定数のgoroutineを作成して、その分のメッセージを待つ単純なモデルを考えてみましょう:

package main

import (
        "log"
)

// naive synchronization
func main() {
	const maxworkers int = 10

	started := make(chan struct{})
	stop := make(chan struct{}) 
	done := make(chan struct{})
 
	for i := 0; i < maxworkers; i++ {
		id := i // localize to be used in closure 
	 	go func() {
			started <-struct{}{} // tell main thread we started 
			log.Printf("Goroutine %d starts", id)

			defer func() {
		        	log.Printf("Defered func for %d called!", id)
				done<-struct{}{} // tell main thread we're done 
			}()

			<-stop // wait for main thread to tell us we should exit
		}()
	}

	for i := 0; i < maxworkers; i++ {
		<-started  // wait for the goroutine to start
	}

	// ... なにか処理...

	for i := 0; i < maxworkers; i++ {
		stop<-struct{}{} // tell goroutine to stop
	}

	for i :=0; i < maxworkers; i++ {
		<-done // wait for goroutine to be really done 
	} 

	log.Println("Main thread exits")
}
かー、面倒くさい!これだと常に何個ワーカーをspawnするのかとか考えていないといけないです。面倒くさいですね。

close()をブロードキャストとして使う

まず簡単なところからリファクタリングしてみましょう。ロックとしてチャンネルを使っている時にひとつのスレッドから特定のチャンネルで待機状態になっているスレッドに待機をやめるようにするのは実は簡単な方法があります:チャンネルをclose()すればいいのです。値が何か関係なく、とにかく「もうこのチャンネルの役目は終了ですよー」とお知らせする際にはこれが一番簡単です。ということで、stopをこの形で実装しなおすと:

package main

import (
        "log"
)

func main() {
	const maxworkers int = 10

	started := make(chan struct{})
	stop := make(chan struct{}) 
	done := make(chan struct{})
 
	for i := 0; i <  maxworkers; i++ {
		id := i // localize to be used in closure 

	 	go func() {
			started <-struct{}{} // tell main thread we started 
			log.Printf("Goroutine %d starts", id)

			defer func() {
		        	log.Printf("Defered func for %d called!", id)
				done<-struct{}{} // tell main thread we're done 
			}()

			<-stop // wait for main thread to tell us we should exit
		}()
	}

	for i := 0; i < maxworkers; i++ {
		<-started  // wait for the goroutine to start
	}

	// ... なにか処理...

	close(stop)

	for i :=0; i < maxworkers; i++ {
		<-done // wait for goroutine to be really done 
	} 

	log.Println("Main thread exits")
}
ちょっとだけ楽になりました!stopに関してはチャンネルをclose()することによって、全てのgoroutineで「あ、stopからはもう読み込めないのね。ブロック終了〜」という動作が期待できるので、終了をブロードキャストしたのと同じ効果が得られのです。

sync.WaitGroupでgoroutine達を待つ

でもまだ面倒くさいです。次にstartedとdoneですが、これらはN個の通知をひとつのスレッドが待っている(そして、特にデータのやりとりも必要ない)ので、sync.WaitGroupを使いましょう。

package main

import (
        "log"
	"sync" 
)

func main() {
	const maxworkers int = 10

	started := &sync.WaitGroup {}
	done := &sync.WaitGroup {}
	stop := make(chan struct{}) 
 
	for i := 0; i <  maxworkers; i++ {
		id := i // localize to be used in closure 
		started.Add(1)
		done.Add(1) 

	 	go func() {
			started.Done() // tell main thread we started 
			log.Printf("Goroutine %d starts", id)

			defer func() {
		        	log.Printf("Defered func for %d called!", id)
				done.Done() // tell main thread we're done 
			}()

			<-stop  // wait for main thread to tell us we should exit
		}()
	}

	started.Wait() // wait for the goroutine to start

	// ... なにか処理...

	close(stop)

	done.Wait() // wait for goroutine to be really done 

	log.Println("Main thread exits")
}

おお!これでmaxworkersが何個になろうとも(そしてそれが事前に知られてなくても)メインスレッドはWait()の結果を待つだけで良い事になりました!ちなみにPerlのAnyEventを知っている人にはsync.WaitGroupはAE::cv->begin / AE::cv->end / AE::cv->recvの使い方と全く一緒です。

同期しつつ、goroutineの戻り値も取る

最後にもしこのgoroutine達の結果を取得したい場合のケースを考えてみましょう。closureをgo関数に渡しているので、結果を共用のリストとかに入れておけばいいと思ったあなた、goの精神を理解していません。もう一度"Share Memory by Communicating"と"Codewalk: Share Memory By Communicating"を読んで出直してきてください。

以下は一例です。自分はメインスレッドが複数goroutineの結果を集めて利用する場合は別途、チャンネルのクローズ等の終了条件を調整するgoroutineを作ってやって、あとはチャンネルからさらっとデータを集めてしまうのが楽な気がします:

package main

import (
        "log"
        "sync"
)

func main() {
	const maxworkers int = 10

	started := &sync.WaitGroup{}
	done := &sync.WaitGroup{}
	results := make(chan int)
 	stop := make(chan struct{})

	for i := 0; i < maxworkers; i++ {
		id := i // localize to be used in closure 
		started.Add(1)
		done.Add(1)

		go func() {
			started.Done()

			log.Printf("Goroutine %d starts", id)

      			defer func() {
	      			log.Printf("Defered func for %d called!", id)
	      			done.Done()
      			}()

		      	<-stop // wait for main thread to tell us we should exit
      			log.Printf("Goroutine %d received stop", id)

      			results<-id // pass our result
    		}()
 	}

 	started.Wait()
  	log.Println("All goroutines started")

  	go func() { // Closer. Waits for all goroutines to stop, and then closes the results channel
    		done.Wait()
	    	log.Printf("Closing results")
    		close(results)
	}()

	close(stop)

	ids := []int {}
	for v := range results {
		ids = append(ids, v)
	} 

	log.Printf("Got %#q", ids)
	log.Println("Main thread exits")
}

チャンネルは「何かが起こった」という事を通知するには便利なツールなのですが、それぞれがスタンドアローンなgoroutine達全部が一定の状態になった、という通知をするのにはあまり向いていません。例えば上記のコードでは

for v := range results { // results = chan int
     ids = append(ids, v)
}
の部分でresultsから通知が来るまでブロックする状態を解除するにはresultsをclose()する必要があるのですが、goroutine達には自分が最後のワーカーなのかどうか知るすべはありません。なのでgoroutine達がresultsをclose()することはできません。

そこで、上記例ではdoneというsync.WaitGroupを待つためだけのgoroutineを動かしています。このdoneを待つという仕事、一見メインスレッドでやっても良いように見えますが、メインスレッドでこれをやるとデッドロックが発生します。というのも、resultsはunbuffered channelなのでresultsに書き込むタイミングでresultsから読み込むスレッドができるまでブロックします。しかし`for v := range ...`の部分もresultsがclose()されるまで読み込みブロック→ループという構造なので、done.Wait()を読んでブロックするタイミングがありません。

これをどうにかするために、「resultsから読み込む(メインスレッド)」というタスクと「goroutineが終わるまで待つ」というタスクを分離したわけです。これなら、メインスレッドはresultsを読み込みブロックしつつ、done.Wait()で条件が満たされたらresultsをclose()するということをデッドロックを回避しつつ正しい順番で実行できるわけです。

まとめ

というわけでdeferの話から、おまえらちゃんとgoroutineの状態を同期しろよ、というお話でした。実はブロードキャストの部分ではsync.Condを使うとかそういう方法もあるのですが、基本的にchannelでだいたい事足ります。自分はsyncパッケージでは基本的に複数のスレッドを集約するときにsync.WaitGroupを使うだけで、それ以外のものは使う必要になったことがありません。goのchannel、超便利です。

本当はgoに限らず、なんですが、特にgoに置いてはgoroutineが便利すぎるのでなんだか並列処理とか同期とか忘れがちになることが多い(し、それをあまり意識しないで書けるようにはなっている)のですが、やっぱりこういうところではちゃんとそれらを意識して書かないといけません。前にも書きましたがチャンネル等の使い方をちゃんと覚えましょう!

なお本エントリのコードは一応自分的には合ってるとは思いますが、vimで有名な人当たりに音速でツッコミを入れられるかもしれません。その際にはまた追記等する予定です。

蛇足

ちょっと脱線しますが、常々思っている事の中に「プログラムを書く事自体は全く基礎ができていない人でも実地でがんばればできるようになると思いますが、天才でない限りプログラムを書く人は少なくともロック/同期に関してはやはりキチンと基礎を勉強する必要がある」というのがあります。

もちろん実際できてる人はますが、この辺りのトピックに関しては素早く、確実に覚えるためには(特にこれから覚える若い人は)やっぱりちゃんとしたComputer Science系の授業を受けるのがよいなぁと思います。プログラミング「言語」の使い方を覚えるなんていつでもできるんで、やはりやるならそういうプログラミングの「概念」的な基礎をやることをお勧めします。

(注:筆者は決してこのあたりの授業を受けた時の成績が良かった訳ではないですが、「あのあたりはなめてかかってはいけない」という認識だけははっきり刷り込まれてその後のエンジニア人生を過ごしているので、一通り勉強をすることには意義があったと思います)