ITBEAR科技资讯
网站首页 科技资讯 财经资讯 分享好友

开源项目:用环信MQTT实现“世界频道”只需5分钟【附源码】

时间:2022-04-27 15:47:55来源:互联网编辑:星辉

说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。

当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~

使用MQTT实现世界频道-Demo效果演示

协议优势:

在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。

轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。

易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。

高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。

低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。

灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。

衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。

实现方案:

言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。

客户端实现:

客户端实现主要包含以下两部分:

底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。

APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。

接下来上底层MQTT业务集成代码。

引入SDK:

这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置

1// 在根目录 build.gradle repositories 下加入配置

2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }

3...

4// 然后加入 MQTT 依赖

5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk

6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'

7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

方法封装

这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:

1 /**

2 * Create by lzan13 on 2022/3/22

3 * 描述:MQTT 帮助类

4 */

5 object MQTTHelper {

6

7 private var mqttClient: MqttAndroidClient? = null

8

9 // 缓存主题集合

10 private val topicList = mutableListOf()

11

12 /**

13 * 链接MQTT

14 * @param id 用户 Id

15 * @param token 用户链接 MQTT 的 Token

16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

17 */

18 fun connect(id: String, token: String, topic: String = "") {

19 // 处理订阅主题

20 if (topic.isNotEmpty()) topicList.add(topic)

21

22 // 拼接链接地址

23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

24 // 拼接 clientId

25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

27

28 //连接参数

29 val options = MqttConnectOptions()

30 options.isAutomaticReconnect = true //设置自动重连

31 options.isCleanSession = true // 缓存

32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

34 options.userName = id // 用户名

35 options.password = token.toCharArray() // 密码

36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

37 // 设置MQTT监听

38 mqttClient?.setCallback(object : MqttCallback {

39 override fun connectionLost(t: Throwable) {

40 // 通知链接断开

41 VMLog.d("MQTT 链接断开 $t")

42 }

43

44 @Throws(Exception::class)

45 override fun messageArrived(topic: String, message: MqttMessage) {

46 // 通知收到消息

47 VMLog.d("MQTT 收到消息:$message")

48 // 如果未订阅则直接丢弃

49 if (!topicList.contains(topic)) return

50 notifyEvent(topic, String(message.payload))

51 }

52

53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

54 })

55 //进行连接

56 mqttClient?.connect(options, null, object : IMqttActionListener {

57 override fun onSuccess(token: IMqttToken) {

58 VMLog.d("MQTT 链接成功")

59 // 链接成功,循环订阅缓存的主题

60 topicList.forEach { subscribe(it) }

61 }

62

63 override fun onFailure(token: IMqttToken, t: Throwable) {

64 VMLog.d("MQTT 链接失败 $t")

65 }

66 })

67 }

68

69 /**

70 * 订阅主题

71 * @param topic 主题

72 */

73 fun subscribe(topic: String) {

74 if (!topicList.contains(topic)) {

75 topicList.add(topic)

76 }

77 try {

78 //连接成功后订阅主题

79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

80 override fun onSuccess(token: IMqttToken) {

81 VMLog.d("MQTT 订阅成功 $topic")

82 }

83

84 override fun onFailure(token: IMqttToken, t: Throwable) {

85 VMLog.d("MQTT 订阅失败 $topic $t")

86 }

87 })

88 } catch (e: MqttException) {

89 e.printStackTrace()

90 }

91 }

92

93 /**

94 * 取消订阅

95 * @param topic 主题

96 */

97 fun unsubscribe(topic: String) {

98 if (topicList.contains(topic)) {

99 topicList.remove(topic)

100 }

101 try {

102 mqttClient?.unsubscribe(topic)

103 } catch (e: MqttException) {

104 e.printStackTrace()

105 }

106 }

107

108 /**

109 * 发送 MQTT 消息

110 * @param topic 主题

111 * @param content 内容

112 */

113 fun sendMsg(topic: String, content: String) {

114 val msg = MqttMessage()

115 msg.payload = content.encodeToByteArray() // 设置消息内容

116 msg.qos = 0 //设置消息发送质量,可为0,1,2.

117 // 设置消息的topic,并发送。

118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

119 override fun onSuccess(asyncActionToken: IMqttToken) {

120 VMLog.d("MQTT 消息发送成功")

121 }

122

123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

124 VMLog.d("MQTT 消息发送失败 ${exception.message}")

125 }

126 })

127 }

128

129 /**

130 * 通知 MQTT 事件

131 */

132 private fun notifyEvent(topic: String, data: String) {

133 LDEventBus.post(topic, data)

134 }

135 }

业务交互

和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。

1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题

2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)

3

4/**

5 * 发送匹配信息

6 */

7private fun sendMatchInfo() {

8 if (selfMatch.user.nickname.isEmpty()) return

9 // 提交自己的匹配信息到服务器

10 mViewModel.submitMatch(selfMatch)

11 val json = JSONObject()

12 json.put("content", selfMatch.content)

13 json.put("emotion", selfMatch.emotion)

14 json.put("gender", selfMatch.gender)

15 json.put("type", selfMatch.type)

16 val jsonUser = JSONObject()

17 jsonUser.put("avatar", mUser.avatar)

18 jsonUser.put("id", mUser.id)

19 jsonUser.put("nickname", mUser.nickname)

20 jsonUser.put("username", mUser.username)

21 json.put("user", jsonUser)

22 MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())

