1.添加依赖
在pom.xml文件中添加以下依赖:
org.springframework.boot spring-boot-starter-parent 3.3.3 pom 17 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka org.springframework.boot spring-boot-maven-plugin
2.添加配置类
添加链接kafka的一些配置信息:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public Map producerConfigs() {
Map props = new HashMap();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public DefaultKafkaProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}
// Kafka消费者配置
@Bean
public ConsumerFactory consumerFactory() {
Map configProps = new HashMap();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
并在application.yml文件中配置项目启动端口,若不配置,默认为8080:
spring:
application:
name: ProviderTest
server:
port: 7749
3.消息生产者
模拟生产者发送消息到topic中心的方法:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Component
public class KafkaProducerService {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, Object object) {
CompletableFuture> future = kafkaTemplate.send(topic, object)
.thenApply(result -> {
// 在这里可以添加发送成功后的处理逻辑
System.out.println("消息发送成功: " + result);
return result;
})
.exceptionally(exception -> {
// 处理发送失败的情况
System.err.println("消息发送失败: " + exception.getMessage());
return null; // 或者根据业务需求返回适当的默认值
});
}
}
想要发送不同的消息,在这里配置一个controller类,通过访问url地址的方式发送不同的消息,需要如下配置:
@Controller
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
KafkaProducerService kafkaProducerService;
@GetMapping("/sendMessage/{topic}/{object}")
public void sendMessage(@PathVariable("topic") String topic, @PathVariable("object") String object)
{
kafkaProducerService.sendMessage(topic, object);
}
}
4.消息消费者
模拟消息消费者消费消息的方法:
@Component
public class KafkaConsumerService {
/**
* @KafkaListener监听用户组为test-group的topic02主题的消息
* @param message
*/
@KafkaListener(groupId = "test-group", topics = "topic02")
public void listen(String message) {
System.out.println("获取到的消息为:" + message);
}
}
5.测试
5.1先启动zookeeper客户端
到zookeeper安装目录下:双击zkServer.cmd文件:
看到如下服务端启动成功:
5.2启动kafka
到kafka的安装目录下进入cmd窗口:
输入命令启动kafka:
.binwindowskafka-server-start.bat .configserver.properties
若出现如下表示启动成功:
5.3在浏览器测试
启动kafka后端项目,在浏览器输入url访问进行发送消息到topic中:
localhost:7749/kafka/sendMessage/topic02/测试
其中7749是后端定义的端口号,“topic02”是在消息消费者端设置的监听的主题名称,“测试”为发送的具体内容并且值可以改为自己想要发送的任何数据:
可以看到控制台打印出消费者消费到的数据:
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。
评论(0)