文章

Go 标准库学习:sync

sync包提供了基本的同步原语,如互斥锁、Once和WaitGroup类型等。

本包的类型的值不应被拷贝。

代码示例

Mutex 互斥锁

Mutex 是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex 类型的锁和线程无关,可以由不同的线程加锁和解锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type ConcurrentMap struct {
	l *sync.Mutex
	m map[string]int
}

func (m *ConcurrentMap) Set(key string, val int) {
	m.l.Lock()
	defer m.l.Unlock()
	m.m[key] = val
}

func (m *ConcurrentMap) Get(key string) (val int, ok bool) {
	m.l.Lock()
	defer m.l.Unlock()
	val, ok = m.m[key]
	return
}

func TestMutex(t *testing.T) {

	concurrentMap := &ConcurrentMap{
		l: &sync.Mutex{},
		m: make(map[string]int, 10),
	}
	concurrentMap.Set("jason:age", 10)

	if val, ok := concurrentMap.Get("jason:age"); ok {
		fmt.Printf("val: %v\n", val)
	}
}

RWMutex 读写锁

RWMutex 是读写互斥锁。该锁可以被同时多个读取者持有或唯一个写入者持有。RWMutex 可以创建为其他结构体的字段;零值为解锁状态。RWMutex 类型的锁也和线程无关,可以由不同的线程加读取锁/写入和解读取锁/写入锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type ConcurrentMap struct {
	l *sync.RWMutex
	m map[string]int
}

func (m *ConcurrentMap) Set(key string, val int) {
	m.l.Lock()
	defer m.l.Unlock()
	m.m[key] = val
}

func (m *ConcurrentMap) Get(key string) (val int, ok bool) {
	m.l.RLock()
	defer m.l.RUnlock()
	val, ok = m.m[key]
	return
}

func TestMutex(t *testing.T) {

	concurrentMap := &ConcurrentMap{
		l: &sync.RWMutex{},
		m: make(map[string]int, 10),
	}
	concurrentMap.Set("jason:age", 10)

	if val, ok := concurrentMap.Get("jason:age"); ok {
		fmt.Printf("val: %v\n", val)
	}
}

WaitGroup 用法

WaitGroup 用于等待一组线程的结束。父线程调用 Add 方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用 Done 方法。同时,主线程里可以调用 Wait 方法阻塞至所有线程结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestWaitGroup(t *testing.T) {

	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Printf("i: %v\n", i)
		}(i)
	}

	wg.Wait()
}

Once 用法

Do方法当且仅当第一次被调用时才执行函数f

如果 once.Do(f) 被多次调用,只有第一次调用会执行 f,即使f每次调用 Do 提供的 f 值不同。需要给每个要执行仅一次的函数都建立一个 Once 类型的实例。

Do用于必须刚好运行一次的初始化。因为f是没有参数的,因此可能需要使用闭包来提供给Do方法调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Person struct{}
var once = sync.Once{}
var p *Person

func New() *Person {
	once.Do(func() {
		p = &Person{}
	})
	return p
}

func TestOnce(t *testing.T) {
	p1 := New()
	p2 := New()
	fmt.Println(p1 == p2)
}

pool 用法

  • Pool 是一个可以分别存取的临时对象的集合。
  • Pool 中保存的任何 item 都可能随时不做通告的释放掉。如果 Pool 持有该对象的唯一引用,这个 item 就可能被回收。
  • Pool 可以安全的被多个线程同时使用。
  • Pool 的目的是缓存申请但未使用的 item 用于之后的重用,以减轻 GC 的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。
  • Pool 的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时itemPool 提供了让多个线程分摊内存申请消耗的方法。
  • Pool 的一个好例子在fmt包里。该 Pool 维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。
  • 另一方面,管理着短寿命对象的空闲列表不适合使用 Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func TestPool(t *testing.T) {

	wg := sync.WaitGroup{}
	p := &sync.Pool{}

	p.New = func() any {
		return make([]byte, 0, 10)
	}
	wg.Add(10)
	for i := 10000; i < 10010; i++ {

		go func(i int) {
			defer wg.Done()
			buffer := p.Get()
			defer p.Put(buffer)
			b := buffer.([]byte)
			b = binary.BigEndian.AppendUint32(b, uint32(i))
			fmt.Println(b)
		}(i)

	}
	wg.Wait()
}

Cond 用法

sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine 。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine

互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestCond(t *testing.T) {
	mu := &sync.Mutex{}
	c := sync.NewCond(mu)
	for i := 0; i < 5; i++ {
		var item = i
		go func() {
			mu.Lock()
			c.Wait()
			fmt.Printf("item: %v\n", item)
			mu.Unlock()
		}()
	}
	fmt.Println("signal ... ")
	c.Signal()
	<-time.After(time.Second)
	c.Broadcast()
	<-time.After(time.Second)
}
本文由作者按照 CC BY 4.0 进行授权