目录
路由层与分布层
上一篇文章,将消息订阅与投递过程中偏重Conn进程与Session进程间的关系;这一篇文章则偏重于系统如何维护路由层和分布层数据以及如何通过他们组织分布式服务并进行消息投递的。
(感兴趣的可以看看上一篇,两篇文章一起“食用”更香哦( ^_^))
图四 订阅与发布订阅发布是一个解耦的过程,图三中一对一消息发布只是为了方便理解,真正的订阅发布应该是图四这种,同一个消息可以有多个设备订阅接受,并且还可以进行通配订阅的。
一、整体介绍
1、路由层
路由层维护订阅者(subscriber)与订阅关系表(subscription),并在本节点发布订阅模式派发(Dispatch)消息:
图五 路由层设计消息派发到会话(Session)后,由会话负责按不同 QoS 送达消息。
路由层分为两种,普通订阅和共享订阅。涉及的表如下:
普通订阅: emqx_suboption、emqx_subscription、emqx_subscriber共享订阅: emqx_suboption、emqx_subscription、emqx_shared_subscription(全局)、emqx_shared_subscriber、emqx_alive_shared_subscribersemqx_suboption字段说明示例key主键{<0.511.1>,<<"test/set/#">>} SPid和Topic组成的元组valueSubMap#{nl => 0,pktid => 1,qos => 1,rap => 0,rc => 0,rh => 0,share => <<"[email protected]">>,subid => <<"broker/tcp">>} share关键字在共享订阅Topic时才出现set表。这个表是和Session进程关联最密切的表,每个Session进程会在State里面存着自己进程订阅的Topic的信息,如下:
subscriptions = #{<<"test/set/#">> => #{nl => 0,pktid => 1,qos => 1,rap => 0,rc => 0,rh => 0,share => <<"[email protected]">>}},emqx_subscription字段说明示例keySession进程ID 主键<0.511.1> SPidvalueTopic<<"test/set/#">> Topic订阅关系表,SPid为主键,duplicate_bag表。普通/共享订阅时使用。 目前主要的使用的地方有两处,一处是给http Api 提供查询使用,一处是处理Session进程挂掉时执行unsubscribe操作。
emqx_subscriber字段说明示例keyTopic 主键<<"test/set/#">> Topic / {shard,<<"test/set/#">>,1} 分片时使用的keyvalueSession进程ID 或者分片<0.511.1> SPid / {shared, 1} 分片时使用的value订阅者表,Topic为主键,bag表。普通订阅使用。 是下发消息使用的重要的表。下发时以Topic查询所有订阅者的SPid。 因为是bag表,执行取消订阅时,在删除后,需要判断表中是否还存有key决定是否do_delete_route。
关于分片:当订阅订阅同一个Topic的设备太多时,publish消息时会出现大key查询,影响效率,所有当订阅者超过1024个时会出现分片存在。分片存储是将一条数据拆成两条数据。 举个例子 {<<"test/set/#">>, <0.511.1>} ,插入时发现这个Topic的订阅者已经超过1024个,就会进行分片,分两次插入{{shard,<<"test/set/#">>,1},<0.511.1>} 、 {<<"test/set/#">>, {shard,1}}。分发消息时,会先命中后一项,在根据{shard,1} 去命中前一项。 例子中的1是获取的分片id,可以理解为分表id。emqx_shared_subscription字段说明示例group组<<"[email protected]">>topicTopic<<"test/set/#">>subpidSessionPid<0.511.1>共享订阅关系表 ,以Group、Topic为维度,bag表。共享订阅时使用。 是共享订阅下发消息使用的重要的表。 下发时以Group、Topic查询所有订阅者的SPid。
emqx_shared_subscriber字段说明示例key主键{<<"[email protected]">>,<<"test/set/#">>} Group和Topic组成的元组valueSession进程ID<0.511.1> SPid共享订阅者表,bag表,以{Group,Topic}为主键,共享订阅时使用。 因为是bag表,可以用来在订阅/取消订阅时判断是否需要do_add_route/do_delete_route。 订阅操作时,插入emqx_shared_subscriber前,表中没有此key需要do_add_route;取消订阅时,删除emqx_shared_subscriber后,表中仍存在此key,就不能执行do_delete_route。
emqx_alive_shared_subscribers字段说明示例key主键<0.511.1> SPid用来跟踪订阅Session进程的状态的,表中只存活着非本节点的订阅者SessionPid。 SessionPid进程挂掉时,删除该项。并取消该进程的所有共享订阅信息。
2、分布层
分布层维护全局主题树(Topic Trie)与路由表(Route Table)。主题树由通配主题构成,路由表映射主题到节点:
图六 分布层设计(一)分布层通过匹配主题树(Topic Trie)和查找路由表(Route Table),在集群的节点间转发路由 MQTT 消息:
图七 分布层设计(二)分布层主要涉及的表如下: emqx_route (全局)、emqx_trie(全局)、emqx_trie_node(全局) 当topic中出现通配符时,订阅动作才会写入emqx_trie_node、emqx_trie两张表。 下发消息时,会首先查找后两张表,看有没有满足通配匹配的TopicFilter。
emqx_route字段说明示例topic主键<<"test/set/#">> Topicdest目标节点[email protected] / {<<"[email protected]">>, [email protected]}查找路由表,以topic为主键,bag表。消息下发时确定消息路由节点。
emqx_trie_node字段说明示例node_id主键以<<"test/set/#">>为例, node_id 可以为以下值 <<"test">> <<"test/set">> <<"test/set/#">>edge_count子节点数量topicTopic<<"test/set/#">> / undefind ,只有叶子节点才有值flagsemqx_trie字段说明示例egde主键#trie_edge{node_id = <<"test/set">>,word = #}node_id子节点node_id<<"test/set/#">>这两张表共同构成了匹配主题树表。
二、举例说明
前面匹配主题树表结构有点复杂,我们举个例子讲解一下。
当A订阅 <<"test/set/nick">> 时, 因为这个TopicFilter中没有通配符,所有直接插入emqx_route表中 {<<"test/set/nick">>,PidA}。 当B订阅 <<"test/set/#">>时,因为这个TopicFilter有通配符,需要先插入匹配主题树,最后再在emqx_route 中插入{<<"test/set/nick">>,PidA}。
我们细看插入匹配主题树的过程,先将<<"test/set/#">> 进行分词拆分 [<<"test">>,<<"set">>,#],插入emqx_trie中,我们有了以下记录:
[ #trie{edge = #trie_edge{node_id = root,word = <<"test">>}, node_id = <<"test">>}, #trie{edge = #trie_edge{node_id = <<"test">>, word = <<"set">>}, node_id = <<"test/set">>}, #trie{edge = #trie_edge{node_id = <<"test/set">>, word = #}, node_id = <<"test/set/#">>}]emqx_trie的结构就是 {{node_id, word}, child} 再看一下
{root, <<"test">>}, <<"test">> {<<"test">>, "set"} , <<"test/set">> {<<"test/set">>, #}, <<"test/set/#">>除了根节点, node_id 拼接 word 就是 child 的node_id ,通过这样的方式够成了树。
我们查看emqx_trie_node:
[ #trie_node{node_id = root,edge_count = 1,topic = , flags = }, #trie_node{node_id = <<"test">>,edge_count = 1, topic = ,flags = }, #trie_node{node_id = <<"test/set">>,edge_count = 1, topic = ,flags = }, #trie_node{node_id = <<"test/set/#">>,edge_count = 0, topic = <<"test/set/#">>,flags = }]这张表记录了所有的节点,和子节点数量,只有叶子节topic有值。
下面我们看一下发布流程: 当C向<<"test/set/nick">> 发布一条消息时。
会先查emqx_trie和emqx_trie_node两种表,同样是先把Topic进行拆分, [<<"test">>,<<"set">>,<<"nick">>],加上根节点root,即 [root, <<"test">>,<<"set">>,<<"nick">>];先按{root, #} 查询emqx_trie表,此时记录为空,如果有会存入Acc里面,再查 {<<"test">>, <<"set">>} 和 {<<"test", +>>} 查到记录就递归查询,前者有记录 {{<<"test">>,<<"set">>},<<"test/set">>} ,所以递归查询{<<"test/set">>, <<"nick">>} 和 {<<"test/set">>, +} ,查到结果为空,此时分词表到头了;接着查询{<<"test/set">>, #} 查到结果是{{<<"test/set">>, #}, <<"test/set/#">>} ,根据这条记录查询emqx_trie_node表,得到 #trie_node{node_id = <<"test/set/#">>,edge_count = 0,topic = <<"test/set/#">>,flags = },至此我们通过匹配主题树查到了一个topicFilter <<"test/set/#">>。接着我们通过前面查到的<<"test/set/#">> 和最初的 <<"test/set/nick">> 一起查询emqx_route表,根据路由表将消息路由的目标节点去执行下一步的分发。这里我们关注一下匹配主题树,除了首尾尾查询{root, #}、{<<"test/set">>, #, 其他环节都是在查询一个word和+二叉树root / \ # test / \ + set / \ +#查询过程就是这颗树。
具体代码如下: emqx_trie.erl
match(Topic) when is_binary(Topic) -> TrieNodes = match_node(root, emqx_topic:words(Topic)), [Name || #trie_node{topic = Name} <- TrieNodes, Name =/= ]. match_node(root, [NodeId = <<$$, _/binary>>|Words]) -> match_node(NodeId, Words, []); match_node(NodeId, Words) -> match_node(NodeId, Words, []). match_node(NodeId, [], ResAcc) -> mnesia:read(?TRIE_NODE, NodeId) ++ match_#(NodeId, ResAcc); match_node(NodeId, [W|Words], ResAcc) -> lists:foldl(fun(WArg, Acc) -> case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = WArg}) of [#trie{node_id = ChildId}] -> match_node(ChildId, Words, Acc); [] -> Acc end end, match_#(NodeId, ResAcc), [W, +]). %% @private %% @doc Match node with #. match_#(NodeId, ResAcc) -> case mnesia:read(?TRIE, #trie_edge{node_id = NodeId, word = #}) of [#trie{node_id = ChildId}] -> mnesia:read(?TRIE_NODE, ChildId) ++ ResAcc; [] -> ResAcc end.接着我们串一下路由层如何继续分发吧。前面分布层我们查询emqx_route 得到了 [{<<"test/set/nick">>, Node1}, {<<"test/set/#">>, Node2}] ,Node1和Node2如果是普通订阅,则forward到目标节点(目标节点是本地则省略该步骤),查询emqx_subercriber表中SessionPid,直接send SubPid ! {dispatch, Topic, Msg},后续流程就接上前面章节的会话层逻辑了。 (如果出现分片情况,参见前面emqx_subscriber的逻辑)。如果是共享订阅呢,也省掉了forward逻辑(因为emqx_shared_subscription是全局表),进入共享订阅的分发逻辑,共享订阅的emqx_route 记录格式类似这样{<<"test/set/nick">>, {Group,Node}} ,实际上Group就是binary格式的Node。共享订阅的逻辑就是从emqx_shared_subscription表中pick一个会话,发布消息,失败了则进行下一个,直到有一个成功。共享订阅的概念就是从订阅这个Topic的Subscriber中选择一个进行投递,保证只有一个人收到。三、结语
本文仅仅从整体上梳理了一下emqx3.0的功能流程,各个环节点到为止,后续有机会再出一些具体功能的详细介绍吧。
四、附
https://github.com/emqx/emqx https://docs.emqx.io/broker/v3/cn/design.html#distributed-layer
看完还不快快点赞,嗷呜~(>o<)~呜嗷