1、前言
EMQX 是一款强大的开源 MQTT 消息代理,它支持大量的连接和高吞吐量,适用于各种物联网应用。Webhook 是 EMQX 提供的扩展功能之一,用于将消息推送到外部的 HTTP 服务。在本文中,我们将介绍如何使用 EMQX 开源版的 Webhook 机制,并展示如何处理收到的 Webhook 请求,将其中的数据存储到数据库中。
2、Webhook 简介
Webhook 是一种常见的 HTTP 回调机制,用于将事件或数据推送到外部服务器。当 MQTT 客户端发布消息时,EMQX 可以通过 Webhook 将该消息发送给指定的 HTTP 端点,方便我们在接收到消息后进一步处理数据。
3、搭建 Webhook 服务
接下来,我们编写一个简单的 SpringBoot 2.7服务,用于接收 EMQX 的 Webhook 请求并将其中的数据存储到数据库中。
3.1、项目依赖
在 pom.xml
中添加以下依赖:
org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test com.fasterxml.jackson.core jackson-core 2.13.5 com.fasterxml.jackson.core jackson-databind 2.13.5} com.fasterxml.jackson.core jackson-annotations 2.13.5 mysql mysql-connector-java 8.0.33 com.baomidou mybatis-plus-boot-starter 3.5.6
3.2、实现 Webhook 控制器
3.2.1、Controller
package ....这里填写你自己的
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
import java.util.Map;
@RestController
@RequestMapping("/emqx/test")
@AllArgsConstructor
@Slf4j
public class WebhookController {
private final EmqxTestService emqxTestService;
private final ObjectMapper objectMapper = new ObjectMapper();
@PostMapping("/webhook")
public String webhook(@RequestBody String payload) {
try {
// 解析主 JSON 字符串为 Map
Map payloadMap = objectMapper.readValue(payload, new TypeReference
3.2.2、Service
package ....这里填写你自己的
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ldb.tool.entity.EmqxTest;
import com.ldb.tool.mapper.EmqxTestMapper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.List;
@Service
@AllArgsConstructor
@Slf4j
public class EmqxTestService {
private final EmqxTestMapper emqxTestMapper;
public EmqxTest insertData(EmqxTest testData) {
EmqxTest emqxTest = new EmqxTest();
// 你可以手动设置其他需要的字段,如 clientId, topic, data 等
emqxTest.setClientId(testData.getClientId());
emqxTest.setTopic(testData.getTopic());
emqxTest.setData(testData.getData());
emqxTest.setCreateTime(new Date()); // 如果你有自动填充策略,可以忽略这行
this.emqxTestMapper.insert(emqxTest);
return emqxTest;
}
}
3.2.3、Mapper
package ...这里填写你自己的;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ldb.tool.entity.EmqxTest;
public interface EmqxTestMapper extends BaseMapper {
}
3.2.4、Entity
package ...这里填写你自己的;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@TableName("emqx_test")
@Data
public class EmqxTest implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
private String clientId;
private String topic;
private String data;
private Date createTime;
private Date updateTime;
}
4、配置 EMQX Webhook
4.1、运行
我们这里使用docker来运行EMQX。
4.1.1、获取镜像
docker pull emqx/emqx:5.8.0
4.1.2、启动容器
docker run -d --name emqx
-p 1883:1883 -p 8083:8083
-p 8084:8084 -p 8883:8883
-p 18083:18083
-v $PWD/data:/opt/emqx/data
-v $PWD/log:/opt/emqx/log
emqx/emqx:5.8.0
4.2、配置EMQX-Webhook
4.2.1、创建Webhook
访问EMQX可视化后台(http://localhost:18083/)=>集成=>Webhook=>创建Webhook
在填写设置的时候,需要注意的是我们本地docke访问宿主机,在容器内部URL:127.0.0.1,指向的是容器本身,你可以获取宿主机IP作为URL,比如192.168.30.44。
我们通过URL选项的测试按钮可以点击测试是否正常请求。
5、测试 Webhook
在保证我们的Java-Webhook、EMQX服务运行的情况下,我们可以通过MQTTX(简介 – MQTTX 文档)软件去模拟一台直连的MQTT设备发起一个主题,因为我们在创建Webhook的时候触发者是消息发布。
5.1、MQTTX发送主题
首先我们需要新建一个MQTT连接,配置如下所以,未设置认证的话不需要用户名密码。
右下角,我们填写主题(Topic)的消息路由为listen/me,消息内容为{“msg”: “send messgae”,”status”:1},点击小飞机按钮发送。
5.2、查看Webhook触发情况
在EMQX后台,集成=>Webhook,查看送达情况。
在查看我们的Java服务的日志打印,也收到了。
查看sql表,也已经正常保存。
6、结论
Webhook 是一种强大的机制,MQTT 消息发布事件触发后,通过 HTTP 推送到 Spring Boot 服务,对接收到的数据进行解析和存储。这种机制能够让我们轻松地将消息从 EMQX 转发到其他服务,从而实现复杂的业务逻辑处理。
评论(0)