RabbitMQ新连接被拒绝由于java线程异步写法问题,怎么解决

由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:
1 生产者的负载与消费者的负载解耦
2 消费者按照自己的能力fetch数据
3 消费者可以自定义消费的数量
另外,由于broker采用了主题topic--&分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。
Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader--&followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。
根据Kafka官方的文档,Kafka可以被认为一个高大上的集群消息中间件,但是读了下以前一个朋友给的部署文档和Kafka的官方的文档。发现Kafka确实不错,真的可以说是集群消息中间件。
用topic来进行消息管理,每个topic包含多个part,每个part对应一个逻辑log,有多个segment组成。
segment中的消息id由其逻辑位置决定,可以用消息id直接定位到消息的存储位置,避免id到位置的额外映射。
生产者发到某个topic的消息会被均匀的分布到多个part上,broker收到消息会写入最后的segment文件中,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息消费者才能收到。并且通过rolling的机制,保证segment的文件不至于过大。
消费者可以rewind back到任意位置重新进行消费,当消费者故障时,可以选择最小的offset进行重新读取消费消息。
是不是看起来很爽,但是深入往下看,发现了一些深坑
Kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。但是part只会被consumer group内的一个consumer消费,故kafka保证每个parti内的消息会被顺序的消费。
broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。同时broker是无状态的,broker不保存消费者的状态,由消费者自己保存。无状态导也致消息的删除成为难题,所以Kafka选择消息保存一定时间后会被删除。
大量的依赖Zookeeper,需要Zookeeper来管理broker与consumer的动态加入与离开。以及消费关系及每个partion的消费信息。
看到这里,你如果还明白我说这些深坑是什么意思,那就请带入运维场景和特定故障场景思考下。我稍后会说一下这些坑会带来什么问题。
关于RabbitMQ
RabbitMQ是使用Erlang开发的一个消息队列,可以构建成集群,也可以单独使用。
根据测试,RabbitMQ在不使用ACK机制的,Msg大小为1K的情况下,QPS可达6W+。再双方ACK机制,Msg大小为1K的情况下,QPS瞬间降到了1W+。从某种意义上RabbitMQ还真是慢,但是我们需要思考下。
我们真的每个消息都能到1K吗?
我们真的需要双方都对消息ACK的系统吗?
好了,如果两个回答都是YES,那么RabbitMQ就是慢的。如果是No,那么RabbitMQ还是一个非常快的队列。
RabbitMQ慢有几个原因:
RabbitMQ做为一个Broker,不单单做到了简单的数据转发功能,还保证了单个队列上的数据有序,即便是有多个消费者和多个生产者。
RabbitMQ的策略是实时转发,而不像Kafka那样等待刷盘之后才让消费者来消费。
如果消费者和生产者不对等,会产生大量的磁盘IO操作,进行消息换出。
RabbitMQ为什么不好用:
AMQP协议本身比较复杂,参数比较多。
Erlang写的,很多人不熟悉,并且Mnesia出现问题好多人解决不了。
RabbitMQ和Kafka相比没价值了吗?
很多亲们读到这里,就会想RabbitMQ好像也不怎么样呀。和Kafka相比没什么价值可言了,但是我前面说了一些Kafka的坑,我就在这里面揭示一下。
Kafka大量依赖Zookeeper,它的broker并不保存任何状态,如果Zookeeper集群不幸悲剧了,那么整个Kafka集群的消息就全完蛋了。
上面问题有人会说这概率好小,我也同样认为这个概率很小,那么一个broker当机呢?当一个broker当机了整个消息队列由于负载均衡的算法,在一瞬间消费者和生产者之间的消息就全乱掉了。很多需要保证消息顺序的系统一下子就完蛋了。
这就是RabbitMQ存在的价值和意义,同时RabbitMQ使用了MirrorQueue的机制,也可以做到多个机器进行热备。
RabbitMQ该怎么用
RabbitMQ的消息应当尽可能的小,并且只用来处理实时且要高可靠性的消息。
消费者和生产者的能力尽量对等,否则消息堆积会严重影响RabbitMQ的性能。
集群部署,使用热备,保证消息的可靠性。
Kafka该怎么用
应当有一个非常好的运维监控系统,不单单要监控Kafka本身,还要监控Zookeeper。
对消息顺序不依赖,且不是那么实时的系统。
对消息丢失并不那么敏感的系统。
PUB-SUB 发布订阅模型
Consumer消费消...
消息队列选型
Sep 27, 2015
什么是消息队列
顾名思义,消息队列就是用存放消息的队列结构,简称MQ。那什么是消息呢?广义上来说,所有的网络通信都可以看做是消息的传递。在通信的过程中,添...
消息队列中间件
消息队列中间件(简称消息中间件)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下提供应用解...
Kafka、RabbitMQ、RocketMQ等消息中间件的对比 —— 消息发送性能和优势
分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦。现在开源的...
redis rabbitmq kafka都有mq的功能但是之间还是有区别的。redis:轻量型的mq,如果量大,那么效率低。redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送...
RabbitMQ和Kafka转自通九大神的博客起因最近公司RabbitMQ的集群出了点问题,然后有些亲就说RabbitMQ慢且不好用,是一个瓶颈,不如换成Kafka。而我本人,使用RabbitMQ有一...
RabbitMQ和kafka从几个角度简单的对比
业界对于消息的传递有多种方案和产品,本文就比较有代表性的两个MQ(rabbitMQ,kafka)进行阐述和做简单的对比,
在应用场景方面,...
一、消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,Rabbi...
Kafka作为时下最流行的开源消息系统,被广泛地应用在数据缓冲、异步通信、汇集日志、系统解耦等方面。相比较于RocketMQ等其他常见消息系统,Kafka在保障了大部分功能特性的同时,还提供了超一流的...
没有更多推荐了,评论-2691&
消息队列在目前分布式系统下具备非常重要的地位,如下的场景是比较适合消息队列的:
跨系统的调用,异步性质的调用最佳。
高并发问题,利用队列串行特点。
订阅模式,数据被未知数量的消费者订阅,比如某种数据的变更会影响多个系统的数据,订单数据就是比较好理解的。
之前有一个场景是商品数据在修改后需要推送到elasticsearch中,由于修改产品的并发量以及数据量均不大,所以对于消息未做持久化,而且为了快速上线简化系统,生产者与消费者更是部署在一个应用中,自生产自消费。这篇将从头搭建RabbitMQ环境,并且将之集成在Spring boot中。
搭建RabbitMQ环境
由于RabbitMQ是基于erlang开发的,所以要安装RabbitMQ先必须安装erlang。
更换软件源
使用apt-get时默认的软件源是us.archive.ubuntu.com,这会经常发生安装问题,比如速度特别慢或者由于下载不了造成不能安装。
可以更换成国内的数据源cn.archive.ubuntu.com,速度那是不用说的了(这里感谢我的同事的提醒)。找到下面这个文件然后进行替换。
/etc/apt/sources.list
:%s/us.archive/cn.archive/g
在没有更新软件源时,我采取的是源码编译安装方法,参考这篇文章。我安装的是最新19.2版本,安装过程中还遇到各种问题就不一一记录了。
测试erlang安装是否正确,输入erl,如果看到如下图所示就说明安装成功了。
安装RabbitMQ
在未更换软件源之前我也是选择了源码编译安装方法,安装的最新的3.6.6,但手动启动时总是不成功,错误信息如下:
RabbitMQ 3.6.6+ erlang 19.2 启动失败的问题暂时未解决,有谁知道的可以告诉我。
由于启动不成功,最后在更新成国内软件源之后,再次通过 apt-get 安装RabbitMQ,默认的版本是3.5.7,好像也可以选版本,以后再尝试。可喜的是通过apt-get安装的RabbitMQ成功的启动起来了。可以通过如下命令查看RabbitMQ状态。
./rabbitmqctl stauts
RabbitMQ管理工具
这是自带的一个web插件,可以用来管理消息队列,启动它的方法比较简单:
rabbitmq-plugins enable rabbitmq_management
然后重启RabbitMQ即可生效。默认生成了guest用户,但这个guest用户只能在RabbitMQ所在主机才能访问,所以要想远程访问就需要重新分配一个用户,有两种办法:
通过网页,以guest登录然后在页面上完成操作。
通过命令,创建用户,授权也可以。
创建用户,指定用户名以及密码
./rabbitmqctl add_user root root //用户名密码都是root
分配角色,administrator是可以操作和guest本地用户一样的功能,当登录上rabbitmq_management之后,里面的所有功能都可以使用。
rabbitmqctl set_user_tags root administrator
授权,队列的操作管理权限。如果不配置,那么客户端在连接消息队列时会出问题。
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
上面语句我没有执行成功,后续再研究下是不是写法问题
Spring boot集成RabbitMQ
我们在rabbitmq_management上面可以正常访问操作后,就可以放心的写demo了,这里采用spring boot。先看简单看下,容易理解下面提到的一些组件。
生产者,消息,消费者
消息内部:Exchange,Binding,Queues
引用amqp的starter
&dependency&
&groupId&org.springframework.boot&/groupId&
&artifactId&spring-boot-starter-amqp&/artifactId&
&/dependency&
增加配置信息
这里没有采用自动配置
mq.rabbit.host=192.168.21.128
mq.rabbit.port=5672
mq.rabbit.virtualHost=/
mq.rabbit.username=root
mq.rabbit.password=root
创建RabbitMQConfig
ConnectionFactory,类似于数据库连接等。
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(this.mqRabbitHost,this.mqRabbitPort);
connectionFactory.setUsername(this.mqRabbitUserName);
connectionFactory.setPassword(this.mqRabbitPassword);
connectionFactory.setVirtualHost(this.mqRabbitVirtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionF
RabbitTemplate,用来发送消息。
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
DirectExchange
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE_NAME);
Queue,构建队列,名称,是否持久化之类
public Queue queue() {
return new Queue(QUEUE_NAME, true);
Binding,将DirectExchange与Queue进行绑定
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(ROUTING_KEY);
SimpleMessageListenerContainer,消费者
需要将ACK修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
logger.info("消费端接收到消息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
服务端,业务逻辑,调用消息队列。
为了让客户端知道消息是否已经成功,消息队列提供了回调机制(需要实现ConfirmCallback),当消息服务器接收到消息之后会给客户端一个通知,此时客户端根据消息应答来决定后续的流程。
public class ProductServiceImpl extends BaseService implements ProductService, RabbitTemplate.ConfirmCallback {
@Autowired
private ProductMapper productM
private RabbitTemplate rabbitT
public ProductServiceImpl(RabbitTemplate rabbitTemplate){
this.rabbitTemplate=rabbitT
this.rabbitTemplate.setConfirmCallback(this);
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
this.logger.info(" 消息id:" + correlationData);
if (ack) {
this.logger.info("消息发送确认成功");
this.logger.info("消息发送确认失败:" + cause);
public void save(Product product) {
//执行保存
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, product.getName(),correlationId);
可以清晰的看到RabbitMQ发给生产者的信息收到的确认信息,也能看到消息被消费端消费后的信息。
RabbitMQ的其它方面
高可用方案
与常见的数据库类似,都是主从模式来保证高可用,可以利用HAProxy来实现主从备份方案。
水平扩展方案
主要是为了解决垂直优化的瓶颈问题,主要有这三种:
clustering,这是默认内置的一种集群模式,与下面两种不同的是clustering一般应用于同一局域网。
federation,有待后续学习
shovel,有待后续学习
不丢消息特性
这个不是RabbitMQ的专利,将消息持久化可以确保RabbitMQ重启或者死机过程中不至于丢掉没有消费的消息。
消息不被重复消费
这点要靠消费端来完成,尽管消费端可以通过ACK来通知消息队列消息已经被消费,但如果消费端消费了消息,此时ACK过程中的通知出现异常,消息队列会认为消息未被消费会继续发给消费端。
初次安装可能会出现一堆问题,特别是需要安装所依赖的众多包。RabbitMQ与Erlang可能存在版本依赖问题待后续确认。spring boot下集成RabbitMQ异常简单,可以根据需求部署集群来实现可扩展高可用的消息系统。
阅读(...) 评论()关于bootstrapValidator的AJAX提交有几种方法:
1、form中一定要放一个类型为submit的按钮,然后添加 success.form.bv 事件,如下
1 on('success.form.bv', function(e) {
e.preventDefault();
var form = $(e.target);
/**提交代码**/
2、如果form中没有submit类型的按钮(项目要求,需要在表单外部按钮提交),除了添加1的代码外,外部按钮事件代码如下
form.submit(function(e){
e.preventDefault();//必须添加,不然就重复提交
3、不使用 success.form.bv 事件,使用 submitHandler 方法,这应该是官方推荐的方法。不过,这个方法在源码中是找不到的,需要手动修改源码。如果使用 submitHandler 方法,form中一定要放一个类型为submit的按钮。
1.提交表单的时候用按钮类型submit的时候,提交ajax会导致重复提交,解决办法是把按钮类型改为button。
2.将提交按钮类型改为button后,bootstrapValidator校验失效...
修复BootstrapValidator重复提交的bug
问题:经常编辑信息时出现通过bootstrap validator校验Ajax请求提交两次问题
原因:可以看到remote那里有一个ajax验证重名,效果是bootstrapValidator没有等...
verbose : false // 同一个字段多个验证时,一个验证没有通过不会去验证该字段的其它验证规则trigger: 'click focus input' // 表示字段触发字段验证的动作, ...
当我们在使用bootstrapValidator今天验证的时候,大多数情况都是直接使用validate这个这个方法,大多数情况是正常的,不会有任何问题,但是当我们在写代码的时候,居然有的时候回碰到自动...
bootstrapValidator验证时,使用submit提交表单,验证通过,页面没有错误信息。但submit按钮未没有提交。原因 : submit标签的name或id属性值为submit。$('#...
问题描述:在使用bootstrapValidator插件校验表单属性,当表单属性过多需要每行并列多个属性 ,会出现校验第一个属性,发现整行被校验的效果 ,这不是我们工作想要的效果。如图:问题分析:因为...
bootstrapValidator 使用中,由于字段检查等原因,致使提交按钮失效。如何重新启用提交按钮呢?下面一句代码可以实现启用提交按钮:$('#loginForm').bootstrapVali...
用bootstrap validator验证表单后,点提交试了各种办法都无法提交。
最后是这么解决的:
document.getElementById(&addUserForm&).submit(...
刚开始写博客,都说不写博客不能成为大神,这个习惯倒是不错,以前遇到问题一直在百度上问,是时候给别人做做贡献了,刚使用bootstrap框架,这个前端框架确实很厉害,不过因为一直是自己一个人研究,网上的...
没有更多推荐了,RabbitMq应用一的补充(RabbitMQ的应用场景) - CL静淡 - 博客园
随笔 - 19, 文章 - 0, 评论 - 12, 引用 - 0
直接进入正题。
一.异步处理
场景:发送手机验证码,邮件
传统古老处理方式如下图
这个流程,全部在主线程完成,注册-》入库-》发送邮件-》发送短信,由于都在主线程,所以要等待每一步完成才能继续执行。由于每一步的操作时间响应时间不固定,所以主线程的请求耗时可能会非常长,如果请求过多,会导致IIS站点巨慢,排队请求,甚至宕机,严重影响用户体验。
现在大多数的处理方式如下图
这个做法是主线程只做耗时非常短的入库操作,发送邮件和发送短信,会开启2个异步线程,扔进去并行执行,主线程不管,继续执行后续的操作,这种处理方式要远远好过第一种处理方式,极大的增强了请求响应速度,用户体验良好。缺点是,由于异步线程里的操作都是很耗时间的操作,一个请求要开启2个线程,而一台标准配置的ECS服务器支撑的并发线程数大概在800左右,假设一个线程在10秒做完,这个单个服务器最多能支持400个请求的并发,后面的就要排队。出现这种情况,就要考虑增加服务器做负载,尴尬的增加成本。
消息队列RabbitMq的处理方式
这个流程是,主线程依旧处理耗时低的入库操作,然后把需要处理的消息写进消息队列中,这个写入耗时可以忽略不计,非常快,然后,独立的发邮件子系统,和独立的发短信子系统,同时订阅消息队列,进行单独处理。处理好之后,向队列发送ACK确认,消息队列整条数据删除。这个流程也是现在各大公司都在用的方式,以SOA服务化各个系统,把耗时操作,单独交给独立的业务系统,通过消息队列作为中间件,达到应用解耦的目的,并且消耗的资源很低,单台服务器能承受更大的并发请求。
二.应用解耦
以电商的下订单为例子,假设中间的流程为下单=》减库存=》发货
第一种方式,通过连续操作表,在单一系统中,通过主线程,连续操作。呵呵哒,这种做法,相信很多人刚入门,甚至几年经验了,由于项目小,也在继续使用吧。用户量少,或者都是内部人使用,必然没问题,因为不会在意出的问题,这种做法,只要一个环节出问题,请求直接报错,导致用户懵逼,假设在执行到减库存操作报错了,整个流程没有用事务回滚的话,还会造成数据不一致。
第二种方式,把这三个业务,拆成三个独立系统,通过JSON方式相互调用请求。这个做法,其实已经很不错了,起码独立出来,各自做各自的事情,一定程度上减小了整个系统的耦合性。但是问题是,就算是通过API形式请求,发送请求的系统一般情况下会等待被请求方的响应,如果响应错了,整个程序还是会终止,前面的业务系统假如已经做了入库操作,就必须要混滚了。很麻烦。如果说不等待被请求方响应的话,如果出错,如果还要保证数据一致性,就要做更多的操作,去补全数据,比如,下单成功,减库存失败,发货成功,这样当减库存系统修复后,就要通过订单数据,去补库存表的对应数据。先对麻烦,难处理。
第三种方式,
把消息队列作为中间件,当订单系统下完单后,把数据消息写入消息队列中,库存系统和发货系统同时订阅这个消息队列,思想上和纯API系统调用类似,但是,消息队列RabbitMq本身的强大功能,会帮我们做大量的出错善后处理,还是,假设下单成功,库存失败,发货成功,当我们修复库存的时候,不需要任何管数据的不一致性,因为库存队列未被处理的消息,会直接发送到库存系统,库存系统会进行处理。实现了应用的大幅度解耦。
三.流量削峰
这个主要用在团购,秒杀活动中
这个主要原理就是,控制队列长度,当请求来了,往队列里写入,超过队列的长度,就返回失败,给用户报一个可爱的错误页的等等。
四.日志处理
这个场景应该都很熟悉,一个系统有大量的业务需要各种日志来保证后续的分析工作,而且实时性要求不高,用队列处理再好不过了
五.消息通讯
现在上线的各大社交通讯项目中,实际上是没有用消息队列做即时通讯的,但是它确实可以用来做,有兴趣的不妨去试试吧
这个是点对点通信,消费者A和B同时订阅消息队列,又同时是制造者,就能实现点对点通信
群聊的做法,所有客户端同时订阅队列,又同时是发送,制造者。
-------------------------------------------------------------------------------------------------------------------------------------------------------------
上述大致的5种RabbitMq的应用场景,下面来介绍几个消息队列的区别
ActiveMq:这个应用于JAVA中间件较多
ZeroMq:这个是分发效率最高的队列,是其他队列的十倍以上,缺点是不能数据持久化。
kafka:这是一种高吞吐量的发布订阅消息系统,当每秒达到10W+的分发要求时,可以用这个尝试,新浪微博就是用这个做分发。
先写这么多吧,大致的应用场景,欢迎各路大神来补充,我也是公司需要,学习整理出来的,可能会有理解偏差,见谅哈!}

我要回帖

更多关于 线程同步问题是指 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信