稳定可扩展的WebSocket代理

稳定可扩展的WebSocket代理 

第二期宏恩观技术论坛系列推文

本文为第二期宏恩观技术论坛系列推文第一篇:

《稳定可扩展的WebSocket代理》

作者:VIPKID架构师 李志博

目   录

Topic 1:ISCP语音评测平台概览

Topic 2:为什么重构

Topic 3:重构的核心设计

Topic 4:重构解决了哪些问题

Topic 5:一些经验分享

用技术赋能教育

全文字数:4750字

阅读时间:15分钟

ISCP语音评测平台

概览

我们开发的这个WebSocket代理叫做"ISCP语音评测平台"。它的上游是H5、IPAD、小程序等其他客户端,这些客户端统一通过WebSocket 协议进行交互,然后WebSocket代理在中间做选择路由转发和限流降级等工作,最后将请求转给下游具体的应用。

它的时间线是这样的:

第一版只有驰声一个下游,只是简单做了下游消息的转发。

第二版对接腾讯,讯飞,自研三个平台,同时增加限流、降级、下游之间的选择等功能。

第三版对原有逻辑进行了系统性重构,降低代码耦合,提高扩展性,并解决了一些历史遗留的bug。比如分成两套策略选择筛选下游,一套策略按用户传入的属性把不能满足支持条件的平台过滤掉;另一套策略直接按权重进行随机,这就有可能出现随机出来的平台,不满足请求的音频的情况,这种情况可能实际发生的概率较小,所以之前的版本没有注意到,在新版本里发现并顺带解决了这个问题。

为什么

重构

那么我们为什么要重构呢?

首先是糟糕的设计:它通过一个大的HashMap乱用的数据传递,导致代码难以理解和维护,就比如在一个类put,然后在另一个类get。

其次是业务层代码里混合着session资源的关闭,响应客户端输出等代码,分不清楚头和尾,也容易出现因资源管理混乱导致出现Session没有关闭的情况。这个其实和第一点有一定关系,因为这里关闭的session其实就是从上面说的那个大的HashMap去获取的。

第三是我们的下游有同步模型也有异步模型,之前版本没有做模型的统一化,导致代码没办法做通用处理,比如说cat的打点,响应客户端输出,资源关闭等。

最后是扩展性不好,扩展对接1个新的下游语音平台,往往需要改很多个位置,稍有不慎会引发线上bug,甚至会影响之前已经扩展完的下游平台。

重构

核心设计

然后说下重构核心设计。下面这个图是这次重构的一个整体的核心组件流程图。

首先说入口:DIspatcherWebSocketHandler。为什么叫这个名字呢?因为我对第1版和第2版代码进行了一次通读,觉得之前版本代码写的相对不好的原因是,因为我们是基于spring web socket 进行开发,那么web socket 发的消息流是我们自定义的协议,它并不像spring mvc 是基于一个固定的http 协议进行开发的,那么由于协议是固定的,所以spring mvc 可以为我们提供像拦截器这样的组件去帮我们进行业务逻辑和非业务逻辑的解耦。大家都知道,Spring Web Mvc 的拦截器的入参是HttpServletRequest,HttpServletResponse,那么这些都是和HTTP协议强绑定的,而我们的协议是自定义的,spring web socket 也就不能为我们提供这些东西,所以这就需要我们自己实现,于是我整个设计有很大一部分是参考了spring mvc 的一些设计,比如刚刚说到的入口叫DispatcherWebSocketHandler ,那么WebSocket 最开始会握手建立连接,在这个过程中我们会创建WebSocketProxyContext ,然后把它保存到WebSocketProxyContextManager 里去,然后发消息的时候,通过WebSocketProxyContextManager找到具体的Context去进行处理,Context里做的事,首先就是要通过HandlerSelector 去找到一个具体的BusinessHandler ,BusinessHandler 表示的是一个业务逻辑处理器,其内部执行的事情,一开始是Before拦截器,然后在具体的业务处理里,会通过ChannelSelector选择一个具体的RemoteChannel 进行处理,RemoteChannel 表示的是一个下游的抽象,那么在处理的过程中,如果出错会走error 拦截器,最后会走 After拦截器。在都处理完后,会统一返回一个WebSocketResponse对象,我们在Context里,根据Response进行判断,统一在这一个位置做响应客户端的处理。

说完整体,再来看细节。先从DispatcherWebSocketHandler 入口说起,首先是握手阶段,我们创建Context并以sessionId 作为key 将其保存到一个ConcurrentHashMap里去,这个ConcurrentHashMap是存放在WebSocketProxyContextManager里的。然后在发消息的时候,我们通过sessionId 找到具体的Context 对象,调context对象的handleMessage方法进行处理。连接关闭阶段,我们通过sessionId 把map 里的Context对象删除,然后掉被删除Context 对象里的destory 方法。而我们Context 对象它主要有HandlerSelector WebSocketSession RemoteChannel 等核心组件。

