Go并发 -- goroutine

goroutine

一、goroutine基础

1.1.什么是goroutine

goroutine的概念类似于线程,但 goroutine是由Go的运行时(runtime)调度和管理的,其他语言,类似java/c++需要自己维护线程池

1.2.goroutine和操作系统的线程

可增长的栈空间

栈内存区域用来保存在其他函数调用期间那些正在执行或者临时暂停的函数中的局部变量

  • 每个操作系统线程都有一个固定大小的栈内存(一般是2MB)
  • 一个goroutine刚开始的时候栈空间很小(一般是2KB),不过是可以变化的,最大可以达到1GB

调度方式不同

  • 操作系统线程是由OS内核调度的。每隔几ms,一个硬件时钟中断发到CPU,然后CPU调用 调度器 这个内核函数,暂停正在执行的线程,将其寄存器的信息保存到内存中,查看线程表并执行下一个线程,再从内存中恢复线程的注册表信息,继续执行。这样一次切换,是一个完整的上下文切换。
  • Go运行时包含一个自己的调度器。Go调度器由Go语言的结构来触发,比如一个goroutine中使用了time.sleep或者被channel阻塞或者操作互斥量的时候,调度器会把这个goroutine设置成睡眠模式,运行其他的goroutine。因为其不需要切换到内核环境,因此调用一个goroutine比调度一个线程成本低。

GOMAXPROCS

GO调度器使用 GOMAXPROCS 参数来确定需要使用多少个操作系统线程来执行Go代码。这个值默认是CPU的核数。

来看下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
package main

import "fmt"

func main() {

for {
go fmt.Print(0)
fmt.Print(1)
}
}

只使用一个线程来运行:

1
2
3
4
GOMAXPROCS=1 go run main.go

// 循环输出一大串0,一大串1
000000000000000000000011111111111111111111111111111111...

使用两个现场来运行:

1
2
3
4
GOMAXPROCS=2 go run main.go

// 0和1交替输出(数量不一定)
000000000000000000111110000111100000001111010010001111011111111100000....

对比起来,效果还是挺直观的。

二、使用goroutine

2.1.开启goroutine

默认代码都是串行执行的

1
2
3
4
5
6
7
func hello() {
fmt.Println("hello goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}

在函数前加一个go关键字,即可使用goroutine实现并发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func hello() {
fmt.Println("hello goroutine")
}
func main() {
go hello() // 加一个go关键字即可
fmt.Println("main goroutine")
}

// 以上代码不会输出hello goroutine 因为main函数结束


func main() {
go hello()
fmt.Println("main goroutine")
time.Sleep(time.Second)
}

// 简单地sleep一下,就可以输出hello函数执行的内容,但是这么做肯定是不推荐的
// 因为创建goruntine需要花费一些时间,所以会先执行main函数的Println

开启多个goruntine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

var wg sync.WaitGroup

func hello(i int) {
defer wg.Done() // goroutine结束就注册-1
fmt.Println("hello goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就注册+1
go hello(i)
}
fmt.Println("main goroutine")
wg.Wait() // 等待所有注册的goroutine都结束
}

每次执行上述代码,输出的顺序都是不一样的,因为goroutine是并发执行,且调度是随机的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 第一次执行
hello goroutine! 0
hello goroutine! 5
hello goroutine! 6
hello goroutine! 7
hello goroutine! 8
hello goroutine! 3
hello goroutine! 1
hello goroutine! 9
main goroutine
hello goroutine! 4
hello goroutine! 2

// 第二次执行
hello goroutine! 7
hello goroutine! 9
hello goroutine! 0
hello goroutine! 3
hello goroutine! 4
hello goroutine! 8
hello goroutine! 6
hello goroutine! 1
hello goroutine! 2
main goroutine
hello goroutine! 5

2.2.退出goroutine

goroutine被设计为自己退出(无法在外部强制去关闭一个goroutine,除了main函数结束)。有几种能够让goroutine退出的方式:

  1. 通过channel通知退出
  2. 通过context通知退出
  3. panic退出

例一:tcp服务器,每隔一秒返回当前时间

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}

func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"fmt"
"io"
"log"
"net"
"time"
)

func main() {
address := "localhost:8000"
fmt.Println("Starting to listen on: ", address)
socket, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
for {
c, err := socket.Accept()
fmt.Println("accept a client")
if err != nil {
log.Print(err)
continue
}
// 加了一个go关键字,就可以接收多个客户端请求,否则只能处理一个请求
go HandleTCPConnection(c)
}
}

func HandleTCPConnection(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("03:04:05\n"))
if err != nil {
fmt.Println(err) // write tcp 127.0.0.1:8000->127.0.0.1:52383: write: broken pipe
return
}
time.Sleep(time.Second)
}
}

例二:tcp服务器 echo

server

在goroutine中继续使用goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"bufio"
"fmt"
"log"
"net"
"strings"
"time"
)

func main() {
address := "localhost:8000"
fmt.Println("Starting to listen on: ", address)
socket, err := net.Listen("tcp", address)
if err != nil {
log.Fatal(err)
}
for {
c, err := socket.Accept()
fmt.Println("accept a client")
if err != nil {
log.Print(err)
continue
}
go HandleTCPConnection(c)
}
}

func Echo(c net.Conn, str string, delay time.Duration) {
_, err := fmt.Fprintln(c, "\t", strings.ToUpper(str))
if err != nil {
fmt.Println(err)
}
time.Sleep(delay)
fmt.Fprintln(c, "\t", str)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(str))

}

func HandleTCPConnection(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go Echo(c, input.Text(), 1*time.Second)
}
c.Close()
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
fmt.Println(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
//b := []byte("test")
//conn.Write(b)
//time.Sleep(2 * time.Second)
}

func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
fmt.Println("mustCopy函数执行结束")
}

client with channel

前面那个client,在使用ctrl + D 退出之后,只会输出一次mustCopy函数执行结束,因为main函数先执行完了,Stdout的那个goroutine也跟着结束了

使用无缓冲通道可以让main函数阻塞,等到其他goroutine发过来消息之后才结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"fmt"
"io"
"log"
"net"
"os"
)

func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
fmt.Println(err)
}
done := make (chan struct{})

go func(){
io.Copy(os.Stdout, conn)
fmt.Println("2.后台goroutine执行结束")
done <- struct{}{}
}()

mustCopy(conn, os.Stdin)
conn.Close()
fmt.Println("1.等待执行main函数goroutine结束")

// 注意:1和2的输出顺序可能相反

<- done // 等待后台goroutine完成
fmt.Println("3.main函数goroutine结束")
}

func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}