

Java client fails when working with Kafka


Problem description

I can't get kafka-clients to work against Kafka and I don't know why. The relevant code and configuration are posted below. If you can see what's wrong, please point it out. Thanks!

Environment and dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

The JDK version is 1.8, the Kafka version is 2.12-0.10.2.0, and the server runs CentOS 7.

Test code

TestBase.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestBase {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    // Broker address and test topic shared by the producer and consumer tests
    protected String kafka_server = "192.168.60.160:9092";
    protected String topic = "zlikun_topic";
}

ProducerTest.java

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
    }

    @Test
    public void test() throws InterruptedException {
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Send messages
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d, partition = %d\n", recordMetadata.offset(), recordMetadata.partition());
                    } else {
                        log.error("send error !", e);
                    }
                }
            });
        }
        TimeUnit.SECONDS.sleep(3);
        producer.close();
    }
}
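Note that the producer config references a custom MyPartitioner whose source isn't included in the question. Purely for context, a minimal sketch of what such a class might look like against the 0.10.2 Partitioner interface; the key-hash logic here is illustrative, not the asker's actual code:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical stand-in for the MyPartitioner referenced above (not shown in the question):
// spreads records across partitions by key hash, falling back to partition 0 for null keys.
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            return 0;
        }
        // Simple, deterministic key -> partition mapping
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {
        // nothing to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no custom configuration needed for this sketch
    }
}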

ConsumerTest.java

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Before;
import org.junit.Test;

public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zlikun");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        // consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

Problem

# The test topic was created manually
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic
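As a sanity check, the topic's partition assignment and leaders can be inspected with the same tool; a sketch, assuming the same ZooKeeper address as above:

# Show partitions, leaders and replicas for the test topic
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic zlikun_topic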

Console output

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time

Answers

Answer 1:

I tried it and it works fine for me: https://github.com/MOBX/kafka...

I suggest first checking that the connection to the Kafka cluster is actually working, since what you're getting is a TimeoutException. If that doesn't help, try downgrading kafka-clients to 0.8.2.0.
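A rough sketch of what such a connectivity check could look like from the machine running the tests; the broker address and topic come from the question, the rest is assumption:

# Is the broker port reachable at all?
$ nc -vz 192.168.60.160 9092

# Can the console tools shipped with the broker produce and consume against the topic?
$ bin/kafka-console-producer.sh --broker-list 192.168.60.160:9092 --topic zlikun_topic
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.60.160:9092 --topic zlikun_topic --from-beginning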

Answer 2:

I raised the log level to DEBUG and found from the logs that the failure is caused by the broker's hostname not being resolvable.

2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:
java.io.IOException: Can't resolve address: m160:9092
    at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
    at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:57)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768)
    at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:107)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
    ... 36 more

I found a blog post online (http://blog.sina.com.cn/s/blo...) that supports this as well. Likewise, once I added the broker's hostname to my hosts file, the test ran normally. But this doesn't feel like a reasonable approach; doing it this way in a real deployment would be a real burden on operations. I wonder if there's a better solution.

[2017/04/11 16:16] I just found an article online (http://www.tuicool.com/articl...) that solved the problem!
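The linked article is truncated, so its exact content is unknown, but this kind of failure is usually dealt with either on the client (the hosts-file workaround described above) or, arguably more cleanly, on the broker, by making it advertise an address that clients can resolve. A sketch of both, assuming the broker address from the question and the standard config/server.properties file:

# Client-side workaround: map the broker's hostname in /etc/hosts
192.168.60.160  m160

# Broker-side alternative in config/server.properties: advertise a resolvable address,
# so clients never have to resolve the bare hostname "m160"
advertised.listeners=PLAINTEXT://192.168.60.160:9092
# (pre-0.10 style equivalents: advertised.host.name / advertised.port)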

Tags: java