并发 Go 程 Go 程(goroutine)是由 Go 运行时管理的轻量级线程。
会启动一个新的 Go 程并执行
f
, x
, y
和 z
的求值发生在当前的 Go 程中,而 f
的执行发生在新的 Go 程中。
Go 程在相同的地址空间中运行,因此在访问共享的内存时必须进行同步。sync
包提供了这种能力,不过在 Go 中并不经常用到,因为还有其它的办法(见下一节)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package mainimport ( "fmt" "time" ) func say (s string ) { for i := 0 ; i < 5 ; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) } } func main () { go say("world" ) say("hello" ) }
输出:
1 2 3 4 5 6 7 8 9 hello world hello world world hello hello world hello
信道 信道是带有类型的管道,你可以通过它用信道操作符 <-
来发送或者接收值。
1 2 ch <- v // 将 v 发送至信道 ch。 v := <-ch // 从 ch 接收值并赋予 v。
(“箭头”就是数据流的方向。)
和映射与切片一样,信道在使用前必须创建:
默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得 Go 程可以在没有显式的锁或竞态变量的情况下进行同步。
以下示例对切片中的数进行求和,将任务分配给两个 Go 程。一旦两个 Go 程完成了它们的计算,它就能算出最终的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport "fmt" func sum (s []int , c chan int ) { sum := 0 for _, v := range s { sum += v } c <- sum } func main () { s := []int {7 , 2 , 8 , -9 , 4 , 0 } c := make (chan int ) go sum(s[:len (s)/2 ], c) go sum(s[len (s)/2 :], c) x, y := <-c, <-c fmt.Println(x, y, x+y) }
带缓冲的信道 信道可以是 带缓冲的 。将缓冲长度作为第二个参数提供给 make
来初始化一个带缓冲的信道:
1 ch := make(chan int, 100)
仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。
修改示例填满缓冲区,然后看看会发生什么。
1 2 3 4 5 6 7 8 9 10 11 package mainimport "fmt" func main () { ch := make (chan int , 2 ) ch <- 1 ch <- 2 fmt.Println(<-ch) fmt.Println(<-ch) }
紧跟 ch <- 2
加一行 ch <- 3
会抛出 fatal error: all goroutines are asleep - deadlock!
。
range 和 close 发送者可通过 close
关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完
之后 ok
会被设置为 false
。
循环 for i := range c
会不断从信道接收值,直到它被关闭。如果一直不关闭信道,for range 这里会出现 fatal error: all goroutines are asleep - deadlock!
。
注意 : 只有发送者才能关闭信道,而接收者不能。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。
还要注意 : 信道与文件不同,通常情况下无需关闭它们。只有在必须告诉接收者不再有需要发送的值时才有必要关闭,例如终止一个 range
循环。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "fmt" ) func fibonacci (n int , c chan int ) { x, y := 0 , 1 for i := 0 ; i < n; i++ { c <- x x, y = y, x+y } close (c) } func main () { c := make (chan int , 10 ) go fibonacci(cap (c), c) for i := range c { fmt.Println(i) } }
输出:
select 语句 select
语句使一个 Go 程可以等待多个通信操作。
select
会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package mainimport "fmt" func fibonacci (c, quit chan int ) { x, y := 0 , 1 for { select { case c <- x: x, y = y, x+y case <-quit: fmt.Println("quit" ) return } } } func main () { c := make (chan int ) quit := make (chan int ) go func () { for i := 0 ; i < 10 ; i++ { fmt.Println(<-c) } quit <- 0 }() fibonacci(c, quit) }
输出:
1 2 3 4 5 6 7 8 9 10 11 0 1 1 2 3 5 8 13 21 34 quit
默认选择 当 select
中的其它分支都没有准备好时,default
分支就会执行。
为了在尝试发送或者接收时不发生阻塞,可使用 default
分支:
1 2 3 4 5 6 select {case i := <-c: default : }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package mainimport ( "fmt" "time" ) func main () { tick := time.Tick(100 * time.Millisecond) boom := time.After(500 * time.Millisecond) for { select { case <-tick: fmt.Println("tick." ) case <-boom: fmt.Println("BOOM!" ) return default : fmt.Println(" ." ) time.Sleep(50 * time.Millisecond) } } }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 . . tick. . . tick. . . tick. . . tick. . . tick. BOOM!
练习:等价二叉查找树 不同二叉树的叶节点上可以保存相同的值序列。例如,以下两个二叉树都保存了序列 1,1,2,3,5,8,13
。
在大多数语言中,检查两个二叉树是否保存了相同序列的函数都相当复杂。 我们将使用 Go 的并发和信道来编写一个简单的解法。
本例使用了 tree
包,它定义了类型:
1 2 3 4 5 type Tree struct { Left *Tree Value int Right *Tree }
TODO:
1. 实现 Walk
函数。
2. 测试 Walk
函数。
函数 tree.New(k)
用于构造一个随机结构的已排序二叉查找树,它保存了值 k
, 2k
, 3k
, …, 10k
。
创建一个新的信道 ch
并且对其进行步进:
1 go Walk(tree.New(1), ch)
然后从信道中读取并打印 10 个值。应当是数字 1, 2, 3, ..., 10
。
3. 用 Walk
实现 Same
函数来检测 t1
和 t2
是否存储了相同的值。
4. 测试 Same
函数。
Same(tree.New(1), tree.New(1))
应当返回 true
,而 Same(tree.New(1), tree.New(2))
应当返回 false
。
Tree
的文档可在这里 找到。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package mainimport ( "./tree" "fmt" ) func Walk (t *tree.Tree, ch chan int ) { if t != nil { Walk(t.Left, ch) ch <- t.Value Walk(t.Right, ch) } } func Same (t1, t2 *tree.Tree) bool { c1 := make (chan int , 10 ) c2 := make (chan int , 10 ) go Walk(t1, c1) go Walk(t2, c2) for i := 0 ; i < 10 ; i++ { if <-c1 != <-c2 { return false } } return true } func main () { testWalk() testSame() } func testWalk () { c := make (chan int , 10 ) go Walk(tree.New(1 ), c) for i := 0 ; i < cap (c); i++ { fmt.Println(<-c) } } func testSame () { fmt.Println(Same(tree.New(1 ), tree.New(1 ))) fmt.Println(Same(tree.New(1 ), tree.New(2 ))) }
输出:
1 2 3 4 5 6 7 8 9 10 11 12 1 2 3 4 5 6 7 8 9 10 true false
P.S. 把树的节点数增加到10000000,在同一机器上,执行同样的testSame函数,可以看到使用 go程,比不使用要快上一些(我的不使用go程的实现是把信道改为用*[]int
传递数据):
使用Go程:
1 2 3 real 1m4.306s user 1m20.332s sys 0m1.277s
不使用:
1 2 3 real 1m7.141s user 1m24.107s sys 0m1.851s
sync.Mutex 我们已经看到信道非常适合在各个 Go 程间进行通信。
但是如果我们并不需要通信呢?比如说,若我们只是想保证每次只有一个 Go 程能够访问一个共享的变量,从而避免冲突?
这里涉及的概念叫做 互斥 (mutual exclusion),我们通常使用 互斥锁 (Mutex)这一数据结构来提供这种机制。
Go 标准库中提供了 sync.Mutex
互斥锁类型及其两个方法:
我们可以通过在代码前调用 Lock
方法,在代码后调用 Unlock
方法来保证一段代码的互斥执行。参见下例 Inc
方法。
我们也可以用 defer
语句来保证互斥锁一定会被解锁。参见下例 Value
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package mainimport ( "fmt" "sync" "time" ) type SafeCounter struct { v map [string ]int mux sync.Mutex } func (c *SafeCounter) Inc (key string ) { c.mux.Lock() c.v[key]++ c.mux.Unlock() } func (c *SafeCounter) Value (key string ) int { c.mux.Lock() defer c.mux.Unlock() return c.v[key] } func main () { c := SafeCounter{v: make (map [string ]int )} for i := 0 ; i < 1000 ; i++ { go c.Inc("somekey" ) } time.Sleep(time.Second) fmt.Println(c.Value("somekey" )) }
输出:
练习:Web 爬虫 在这个练习中,我们将会使用 Go 的并发特性来并行化一个 Web 爬虫。
修改 Crawl
函数来并行地抓取 URL,并且保证不重复。
提示 :你可以用一个 map 来缓存已经获取的 URL,但是要注意 map 本身并不是并发安全的!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 package mainimport ( "fmt" "sync" "time" ) type Fetcher interface { Fetch(url string ) (body string , urls []string , err error) } type SafeCrawler struct { fetched map [string ]bool mux sync.Mutex } func (c *SafeCrawler) Crawl (url string , depth int , fetcher Fetcher) { c.mux.Lock() defer c.mux.Unlock() if depth <= 0 { return } if c.fetched[url] { return } c.fetched[url] = true body, urls, err := fetcher.Fetch(url) if err != nil { fmt.Println(err) return } fmt.Printf("found: %s %q\n" , url, body) for _, u := range urls { if !c.fetched[u] { go c.Crawl(u, depth-1 , fetcher) } } return } func main () { c := SafeCrawler{fetched: make (map [string ]bool )} c.Crawl("https://golang.org/" , 4 , fetcher) time.Sleep(time.Second) } type fakeFetcher map [string ]*fakeResulttype fakeResult struct { body string urls []string } func (f fakeFetcher) Fetch (url string ) (string , []string , error) { if res, ok := f[url]; ok { return res.body, res.urls, nil } return "" , nil , fmt.Errorf("not found: %s" , url) } var fetcher = fakeFetcher{ "https://golang.org/" : &fakeResult{ "The Go Programming Language" , []string { "https://golang.org/pkg/" , "https://golang.org/cmd/" , }, }, "https://golang.org/pkg/" : &fakeResult{ "Packages" , []string { "https://golang.org/" , "https://golang.org/cmd/" , "https://golang.org/pkg/fmt/" , "https://golang.org/pkg/os/" , }, }, "https://golang.org/pkg/fmt/" : &fakeResult{ "Package fmt" , []string { "https://golang.org/" , "https://golang.org/pkg/" , }, }, "https://golang.org/pkg/os/" : &fakeResult{ "Package os" , []string { "https://golang.org/" , "https://golang.org/pkg/" , }, }, }
输出:
1 2 3 4 5 found: https://golang.org/ "The Go Programming Language" not found: https://golang.org/cmd/ found: https://golang.org/pkg/ "Packages" found: https://golang.org/pkg/os/ "Package os" found: https://golang.org/pkg/fmt/ "Package fmt"