然后看WebSocketProxyContext 主要做的事情。首先根据消息内容,构造一个Request对象,根据Request对象选择BusinessHandler,调handler做具体业务逻辑处理返回Response对象,根据Response对象响应客户端。

再来看BusinessHandler 里具体做的事情。首先是先进入一个抽象类里创建Response对象,然后走Before拦截器,然后走业务子类去做具体的业务逻辑处理,处理完以后将数据组装到Response对象上,出错的话走Error拦截器,最后会执行After拦截器。

最后看BusinessHandler会做什么事情。我们主要有2个BusinessHandler,分别处理init 消息和stream消息,StreamBusinessHandler 内部会区分是不是最后一次发流,在InitBusinessHandler 通过ChannelSelector 创建具体的RemoteChannel ,并设置到上下文上,然后Init 和 Stream 就会通过之前创建的RemoteChannel 进行和下游的交互。

重构

解决的问题

现在让我们来看一下重构待解决的问题。

第一个问题就是之前说的session 资源关闭的问题。之前版本session资源的关闭是分散在多个类里的,这样我在实际维护的时候就会担心是否因为逻辑问题,导致session没有关闭的情况,比如说session.close 是写在if 块里的,还会担心是不是某个地方漏写了session.close

我们是如何解决这个问题的呢?我整理了实际需要关闭session的3个点:close 、transportError 事件和在context统一输出响应客户端的时候,会根据response 内部的信息,选择是否关闭session,并且我在设计的时候,就刻意将session在Context里设置为私有的,并且没有开放任何api 让外部的其他类去获取它,保证session的操作只能发生在Context类的内部,那么实际session的关闭逻辑全都写在了removeSession方法里,就保证了session资源关闭在一个位置,减轻了维护的负担。

我们再来看下响应客户端代码过于分散的问题。这个问题和Session关闭的问题有点类似,不过我想多提一句,这里响应客户端代码被封装成一个Util类里去做,这样做表面上来看,可能我们觉得封装在一个Util里面调用变的特别简单,但是这是我觉得恰巧不好的地方,调用简单使我们写不好代码的成本变的更低了,它扩大了这种代码的调用范围,而我们应该将这种代码的可调用范围缩小,使得更加可控。

解决这个问题的方法就是通过BusinessHandler统一返回的Response对象解耦,在Context里统一根据Response对象做响应客户端的逻辑,而不在分散在各个类的内部。

下一个问题是缺少拦截器机制,导致业务层代码和框架层处理的代码混合在了一起。业务打点,解析请求Header头,请求消息的Decoder,后续业务逻辑处理,其中业务打点和后续业务逻辑处理是属于业务层做的事情,而解析请求Header和请求消息Decoder是属于框架层做的事情,而之前的版本是将框架层做的事情和业务层做的事情混合在一起了。

我们的解决方式是通过设计的拦截器机制去将框架层代码和业务层代码进行解耦,有前置的校验,解码,限流等拦截器,出错的话会走AppError和Throwable拦截器,还有后置拦截比如说超并发走mock,执行阶段状态更新等拦截器。

下一个问题是一个大的try cache,ppt上是我写的一个伪代码,实际的代码里面的catch块比现在的伪代码还要在多几个,比如说会有一下具体我们使用下游的相关SDK 的异常,可以说之前的代码的模式是每接一个新的下游,或者新写一个新的需要抛受检异常的代码,都需要在这个大的try catch 加新的catch块,这样就会导致这个catch 块特别庞大,越来越难以维护。

解决办法就是我们在具体的下游的实现里面去捕获各SDK相关异常,并去做个性化的处理,然后统一转成一个AppError 异常并向上抛出,在AppError拦截器里面做通用化的处理。

最后一个问题是下游编程模型不一致的问题。像腾讯,自研,讯飞都是请求响应的同步模式,而驰声他是WebSocket 的异步模式,也正是因为编程模型不统一的问题,导致之前代码资源关闭,响应客户端输出,没办法在同一个位置去做。

我们的解决方案就是异步转同步,通过Future 的技术,在异步线程设置结果,主线程去获取结果这种模式来将异步转成了同步,方便我们代码做统一的处理。

重构

解决的问题

最后是我的经验分享。

首先是异常逻辑测试。正常的逻辑我们自己或者qa 去点一点功能,基本都能测到;异常逻辑可能是我们平时测试无法覆盖全面的,比如说我们在一个线程发生中断异常的时候,我们通过catch 块进行了捕获,并且在catch 块内部写了一些处理异常的逻辑,那么我们如何保证在catch块内的逻辑没问题呢?

