For-learning-Go-Tutorial

Go语言是谷歌2009发布的第二款开源编程语言。

Go语言专门针对多处理器系统应用程序的编程进行了优化,使用Go编译的程序可以媲美C或C++代码的速度,而且更加安全、支持并行进程。

因而一直想的是自己可以根据自己学习和使用Go语言编程的心得,写一本Go的书可以帮助想要学习Go语言的初学者快速入门开发和使用!

Sync.Map解析

在Go1.9之前,Go自带的Map不是并发安全的,因此我们需要自己再封装一层,给Map加上把读写锁,例如:

  1. type MapWithLock struct {
  2. sync.RWMutex
  3. M map[string]Kline
  4. }

用MapWithLock的读写锁去控制map的并发安全。

但是到了Go1.9发布,它有了一个新的特性,那就是sync.map,它是原生支持并发安全的map,不过它的用法和以前我们熟悉的map完全不一样,主要还是因为sync.map封装了更为复杂的数据结构,用来实现比之前加锁map更优秀的性能。

在Go 1.9中sync.Map是怎么实现的呢,它又是如何解决并发提升性能的呢?我们一起看看Sync.Map的实现和优化的点在哪里。

空间换时间。 通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响。使用只读数据(read),避免读写冲突。动态调整,miss次数多了之后,将dirty数据提升为read。double-checking。 延迟删除。 删除一个键值只是打标记,只有在提升dirty的时候才清理删除的数据。 优先从read读取、更新、删除,因为对read的读取不需要锁。

sync.Map的数据结构:

  1. type Map struct {
  2. // 当涉及到dirty数据的操作的时候,需要使用这个锁
  3. mu Mutex
  4. // 一个只读的数据结构,因为只读,所以不会有读写冲突。
  5. // 所以从这个数据中读取总是安全的。
  6. // 实际上,实际也会更新这个数据的entries,如果entry是未删除的(unexpunged), 并不需要加锁。如果entry已经被删除了,需要加锁,以便更新dirty数据。
  7. read atomic.Value // readOnly
  8. // dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,但是提升dirty字段为read的时候非常快,不用一个一个的复制,而是直接将这个数据结构作为read字段的一部分),有些数据还可能没有移动到read字段中。
  9. // 对于dirty的操作需要加锁,因为对它的操作可能会有读写竞争。
  10. // 当dirty为空的时候, 比如初始化或者刚提升完,下一次的写操作会复制read字段中未删除的数据到这个数据中。
  11. dirty map[interface{}]*entry
  12. // 当从Map中读取entry的时候,如果read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
  13. // 当misses累积到 dirty的长度的时候, 就会将dirty提升为read,避免从dirty中miss太多次。因为操作dirty需要加锁。
  14. misses int
  15. }

它的数据结构很简单,值包含四个字段:read、mu、dirty、misses。

readOnly.m和Map.dirty存储的值类型是*entry,它包含一个指针p, 指向用户存储的value值。

  1. type entry struct {
  2. p unsafe.Pointer // *interface{}
  3. }

p通常有三种类型的值:

  • nil: entry已被删除了,并且m.dirty为nil
  • expunged: entry已被删除了,并且m.dirty不为nil,而且这个entry不存在于m.dirty中
  • 其它: entry是一个正常的值

它使用了冗余的数据结构read、dirty。dirty中会包含read中为删除的entries,新增加的entries会加入到dirty中。 read的数据结构是:

  1. type readOnly struct {
  2. m map[interface{}]*entry
  3. amended bool // 如果Map.dirty有些数据不在中的时候,这个值为true
  4. }

amended指明Map.dirty中有readOnly.m未包含的数据,所以如果从Map.read找不到数据的话,还要进一步到Map.dirty中查找。

对Map.read的修改是通过原子操作进行的。

虽然read和dirty有冗余数据,但这些数据是通过指针指向同一个数据,所以尽管Map的value会很大,但是冗余的空间占用还是有限的。

sync.Map主要有五个方法:

1、Load 取key对应的value

2、Store 存 key,value

3、Delete 删除key,及其value

