Messaging client library for use by Go implementation of EdgeX micro services. This project contains the abstract Message Bus interface and an implementation for MQTT and NATS. These interface functions connect, publish, subscribe and disconnect to/from the Message Bus. For more information see the MessageBus documentation.
GO111MODULE=on
is setgo get github.com/edgexfoundry/go-mod-messaging/v4
This library is used by Go programs for interacting with the Message Bus (i.e. MQTT).
The Message Bus connection information as well as which implementation to use is stored in the service's toml configuration as:
MessageBus:
Protocol: mqtt
Host: localhost
Port: 1883
Type: mqtt
Individual client abstractions allow additional configuration properties which can be provided via configuration file:
MessageBus:
Protocol: tcp
Host: localhost
Port: 1883
Type: mqtt
Topic: events
Optional:
ClientId: MyClient
Username: MyUsername
...
Or programmatically in the Optional field of the MessageBusConfig struct. For example,
types.MessageBusConfig{
Broker: types.HostInfo{Host: "example.com", Port: 9090, Protocol: "tcp"},
Optional: map[string]string{
"ClientId": "MyClientID",
"Username": "MyUser",
"Password": "MyPassword",
...
}}
NOTE
For complete details on configuration options see the MessageBus documentation
The following code snippets demonstrate how a service uses this messaging module to create a connection, send messages, and receive messages.
This code snippet shows how to connect to the abstract message bus.
var messageBus messaging.MessageClient
var err error
messageBus, err = msgFactory.NewMessageClient(types.MessageBusConfig{
Broker: types.HostInfo{
Host: Configuration.MessageBus.Host,
Port: Configuration.MessageBus.Port,
Protocol: Configuration.MessageBus.Protocol,
},
Type: Configuration.MessageBus.Type,})
if err != nil {
LoggingClient.Error("failed to create messaging client: " + err.Error())
}
err = messsageBus.Connect()
if err != nil {
LoggingClient.Error("failed to connect to message bus: " + err.Error())
}
This code snippet shows how to publish a message to the abstract message bus.
...
payload, err := json.Marshal(evt)
...
msgEnvelope := types.MessageEnvelope{
CorrelationID: evt.CorrelationId,
Payload: payload,
ContentType: clients.ContentJson,
}
err = messageBus.Publish(msgEnvelope, Configuration.MessageBus.Topic)
This code snippet shows how to subscribe to the abstract message bus.
messageBus, err := factory.NewMessageClient(types.MessageBusConfig{
Broker: types.HostInfo{
Host: Configuration.MessageBus.Host,
Port: Configuration.MessageBus.Port,
Protocol: Configuration.MessageBus.Protocol,
},
Type: Configuration.MessageBus.Type,
})
if err != nil {
LoggingClient.Error("failed to create messaging client: " + err.Error())
return
}
if err := messageBus.Connect(); err != nil {
LoggingClient.Error("failed to connect to message bus: " + err.Error())
return
}
topics := []types.TopicChannel{
{
Topic: Configuration.MessageBus.Topic,
Messages: messages,
},
}
err = messageBus.Subscribe(topics, messageErrors)
if err != nil {
LoggingClient.Error("failed to subscribe for event messages: " + err.Error())
return
}
This code snippet shows how to receive data on the message channel after you have subscribed to the bus.
...
for {
select {
case e := <-errors:
// handle errors
...
case msgEnvelope := <-messages:
LoggingClient.Info(fmt.Sprintf("Event received on message queue. Topic: %s, Correlation-id: %s ", Configuration.MessageBus.Topic, msgEnvelope.CorrelationID))
if msgEnvelope.ContentType != clients.ContentJson {
LoggingClient.Error(fmt.Sprintf("Incorrect content type for event message. Received: %s, Expected: %s", msgEnvelope.ContentType, clients.ContentJson))
continue
}
str := string(msgEnvelope.Payload)
event := parseEvent(str)
if event == nil {
continue
}
}
...
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。