Java 连接 ZooKeeper 时报错:KeeperErrorCode = AuthFailed

问题出现的环境背景及自己尝试过哪些方法

ZooKeeper 已经启动。

相关代码

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Minimal consumer built on the old high-level Kafka consumer API: it connects through
 * ZooKeeper, opens a single stream for one topic and prints every message it receives.
 *
 * @author leicui bourne_cui@163.com
 */
public class Test extends Thread {

    /** Charset used to decode message payloads instead of the platform default. */
    private static final java.nio.charset.Charset MESSAGE_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    private final ConsumerConnector consumer;

    private final String topic;

    /**
     * Creates the consumer connector and remembers the topic to read.
     *
     * @param topic name of the topic consumed by {@link #run()}
     */
    public Test(String topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    /**
     * Builds the consumer configuration with the ZooKeeper connect string and the
     * consumer group id.
     *
     * @return the configuration passed to the consumer connector
     */
    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "10.73.241.253:2181");
        props.put("group.id", "test-consumer-group");
        // Optional settings, left disabled:
        // props.put("zookeeper.session.timeout.ms", "40000");
        // props.put("zookeeper.sync.time.ms", "200");
        // props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Requests one stream for the topic, then prints each message and pauses three
     * seconds between messages. The loop stops when the thread is interrupted; the
     * consumer connector is shut down on every exit path.
     */
    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // one stream for the topic

        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            System.out.println("dssssssssssssss");

            // Exactly one stream was requested above, so index 0 is the only entry.
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("ddddd");

            while (it.hasNext()) {
                System.out.println("receive:" + new String(it.next().message(), MESSAGE_CHARSET));
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop consuming instead of
                    // swallowing the request to terminate.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            consumer.shutdown();
        }
    }

    /**
     * Starts a consumer thread for the topic {@code test-topic}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Test("test-topic").start();
    }
}

你期待的结果是什么?实际看到的错误信息又是什么?

Logo

一站式 AI 云服务平台

更多推荐