4、Range 遍历所有的key,value

  • Load方法 Load方法,提供一个键key,查找对应的值value,如果不存在,通过ok反映:
    1. func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    2. // 1.首先从m.read中得到只读readOnly,从它的map中查找,不需要加锁
    3. read, _ := m.read.Load().(readOnly)
    4. e, ok := read.m[key]
    5. // 2. 如果没找到,并且m.dirty中有新数据,需要从m.dirty查找,这个时候需要加锁
    6. if !ok && read.amended {
    7. m.mu.Lock()
    8. // 双检查,避免加锁的时候m.dirty提升为m.read,这个时候m.read可能被替换了。
    9. read, _ = m.read.Load().(readOnly)
    10. e, ok = read.m[key]
    11. // 如果m.read中还是不存在,并且m.dirty中有新数据
    12. if !ok && read.amended {
    13. // 从m.dirty查找
    14. e, ok = m.dirty[key]
    15. // 不管m.dirty中存不存在,都将misses计数加一
    16. // missLocked()中满足条件后就会提升m.dirty
    17. m.missLocked()
    18. }
    19. m.mu.Unlock()
    20. }
    21. if !ok {
    22. return nil, false
    23. }
    24. return e.load()
    25. }
    这里会先从m.read中加载,不存在的情况下,并且m.dirty中有新数据,加锁,然后从m.dirty中加载。其次是这里使用了双检查的处理,因为在下面的两个语句中,这两行语句并不是一个原子操作。
  1. if !ok && read.amended {
  2. m.mu.Lock()

当第一句执行的时候条件满足,但是在加锁之前,m.dirty可能被提升为m.read,所以加锁后还得再检查m.read,后续的方法中都使用了这个方法。

如果我们查询的键值正好存在于m.read中,无须加锁,直接返回,理论上性能优异。即使不存在于m.read中,经过miss几次之后,m.dirty会被提升为m.read,又会从m.read中查找。所以对于更新/增加较少,加载存在的key很多的case,性能基本和无锁的map类似。 接着我们看下如何m.dirty是如何被提升的。 missLocked方法中可能会将m.dirty提升。

  1. func (m *Map) missLocked() {
  2. m.misses++
  3. if m.misses < len(m.dirty) {
  4. return
  5. }
  6. m.read.Store(readOnly{m: m.dirty})
  7. m.dirty = nil
  8. m.misses = 0
  9. }

上面的最后三行代码就是提升m.dirty的,很简单的将m.dirty作为readOnly的m字段,原子更新m.read。提升后m.dirty、m.misses重置, 并且m.read.amended为false。

  • Store方法 Store方法是更新或者新增一个entry。

    1. func (m *Map) Store(key, value interface{}) {
    2. // 如果m.read存在这个键,并且这个entry没有被标记删除,尝试直接存储。
    3. // 因为m.dirty也指向这个entry,所以m.dirty也保持最新的entry。
    4. read, _ := m.read.Load().(readOnly)
    5. if e, ok := read.m[key]; ok && e.tryStore(&value) {
    6. return
    7. }
    8. // 如果`m.read`不存在或者已经被标记删除
    9. m.mu.Lock()
    10. read, _ = m.read.Load().(readOnly)
    11. if e, ok := read.m[key]; ok {
    12. if e.unexpungeLocked() { //标记成未被删除
    13. m.dirty[key] = e //m.dirty中不存在这个键,所以加入m.dirty
    14. }
    15. e.storeLocked(&value) //更新
    16. } else if e, ok := m.dirty[key]; ok { // m.dirty存在这个键,更新
    17. e.storeLocked(&value)
    18. } else { //新键值
    19. if !read.amended { //m.dirty中没有新的数据,往m.dirty中增加第一个新键
    20. m.dirtyLocked() //从m.read中复制未删除的数据
    21. m.read.Store(readOnly{m: read.m, amended: true})
    22. }
    23. m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
    24. }
    25. m.mu.Unlock()
    26. }
    27. func (m *Map) dirtyLocked() {
    28. if m.dirty != nil {
    29. return
    30. }
    31. read, _ := m.read.Load().(readOnly)
    32. m.dirty = make(map[interface{}]*entry, len(read.m))
    33. for k, e := range read.m {
    34. if !e.tryExpungeLocked() {
    35. m.dirty[k] = e
    36. }
    37. }
    38. }
    39. func (e *entry) tryExpungeLocked() (isExpunged bool) {
    40. p := atomic.LoadPointer(&e.p)
    41. for p == nil {
    42. // 将已经删除标记为nil的数据标记为expunged
    43. if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
    44. return true
    45. }
    46. p = atomic.LoadPointer(&e.p)
    47. }
    48. return p == expunged
    49. }

    通常是先从操作m.read开始的,如果不满足条件再加锁,然后操作m.dirty。Store 方法可能会在某种情况下(初始化或者m.dirty刚被提升后)从m.read中复制数据,如果这个时候m.read中数据量非常大,可能会影响性能。

  • Delete方法 Delete方法用来删除一个键值。

    1. func (m *Map) Delete(key interface{}) {
    2. read, _ := m.read.Load().(readOnly)
    3. e, ok := read.m[key]
    4. if !ok && read.amended {
    5. m.mu.Lock()
    6. read, _ = m.read.Load().(readOnly)
    7. e, ok = read.m[key]
    8. if !ok && read.amended {
    9. delete(m.dirty, key)
    10. }
    11. m.mu.Unlock()
    12. }
    13. if ok {
    14. e.delete()
    15. }
    16. }

    这里的删除操作还是从m.read中开始, 如果这个entry不存在于m.read中,并且m.dirty中有新数据,则加锁尝试从m.dirty中删除。

此外需要,双检查的。 从m.dirty中直接删除即可,就当它没存在过,但是如果是从m.read中删除,并不会直接删除,而是打标记:

  1. func (e *entry) delete() (hadValue bool) {
  2. for {
  3. p := atomic.LoadPointer(&e.p)
  4. // 已标记为删除
  5. if p == nil || p == expunged {
  6. return false
  7. }
  8. // 原子操作,e.p标记为nil
  9. if atomic.CompareAndSwapPointer(&e.p, p, nil) {
  10. return true
  11. }
  12. }
  13. }
  • Range方法 因为for … range map是内建的语言特性,所以没有办法使用for range遍历sync.Map, 但是可以使用它的Range方法,通过回调的方式遍历。
    1. func (m *Map) Range(f func(key, value interface{}) bool) {
    2. read, _ := m.read.Load().(readOnly)
    3. // 如果m.dirty中有新数据,则提升m.dirty,然后在遍历
    4. if read.amended {
    5. //提升m.dirty
    6. m.mu.Lock()
    7. read, _ = m.read.Load().(readOnly) //双检查
    8. if read.amended {
    9. read = readOnly{m: m.dirty}
    10. m.read.Store(read)
    11. m.dirty = nil
    12. m.misses = 0
    13. }
    14. m.mu.Unlock()
    15. }
    16. // 遍历, for range是安全的
    17. for k, e := range read.m {
    18. v, ok := e.load()
    19. if !ok {
    20. continue
    21. }
    22. if !f(k, v) {
    23. break
    24. }
    25. }
    26. }
    Range方法调用前可能会做一个m.dirty的提升,不过提升m.dirty不是一个耗时的操作。