ITKeyword,专注技术干货聚合推荐

注册 | 登录

kafka 生产者消费者 api接口

u010220089 分享于 2015-07-29

推荐:Kafka之Java API-生产者(Producers)

配置完Kafka集群后,下面通过Java API的方式来操作 需要导入的Jar包 kafka_2.10-0.8.1.1.jar log4j-1.2.15.jar metrics-core-2.2.0.jar scala-l

2019阿里云双11返场狂欢继续,
地址https://www.aliyun.com/1111/2019/home

生产者

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class KafkaProducer  extends Thread{
private String topic;

//构造方法
public KafkaProducer(String topic) {
this.topic = topic;
}


@Override
public void run() {
Producer<Integer, String> producer = createProducer();
for(int i =1 ;i<=10;i++){
String message = "message"+i; 
producer.send(new KeyedMessage<Integer, String>(topic, message));
System.out.println("生产者生成---------->"+message);

try {
//睡眠 1S
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public Producer createProducer() {

Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");
// properties.setProperty("zookeeper.connect", "cloud1:2181,cloud2:2181,cloud3:2181");
properties.setProperty("serializer.class", StringEncoder.class.getName());
//properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");
properties.setProperty("metadata.broker.list", "192.168.80.20:9092");
return new Producer<Integer, String>(new ProducerConfig(properties));
}
public static void main(String[] args) {
new KafkaProducer("test").start();
}
}



消费者

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaComsumer extends Thread {
private String topic;

public KafkaComsumer(String topic) {
this.topic = topic;
}


@Override
public void run() {
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap );
KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();

while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接受到的信息------------》"+message);

}


}


public ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "192.168.80.20:2181,192.168.80.21:2181,192.168.80.22:2181");
properties.setProperty("group.id", "group1");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
public static void main(String[] args) {
new KafkaComsumer("test").start();
}
}

推荐:分布式消息中间件(三)——Kafka生产者消费者模型

一、Kafka回顾 1、AMQP协议      消息队列中消息交互规范,多数分布式消息中间件基于该协议进行消息传输 2、Broker      对于kafka,将生产者发送的消息,动态的

生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.String

相关阅读排行


用户评论

游客

相关内容推荐

最新文章

×

×

请激活账号

为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。

您的注册邮箱: 修改

重新发送激活邮件 进入我的邮箱

如果您没有收到激活邮件,请注意检查垃圾箱。