这里有一个简单的方式,就是通过idea的手段,在idea的debug面板,选中要抛异常的堆栈,在菜单栏里选中Throw Exception ,在弹出的提示框里,可以随便new任何一个我们想要抛出的异常,点ok进行抛出,来模拟代码发生异常的情况。

在比如说,假设我们有段代码写着if (a == 5) {xx} else {xx} 这种代码,那么假设当前a的值不等于5,只能测else块,我们想测 if 快怎么办,也可以通过ppt 里截图显示的方式,通过debug面板找到 a 这个变量,然后右键菜单,选择set value,然后去修改 a 的值,叫它等于5,来测 if 块里的逻辑。

然后我们看单元测试。单元测试其实是一种更加推荐的测试方式,也是我们程序员一种保证代码质量,提升开发效率的一种有效的手段。那么在我们的场景里,要想做单元测试,稍微有点麻烦,因为我们客户端像我们的服务器发送的是flatbuffer包装的二进制流,通过flatbuffer的api 去模拟数据的话,会比较麻烦(个人觉得它的api实际上很难用),于是我通过一个相对来讲比较简单的方式去做这个事情,就是第一次先人工debug ,然后在入口处加了一段代码,通过对象流将客户端传给我们的消息写入到本地文件。然后每次单元测试启动的时候读文件,用于模拟数据。通过单元测试的手段,不仅帮我在上线之前减少了出现bug的可能性,还在上线以后出现稀奇古怪的问题的时候,有效的帮我们在本地模拟线上的场景,复现问题,只要问题能复现,那解决自然也就不是问题了。

然后是防御式编程,之所以要提这个,是因为网络问题是千奇百怪的,比如说正常的请求流程是先握手初始化,发消息,然后关闭连接。那么异常情况可能是握手初始化,关闭连接,然后在发消息,这种异常情况是真实存在的,因为我们在线上日志观测到的日志输出顺序和sessionId 都确认了就是同一个请求,同一个session.那么由于我们1开始的代码,并没有针对这种异常逻辑进行严格的校验,导致在灰度过程中出了一些问题。

然后要跟大家说的是,一定要慎用第三方的native 方法,在我们的灰度过程中,由于在极小概率的情况下,可能会出现之前防御式编程提到的请求顺序不正确的情况,由于我们之前没有对这种情况做严格的校验判断,导致代码在不应该调native方法的场景调了native方法,会导致native方法内部操作指针的时候发生异常,进而导致jvm crash ,简单解释一下jvm crash 什么意思,就是在系统其它指标都非常正常的时候,进程突然没了,并且业务日志上没有任何异常日志,这种问题排查起来非常头疼,而且也非常恐怖,建议大家加上我ppt里写着的这个jvm启动参数 -XX:ErrorFile=path ,这样在发生这种异常的时候,才会出现相关的异常日志,另外建议大家能不用这种native方法的包还是别用,如果是普通java的包,顶多就是出现空指针之类的异常,但是不会导致jvm的进程直接消失。

最后想跟大家分享的是前2天我们在接入一个新的下游语音平台发现的问题,由于是最近发现的问题,所以ppt没写,具体的下游名字我就不提了,在我们实际接入,并压测的时候,出现了内存泄露的问题,通过jmap dump 文件,我们发现是一个叫Port的抽象类里有一个HashMap,它里面有5万多元素没有被回收,于是我查找这个HashMap的删除元素的代码是哪个方法,看看这个方法是不是没有被执行,于是发现是tomcat websocket 内部有有一个类叫WsSession,那么在这个WsSession内部有一个方法叫onClose,在onClose内部的最后1句代码的内部会删除map里的元素,也就是说,基本可以断定是onClose 方法没走导致的内存泄露,而为什么这个方法会不执行呢,于是我debug session.close的源码发现,WebSocketSession 的close 的逻辑其实是这样的,客户端的session在close 的时候,会向服务端发一个请求关闭的消息,然后服务端回客户端一个消息,在客户端接入服务端回的消息的时候,会调用onClose方法,销毁资源,那么想想看,假设服务端因为某种原因没有回消息怎么办,那么这个Map就永远也删除不了元素最终导致内存泄露了,其实我是不太愿意相信tomcat 会出现这种低级错误的,居然没有一种托底的手段,来保证内存不泄露的问题,但是实际上,我们通过主动人工写代码去调这个onClose方法,然后重新压测,并且压测的并发量和时长都比之前出现内存泄露那次压测的量要大,时间也要更长,但是最终并没有出现之前发生的内存泄露问题,由此可以证明我们的猜测是正确的。所以大家一定要注意下这里。

谢谢大家!

敬请期待

《核心系统稳定性建设》

《实时直播系统的故障管理实践》

《打造上课系统的诺亚方舟》

《面向故障编程》