资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

go语言实现线程池 golang 线程池和协程池

如何用go语言每分钟处理100万个请求

在Malwarebytes 我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。

成都创新互联专业为企业提供林口网站建设、林口做网站、林口网站设计、林口网站制作等企业网站建设、网页设计与制作、林口企业网站模板建站服务,10余年林口做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。

有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。

作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。

问题

当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。

按照习惯,我们会调研服务层级架构,涉及的软件如下:

Sidekiq

Resque

DelayedJob

Elasticbeanstalk Worker Tier

RabbitMQ

and so on…

搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。

但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。

我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。

type PayloadCollection struct {

WindowsVersion string `json:"version"`

Token string `json:"token"`

Payloads []Payload `json:"data"`

}

type Payload struct {

// [redacted]

}

func (p *Payload) UploadToS3() error {

// the storageFolder method ensures that there are no name collision in

// case we get same timestamp in the key name

storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())

bucket := S3Bucket

b := new(bytes.Buffer)

encodeErr := json.NewEncoder(b).Encode(payload)

if encodeErr != nil {

return encodeErr

}

// Everything we post to the S3 bucket should be marked 'private'

var acl = s3.Private

var contentType = "application/octet-stream"

return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})

}

本地Go routines方法

刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set("Content-Type", "application/json; charset=UTF-8")

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

go payload.UploadToS3() // ----- DON'T DO THIS

}

w.WriteHeader(http.StatusOK)

}

对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。

上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。

再次尝试

我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。

所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。

var Queue chan Payload

func init() {

Queue = make(chan Payload, MAX_QUEUE)

}

func payloadHandler(w http.ResponseWriter, r *http.Request) {

...

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

Queue - payload

}

...

}

接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:

func StartProcessor() {

for {

select {

case job := -Queue:

job.payload.UploadToS3() // -- STILL NOT GOOD

}

}

}

说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。

我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。

最好的解决方案

我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。

想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。

var (

MaxWorker = os.Getenv("MAX_WORKERS")

MaxQueue = os.Getenv("MAX_QUEUE")

)

// Job represents the job to be run

type Job struct {

Payload Payload

}

// A buffered channel that we can send work requests on.

var JobQueue chan Job

// Worker represents the worker that executes the job

type Worker struct {

WorkerPool chan chan Job

JobChannel chan Job

quit chan bool

}

func NewWorker(workerPool chan chan Job) Worker {

return Worker{

WorkerPool: workerPool,

JobChannel: make(chan Job),

quit: make(chan bool)}

}

// Start method starts the run loop for the worker, listening for a quit channel in

// case we need to stop it

func (w Worker) Start() {

go func() {

for {

// register the current worker into the worker queue.

w.WorkerPool - w.JobChannel

select {

case job := -w.JobChannel:

// we have received a work request.

if err := job.Payload.UploadToS3(); err != nil {

log.Errorf("Error uploading to S3: %s", err.Error())

}

case -w.quit:

// we have received a signal to stop

return

}

}

}()

}

// Stop signals the worker to stop listening for work requests.

func (w Worker) Stop() {

go func() {

w.quit - true

}()

}

我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。

func payloadHandler(w http.ResponseWriter, r *http.Request) {

if r.Method != "POST" {

w.WriteHeader(http.StatusMethodNotAllowed)

return

}

// Read the body into a string for json decoding

var content = PayloadCollection{}

err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)

if err != nil {

w.Header().Set("Content-Type", "application/json; charset=UTF-8")

w.WriteHeader(http.StatusBadRequest)

return

}

// Go through each payload and queue items individually to be posted to S3

for _, payload := range content.Payloads {

// let's create a job with the payload

work := Job{Payload: payload}

// Push the work onto the queue.

JobQueue - work

}

w.WriteHeader(http.StatusOK)

}

在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。

dispatcher := NewDispatcher(MaxWorker)

dispatcher.Run()

下面是dispatcher的实现代码:

type Dispatcher struct {

// A pool of workers channels that are registered with the dispatcher

WorkerPool chan chan Job

}

func NewDispatcher(maxWorkers int) *Dispatcher {

pool := make(chan chan Job, maxWorkers)

return Dispatcher{WorkerPool: pool}

}

func (d *Dispatcher) Run() {

// starting n number of workers

for i := 0; i d.maxWorkers; i++ {

worker := NewWorker(d.pool)

worker.Start()

}

go d.dispatch()

}

