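Reading the Source: A First Look at EMQX

I. EMQX Project Overview

EMQX (Erlang/Enterprise/Elastic MQTT Broker) is an open-source MQTT message broker for the IoT, built on the Erlang/OTP platform. Erlang/OTP is a language platform well suited to soft-realtime, low-latency, distributed systems. MQTT is a lightweight publish/subscribe (PubSub) messaging protocol for the IoT.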

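II. Protocol Overview

MQTT is a lightweight publish/subscribe message transport protocol, designed specifically for IoT applications on low-bandwidth, unreliable networks.
MQTT website: http://mqtt.org
MQTT V3.1.1 specification: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
To keep things short, let's go straight to the diagram:

Protocol flow (Figure 1)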

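The MQTT control packet types are CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, and DISCONNECT. Nearly all of them appear in the diagram, and the way these packets flow through it covers the connect, disconnect, subscribe, unsubscribe, and publish functions that MQTT defines.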
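As a small illustration of how these packet types look on the wire: in MQTT 3.1.1 the high four bits of the first byte of every packet carry the control packet type (1 to 14 in the order listed above) and the low four bits carry flags. The sketch below is not EMQX code; the module name toy_packet_type and its functions are made up for this note.

```erlang
-module(toy_packet_type).
-export([name/1, first_byte/1]).

%% MQTT 3.1.1 control packet type codes (0 and 15 are reserved in 3.1.1).
name(1)  -> connect;
name(2)  -> connack;
name(3)  -> publish;
name(4)  -> puback;
name(5)  -> pubrec;
name(6)  -> pubrel;
name(7)  -> pubcomp;
name(8)  -> subscribe;
name(9)  -> suback;
name(10) -> unsubscribe;
name(11) -> unsuback;
name(12) -> pingreq;
name(13) -> pingresp;
name(14) -> disconnect;
name(_)  -> reserved.

%% Split the first byte of a packet into {TypeName, Flags}.
first_byte(<<Type:4, Flags:4, _/binary>>) ->
    {name(Type), Flags}.
```

For example, a PINGREQ is the two bytes 16#C0, 0, so toy_packet_type:first_byte(<<16#C0, 0>>) gives {pingreq, 0}.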

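III. Reading the Code

This note is only a first look at EMQX (version 3.0). It covers the connection-layer and session-layer code, the subscribe/unsubscribe process, and publish routing. Space is limited, so each topic is touched on only briefly.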

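Connection Layer and Session Layer

There is a lot of related code. Here we focus on emqx_connection.erl, emqx_protocol.erl, emqx_session.erl, and emqx_broker.erl. Every device that connects gets two processes: a connection process (conn) and a session process (session). We call the pid of the connection process ConnPid and the pid of the session process SPid.

Connection process: mainly responsible for sending, receiving, and parsing packets.
Session process: handles the MQTT publish/subscribe interaction flow, the receiving and delivery of QoS 0/1/2 messages, message timeout and retransmission, and storage of offline messages. Through the inflight window it controls the throughput of delivered messages and guarantees their order.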

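(1) Flow Diagrams

Below are the flow diagrams I drew from the code:

Connect, subscribe, unsubscribe, disconnect (Figure 2)
Publish (Figure 3)
Legend: Device is the client that connects to the service, Conn is the server-side connection process, and Session is the session process. A thick solid line is a protocol packet sent from the client to the server; it also marks synchronous processing inside the server. A thin dashed line is a protocol packet sent from the server to the client. A thick dashed line is an asynchronous call inside the server.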

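1. Connecting

After a device establishes a long-lived connection to the server, a Conn process is spawned to maintain communication between the device and the server.

When the conn process receives the CONNECT packet, it checks permissions and then creates a session. Once the session process has been created, it and the conn process are bound to each other in both directions, and the conn process then returns a CONNACK packet to the device.

Let's look at the state of the two processes.

conn process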

{status,<0.2190.0>,
 {module,gen_server},
 [[{incoming_bytes,166},
   {{subscribe,<<"mqttbroker/xxx">>},{allow,16}},
   {$ancestors,[<0.1876.0>,<0.1875.0>,esockd_sup,<0.1555.0>]},
   {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
   {acl_keys_q,{[{subscribe,<<"mqttbroker/xxx">>}],[]}},
   {rand_seed,{#{bits => 58,jump => #Fun<rand.8.>,
                 next => #Fun<rand.5.>,type => exrop,
                 uniform => #Fun<rand.6.>,
                 uniform_n => #Fun<rand.7.>,weak_low_bits => 1},
               [|]}},
   {$logger_metadata$,#{client_id => <<"mqttbroker/slw">>,
                        peername => "127.0.0.1:55952",pid => <0.2190.0>}},
   {guid,{88773,1070,0}},
   {$initial_call,{emqx_connection,init,1}},
   {acl_cache_size,1}],
  running,<0.1876.0>,[],
  [{header,"Status for generic server <0.2190.0>"},
   {data,[{"Status",running},
          {"Parent",<0.1876.0>},
          {"Logged events",[]}]},
   {data,[{"State",
           #state{transport = esockd_transport,socket = #Port<0.29>,
                  peername = {{127,0,0,1},55952},
                  sockname = ,conn_state = running,active_n = 100,
                  proto_state = #pstate{zone = external,
                                        sendfun = #Fun<emqx_connection.0.>,
                                        peername = {{127,0,0,1},55952},
                                        peercert = nossl,proto_ver = 4,proto_name = <<"MQTT">>,
                                        client_id = <<"mqttbroker/slw">>,is_assigned = false,
                                        conn_pid = <0.2190.0>,conn_props = #{},
                                        ack_props = ,username = <<"mqttbroker/slw">>,
                                        session = <0.2192.0>,clean_start = true,topic_aliases = #{},
                                        packet_size = ,keepalive = 60,mountpoint = ,
                                        is_super = false,is_bridge = false,
                                        prod_key = <<"mqttbroker">>,dev_name = <<"slw">>,
                                        enable_ban = true,enable_acl = true,
                                        acl_deny_action = ignore,
                                        recv_stats = #{msg => 0,pkt => 3},
                                        send_stats = #{msg => 0,pkt => 3},
                                        connected = true,
                                        connected_at = {1570,,},
                                        ignore_loop = false,
                                        topic_alias_maximum = #{from_client => 0,to_client => 0}},
                  parser_state = {none,#{max_packet_size => ,version => 4}},
                  gc_state = {emqx_gc,#{cnt => {1000,1000},
                                        oct => {,}}},
                  keepalive = {keepalive,#Fun<emqx_connection.1.>,164,
                               45,
                               {keepalive,check},
                               #Ref<0...>,1},
                  enable_stats = true,stats_timer = ,
                  rate_limit = ,pub_limit = ,
                  limit_timer = ,idle_timeout = 15000}}]}]]}

session process

{status,<0.2192.0>,
 {module,gen_server},
 [[{$ancestors,[emqx_session_sup,emqx_sm_sup,emqx_sup,
                <0.1561.0>]},
   {force_shutdown_policy,#{max_heap_size => 0,message_queue_len => 0}},
   {$logger_metadata$,#{client_id => <<"mqttbroker/slw">>}},
   {guid,{17372,1072,0}},
   {$initial_call,{emqx_session,init,1}}],
  running,<0.1713.0>,[],
  [{header,"Status for generic server <0.2192.0>"},
   {data,[{"Status",running},
          {"Parent",<0.1713.0>},
          {"Logged events",[]}]},
   {data,[{"State",
           #state{idle_timeout = 15000,clean_start = true,
                  binding = local,client_id = <<"mqttbroker/slw">>,
                  username = <<"mqttbroker/slw">>,conn_pid = <0.2190.0>,
                  old_conn_pid = ,next_pkt_id = 1,
                  max_subscriptions = 0,
                  subscriptions = #{<<"mqttbroker/xxx">> =>
                                        #{nl => 0,pktid => 1,qos => 2,rap => 0,rc => 0,rh => 0,
                                          <<"r">> => <<"02">>,
                                          <<"s">> => <<"EAB78B0E25D6C5A5EDCEEB78ABCB7B58">>}},
                  upgrade_qos = false,
                  inflight = {emqx_inflight,32,{0,nil}},
                  retry_interval = 20000,retry_timer = ,
                  mqueue = {mqueue,true,1000,0,0,none,infinity,
                            {queue,[],[],0}},
                  awaiting_rel = #{},max_awaiting_rel = 100,
                  await_rel_timeout = ,await_rel_timer = ,
                  expiry_interval = 0,expiry_timer = ,
                  enable_stats = true,stats_timer = ,
                  gc_state = {emqx_gc,#{cnt => {1000,1000},
                                        oct => {,}}},
                  created_at = {1570,,},
                  will_msg = ,will_delay_timer = }}]}]]}

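The conn process state has a session field and the session process state has a conn_pid field, so each can always find the other and make synchronous or asynchronous calls to it.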

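2. Subscribe and Unsubscribe

The detailed subscription steps are left for a later chapter. Here we only look at the logic in the conn and session processes. From that angle subscribe and unsubscribe are much the same, so we take subscribe as the example.

After the conn process receives a SUBSCRIBE packet and checks permissions, it calls emqx_session:subscribe/4, which in fact casts the message {subscribe, self(), SubReq} to the session process. When the session process has finished the subscription logic, it sends a suback message back to the conn process: From ! {deliver, {suback, PacketId, ReasonCodes}}

When the conn process receives the deliver message from the session process, it sends a SUBACK packet to the device.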
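The cast-then-deliver round trip can be sketched with a toy gen_server. This is not the EMQX implementation: the module name toy_session, the shape of SubReq (a {PacketId, TopicFilters} tuple), and the rule that every requested QoS is granted as-is are all assumptions made for the sketch. Only the two message shapes, {subscribe, From, SubReq} and {deliver, {suback, PacketId, ReasonCodes}}, come from the text above.

```erlang
-module(toy_session).
-behaviour(gen_server).
-export([start_link/0, subscribe/3, demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Runs in the conn process: an asynchronous cast, so conn does not block.
%% self() is the ConnPid that the session will answer later.
subscribe(SPid, PacketId, TopicFilters) ->
    gen_server:cast(SPid, {subscribe, self(), {PacketId, TopicFilters}}).

init([]) ->
    {ok, #{subscriptions => #{}}}.

handle_call(_Req, _From, State) ->
    {reply, ignored, State}.

%% Runs in the session process: record the subscriptions, then send the
%% suback back to the conn process as a plain deliver message.
handle_cast({subscribe, From, {PacketId, TopicFilters}},
            State = #{subscriptions := Subs}) ->
    Subs1 = lists:foldl(fun({Topic, QoS}, Acc) -> Acc#{Topic => QoS} end,
                        Subs, TopicFilters),
    ReasonCodes = [QoS || {_Topic, QoS} <- TopicFilters],
    From ! {deliver, {suback, PacketId, ReasonCodes}},
    {noreply, State#{subscriptions := Subs1}};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% The calling process plays the role of conn.
demo() ->
    {ok, SPid} = start_link(),
    ok = subscribe(SPid, 1, [{<<"mqttbroker/xxx">>, 2}]),
    receive
        {deliver, {suback, PacketId, ReasonCodes}} -> {PacketId, ReasonCodes}
    after 1000 ->
        timeout
    end.
```

Calling toy_session:demo() returns {1, [2]}: packet id 1 with QoS 2 granted for the single topic filter. The point of the cast is that conn can keep reading from the socket while session does the subscription work.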

3. Heartbeat

The heartbeat logic involves only the conn process: when conn receives a PINGREQ it immediately replies with a PINGRESP.

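4. Disconnect

Figure 2 shows only the client-initiated disconnect: the device sends DISCONNECT, the conn process stops normally, and the session process exits by itself once it receives the EXIT message.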
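The standard Erlang mechanism behind "receives the EXIT message" is a link plus trap_exit, sketched below with two bare processes. The module toy_exit and its message names (disconnect, ready, session_down) are invented for the sketch, and the real emqx_session may apply further conditions before it stops.

```erlang
-module(toy_exit).
-export([demo/0]).

%% Toy conn process: returning from the function ends it with reason normal.
conn_loop() ->
    receive
        disconnect -> ok
    end.

%% Toy session process: link to conn and trap exits, so the death of conn
%% arrives as an ordinary {'EXIT', ConnPid, Reason} message.
session_init(ConnPid, Parent) ->
    process_flag(trap_exit, true),
    link(ConnPid),
    Parent ! {ready, self()},
    receive
        {'EXIT', ConnPid, Reason} ->
            Parent ! {session_down, Reason}
    end.

demo() ->
    Parent = self(),
    ConnPid = spawn(fun conn_loop/0),
    SPid = spawn(fun() -> session_init(ConnPid, Parent) end),
    %% Wait until the link exists before letting conn stop.
    receive {ready, SPid} -> ok end,
    ConnPid ! disconnect,
    receive
        {session_down, Reason} -> Reason
    after 1000 ->
        timeout
    end.
```

toy_exit:demo() returns normal, the exit reason the session saw when conn stopped.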

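5. Publish

To make Figure 3 easier to follow, it leaves out the details of message routing and dispatch (covered in a later chapter) and deliberately shows a one-to-one message flow, which makes it easier to see the roles the conn and session processes play.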

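QoS 0 publish
QoS 0 is the simplest case. On the sending side, ConnA receives the PUBLISH packet, calls emqx_session:publish/3, and passes the result to puback/2 in emqx_protocol, which does nothing for QoS 0. On the receiving side, delivery to B, which subscribes to the topic, starts with a dispatch message sent to SessionB: SubPid ! {dispatch, Topic, Msg}. SessionB handles the dispatch in emqx_session:dispatch/2. A QoS 0 message goes straight to do_deliver, which sends ConnB the deliver message ConnPid ! {deliver, {publish, PacketId, Msg}}. ConnB receives the deliver message and calls emqx_protocol:deliver/2, which sends the PUBLISH packet to DeviceB.

QoS 1 publish
Compared with QoS 0, QoS 1 adds a PUBACK. On the sending side, ConnA receives the PUBLISH packet, calls emqx_session:publish/3, and then calls puback/2 in emqx_protocol. For QoS 1 this delivers a PUBACK: emqx_protocol:deliver/2 sends the PUBACK packet to DeviceA. On the receiving side, QoS 1 adds an inflight operation. When SessionB receives SubPid ! {dispatch, Topic, Msg}, it sends ConnB the message ConnPid ! {deliver, {publish, PacketId, Msg}} and at the same time calls emqx_inflight:insert/3. ConnB sends the PUBLISH to DeviceB, and DeviceB answers with PUBACK. When ConnB receives the PUBACK it calls emqx_session:puback/2, which is really a cast to SessionB, gen_server:cast(SPid, {puback, PacketId, ReasonCode}). On receiving the cast, SessionB calls emqx_inflight:delete/2. The inflight window is the logic that makes downstream messages reliable and keeps them in order. QoS 2 has the same logic with slight differences.

QoS 2 publish
Compared with QoS 0, QoS 2 adds three more exchanges. On the sending side, ConnA receives the PUBLISH packet. When emqx_session:publish/3 runs, it first makes a synchronous call to SessionA, gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity), and waits for SessionA to insert the message to be published into awaiting_rel in its state. It then publishes the message and passes the result to puback/2 in emqx_protocol, and because the message is QoS 2, ConnA sends DeviceA a PUBREC packet. DeviceA answers the PUBREC with a PUBREL packet. On receiving it, ConnA calls emqx_session:pubrel/3, which calls SessionA synchronously, gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). SessionA deletes the entry it recorded earlier in awaiting_rel, and once ConnA has the result it sends DeviceA a PUBCOMP packet. On the receiving side, the QoS 2 inflight handling differs slightly. emqx_inflight:insert/3 happens at the same point as for QoS 1. Then, when SessionB receives from ConnB the synchronous call gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity) and the asynchronous cast gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}), it calls emqx_inflight:update/3 and emqx_inflight:delete/2 respectively. For QoS 2 the inflight window is likewise the logic that makes downstream messages reliable and keeps them in order. For upstream messages, QoS 2 adds the awaiting_rel handling that QoS 1 does not have, which guarantees delivery from the sender's side.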
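The two pieces of bookkeeping described above, awaiting_rel on the publisher's session and the inflight window on the subscriber's session, can be modelled as two maps keyed by packet id. The sketch below is only a model under that assumption: the module toy_qos and its function names are invented, plain maps stand in for emqx_inflight, and window size limits, retry timers, and the message queue are left out.

```erlang
-module(toy_qos).
-export([new/0, register_publish/3, pubrel/2,
         deliver/3, puback/2, pubrec/2, pubcomp/2, demo/0]).

new() ->
    #{awaiting_rel => #{}, inflight => #{}}.

%% Publisher side, QoS 2: remember the packet id until PUBREL arrives,
%% so a retransmitted PUBLISH with the same id is detected.
register_publish(PacketId, Ts, S = #{awaiting_rel := AR}) ->
    case maps:is_key(PacketId, AR) of
        true  -> {error, packet_id_in_use};
        false -> {ok, S#{awaiting_rel := AR#{PacketId => Ts}}}
    end.

%% Publisher side, QoS 2: PUBREL releases the recorded packet id.
pubrel(PacketId, S = #{awaiting_rel := AR}) ->
    case maps:is_key(PacketId, AR) of
        true  -> {ok, S#{awaiting_rel := maps:remove(PacketId, AR)}};
        false -> {error, packet_id_not_found}
    end.

%% Subscriber side, QoS 1 and 2: a delivered message enters the window.
deliver(PacketId, Msg, S = #{inflight := In}) ->
    S#{inflight := In#{PacketId => {publish, Msg}}}.

%% Subscriber side, QoS 1: PUBACK removes the message from the window.
puback(PacketId, S = #{inflight := In}) ->
    S#{inflight := maps:remove(PacketId, In)}.

%% Subscriber side, QoS 2: PUBREC updates the entry, PUBCOMP removes it.
pubrec(PacketId, S = #{inflight := In}) ->
    case maps:is_key(PacketId, In) of
        true  -> S#{inflight := In#{PacketId := awaiting_pubcomp}};
        false -> S
    end.

pubcomp(PacketId, S) ->
    puback(PacketId, S).

%% One QoS 2 publish on each side; both maps end up empty.
demo() ->
    S0 = new(),
    {ok, S1} = register_publish(1, 0, S0),
    {error, packet_id_in_use} = register_publish(1, 0, S1),
    {ok, S2} = pubrel(1, S1),
    S3 = deliver(7, <<"hello">>, S2),
    S4 = pubrec(7, S3),
    S5 = pubcomp(7, S4),
    {maps:size(maps:get(awaiting_rel, S5)), maps:size(maps:get(inflight, S5))}.
```

toy_qos:demo() returns {0, 0}. In the model, register_publish and pubrel return a result the caller must wait for, which matches the text's use of gen_server:call for those two steps.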

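(2) Reading the Conn and Session Processes

This section still looks only at how messages flow between the processes.

1. The Conn process

Start with the connection-layer code, looking mainly at the entry and exit points.

emqx_connection.erl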

process_incoming(Data, State) ->
    Oct = iolist_size(Data),
    ?LOG(debug, "RECV ~p", [Data]),
    emqx_pd:update_counter(incoming_bytes, Oct),
    emqx_metrics:trans(inc, 'bytes/received', Oct),
    case handle_packet(Data, State) of
        {noreply, State1} ->
            State2 = maybe_gc({1, Oct}, State1),
            {noreply, ensure_stats_timer(State2)};
        Shutdown -> Shutdown
    end.

......

%% Parse and handle packets

......

handle_packet(Data, State = #state{proto_state = ProtoState,
                                   parser_state = ParserState,
                                   idle_timeout = IdleTimeout}) ->
    try emqx_frame:parse(Data, ParserState) of
        {more, ParserState1} ->
            {noreply, State#state{parser_state = ParserState1}, IdleTimeout};
        {ok, Packet = ?PACKET(Type), Rest} ->
            emqx_metrics:received(Packet),
            (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1),
            case emqx_protocol:received(Packet, ProtoState) of
                {ok, ProtoState1} ->
                    handle_packet(Rest, reset_parser(State#state{proto_state = ProtoState1}));
                {error, Reason} ->
                    ?LOG(warning, "Process packet error - ~p", [Reason]),
                    shutdown(Reason, State);
                {error, Reason, ProtoState1} ->
                    shutdown(Reason, State#state{proto_state = ProtoState1});
                {stop, Error, ProtoState1} ->
                    stop(Error, State#state{proto_state = ProtoState1})
            end;
        {error, Reason} ->
            ?LOG(warning, "Parse frame error - ~p", [Reason]),
            shutdown(Reason, State)
    catch
        _:Error ->
            ?LOG(warning, "Parse failed for ~p~nError data:~p", [Error, Data]),
            shutdown(parse_error, State)
    end.

emqx_protocol.erl (process_packet)

received(Packet = ?PACKET(Type), PState) ->
    PState1 = set_protover(Packet, PState),
    trace(recv, Packet),
    try emqx_packet:validate(Packet) of
        true ->
            case preprocess_properties(Packet, PState1) of
                {error, ReasonCode} ->
                    {error, ReasonCode, PState1};
                {Packet1, PState2} ->
                    process_packet(Packet1, inc_stats(recv, Type, PState2))
            end
    catch
        ......
        error : Reason ->
            deliver({disconnect, ?RC_MALFORMED_PACKET}, PState1),
            {error, Reason, PState1}
    end.

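When the conn process receives upstream data it runs handle_packet. emqx_frame:parse/2 parses the protocol, turning the binary data into protocol packets in term form (CONNECT, CONNACK, SUBSCRIBE, SUBACK, and so on). emqx_protocol:received/2 validates the packet and preprocesses it, and emqx_protocol:process_packet/2 carries out the protocol action for the packet that was received. This is the function to remember:

emqx_protocol:process_packet/2 is the key function that handles packets sent by the device and acts on them.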
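The {more, ...} and {ok, Packet, Rest} results that handle_packet matches on can be illustrated with a toy frame parser. The sketch below is not emqx_frame: the module toy_frame is invented, it keeps the unparsed bytes as its only parser state, and it splits packets only at the MQTT fixed header (type and flags in the first byte, then the Remaining Length as a variable-length integer of at most four bytes) without decoding the body.

```erlang
-module(toy_frame).
-export([parse/1, parse_all/1, demo/0]).

%% parse/1 -> {ok, {Type, Flags, Body}, Rest} | {more, Buffer} | {error, Reason}
parse(<<Type:4, Flags:4, Rest/binary>> = Bin) ->
    case remaining_length(Rest, 1, 0) of
        {ok, Len, Rest1} when byte_size(Rest1) >= Len ->
            <<Body:Len/binary, Rest2/binary>> = Rest1,
            {ok, {Type, Flags, Body}, Rest2};
        {ok, _Len, _Rest1} ->
            {more, Bin};                 % body not fully received yet
        more ->
            {more, Bin};                 % length field itself is incomplete
        error ->
            {error, malformed_remaining_length}
    end;
parse(Bin) ->
    {more, Bin}.

%% Each length byte carries 7 bits of value; the top bit means "more bytes".
%% A continuation bit on the fourth byte is malformed.
remaining_length(<<1:1, _:7, _/binary>>, Mult, _Acc) when Mult >= 2097152 ->
    error;
remaining_length(<<1:1, Digit:7, Rest/binary>>, Mult, Acc) ->
    remaining_length(Rest, Mult * 128, Acc + Digit * Mult);
remaining_length(<<0:1, Digit:7, Rest/binary>>, Mult, Acc) ->
    {ok, Acc + Digit * Mult, Rest};
remaining_length(<<>>, _Mult, _Acc) ->
    more.

%% Keep parsing the leftover bytes until the parser asks for more input,
%% the same loop shape as handle_packet(Rest, ...).
parse_all(Bin) ->
    parse_all(Bin, []).

parse_all(Bin, Acc) ->
    case parse(Bin) of
        {ok, Packet, Rest} -> parse_all(Rest, [Packet | Acc]);
        {more, Rest}       -> {lists:reverse(Acc), Rest};
        {error, Reason}    -> {error, Reason}
    end.

%% A complete PINGREQ followed by the first 3 of 5 body bytes of a PUBLISH.
demo() ->
    parse_all(<<16#C0, 0, 16#30, 5, 0, 1, $a>>).
```

toy_frame:demo() returns {[{12, 0, <<>>}], <<16#30, 5, 0, 1, $a>>}: the PINGREQ (type 12) is complete, and the partial PUBLISH is handed back as the buffer to prepend to the next chunk read from the socket.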

emqx_connection.erl

handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
    case emqx_protocol:deliver(PubOrAck, ProtoState) of
        {ok, ProtoState1} ->
            State1 = State#state{proto_state = ProtoState1},
            {noreply, maybe_gc(PubOrAck, ensure_stats_timer(State1))};
        {error, Reason} ->
            shutdown(Reason, State)
    end;

emqx_protocol.erl

deliver({connack, ReasonCode}, PState) ->
    send(?CONNACK_PACKET(ReasonCode), PState);

......

send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
    trace(send, Packet),
    case SendFun(Packet, #{version => Ver}) of
        ok ->
            emqx_metrics:sent(Packet),
            {ok, inc_stats(send, Type, PState)};
        {error, Reason} ->
            {error, Reason}
    end.

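As seen earlier, some downstream messages come from Session (for example ConnPid ! {deliver, {publish, PacketId, Msg}}), and some are replies from the Conn process itself (for example the PUBACK packet sent to DeviceA through emqx_protocol:deliver/2). Both paths end up in emqx_protocol:send/2.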
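The way both paths converge on a single send function that calls a stored closure can be sketched as follows. This is a simplified model rather than EMQX code: the module toy_send is invented, the protocol state is a map instead of the #pstate record, and the send fun posts the packet to a process instead of serializing it onto a socket.

```erlang
-module(toy_send).
-export([make_sendfun/1, deliver/2, demo/0]).

%% The connection builds the closure once and stores it in the protocol
%% state; here it forwards the packet to a sink process.
make_sendfun(Sink) ->
    fun(Packet, _Opts) -> Sink ! {sent, Packet}, ok end.

%% Messages pushed by the session and replies produced by conn itself
%% both go through deliver/2 ...
deliver({publish, PacketId, Msg}, PState) ->
    send({publish, PacketId, Msg}, PState);
deliver({puback, PacketId, ReasonCode}, PState) ->
    send({puback, PacketId, ReasonCode}, PState).

%% ... and end in the same send/2, which calls the stored fun and counts
%% the packets that were sent successfully.
send(Packet, PState = #{sendfun := SendFun, proto_ver := Ver, sent := N}) ->
    case SendFun(Packet, #{version => Ver}) of
        ok              -> {ok, PState#{sent := N + 1}};
        {error, Reason} -> {error, Reason}
    end.

demo() ->
    PState0 = #{sendfun => make_sendfun(self()), proto_ver => 4, sent => 0},
    {ok, PState1} = deliver({publish, 1, <<"hi">>}, PState0),
    {ok, PState2} = deliver({puback, 1, 0}, PState1),
    Packets = [receive {sent, P} -> P after 1000 -> timeout end || _ <- [1, 2]],
    {maps:get(sent, PState2), Packets}.
```

toy_send:demo() returns {2, [{publish, 1, <<"hi">>}, {puback, 1, 0}]}. Keeping the socket behind a fun means the protocol code never touches the transport directly, which is consistent with the sendfun field visible in the pstate dump earlier.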

2. Session

Here too we look only at the logic shown in the flow diagrams, mainly handle_info/2, handle_call/3, and handle_cast/2.

Subscribe and unsubscribe requests from Conn:
    gen_server:cast(SPid, {subscribe, self(), SubReq})
    gen_server:cast(SPid, {unsubscribe, self(), UnsubReq})
Downstream messages delivered by routing:
    SubPid ! {dispatch, Topic, Msg}
Handling of the various response packets from Conn.
From ConnA on the left:
    gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity)
    gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity)
From ConnB on the right:
    gen_server:cast(SPid, {puback, PacketId, ReasonCode})
    gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity)
    gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode})

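Messages from the left side drive the awaiting_rel handling, and messages from the right side drive the inflight handling. Before I drew the flow diagrams this was something that confused me a great deal; now it is clear at a glance.

That covers the message flow from the device through the Conn process and the Session process.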