Apache Kafka 4.0 新特性详解示例

后端 潘老师 1个月前 (03-22) 119 ℃ (0) 扫码查看

Apache Kafka一直占据着消息队列技术方向重要地位。2025年3月18日,Kafka迎来了4.0版本的更新,这次更新带来了许多令人瞩目的新特性,对开发者和运维人员来说意义重大。下面咱们就详细聊聊这些新变化。

一、告别ZooKeeper,开启KRaft

在Kafka 4.0版本中,最大的亮点之一就是默认运行在KRaft模式下,这意味着它不再依赖Apache ZooKeeper。以前,ZooKeeper的存在虽然为Kafka提供了协调服务,但也增加了部署和管理的复杂性。现在,Kafka 4.0去掉了这一依赖,使得整个部署过程变得更加简单直接。运维人员不用再花费大量精力去维护ZooKeeper集群,降低了运营成本的同时,还提高了系统的可扩展性,管理任务也大大简化了。

二、新消费者组协议

Kafka 4.0引入了新的消费者组协议(KIP – 848),这个更新主要是为了优化重新平衡性能。在大规模部署场景下,消费者组的重新平衡操作经常会导致停机时间延长和延迟增加。而新协议将相关逻辑转移到了代理端,很好地解决了这些问题,提升了消费者组的可靠性和响应速度。简单来说,就是在处理大量消息时,系统能够更加稳定、高效地运行。

三、支持队列功能

Kafka 4.0新增的队列功能(KIP – 932),支持了传统队列语义。以前,Kafka在消息处理模式上有一定的局限性,现在通过这个功能,允许多个消费者协同处理同一个分区的消息。这就使得Kafka可以应用在更多场景中,尤其是那些需要点对点消息模式的场景,它变成了一个更通用的消息平台。

四、Java版本要求升级

随着Kafka 4.0的发布,对Java版本的要求也有所改变。Kafka客户端和Kafka Streams现在需要Java 11的支持,而Kafka代理、Connect和相关工具则需要Java 17。在项目升级到Kafka 4.0时,开发者需要注意确保Java环境符合要求,以免出现兼容性问题。

五、API更新

为了让平台更加简洁,同时鼓励开发者采用新功能,Kafka 4.0删除了至少12个月前被废弃的API。这一操作虽然可能会对部分依赖旧API的项目造成一定影响,但从长远来看,有助于Kafka生态系统的健康发展,让开发者能够更快地接触和使用到新的、更强大的功能。

六、代码示例

使用Kafka 4.0的KRaft模式创建主题

下面这段代码展示了如何在KRaft模式下创建Kafka主题:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class KafkaTopicCreator {
    public static void main(String[] args) {
        // 配置Kafka连接属性
        Properties props = new Properties();
        // 设置Kafka服务器地址
        props.put("bootstrap.servers", "localhost:9092");
        // 设置消息确认机制
        props.put("acks", "all");

        // 创建AdminClient实例,用于管理Kafka集群
        AdminClient adminClient = AdminClient.create(props);

        // 创建一个新的主题,指定主题名为my-topic,分区数为1,副本因子为1
        NewTopic newTopic = new NewTopic("my-topic", 1, (short) 1);
        // 创建主题并获取创建结果
        CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));

        try {
            // 输出主题创建结果
            System.out.println("Topic created: " + result.all().get());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用新消费者组协议

接下来的代码示例展示了如何使用新消费者组协议:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        // 设置Kafka服务器地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 设置消费者组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 设置键的反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 设置值的反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建Kafka消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题my-topic
        consumer.subscribe(Collections.singleton("my-topic"));

        while (true) {
            // 拉取消息,设置拉取超时时间为100毫秒
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 输出消息内容
                System.out.println(record.value());
            }
            // 同步提交消费位移,确保消息被正确处理
            consumer.commitSync();
        }
    }
}

队列功能示例

下面的代码展示了如何在Kafka中实现类似队列的行为:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaQueueExample {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        // 设置Kafka服务器地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 设置消费者组ID
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-queue-group");
        // 设置键的反序列化类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 设置值的反序列化类
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 创建Kafka消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题my-queue-topic
        consumer.subscribe(Collections.singleton("my-queue-topic"));

        while (true) {
            // 拉取消息,设置拉取超时时间为100毫秒
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 输出消息内容
                System.out.println(record.value());
                // 处理消息后,手动确认以避免重复消费
                consumer.commitSync(Collections.singleton(record));
            }
        }
    }
}

通过这些示例,希望能帮助大家更好地理解和使用Kafka 4.0的新特性。如果你在实际应用中遇到问题,欢迎在评论区留言讨论。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/16089.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】