func (d *Dispatcher) dispatch() {

for {

select {

case job := -JobQueue:

// a job request has been received

go func(job Job) {

// try to obtain a worker job channel that is available.

// this will block until a worker is idle

jobChannel := -d.WorkerPool

// dispatch the job to the worker job channel

jobChannel - job

}(job)

}

}

}

注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守12-factor方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。

var (

MaxWorker = os.Getenv("MAX_WORKERS")

MaxQueue = os.Getenv("MAX_QUEUE")

)

直接结果

我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。

Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。

我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。

我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。

总结

在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。

我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。

处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。

【golang详解】go语言GMP(GPM)原理和调度

Goroutine调度是一个很复杂的机制,下面尝试用简单的语言描述一下Goroutine调度机制,想要对其有更深入的了解可以去研读一下源码。

首先介绍一下GMP什么意思:

G ----------- goroutine: 即Go协程,每个go关键字都会创建一个协程。

M ---------- thread内核级线程,所有的G都要放在M上才能运行。

P ----------- processor处理器,调度G到M上,其维护了一个队列,存储了所有需要它来调度的G。

Goroutine 调度器P和 OS 调度器是通过 M 结合起来的,每个 M 都代表了 1 个内核线程,OS 调度器负责把内核线程分配到 CPU 的核上执行

模型图:

避免频繁的创建、销毁线程,而是对线程的复用。

1)work stealing机制

当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。

2)hand off机制

当本线程M0因为G0进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。进而某个空闲的M1获取P,继续执行P队列中剩下的G。而M0由于陷入系统调用而进被阻塞,M1接替M0的工作,只要P不空闲,就可以保证充分利用CPU。M1的来源有可能是M的缓存池,也可能是新建的。当G0系统调用结束后,根据M0是否能获取到P,将会将G0做不同的处理:

如果有空闲的P,则获取一个P,继续执行G0。

如果没有空闲的P,则将G0放入全局队列,等待被其他的P调度。然后M0将进入缓存池睡眠。

如下图

GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运行

在Go中一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死。

具体可以去看另一篇文章

【Golang详解】go语言调度机制 抢占式调度

当创建一个新的G之后优先加入本地队列,如果本地队列满了,会将本地队列的G移动到全局队列里面,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。

协程经历过程

我们创建一个协程 go func()经历过程如下图:

说明:

这里有两个存储G的队列,一个是局部调度器P的本地队列、一个是全局G队列。新创建的G会先保存在P的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;处理器本地队列是一个使用数组构成的环形链表,它最多可以存储 256 个待执行任务。

G只能运行在M中,一个M必须持有一个P,M与P是1:1的关系。M会从P的本地队列弹出一个可执行状态的G来执行,如果P的本地队列为空,就会想其他的MP组合偷取一个可执行的G来执行;

一个M调度G执行的过程是一个循环机制;会一直从本地队列或全局队列中获取G

上面说到P的个数默认等于CPU核数,每个M必须持有一个P才可以执行G,一般情况下M的个数会略大于P的个数,这多出来的M将会在G产生系统调用时发挥作用。类似线程池,Go也提供一个M的池子,需要时从池子中获取,用完放回池子,不够用时就再创建一个。

work-stealing调度算法:当M执行完了当前P的本地队列队列里的所有G后,P也不会就这么在那躺尸啥都不干,它会先尝试从全局队列队列寻找G来执行,如果全局队列为空,它会随机挑选另外一个P,从它的队列里中拿走一半的G到自己的队列中执行。

如果一切正常,调度器会以上述的那种方式顺畅地运行,但这个世界没这么美好,总有意外发生,以下分析goroutine在两种例外情况下的行为。

Go runtime会在下面的goroutine被阻塞的情况下运行另外一个goroutine:

用户态阻塞/唤醒

当goroutine因为channel操作或者network I/O而阻塞时(实际上golang已经用netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞,仅阻塞G,这里仅仅是举个栗子),对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有可运行的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为,尝试加入G2所在P的runnext(runnext是线程下一个需要执行的 Goroutine。), 然后再是P的本地队列和全局队列。

系统调用阻塞

