RocketMQ服务消费需要四个集群服务:Producer、Consumer、Broker以及NameServer,各个集群如何工作,服务之间如何交互,由于涉及内容较多,此篇暂不过多解释。
查看源码的最好方式就是debug,首先在码云上下载源码(ps:github对于国内网速有点不太友好,设置代理也不太好使):
,下载后如图:
各个model的作用以及相互关联,等整理好之后再详细说明。现主要的任务是将nameserver和broker启动。在本地配置好RocketMQ配置、日志等路径:C:\Users\lios\IdeaProjects\rocketmq(下文统称”rocketmq路径“),分别创建conf、logs、store三个文件夹。
第一步:配置nameserver及启动集群服务
将/distribution/conf下的logback_namesrv.xml文件放到rocketmq路径下的conf文件夹中,并且修改xml中的配置路径,改成创建好的rocketmq路径,如图:配置idea中的启动配置:
启动Nameserv,启动成功如图:
第二步:配置broker及启动服务
将distribution/conf下的broker.conf和logback_broker.xml文件复制到rocketmq路径下的conf文件夹中,将logback_broker.xml文件中的${user.home}路径替换为rocketmq路径,在broker.conf中新增如下配置:
##以下属于自己新增配置项#nameserAddr地址,集群用分号;分割namesrvAddr=127.0.0.1:9876#存储路径storePathRootDir=/Users/lios/IdeaProjects/rocketmq/store#commitLog存储路径storePathCommitLog=/Users/lios/IdeaProjects/rocketmq/store/commitLog#消息队列存储路径storePathConsumeQueue=/Users/lios/IdeaProjects/rocketmq/store/consumeQueue#消息索引存储路径storePathIndex=/Users/lios/IdeaProjects/rocketmq/store/index#checkPoint文件存储路径storeCheckpoint=/Users/lios/IdeaProjects/rocketmq/store/checkpoint#abort文件存储路径abortFile=/Users/lios/IdeaProjects/rocketmq/store/abor新增idea中的启动配置Broker,如图:
启动broker,成功如图:
第三步:启动producer和consumer
在example模块下的quickstart文件夹下有producer和consumer,直接启动即可,先启动producer,如图;
在Producer类中,默认使用“TopicTest”topic发送了1000条消息,由于此类是测试例子,所以发送完消息之后,用shutdown()方法关闭了Produce服务。
接下来启动Consumer,在Consumer类中,Nameserv并没有声明,需要声明如下:
#声明nameserv的IP和端口
consumer.setNamesrvAddr("127.0.0.1:9876");
声明完了启动立即消费刚刚Produce发送的消息,如下:
至此,Produce、Consumer、Broker、NameServer全部启动成功且正常服务,但是由于日常运维的web控制台呢?
新版的RocketMQ代码并没有将console放在同一代码仓库中,在码云上的地址:,web控制台启动较简单,修改application.xml的注册中心集群和控制台端口即可,如图:
随后启动App.class即可,控制台打印发送和消费日志如下:
web控制台如图:
到这,我们在本地启动和维护RocketMQ服务已经全部完成,接下来就是详细了解各个模块的作用。尽情期待。
写在最后:第一次在中写文章,如何把技术点、操作动作讲清楚,如何讲才能让大家记住,让大家更有兴趣看下去,对我来说还需要慢慢探索,请大家多多包涵和指出问题,多谢!