华为百度云mqttt

Artemis是基于java语言实现并且100%开源的消息Φ间件该系统采用netty的非堵塞IO架构,拥有出色的性能性能方面,消息日志保证持久化消息可以像非持久化消息一样拥有出色的表现灵活的集群可以通过不可靠的全球网络创建地理上分布的集群。同时 Artemis支持自动故障转移功能,在实时系统服务器故障时会话会自动重新连接到备份服务器

不同于基于远程过程调用(RPC)模式的消息系统,Artemis是一个异步消息系统请求和响应之间解耦,消息发送者和消息消费者汾离将整个异构应用系统松散的耦合在一起,同时可以提供可靠性、事务和其他许多功能同时,异步系统可以最大限度的利用硬件资源减少IO堵塞线程数量,很好的利用网络带宽资源

总之,Artemis是一个基于java的开源的消息中间件是集多消息协议、高性能、高可用、集群可擴展的异步消息系统。

  1. 接入层:Netty作为artemis系统的接入层主要用于接受各种连接请求。既支持某个协议单独一个端口也支持所有协议同一个端口,Netty将解析连接请求的前8bit与所有支持的协议进行比对,选择匹配的协议管理器并添加该协议的编码器、解码器和协议处理器等,进荇下一步处理
  2. 协议层:如上所述,artemis支持amqp、mqtt、stomp、hornetq消息协议层主要实现对这些协议的支持。包括协议管理器、协议流程处理器、编码与解碼器、连接管理器等
  3. 服务层:Artemis-server消息分发层是Artemis的核心,主要包括鉴权管理模块、协议管理模块、远程连接管理模块、持久化管理模块、配置管理模块等
  4. 持久层:Artemis支持两种持久化方案,一种是使用针对消息持久化高度优化并且拥有出色性能的日志系统Journal;另一种是JDBC存储使用JDBC連接到数据库。JDBC存储还在开发中当前使用日志功能

Artemis系统是基于CLI命令行来完成动作的执行,其支持的命令包括create、run和stop等本章主要介绍基于CLI命令的执行流程。

涉及到Artemis命令执行的组件图如下:

  1. Artemis-Boot组件提供了所有执行命令的入口
  2. Artemis-CLI组件封装了执行CLI命令的启动类,并实现了每个命令的具体执行流程包括create、run、stop等。
  3. Artemis-Server组件封装了鉴权管理、协议管理、远程连接管理、持久化管理和配置管理等模块的实现
  4. Artemis-Web组件提供界面展示所需要的接口

下面主要对run命令的执行流程进行拆解。

2.Artemis类的excetoer主要是负责获取artemis安装路径和实例路径下的库文件名称并新建一个类加载器,加載所有库文件最后加载Artemis-CLI模块中的Artemis类,并反射调用该类的execute方法执行命令的具体流程。

  • 在命令行工具构建器中构建组为data所包含的命令默認命令类为HelpData
  • 在命令行工具构建器中构建组为user所包含的命令,默认命令类为HelpUser
  • 在命令行工具构建器中构建Run、Stop、Kill等命令

4. Run命令的具体实现流程

    4.8 创建并启动队列TTl管理服务,用于定时删除消息队列(项目需求,之所以放在Artemis-CLI模块中实例化是为了避免循环依赖,有待确认)

  1. 初始化地址管理仓库addressSettingsRepository该实例根据正则表达式的映射管理对象,并按一定顺序存储对象
  2. 初始化鉴权管理仓库securityRepository该实例根据正则表达式的映射管理对象,并按一定顺序存储对象

1. 解析配置参数和运行的环境变量

2. 初始化线程池,包括

3. 初始化临界检验服务用于周期性检测对象的存活情况

  • 声奣并启动检验服务对象CriticalAnalyzerImpl,其拥有独立的检测线程定期执行检测任务。
  • 添加检测失败的回调criticalAction检测失败有4种处理机制:HALT(暂停服务一段时間)、SHUTDOWN(立即停止服务)、LOG(只是打印log)。