23}

24

25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个,

26// 订阅 MQTT 事件

27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {

28 val match = JsonUtils.fromJson(it, Match::class.java)

29 // 这里收到匹配信息之后就增加一条弹幕

30 addBarrage(match)

31}

后端服务实现

接下来介绍后端服务实现,主要包含以下两部分:

配置连接信息:配置环信MQTT消息云连接信息。

获取鉴权信息:获取客户端连接需要的鉴权信息。

配置连接信息

配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内

1/**

2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService

3 */

4config.mqtt = {

5 host: 'mqtt host', // MQTT 链接地址

6 appId: 'appId', // MQTT AppId

7 port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)

8 restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址

9 clientId: 'client id', // 替换环信后台 clientId

10 clientSecret: 'client secret', // 替换环信后台 clientSecret

11};

获取鉴权信息

这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:

1/**

2 * Create by lzan13 on 2022/3/22

3 * 描述:MQTT 帮助类

4 */

5object MQTTHelper {

6

7 private var mqttClient: MqttAndroidClient? = null

8

9 // 缓存主题集合

10 private val topicList = mutableListOf()

11

12 /**

13 * 链接MQTT

14 * @param id 用户 Id

15 * @param token 用户链接 MQTT 的 Token

16 * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅

17 */

18 fun connect(id: String, token: String, topic: String = "") {

19 // 处理订阅主题

20 if (topic.isNotEmpty()) topicList.add(topic)

21

22 // 拼接链接地址

23 val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"

24 // 拼接 clientId

25 val clientId = "${id}@${MQTTConstants.mqttAppId()}"

26 mqttClient = MqttAndroidClient(VMTools.context, url, clientId)

27

28 //连接参数

29 val options = MqttConnectOptions()

30 options.isAutomaticReconnect = true //设置自动重连

31 options.isCleanSession = true // 缓存

32 options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒

33 options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒

34 options.userName = id // 用户名

35 options.password = token.toCharArray() // 密码

36 options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;

37 // 设置MQTT监听

38 mqttClient?.setCallback(object : MqttCallback {

39 override fun connectionLost(t: Throwable) {

40 // 通知链接断开

41 VMLog.d("MQTT 链接断开 $t")

42 }

43

44 @Throws(Exception::class)

45 override fun messageArrived(topic: String, message: MqttMessage) {

46 // 通知收到消息

47 VMLog.d("MQTT 收到消息:$message")

48 // 如果未订阅则直接丢弃

49 if (!topicList.contains(topic)) return

50 notifyEvent(topic, String(message.payload))

51 }

52

53 override fun deliveryComplete(token: IMqttDeliveryToken) {}

54 })

55 //进行连接

56 mqttClient?.connect(options, null, object : IMqttActionListener {

57 override fun onSuccess(token: IMqttToken) {

58 VMLog.d("MQTT 链接成功")

59 // 链接成功,循环订阅缓存的主题

60 topicList.forEach { subscribe(it) }

61 }

62

63 override fun onFailure(token: IMqttToken, t: Throwable) {

64 VMLog.d("MQTT 链接失败 $t")

65 }

66 })

67 }

68

69 /**

70 * 订阅主题

71 * @param topic 主题

72 */

73 fun subscribe(topic: String) {

74 if (!topicList.contains(topic)) {

75 topicList.add(topic)

76 }

77 try {

78 //连接成功后订阅主题

79 mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {

80 override fun onSuccess(token: IMqttToken) {

81 VMLog.d("MQTT 订阅成功 $topic")

82 }

83

84 override fun onFailure(token: IMqttToken, t: Throwable) {

85 VMLog.d("MQTT 订阅失败 $topic $t")

86 }

87 })

88 } catch (e: MqttException) {

89 e.printStackTrace()

90 }

91 }

92

93 /**

94 * 取消订阅

95 * @param topic 主题

96 */

97 fun unsubscribe(topic: String) {

98 if (topicList.contains(topic)) {

99 topicList.remove(topic)

100 }

101 try {

102 mqttClient?.unsubscribe(topic)

103 } catch (e: MqttException) {

104 e.printStackTrace()

105 }

106 }

107

108 /**

109 * 发送 MQTT 消息

110 * @param topic 主题

111 * @param content 内容

112 */

113 fun sendMsg(topic: String, content: String) {

114 val msg = MqttMessage()

115 msg.payload = content.encodeToByteArray() // 设置消息内容

116 msg.qos = 0 //设置消息发送质量,可为0,1,2.

117 // 设置消息的topic,并发送。

118 mqttClient?.publish(topic, msg, null, object : IMqttActionListener {

119 override fun onSuccess(asyncActionToken: IMqttToken) {

120 VMLog.d("MQTT 消息发送成功")

121 }

122

123 override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {

124 VMLog.d("MQTT 消息发送失败 ${exception.message}")

125 }

126 })

127 }

