Application of the Kafka Consumer API
Requirement: Use the Kafka consumer API to consume messages from the topic englishscore in three ways -- automatic offset commits, manual synchronous offset commits, and manual asynchronous offset commits -- and print the offset, key, and value of each consumed message.
1. Write a Kafka consumer that commits offsets automatically and prints the offset, key, and value of each consumed message. Create the class CustomConsumer for this purpose and write its code:
package cn.lesson;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test020");
        props.put("enable.auto.commit", "true");
        // auto.offset.reset:
        // earliest - if a partition has a committed offset, consume from it;
        //            otherwise consume from the beginning of the partition
        // latest   - if a partition has a committed offset, consume from it;
        //            otherwise consume only data newly produced to that partition
        // none     - if every partition has a committed offset, consume from those offsets;
        //            if any partition lacks a committed offset, throw an exception
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
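The polling loop above never exits, so the consumer is never closed. The following sketch shows one common way to stop the loop cleanly by calling consumer.wakeup() from a JVM shutdown hook and closing the consumer in a finally block. The class name CustomConsumerWithShutdown and the shutdown-hook structure are illustrative additions, not part of the original exercise; the connection and deserializer properties are copied from CustomConsumer above.
package cn.lesson;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumerWithShutdown {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        props.put("group.id", "test020");
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));

        final Thread mainThread = Thread.currentThread();
        // On Ctrl+C (or any normal JVM exit), wake the consumer up so poll() stops blocking.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                consumer.wakeup();
                try {
                    mainThread.join(); // wait for the polling loop to finish cleanly
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (WakeupException e) {
            // expected during shutdown, nothing to do
        } finally {
            consumer.close(); // commits pending offsets (auto-commit is on) and leaves the group
        }
    }
}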
2. Run the CustomConsumer class and take a screenshot of the output.

3. Write a Kafka consumer that commits offsets manually and synchronously and prints the offset, key, and value of each consumed message. Create the class CustomConsumer1 for this purpose and write its code:
package cn.lesson;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092");
        props.put("group.id", "mytopic");
        // auto.offset.reset:
        // earliest - if a partition has a committed offset, consume from it;
        //            otherwise consume from the beginning of the partition
        // latest   - if a partition has a committed offset, consume from it;
        //            otherwise consume only data newly produced to that partition
        // none     - if every partition has a committed offset, consume from those offsets;
        //            if any partition lacks a committed offset, throw an exception
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false"); // disable automatic offset commits
        //props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Synchronous commit: the current thread blocks until the offsets are committed
            consumer.commitSync();
        }
    }
}
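The no-argument commitSync() above commits the latest offsets returned by the last poll() for all assigned partitions at once. For finer control, the overload commitSync(Map&lt;TopicPartition, OffsetAndMetadata&gt;) can commit an explicit offset per partition; note that the committed value is the position of the next record to read, i.e. record.offset() + 1. The following is only a sketch of that variant (the class name CustomConsumer1a is hypothetical; connection settings are copied from CustomConsumer1):
package cn.lesson;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class CustomConsumer1a {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092");
        props.put("group.id", "mytopic");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false"); // manual commits only
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            // Commit partition by partition instead of one blanket commitSync()
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // The committed offset is the position of the NEXT record to read, hence +1
                consumer.commitSync(Collections.singletonMap(
                        partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}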
4. Run the CustomConsumer1 class and take a screenshot of the output.

5. Write a Kafka consumer that commits offsets manually and asynchronously and prints the offset, key, and value of each consumed message. Create the class CustomConsumer2 for this purpose and write its code:
package cn.lesson;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class CustomConsumer2 {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092");
        props.put("group.id", "test15");
        // auto.offset.reset:
        // earliest - if a partition has a committed offset, consume from it;
        //            otherwise consume from the beginning of the partition
        // latest   - if a partition has a committed offset, consume from it;
        //            otherwise consume only data newly produced to that partition
        // none     - if every partition has a committed offset, consume from those offsets;
        //            if any partition lacks a committed offset, throw an exception
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false"); // disable automatic offset commits
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));
        while (true) {
            // The consumer pulls a batch of records from the broker
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // Asynchronous commit: does not block; the callback reports the result
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception exception) {
                    if (exception != null) {
                        System.err.println("Commit failed for " + offsets);
                    }
                }
            });
        }
    }
}
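commitAsync() does not retry a failed commit, so a common pattern is to commit asynchronously on the normal path and fall back to one final synchronous commit before the consumer shuts down. The sketch below illustrates that combination; the class name CustomConsumer2a is hypothetical, and the bounded loop is only there so the example terminates on its own (connection settings are copied from CustomConsumer2):
package cn.lesson;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer2a {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092");
        props.put("group.id", "test15");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("englishscore"));

        try {
            // Poll for a bounded number of rounds so the example terminates on its own
            for (int i = 0; i < 100; i++) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
                // Fast, non-blocking commit on the normal path; an occasional failure is
                // tolerable because a later commit (or the final commitSync) supersedes it
                consumer.commitAsync();
            }
        } finally {
            try {
                // Last chance to persist progress: block until this commit succeeds or fails
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}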
6. Start a console producer from the Kafka command line and type in some messages:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092 --topic englishscore
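Since the consumers print each record's key, it can be useful to send keyed messages as well; otherwise key prints as null. The console producer can parse a key out of each input line via its parse.key and key.separator properties (the ":" separator below is an arbitrary choice), for example:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092 --topic englishscore --property parse.key=true --property key.separator=:
With this variant, typing a line such as Tom:90 sends key Tom and value 90.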

7. Check the output in the IDEA console and take a screenshot.
