首先选择稳定版本https://unpkg.com/mqtt@2.18.8/dist/mqtt.min.js
微信小程序协议要用wxs
下面是方法类 和使用
class MqttService { static instance = null static getInstance() { if (!MqttService.instance) { MqttService.instance = new MqttService() } return MqttService.instance } constructor() { this.client = null this.offlineMsgQueue = new Map() // key:topic, value:[]消息数组 this.isConnected = false this.isReconnecting = false // 防止重复重连 this.subMap = new Map() // 保存订阅:topic => [callback] this.config = null // 连接配置 this.reconnectTimer = null // 重连配置(指数退避) this.reconnect = { delay: 2000, maxDelay: 10000, count: 0, maxCount: 30 } // 监听网络变化(UniApp 通用) this.watchNetwork() } // 连接 connect(options) { if (this.isConnected) return Promise.resolve() this.config = { ...options } return new Promise((resolve, reject) => { this._createClient(resolve, reject) }) } // 创建客户端 _createClient(resolve, reject) { const { host, port, clientId, username, password } = this.config const url = `${host}:${port}/mqtt` console.log("【MQTT】开始连接 =>", url) console.log("【MQTT】客户端ID =>", clientId) const mqttOptions = { clientId, username, password, clean: false, keepalive: 60, connectTimeout: 15000, rejectUnauthorized: false, } loadSDK("mqtt").then((mqtt) => { console.log("【MQTT】SDK 加载完成") const client = mqtt.connect(url, mqttOptions) console.log(client) // ============================================= // 🔥 🔥 🔥 这里打印【所有状态】你就能看到过程了 // ============================================= client.on("connect", () => { console.log("【MQTT】连接成功!") this.isConnected = true this.isReconnecting = false this.reconnect.count = 0 this._resubscribe() resolve && resolve() }) client.on("error", (err) => { console.error("❌【MQTT】错误 =>", err) this._onDisconnect() reject && reject(err) }) client.on("message", (topic, msg) => { console.log("收到消息:", topic, msg.toString()) try { const data = JSON.parse(msg.toString()) // ========== 关键逻辑 ========== // 1. 如果已经订阅,直接执行回调 if (this.subMap.has(topic)) { const callbacks = this.subMap.get(topic) callbacks.forEach(cb => cb(data, topic)) } else { // 2. 还没订阅 → 存入离线缓存,等订阅后补发 if (!this.offlineMsgQueue.has(topic)) { this.offlineMsgQueue.set(topic, []) } this.offlineMsgQueue.get(topic).push(data) console.log(`暂存离线消息:${topic},等待订阅`) } } catch (e) { console.error("消息解析失败", e) } }) client.on("close", () => { console.log("【MQTT】连接关闭") this._onDisconnect() }) // client.on("packetsend", (packet) => { // console.log(packet); // if (packet.cmd === "connect") { // console.log("【MQTT】已发送连接包,等待服务器响应...") // } // }) // client.on("packetreceive", (packet) => { // console.log("【MQTT】收到服务器包 =>", packet.cmd) // }) this.client = client }) } // 断开后处理 _onDisconnect() { this.isConnected = false if (this.client) { this.client.removeAllListeners() this.client.end() this.client = null } this._startReconnect() } // 核心:自动重连(指数退避) _startReconnect() { if (this.isReconnecting || this.isConnected) return if (this.reconnect.count >= this.reconnect.maxCount) return this.isReconnecting = true this.reconnect.count++ let delay = this.reconnect.delay * this.reconnect.count delay = Math.min(delay, this.reconnect.maxDelay) console.log(` MQTT 第 ${this.reconnect.count} 次重连,${delay}ms 后执行`) clearTimeout(this.reconnectTimer) this.reconnectTimer = setTimeout(() => { this._createClient() }, delay) } // 🔥 新增:等待连接成功 waitConnected() { return new Promise(resolve => { if (this.isConnected) { resolve() } else { // 等待连接成功事件 const check = setInterval(() => { if (this.isConnected) { clearInterval(check) resolve() } }, 100) } }) } // 重连成功后自动重新订阅 _resubscribe() { console.log('自动恢复所有订阅') // this.subMap.forEach((_, topic) => { // this.client.subscribe(topic) // }) } // 订阅 subscribe(topic, callback) { if (!this.subMap.has(topic)) { this.subMap.set(topic, []) if (this.isConnected) { this.client.subscribe(topic, { qos: 1 }) } } const cbs = this.subMap.get(topic) if (!cbs.includes(callback)) cbs.push(callback) // ========== 订阅成功后,补发之前缓存的离线消息 ========== if (this.offlineMsgQueue.has(topic)) { const msgs = this.offlineMsgQueue.get(topic) msgs.forEach(data => callback(data, topic)) // 补发完清空 this.offlineMsgQueue.delete(topic) console.log(`补发离线消息:${topic},共${msgs.length}条`) } } // 取消订阅 unsubscribe(topic, callback) { if (!this.subMap.has(topic)) return const cbs = this.subMap.get(topic).filter(cb => cb !== callback) if (cbs.length === 0) { this.subMap.delete(topic) this.client?.unsubscribe(topic) } else { this.subMap.set(topic, cbs) } } // 发布消息 publish(topic, data) { if (!this.isConnected || !this.client) return const msg = typeof data === 'string' ? data : JSON.stringify(data) this.client.publish(topic, msg) } // 手动断开 disconnect() { clearTimeout(this.reconnectTimer) this.isConnected = false this.isReconnecting = false this.reconnect.count = 0 this.client?.end() this.client = null this.subMap.clear() console.log('MQTT 手动断开') } // 监听网络状态(UniApp 通用) watchNetwork() { uni.onNetworkStatusChange(res => { if (res.isConnected && !this.isConnected && !this.isReconnecting) { console.log(' 网络恢复,触发重连') this._startReconnect() } }) } }this.mqtt = MqttService.getInstance() this.mqtt.connect({ host: 'wxs://www.hczh3d.com', // 小程序必须 wxs:// port: 8084, clientId: 'uniapp_' + 唯一id,//唯一id username: 'xxx', password: 'xxx' }) //等待mqtt连接成功后发起订阅 this.mqtt.waitConnected().then(() => { console.log('MQTT已连接,开始订阅') this.mqtt.subscribe(`errand/${this.baseUserInfo.id}`, (data, topic) => { console.log('收到消息:', data) })