使用Go开发一个简单反向代理服务

最近,团队的小伙伴反映,我们这边一个短连接服务在一台普通的服务器上吞吐量受到限制,所以把服务迁移到高性能机器上,虽然硬件是数倍的提升但压测发现吞吐量并没有预期的效果。

结合后台服务本身的特点初步原因分析:

1、从下往上看:服务属于计算IO密集型,性能瓶颈多在于计算请求,但高配机压测过程中,受到单实例模块之间通讯采用串行调用的特点,虽然单点请求计算性能有很大提速,但总体并行上不去,CPU利用率低2、从上往下看: 吞吐量受服务器的接受能力影响很大,由于短连接接入层目前只有一个实例,无论部署在中配或是高配,除非是多实例模式或者类似nginx这种多worker工作模型,一般情况下,单实例accept的效果有限,高并发时容易成为瓶颈3、从服务进程的角度看,单个web api的请求accept队列(backlog)是有限制的,如果多实例部署也许能补短。

分析到这里,很多人都想到可以通过扩容+分布式通讯的方式来弥补短板。是的,方法是摆在面前,但是你想到一个方法不难,难的是你要如何去验证你的想法。毕竟对于一个成熟的产品技术框架,不是随便都能重构的,一定要数据说话。

不过如何调优不是本文的目的,本文的目的是如何使用Go来快速实现一个反向代理服务来验证前面的背景想法。

设计

一个反向代理层,无论是四层还是七层,我觉得实现上主要需要具备以下工作:

负载均衡算法请求可传递endpoints可权重配置endpoints故障处理

关于使用Go写负载均衡算法,之前在 《关于Round-Robin》这文章提及过,这里不延伸。

以http为例,go如何快速实现反向代理?

查看go的文档,发现源码net/http/httputil提供了一个叫 ReverseProxy:https://godoc.org/net/http/httputil#ReverseProxy 的玩意,这个就是golang自带反向代理功能,而且使用很简单

ReverseProxy提供了ServerHTTP方法,这意味着我们可以跟普通http handler一样简单地使用它来处理请求

ReverseProxy 暴露了NewSingleHostReverseProxy的方法

// NewSingleHostReverseProxy returns a new ReverseProxy that rewrites // URLs to the scheme, host, and base path provided in target. If the // targets path is "/base" and the incoming request was for "/dir", // the target request will be for /base/dir. func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy { targetQuery := target.RawQuery director := func(req *http.Request) { req.URL.Scheme = target.Scheme req.URL.Host = target.Host req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path) if targetQuery == "" || req.URL.RawQuery == "" { req.URL.RawQuery = targetQuery + req.URL.RawQuery } else { req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery } } return &ReverseProxy{Director: director} }

这样,我们可以通过一行代码就基本上实现了主体的反向代理功能了,如下:

httputil.NewSingleHostReverseProxy(address)

实现

结合Round-Robin,我们尝试实现我们的反向代理层

带权重的负载均衡实现 round-robin.go

package roundrobin // RR: 基于 权重round robin算法的接口 type RR interface { Next() interface{} Add(node interface{}, weight int) RemoveAll() Reset() } const ( RR_NGINX = 0 //Nginx算法 RR_LVS = 1 //LVS算法 ) //算法实现工厂类 func NewWeightedRR(rtype int) RR { if rtype == RR_NGINX { return &WNGINX{} } else if rtype == RR_LVS { return &WLVS{} } return nil } //节点结构 type WeightNginx struct { Nodeinterface{} Weightint CurrentWeight int EffectiveWeight int } func (ww *WeightNginx) fail() { ww.EffectiveWeight -= ww.Weight if ww.EffectiveWeight < 0 { ww.EffectiveWeight = 0 } } //nginx算法实现类 type WNGINX struct { nodes []*WeightNginx n int } //增加权重节点 func (w *WNGINX) Add(node interface{}, weight int) { weighted := &WeightNginx{ Node:node, Weight:weight, EffectiveWeight: weight} w.nodes = append(w.nodes, weighted) w.n++ } func (w *WNGINX) RemoveAll() { w.nodes = w.nodes[:0] w.n = 0 } //下次轮询事件 func (w *WNGINX) Next() interface{} { if w.n == 0 { return nil } if w.n == 1 { return w.nodes[0].Node } return nextWeightedNode(w.nodes).Node } func nextWeightedNode(nodes []*WeightNginx) (best *WeightNginx) { total := 0 for i := 0; i < len(nodes); i++ { w := nodes[i] if w == nil { continue } w.CurrentWeight += w.EffectiveWeight total += w.EffectiveWeight if w.EffectiveWeight < w.Weight { w.EffectiveWeight++ } if best == nil || w.CurrentWeight > best.CurrentWeight { best = w } } if best == nil { return nil } best.CurrentWeight -= total return best } func (w *WNGINX) Reset() { for _, s := range w.nodes { s.EffectiveWeight = s.Weight s.CurrentWeight = 0 } } //节点结构 type WeightLvs struct { Node interface{} Weight int } //lvs算法实现类 type WLVS struct { nodes []*WeightLvs n int gcd int //通用的权重因子 maxWint //最大权重 i int //被选择的次数 cwint //当前的权重值 } //下次轮询事件 func (w *WLVS) Next() interface{} { if w.n == 0 { return nil } if w.n == 1 { return w.nodes[0].Node } for { w.i = (w.i + 1) % w.n if w.i == 0 { w.cw = w.cw - w.gcd if w.cw <= 0 { w.cw = w.maxW if w.cw == 0 { return nil } } } if w.nodes[w.i].Weight >= w.cw { return w.nodes[w.i].Node } } } //增加权重节点 func (w *WLVS) Add(node interface{}, weight int) { weighted := &WeightLvs{Node: node, Weight: weight} if weight > 0 { if w.gcd == 0 { w.gcd = weight w.maxW = weight w.i = -1 w.cw = 0 } else { w.gcd = gcd(w.gcd, weight) if w.maxW < weight { w.maxW = weight } } } w.nodes = append(w.nodes, weighted) w.n++ } func gcd(x, y int) int { var t int for { t = (x % y) if t > 0 { x = y y = t } else { return y } } } func (w *WLVS) RemoveAll() { w.nodes = w.nodes[:0] w.n = 0 w.gcd = 0 w.maxW = 0 w.i = -1 w.cw = 0 } func (w *WLVS) Reset() { w.i = -1 w.cw = 0 }

主体部分 main.go

var RR = rr.NewWeightedRR(rr.RR_NGINX) type handle struct { addrs []string } func (this *handle) ServeHTTP(w http.ResponseWriter, r *http.Request) { addr := RR.Next().(string) remote, err := url.Parse("http://" + addr) if err != nil { panic(err) } proxy := httputil.NewSingleHostReverseProxy(remote) proxy.ServeHTTP(w, r) } func startServer() { //被代理的服务器host和port h := &handle{} h.addrs = []string{"172.17.0.2:28080", "172.17.0.3:28080"} w := 1 for _, e := range h.addrs { RR.Add(e, w) w++ } err := http.ListenAndServe(":28080", h) if err != nil { log.Fatalln("ListenAndServe: ", err) } } func main() { startServer() }

在ReverseProxy中的ServeHTTP方法实现了这个具体的过程,主要是对源http包头进行重新封装,而后发送到后端服务器。

这样,我们一个简单快速的反向代理层就实现了,日常可以基于它自定义负载我们的服务。