大家好, 今天跟大家聊聊在我们项目中的优先级队列的实现。
队列,是数据结构中实现先进先出策略的一种数据结构。而优先队列则是带有优先级的队列,即先按优先级分类,然后相同优先级的再 进行排队。优先级高的队列中的元素会优先被消费。如下图所示:
在Go中,可以定义一个切片,切片的每个元素代表一种优先级队列,切片的索引顺序代表优先级顺序,后面代码实现部分我们会详细讲解。
先来看现实生活中的例子。银行的办事窗口,有普通窗口和vip窗口,vip窗口因为排队人数少,等待的时间就短,比普通窗口就会优先处理。同样,在登机口,就有贵宾通道和普通,同样贵宾通道优先登机。
在互联网中,当然就是请求和响应。使用优先级队列的作用是将请求按特定的属性划分出优先级,然后按优先级的高低进行优先处理。在研发服务的时候这里有个隐含的约束条件就是服务器资源(CPU、内存、带宽等)是有限的。如果服务器资源是无限的,那么也就不需要队列进行排队了,来一个请求就立即处理一个请求就好了。所以,为了在最大限度的利用服务器资源的前提下,将更重要的任务(优先级高的请求)优先处理,以更好的服务用户。
对于请求优先级的划分可以根据业务的特点根据价值高的优先原则来进行划分即可。例如可以根据是否是否是会员、是否是VIP会员等属性进行划分优先级。也可以根据是否是付费用户进行划分。在博客的业务中,也可以根据是否是大V的属性进行优先级划分。在互联网广告业务中,可以根据广告位资源价值高低来划分优先级。
在完整的优先级队列中有三个重要的角色,分别是优先级队列、工作单元Job、消费者worker。
根据队列个数和消费者个数,我们可以将队列-消费者模式分为单队列-单消费者模式、多队列(优先级队列)- 单消费者模式、多队列(优先级队列)- 多消费者模式。
我们先从最简单的单队列-单消费者模式实现,然后一步步演化成多队列(优先级队列)-多消费者模式。
我们先来看下队列的实现。这里我们用Golang中的List数据结果来实现,List数据结构是一个双向链表,包含了将元素放到链表尾部、将头部元素弹出的操作,符合队列先进先出的特性。
好,我们看下具体的队列的数据结构:
type JobQueue struct {
mu sync.Mutex //队列的操作需要并发安全
jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job
noticeChan chan struct{} //入队一个job就往该channel中放入一个消息,以供消费者消费
}
/**
* 队列的Push操作
*/
func (queue *JobQueue) PushJob(job Job) {
queue.jobList.PushBack(job) //将job加到队尾
queue.noticeChan <- struct{}{}
}
到这里有同学就会问了,为什么不直接将job推送到Channel中,然后让消费者依次消费不就行了么?是的,单队列这样是可以的,因为我们最终目标是为了实现优先级的多队列,所以这里即使是单队列,我们也使用List数据结构,以便后续的演变。
还有一点,大家注意到了,这里入队操作时有一个 这样的操作:
queue.noticeChan <- struct{}{}
消费者监听的实际上不是队列本身,而是通道noticeChan。当有一个元素入队时,就往noticeChan通道中输入一条消息,这里是一个空结构体,主要作用就是通知消费者worker,队列里有要处理的元素了,可以从队列中获取了。这个在后面演化成多队列以及多消费者模式时会很有用。
根据队列的先进先出原则,是要获取队列的最先进入的元素。Golang中List结构体的Front()函数是获取链表的第一个元素,然后通过Remove函数将该元素从链表中移出,即得到了队列中的第一个元素。这里的Job结构体先不用关心,我们后面实现工作单元Job时,会详细讲解。
/**
* 弹出队列的第一个元素
*/
func (queue *JobQueue) PopJob() Job {
queue.mu.Lock()
defer queue.mu.Unlock()
/**
* 说明在队列中没有元素了
*/
if queue.jobList.Len() == 0 {
return nil
}
elements := queue.jobList.Front() //获取队列的第一个元素
return queue.jobList.Remove(elements).(Job) //将元素从队列中移除并返回
}
上面我们提到,消费者监听的是noticeChan通道。当有元素入队时,会往noticeChan中输入一条消息,以便通知消费者进行消费。如果队列中没有要消费的元素,那么消费者就会阻塞在该通道上。
func (queue *JobQueue) WaitJob() <-chan struct{} {
return queue.noticeChan
}
一个工作单元就是一个要执行的任务。在系统中往往需要执行不同的任务,就是需要有不同类型的工作单元,但这些工作单元都有一组共同的执行流程。我们看下工作单元的类图。
我们看下类图中的几个角色:
接下来,我们以计算一个int类型数字的平方的SquareJob为例来看下具体的实现。
首先看下该结构体的定义
type BaseJob struct {
Err error
DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者
Ctx context.Context
cancelFunc context.CancelFunc
}
在该结构体中,我们主要关注DoneChan字段就行,该字段是当具体的Job的Execute执行完成后,来通知调用者的。
再来看Done函数,该函数就是在Execute函数完成后,要关闭DoneChan通道,以解除Job的阻塞而继续执行其他逻辑。
/**
* 作业执行完毕,关闭DoneChan,所有监听DoneChan的接收者都能收到关闭的信号
*/
func (job *BaseJob) Done() {
close(job.DoneChan)
}
再来看WaitDone函数,该函数是当Job执行后,要等待Job执行完成,在未完成之前,DoneChan里没有消息,通过该函数就能将job阻塞,直到Execute中调用了Done(),以便解除阻塞。
/**
* 等待job执行完成
*/
func (job *BaseJob) WaitDone() {
select {
case <-job.DoneChan:
return
}
}
type SquareJob struct {
*BaseJob
x int
}
从结构体的定义中可知,SquareJob嵌套了BaseJob,所以该结构体拥有BaseJob的所有字段和方法。在该结构体主要实现了Execute的逻辑:对x求平方。
func (s *SquareJob) Execute() error {
result := s.x * s.x
fmt.Println("the result is ", result)
return nil
}
Worker主要功能是通过监听队列里的noticeChan是否有需要处理的元素,如果有元素的话从队列里获取到要处理的元素job,然后执行job的Execute方法。
我们将该结构体定位为WorkerManager,因为在后面我们讲解多Worker模式时,会需要一个Worker的管理者,因此定义成了WorkerManager。
type WorkerManager struct {
queue *JobQueue
closeChan chan struct{}
}
StartWorker函数,只有一个for循环,不断的从队列中获取Job。获取到Job后,进行消费Job,即ConsumeJob。
func (m *WorkerManager) StartWork() error {
fmt.Println("Start to Work")
for {
select {
case <-m.closeChan:
return nil
case <-m.queue.noticeChan:
job := m.queue.PopJob()
m.ConsumeJob(job)
}
}
return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
defer func() {
job.Done()
}()
job.Execute()
}
到这里,单队列-单消费者模式中各角色的实现就讲解完了。我们通过main函数将其关联起来。
func main() {
//初始化一个队列
queue := &JobQueue{
jobList: list.New(),
noticeChan: make(chan struct{}, 10),
}
//初始化一个消费worker
workerManger := NewWorkerManager(queue)
// worker开始监听队列
go workerManger.StartWork()
// 构造SquareJob
job := &SquareJob{
BaseJob: &BaseJob{
DoneChan: make(chan struct{}, 1),
},
x: 5,
}
//压入队列尾部
queue.PushJob(job)
//等待job执行完成
job.WaitDone()
print("The End")
}
有了单队列-单消费者的基础,我们如何实现多队列-单消费者模式。也就是优先级队列。
优先级的队列,实质上就是根据工作单元Job的优先级属性,将其放到对应的优先级队列中,以便worker可以根据优先级进行消费。我们要在Job结构体中增加一个Priority属性。因为该属性是所有Job都共有的,因此定义在BaseJob上更合适.
type BaseJob struct {
Err error
DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者
Ctx context.Context
cancelFunc context.CancelFunc
priority int //工作单元的优先级
}
我们再来看看多队列如何实现。实际上就是用一个切片来存储各个队列,切片的每个元素存储一个JobQueue队列元素即可。
var queues = make([]*JobQueue, 10, 100)
那各优先级的队列在切片中是如何存储的呢?切片索引顺序只代表优先级的高于低,不代表具体是哪个优先级。
什么意思呢?假设我们现在对目前的工作单元定义了1、4、7三个优先级。这3个优先级在切片中是按优先级从小到到依次存储在queues切片中的,如下图:
那为什么不让切片的索引就代表优先级,让优先级为1的队列存储在索引1处,优先级4的队列存储在索引4处,优先级7的队列存储在索引7处呢?如果这样存储的话,就会变成如下这样:
由此可见,这样的存储会造成空间的浪费。所以,我们是将队列按优先级高低依次存放到了切片中。
那既然这样,当一个优先级的job来了之后,我该怎么知道该优先级的队列是存储在哪个索引中呢?我们用一个map来映射优先级和切片索引之间的关系。这样当一个工作单元Job入队的时候,以优先级为key,就可以查找到对应优先级的队列存储在切片的哪个位置了。如下图所示:
代码定义:
var priorityIdx map[int][int] //该map的key是优先级,value代表的是queues切片的索引
好了,我们重新定义一下队列的结构体:
type PriorityQueue struct {
mu sync.Mutex
noticeChan chan struct{}
queues []*JobQueue
priorityIdx map[int]int
}
//原来的JobQueue会变成如下这样:
type JobQueue struct {
priority int //代表该队列是哪种优先级的队列
jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job
}
这里我们注意到有以下几个变化:
好了,数据结构定义完了,我们看看将工作单元Job推入队列和从队列中弹出Job又有什么变化。
优先级队列的入队操作,就需要根据入队Job的优先级属性放到对应的优先级队列中,入队流程图如下:
当一个Job加入队列的时候,有两种场景,一种是该优先级的队列已经存在,则直接Push到队尾即可。一种是该优先级的队列还不存在,则需要先创建该优先级的队列,然后再将该工作单元Push到队尾。如下是两种场景。
队列已经存在的场景
这种场景会比较简单。假设我们要插入优先级为7的工作单元,首先从映射表中查找7是否存在,发现对应关系是2,则直接找到切片中索引2的元素,即优先级为7的队列,将job加入即可。如下图。
队列不存在的场景
这种场景稍微复杂些,在映射表中找不到要插入优先级的队列的话,则需要在切片中插入一个优先级队列,而为了优先级队列在切片中也保持有序(保持有序就可以知道队列的优先级的高低了),则需要移动相关的元素。我们以插入优先级为6的工作单元为例来讲解。
1、首先,我们的队列有一个初始化的状态,存储了优先级1、4、7的队列。如下图。
2、当插入优先级为6的工作单元时,发现在映射表中没有优先级6的映射关系,说明在切片中还没有优先级为6的队列的元素。所以需要在切片中依次查找到优先级6应该插入的位置在4和7之间,也就是需要存储在切片2的位置。
3、将原来索引2位置的优先级为7的队列往后移动到3,同时更新映射表中的对应关系。
4、将优先级为6的工作单元插入到索引2的队列中,同时更新映射表中的优先级和索引的关系。
我们看下代码实现:
func (priorityQueue *PriorityQueue) Push(job Job) {
priorityQueue.mu.Lock()
defer priorityQueue.mu.Unlock()
//先根据job的优先级找要入队的队列
var idx int
var ok bool
//从优先级-切片索引的map中查找该优先级的队列是否存在
if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok {
//如果不存在该优先级的队列,则需要初始化一个队列,并返回该队列在切片中的索引位置
idx = priorityQueue.addPriorityQueue(job.Priority)
}
//根据获取到的切片索引idx,找到具体的队列
queue := priority.queues[idx]
//将job推送到队列的队尾
queue.JobList.PushBack(job)
//队列job个数+1
priorityQueue.Size++
//如果队列job个数超过队列的最大容量,则从优先级最低的队列中移除工作单元
if priorityQueue.size > priorityQueue.capacity {
priorityQueue.RemoveLeastPriorityJob()
}else {
//通知新进来一个job
priorityQueue.noticeChan <- struct{}{}
}
}
代码中大部分也都做了注释,不难理解。这里我们来看下addPriorityQueue的具体实现:
func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int {
n := len(priorityQueue.queues)
//通过二分查找找到priority应插入的切片索引
pos := sort.Search(n, func(i int) bool {
return priority < priorityQueue.priority
})
//更新映射表中优先级和切片索引的对应关系
for i := pos; i < n; i++ {
priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1
}
tail := make([]*jobQueue, n-pos)
copy(tail, priorityQueue.queues[pos:])
//初始化一个新的优先级队列,并将该元素放到切片的pos位置中
priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority))
//将高于priority优先级的元素也拼接到切片后面
priorityQueue.queues = append(priorityQueue.queues, tail...)
return pos
}
最后,我们再来看一个实际的调用例子:
func main() {
//初始化一个队列
queue := &PriorityQueue{
noticeChan: make(chan struct{}, cap),
capacity: cap,
priorityIdx: make(map[int]int),
size: 0,
}
//初始化一个消费worker
workerManger := NewWorkerManager(queue)
// worker开始监听队列
go workerManger.StartWork()
// 构造SquareJob
job := &SquareJob{
BaseJob: &BaseJob{
DoneChan: make(chan struct{}, 1),
},
x: 5,
priority: 10,
}
//压入队列尾部
queue.PushJob(job)
//等待job执行完成
job.WaitDone()
print("The End")
}
我们在多队列-单消费者的基础上,再来看看多消费者模式。也就是增加worker的数量,提高Job的处理速度。
我们再来看下worker的定义:
type WorkerManager struct {
queue *PriorityQueue
closeChans []chan struct{}
}
这里需要注意,closeChans变成了切片数组。因为我们每启动一个worker,就需要有一个关闭通道。
然后看StartWorker函数的实现:
func (m *WorkerManager) StartWork(n int) error {
fmt.Println("Start to Work")
for i := 0; i < n; i++ {
m.createWorker();
}
return nil
}
func (m *WorkerManager) createWorker() {
closeChan := make(chan struct{})
//每个协程,就是一个worker
go func(closeChan chan struct{}) {
var job Job
for {
select {
case <-m.closeChan:
return nil
case <-m.queue.noticeChan:
job := m.queue.PopJob()
m.ConsumeJob(job)
}
}
}(closeChan)
m.closeChanMu.Lock()
defer m.closeChanMu.Unlock()
m.closeChans = append(m.closeChans, closeChan)
return nil
}
func (m *WorkerManager) ConsumeJob(job Job) {
defer func() {
job.Done()
}()
job.Execute()
}
这里需要注意的是,所有的worker都需要监听队列的noticeChan通道。测试的例子就留给读者自己了。
另外如下图的单队列-多消费者模式是多队列-多消费者模式的一个特例,这里就不再进行实现了。
优先级队列的实现主要利用了切片来存储多个队列,并将队列的优先级依次存储在切片索引中,并将具体的优先级和切片索引存储在映射表中,以便快速的定位一个具体优先级队列的存储位置。本文中一些细节的并发加锁操作做了忽略,大家在实际应用中根据需要进行完善即可。
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/H8vg3ye781zc9hMkPPy_uQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。