1 Star 0 Fork 0

育子 / nuciotsdk

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
nuciotclient.go 21.80 KB
一键复制 编辑 原始数据 按行查看 历史
育子 提交于 2021-12-09 09:32 . update logger
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690
// Copyright 2021 Hugo SHEN
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
m "gitee.com/magicyu90/nuciotsdk/models"
u "gitee.com/magicyu90/nuciotsdk/utils"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
s3 "github.com/aws/aws-sdk-go/service/s3"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
log "github.com/jeanphorn/log4go"
)
// 定义常量
const (
// 保活时间
KEEPALIVE = 10
// S3 Endpoint
S3ENDPOINT = "https://ossnucloud.nuctech.com"
// 证书内容
CERT = `-----BEGIN CERTIFICATE-----
MIIDzzCCAregAwIBAgIJAPIayH3HbL2YMA0GCSqGSIb3DQEBCwUAMH0xCzAJBgNV
BAYTAkNOMRAwDgYDVQQIDAdCZWlqaW5nMRAwDgYDVQQHDAdCZWlqaW5nMQwwCgYD
VQQKDANOdWMxCzAJBgNVBAsMAkFJMRQwEgYDVQQDDAtudWN0ZWNoLmNvbTEZMBcG
CSqGSIb3DQEJARYKMTIzQHFxLmNvbTAgFw0yMDA3MDExMTA5MjdaGA8yMTIwMDYw
NzExMDkyN1owfTELMAkGA1UEBhMCQ04xEDAOBgNVBAgMB0JlaWppbmcxEDAOBgNV
BAcMB0JlaWppbmcxDDAKBgNVBAoMA051YzELMAkGA1UECwwCQUkxFDASBgNVBAMM
C251Y3RlY2guY29tMRkwFwYJKoZIhvcNAQkBFgoxMjNAcXEuY29tMIIBIjANBgkq
hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA7xm4aVG4JXskOBHoRIc9EaXXC5274Ozi
u45VMAW01sp4cXwohBJ3xZwBqi5cTQiU1DArWdJ+6nx6C6Y0snn+SgkdEpkCQBQm
bHCj3B6FOxjEr7KLHsOSdaV6V6zA7C/ObuaxByF6At2MhMEuyOQFToZ28KMB/1SH
766b3JQ6uV2CPEa73ZihbpZ2ZV4iyrvZwO5VOJBB5pUWpMseZrIeGJVRgoZOKgsr
R50TQxQzy7FPAKxz99KnUQQYLmUDXAsUohjQhWzI7uNdeVfpHmiW7vgIQ55tePlX
ilmqlJFv9WFEQ1fJNrU6giZgkdDrD9c6raq3ks6WMXbTFbizdqlPXwIDAQABo1Aw
TjAdBgNVHQ4EFgQU/dvsbbtioeVTcAGUQ4WN74XHk20wHwYDVR0jBBgwFoAU/dvs
bbtioeVTcAGUQ4WN74XHk20wDAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
AQEA2YySNsnQtMc7blO8+4D2A1K2RulWR76yAe+xG4aT9KJrpn9xCELLpJ5RtMOY
DbEDLgFHCi2om7qbLqK/ph+NXn3ZCiQbLjnoB3//wqmVAN7nFKA3NTWQtKGkgLgP
/kTY6N7CJYLYIJaExCIdFyiajHo6PeV4bRF71H9jc0mqD8CnQVRGLpqbVHJpC/pZ
Ggj2uOfrvesXWAkMUJnLF1mXidpThdYJGvZmhPvVPdf+dJxUHeld+xFbrH9E+7lA
CxUr4PUG9MRwcP541k9eKVNSYj9wjBJ+sr6wDQHHE1s+SPDI2vjOeiDBGBEcaAZN
JypqqGZOBsyHIz7Db7SiqfA/tw==
-----END CERTIFICATE-----`
)
// 定义变量
var (
// mqtt客户端
mqttClient mqtt.Client
// 锁
lock *sync.Mutex
// 信道
ch chan int
// iot事件
events map[string][]func(args ...interface{})
// 上行主题
upTopic string
// 下行主题
downTopic string
// 重连次数
retryTimes int = 1
// 注册回复码
regCode int = 9999
// 会话Id
sessionId string
)
// NucIotClient IoT客户端接口
type NucIotClient interface {
// 配置初始化
Configure()
// 设备注册
SendRegistration() int
// 设备运行信息
SendDeviceData(dataItems []m.DataItem)
// 设备状态
SendDeviceStat(status int)
// 文件信息
SendFileInfo(fileName, filePath string, fileSize int, fileType m.EnumFileType, userData string)
// 上传文件到S3
UploadFile2S3(filePath string, fileType m.EnumFileType)
// 注册事件
RegisterEvent(name m.EnumEvent, callback func(args ...interface{}))
// 上报属性
SendProperty(body interface{})
// 上报事件
SendEvent(body interface{}, identifier string)
}
// NucIotClient IoT客户端结构体
type nucIotClient struct {
// 设备信息
deviceReg m.DeviceRegistrationReq
// 存储信息
deviceStg m.DeviceStorage
// mqtt信息
mqttInfo m.MqttBrokerInfo
// s3客户端
s3Client s3.S3
}
// S3客户端
type S3Client struct {
Client *s3.S3
}
// @title NewIotClient
// @description 初始化nucIotClient
// @param DeviceReg m.DeviceRegistrationReq 设备注册信息
// @param DeviceStg m.DeviceStorage 存储相关信息
// @param MqttInfo m.MqttBrokerInfo Mqtt相关信息
// @return NucIotClient NucIotClient实例
func NewIotClient(DeviceReg m.DeviceRegistrationReq, DeviceStg m.DeviceStorage, MqttInfo m.MqttBrokerInfo) *nucIotClient {
// 实例化nucIotClient
n := &nucIotClient{}
// 设备信息
n.deviceReg = DeviceReg
// 存储信息
n.deviceStg = DeviceStg
// mqtt信息
n.mqttInfo = MqttInfo
// 上行主题
upTopic = fmt.Sprintf("base/nuctechiot/%s/%s/up", DeviceReg.ProductKey, DeviceReg.DeviceId)
// 下行主题
downTopic = fmt.Sprintf("base/nuctechiot/%s/%s/down", DeviceReg.ProductKey, DeviceReg.DeviceId)
// 创建事件
events = make(map[string][]func(args ...interface{}))
// 创建消息信道,容量是1
ch = make(chan int, 1)
// 创建锁
lock = &sync.Mutex{}
return n
}
// @title Configure
// @description 配置初始化
func (n *nucIotClient) Configure() {
// 配置日志
logFile := fmt.Sprintf("./nuciot/%s_%s.log", n.deviceReg.ModelId, n.deviceReg.DeviceId)
log.AddFilter("stdout", log.DEBUG, log.NewConsoleLogWriter()) //输出到控制台,级别为DEBUG
log.AddFilter("file", log.DEBUG, log.NewFileLogWriter(logFile, false))
//log.LoadConfiguration("config/iotlog.json")
// 配置s3客户端
n.newS3Client()
// 配置mqtt客户端
n.buildMqttClient()
// 连接mqtt
connect2Mqtt()
}
// @title SendRegistration
// @description 设备注册信息
// @return int 注册结果码
func (n *nucIotClient) SendRegistration() int {
// 判断mqtt是否连接
if mqttClient.IsConnectionOpen() {
// 如果当前注册码是0,则返回
if regCode == 0 {
return regCode
}
// 会话Id
sessionId = uuid.New().String()
n.deviceReg.SessionId = sessionId
// 时间戳
n.deviceReg.TimeStamp = u.GetCurrentTimestamp()
// 注册内容
content, logStr := n.setupContent(m.RegistrationCMD, n.deviceReg)
n.mqttMsgPublish(content, m.Qos1, upTopic, logStr)
for {
select {
// 消息信道有数据
case regCode = <-ch:
if regCode == 0 {
log.LOGGER("IoT").Info("registration success")
// 开启内部心跳...
n.startInternalHeartbeat()
} else {
log.LOGGER("IoT").Error("registration failed,code:" + strconv.Itoa(regCode))
}
return regCode
// 消息信道等待10s则超时
case <-time.After(time.Duration(KEEPALIVE) * time.Second):
log.LOGGER("IoT").Error("registration timeout")
return int(m.RegistrationError)
}
}
} else {
log.LOGGER("IoT").Error("mqtt unconected in registration")
return int(m.UnConnected)
}
}
// @title SendDeviceData
// @description 发送设备运行信息
// @param deviceId string 设备编号
// @param dataItems []m.DataItem 运行内容
func (n *nucIotClient) SendDeviceData(dataItems []m.DataItem) {
// 时间戳
clock := u.GetCurrentTimestamp()
// 实例化运行信息
deviceData := &m.DeviceData{
DeviceId: n.deviceReg.DeviceId, // 设备编号
DataItems: dataItems, // 运行信息项
IotBase: m.IotBase{ // IoTbase基类
TimeStamp: clock, // 时间戳
SessionId: uuid.New().String(), // 会话Id
},
}
deviceDataContent, logStr := n.setupContent(m.DeviceDataCMD, deviceData)
n.mqttMsgPublish(deviceDataContent, m.Qos0, upTopic, logStr)
}
// @title SendDeviceStat
// @description 发送设备状态
// @param deviceId string 设备编号
// @param status int 运行状态
func (n *nucIotClient) SendDeviceStat(deviceId string, status int) {
// 时间戳
clock := u.GetCurrentTimestamp()
// 实例化运行状态
deviceStat := &m.DeviceStat{
DeviceId: deviceId, // 设备编号
Stat: status, // 运行信息项
IotBase: m.IotBase{ // IoTbase基类
TimeStamp: clock, // 时间戳
SessionId: uuid.New().String(), // 会话Id
},
}
deviceStatContent, logStr := n.setupContent(m.DeviceStatCMD, deviceStat)
n.mqttMsgPublish(deviceStatContent, m.Qos0, upTopic, logStr)
}
// @title SendFileInfo
// @description 发送文件信息
// @param deviceId string 设备编号
// @param fileName string 文件名称
// @param filePath string 文件路径
// @param fileSize int 文件大小
// @param fileType EnumFileType 文件类型
// @param userData string 扩展字段
func (n *nucIotClient) SendFileInfo(fileName, filePath string, fileSize int, fileType m.EnumFileType, userData string) {
fileInfo := &m.FileInfo{
DeviceId: n.deviceReg.DeviceId, // 设备编号
FileName: fileName, // 文件名称
FilePath: filePath, // s3地址
FileSize: fileSize, // 文件大小
FileType: fileType, // 文件类型(img\log\video\other)
UserData: userData, // 扩展字段
IotBase: m.IotBase{
TimeStamp: u.GetCurrentTimestamp(),
SessionId: uuid.New().String()},
}
fileInfoContent, logStr := n.setupContent(m.DeviceStatCMD, fileInfo)
n.mqttMsgPublish(fileInfoContent, m.Qos0, upTopic, logStr)
}
// @title SendProperty
// @description 发送属性
// @param body interface{} 传输内容
// @param callback func(args ...interface{}) 回调函数
func (n *nucIotClient) SendProperty(body interface{}) {
// 创建物模型属性
thingProperty := m.ThingProperty{
DeviceId: n.deviceReg.DeviceId,
Params: body,
IotBase: m.IotBase{
TimeStamp: u.GetCurrentTimestamp(),
SessionId: uuid.New().String(),
},
Method: "thing.property.post",
}
topic := fmt.Sprintf("sys/nuctechiot/%s/%s/%s/thing/property/up", n.deviceReg.ProductKey, n.deviceReg.ModelId, n.deviceReg.DeviceId)
propertyContent, logStr := n.setupContent(m.PropertyCMD, thingProperty)
n.mqttMsgPublish(propertyContent, m.Qos1, topic, logStr)
}
// @title SendEvent
// @description 发送事件
// @param identifier string 事件标识符
// @param body interface{} 传输内容
// @param callback func(args ...interface{}) 回调函数
func (n *nucIotClient) SendEvent(identifier string, body interface{}) {
// 创建物模型事件
thingEvent := m.ThingEvent{
DeviceId: n.deviceReg.DeviceId,
Params: body,
IotBase: m.IotBase{
TimeStamp: u.GetCurrentTimestamp(),
SessionId: uuid.New().String(),
},
Method: fmt.Sprintf("thing.event.%s.post", identifier),
}
topic := fmt.Sprintf("sys/nuctechiot/%s/%s/%s/thing/event/%s/up", n.deviceReg.ProductKey, n.deviceReg.ModelId, n.deviceReg.DeviceId, identifier)
propertyContent, logStr := n.setupContent(m.EventCMD, thingEvent)
n.mqttMsgPublish(propertyContent, m.Qos1, topic, logStr)
}
// @title RegisterEvent
// @description 注册事件,提供事件名和回调函数
// @param name m.EnumEvent 事件名称
// @param callback func(args ...interface{}) 回调函数
func (n *nucIotClient) RegisterEvent(name m.EnumEvent, callback func(args ...interface{})) {
// 通过名字查找事件列表
eventList := events[string(name)]
// 在列表切片中添加函数
eventList = append(eventList, callback)
// 将修改的事件列表切片保存回去
events[string(name)] = eventList
}
// @title buildMqttClient
// @description 构建MQTT客户端
func (n *nucIotClient) buildMqttClient() {
// 验证信息完整性
if n.deviceReg.DeviceId == "" || n.deviceReg.DeveloperId == "" || n.deviceReg.DeviceType == "" || n.deviceReg.ProductKey == "" || n.deviceReg.ModelId == "" {
log.LOGGER("IoT").Error("DeviceReg is nil")
}
brokerUrl := ""
// 创建mqtt配置项
opts := mqtt.NewClientOptions()
// 设置tls配置
if n.mqttInfo.IsCert {
certpool := x509.NewCertPool()
// 读取加密证书
ca, err := ioutil.ReadAll(strings.NewReader(CERT))
if err != nil {
log.LOGGER("IoT").Error("read cert error:" + err.Error())
}
certpool.AppendCertsFromPEM(ca)
// tls配置
tlsConfig := &tls.Config{
RootCAs: certpool,
}
opts.SetTLSConfig(tlsConfig)
brokerUrl = fmt.Sprintf("tls://%s:%d", n.mqttInfo.BrokerUrl, n.mqttInfo.Port)
} else {
brokerUrl = fmt.Sprintf("%s:%d", n.mqttInfo.BrokerUrl, n.mqttInfo.Port)
}
var clientId string
if n.deviceReg.GlobalId != "" {
clientId = n.deviceReg.GlobalId
}
clientId = fmt.Sprintf("nuc_iot_client_%d", time.Now().UnixNano()/1e6)
log.LOGGER("IoT").Info("mqtt broker:" + brokerUrl + ",clientID:" + clientId + ",username:" + n.mqttInfo.MqttUsername + ",passwd:" + n.mqttInfo.MqttPass)
// 设置broker
opts.AddBroker(brokerUrl)
// 设置保活时间
opts.SetKeepAlive(time.Duration(KEEPALIVE*2) * time.Second)
// 设置mqtt client id
opts.SetClientID(clientId)
// 设置mqtt 用户名
opts.SetUsername(n.mqttInfo.MqttUsername)
// 设置mqtt 密码
opts.SetPassword(n.mqttInfo.MqttPass)
// 订阅回调
opts.SetDefaultPublishHandler(messagePubHandler)
// 遗嘱内容
willBytes, _ := n.setupContent(m.OfflineCMD, nil)
// 设置遗嘱
opts.SetWill(upTopic, string(willBytes), byte(m.Qos1), false)
// 连接回调
opts.OnConnect = connectHandler
// 断开连接回调
opts.OnConnectionLost = connectLostHandler
// 实例化mqttclient
mqttClient = mqtt.NewClient(opts)
}
// @title connect2Mqtt
// @description mqtt连接
func connect2Mqtt() error {
// 连接
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
log.LOGGER("IoT").Error(fmt.Sprintf("mqtt connection error:" + token.Error().Error()))
return token.Error()
}
return nil
}
// @title reConnect2Mqtt
// @description mqtt重连
func reConnect2Mqtt() {
// 若已经连接,则返回
if mqttClient.IsConnectionOpen() {
return
}
go func() {
for {
// 判断mqtt客户端是否为空||判断mqtt是否已经连接
if mqttClient.IsConnectionOpen() {
break
}
log.LOGGER("IoT").Warn("mqtt reconnect " + strconv.Itoa(retryTimes) + " times")
retryTimes++
connect2Mqtt()
// 如果还没连接不符合结束条件则睡3秒
if !mqttClient.IsConnectionOpen() {
time.Sleep(time.Duration(3) * time.Second)
}
}
}()
}
// mqtt消息接收处理器
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, message mqtt.Message) {
// 消息体
msg := message.Payload()
// 命令字节
commandBytes := msg[:4]
// 命令值 (网络字节序-大端)
commandValue := binary.BigEndian.Uint32(commandBytes)
// 密文
encryptBytes := msg[4:]
// Aes解密
origBody, _ := u.AesDecrypt(encryptBytes)
// 注册回复
if commandValue == uint32(m.RegistrationReplyCMD) {
var deviceRegistrationReply m.DeviceRegistrationReply
log.LOGGER("IoT").Info("[RegistrationReply],content:" + string(origBody))
err := json.Unmarshal(origBody, &deviceRegistrationReply)
if err != nil {
log.LOGGER("IoT").Error("Deserilization error:" + err.Error())
// 调用事件
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: int(m.DeserializeError),
Message: err.Error(),
})
}
// 判断是否同一会话
if deviceRegistrationReply.IotBase.SessionId == sessionId {
// 判断信道是否可用
if len(ch) == 0 && regCode != 0 {
// 注册结果推送到信道
ch <- deviceRegistrationReply.Code
}
// 注册结果不成功,触发错误事件
if deviceRegistrationReply.Code != 0 {
// 调用事件
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: deviceRegistrationReply.Code,
Message: deviceRegistrationReply.Message,
})
}
}
} else if commandValue == uint32(m.UploadCMD) {
var uploadReq m.UploadReq
log.LOGGER("IoT").Error("[UploadRequest],content:" + string(origBody))
err := json.Unmarshal(origBody, &uploadReq)
if err != nil {
log.LOGGER("IoT").Error("Deserilization error:" + err.Error())
// 调用事件
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: int(m.DeserializeError),
Message: err.Error(),
})
}
// 调用事件
callEvent(m.UploadRequestEvent, uploadReq)
// 解析错误
} else if commandValue == uint32(m.DeserializeErrorCMD) {
log.LOGGER("IoT").Error("Cloud deserialize error")
// 调用事件
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: int(m.DeserializeError),
Message: "Cloud deserialize error",
})
}
}
// mqtt连接处理器
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
// 重置重连次数
retryTimes = 1
log.LOGGER("IoT").Info("mqtt connected")
// 订阅下行主题
token := client.Subscribe(downTopic, byte(m.Qos0), nil)
if token.Wait() {
log.LOGGER("IoT").Info("mqtt subscribed to topic:" + downTopic)
}
}
// mqtt丢失连接处理器
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.LOGGER("IoT").Warn("mqtt connection lost:" + err.Error())
// 开启重连
reConnect2Mqtt()
}
// @title callEvent
// @description 调用事件
// @param name m.EnumEvent 事件名称
// @param args ...interface{} 回调函数
func callEvent(name m.EnumEvent, args ...interface{}) {
// 验证事件是否存在
if _, ok := events[string(name)]; ok {
// 通过名字找到事件列表
eventList := events[string(name)]
// 遍历这个事件的所有回调
for _, callback := range eventList {
// 传入参数调用回调
callback(args...)
}
}
}
// @title startInternalHeartbeat
// @description 开启内部心跳
func (n *nucIotClient) startInternalHeartbeat() {
go func() {
for {
content, logStr := n.setupContent(m.InternalHeartbeatCMD, nil)
n.mqttMsgPublish(content, m.Qos0, upTopic, logStr)
time.Sleep(time.Duration(KEEPALIVE) * time.Second)
}
}()
}
// @title mqttMsgPublish
// @description 发送消息至mqtt
// @param content []byte 消息内容
// @param qos int 消息qos级别
func (n *nucIotClient) mqttMsgPublish(content []byte, qos m.EnumMqttQos, topic string, logStr string) {
// 判断mqtt客户端是否连接
if mqttClient.IsConnectionOpen() {
token := mqttClient.Publish(topic, byte(qos), false, content)
// 是否成功发出
hasSend := token.Wait()
if hasSend {
log.LOGGER("IoT").Info("message published,topic:" + upTopic + "," + logStr)
}
}
}
// @title setupContent
// @description 建立包体内容
// @param cmd m.EnumCommand 命令类型
// @param body interface{} 结构体
// @return byte[] 字节数组
// @return string 日志内容
func (n *nucIotClient) setupContent(cmd m.EnumCommand, body interface{}) ([]byte, string) {
// 包体由版命令+版本组成
// 版本字节数组
versionBytes := []byte{1, 2, 1}
// 命令字节数组
cmdBytes := u.Int2Bytes(int(cmd))
var headerBytes = []byte{}
if cmd != m.OfflineCMD {
headerBytes = u.BytesCombine(cmdBytes, versionBytes)
} else {
headerBytes = cmdBytes
}
bodyJsonBytes, err := json.Marshal(body)
if err != nil {
log.LOGGER("IoT").Warn("setup content json serialize error:" + err.Error())
}
// 日志内容
logStr := fmt.Sprintf("cmd:%d,content:%s", uint32(cmd), string(bodyJsonBytes))
// 加密body
encryptBytes, err := u.AesEncrypt(bodyJsonBytes)
if err != nil {
log.LOGGER("IoT").Warn("setup content aes error:" + err.Error())
}
// 内容拼接
contentBytes := u.BytesCombine(headerBytes, encryptBytes)
return contentBytes, logStr
}
// @title NewS3Client
// @description 实例化S3客户端
func (n *nucIotClient) newS3Client() {
if n.deviceStg.S3AccessId != "" && n.deviceStg.S3Bucket != "" && n.deviceStg.S3AccessSecret != "" {
sess, _ := session.NewSession(&aws.Config{
Credentials: credentials.NewStaticCredentials(n.deviceStg.S3AccessId, n.deviceStg.S3AccessSecret, ""),
Endpoint: aws.String(S3ENDPOINT),
DisableSSL: aws.Bool(true),
Region: aws.String("us-east-1"),
S3ForcePathStyle: aws.Bool(true),
})
n.s3Client = *s3.New(sess)
}
}
// @title UploadFile2S3
// @description 上传文件到S3
func (n *nucIotClient) UploadFile2S3(filePath string, fileType m.EnumFileType, userData string) {
if n.deviceStg.S3AccessId != "" && n.deviceStg.S3Bucket != "" && n.deviceStg.S3AccessSecret != "" {
// 文件全名
fileNameAll := path.Base(filePath)
//fileSuffix := path.Ext(filePath)
// 文件名称
//fileName := strings.TrimSuffix(fileNameAll, fileSuffix)
now := time.Now()
// 文件keyName
keyName := fmt.Sprintf("%s/%d/%d/%d/%s", n.deviceReg.DeviceId, now.Year(), now.Month(), now.Day(), fileNameAll)
// 读取文件
f, err := os.Open(filePath)
if err != nil {
log.LOGGER("IoT").Error(fmt.Sprintf("s3 error,failed to open file %q, %v", filePath, err))
// 通知错误回调
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: int(m.S3Error),
Message: fmt.Sprintf("s3 error,failed to open file %q, %v", filePath, err),
})
return
}
defer f.Close()
fi, _ := f.Stat()
// 文件大小
var fileSize int64 = fi.Size()
// 文件buffer
fileBuffer := make([]byte, fileSize)
f.Read(fileBuffer)
// 获取contentType
contentType := http.DetectContentType(fileBuffer)
log.LOGGER("IoT").Info(fmt.Sprintf("upload file,key:%s,fileName:%s,content-type:%s", keyName, fileNameAll, contentType))
prefix := ""
if fileType == m.EnumFileType(m.Log) {
prefix = "log/"
} else if fileType == m.EnumFileType(m.Img) {
prefix = "img/"
} else if fileType == m.EnumFileType(m.Video) {
prefix = "video/"
} else {
prefix = "other/"
}
// 设置keyName
keyName = fmt.Sprintf(n.deviceReg.ProductKey + "/" + prefix + keyName)
// 上传文件
_, err = n.s3Client.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(fileBuffer),
Bucket: aws.String(n.deviceStg.S3Bucket),
Key: aws.String(keyName),
ACL: aws.String("public-read"),
ContentType: aws.String(contentType),
ContentLength: aws.Int64(fileSize),
Metadata: map[string]*string{
"Key": aws.String(fileNameAll),
},
})
if err != nil {
// 通知错误回调
callEvent(m.ErrorEvent, &m.IotErrorInfo{
Code: int(m.S3Error),
Message: err.Error(),
})
}
// 回调信息
s3FileInfo := &m.IotFileInfo{
DeviceId: n.deviceReg.DeviceId,
FileName: fileNameAll,
FileSize: int(fileSize),
Url: keyName,
FileType: fileType,
UserData: userData,
}
// 通知上传完成回调
callEvent(m.UploadCompleteEvent, err == nil, s3FileInfo)
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/magicyu90/nuciotsdk.git
git@gitee.com:magicyu90/nuciotsdk.git
magicyu90
nuciotsdk
nuciotsdk
v1.1.2

搜索帮助

344bd9b3 5694891 D2dac590 5694891