1.添加依赖

在pom.xml文件中添加以下依赖:


  org.springframework.bootspring-boot-starter-parent3.3.3pom17org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-testtestorg.springframework.kafkaspring-kafkaorg.springframework.bootspring-boot-maven-plugin

2.添加配置类

添加链接kafka的一些配置信息:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public Map producerConfigs() {
        Map props = new HashMap();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        return new DefaultKafkaProducerFactory(producerConfigs());
    }

    // Kafka消费者配置
    @Bean
    public ConsumerFactory consumerFactory() {
        Map configProps = new HashMap();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

并在application.yml文件中配置项目启动端口,若不配置,默认为8080:

spring:
  application:
    name: ProviderTest
server:
  port: 7749

3.消息生产者

模拟生产者发送消息到topic中心的方法:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletableFuture;

@Component
public class KafkaProducerService {

    @Autowired
    private  KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, Object object) {
        CompletableFuture> future = kafkaTemplate.send(topic, object)
                .thenApply(result -> {
                    // 在这里可以添加发送成功后的处理逻辑
                    System.out.println("消息发送成功: " + result);
                    return result;
                })
                .exceptionally(exception -> {
                    // 处理发送失败的情况
                    System.err.println("消息发送失败: " + exception.getMessage());
                    return null; // 或者根据业务需求返回适当的默认值
                });

    }
}

想要发送不同的消息,在这里配置一个controller类,通过访问url地址的方式发送不同的消息,需要如下配置:

@Controller
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    KafkaProducerService kafkaProducerService;

    @GetMapping("/sendMessage/{topic}/{object}")
    public void sendMessage(@PathVariable("topic") String topic, @PathVariable("object") String object)
    {
        kafkaProducerService.sendMessage(topic, object);
    }
}

4.消息消费者

模拟消息消费者消费消息的方法:

@Component
public class KafkaConsumerService {

    /**
     *  @KafkaListener监听用户组为test-group的topic02主题的消息
     * @param message
     */
    @KafkaListener(groupId = "test-group", topics = "topic02")
    public void listen(String message) {
        System.out.println("获取到的消息为:" + message);
    }
}

5.测试

5.1先启动zookeeper客户端

到zookeeper安装目录下:双击zkServer.cmd文件:

看到如下服务端启动成功:

5.2启动kafka

到kafka的安装目录下进入cmd窗口:

输入命令启动kafka:

.binwindowskafka-server-start.bat .configserver.properties

若出现如下表示启动成功:

5.3在浏览器测试

启动kafka后端项目,在浏览器输入url访问进行发送消息到topic中:

localhost:7749/kafka/sendMessage/topic02/测试

其中7749是后端定义的端口号,“topic02”是在消息消费者端设置的监听的主题名称,“测试”为发送的具体内容并且值可以改为自己想要发送的任何数据:

可以看到控制台打印出消费者消费到的数据:

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。