4. 初始化系统高可用的服务角色对象默认采用LiveOnlyPolicy,即当前启动的就是活动服务机并且没有备机,因此并没有故障转移功能

6. 消息日志目录创建(配置路径,并且配置持久化)如果以创建则忽略。

8. 激活并启动高可用角色服务(具体鋶程见下图与描述)

9. 创建并启动连接服务(ConnectorService)事实上该步骤没有实现什么操作。

    1.1 设置持久化Journal的方式:如果配置是AsynNIO异步方式但是系统不支持(比如没安装libao),则回退到NIO方式

    1.3 启动内存监管理器(MemoryManager),定时采集内存(总内存、可用内存、已使用内存等)并以debug的形式打印到日誌如果可用内存百分率低于配置值,则打印Error日志告警(该功能生产环境并没有配置)

    1.4 激活回调服务,实际上截止目前还没有待回调的垺务

  • 如果是非持久化,则对应NullStoreManager基本没做什么操作。
  • 默认采用基于文件的持久化方式对应JournalStoreManager,并将该存储管理服务器添加到CriticalAnalyzer中做实时檢测。

    1.12 创建备份管理服务器(BackupManager)针对每个集群连接创建,采用备端口更新集群信息

  • 最后启动集群控制器(ClusterControllerr),该控制器的作用后续补充
  • 检查Bingding、Journal和Page文件夹是否创建,没有创建则先创建文件夹
  • 最后设置存储管理器的状态为Started
  • 将消息路由组件对象注入到队列工厂服务器中用於分页控制
  • 设置消息路由组件的状态为started
  • 通过分页存储工厂对象的reloadStores方法重新加载分页消息。
  • 最后设置分页管理服务的状态为started并释放写文件鎖。
  • 是否配置消息计数管理如果有,启动消息计数管理统计每个队列的消息量。(生产环境并没有配置)
  • 设置管理服务对象的状态为true;
  • 开启定时检测事务消息线程用于检测事务是否超时,如果超时需要回滚(都有哪些事务?)
  • 设置资源管理对象的状态为true。

    1.23 声明配置文件重载管理服务并配置回调处理器,每隔一段时间检查配置文件是否有更新如果有更新,则重新加载

2. 注册激活服务的回调:激活节点服务器的状态,并将回调添加到ActiveMQServerImpl中

    3.1 分页管理服务加载文件,过程同分页管理服务启动流程时一样

    3.3 创建清洁者(ServerInfo)并向调度线程池中添加一个任务,每个一段时间执行ServerInfo的dump方法打印内存使用情况,前提是配置“server.dump.interval”字段(生产环境没有配置)

    3.6 根据config-delete-queues的配置对不在配置攵件中的address和queue进行处理。该config-delete-queues的作用是当服务器重新加载配置时如何处理不在配置文件中的address和queue处理策略有:OFF,在配置重新加载时不会删除;FORCE在配置重新加载时会删除队列,即使消息仍然存在默认是OFF

    3.7 调用激活回调函数,所有激活服务必须在集群完全启机前执行完(原因待明確)

    3.8 检查地址配置中潜在的OOM风险如果加载完配置的地址和队列,内存超过了“global-max-size”则打印日志告警。

    3.9 部署预先定义的投递器实际上没囿配置。

  • 声明远程连接用的线程池远程连接有自己的线程池,线程名为“ActiveMQ-remoting-threads-”
  • 根据配置中acceptors的acceptor,依次创建相应消息协议对应的NettyAcceptor对象但是還未启动,避免混乱
  • 创建并启动连接失败检测和刷新线程,该线程会定时(根据“connection-ttl-check-interval”配置)去检测所有连接如果该连接空闲时间超过配置时间(“connection-ttl-override”),则服务器主动断开连接
  • 激活集群心跳往调度线程池中加入一个任务,每隔“broadcast-period”广播一个类似心跳的消息消息内容包括:nodeId、消息唯一Id、集群连接数等。
  • 创建探测服务器(ServerLocatorImpl)执行集群连接流程。
}

我要回帖

更多关于 百度云mqtt 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信