Skip to content
/ Blog Public
  • Notifications
  • Fork 0
  • Star 0
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Sign up for GitHub

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jump to bottom

golang协程池设计(二) #2

Open
Mike-Tyson opened this issue Mar 23, 2022 · 0 comments
Open

golang协程池设计(二) #2

Mike-Tyson opened this issue Mar 23, 2022 · 0 comments

Comments

@Mike-Tyson
Copy link
Owner

Mike-Tyson commented Mar 23, 2022

  上一篇文章介绍了封装一个简单的协程池,实现比较简单,缺点是对协程池状态、任务执行等缺乏管理(仅仅把异常任务数统计了下),本章将难度提高一些,参考go-playground/pool ,封装一个性能更高、特性更丰富些的协程池
  引用源码:https://github.com/go-playground/pool
  文章内容:参考go-playground-pool代码(总共不到1千行代码),只对协程池的关键特性做了详细说明,代码及变量等名尽量延用源码的命名,减轻阅读压力

Do not communicate by sharing memory; instead, share memory by communicating.

一、设计思路:

使用协程池pool管理工作协程worker,batch用来管理一组批量的工作单元、workunit作为具体的工作单元 对应要执行的具体任务(文中简写为wu)

结构图示:
image

二、抽象模型

pool:抽象为一个工厂,负责生产任务,worker为该工厂的车间 负责具体的生产任务
workunit:抽象为加工件,以下也简称为wu
batch:视为工厂pool的管理注册中心,负责所有加工件的管理工作,目前与pool之间的关系是一对一
加工件先在batch注册(queue),同时交给车间(worker)处理,batch此时会开一个goroutine 阻塞方式等待加工件全部处理完,然后记录到结果集中(Results)
整个加工过程,一定要以调用queueComplete方法作为结束标记,之后batch和pool不再接受加工任务

抽象出工厂的工作流如下:
image

三、开始封装:

首先附上时序图(可以先浏览一下,看代码细节时回来参考):
image-20220324060436053

pool、batch、wu的结构体:

type limitedPool struct {
	workers uint			//工作协程数
	work    chan *workUnit	//任务队列
	cancel  chan struct{}	//取消信号
	closed  bool			//是否关闭
	m       sync.RWMutex	//操作锁
}
type batch struct {
	pool    Pool			//协程池
	m       sync.Mutex		//操作锁
	units   []WorkUnit		//管理工作单元
	results chan WorkUnit	//结果集
	done    chan struct{}	//结束标识,跨协程阻塞通信模式
	closed  bool			//关闭标识
	wg      *sync.WaitGroup	//工作单元执行的信号量
}
type workUnit struct {
	value      interface{}		//结果
	err        error			        //错误信息
	done       chan struct{}	        //结束标识,跨协程阻塞通信模式
	fn         WorkFunc			//执行的方法
	cancelled  atomic.Value		//已取消
	cancelling atomic.Value		//取消中,
	writing    atomic.Value		//写入状态(在逻辑执行时,才会写入,且当前wu不能再次被cancel)
}

(关于wu的三个状态,后边会详细介绍设计原理)

1 初始化协程池

func NewLimited(workers uint) Pool {

	if workers == 0 {
		panic("invalid workers '0'")
	}

	p := &limitedPool{
		workers: workers,
		work: make(chan *workUnit, workers * 2),
		cancel: make(chan struct{}),
		closed: false,
	}

	//开启协程监听任务
	for i := 0; i < int(p.workers); i++ {
		p.newWorker(p.work, p.cancel)
	}

	return p
}

2,启动协程,并启动对任务的监听

for {
	select {
		case wu = <-work:
			if wu == nil {
				continue
			}

			if wu.cancelled.Load() == nil {
				value, err = wu.fn(wu)
				wu.writing.Store(struct{}{})
				//这一步相当于在writing和cancelled并发时,能把writing给拦截下来(虽然成功了),实际相当于只执行了cancelled
				if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
					wu.value, wu.err = value, err

					//关闭wu
					close(wu.done)
				}
			}

		case <-cancel:
			return
		}
	}

3,创建批量任务后,并添加任务

func (b *batch) Queue(fn WorkFunc) {
	b.m.Lock()
	if b.closed {
		b.m.Unlock()
		return
	}

	wu := b.pool.Queue(fn)	//任务进pool的任务队列,进行处理

	b.units = append(b.units, wu)
	b.wg.Add(1)
	b.m.Unlock()

	go func(b *batch, wu WorkUnit) {
		wu.Wait()	//阻塞等待wu执行完成
		b.results <- wu	//存入结果集
		b.wg.Done()
	}(b, wu)
}

入pool的任务队列:

go func() {		
	p.m.RLock()
	if p.closed {
		w.err = &ErrPoolClosed{s: errClosed}
		if w.cancelled.Load() == nil {
			close(w.done)
		}
		p.m.RUnlock()
		return
	}

	p.work <- w
	p.m.RUnlock()
}()

