Kafka reports timeouts when message sending becomes very slow — has anyone run into this?

[Repost] Sending Kafka messages to HDFS
This article uses Kafka 0.7.2. After Kafka is installed, its contrib directory contains a set of files for integrating Kafka with Hadoop. Using the scripts and configuration files under the hadoop-consumer directory, we can export the messages of a Kafka topic to HDFS.
1. Edit the configuration file test/test.properties:
kafka.etl.topic: name of the topic
hdfs.default.classpath.dir: HDFS classpath directory
input: HDFS input path
output: HDFS output path
2. Generate the topic offsets.
Run ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties; this creates .dat offset files under /tmp/kafka/data on HDFS.
3. Copy the Kafka jars to the HDFS classpath.
Run ./copy-jars.sh /usr/lib/hadoop/lib
4. Run the Hadoop job.
Run ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties; this job writes the messages of the topic test-topic from Kafka into HDFS.
The written messages then show up under /tmp/kafka/output on HDFS.
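For reference, a minimal test/test.properties could look like the sketch below. Only the four property names listed above come from this article; the topic name and paths are illustrative placeholders, and the sample file shipped with Kafka 0.7.2 also contains broker/ZooKeeper connection settings that are omitted here.

# test/test.properties -- illustrative values, not from the original article
kafka.etl.topic=test-topic
hdfs.default.classpath.dir=/tmp/kafka/lib
input=/tmp/kafka/data
output=/tmp/kafka/output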
A typical Kafka use case is log collection for data analysis, but has anyone tested it for chat-server or push-notification workloads?
The difference between the two kinds of scenario:
Log collection: relatively few, fairly fixed clients connect to the central servers, but each client exchanges a large volume of data with them.
Push or chat: a large and constantly changing set of clients connects to the central servers, but each one exchanges only a small amount of data.
A traditional MQ is enough for that.
Kafka is a high-throughput distributed publish-subscribe messaging system with the following characteristics:
1. Message persistence through an O(1) disk data structure, which keeps performance stable over long periods even with terabytes of stored messages.
2. High throughput: even on very ordinary hardware, Kafka can handle hundreds of thousands of messages per second.
3. Messages can be partitioned across the Kafka servers and across consumer clusters.
4. Support for parallel data loading into Hadoop.
Kafka's goal is to provide a publish-subscribe solution that can handle all the activity-stream data of a consumer-scale web site. These activities (page views, searches, and other user actions) are a key ingredient of many social features on the modern web. Because of the throughput involved, such data is usually handled by log processing and log aggregation. For log data that feeds offline analysis systems such as Hadoop but also needs real-time processing, Kafka is a feasible solution: it aims to unify online and offline message handling through Hadoop's parallel loading mechanism, and to provide real-time consumption across a cluster of machines.
Some basic Kafka concepts:
(1) Kafka maintains messages grouped by category, called topics.
(2) Producers publish messages to Kafka topics.
(3) Consumers subscribe to topics and receive the messages published to them.
(4) Kafka runs as a cluster of one or more servers, each of which is called a broker.
A Kafka cluster can be set up in three main modes:
(1) single node, single broker;
(2) single node, multiple brokers;
(3) multiple nodes, multiple brokers.
How is a Kafka cluster deployed in each of these modes? The multi-node, multi-broker setup is described below as an example.
I. Environment:
m1: 192.168.1.172
m2: 192.168.1.186
m3: 192.168.1.187
II. Required software and environment:
kafka_2.10-0.8.2.1.tgz
zookeeper-3.4.6.tar.gz
jdk-6u45-linux-x64-rpm.bin
chmod 777 jdk-6u45-linux-x64-rpm.bin
./jdk-6u45-linux-x64-rpm.bin
Set the environment variables:
export JAVA_HOME=/usr/java/jdk1.6.0_45
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
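One common way to make these variables permanent (the article does not say where it put them, so this is an assumption) is to append them to /etc/profile and reload it:

cat >> /etc/profile <<'EOF'
export JAVA_HOME=/usr/java/jdk1.6.0_45
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile
java -version    # should report java version "1.6.0_45"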
Install ZooKeeper (covered in an earlier post, not repeated here) and Kafka (just unpack the tarball) on each of the three machines.
Installation directories: /usr/local/zookeeper
/usr/local/kafka
Edit the configuration file config/server.properties:
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=m1:2181,m2:2181,m3:2181
zookeeper.connection.timeout.ms=6000
broker.id: must be unique within the cluster
log.dirs: the message log directory; for a single node running multiple brokers, give each broker a different directory
zookeeper.connect: the addresses of all the nodes in the ZooKeeper ensemble
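To make the log.dirs note concrete, here is a sketch of how the configuration differs per broker in the single-node, multiple-broker mode. The file names and port numbers below are illustrative (they are not from this article); only broker.id, port and log.dirs have to differ between the copies of server.properties:

# config/server-1.properties (second broker on the same node)
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-2

# config/server-2.properties (third broker on the same node)
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-3

# start each broker with its own configuration file
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &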
III. Once the configuration files on all three machines have been updated, start the brokers:
# bin/kafka-server-start.sh config/server.properties &
..........
[ 11:08:48,789] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[ 11:08:48,839] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
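As an optional check that is not part of the original article, the ZooKeeper CLI shipped with zookeeper-3.4.6 can be used to confirm that all three brokers have registered themselves under /brokers/ids:

/usr/local/zookeeper/bin/zkCli.sh -server m1:2181 ls /brokers/ids
# expected to show [0, 1, 2] once all three brokers are up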
IV. Create a topic:
# bin/kafka-topics.sh --create --zookeeper m1:2181,m2:2181,m3:2181 --replication-factor 3 --partitions 1 --topic my-test-1
Created topic "my-test-1".
[ 11:13:38,363] INFO Completed load of log my-test-1-0 with log end offset 0 (kafka.log.Log)
[ 11:13:38,369] INFO Created log for partition [my-test-1,0] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:13:38,370] WARN Partition [my-test-1,0] on broker 0: No checkpointed highwatermark is found for partition [my-test-1,0] (kafka.cluster.Partition)
[ 11:13:38,378] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-test-1,0] (kafka.server.ReplicaFetcherManager)
[ 11:13:38,383] INFO Truncating log my-test-1-0 to offset 0. (kafka.log.Log)
[ 11:13:38,422] INFO [ReplicaFetcherThread-0-1], Starting (kafka.server.ReplicaFetcherThread)
[ 11:13:38,427] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List([[my-test-1,0], initOffset 0 to broker id:1,host:m2,port:9092] ) (kafka.server.ReplicaFetcherManager)
[ 11:13:38,530] ERROR [ReplicaFetcherThread-0-1], Error for partition [my-test-1,0] to broker 1:class kafka.common.UnknownException (kafka.server.ReplicaFetcherThread)
Creating the topic with only a single partition reports an error — the partition count is too small — along with the checkpoint warning above, so create another topic with more partitions:
# bin/kafka-topics.sh --create --zookeeper m1:2181,m2:2181,m3:2181 --replication-factor 3 --partitions 5 --topic my-test-2
Created topic "my-test-2".
[ 11:14:33,645] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-test-2,2] (kafka.server.ReplicaFetcherManager)
[ 11:14:33,676] INFO Completed load of log my-test-2-2 with log end offset 0 (kafka.log.Log)
[ 11:14:33,684] INFO Created log for partition [my-test-2,2] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:14:33,684] WARN Partition [my-test-2,2] on broker 0: No checkpointed highwatermark is found for partition [my-test-2,2] (kafka.cluster.Partition)
[root@m1 kafka]# [ 11:14:33,731] INFO Completed load of log my-test-2-3 with log end offset 0 (kafka.log.Log)
[ 11:14:33,738] INFO Created log for partition [my-test-2,3] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:14:33,742] WARN Partition [my-test-2,3] on broker 0: No checkpointed highwatermark is found for partition [my-test-2,3] (kafka.cluster.Partition)
[ 11:14:33,746] INFO Completed load of log my-test-2-0 with log end offset 0 (kafka.log.Log)
[ 11:14:33,753] INFO Created log for partition [my-test-2,0] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:14:33,753] WARN Partition [my-test-2,0] on broker 0: No checkpointed highwatermark is found for partition [my-test-2,0] (kafka.cluster.Partition)
[ 11:14:33,761] INFO Completed load of log my-test-2-4 with log end offset 0 (kafka.log.Log)
[ 11:14:33,768] INFO Created log for partition [my-test-2,4] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:14:33,769] WARN Partition [my-test-2,4] on broker 0: No checkpointed highwatermark is found for partition [my-test-2,4] (kafka.cluster.Partition)
[ 11:14:33,788] INFO Completed load of log my-test-2-1 with log end offset 0 (kafka.log.Log)
[ 11:14:33,800] INFO Created log for partition [my-test-2,1] in /tmp/kafka-logs-1 with properties {segment.index.bytes -> , file.delete.delay.ms -> 60000, segment.bytes -> , flush.ms -> 4775807, delete.retention.ms -> , index.interval.bytes -> 4096, retention.bytes -> -1, min.insync.replicas -> 1, cleanup.policy -> delete, unclean.leader.election.enable -> true, segment.ms -> , max.message.bytes -> 1000012, flush.messages -> 4775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> , segment.jitter.ms -> 0}. (kafka.log.LogManager)
[ 11:14:33,800] WARN Partition [my-test-2,1] on broker 0: No checkpointed highwatermark is found for partition [my-test-2,1] (kafka.cluster.Partition)
[ 11:14:33,804] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-test-2,3],[my-test-2,0],[my-test-2,4],[my-test-2,1] (kafka.server.ReplicaFetcherManager)
[ 11:14:33,808] INFO Truncating log my-test-2-3 to offset 0. (kafka.log.Log)
[ 11:14:33,808] INFO Truncating log my-test-2-0 to offset 0. (kafka.log.Log)
[ 11:14:33,808] INFO Truncating log my-test-2-4 to offset 0. (kafka.log.Log)
[ 11:14:33,808] INFO Truncating log my-test-2-1 to offset 0. (kafka.log.Log)
[ 11:14:33,834] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions List([[my-test-2,3], initOffset 0 to broker id:1,host:m2,port:9092] , [[my-test-2,0], initOffset 0 to broker id:1,host:m2,port:9092] , [[my-test-2,4], initOffset 0 to broker id:2,host:m3,port:9092] , [[my-test-2,1], initOffset 0 to broker id:2,host:m3,port:9092] ) (kafka.server.ReplicaFetcherManager)
[ 11:14:33,836] INFO [ReplicaFetcherThread-0-2], Starting (kafka.server.ReplicaFetcherThread)
V. Check the topic status:
# bin/kafka-topics.sh --describe --zookeeper m1:2181,m2:2181,m3:2181 --topic my-test-1
Topic:my-test-1    PartitionCount:1    ReplicationFactor:3
    Topic: my-test-1    Partition: 0    Replicas: 1,2,0    Isr: 1,2,0
[root@m1 kafka]# bin/kafka-topics.sh --describe --zookeeper m1:2181,m2:2181,m3:2181 --topic my-test-2
Topic:my-test-2    PartitionCount:5    ReplicationFactor:3
    Topic: my-test-2    Partition: 0    Replicas: 1,2,0    Isr: 1,2,0
    Topic: my-test-2    Partition: 1    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-test-2    Partition: 2    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-test-2    Partition: 3    Replicas: 1,0,2    Isr: 1,0,2
    Topic: my-test-2    Partition: 4    Replicas: 2,1,0    Isr: 2,1,0
[root@m1 kafka]# bin/kafka-topics.sh --describe --zookeeper m1:2181,m2:2181,m3:2181 --topic my-test-3
Topic:my-test-3    PartitionCount:3    ReplicationFactor:3
    Topic: my-test-3    Partition: 0    Replicas: 2,0,1    Isr: 2,0,1
    Topic: my-test-3    Partition: 1    Replicas: 0,1,2    Isr: 0,1,2
    Topic: my-test-3    Partition: 2    Replicas: 1,2,0    Isr: 1,2,0
# bin/kafka-topics.sh --list --zookeeper m1:2181,m2:2181,m3:2181
test_group1
VI. Send messages as a producer:
# bin/kafka-console-producer.sh --broker-list m1:9092,m2:9092,m3:9092 --topic my-test-2
[ 11:32:42,160] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello world
my test message1
my test message2
my test message3
VII. Consume the topic's data:
[root@m1 kafka]# bin/kafka-console-consumer.sh --zookeeper m1:2181,m2:2181,m3:2181 --from-beginning --topic my-test-2
hello world
my test message1
my test message2
my test message3
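As a further sanity check not shown in the original article, the log-end offset of each partition can be queried with the GetOffsetShell tool that ships with Kafka 0.8.x, reusing the broker addresses from this setup:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list m1:9092,m2:9092,m3:9092 --topic my-test-2 --time -1
# prints one topic:partition:offset line per partition; the offsets across the five partitions should add up to the number of messages produced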
When sending messages with a producer, the following errors occurred:

124  [main] INFO  kafka.utils.VerifiableProperties - Verifying properties
157  [main] INFO  kafka.utils.VerifiableProperties - Property compression.codec is overridden to 1
157  [main] INFO  kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to 112.74.109.244:9092
157  [main] INFO  kafka.utils.VerifiableProperties - Property producer.type is overridden to async
157  [main] INFO  kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
158  [main] WARN  kafka.utils.VerifiableProperties - Property zk.connect is not valid
203  [main] INFO  kafka.producer.Producer - Shutting down producer
204  [main] INFO  kafka.producer.async.ProducerSendThread - Begin shutting down ProducerSendThread
272  [ProducerSendThread-] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:112.74.109.244,port:9092 with correlation id 0 for 1 topic(s) Set(order)
293  [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Connected to 112.74.109.244:9092 for producing
327  [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Disconnecting from 112.74.109.244:9092
2629 [ProducerSendThread-] ERROR kafka.producer.SyncProducer - Producer connection to iZ943qtt41eZ:9092 unsuccessful
java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:29) ~[na:1.6.0_43]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512) ~[na:1.6.0_43]
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:141) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) [kafka_2.9.2-0.8.1.1.jar:na]
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) [scala-library-2.9.2.jar:na]
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95) [scala-library-2.9.2.jar:na]
    at scala.collection.Iterator$class.foreach(Iterator.scala:772) [scala-library-2.9.2.jar:na]
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157) [scala-library-2.9.2.jar:na]
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190) [scala-library-2.9.2.jar:na]
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45) [scala-library-2.9.2.jar:na]
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95) [scala-library-2.9.2.jar:na]
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) [kafka_2.9.2-0.8.1.1.jar:na]
2640 [ProducerSendThread-] WARN  kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 2 to broker 0 with data for partitions [order,0]
    (same java.nio.channels.UnresolvedAddressException stack trace as above)
2651 [ProducerSendThread-] INFO  kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 3
2754 [ProducerSendThread-] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:112.74.109.244,port:9092 with correlation id 3 for 1 topic(s) Set(order)
2769 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Connected to 112.74.109.244:9092 for producing
3094 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Disconnecting from 112.74.109.244:9092
3098 [ProducerSendThread-] ERROR kafka.producer.SyncProducer - Producer connection to iZ943qtt41eZ:9092 unsuccessful
    (same stack trace as above)
3101 [ProducerSendThread-] WARN  kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 5 to broker 0 with data for partitions [order,0]
    (same stack trace as above)
3103 [ProducerSendThread-] INFO  kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 2
3203 [ProducerSendThread-] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:112.74.109.244,port:9092 with correlation id 6 for 1 topic(s) Set(order)
3220 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Connected to 112.74.109.244:9092 for producing
3541 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Disconnecting from 112.74.109.244:9092
3544 [ProducerSendThread-] ERROR kafka.producer.SyncProducer - Producer connection to iZ943qtt41eZ:9092 unsuccessful
    (same stack trace as above)
3546 [ProducerSendThread-] WARN  kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 8 to broker 0 with data for partitions [order,0]
    (same stack trace as above)
3547 [ProducerSendThread-] INFO  kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 1
3648 [ProducerSendThread-] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:112.74.109.244,port:9092 with correlation id 9 for 1 topic(s) Set(order)
6661 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Connected to 112.74.109.244:9092 for producing
9698 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Disconnecting from 112.74.109.244:9092
9702 [ProducerSendThread-] ERROR kafka.producer.SyncProducer - Producer connection to iZ943qtt41eZ:9092 unsuccessful
    (same stack trace as above)
9705 [ProducerSendThread-] WARN  kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 11 to broker 0 with data for partitions [order,0]
    (same stack trace as above)
9706 [ProducerSendThread-] INFO  kafka.producer.async.DefaultEventHandler - Back off for 100 ms before retrying send. Remaining retries = 0
9807 [ProducerSendThread-] INFO  kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host:112.74.109.244,port:9092 with correlation id 12 for 1 topic(s) Set(order)
9821 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Connected to 112.74.109.244:9092 for producing
10137 [ProducerSendThread-] INFO  kafka.producer.SyncProducer - Disconnecting from 112.74.109.244:9092
10138 [ProducerSendThread-] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics order with correlation ids in [0,12]
10139 [ProducerSendThread-] ERROR kafka.producer.async.ProducerSendThread - Error in handling batch of 1 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) ~[kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:93) [kafka_2.9.2-0.8.1.1.jar:na]
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) [kafka_2.9.2-0.8.1.1.jar:na]
10139 [main] INFO  kafka.producer.async.ProducerSendThread - Shutdown ProducerSendThread complete
10139 [main] INFO  kafka.producer.ProducerPool - Closing all sync producers

This problem cost me a lot of time, mainly because I had not read the log carefully. When I finally did, the key line was: kafka.producer.SyncProducer - Producer connection to iZ943qtt41eZ:9092 unsuccessful. That made me suspect the client could not resolve the broker's host name, and adding the name to the hosts file did fix it. I still don't know why it insists on the host name when I had clearly configured the IP address. Another error, reported on the consumer side: ERROR backtype.storm.daemon.executor - java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer. The missing class lives in zkclient.jar; adding that jar to the classpath fixes it (a download link was attached in the original post).
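Building on the diagnosis above, here is a hedged sketch of two possible fixes. The IP address and host name are the ones appearing in the log; whether the poster used the /etc/hosts route or the broker-side setting is not stated, and advertised.host.name is assumed to be available because the log shows Kafka 0.8.1.1.

# Option 1 (what the post describes): on the producer/consumer machine, map the broker's host name to its IP
echo "112.74.109.244 iZ943qtt41eZ" >> /etc/hosts

# Option 2 (alternative, broker side): advertise a resolvable address in config/server.properties,
# so the metadata returned to clients carries the IP instead of the machine's host name
# advertised.host.name=112.74.109.244
# then restart the broker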