在 Go 语言并发编程中,合理的并发模式能显著提升程序的可维护性和性能。本文将深入解析三种典型的并发模式实现,通过具体案例展示如何优雅地管理任务生命周期、资源池和工作 goroutine 池。
一、runner 模式:任务生命周期管理
在定时任务、批处理等场景中,我们需要对任务执行时间进行控制,并在收到中断信号时安全终止任务。runner 模式通过通道和超时机制实现了这一需求。
1. 核心实现原理
runner 模式的核心在于通过三个通道协同管理任务状态:
interrupt
通道接收操作系统中断信号complete
通道报告任务完成状态timeout
通道控制任务执行超时
下面是 runner 包的核心实现:
// Runner 管理任务执行生命周期
type Runner struct {
interrupt chan os.Signal // 接收中断信号
complete chan error // 任务完成通知
timeout <-chan time.Time // 超时控制
tasks []func(int) // 任务列表
closed bool // 运行状态
}
// New 创建新的Runner实例
func New(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
}
}
// Add 添加任务到Runner
func (r *Runner) Add(tasks ...func(int)) {
r.tasks = append(r.tasks, tasks...)
}
// Start 启动任务执行并监视状态
func (r *Runner) Start() error {
// 注册中断信号处理
signal.Notify(r.interrupt, os.Interrupt)
// 启动任务执行goroutine
go func() {
r.complete <- r.run()
}()
// 等待任务完成或超时
select {
case err := <-r.complete:
return err
case <-r.timeout:
return errors.New("任务执行超时")
}
}
// run 按顺序执行注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检查是否收到中断信号
if r.gotInterrupt() {
return errors.New("收到中断信号")
}
// 执行任务
task(id)
}
return nil
}
// gotInterrupt 检测中断信号
func (r *Runner) gotInterrupt() bool {
select {
case <-r.interrupt:
signal.Stop(r.interrupt)
return true
default:
return false
}
}
2. 应用场景示例
以下是使用 runner 模式实现定时任务的案例,任务将在 3 秒内执行,超时或收到中断时终止:
func main() {
log.Println("开始执行任务...")
// 创建3秒超时的Runner
r := runner.New(3 * time.Second)
// 添加三个任务
r.Add(
func(id int) {
log.Printf("任务 %d 执行中...", id)
time.Sleep(1 * time.Second)
},
func(id int) {
log.Printf("任务 %d 执行中...", id)
time.Sleep(2 * time.Second)
},
func(id int) {
log.Printf("任务 %d 执行中...", id)
time.Sleep(3 * time.Second)
},
)
// 执行任务并处理结果
if err := r.Start(); err != nil {
switch err {
case errors.New("任务执行超时"):
log.Println("任务超时,终止执行")
case errors.New("收到中断信号"):
log.Println("收到中断,终止执行")
}
}
log.Println("任务处理完成")
}
3. 关键特性解析
- 超时控制:通过
time.After
设置任务整体执行超时时间 - 中断处理:利用
signal.Notify
捕获系统中断信号 - 任务顺序执行:按添加顺序依次执行任务,适合有依赖关系的场景
- 优雅退出:无论超时还是中断,都能确保资源释放
二、pool 模式:资源池管理
在数据库连接、文件句柄等资源管理场景中,资源池模式能有效复用资源,避免频繁创建和销毁带来的性能损耗。
1. 资源池核心设计
pool 模式通过有缓冲通道实现资源的获取与释放,确保资源复用:
// Pool 管理可复用资源池
type Pool struct {
m sync.Mutex // 互斥锁保护资源池
resources chan io.Closer // 资源通道
factory func() (io.Closer, error) // 资源创建工厂
closed bool // 资源池状态
}
// New 创建新的资源池
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("资源池大小不能小于1")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire 从资源池获取资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 有空闲资源时直接获取
case r, ok := <-p.resources:
if !ok {
return nil, errors.New("资源池已关闭")
}
return r, nil
// 无空闲资源时创建新资源
default:
return p.factory()
}
}
// Release 释放资源回池
func (p *Pool) Release(r io.Closer) {
p.m.Lock()
defer p.m.Unlock()
// 池已关闭时直接关闭资源
if p.closed {
r.Close()
return
}
// 尝试将资源放回池,满时关闭资源
select {
case p.resources <- r:
log.Println("资源放回池")
default:
log.Println("资源池已满,关闭资源")
r.Close()
}
}
// Close 关闭资源池并释放所有资源
func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
// 关闭通道并释放资源
close(p.resources)
for r := range p.resources {
r.Close()
}
}
2. 数据库连接池应用案例
以下是使用 pool 模式管理数据库连接的示例,模拟创建和复用数据库连接:
// dbConnection 模拟数据库连接
type dbConnection struct {
ID int32
}
// Close 实现io.Closer接口
func (db *dbConnection) Close() error {
log.Printf("关闭连接 %d\n", db.ID)
return nil
}
var idCounter int32
// createConnection 连接创建工厂
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Printf("创建新连接 %d\n", id)
return &dbConnection{ID: id}, nil
}
func main() {
// 创建包含2个连接的资源池
p, err := pool.New(createConnection, 2)
if err != nil {
log.Fatal(err)
}
defer p.Close()
var wg sync.WaitGroup
wg.Add(5) // 5个任务竞争2个连接
// 模拟5个任务获取连接
for i := 0; i < 5; i++ {
go func(taskID int) {
defer wg.Done()
// 获取连接
conn, err := p.Acquire()
if err != nil {
log.Fatal(err)
}
defer p.Release(conn)
// 模拟数据库操作
log.Printf("任务 %d 使用连接 %d\n", taskID, conn.(*dbConnection).ID)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
}(i)
}
wg.Wait()
log.Println("所有任务完成")
}
3. 资源池设计要点
- 接口抽象:通过
io.Closer
接口实现资源统一管理 - 动态扩容:无空闲资源时自动创建新资源
- 安全释放:通过互斥锁保证并发安全
- 优雅关闭:关闭时释放所有资源,避免泄漏
三、work 模式:goroutine 池实现
在需要控制并发量的场景中,work 模式通过固定数量的 goroutine 池处理任务,避免创建过多 goroutine 导致资源耗尽。
1. 工作池核心实现
work 模式通过无缓冲通道实现任务与工作 goroutine 的同步:
// Worker 定义工作接口
type Worker interface {
Task()
}
// Pool 工作goroutine池
type Pool struct {
work chan Worker // 任务通道
wg sync.WaitGroup // 等待组
}
// New 创建新的工作池
func New(maxGoroutines int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutines)
for i := 0; i < maxGoroutines; i++ {
go func() {
// 从通道获取任务并执行
for w := range p.work {
w.Task()
}
p.wg.Done()
}()
}
return &p
}
// Run 提交任务到工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
// Shutdown 关闭工作池
func (p *Pool) Shutdown() {
close(p.work)
p.wg.Wait()
}
2. 任务处理应用案例
以下是使用 work 模式处理批量任务的示例,限制同时运行 3 个 goroutine:
// task 实现Worker接口
type task struct {
id int
}
func (t task) Task() {
log.Printf("任务 %d 开始处理\n", t.id)
// 模拟任务处理时间
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
log.Printf("任务 %d 处理完成\n", t.id)
}
func main() {
// 创建包含3个工作goroutine的池
p := work.New(3)
defer p.Shutdown()
var wg sync.WaitGroup
wg.Add(10) // 10个任务
// 提交10个任务
for i := 0; i < 10; i++ {
go func(id int) {
defer wg.Done()
p.Run(task{id: id})
}(i)
}
wg.Wait()
log.Println("所有任务处理完毕")
}
3. 工作池特性分析
- 固定并发量:通过控制 goroutine 数量避免系统负载过高
- 任务同步:无缓冲通道保证任务与工作 goroutine 一一对应
- 简洁易用:通过接口抽象任务逻辑,解耦业务与并发控制
- 优雅退出:Shutdown 方法确保所有任务完成后退出
四、三种模式的应用场景对比
模式 | 核心特性 | 适用场景 | 典型案例 |
---|---|---|---|
runner | 任务超时控制与中断处理 | 定时任务、批处理作业 | 数据备份、定时报表生成 |
pool | 资源复用与管理 | 数据库连接、文件句柄等资源管理 | 高并发 Web 服务连接池 |
work | 固定并发量任务处理 | 批量任务处理、限制并发请求 | 图片处理、日志分析 |
五、并发模式最佳实践
根据场景选择模式:
- 需要超时控制时优先使用 runner 模式
- 资源复用场景选择 pool 模式
- 限制并发量场景使用 work 模式
接口抽象原则:
通过接口解耦业务逻辑与并发控制,如 runner 的任务函数、pool 的资源接口、work 的 Task 方法资源释放策略:
所有模式都应实现优雅关闭机制,确保资源正确释放,避免泄漏监控与调优:
在生产环境中添加监控指标,根据负载调整参数,如 pool 的大小、work 的 goroutine 数量
Go 语言的并发模式通过简洁的设计解决了复杂的并发控制问题,合理应用这些模式能让代码更清晰、更健壮,同时提升系统的性能和稳定性。在实际开发中,可根据具体需求组合或扩展这些模式,打造更适合业务场景的并发解决方案。