关于并发编程,需要了解一些基础的概念
一、什么是并发
在串行程序中,程序中各个步骤的执行顺序由程序逻辑来决定。例如,第一条语句先于第二条语句执行,以此类推。 在不同的goroutine中,各自的语句也是按顺序执行。但是,不同goroutine语句(或者说事件)的执行顺序关系,我们是不知道的。我们无法确定地说一个事件先于另一个事件,那么这两个事件就是并发的。
二、什么是并发安全
如果一个函数在并发调用时,仍然能够正确工作,那么称这个函数时并发安全的。
如果一个类型的所有可访问方法和操作都是并发安全的,这可以称为是并发安全的类型。
导出的包级别的函数通常可以认为是并发安全的,因为包级别的变量无法限制在一个goroutine内,修改这些变量的函数就必须采用互斥机制。
三、数据竞态
3.1.什么是数据竞态
两个goroutine并发地去读写同一个变量,并且至少有一个是写入的时候,就会发生数据竞态。(可参考4.1例子)
3.2.如何避免数据竞态
可以参考以下三种方式
- 第一种方式是不要去修改变量,多个goroutine并发读,是不会产生数据竞态的。从来不用修改的数据本质上是并发安全的。
- 使用channel来改变变量,而不要直接修改另一个goroutine中的变量。 即:”不要通过共享内存来通信,而应通过通信来共享内存“
- 使用互斥机制,同一时间只有一个goroutine可以操作变量
四、数据竞态代码示例
4.1.数据竞态带来的影响
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package main
import ( "fmt" "sync" "time" )
type Bank struct { balance int }
func (b *Bank) Deposit(amount int) { b.balance = b.balance + amount }
func (b *Bank) Balance() int { return b.balance }
func test() { var bank = Bank{0} var wg sync.WaitGroup wg.Add(2) go func() { bank.Deposit(100) wg.Done() }() go func() { bank.Deposit(200) wg.Done() }() wg.Wait() if bank.Balance() != 300 { fmt.Println("最终余额不等于300,结果是:", bank.Balance()) } }
func main() {
for i := 0; i < 100000; i++ { test() }
time.Sleep(5 * time.Second) }
|
执行结果如下(10w次循环中,发生了11次错误):
1 2 3 4 5 6 7 8 9 10 11
| 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 200 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 200 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 200 最终余额不等于300,结果是: 200 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 100 最终余额不等于300,结果是: 100
|
4.3.使用互斥来避免数据竞态
解决并发读写共享变量的一种方式是使用互斥机制,即同一时间只有一个goroutine可以操作变量。我们可以通过sync包提供的互斥锁来实现。
互斥锁sync.Mutex
接着上个bank的例子,使用互斥锁可以这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import "sync"
var ( mu sync.Mutex balance int )
func Deposit(amount int){ mu.Lock() defer mu.Unlock() balance += amount }
func Balance() int { mu.Lock() defer mu.Unlock() return balance }
|
现在有一个Withdraw函数,成功的时候,减少余额,当余额不足的时候,补上扣除的余额:
1 2 3 4 5 6 7 8 9 10 11 12 13
|
func Withdraw(amount int) bool { Deposit(-amount)
if Balance() < 0 { Deposit(amount) return false } return true }
|
虽然每个Deposit()使用锁保护了变量,但是Withdraw()不是原子操作。
可以参考如下的改造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func Withdraw(amount int) bool { mu.Lock() defer mu.Unlock() deposit(-amount) if balance() < 0 { deposit(amount) return false } return true }
func Deposit(amount int){ mu.Lock() defer mu.Unlock() deposit(amount) }
func deposit(amount int){ balance += amount }
|
读写互斥锁sync.RWMutex
对于Balance,仅仅只是读取balance的状态,当Balance的并发量很大时,可以考虑使用多读单写锁,提高性能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| var mu sync.RWMutex var balance int
func Balance() int { mu.RLock() defer mu.RUnlock() return balance }
func Deposit(amount int){ mu.Lock() defer mu.Unlock() balance += amount }
|
- Rlock和RUnlock用来获取和释放一个读锁(也叫做共享锁)
- Lock和Unlock用来获取和释放一个写锁(也叫互斥锁)
不过对于锁的竞争不激烈时,不应该使用RWMutex,因为RWMutex需要更复杂的内部实现,竞争不激烈的情况下,比普通的互斥锁慢
延迟初始化sync.Once
主要是用来避免变量在正确构造前就被其他gorountine分享的情况。
互斥锁其他例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func TestCounter(t *testing.T) { counter := 0 var mut sync.Mutex for i := 0; i < 5000; i++ { go func() { defer func() { mut.Unlock() }() mut.Lock() counter++ }() } time.Sleep(1 * time.Second) t.Log(counter) }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func TestCounterWaitGroup(t *testing.T) { counter := 0 var wg sync.WaitGroup var mut sync.Mutex for i := 0; i < 5000; i++ { wg.Add(1) go func() { defer func() { mut.Unlock() wg.Done() }() mut.Lock() counter++ }() } wg.Wait() t.Log(counter) }
|