1 Star 0 Fork 1

jackxiao / mynewt-newtmgr

forked from mirrors_apache / mynewt-newtmgr 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
mtech_lora_xport.go 8.87 KB
一键复制 编辑 原始数据 按行查看 历史
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package mtech_lora
import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"net"
"strings"
"sync"
log "github.com/sirupsen/logrus"
"github.com/runtimeco/go-coap"
"github.com/ugorji/go/codec"
"mynewt.apache.org/newtmgr/nmxact/lora"
"mynewt.apache.org/newtmgr/nmxact/nmxutil"
"mynewt.apache.org/newtmgr/nmxact/sesn"
)
type LoraConfig struct {
Addr string
SegSz int
ConfirmedTx bool
Port uint8
}
type LoraJoinedCb func(dev LoraConfig)
type LoraXport struct {
sync.Mutex
cfg *LoraXportCfg
started bool
rxConn *net.UDPConn
txConn *net.UDPConn
msgMap *ListenerMap
reassMap *ListenerMap
tgtMap *ListenerSlice
exitChan chan int
joinCb LoraJoinedCb
}
type LoraXportCfg struct {
AppPortDown int // UDP TX port
AppPortUp int // UDP RX port
}
type LoraData struct {
Data string `codec:"data"`
Port uint8 `codec:"port"`
}
type LoraPacketSent struct {
DataRate string `codec:"datr"`
}
/*
* This maps datarate string into a max mtu we can use
*/
var LoraDataRateMapUS = map[string]int{
"SF12BW500": 33,
"SF11BW500": 109,
"SF10BW500": 222,
"SF9BW500": 222,
"SF8BW500": 222,
"SF7BW500": 222,
}
const MAX_PACKET_SIZE_IN = 2048
const MAX_PACKET_SIZE_OUT = 256
const UDP_RX_PORT = 1784
const UDP_TX_PORT = 1786
func NewXportCfg() *LoraXportCfg {
return &LoraXportCfg{
AppPortDown: UDP_TX_PORT,
AppPortUp: UDP_RX_PORT,
}
}
func NewLoraXport(cfg *LoraXportCfg) *LoraXport {
log.SetLevel(log.DebugLevel)
return &LoraXport{
cfg: cfg,
msgMap: NewListenerMap(),
reassMap: NewListenerMap(),
tgtMap: NewListenerSlice(),
}
}
func (lx *LoraXport) minMtu() int {
return 33
}
func NormalizeAddr(addr string) (string, error) {
a := strings.Replace(addr, ":", "", -1)
a = strings.Replace(addr, "-", "", -1)
// XXX check that there's 8 components, 2 chars each, which are in [0-9,a-f]
if len(a) != 16 {
return "", fmt.Errorf("Invalid addr")
}
return a, nil
}
func DenormalizeAddr(addr string) string {
if len(addr) != 16 {
return "<invalid>"
}
rc := ""
for i := 0; i < 16; i += 2 {
rc += addr[i : i+2]
if 16-i > 2 {
rc += "-"
}
}
return rc
}
func (lx *LoraXport) BuildSesn(cfg sesn.SesnCfg) (sesn.Sesn, error) {
if cfg.Lora.Addr == "" && cfg.MgmtProto != sesn.MGMT_PROTO_COAP_SERVER {
return nil, fmt.Errorf("Need an address of endpoint")
}
return NewLoraSesn(cfg, lx)
}
func (lx *LoraXport) acceptServerSesn(sl *Listener, dev string, port uint8) (*LoraSesn, error) {
sc := sesn.NewSesnCfg()
sc.MgmtProto = sesn.MGMT_PROTO_COAP_SERVER
sc.Lora.Addr = dev
sc.Lora.Port = port
ls, err := NewLoraSesn(sc, lx)
if err != nil {
return nil, fmt.Errorf("NewSesn():%v", err)
}
err = ls.Open()
if err != nil {
return nil, fmt.Errorf("Open():%v", err)
}
sl.ConnChan <- ls
return ls, nil
}
func (lx *LoraXport) reass(dev string, port uint8, data []byte) {
lx.Lock()
defer lx.Unlock()
var err error
_, l := lx.reassMap.FindListener(dev, port, "reass")
if l == nil {
_, l = lx.msgMap.FindListener("", port, "req")
if l == nil {
log.Debugf("No LoRa listener for %s", dev)
return
}
lx.Unlock()
s, err := lx.acceptServerSesn(l, dev, port)
lx.Lock()
if err != nil {
log.Errorf("Cannot create server sesn: %v", err)
return
}
l = s.reassListener
}
str := hex.Dump(data)
log.Debugf("Raw data: %s", str)
bufR := bytes.NewReader(data)
var frag lora.CoapLoraFrag
var sFrag lora.CoapLoraFragStart
err = binary.Read(bufR, binary.LittleEndian, &frag)
if err != nil {
log.Debugf("Can't read header")
return
}
fragNum := frag.FragNum &^ lora.COAP_LORA_LAST_FRAG
if l.Data.Len() == 0 {
if fragNum != 0 {
log.Debugf("frag num != 0 with empty queue")
return
}
bufR.Seek(0, 0)
err = binary.Read(bufR, binary.LittleEndian, &sFrag)
if err != nil {
log.Debugf("Can't read in start header")
return
}
l.Crc = sFrag.Crc
l.Data.Write(data[3:])
l.NextFrag = 1
} else {
if fragNum != l.NextFrag {
log.Debugf("Frag out of sequence")
l.Data.Reset()
return
}
l.Data.Write(data[1:])
l.NextFrag++
}
if (frag.FragNum & lora.COAP_LORA_LAST_FRAG) != 0 {
data := l.Data.Bytes()
l.Data.Reset()
code := coap.COAPCode(data[1])
if code <= coap.DELETE {
_, l = lx.msgMap.FindListener(dev, port, "req")
if l == nil {
_, l = lx.msgMap.FindListener("", port, "req")
if l == nil {
log.Debugf("No LoRa listener for %s", dev)
return
}
lx.Unlock()
s, err := lx.acceptServerSesn(l, dev, port)
lx.Lock()
if err != nil {
log.Errorf("Cannot create server sesn: %v", err)
return
}
l = s.msgListener
}
} else {
_, l = lx.msgMap.FindListener(dev, port, "rsp")
}
if l != nil {
l.MsgChan <- data
} else {
log.Debugf("No LoRa listener for %s", dev)
}
}
}
func (lx *LoraXport) dataRateSeen(dev string, dataRate string) {
lx.Lock()
defer lx.Unlock()
_, lSlice := lx.tgtMap.FindListener(dev, "rx")
if len(lSlice) == 0 {
return
}
mtu, ok := LoraDataRateMapUS[dataRate]
if !ok {
mtu = lx.minMtu()
}
for _, l := range lSlice {
l.MtuChan <- mtu
}
}
/*
* lora/00-13-50-04-04-50-13-00/up
*/
func (lx *LoraXport) processData(data string) {
if strings.HasPrefix(data, "lora/") == false {
return
}
splitMsg := strings.Fields(data)
if len(splitMsg) == 0 {
return
}
splitHdr := strings.Split(splitMsg[0], "/")
if len(splitHdr) != 3 {
return
}
dev, _ := NormalizeAddr(splitHdr[1])
switch splitHdr[2] {
case "joined":
log.Debugf("loraxport rx: %s", data)
log.Debugf("%s joined", dev)
lx.reportJoin(dev)
case "up":
var msg LoraData
log.Debugf("loraxport rx: %s", data)
pload := []byte(splitMsg[1])
err := codec.NewDecoderBytes(pload, new(codec.JsonHandle)).Decode(&msg)
if err != nil {
log.Debugf("loraxport rx: error decoding json: %v", err)
return
}
dec, err := base64.StdEncoding.DecodeString(msg.Data)
if err != nil {
log.Debugf("loraxport rx: error decoding base64: %v", err)
return
}
lx.reass(dev, msg.Port, dec)
case "packet_sent":
var sent LoraPacketSent
log.Debugf("loraxport rx: %s", data)
pload := []byte(splitMsg[1])
err := codec.NewDecoderBytes(pload, new(codec.JsonHandle)).Decode(&sent)
if err != nil {
log.Debugf("loraxport rx: error decoding json: %v", err)
return
}
lx.dataRateSeen(dev, sent.DataRate)
}
}
func (lx *LoraXport) Start() error {
if lx.started {
return nmxutil.NewXportError("Lora xport started twice")
}
addr, err := net.ResolveUDPAddr("udp",
fmt.Sprintf("127.0.0.1:%d", lx.cfg.AppPortUp))
if err != nil {
return fmt.Errorf("Failure resolving name for UDP session: %s",
err.Error())
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return fmt.Errorf("Failed to open RX to lora-network-server %s", addr)
}
lx.rxConn = conn
addr, err = net.ResolveUDPAddr("udp",
fmt.Sprintf("127.0.0.1:%d", lx.cfg.AppPortDown))
if err != nil {
lx.rxConn.Close()
lx.rxConn = nil
return fmt.Errorf("Failure resolving name for UDP session: %s",
err.Error())
}
conn, err = net.DialUDP("udp", nil, addr)
if err != nil {
lx.rxConn.Close()
lx.rxConn = nil
return fmt.Errorf("Failed to open TX to lora-network-server")
}
lx.txConn = conn
lx.started = true
go func() {
data := make([]byte, MAX_PACKET_SIZE_IN*4/3+512)
for {
nr, _, err := lx.rxConn.ReadFromUDP(data)
if err != nil {
return
}
lx.processData(string(data[0:nr]))
}
}()
return nil
}
func (lx *LoraXport) reportJoin(dev string) {
lx.Lock()
if lx.joinCb != nil {
dev := LoraConfig{
Addr: dev,
}
lx.Unlock()
lx.joinCb(dev)
} else {
lx.Unlock()
}
}
func (lx *LoraXport) SetJoinCb(joinCb LoraJoinedCb) {
lx.Lock()
defer lx.Unlock()
lx.joinCb = joinCb
}
func (lx *LoraXport) Stop() error {
if !lx.started {
return nmxutil.NewXportError("Lora xport stopped twice")
}
lx.rxConn.Close()
lx.rxConn = nil
lx.txConn.Close()
lx.txConn = nil
lx.started = false
lx.joinCb = nil
return nil
}
func (lx *LoraXport) DumpListeners() {
lx.msgMap.Dump()
lx.reassMap.Dump()
lx.tgtMap.Dump()
}
func (lx *LoraXport) Tx(bytes []byte) error {
log.Debugf("loraxport tx: %s", bytes)
_, err := lx.txConn.Write(bytes)
return err
}
1
https://gitee.com/cloud_vr_ar/mynewt-newtmgr.git
git@gitee.com:cloud_vr_ar/mynewt-newtmgr.git
cloud_vr_ar
mynewt-newtmgr
mynewt-newtmgr
691c1077103e

搜索帮助