Go并发同步的实现策略与技巧 - 详解与实例

silverwq
2023-12-31 / 0 评论 / 95 阅读 / 正在检测是否收录...

业务模型介绍。

  1. 上游:M 个协程接收用户注册,把用户添加到集合 users 中。
  2. 下游:N个协程监听users,当发现它里面攒够100个用户时,给前3(常量H)个注册的用户发放积分,然后协程终止。(这部分逻辑上游不关心,相关代码不能放到上游协程里实现)

实现代码

package main

import (
    "math/rand"
    "sort"
    "sync"
    "sync/atomic"
    "time"
)

type User struct {
    RegTime time.Time //注册时间
    Score   int       //积分
}

var (
    users        []*User    //用户集合
    mu           sync.Mutex //读写users之前先加锁,保证它的并发安全性
    listenNumber int32      //监听全局变量users的次数
    prized       bool       //是否已经执行过积分奖励
)

const (
    H = 3 // 达到100个用户后,给前3个用户积分增加1
    M = 8 // 8个协程并发注册
    N = 5 // 5个协程并发检查
)

func InitGlobalVar() {
    users = make([]*User, 0, 500)
    mu = sync.Mutex{}
    listenNumber = 0
    prized = false
}

// 给前H个用户积分奖励
func prize() {
    //按注册时间排序
    sort.Slice(users, func(i, j int) bool {
        return users[i].RegTime.Before(users[j].RegTime)
    })
    //把前H个用户的Score加1
    for _, user := range users[:H] {
        user.Score += 1
    }
}

// 业务模型介绍。
// 上游:M个协程接收用户注册,把用户添加到集合users中。
// 下游:N个协程监听users,当发现它里面攒够100个用户时,给前3(常量H)个注册的用户发放积分,然后协程终止。(这部分逻辑上游不关心,相关代码不能放到上游协程里实现)

/*
方式一:通过for轮询的方式,会消耗很多cpu性能
*/
func BusinessModel() {
    InitGlobalVar()
    downstreamOver := false
    //上游
    for i := 0; i < M; i++ {
        go func() {
            for { //不停地注册新用户
                if downstreamOver {
                    break
                }
                mu.Lock()
                users = append(users, &User{RegTime: time.Now()}) //注册用户
                mu.Unlock()
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) //随机休息一段时间,再注册下一个用户
            }
        }()
    }
    //下游
    wg := sync.WaitGroup{}
    wg.Add(N)
    for i := 0; i < N; i++ {
        go func() {
            defer wg.Done()
            for {
                mu.Lock()
                if !prized {
                    atomic.AddInt32(&listenNumber, 1)
                    if len(users) >= 100 {
                        prize() // 达到100,发放奖励
                        prized = true
                    }
                }
                mu.Unlock()
                if prized {
                    break
                }
            }
        }()
    }
    wg.Wait()
    downstreamOver = true
}

// 减少对全局变量users的监听次数。上游每次改变users时向一个channel里发送一条数据
/*
方式二:通过channel的方式
*/
func SignalWithChannel() {
    InitGlobalVar()
    ch := make(chan struct{}, 10*N)
    downstreamOver := false
    //上游
    for i := 0; i < M; i++ {
        go func() {
            for { //不停地注册新用户
                if downstreamOver {
                    break
                }
                mu.Lock()
                users = append(users, &User{RegTime: time.Now()}) //注册用户
                mu.Unlock()
                ch <- struct{}{}                                             // 每次新增一个user的时候,向channel里添加一个用户
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) //随机休息一段时间,再注册下一个用户
            }
        }()
    }
    //下游
    wg := sync.WaitGroup{}
    wg.Add(N)
    for i := 0; i < N; i++ {
        go func() {
            defer wg.Done()
            for {
                <-ch //阻塞,直到users有改变
                mu.Lock()
                if !prized {
                    atomic.AddInt32(&listenNumber, 1)
                    if len(users) >= 100 {
                        prize()
                        prized = true
                    }
                }
                mu.Unlock()
                if prized {
                    break
                }
            }
        }()
    }
    wg.Wait()
    downstreamOver = true
}

