当我们想要对某个变量并发安全的修改,除了使用官方提供的mutex,还可以使用sync/atomic包的原子操作,它能够保证对变量的读取或修改期间不被其他的协程所影响。
Golang提供的原子操作都是非侵入式的,由标准库sync/atmoic包提供,直接由底层CPU硬件支持。
也就是在硬件层次去实现的,性能较好,不需要像mutex那样记录很多状态。当然,mutex不止是对变量的并发控制,更多的是对代码块的并发控制,两者侧重点不一样。
一.介绍
原子操作即是进行过程中不能被中断的操作,针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。
具体的原子操作在不同的操作系统中实现是不同的。比如在Intel的CPU架构机器上,主要是使用总线锁的方式实现的。大致的意思就是当一个CPU需要操作一个内存块的时候,向总线发送一个LOCK信号,所有CPU收到这个信号后就不对这个内存块进行操作了。等待操作的CPU执行完操作后,发送UNLOCK信号,才结束。在AMD的CPU架构机器上就是使用MESI一致性协议的方式来保证原子操作。所以我们在看atomic源码的时候,我们看到它针对不同的操作系统有不同汇编语言文件。
Golang在sync包中已经提供了锁,为什么还需要使用atomic原子操作呢?
1)加锁的代价比较高,耗时多,需要上下文切换。
2)原子操作只针对基本数据类型,不支持结构体、自定义数据类型
3)原子操作在用户态可以完成,性能比互斥锁要高。
4)针对特定需求原子操作步骤简单,无需加锁解锁步骤。
为什么 atomic 比mutex快?
1)原子操作很快,因为它们依赖于CPU指令而不是依赖外部锁。使用互斥锁时,每次获得锁时,goroutine都会短暂暂停或中断,这种阻塞占使用互斥锁所花费时间的很大一部分(他们是由操作系统调度的)。原子操作可以在没有任何中断的情况下执行。
2)原子操作是能够保证执行期间是连续且不会被中断的,临界区只能保证访问共享数据是按顺序访问的,但并不能保证访问期间不会被切换上下文。
CAS
CAS是CPU硬件同步原语,是Compare And Swap的缩写
Go中的CAS操作,是借用了CPU提供的原子性指令来实现。CAS操作修改共享变量时候不需要对共享变量加锁,
而是通过类似乐观锁的方式进行检查,本质还是不断的占用CPU资源换取加锁带来的开销(比如上下文切换开销)
原子操作中的CAS,在sync/atomic包中,这类原子操作由名称以CompareAndSwap为前缀的若干个函数提供
优势:
- 可以在不形成临界区和创建互斥量的情况下完成并发安全的值替换操作。这可以大大的减少同步对程序性育
劣势:
- 在被操作值被频繁变更的情况下,CAS操作并不那么容易成功。因为需要对oLd值进行匹配,只有匹配成功
当前atomic 包有以下几种原子操作:Add、CompareAndSwap、Load、Store、Swap
二.操作
Add,CompareAndSwap,Load,Store,Swap
2.1 Add (增或减)
- 用于处理增加和减少的原子操作,函数名以Add为前缀,后跟针对特定类型的名称。
- 原子增被操作的只能是数值类型,即int32,int64,uint32,uint64,uinptr。
- 原子增减函数的第一个参数为原值,第二个是要增多少。
- 方法:
func AddInt32(addr *int32, delta int32) (new int32)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
package mainimport ("fmt""sync/atomic"
)func main() {var num int32fmt.Println("num is ", num)atomic.AddInt32(&num, 10)fmt.Println("new num is ", num)
}
减法就很简单,就直接负值即可,就可以实现。
2.2 CompareAndSwap (比较并交换)
什么是比较并交换?
我们来看它的参数,有一个old和new值,分别表示它的原始值和新值,如果说这个原始值输入不对,则不会改变这个new值。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
package mainimport ("fmt""sync/atomic"
)func main() {var num int32fmt.Println("num is ", num)atomic.CompareAndSwapInt32(&num, 0, 10)fmt.Println("new num is ", num)
}
可以试试,把old值不设置为0,它并不报错,只是不改变值而已
2.3 Load (读取)
Load原子性的读取操作接受一个对应类型的指针值,返回该指针指向的值。原子性读取意味着读取值的同时,当前计算机的任何CPU都不会进行针对值的读写操作。
比方说在32位计算架构的计算机上写入一个64位的整数时,如果在这个写操作尚未完成的时候,有一个读操作被并发的执行了,那么这个读操作很有可能会读取到一个只有被修改了一半的数据。
另一个需要注意的是for循环中,当使用v:=value为变量v赋值时,需要注意的是由于读取value的值的操作并不是并发安全的。因此在读取操作时其它对其的读写操作可能会同时发生。
先来看下它的函数和简单使用
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
package mainimport ("fmt""sync/atomic"
)func main() {var num int32fmt.Println("num is ", num)atomic.LoadInt32(&num)fmt.Println("new num is ", atomic.LoadInt32(&num))
}
针对上述所说的问题做一个详细的解答
首先就是非原子性读取的风险:
假设有一个共享变量 value
,多个协程(goroutine)可能同时读写它。如果直接通过普通读取(如 v := value
)获取值,可能会遇到以下问题:
- 部分写入:例如在 32 位系统上写入一个 64 位整数需要两次操作(先写低 32 位,再写高 32 位)。如果读取操作在两次写入之间发生,可能读到“半新半旧”的值。
- 缓存不一致:不同 CPU 核心的缓存可能不同步,普通读取可能读到过时的缓存值,而非内存中的最新值。
这个时候就该我们的Load出手了:
- 它可以确保读操作是从内存中获取,而不是缓存
- 原子性保证,确保读到的值是完整的,不会读到部分写入
2.4 Store(存储)
- 原子性存储会将val值保存在*addr中
- 与读操作对应的写操作,sync/atomic提供了与原子值载入Load函数相对应的原子值存储Store函数,原子性存储函数均为Store开头
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
package mainimport ("fmt""sync/atomic"
)func main() {var num int32atomic.StoreInt32(&num, 10)fmt.Println("num is ", num)
}
2.5 Swap (交换)
他和比较并交换是不同的,他只有新值作为参数,但是旧值是返回的
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
package mainimport ("fmt""sync/atomic"
)func main() {var num int32old := atomic.SwapInt32(&num, 10)fmt.Println("new is ", num)fmt.Println("old is", old)
}
2.6 Or (位或)和 And (位与)
都是用于设置某些特定位(如标志位),不影响其他位。
就是用于处理bit位,专门用于做位运算的。
func OrInt32(addr *int32, mask int32) (old int32)
func OrUint32(addr *uint32, mask uint32) (old uint32)
func OrInt64(addr *int64, mask int64) (old int64)
func OrUint64(addr *uint64, mask uint64) (old uint64)
func OrUintptr(addr *uintptr, mask uintptr) (old uintptr)func AndInt32(addr *int32, mask int32) (old int32)
func AndUint32(addr *uint32, mask uint32) (old uint32)
func AndInt64(addr *int64, mask int64) (old int64)
func AndUint64(addr *uint64, mask uint64) (old uint64)
func AndUintptr(addr *uintptr, mask uintptr) (old uintptr)
三.atomic.Value
网上对这个有详细的介绍底层,感兴趣的同学可以自行上网查阅
上述的操作你会发现,他只针对了一些一部分类型,其他数据结构依旧要使用到锁,如果把这些互斥锁换成用atomic.LoadPointer/StorePointer
来做并发控制,那性能将能提升。
针对这个问题,就有人提出,在已有的atomic包的基础之上,封装出一个atomic.Value类型,这样用户就可以在不依赖Go内部类型的情况下使用原子操作了。
在他下面也提供了上述的Add,Swap,CompareAndSwap,Load和Store
type Value struct {v any
}
package mainimport ("fmt""sync/atomic""time"
)// 定义配置结构体
type Config struct {DatabaseURL stringMaxConnects intTimeout time.Duration
}func main() {// 初始化 atomic.Value,存储初始配置var config atomic.Valueconfig.Store(Config{DatabaseURL: "localhost:3306",MaxConnects: 10,Timeout: 5 * time.Second,})// 启动一个 goroutine 定时更新配置go func() {updateCount := 0for {time.Sleep(2 * time.Second) // 每2秒更新一次配置newConfig := Config{DatabaseURL: fmt.Sprintf("new_host_%d:3306", updateCount),MaxConnects: 10 + updateCount,Timeout: 5*time.Second + time.Duration(updateCount)*time.Second,}config.Store(newConfig) // 原子存储新配置updateCount++}}()// 启动多个读取协程,并发读取配置for i := 0; i < 3; i++ {go func(id int) {for {time.Sleep(500 * time.Millisecond)// 原子读取配置currentConfig := config.Load().(Config) // 类型断言fmt.Printf("Goroutine %d: Config={URL: %s, MaxConnects: %d, Timeout: %s}\n",id, currentConfig.DatabaseURL, currentConfig.MaxConnects, currentConfig.Timeout)}}(i)}// 主协程等待time.Sleep(10 * time.Second)
}
这里是以一个结构体为例,如果是字符串也是可以的
需要注意一点:
Load()
返回interface{}
,需强制转换为实际类型
换句话说就是写用 Store
,读用 Load
,类型要一致