代码拉取完成,页面将自动刷新
import io.vertx.mqtt.MqttServer
import io.vertx.core.buffer.Buffer
import io.netty.handler.codec.mqtt.MqttQoS
// Example MQTT server built on Vert.x: it accepts every connecting client,
// logs the connection details, acknowledges subscribe/unsubscribe requests,
// answers incoming PUBLISH messages according to their QoS level and sends
// one greeting message on the first topic a client subscribes to.
// `vertx` is expected to be provided by the surrounding script binding.
def mqttServer = MqttServer.create(vertx)
mqttServer.endpointHandler({ endpoint ->

  // shows main connect info
  println("MQTT client [${endpoint.clientIdentifier()}] request to connect, clean session = ${endpoint.isCleanSession()}")

  if (endpoint.auth() != null) {
    // Credentials must never end up in logs: only report whether a password was supplied.
    def passwordInfo = endpoint.auth().password != null ? "<redacted>" : "<none>"
    println("[username = ${endpoint.auth().username}, password = ${passwordInfo}]")
  }
  if (endpoint.will() != null) {
    println("[will flag = ${endpoint.will().isWillFlag} topic = ${endpoint.will().willTopic} msg = ${endpoint.will().willMessage} QoS = ${endpoint.will().willQos} isRetain = ${endpoint.will().isWillRetain}]")
  }

  println("[keep alive timeout = ${endpoint.keepAliveTimeSeconds()}]")

  // accept connection from the remote client (no session present)
  endpoint.accept(false)

  // Handlers for the acknowledgements of messages this server publishes
  // (QoS 1: PUBACK; QoS 2: PUBREC -> PUBREL -> PUBCOMP).
  // They are registered once per endpoint, before anything is published,
  // instead of being re-registered on every SUBSCRIBE request.
  endpoint.publishAcknowledgeHandler({ messageId ->
    println("Received ack for message = ${messageId}")
  }).publishReceivedHandler({ messageId ->
    // QoS 2, step 2: the client received the message, release it
    endpoint.publishRelease(messageId)
  }).publishCompletionHandler({ messageId ->
    println("Received ack for message = ${messageId}")
  })

  // handling requests for subscriptions
  endpoint.subscribeHandler({ subscribe ->

    def subscriptions = subscribe.topicSubscriptions()

    // grant every subscription exactly the QoS level the client asked for
    def grantedQosLevels = []
    subscriptions.each { s ->
      println("Subscription for ${s.topicName()} with QoS ${s.qualityOfService()}")
      grantedQosLevels.add(s.qualityOfService())
    }

    // ack the subscriptions request
    endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels)

    // just as example, publish a message on the first topic with requested QoS;
    // skipped when the request carries no subscription, since indexing an
    // empty list yields null and would fail on topicName()
    if (!subscriptions.isEmpty()) {
      def first = subscriptions[0]
      endpoint.publish(first.topicName(), Buffer.buffer("Hello from the Vert.x MQTT server"), first.qualityOfService(), false, false)
    }
  })

  // handling requests for unsubscriptions
  endpoint.unsubscribeHandler({ unsubscribe ->

    unsubscribe.topics().each { t ->
      println("Unsubscription for ${t}")
    }

    // ack the unsubscription request
    endpoint.unsubscribeAcknowledge(unsubscribe.messageId())
  })

  // handling ping from client
  endpoint.pingHandler({ v ->
    println("Ping received from client")
  })

  // handling disconnect message
  endpoint.disconnectHandler({ v ->
    println("Received disconnect from client")
  })

  // handling closing connection
  endpoint.closeHandler({ v ->
    println("Connection closed")
  })

  // handling incoming published messages
  endpoint.publishHandler({ message ->

    println("Just received message on [${message.topicName()}] payload [${message.payload()}] with QoS [${message.qosLevel()}]")

    // QoS 0 needs no reply; QoS 1 is answered with PUBACK, QoS 2 with PUBREC
    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
      endpoint.publishAcknowledge(message.messageId())
    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
      endpoint.publishReceived(message.messageId())
    }

  }).publishReleaseHandler({ messageId ->
    // QoS 2, final step for an incoming message: answer PUBREL with PUBCOMP
    endpoint.publishComplete(messageId)
  })

}).listen(1883, "0.0.0.0", { ar ->
  if (ar.succeeded()) {
    println("MQTT server is listening on port ${mqttServer.actualPort()}")
  } else {
    // separator added so the cause is not glued onto the fixed text
    System.err.println("Error on starting the server: ${ar.cause().getMessage()}")
  }
})
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。