/*
方式三:通过sync.Cond的方式
*/
func SignalWithCond() {
    InitGlobalVar()
    cond := sync.NewCond(&mu) //cond.L等价于mu
    downstreamOver := false
    //上游
    for i := 0; i < M; i++ {
        go func() {
            for { //不停地注册新用户
                if downstreamOver {
                    break
                }
                mu.Lock()
                users = append(users, &User{RegTime: time.Now()}) //注册用户
                mu.Unlock()
                //通知别人users有变化。Signal只能通知到一个协程
                cond.Signal()
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) //随机休息一段时间,再注册下一个用户
            }
        }()
    }
    //下游
    wg := sync.WaitGroup{}
    wg.Add(N)
    for i := 0; i < N; i++ {
        go func() {
            defer wg.Done()
            for {
                mu.Lock()   //等价于cond.L.Lock()
                cond.Wait() //阻塞,直到接收到通知。Wait内部会先执行mu.Unlock(),等接收到信号后再执行mu.Lock(),所以在调Wait()之前需要先上锁
                if !prized {
                    atomic.AddInt32(&listenNumber, 1)
                    if len(users) >= 100 {
                        prize()
                        prized = true
                    }
                }
                mu.Unlock() //等价于cond.L.Unlock()
                if prized {
                    break
                }
            }
        }()
    }
    wg.Wait()
    downstreamOver = true
}

/*
channel 广播的方式,一下子通过所有子协程
*/
func BroadcastWithChannel() {
    InitGlobalVar()
    ch := make(chan struct{}, 10*N)
    downstreamOver := false
    //上游
    for i := 0; i < M; i++ {
        go func() {
            for { //不停地注册新用户
                if downstreamOver {
                    break
                }
                mu.Lock()
                users = append(users, &User{RegTime: time.Now()}) //注册用户
                mu.Unlock()
                //把n个下游协程全部通知一遍。
                //close channel也能实现通知的功能,但是一个channl只能close一次,本业务中我们需要多次通知。实际中上游一般不知道下游协程的数目,这种情况下只能用cond.Broadcast()
                for j := 0; j < N; j++ {
                    ch <- struct{}{}
                }
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) //随机休息一段时间,再注册下一个用户
            }
        }()
    }
    //下游
    wg := sync.WaitGroup{}
    wg.Add(N)
    for i := 0; i < N; i++ {
        go func() {
            defer wg.Done()
            for {
                <-ch //阻塞,直到users有改变
                atomic.AddInt32(&listenNumber, 1)
                mu.Lock()
                done := false
                if len(users) >= 100 {
                    prize()
                    done = true
                }
                mu.Unlock()
                if done {
                    break
                }
            }
        }()
    }
    wg.Wait()
    downstreamOver = true
}

/*
*
cond.Broadcast广播的方式
*/
func BroadcastWithCond() {
    InitGlobalVar()
    cond := sync.NewCond(&mu) //cond.L等价于mu
    downstreamOver := false
    //上游
    for i := 0; i < M; i++ {
        go func() {
            for { //不停地注册新用户
                if downstreamOver {
                    break
                }
                mu.Lock()
                users = append(users, &User{RegTime: time.Now()}) //注册用户
                mu.Unlock()
                // 通知所有下游协程
                cond.Broadcast()
                time.Sleep(time.Millisecond * time.Duration(rand.Intn(100))) //随机休息一段时间,再注册下一个用户
            }
        }()
    }
    //下游
    wg := sync.WaitGroup{}
    wg.Add(N)
    for i := 0; i < N; i++ {
        go func() {
            defer wg.Done()
            for {
                mu.Lock()
                cond.Wait()
                atomic.AddInt32(&listenNumber, 1)
                done := false
                if len(users) >= 100 {
                    prize()
                    done = true
                }
                mu.Unlock()
                if done {
                    break
                }
            }
        }()
    }
    wg.Wait()
    downstreamOver = true
}
0

评论 (0)

取消