1 Star 1 Fork 0

凡卡/libp2parea

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
peer.go 26.01 KB
一键复制 编辑 原始数据 按行查看 历史
凡卡 提交于 2023-11-29 11:01 +08:00 . first commit
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
package main
import (
"crypto/sha256"
"math/big"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"gitee.com/prestonTao/libp2parea/config"
"gitee.com/prestonTao/libp2parea/nodeStore"
"gitee.com/prestonTao/libp2parea/virtual_node"
"gitee.com/prestonTao/keystore"
"gitee.com/prestonTao/libp2parea"
"gitee.com/prestonTao/libp2parea/engine"
mc "gitee.com/prestonTao/libp2parea/message_center"
"gitee.com/prestonTao/libp2parea/message_center/flood"
"gitee.com/prestonTao/utils"
)
func main() {
engine.Log.Info("start")
engine.SetLogPath("log.txt")
Startzyz()
}
var sstt int
var (
addrPre = "SELF"
areaName = sha256.Sum256([]byte("zyz111"))
keyPwd = "123456789"
serverHost = "124.221.170.43"
host = "127.0.0.1"
basePort = 19960
)
/*
启动所有节点
*/
func Startzyz() {
n := 7
areaPeers := make([]*TestPeer, 0, n)
for i := 0; i < n; i++ {
area := StartOnePeer(i)
areaPeers = append(areaPeers, area)
area.area.OpenVnode()
area.area.Vm.AddVnode()
area.area.Vm.AddVnode()
area.area.Vm.AddVnode()
vnodeinfos := area.area.Vm.GetVnodeSelf()
for _, vnode := range vnodeinfos {
engine.Log.Info("虚拟地址:%s %d", vnode.Vid.B58String(), vnode.Index)
}
time.Sleep(time.Second)
}
kad := new(nodeStore.IdDESC)
for _, v := range areaPeers {
engine.Log.Info("NetId: %s VnodeId : %s",
v.area.GetNetId().B58String(),
v.area.GetVnodeId().B58String())
*kad = append(*kad, new(big.Int).SetBytes([]byte(v.area.GetNetId())))
}
var small string
var snum int
var lnum int
var large string
sort.Sort(kad)
for i := 0; i < len(*kad); i++ {
// engine.Log.Info("selfAddrBI: ", selfAddrBI, " one", one).
one := (*kad)[i]
IdBs := one.Bytes()
IdBsP := utils.FullHighPositionZero(&IdBs, 32)
engine.Log.Info("排序 %d : %s \n", i, nodeStore.AddressNet(*IdBsP).B58String())
if i == 0 {
large = nodeStore.AddressNet(*IdBsP).B58String()
}
if i == len(*kad)-1 {
small = nodeStore.AddressNet(*IdBsP).B58String()
}
}
// for i := 0; i < len(areaPeers); i++ {
// if areaPeers[i].area.GetNetId().B58String() == small {
// snum = i
// engine.Log.Info("ssss %s", areaPeers[i].area.GetNetId().B58String())
// }
// if areaPeers[i].area.GetNetId().B58String() == large {
// lnum = i
// engine.Log.Info("llll %s", areaPeers[i].area.GetNetId().B58String())
// }
// // if areaPeers[i].area.GetNetId().B58String() == "BYito3Uzea3wHeHmHpi9ckE98AtVQVXFWxSG6cAmXxQA" {
// // sstt = i
// // }
// // engine.Log.Info("地址:%s 虚拟节点 : %s", areaPeers[i].area.GetNetId().B58String(), areaPeers[i].area.Vm.GetVnodeSelf()[1].Vid.B58String())
// logicNodes := areaPeers[i].area.Vm.FindLogicInVnodeSelf(areaPeers[i].area.Vm.GetVnodeSelf()[0].Vid)
// for _, one := range logicNodes {
// engine.Log.Info("逻辑节点:%s %d %s", one.Nid.B58String(), one.Index, one.Vid.B58String())
// }
// }
engine.Log.Info("--------------------------")
kadV := new(nodeStore.IdDESC)
for _, v := range areaPeers {
//*kadV = append(*kadV, new(big.Int).SetBytes([]byte(v.Vm.DiscoverVnodes.Vnode.Vid)))
for _, vInfo := range v.area.Vm.VnodeMap {
if vInfo.Vnode.Index == 0 {
continue
}
engine.Log.Info("Vnode.Vid : %s", vInfo.Vnode.Vid.B58String())
*kadV = append(*kadV, new(big.Int).SetBytes([]byte(vInfo.Vnode.Vid)))
}
}
sort.Sort(kadV)
for i := 0; i < len(*kadV); i++ {
// engine.Log.Info("selfAddrBI: ", selfAddrBI, " one", one).
one := (*kadV)[i]
IdBs := one.Bytes()
IdBsP := utils.FullHighPositionZero(&IdBs, 32)
engine.Log.Info("Vnode 排序 %d : %s \n", i, nodeStore.AddressNet(*IdBsP).B58String())
}
{
//排序 打印
kad := new(nodeStore.IdDESC)
for _, v := range areaPeers {
/* engine.Log.Info("NetId: %s VnodeId : %s",
v.GetNetId().B58String(),
v.GetVnodeId().B58String())
*/
*kad = append(*kad, new(big.Int).SetBytes([]byte(v.area.GetNetId())))
}
sort.Sort(kad)
for i := 0; i < len(*kad); i++ {
// engine.Log.Info("selfAddrBI: ", selfAddrBI, " one", one).
one := (*kad)[i]
IdBs := one.Bytes()
IdBsP := utils.FullHighPositionZero(&IdBs, 32)
engine.Log.Info("真实排序 %d : %s \n", i, nodeStore.AddressNet(*IdBsP).B58String())
}
}
engine.Log.Info("开始等待节点自治")
//等待各个节点都准备好
for i, one := range areaPeers {
engine.Log.Info("----------------------------> start %d", i)
one.area.WaitAutonomyFinish()
// engine.Log.Info("----------------------------> mid %d", i)
// one.area.WaitAutonomyFinishVnode()
engine.Log.Info("----------------------------> end %d", i)
}
for i, one := range areaPeers {
// engine.Log.Info("----------------------------> start %d", i)
// one.area.WaitAutonomyFinish()
engine.Log.Info("----------------------------> mid %d", i)
one.area.WaitAutonomyFinishVnode()
engine.Log.Info("----------------------------> end %d", i)
}
engine.Log.Info("--------------------------")
engine.Log.Info("节点自治完成,打印逻辑节点")
engine.Log.Info("", snum, lnum, small, large)
engine.Log.Info("--------------------------")
engine.Log.Info("发送消息测试开始")
// engine.Log.Info("最小节点: %d 最大节点: %d", snum, lnum)
// tt := time.NewTicker(5 * time.Second)
// <-tt.C
// engine.Log.Info("删除areaPeers[2]虚拟节点 %s self %s", areaPeers[2].area.Vm.GetVnodeSelf()[1].Vid.B58String(),
// areaPeers[2].area.NodeManager.NodeSelf.IdInfo.Id.B58String())
// areaPeers[2].area.DelSelfVnodeByAddress(nodeStore.AddressNet(areaPeers[2].area.Vm.GetVnodeSelf()[1].Vid))
// engine.Log.Info("删除vnode完成")
//time.Sleep(60 * time.Second)
// sendMsgLoop(areaPeers[0].area, areaPeers, lnum, snum)
// engine.Log.Info("--------------------------")
// engine.Log.Info("--------------------------")
//<-time.NewTicker(5 * time.Second).C
// for i := 0; i < 50000; i++ {
// searchVnoideId(areaPeers[0].area, 0)
// }
// engine.Log.Info("获取真实节点1111")
// toNetidStr := "E2AH5YrEwr27GmGbxkzt4CZfy9mrtsLggjmqAxLFf2N1"
// id := nodeStore.AddressFromB58String(toNetidStr)
// engine.Log.Info("sender : %s", areaPeers[2].area.NodeManager.NodeSelf.IdInfo.Id.B58String())
{
engine.Log.Info("普通消息发送 开始")
con := []byte("heeeeellllllllooooooo")
for i := 0; i < len(areaPeers); i++ {
if i == 6 {
continue
}
for n := len(areaPeers) - 1; n >= 0; n-- {
if n == i {
continue
}
engine.Log.Error("zyz zyz 1111 self %s to %s", areaPeers[i].area.NodeManager.NodeSelf.IdInfo.Id.B58String(), areaPeers[n].area.NodeManager.NodeSelf.IdInfo.Id.B58String())
_, _, _, err := areaPeers[i].area.SendP2pMsgWaitRequest(msg_id_p2p, &areaPeers[n].area.NodeManager.NodeSelf.IdInfo.Id, &con, 2*time.Second)
if err != nil {
engine.Log.Error("zyz zyz 2222 error : %s", err.Error())
continue
}
}
}
engine.Log.Info("普通消息发送 结束")
}
{
searchAddr := []string{
"EdbKNMnG9nSarP76X73K8qnZyu3C6oqtithkrfZYcNFV",
"EdbKNMnG9nSarP76X73K8qnZyu3C6oqtith4rfZYcNF1",
"9b7LU2QW9ED9gX55vVTmT33kC9YJWKPECpVbWY5k2BB1",
"7YuHM6tydLbjsnBSrb6jN3KrA4TqNBeH3FurmC4hpYg1",
"6av3TYMGUyuMA31WfV36fzUoFGd2EE8WjwcKYr8AFGn1",
"aXTebYN68gs2rsAdZ35zJMMZfA56nruSUpMwQwkxXk1",
"DdnVnUFSdfQBrJ43kirMqQEARUVy8hsi8DP5EApx8D1",
"CEEJz55QSokCmW5GxyZNuysQN7EQQv4EbmWymmuy6H1",
}
engine.Log.Info("searchnetaddr 开始")
for i := 0; i < len(areaPeers); i++ {
if i == 6 {
for u := range searchAddr {
engine.Log.Info("")
engine.Log.Info("======================代理使用============================================")
engine.Log.Info("sender %s searchaddr %s", areaPeers[i].area.NodeManager.NodeSelf.IdInfo.Id.B58String(), searchAddr[u])
s := nodeStore.AddressFromB58String(searchAddr[u])
re, err := areaPeers[i].area.SearchNetAddrOneByOneAuto(&s, 3)
if err != nil {
engine.Log.Error("SearchNetAddrOneByOneAuto err : %s", err.Error())
continue
}
if len(re) != 3 {
engine.Log.Error("SearchNetAddrOneByOneAuto Num : %d", len(re))
continue
}
for n := 0; n < len(re); n++ {
engine.Log.Info("index %d addr %s", n, re[n].B58String())
}
}
}
for u := range searchAddr {
engine.Log.Info("")
engine.Log.Info("==================================================================")
engine.Log.Info("sender %s searchaddr %s", areaPeers[i].area.NodeManager.NodeSelf.IdInfo.Id.B58String(), searchAddr[u])
s := nodeStore.AddressFromB58String(searchAddr[u])
re, err := areaPeers[i].area.SearchNetAddrWithNum(&s, 3)
if err != nil {
engine.Log.Error("SearchNetAddrWithNum err : %s", err.Error())
continue
}
if len(re) != 3 {
engine.Log.Error("SearchNetAddrWithNum Num : %d", len(re))
continue
}
for n := 0; n < len(re); n++ {
engine.Log.Info("index %d addr %s", n, re[n].B58String())
}
}
}
engine.Log.Info("searchnetaddr 结束")
}
// var wg sync.WaitGroup
// var successCnt, failedCnt int32
// startTime := time.Now().UnixMilli()
// for i := 0; i < 40000; i++ {
// wg.Add(1)
// go func() {
// defer wg.Done()
// r1, err := areaPeers[2].area.SearchNetAddrOneByOne(&id, 0)
// if err != nil {
// atomic.AddInt32(&failedCnt, 1)
// engine.Log.Warn(err.Error())
// return
// }
// if len(r1) == 0 {
// atomic.AddInt32(&failedCnt, 1)
// engine.Log.Info("失败")
// return
// }
// atomic.AddInt32(&successCnt, 1)
// }()
// }
// wg.Wait()
// endTime := time.Now().UnixMilli()
// useTime := endTime - startTime
// engine.Log.Info("--------------------------------------------------")
// engine.Log.Info("")
// engine.Log.Warn("cost time: %d毫秒", useTime)
// engine.Log.Warn("success cnt:%d", successCnt)
// engine.Log.Warn("failed cnt:%d", failedCnt)
// engine.Log.Warn("TPS:%v", (successCnt*10000)/int32(useTime))
// engine.Log.Info("--------------------------------------------------")
// engine.Log.Info("")
// for _, one := range r1 {
// engine.Log.Info("7777 :%s", one.B58String())
// }
// engine.Log.Info("获取真实节点2222")
// engine.Log.Info("sender : %s", areaPeers[2].area.NodeManager.NodeSelf.IdInfo.Id.B58String())
// r2, err := areaPeers[2].area.SearchNetAddrOneByOne(&id, 0)
// if err != nil {
// engine.Log.Warn(err.Error())
// }
// for _, one := range r2 {
// engine.Log.Info("8888 :%s", one.B58String())
// }
// engine.Log.Info("sender : %s", areaPeers[lnum].area.NodeManager.NodeSelf.IdInfo.Id.B58String())
// r2, err := areaPeers[lnum].area.SearchNetAddrOneByOne(&id, 0)
// if err != nil {
// engine.Log.Warn(err.Error())
// }
// for _, one := range r2 {
// engine.Log.Info("9999 :%s", one.B58String())
// }
engine.Log.Info("发送消息测试完成")
select {}
}
type TestPeer struct {
area *libp2parea.Area
}
func StartOnePeer(i int) *TestPeer {
keyPath1 := filepath.Join("conf", "keystore"+strconv.Itoa(i)+".key")
key1 := keystore.NewKeystore(keyPath1, addrPre)
err := key1.Load()
if err != nil {
//没有就创建
err = key1.CreateNewKeystore(keyPwd)
if err != nil {
panic("创建key1错误:" + err.Error())
}
}
if key1.NetAddr == nil {
_, _, err = key1.CreateNetAddr(keyPwd, keyPwd)
if err != nil {
panic("创建NetAddr错误:" + err.Error())
}
}
if len(key1.GetAddr()) < 1 {
_, err = key1.GetNewAddr(keyPwd, keyPwd)
if err != nil {
panic("创建Addr错误:" + err.Error())
}
}
if len(key1.GetDHKeyPair().SubKey) < 1 {
_, err = key1.GetNewDHKey(keyPwd, keyPwd)
if err != nil {
panic("创建Addr错误:" + err.Error())
}
}
area, err := libp2parea.NewArea(areaName, key1, keyPwd)
if err != nil {
panic(err.Error())
}
area.SetLeveldbPath(config.Path_leveldb + strconv.Itoa(i))
area.SetNetTypeToTest()
//area.OpenVnode()
//serverHost
if i == 6 {
area.SetAreaGodAddr(host, basePort)
area.SetPhoneNode()
}
area.SetDiscoverPeer(host + ":" + strconv.Itoa(basePort))
area.StartUP(false, host, uint16(basePort+i))
if i == 6 {
ok, addr := area.SetAreaGodAddr(host, basePort)
engine.Log.Info("==============================设置代理 %s 设置代理成功 %t self %s", addr, ok, area.NodeManager.NodeSelf.IdInfo.Id.B58String())
area.SetPhoneNode()
}
peer := TestPeer{
area: area,
}
peer.InitHandler(area)
return &peer
}
func sendMsgOneSearchSuper(area *libp2parea.Area, toArea *libp2parea.Area) {
engine.Log.Info("===============sendMsgOneSearchSuper> 节点:%s 发送消息给:%s 开始!!", area.GetNodeSelf().IdInfo.Id.B58String(), toArea.Vm.GetVnodeDiscover().Vnode.Nid.B58String())
engine.Log.Info("")
_, err := area.SendSearchSuperMsgWaitRequest(msg_id_searchSuper, &toArea.Vm.GetVnodeDiscover().Vnode.Nid, nil, time.Second*10)
if err != nil {
engine.Log.Error("发送P2p消息失败:%s", err.Error())
return
}
engine.Log.Info("")
engine.Log.Info("===============sendMsgOneSearchSuper< 节点:%s 发送消息给:%s 结束!!", area.GetNodeSelf().IdInfo.Id.B58String(), toArea.Vm.GetVnodeDiscover().Vnode.Nid.B58String())
}
func sendVnodeP2pMsgHE(area *libp2parea.Area, toArea *libp2parea.Area) {
//真实节点 from : GVi4gfzEVzGBH5uNHocZTLPG8ywMi5oZbME7rPWbKDcY to : 6SnYLyD8oPoMyKLKr7MKmZ5Wqjis2hLFw6dbKSDVsMUE
con := []byte("123")
toVnodes := area.Vm.GetVnodeSelf()
engine.Log.Info("===============sendVnodeP2pMsgHE> 虚拟节点:%s 发送消息给 虚拟节点:%s || 真实节点 from : %s to : %s 开始!!", toVnodes[1].Vid.B58String(), toArea.Vm.GetVnodeSelf()[1].Vid.B58String(),
toVnodes[1].Nid.B58String(), toArea.NodeManager.NodeSelf.IdInfo.Id.B58String())
engine.Log.Info("")
_, err := area.SendVnodeP2pMsgHEWaitRequest(msg_id_vnode_p2p, &toVnodes[1].Vid, &toArea.Vm.GetVnodeSelf()[1].Vid, &toArea.NodeManager.NodeSelf.IdInfo.Id, &con, time.Second*10)
if err != nil {
engine.Log.Error("发送VnodeP2p消息失败:%s", err.Error())
return
}
engine.Log.Info("")
engine.Log.Info("===============sendVnodeP2pMsgHE< 虚拟节点:%s 发送消息给 虚拟节点:%s 结束!!", toVnodes[1].Vid.B58String(), toArea.Vm.GetVnodeSelf()[1].Vid.B58String())
}
func sendMsgOneP2pMsg(area *libp2parea.Area, toArea *libp2parea.Area, ll int) {
toNetid := toArea.GetNetId()
// //主动发地址
// toNetidStr := "AuWrFLnPttkfypro3WMStx9QiFRHRx1HGqHsiAxwTFqq"
// toNetid = nodeStore.AddressNet(engine.AddressFromB58String(toNetidStr))
cont := []byte("00")
engine.Log.Info("")
engine.Log.Info("===============sendMsgOneP2pMsg> 节点:%s 发送消息给:%s 开始!!", area.GetNodeSelf().IdInfo.Id.B58String(), toNetid.B58String())
engine.Log.Info("")
_, _, _, err := area.SendP2pMsgWaitRequest(msg_id_p2p, &toNetid, &cont, time.Second*10)
if err != nil {
engine.Log.Error("发送P2p消息失败:%s", err.Error())
return
}
engine.Log.Info("")
engine.Log.Info("===============sendMsgOneP2pMsg< 节点:%s 发送消息给:%s 结束!!", area.GetNodeSelf().IdInfo.Id.B58String(), toNetid.B58String())
engine.Log.Info("")
}
func sendVnodeSearchMsg(area *libp2parea.Area, toArea *libp2parea.Area) {
engine.Log.Info("")
engine.Log.Info("===============sendVnodeSearchMsg> 节点:%s 发送消息给:%s 开始!!", area.Vm.GetVnodeSelf()[1].Vid.B58String(), toArea.Vm.GetVnodeSelf()[1].Vid.B58String())
engine.Log.Info("")
_, err := area.SendVnodeSearchMsgWaitRequest(msg_id_vnode_search, &area.Vm.GetVnodeSelf()[1].Vid, &toArea.Vm.GetVnodeSelf()[1].Vid, nil, time.Second*10)
if err != nil {
engine.Log.Info("发送VnodeSearchMsg error:%s", err.Error())
return
}
engine.Log.Info("")
engine.Log.Info("===============sendVnodeSearchMsg< 节点:%s 发送消息给:%s 结束!!", area.Vm.GetVnodeSelf()[1].Vid.B58String(), toArea.Vm.GetVnodeSelf()[1].Vid.B58String())
engine.Log.Info("")
}
func searchVnoideId(area *libp2parea.Area, num uint16) {
// engine.Log.Info("===============> 节点 %d", num)
// toNetidStr := "75Q8RGVmHfE7qDt616oJn5zBtcvkciM5N2zm97hyDa1Z2"
toNetidStr := "FntW7YmLqoq1xV7rabu6GASUHXNhLhuLQtaJpc1rL531"
id := nodeStore.AddressFromB58String(toNetidStr)
vid := virtual_node.AddressNetExtend(id)
//engine.Log.Info("sender %s", area.Vm.VnodeMap[1].Vnode.Vid.B58String())
bs1, err := area.SearchVnodeIdOnebyone(&vid, num)
if err != nil {
engine.Log.Info("发送VnodeSearchMsg error:%s", err.Error())
return
}
// for _, v := range bs1 {
// engine.Log.Info("66666 v:%s n:%s", v.Vid.B58String(), v.Nid.B58String())
// }
if len(bs1) < 5 {
engine.Log.Warn("出错 出错 出错 出错 出错")
}
// bs2, err := area.SearchVnodeId(&vid)
// if err != nil {
// engine.Log.Info("发送VnodeSearchMsg error:%s", err.Error())
// return
// }
// engine.Log.Info("7777", bs2.B58String())
// // updown := make([]string, 0)
// // err = json.Unmarshal(*bs, &updown)
// if err != nil {
// engine.Log.Info("marsh err : %s", err.Error())
// }
// for i := 0; i <= len(*bs); {
// v := i + 64
//
// }
// engine.Log.Info("===============< 节点 %d", num)
}
func sendMsgLoop(area *libp2parea.Area, toAddrs []*TestPeer, ll int, ss int) {
// //version_p2p msgid = 5
// for i := 0; i < len(toAddrs); i++ {
// to := toAddrs[i]
// sendMsgOneP2pMsg(toAddrs[len(toAddrs)-1].area, to.area, ll)
// time.Sleep(3 * time.Second)
// }
// //version_search_super msgid = 3
// for i := 0; i < len(toAddrs); i++ {
// sendMsgOneSearchSuper(area, toAddrs[len(toAddrs)-1].area)
// }
// // //version_vnode_p2pHE msgid = 8
// for i := 0; i < len(toAddrs); i++ {
// sendVnodeP2pMsgHE(toAddrs[len(toAddrs)-1].area, toAddrs[1].area)
// }
// // version_vnode_search msgid = 7
// for i := 0; i < len(toAddrs); i++ {
// sendVnodeSearchMsg(toAddrs[len(toAddrs)-1].area, toAddrs[1].area)
// }
// con := []byte("HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH")
// engine.Log.Info(" -------- con %d", len(con))
//searchVnoideId(toAddrs[1].area, 3)
searchVnoideId(toAddrs[ll].area, uint16(3))
searchVnoideId(toAddrs[ss].area, uint16(0))
searchVnoideId(toAddrs[1].area, uint16(1))
// from := toAddrs[ss].area
// to := toAddrs[ll].area
// engine.Log.Info("1000条消息发送开始")
// for i := 0; i < 1000; i++ {
// sendMsgOne(from, to, &con)
// }
// engine.Log.Info("1000条消息发送结束")
// engine.Log.Info("1000条加密消息发送开始")
// for i := 0; i < 1000; i++ {
// sendMsgOneHe(from, to, &con)
// }
// engine.Log.Info("1000条加密消息发送结束")
}
func sendMsgOneHe(area *libp2parea.Area, toArea *libp2parea.Area, con *[]byte) error {
//发送p2p消息
engine.Log.Info("节点:%s 发送消息给:%s", area.GetNetId().B58String(), toArea.GetNetId().B58String())
toNetid := toArea.GetNetId()
_, _, _, err := area.SendP2pMsgHEWaitRequest(msg_id_p2p, &toNetid, con, time.Second*10)
if err != nil {
engine.Log.Error("发送P2p消息失败:%s", err.Error())
return err
}
engine.Log.Info("结束")
return nil
}
func sendMsgOne(area *libp2parea.Area, toArea *libp2parea.Area, con *[]byte) error {
//发送p2p消息
engine.Log.Info("节点:%s 发送消息给:%s", area.GetNetId().B58String(), toArea.GetNetId().B58String())
toNetid := toArea.GetNetId()
_, _, _, err := area.SendP2pMsgWaitRequest(msg_id_p2p, &toNetid, con, time.Second*10)
if err != nil {
engine.Log.Error("发送P2p消息失败:%s", err.Error())
return err
}
engine.Log.Info("结束")
return nil
}
const msg_id_p2p = 1001
const msg_id_p2p_recv = 1002 //加密消息
const msg_id_searchSuper = 1003
const msg_id_searchSuper_recv = 1004 //加密消息
const msg_id_vnode_p2p = 1005
const msg_id_vnode_p2p_recv = 1006 //加密消息
const msg_id_vnode_search = 1007 //搜索节点消息
const msg_id_vnode_search_recv = 1008 //搜索节点消息 返回
func (this *TestPeer) InitHandler(area *libp2parea.Area) {
area.Register_p2p(msg_id_p2p, this.RecvP2PMsgHandler)
area.Register_p2p(msg_id_p2p_recv, this.RecvP2PMsgHandler_recv)
area.Register_p2p(msg_id_searchSuper, this.SearchSuperHandler)
area.Register_p2p(msg_id_searchSuper_recv, this.SearchSuperHandler_recv)
area.Register_vnode_p2pHE(msg_id_vnode_p2p, this.RecvMsgHandler)
area.Register_vnode_p2pHE(msg_id_vnode_p2p_recv, this.RecvMsgHEHandler)
area.Register_vnode_search(msg_id_vnode_search, this.SearchVnodeHandler)
area.Register_vnode_p2pHE(msg_id_vnode_search_recv, this.SearchVnodeHandler_recv)
}
var MsgCountLock = new(sync.Mutex)
var MsgCount = make(map[string]uint64)
func (this *TestPeer) RecvP2PMsgHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
selfId := message.Head.RecvId.B58String()
engine.Log.Info("收到P2P消息 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
if message.Body.Content != nil {
engine.Log.Info("messs body %s", string(*message.Body.Content))
}
MsgCountLock.Lock()
count, ok := MsgCount[selfId]
if ok {
MsgCount[selfId] = count + 1
} else {
MsgCount[selfId] = 1
}
MsgCountLock.Unlock()
this.area.SendP2pReplyMsg(message, msg_id_p2p_recv, nil)
}
func (this *TestPeer) RecvP2PMsgHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到P2P消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
// selfId := message.Head.RecvId.B58String()
// MsgCountLock.Lock()
// count, ok := MsgCount[selfId]
// if ok {
// MsgCount[selfId] = count + 1
// } else {
// MsgCount[selfId] = 1
// }
// MsgCountLock.Unlock()
}
func (this *TestPeer) SearchSuperHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
selfId := message.Head.RecvId.B58String()
// engine.Log.Info("收到SearchSuper消息 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
MsgCountLock.Lock()
count, ok := MsgCount[selfId]
if ok {
MsgCount[selfId] = count + 1
} else {
MsgCount[selfId] = 1
}
MsgCountLock.Unlock()
err := this.area.SendSearchSuperReplyMsg(message, msg_id_searchSuper_recv, nil)
if err != nil {
engine.Log.Error("SendSearchSuperReplyMsg error:%s", err.Error())
}
}
func (this *TestPeer) SearchSuperHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到SearchSuper消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
// selfId := message.Head.RecvId.B58String()
// MsgCountLock.Lock()
// count, ok := MsgCount[selfId]
// if ok {
// MsgCount[selfId] = count + 1
// } else {
// MsgCount[selfId] = 1
// }
// MsgCountLock.Unlock()
}
func (this *TestPeer) RecvMsgHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// selfId := message.Head.RecvId.B58String()
// engine.Log.Info("收到vnode p2p消息 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
// MsgCountLock.Lock()
// count, ok := MsgCount[selfId]
// if ok {
// MsgCount[selfId] = count + 1
// } else {
// MsgCount[selfId] = 1
// }
// MsgCountLock.Unlock()
err := this.area.SendVnodeP2pReplyMsgHE(message, msg_id_vnode_p2p_recv, nil)
if err != nil {
engine.Log.Info("SendVnodeP2pReplyMsgHE error:%s", err.Error())
} else {
}
}
func (this *TestPeer) RecvMsgHEHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到vnode p2p消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
// selfId := message.Head.RecvId.B58String()
// MsgCountLock.Lock()
// count, ok := MsgCount[selfId]
// if ok {
// MsgCount[selfId] = count + 1
// } else {
// MsgCount[selfId] = 1
// }
// MsgCountLock.Unlock()
}
func (this *TestPeer) SearchVnodeHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// selfId := message.Head.RecvId.B58String()
selfId := message.Head.RecvVnode.B58String()
// engine.Log.Info("收到 VnodeSearch 消息 self:%s from:%s", this.area.GetNetId().B58String(), message.Head.Sender.B58String())
MsgCountLock.Lock()
count, ok := MsgCount[selfId]
if ok {
MsgCount[selfId] = count + 1
} else {
MsgCount[selfId] = 1
}
MsgCountLock.Unlock()
err := this.area.SendVnodeSearchReplyMsg(message, msg_id_vnode_search_recv, nil)
if err != nil {
engine.Log.Info("回复消息错误:%s", err.Error())
}
}
func (this *TestPeer) SearchVnodeHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到 VnodeSearch 消息返回 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
// selfId := message.Head.RecvId.B58String()
// MsgCountLock.Lock()
// count, ok := MsgCount[selfId]
// if ok {
// MsgCount[selfId] = count + 1
// } else {
// MsgCount[selfId] = 1
// }
// MsgCountLock.Unlock()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/prestonTao/libp2parea.git
git@gitee.com:prestonTao/libp2parea.git
prestonTao
libp2parea
libp2parea
3aaa451ef873

搜索帮助