关于wu的三个状态设计:已取消、取消中、写入中:
我的理解是这样的:设计上会最大程度的追求wu的原子操作,要么执行成功,要么取消成功
执行wu操作:

if wu.cancelled.Load() == nil {
	value, err = wu.fn(wu)
	wu.writing.Store(struct{}{})

	//这一步相当于在writing和cancelled并发时,能把writing给拦截下来(虽然成功了),实际相当于只执行了cancelled
	if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
		wu.value, wu.err = value, err
						
		close(wu.done)
		}
}
取消wu代码:
func (wu *workUnit) cancelWithError(err error) {
	wu.cancelling.Store(struct{}{})
	if wu.writing.Load() == nil && wu.cancelled.Load() == nil {
		wu.cancelled.Store(struct{}{})
		wu.err = err
		close(wu.done)
	}
}

首先有个疑问:既然标识任务状态,直接设定为执行和取消不就可以了?
image

可以看到,writing和cancelled两个操作并发时,后边的赋值、关闭wu操作可能都会发生,这样就不能确定wu是真正被执行了,还是被取消了(而且close同一个通道会导致程序出错)

加上中间态cancelling后:
image
一旦发生并发,会优先取消掉wu(即使fn方法已经执行完毕)

4,任务结束QueueComplete

func (b *batch) QueueComplete() {
	b.m.Lock()
	b.closed = true	//此后Queue方法不能再添加任务
	close(b.done)	//关闭batch,确保Results方法阻塞执行
	b.m.Unlock()
}

(添加任务结束后,一定要调用这个方法!)

5,获取结果集

func (b *batch) Results() <-chan WorkUnit {
	go func(b *batch) {
		<-b.done        //线程阻塞 等待b.done被close
		b.m.Lock()
		b.wg.Wait()
		b.m.Unlock()
		close(b.results)
	}(b)

	return b.results
}

​ 从b.done,还有wu.done ,都可以看到有非阻塞通道实现的阻塞通信方式:
image

6,关于pool、batch的取消
取消pool、batch时,会关闭当前pool或batch,然后按照对每一个wu执行cancel操作,参见源码

@Mike-Tyson Mike-Tyson changed the title 动手封装golang协程池(二) golang协程池设计(二) Mar 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant
@Mike-Tyson

Footer

© 2024 GitHub, Inc.

两个鬼故事夸女孩子漂亮的话合租情人取起名软件排行联名起诉有多严重电子书阅读软件焱字怎么起名茶叶公司起名起名缺金缺火男孩起名天才高手gamcore.com瓷都免费起名测试打分销售政策古天战帝黑暗圣经动画全国起名大师排名李姓四个字的男孩起名热火vs快船王牌法神在中央大礼堂举行的会议是哪个起名子的意思圣女的欲望2021年男孩起名公司查起名吉凶测试打分www.fefe66.com炸串店起名超炫qq名明子字辈起名起名字女孩高冷魏姓起姓名大全男孩起名多少笔画好少年生前被连续抽血16次?多部门介入两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”淀粉肠小王子日销售额涨超10倍高中生被打伤下体休学 邯郸通报单亲妈妈陷入热恋 14岁儿子报警何赛飞追着代拍打雅江山火三名扑火人员牺牲系谣言张家界的山上“长”满了韩国人?男孩8年未见母亲被告知被遗忘中国拥有亿元资产的家庭达13.3万户19岁小伙救下5人后溺亡 多方发声315晚会后胖东来又人满为患了张立群任西安交通大学校长“重生之我在北大当嫡校长”男子被猫抓伤后确诊“猫抓病”测试车高速逃费 小米:已补缴周杰伦一审败诉网易网友洛杉矶偶遇贾玲今日春分倪萍分享减重40斤方法七年后宇文玥被薅头发捞上岸许家印被限制高消费萧美琴窜访捷克 外交部回应联合利华开始重组专访95后高颜值猪保姆胖东来员工每周单休无小长假男子被流浪猫绊倒 投喂者赔24万小米汽车超级工厂正式揭幕黑马情侣提车了西双版纳热带植物园回应蜉蝣大爆发当地回应沈阳致3死车祸车主疑毒驾恒大被罚41.75亿到底怎么缴妈妈回应孩子在校撞护栏坠楼外国人感慨凌晨的中国很安全杨倩无缘巴黎奥运校方回应护栏损坏小学生课间坠楼房客欠租失踪 房东直发愁专家建议不必谈骨泥色变王树国卸任西安交大校长 师生送别手机成瘾是影响睡眠质量重要因素国产伟哥去年销售近13亿阿根廷将发行1万与2万面值的纸币兔狲“狲大娘”因病死亡遭遇山火的松茸之乡“开封王婆”爆火:促成四五十对奥巴马现身唐宁街 黑色着装引猜测考生莫言也上北大硕士复试名单了德国打算提及普京时仅用姓名天水麻辣烫把捣辣椒大爷累坏了

两个鬼故事 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化