当M执行某一个G时候如果发生了阻塞操作,M会阻塞,如果当前有一些G在执行,调度器会把这个线程M从P中摘除,然后再创建一个新的操作系统的线程(如果有空闲的线程可用就复用空闲线程)来服务于这个P。当M系统调用结束时候,这个G会尝试获取一个空闲的P执行,并放入到这个P的本地队列。如果获取不到P,那么这个线程M变成休眠状态, 加入到空闲线程中,然后这个G会被放入全局队列中。

队列轮转

可见每个P维护着一个包含G的队列,不考虑G进入系统调用或IO操作的情况下,P周期性的将G调度到M中执行,执行一小段时间,将上下文保存下来,然后将G放到队列尾部,然后从队列中重新取出一个G进行调度。

除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。

除了每个P维护的G队列以外,还有一个全局的队列,每个P会周期性地查看全局队列中是否有G待运行并将其调度到M中执行,全局队列中G的来源,主要有从系统调用中恢复的G。之所以P会周期性地查看全局队列,也是为了防止全局队列中的G被饿死。

M0

M0是启动程序后的编号为0的主线程,这个M对应的实例会在全局变量rutime.m0中,不需要在heap上分配,M0负责执行初始化操作和启动第一个G,在之后M0就和其他的M一样了

G0

G0是每次启动一个M都会第一个创建的goroutine,G0仅用于负责调度G,G0不指向任何可执行的函数,每个M都会有一个自己的G0,在调度或系统调用时会使用G0的栈空间,全局变量的G0是M0的G0

一个G由于调度被中断,此后如何恢复?

中断的时候将寄存器里的栈信息,保存到自己的G对象里面。当再次轮到自己执行时,将自己保存的栈信息复制到寄存器里面,这样就接着上次之后运行了。

我这里只是根据自己的理解进行了简单的介绍,想要详细了解有关GMP的底层原理可以去看Go调度器 G-P-M 模型的设计者的文档或直接看源码

参考: ()

()

golang的线程模型——GMP模型

内核线程(Kernel-Level Thread ,KLT)

轻量级进程(Light Weight Process,LWP):轻量级进程就是我们通常意义上所讲的线程,由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程

用户线程与系统线程一一对应,用户线程执行如lo操作的系统调用时,来回切换操作开销相对比较大

多个用户线程对应一个内核线程,当内核线程对应的一个用户线程被阻塞挂起时候,其他用户线程也阻塞不能执行了。

多对多模型是可以充分利用多核CPU提升运行效能的

go线程模型包含三个概念:内核线程(M),goroutine(G),G的上下文环境(P);

GMP模型是goalng特有的。

P与M一般是一一对应的。P(上下文)管理着一组G(goroutine)挂载在M(内核线程)上运行,图中左边蓝色为正在执行状态的goroutine,右边为待执行状态的goroutiine队列。P的数量由环境变量GOMAXPROCS的值或程序运行runtime.GOMAXPROCS()进行设置。

当一个os线程在执行M1一个G1发生阻塞时,调度器让M1抛弃P,等待G1返回,然后另起一个M2接收P来执行剩下的goroutine队列(G2、G3...),这是golang调度器厉害的地方,可以保证有足够的线程来运行剩下所有的goroutine。

当G1结束后,M1会重新拿回P来完成,如果拿不到就丢到全局runqueue中,然后自己放到线程池或转入休眠状态。空闲的上下文P会周期性的检查全局runqueue上的goroutine,并且执行它。

另一种情况就是当有些P1太闲而其他P2很忙碌的时候,会从其他上下文P2拿一些G来执行。

详细可以翻看下方第一个参考链接,写得真好。

最后用大佬的总结来做最后的收尾————

Go语言运行时,通过核心元素G,M,P 和 自己的调度器,实现了自己的并发线程模型。调度器通过对G,M,P的调度实现了两级线程模型中操作系统内核之外的调度任务。整个调度过程中会在多种时机去触发最核心的步骤 “一整轮调度”,而一整轮调度中最关键的部分在“全力查找可运行G”,它保证了M的高效运行(换句话说就是充分使用了计算机的物理资源),一整轮调度中还会涉及到M的启用停止。最后别忘了,还有一个与Go程序生命周期相同的系统监测任务来进行一些辅助性的工作。

浅析Golang的线程模型与调度器

Golang CSP并发模型

Golang线程模型


本文题目:go语言实现线程池 golang 线程池和协程池
文章链接:http://www.cdkjz.cn/article/dddddcg.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

大客户专线   成都:13518219792   座机:028-86922220