Kafka 简单使用(2)
1. Kafka 命令行工具
Kafka 提供了一些命令行工具,可以对 Kafka 进行管理,其实是 shell 脚本,放在 bin 目录下:
1.1 Topic 相关
创建 topic
kafka-topics.sh --zookeeper zoo1:2181 --create --topic "ustc" --partitions 3 --replication-factor 2
修改 topic
# 增加分区数 kafka-topics.sh --bootstrap-server kafka2:9093 --alter --topic "ustc" --partitions 5
查看 topic 列表
$ kafka-topics.sh --bootstrap-server kafka2:9093 --list __consumer_offsets insert_order ustc
- 可以看到一个名为
__consumer_offsets
的主题,Kafka 0.9.0.0 以后,topic 的 offset 信息不再存放在 zookeeper 中,而是放在该 topic中 - –zookeeper zookeeper:port 也可以用 –bootstrap-server kafka:port 来替换
- 可以看到一个名为
查看 topic 具体信息
kafka-topics.sh --bootstrap-server kafka1:9092 --describe --topic "ustc"
删除 topic
kafka-topics.sh --zookeeper zoo1:2181 --delete --topic "ustc"
1.2 生产者 Producer
创建生产者
kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic "ustc"
1.3 消费组 Consumer
创建消费者
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic insert_order
其它参数
- –from-beginning:从头开始消费
- –offset <offset>: 从指定的 offset 开始消费
- –group <group id>: 指定消费者组
1.4 消费组组
查看消费组组列表
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --list
查看消费组的具体信息
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --describe --group myId
删除组
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 --group myId --delete
2. 生产者 Java 客户端
2.1 环境搭建
依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.7.1</version> </dependency>
生产者
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "222.195.87.224:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer<>(props);
发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("user", UUID.randomUUID().toString(), "hello world"); RecordMetadata metadata = null; try { metadata = producer.send(record).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } log.info("topic: {}, partition: {}, offset: {}", metadata.topic(), metadata.partition(), metadata.offset());
2.2 同步发送消息
同步的两种方式
- 上面代码用的是同步发送消息的方式,在收到kafka的ack通知之前⼀直处于阻塞状态
- 另一种方式是发送后不等待通知
同步的第二种方式
userKafkaProducer.send(record); userKafkaProducer.close();
2.3 异步发送消息
说明
- Producer 提供了异步发送消息的方法
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
callback
是一个接口,用于回调- 异步可能会有消息丢失的问题,使用较少
- Producer 提供了异步发送消息的方法
异步发送
producer.send(record, (RecordMetadata metadata, Exception exception)->{ System.out.println("进入异步发送回调函数"); if(exception != null){ System.out.println("出现异常:"+exception.getMessage()); } System.out.println(String.format("发送结果:topic:%s,存储的partition:%s,offset:%s", metadata.topic(), metadata.partition(), metadata.offset())); });
2.4 生产者参数
消息确认 ack 设置
props.put(ProducerConfig.ACKS_CONFIG, "0");
发送失败重试设置
// 重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试时间间隔, 默认 100 ms props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 300);
缓冲区相关设置
// 消息缓冲区,先将要发送的消息存入缓冲区,提高性能,默认32M(单位byte) props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 本地线程拉取该参数指定的大小的数据发送到 broker props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16); // batch 最大等待时间,数据达不到指定大小时超时也会发出(ms) props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
3. 消费者 Java 客户端
3.1 环境搭建
依赖同上
消费者
@Slf4j public class AnotherConsumer { public static final String TOPIC_NAME = "user"; public static final String CONSUMER_GROUP_NAME = "testGroup"; public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "222.195.87.224:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(TOPIC_NAME)); while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { log.info("partition:{}, offset:{}, key:{}, value:{}", record.partition(), record.offset(), record.key(), record.value()); } } } }
3.2 消费者参数
自动提交 offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 时间间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
手动提交 offset
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交可以同步也可以异步:
/** * 同步提交 */ if (records.count() > 0){ consumer.commitSync();; }
/** * 异步提交 */ if (records.count() > 0){ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { // ... } }); }
poll() 消息
// 一次 poll 的最大消息条数 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
长轮询时间内未 poll 够指定数量消息则继续poll;
超过长轮询时间仍未 poll 到指定数量的消息,则直接向下执行:// 长轮询时间 consumer.poll(Duration.ofMillis(1000));
reblance相关
// 两次poll的时间间隔超出该时间(30s)则消费者会被踢出消费者组 props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);
// 消费者每1s向集群发送心跳,超过10s未收到则踢出 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);
指定分区等
// 指定分区 consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 1))); // 回溯消费(从头开始) consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 1))); // 指定 offset consumer.seek(new TopicPartition(TOPIC_NAME, 1), 1);
指定时间点
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME); //从1⼩时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; Map<TopicPartition, Long> map = new HashMap<>(); for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime); } Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); //根据消费⾥的timestamp确定offset if (value != null) { consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("partition:%s, offset:%s, key:%s, value:%s", record.partition(), record.offset(), record.key(), record.value()) ); } } }
新消费者 offset 设置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
- latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据