128

129 /**

130 * 通知 MQTT 事件

131 */

132 private fun notifyEvent(topic: String, data: String) {

133 LDEventBus.post(topic, data)

134 }

135}

源码地址

核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。

服务端github源码:

https://github.com/lzan13/vmtemplateserver

客户端github源码:

https://gitee.com/lzan13/VMTemplateAndroid

写在最后

MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。

更多热门内容
国补手机哪些适合大学生:联想razr60与edge 60精选
开学季来临,换新手机是许多大学生的刚需。作为学生党,既追求时尚外观和实用功能,又对价格敏感,有国补又能叠加学生优惠的手机无疑是双赢的选择。即使预算不多也可拿下主流直屏手机,还可考虑时尚潮流的折叠屏,比如联想moto razr 60系列。以下就给学生朋友们分享折叠

2025-08-21

国补加持下的平板电脑推荐:从百元助学到旗舰AI
在当前国补政策的红利下,平板电脑市场的价格变动使得消费者迎来了难得的入手良机。联想作为国内平板市场的主流品牌,旗下多款产品均纳入国补范围,从百元级助学神器到旗舰级AI生产力工具,形成了完整的价格梯度覆盖。这里我将分享其中六款热门的平板型号,从性能解析、

2025-08-21

2025国补尾巴,笔记本电脑值得推荐的三大系列
2025 年笔记本国补政策进入最后三个月冲刺期,截至12月31日,购买联想指定机型可享双重福利:直接补贴最高2000元+学生认证额外9折,综合优惠力度创年内新高。本次聚焦拯救者、小新、YOGA三大系列八款核心机型,按游戏电竞、移动办公、创意设计三大场景分类推荐,结合国

2025-08-21

从“手动挡”进入“AI智能挡”:亚数TrustAsia 开启证书管理「服务化」新时代
在数字化转型加速推进的当下,TLS/SSL证书作为网络安全体系的关键基础设施,通过实现端到端加密传输、身份验证,为保障企业关键业务数据安全和构建可信网络环境发挥着至关重要的作用。随着密码学技术向抗量子计算范式演进,行业对加密敏捷性(Cryptographic Agility)要求

2025-08-21

科隆游戏展:纯血进化 ROGXBOX掌机X正式发布
核心亮点:l 全新软件生态体验:ROG联手XBOX共同打造,融合XBOX全新前端、奥创智控中心SE、Windows开放式体验优势,便携式游戏体验全面跃升l 握持体验大升级:全新手柄设计优化人体工学曲线,引入XBOX标志性的脉冲扳机,媲美专业XBOX手柄舒适握持l 次世代性能:首发搭载

2025-08-21

不止能干活!新能源汽车排行爆款上汽大通大拿V1重新定义轻客角色
说实话,要不是亲身体验过,我真不会相信我们家开得最多的车,竟然是一台轻客。以前一直觉得轻客就是干活用的,拉货、送快递那类,和家用根本不搭边。但生活总有些现实需求:家里有人做小本生意,有孩子要接送,还有老爸喜欢自驾游。原来的小轿车早就不够用了,换来换去

2025-08-21

声态+AI |2026中国国际音频产业大会(GAS)明年3月举办
随着生成式AI的快速发展声音已成为人机交互、虚拟现实和情感体验的核心纽带GAS 2026将共同探讨AI如何重塑音频创作、传输与消费的全链条2026中国国际音频产业大会(GAS 2026)时间:2026年3月25—26日地点:上海张江科学会堂主题:声态+AI主办单位:中国电子音响行业协会

2025-08-21

益生菌对增肥哪个牌子好看完这篇不再纠结
你有没有过这样的经历:明明吃得不多,体重却一直上不去,甚至有点“喝凉水都难胖”的无奈?我就是这样一个肠胃敏感、代谢快得像开了挂的人。试过增肌粉、蛋白棒、高热量饮食,效果微乎其微。直到三年前,一位营养师朋友提醒我:“你是不是忽略了肠道环境?吸收不好,吃再多也没

2025-08-21

深耕垂类大模型!中关村科金入选2025年《财富》中国科技50强
2025年8月21日,全球最具影响力的商业媒体之一《财富》(FORTUNE)正式发布“2025年中国科技50强”榜单。中关村科金凭借在垂类大模型领域从技术研发、平台应用到产业落地的全链条能力,成功登榜。《财富》系列榜单一直被视为全球商业发展趋势的风向标,具有极高的权威性和

2025-08-21

下一个爆款在哪儿?2025英特尔人工智能创新应用大赛获奖名单揭晓
8月16日,2025英特尔人工智能创新应用大赛总决赛暨颁奖典礼在深圳盛大启幕。历经层层选拔,从2817支年轻队伍中脱颖而出的40个优秀团队和作品,围绕工业、教育、心理健康、游戏等领域,展开了巅峰对决。最终,在总决赛现场,各项重磅大奖尘埃落定。其中,动力电池多机器

2025-08-21