1 Star 0 Fork 0

李浩德/mqtt-benchmark

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
client.go 3.41 KB
一键复制 编辑 原始数据 按行查看 历史
Alexandr Krylovskiy 提交于 2015-06-07 21:25 . Initial commit
package main
import (
"fmt"
"log"
"time"
)
import (
mqtt "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"github.com/GaryBoone/GoStats/stats"
)
type Client struct {
ID int
BrokerURL string
BrokerUser string
BrokerPass string
MsgTopic string
MsgSize int
MsgCount int
MsgQoS byte
}
func (c *Client) Run(res chan *RunResults) {
newMsgs := make(chan *Message)
pubMsgs := make(chan *Message)
doneGen := make(chan bool)
donePub := make(chan bool)
runResults := new(RunResults)
started := time.Now()
// start generator
go c.genMessages(newMsgs, doneGen)
// start publisher
go c.pubMessages(newMsgs, pubMsgs, doneGen, donePub)
runResults.ID = c.ID
times := []float64{}
for {
select {
case m := <-pubMsgs:
if m.Error {
log.Printf("CLIENT %v ERROR publishing message: %v: at %v\n", c.ID, m.Topic, m.Sent.Unix())
runResults.Failures++
} else {
// log.Printf("Message published: %v: sent: %v delivered: %v flight time: %v\n", m.Topic, m.Sent, m.Delivered, m.Delivered.Sub(m.Sent))
runResults.Successes++
times = append(times, m.Delivered.Sub(m.Sent).Seconds()*1000) // in milliseconds
}
case <-donePub:
// calculate results
duration := time.Now().Sub(started)
runResults.MsgTimeMin = stats.StatsMin(times)
runResults.MsgTimeMax = stats.StatsMax(times)
runResults.MsgTimeMean = stats.StatsMean(times)
runResults.MsgTimeStd = stats.StatsSampleStandardDeviation(times)
runResults.RunTime = duration.Seconds()
runResults.MsgsPerSec = float64(runResults.Successes) / duration.Seconds()
// report results and exit
res <- runResults
return
}
}
}
func (c *Client) genMessages(ch chan *Message, done chan bool) {
for i := 0; i < c.MsgCount; i++ {
ch <- &Message{
Topic: c.MsgTopic,
QoS: c.MsgQoS,
Payload: make([]byte, c.MsgSize),
}
}
done <- true
// log.Printf("CLIENT %v is done generating messages\n", c.ID)
return
}
func (c *Client) pubMessages(in, out chan *Message, doneGen, donePub chan bool) {
onConnected := func(client *mqtt.Client) {
log.Printf("CLIENT %v is connected to the broker %v\n", c.ID, c.BrokerURL)
ctr := 0
for {
select {
case m := <-in:
m.Sent = time.Now()
token := client.Publish(m.Topic, m.QoS, false, m.Payload)
token.Wait()
if token.Error() != nil {
log.Printf("CLIENT %v Error sending message: %v\n", c.ID, token.Error())
m.Error = true
} else {
m.Delivered = time.Now()
m.Error = false
}
out <- m
if ctr > 0 && ctr%100 == 0 {
log.Printf("CLIENT %v published %v messages and keeps publishing...\n", c.ID, ctr)
}
ctr++
case <-doneGen:
donePub <- true
log.Printf("CLIENT %v is done publishing\n", c.ID)
return
}
}
}
opts := mqtt.NewClientOptions().
AddBroker(c.BrokerURL).
SetClientID(fmt.Sprintf("mqtt-benchmark-%v-%v", time.Now(), c.ID)).
SetCleanSession(true).
SetAutoReconnect(true).
SetOnConnectHandler(onConnected).
SetConnectionLostHandler(func(client *mqtt.Client, reason error) {
log.Printf("CLIENT %v lost connection to the broker: %v. Will reconnect...\n", c.ID, reason.Error())
})
if c.BrokerUser != "" && c.BrokerPass != "" {
opts.SetUsername(c.BrokerUser)
opts.SetPassword(c.BrokerPass)
}
client := mqtt.NewClient(opts)
token := client.Connect()
token.Wait()
if token.Error() != nil {
log.Printf("CLIENT %v had error connecting to the broker: %v\n", c.ID, token.Error())
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/socratesa/mqtt-benchmark.git
git@gitee.com:socratesa/mqtt-benchmark.git
socratesa
mqtt-benchmark
mqtt-benchmark
0.1.0

搜索帮助

0d507c66 1850385 C8b1a773 1850385