前一段有幸参与到一个智能家居项目的开发,由于之前都没有过这方面的开发经验,所以对智能硬件的开发模式和技术栈都颇为好奇。
智能可燃气体报警器
产品是一款可燃气体报警器,如果家中燃气泄露浓度到达一定阈值,报警器检测到并上传气体浓度值给后台,后台以电话、短信、微信等方式,提醒用户家中可能有气体泄漏。
用户还可能向报警器发一些关闭报警、调整音量的指令等。整体功能还是比较简单的,大致的逻辑如下图所示:
但当我真正的参与其中开发时,其实有一点小小的失望,因为在整个研发过程中,并没用到什么新的技术,还是常规的几种中间件,只不过换个用法而已。
技术选型用rabbitmq
来做核心的组件,主要考虑到运维成本低,组内成员使用的熟练度比较高。
下面和小伙伴分享一下如何用 springboot
+ rabbitmq
搭建物联网(IOT
)平台,其实智能硬件也没想象的那么高不可攀!
很多小伙伴可能有点懵?rabbitmq
不是消息队列吗?怎么又能做智能硬件了?
其实rabbitmq
有两种协议,我们平时接触的消息队列是用的AMQP
协议,而用在智能硬件中的是MQTT
协议。
MQTT
全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish
/subscribe
)模式的轻量级
通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing
)中的一个标准传输协议。
该协议将消息的发布者(publisher
)与订阅者(subscriber
)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。
TCP
协议位于传输层,MQTT
协议位于应用层,MQTT
协议构建于TCP/IP
协议上,也就是说只要支持TCP/IP
协议栈的地方,都可以使用MQTT
协议。
MQTT
协议为什么在物联网(IOT)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP
协议呢?
HTTP
协议它是一种同步协议,客户端请求后需要等待服务器的响应。而在物联网(IOT)环境中,设备会很受制于环境的影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合IOT
应用程序。HTTP
是单向的,如果要获取消息客户端必须发起连接,而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。HTTP
要实现这样的功能不但很困难,而且成本极高。前边说过MQTT
是一种轻量级的协议,它只专注于发消息, 所以此协议的结构也非常简单。
在MQTT
协议中,一个MQTT
数据包由:固定头
(Fixed header)、 可变头
(Variable header)、 消息体
(payload)三部分构成。
固定头部,使用两个字节,共16位: (4-7)位表示消息类型,使用4位二进制表示,可代表如下的16种消息类型,不过 0 和 15位置属于保留待用,所以共14种消息事件类型。
DUP Flag(重试标识)
DUP Flag:保证消息可靠传输,消息是否已送达的标识。默认为0,只占用一个字节,表示第一次发送,当值为1时,表示当前消息先前已经被传送过。
QoS Level(消息质量等级)
QoS Level:消息的质量等级,后边会详细介绍
RETAIN(持久化)
1
:表示发送的消息需要一直持久保存,而且不受服务器重启影响,不但要发送给当前的订阅者,且以后新加入的客户端订阅了此Topic
,订阅者也会马上得到推送。注意:新加入的订阅者,只会取出最新的一个RETAIN flag = 1
的消息推送。0
:仅为当前订阅者推送此消息。Remaining Length(剩余长度)
在当前消息中剩余的byte
(字节)数,包含可变头部和消息体payload。
固定头部仅定义了消息类型和一些标志位,一些消息的元数据需要放入可变头部中。可变头部内容字节长度 + 消息体payload = 剩余长度。
可变头部居于固定头部和payload中间,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容。
可变头存在于这些类型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
消息体payload只存在于CONNECT
、PUBLISH
、SUBSCRIBE
、SUBACK
、UNSUBSCRIBE
这几种类型的消息:
CONNECT
:包含客户端的ClientId
、订阅的Topic
、Message
以及用户名
和密码
。PUBLISH
:向对应主题发送消息。SUBSCRIBE
:要订阅的主题以及QoS
。SUBACK
:服务器对于SUBSCRIBE
所申请的主题及QoS
进行确认和回复。UNSUBSCRIBE
:取消要订阅的主题。消息质量
(Quality of Service),即消息的发送质量,发布者(publisher
)和订阅者(subscriber
)都可以指定qos
等级,有QoS 0
、QoS 1
、QoS 2
三个等级。
下边分别说明一下这三个等级的区别。
Qos 0:At most once
(至多一次)只发送一次消息,不保证消息是否成功送达,没有确认机制,消息可能会丢失或重复。
图片源于网络,如有侵权联系删除
Qos 1:At least once
(至少一次),相对于QoS 0
而言Qos 1
增加了ack
确认机制,发送者(publisher
)推送消息到MQTT代理(broker
)时,两者自身都会先持久化消息,只有当publisher
或者 Broker
分别收到 PUBACK
确认时,才会删除自身持久化的消息,否则就会重发。
但有个问题,尽管我们可以通过确认来保证一定收到客户端 或 服务器的message
,可我们却不能保证仅收到一次message
,也就是当客户端publisher
没收到Broker
的puback
或者 Broker
没有收到subscriber
的puback
,那么就会一直重发。
publisher -> broker 大致流程:
图片源于网络,如有侵权联系删除
Qos 2:Exactly once
(只有一次),相对于QoS 1
,QoS 2
升级实现了仅接受一次message
,publisher
和 broker
同样对消息进行持久化,其中 publisher
缓存了message
和 对应的msgID
,而 broker
缓存了 msgID
,可以保证消息不重复,由于又增加了一个confirm
机制,整个流程变得复杂很多。
publisher -> broker 大致流程:
图片源于网络,如有侵权联系删除
LWT
全称为 Last Will and Testament
,其实遗嘱是一个由客户端预先定义好的主题和对应消息,附加在CONNECT
的数据包中,包括遗愿主题
、遗愿 QoS
、遗愿消息
等。
当MQTT代理 Broker
检测到有客户端client
非正常断开连接时,再由服务器主动发布此消息,然后相关的订阅者会收到消息。
举个栗子:聊天室中所有人都订阅一个叫talk
的主题 ,但小富由于网络抖动突然断开了链接,这时聊天室中所有订阅主题 talk
的客户端都会收到一个 “小富离开聊天室
” 的遗愿消息。
遗嘱的相关参数:
Will Flag
:是否使用 LWT,1 开启Will Topic
:遗愿主题名,不可使用通配符Will Qos
:发布遗愿消息时使用的 QoSWill Retain
:遗愿消息的 Retain 标识Will Message
:遗愿消息内容那客户端Client
有哪些场景是非正常断开连接呢?
Broker
检测到底层的 I/O 异常;Keep Alive
的间隔内和 Broker
进行消息交互;TCP
连接前没有发送 DISCONNECT
数据包;Broker
,导致关闭和客户端的连接等。注意:当客户端通过发布 DISCONNECT
数据包断开连接时,属于正常断开连接,并不会触发 LWT
的机制,与此同时Broker
还会丢弃掉当前客户端在连接时指定的相关 LWT
参数。
MQTT
协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。使用的场景也是非常非常多,下边列举一些:
具体 rabbitmq
的环境搭建就不赘述了,网上教程比较多,有条件的用服务器,没条件的像我搞个Windows
版的也很快乐嘛。
我们先开启 rabbitmq
的 mqtt
协议,因为默认安装下是关闭的,命令如下:
rabbitmq-plugins enable rabbitmq_mqtt
上一步中安装rabbitmq
环境并开启 mqtt
协议后,实际上mqtt
消息代理服务就搭建好了,接下来要做的就是实现客户端消息的推送和订阅。
这里使用spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
两个工具包实现。
<!--mqtt依赖包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
消息的发送比较简单,主要是应用到@ServiceActivator
注解,需要注意messageHandler.setAsync
属性,如果设置成false
,关闭异步模式发送消息时可能会阻塞。
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
MQTT
对外提供发送消息的API
时,需要使用@MessagingGateway
注解,去提供一个消息网关代理,参数defaultRequestChannel
指定发送消息绑定的channel
。
可以实现三种API
接口,payload
为发送的消息,topic
发送消息的主题,qos
消息质量。
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {
// 向默认的 topic 发送消息
void sendMessage2Mqtt(String payload);
// 向指定的 topic 发送消息
void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
// 向指定的 topic 发送消息,并指定服务质量参数
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
消息订阅和我们平时用的MQ消息监听实现思路基本相似,@ServiceActivator
注解表明当前方法用于处理MQTT
消息,inputChannel
参数指定了用于接收消息的channel
。
/**
* @Author: xiaofu
* @Description: 消息订阅配置
* @date 2020/6/8 18:24
*/
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel iotMqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(iotMqttInputChannel());
return adapter;
}
/**
* @author xiaofu
* @description 消息订阅
* @date 2020/6/8 18:20
*/
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler handlerTest() {
return message -> {
try {
String string = message.getPayload().toString();
System.out.println("接收到消息:" + string);
} catch (MessagingException ex) {
//logger.info(ex.getMessage());
}
};
}
}
额~ 由于本渣渣对硬件一窍不通,为了模拟硬件的发送消息,只能借助一下工具,其实硬件端实现MQTT
协议,跟我们前边的基本没什么区别,只不过换种语言嵌入到硬件中而已。
这里选的测试工具为mqttbox
,下载地址:http://workswithweb.com/mqttbox.html
我们用先用mqttbox
模拟向主题mqtt_test_topic
发送消息,看后台是否能成功接收到。
看到后台成功拿到了向主题mqtt_test_topic
发送的消息。
用mqttbox
模拟订阅主题mqtt_test_topic
,在后台向主题mqtt_test_topic
发送一条消息,这里我简单的写了个controller
调用API发送消息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是后台向主题 mqtt_test_topic 发送的消息
我们看mqttbox
的订阅消息,已经成功的接收到了后台的消息,到此我们的MQTT
通信环境就算搭建成功了。如果把mqttbox
工具换成具体硬件设备,整个流程就是我们常说的智能家居了,其实真的没那么难。
在我们实际的生产环境中遇到过的问题,这里分享一下让大家少踩坑。
在客户端connect
连接的时,会有一个clientId
参数,需要每个客户端都保持唯一的。但我们在开发测试阶段clientId
直接在代码中写死了,而且服务都是单实例部署,并没有暴露出什么问题。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
然而在生产环境内侧的时候,由于服务是多实例集群部署,结果出现了下边的奇怪问题。同一时间内只能有一个客户端能拿到消息,其他客户端不但不能消费消息,而且还在不断的掉线重连:Lost connection: 已断开连接; retrying...
。
这就是由于clientId
相同导致客户端间相互竞争消费,最后将clientId
获取方式换成从发号器中拿,问题就好了,所以这个地方是需要特别注意的。
平时程序在开发环境没问题,可偏偏到了生产环境就一大堆问题,很多都是因为服务部署方式不同导致的。所以多学习分布式还是很有必要的。
MQTT
它只是一种协议,支持MQTT
协议的消息中间件产品非常多,下边的也只是其中的一部分
我也是第一次做和硬件相关的项目,之前听到智能家居都会觉得好高大上,但实际上手开发后发现,技术嘛万变不离其宗,也只是换种用法而已。
双手奉上项目 demo 的github
地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git,感兴趣的小伙伴可以下载跑一跑,实现起来非常的简单。
本文由哈喽比特于4年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/udFE6k9pPetIWsa6KeErrA
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。