Go并发 -- 竞态

关于并发编程,需要了解一些基础的概念

一、什么是并发

在串行程序中,程序中各个步骤的执行顺序由程序逻辑来决定。例如,第一条语句先于第二条语句执行,以此类推。 在不同的goroutine中,各自的语句也是按顺序执行。但是,不同goroutine语句(或者说事件)的执行顺序关系,我们是不知道的。我们无法确定地说一个事件先于另一个事件,那么这两个事件就是并发的。

二、什么是并发安全

如果一个函数在并发调用时,仍然能够正确工作,那么称这个函数时并发安全的。

如果一个类型的所有可访问方法和操作都是并发安全的,这可以称为是并发安全的类型。

导出的包级别的函数通常可以认为是并发安全的,因为包级别的变量无法限制在一个goroutine内,修改这些变量的函数就必须采用互斥机制。

三、数据竞态

3.1.什么是数据竞态

两个goroutine并发地去读写同一个变量,并且至少有一个是写入的时候,就会发生数据竞态。(可参考4.1例子)

3.2.如何避免数据竞态

可以参考以下三种方式

  1. 第一种方式是不要去修改变量,多个goroutine并发读,是不会产生数据竞态的。从来不用修改的数据本质上是并发安全的。
  2. 使用channel来改变变量,而不要直接修改另一个goroutine中的变量。 即:”不要通过共享内存来通信,而应通过通信来共享内存“
  3. 使用互斥机制,同一时间只有一个goroutine可以操作变量

四、数据竞态代码示例

4.1.数据竞态带来的影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (
"fmt"
"sync"
"time"
)

type Bank struct {
balance int
}

func (b *Bank) Deposit(amount int) { b.balance = b.balance + amount }

func (b *Bank) Balance() int { return b.balance }

func test() {
// A
var bank = Bank{0}
var wg sync.WaitGroup
wg.Add(2)
go func() {
bank.Deposit(100)
wg.Done()
}()
// B
go func() {
bank.Deposit(200)
wg.Done()
}()
wg.Wait()
//fmt.Println(bank.Balance())
if bank.Balance() != 300 {
fmt.Println("最终余额不等于300,结果是:", bank.Balance())
}
// 如果是以下情形,最终的结果,有可能不是300
// 当A在执行 balance + amount 的时候,B执行完了 balance = balance + amount
// 然后A执行 balance = balance + amount
// 这时候,最终的余额是100
}

func main() {

for i := 0; i < 100000; i++ {
test()
}

time.Sleep(5 * time.Second)
}

执行结果如下(10w次循环中,发生了11次错误):

1
2
3
4
5
6
7
8
9
10
11
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 200
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 200
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 200
最终余额不等于300,结果是: 200
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 100
最终余额不等于300,结果是: 100

4.3.使用互斥来避免数据竞态

解决并发读写共享变量的一种方式是使用互斥机制,即同一时间只有一个goroutine可以操作变量。我们可以通过sync包提供的互斥锁来实现。

互斥锁sync.Mutex

接着上个bank的例子,使用互斥锁可以这么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import "sync"

var (
mu sync.Mutex
// 一般被互斥量保护的变量声明应该紧接在互斥量声明之后
balance int
)

func Deposit(amount int){
mu.Lock()
defer mu.Unlock()
balance += amount

}

func Balance() int {
mu.Lock()
defer mu.Unlock()
return balance
}

现在有一个Withdraw函数,成功的时候,减少余额,当余额不足的时候,补上扣除的余额:

1
2
3
4
5
6
7
8
9
10
11
12
13

// 非原子
func Withdraw(amount int) bool {
Deposit(-amount)

// 中间这个状态下,可能导致余额不足

if Balance() < 0 {
Deposit(amount)
return false
}
return true
}

虽然每个Deposit()使用锁保护了变量,但是Withdraw()不是原子操作。

可以参考如下的改造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func Withdraw(amount int) bool {
mu.Lock()
defer mu.Unlock()
deposit(-amount)
if balance() < 0 {
deposit(amount)
return false
}
return true
}

func Deposit(amount int){
mu.Lock()
defer mu.Unlock()
deposit(amount)

}

// 使用这个函数的前提是需要已经上锁
func deposit(amount int){
balance += amount
}

读写互斥锁sync.RWMutex

对于Balance,仅仅只是读取balance的状态,当Balance的并发量很大时,可以考虑使用多读单写锁,提高性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

var mu sync.RWMutex
var balance int

func Balance() int {
mu.RLock() // 读锁
defer mu.RUnlock()
return balance
}

func Deposit(amount int){
mu.Lock()
defer mu.Unlock()
balance += amount
}
  • Rlock和RUnlock用来获取和释放一个读锁(也叫做共享锁)
  • Lock和Unlock用来获取和释放一个写锁(也叫互斥锁)

不过对于锁的竞争不激烈时,不应该使用RWMutex,因为RWMutex需要更复杂的内部实现,竞争不激烈的情况下,比普通的互斥锁慢

延迟初始化sync.Once

主要是用来避免变量在正确构造前就被其他gorountine分享的情况。

互斥锁其他例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestCounter(t *testing.T) {
counter := 0
var mut sync.Mutex
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock() // 需要在对counter调用前加锁
counter++
}()
}
time.Sleep(1 * time.Second) // 可能携程还没有执行完,主进程执行完,因此Sleep一会儿
t.Log(counter)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

func TestCounterWaitGroup(t *testing.T) {
counter := 0
var wg sync.WaitGroup
var mut sync.Mutex
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
wg.Done()
}()
mut.Lock() // 需要在对counter调用前加锁
counter++
}()
}
wg.Wait()
t.Log(counter)
}