章
目
录
Apache Kafka一直占据着消息队列技术方向重要地位。2025年3月18日,Kafka迎来了4.0版本的更新,这次更新带来了许多令人瞩目的新特性,对开发者和运维人员来说意义重大。下面咱们就详细聊聊这些新变化。
一、告别ZooKeeper,开启KRaft
在Kafka 4.0版本中,最大的亮点之一就是默认运行在KRaft模式下,这意味着它不再依赖Apache ZooKeeper。以前,ZooKeeper的存在虽然为Kafka提供了协调服务,但也增加了部署和管理的复杂性。现在,Kafka 4.0去掉了这一依赖,使得整个部署过程变得更加简单直接。运维人员不用再花费大量精力去维护ZooKeeper集群,降低了运营成本的同时,还提高了系统的可扩展性,管理任务也大大简化了。
二、新消费者组协议
Kafka 4.0引入了新的消费者组协议(KIP – 848),这个更新主要是为了优化重新平衡性能。在大规模部署场景下,消费者组的重新平衡操作经常会导致停机时间延长和延迟增加。而新协议将相关逻辑转移到了代理端,很好地解决了这些问题,提升了消费者组的可靠性和响应速度。简单来说,就是在处理大量消息时,系统能够更加稳定、高效地运行。
三、支持队列功能
Kafka 4.0新增的队列功能(KIP – 932),支持了传统队列语义。以前,Kafka在消息处理模式上有一定的局限性,现在通过这个功能,允许多个消费者协同处理同一个分区的消息。这就使得Kafka可以应用在更多场景中,尤其是那些需要点对点消息模式的场景,它变成了一个更通用的消息平台。
四、Java版本要求升级
随着Kafka 4.0的发布,对Java版本的要求也有所改变。Kafka客户端和Kafka Streams现在需要Java 11的支持,而Kafka代理、Connect和相关工具则需要Java 17。在项目升级到Kafka 4.0时,开发者需要注意确保Java环境符合要求,以免出现兼容性问题。
五、API更新
为了让平台更加简洁,同时鼓励开发者采用新功能,Kafka 4.0删除了至少12个月前被废弃的API。这一操作虽然可能会对部分依赖旧API的项目造成一定影响,但从长远来看,有助于Kafka生态系统的健康发展,让开发者能够更快地接触和使用到新的、更强大的功能。
六、代码示例
使用Kafka 4.0的KRaft模式创建主题
下面这段代码展示了如何在KRaft模式下创建Kafka主题:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
public class KafkaTopicCreator {
public static void main(String[] args) {
// 配置Kafka连接属性
Properties props = new Properties();
// 设置Kafka服务器地址
props.put("bootstrap.servers", "localhost:9092");
// 设置消息确认机制
props.put("acks", "all");
// 创建AdminClient实例,用于管理Kafka集群
AdminClient adminClient = AdminClient.create(props);
// 创建一个新的主题,指定主题名为my-topic,分区数为1,副本因子为1
NewTopic newTopic = new NewTopic("my-topic", 1, (short) 1);
// 创建主题并获取创建结果
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
try {
// 输出主题创建结果
System.out.println("Topic created: " + result.all().get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
使用新消费者组协议
接下来的代码示例展示了如何使用新消费者组协议:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
// 设置Kafka服务器地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 设置消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// 设置键的反序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值的反序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题my-topic
consumer.subscribe(Collections.singleton("my-topic"));
while (true) {
// 拉取消息,设置拉取超时时间为100毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 输出消息内容
System.out.println(record.value());
}
// 同步提交消费位移,确保消息被正确处理
consumer.commitSync();
}
}
}
队列功能示例
下面的代码展示了如何在Kafka中实现类似队列的行为:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaQueueExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
// 设置Kafka服务器地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 设置消费者组ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-queue-group");
// 设置键的反序列化类
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置值的反序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建Kafka消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题my-queue-topic
consumer.subscribe(Collections.singleton("my-queue-topic"));
while (true) {
// 拉取消息,设置拉取超时时间为100毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 输出消息内容
System.out.println(record.value());
// 处理消息后,手动确认以避免重复消费
consumer.commitSync(Collections.singleton(record));
}
}
}
}
通过这些示例,希望能帮助大家更好地理解和使用Kafka 4.0的新特性。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。