Go进阶高并发处理教程
目录
- Go并发编程基础
- Goroutine深入理解
- 同步原语详解
- 并发模式与最佳实践
- 性能优化技巧
- 实战案例
Go并发编程基础
什么是并发?
并发是指程序能够同时处理多个任务的能力。Go语言从设计之初就将并发作为核心特性,提供了简洁而强大的并发编程模型。
Go并发模型的优势
- 轻量级协程:Goroutine比传统线程更轻量
- CSP模型:通过通信来共享内存,而不是通过共享内存来通信
- 内置调度器:Go运行时自动管理goroutine的调度
Goroutine深入理解
创建和启动Goroutine
package mainimport ("fmt""time"
)func worker(id int) {fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {// 启动多个goroutinefor i := 1; i <= 5; i++ {go worker(i)}// 等待所有goroutine完成time.Sleep(2 * time.Second)fmt.Println("All workers completed")
}
Goroutine的生命周期
- 创建:使用
go
关键字创建 - 调度:由Go调度器管理
- 执行:在可用的OS线程上执行
- 结束:函数返回时自动结束
调度器工作原理
Go使用M:N调度模型:
- M:OS线程(Machine)
- P:处理器(Processor)
- G:Goroutine
G1 G2 G3 G4\ | | /\ | | /\ | |/\| |P1 P2| |M1 M2
同步原语详解
sync.WaitGroup
用于等待一组goroutine完成:
package mainimport ("fmt""sync""time"
)func worker(id int, wg *sync.WaitGroup) {defer wg.Done() // 完成时调用Done()fmt.Printf("Worker %d starting\n", id)time.Sleep(time.Second)fmt.Printf("Worker %d done\n", id)
}func main() {var wg sync.WaitGroupfor i := 1; i <= 5; i++ {wg.Add(1) // 增加等待计数go worker(i, &wg)}wg.Wait() // 等待所有goroutine完成fmt.Println("All workers completed")
}
sync.Mutex
互斥锁用于保护共享资源:
package mainimport ("fmt""sync"
)type Counter struct {mu sync.Mutexvalue int
}func (c *Counter) Increment() {c.mu.Lock()defer c.mu.Unlock()c.value++
}func (c *Counter) Value() int {c.mu.Lock()defer c.mu.Unlock()return c.value
}func main() {counter := &Counter{}var wg sync.WaitGroup// 启动100个goroutine同时增加计数器for i := 0; i < 100; i++ {wg.Add(1)go func() {defer wg.Done()for j := 0; j < 1000; j++ {counter.Increment()}}()}wg.Wait()fmt.Printf("Final counter value: %d\n", counter.Value())
}
sync.RWMutex
读写锁允许多个读操作同时进行:
type SafeMap struct {mu sync.RWMutexdata map[string]int
}func (sm *SafeMap) Get(key string) (int, bool) {sm.mu.RLock()defer sm.mu.RUnlock()val, ok := sm.data[key]return val, ok
}func (sm *SafeMap) Set(key string, value int) {sm.mu.Lock()defer sm.mu.Unlock()sm.data[key] = value
}
sync.Once
确保某个操作只执行一次:
package mainimport ("fmt""sync"
)var once sync.Once
var instance *Singletontype Singleton struct {data string
}func GetInstance() *Singleton {once.Do(func() {fmt.Println("Creating singleton instance")instance = &Singleton{data: "singleton"}})return instance
}func main() {var wg sync.WaitGroupfor i := 0; i < 10; i++ {wg.Add(1)go func(id int) {defer wg.Done()s := GetInstance()fmt.Printf("Goroutine %d got instance: %s\n", id, s.data)}(i)}wg.Wait()
}
并发模式与最佳实践
Worker Pool模式
package mainimport ("fmt""sync""time"
)type Job struct {ID intData string
}type Result struct {Job JobOutput string
}func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()for job := range jobs {fmt.Printf("Worker %d processing job %d\n", id, job.ID)time.Sleep(time.Millisecond * 100) // 模拟工作result := Result{Job: job,Output: fmt.Sprintf("Processed by worker %d", id),}results <- result}
}func main() {const numWorkers = 3const numJobs = 10jobs := make(chan Job, numJobs)results := make(chan Result, numJobs)var wg sync.WaitGroup// 启动workerfor i := 1; i <= numWorkers; i++ {wg.Add(1)go worker(i, jobs, results, &wg)}// 发送任务for i := 1; i <= numJobs; i++ {jobs <- Job{ID: i, Data: fmt.Sprintf("data-%d", i)}}close(jobs)// 等待所有worker完成go func() {wg.Wait()close(results)}()// 收集结果for result := range results {fmt.Printf("Job %d result: %s\n", result.Job.ID, result.Output)}
}
扇入扇出模式
// 扇出:将工作分发给多个goroutine
func fanOut(input <-chan int, workers int) []<-chan int {outputs := make([]<-chan int, workers)for i := 0; i < workers; i++ {output := make(chan int)outputs[i] = outputgo func(out chan<- int) {defer close(out)for n := range input {out <- n * n // 计算平方}}(output)}return outputs
}// 扇入:将多个channel的结果合并
func fanIn(inputs ...<-chan int) <-chan int {output := make(chan int)var wg sync.WaitGroupfor _, input := range inputs {wg.Add(1)go func(in <-chan int) {defer wg.Done()for n := range in {output <- n}}(input)}go func() {wg.Wait()close(output)}()return output
}
性能优化技巧
1. 合理设置GOMAXPROCS
import "runtime"func init() {// 设置使用的CPU核心数runtime.GOMAXPROCS(runtime.NumCPU())
}
2. 避免goroutine泄漏
// 错误示例:可能导致goroutine泄漏
func badExample() {ch := make(chan int)go func() {ch <- 1 // 如果没有接收者,这个goroutine会永远阻塞}()// 函数返回,但goroutine仍在运行
}// 正确示例:使用context控制goroutine生命周期
func goodExample(ctx context.Context) {ch := make(chan int, 1) // 使用缓冲channelgo func() {select {case ch <- 1:case <-ctx.Done():return}}()
}
3. 使用对象池减少GC压力
import "sync"var pool = sync.Pool{New: func() interface{} {return make([]byte, 1024)},
}func processData(data []byte) {buf := pool.Get().([]byte)defer pool.Put(buf)// 使用buf处理数据
}
实战案例
并发HTTP客户端
package mainimport ("fmt""net/http""sync""time"
)type Result struct {URL stringStatusCode intDuration time.DurationError error
}func fetchURL(url string, results chan<- Result, wg *sync.WaitGroup) {defer wg.Done()start := time.Now()resp, err := http.Get(url)duration := time.Since(start)result := Result{URL: url,Duration: duration,Error: err,}if err == nil {result.StatusCode = resp.StatusCoderesp.Body.Close()}results <- result
}func main() {urls := []string{"https://www.google.com","https://www.github.com","https://www.stackoverflow.com","https://www.golang.org",}results := make(chan Result, len(urls))var wg sync.WaitGroup// 并发请求所有URLfor _, url := range urls {wg.Add(1)go fetchURL(url, results, &wg)}// 等待所有请求完成go func() {wg.Wait()close(results)}()// 处理结果for result := range results {if result.Error != nil {fmt.Printf("Error fetching %s: %v\n", result.URL, result.Error)} else {fmt.Printf("%s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)}}
}
总结
Go语言的并发编程提供了强大而简洁的工具:
- Goroutine:轻量级协程,易于创建和管理
- Channel:类型安全的通信机制
- sync包:提供各种同步原语
- 并发模式:Worker Pool、扇入扇出等经典模式
掌握这些概念和技巧,能够帮助您构建高性能、可扩展的并发应用程序。记住Go的并发哲学:通过通信来共享内存,而不是通过共享内存来通信。
参考资源
- Go官方文档 - 并发
- Go并发模式
- Go内存模型