1 Star 1 Fork 0

凡卡 / libp2parea

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
peer.go 9.30 KB
一键复制 编辑 原始数据 按行查看 历史
凡卡 提交于 2023-11-29 11:01 . first commit
package main
import (
"bytes"
"crypto/sha256"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"gitee.com/prestonTao/keystore"
"gitee.com/prestonTao/libp2parea"
"gitee.com/prestonTao/libp2parea/config"
"gitee.com/prestonTao/libp2parea/engine"
mc "gitee.com/prestonTao/libp2parea/message_center"
"gitee.com/prestonTao/libp2parea/message_center/flood"
"gitee.com/prestonTao/libp2parea/nodeStore"
"gitee.com/prestonTao/utils"
)
func main() {
utils.PprofMem(time.Minute * 2)
engine.Log.Info("start")
engine.SetLogPath("log.txt")
StartAllPeer()
}
var (
addrPre = "SELF"
areaName = sha256.Sum256([]byte("nihaoa a a!"))
keyPwd = "123456789"
host = "172.28.0.149"
basePort = 19862
)
/*
启动所有节点
*/
func StartAllPeer() {
nsm := nodeStore.NodeSimulationManager{IDdepth: 32 * 8}
n := 1
areas := make([]*libp2parea.Area, 0, n)
areaPeers := make([]*TestPeer, 0, n)
for i := 0; i < n; i++ {
area := StartOnePeer(2)
areaPeers = append(areaPeers, area)
areas = append(areas, area.area)
nsm.AddNodeSuperIDs(area.area.GetNetId())
}
engine.Log.Info("--------------------------")
engine.Log.Info("开始等待节点自治")
//等待各个节点都准备好
for _, one := range areaPeers {
one.area.WaitAutonomyFinish()
one.area.WaitAutonomyFinishVnode()
}
engine.Log.Info("--------------------------")
engine.Log.Info("节点自治完成,打印逻辑节点")
sleepTime := time.Second * 30
engine.Log.Info("等待%s后打印关系", sleepTime)
time.Sleep(sleepTime)
targetID := nodeStore.AddressFromB58String("5E6a58A49zNS5otG6EXdU4oQHQYXTerXq5WGjRjpYA7E")
// 测试磁力节点消息到达是否准确
for {
showLogicNode(areas[0])
sendMsgOne(areas[0], &targetID)
engine.Log.Info("qlw----休息20秒后,将再次尝试获取目标地址")
engine.Log.Info("")
engine.Log.Info("")
time.Sleep(time.Second * 20)
// engine.Log.Info("")
}
}
func showLogicNode(area *libp2parea.Area) {
engine.Log.Info("show nodes cur:%s------------------------------begin", area.NodeManager.NodeSelf.IdInfo.Id.B58String())
logicNodes := area.NodeManager.GetAllNodes()
for i := range logicNodes {
engine.Log.Info("qlw----logic node:%s", logicNodes[i].IdInfo.Id.B58String())
}
engine.Log.Info("show nodes cur:%s------------------------------end", area.NodeManager.NodeSelf.IdInfo.Id.B58String())
engine.Log.Info("")
}
type TestPeer struct {
area *libp2parea.Area
}
func StartOnePeer(i int) *TestPeer {
keyPath1 := filepath.Join("conf", "keystore"+strconv.Itoa(i)+".key")
key1 := keystore.NewKeystore(keyPath1, addrPre)
err := key1.Load()
if err != nil {
//没有就创建
err = key1.CreateNewKeystore(keyPwd)
if err != nil {
panic("创建key1错误:" + err.Error())
}
}
if key1.NetAddr == nil {
_, _, err = key1.CreateNetAddr(keyPwd, keyPwd)
if err != nil {
panic("创建NetAddr错误:" + err.Error())
}
}
if len(key1.GetAddr()) < 1 {
_, err = key1.GetNewAddr(keyPwd, keyPwd)
if err != nil {
panic("创建Addr错误:" + err.Error())
}
}
if len(key1.GetDHKeyPair().SubKey) < 1 {
_, err = key1.GetNewDHKey(keyPwd, keyPwd)
if err != nil {
panic("创建Addr错误:" + err.Error())
}
}
area, err := libp2parea.NewArea(areaName, key1, keyPwd)
if err != nil {
panic(err.Error())
}
area.SetLeveldbPath(config.Path_leveldb + strconv.Itoa(i))
// area.SetNetTypeToTest()
// area.OpenVnode()
//serverHost
area.SetDiscoverPeer("8.210.59.108:19960")
area.StartUP(false, host, uint16(basePort+i))
// area.CloseVnode()
// area.Vm.AddVnodeByIndex(100)
// area.Vm.AddVnodeByIndex(200)
// area.Vm.AddVnodeByIndex(300)
// area.Vm.AddVnodeByIndex(400)
peer := TestPeer{
area: area,
}
peer.InitHandler(area)
return &peer
}
var index int = 20000
var targetProxyId string
func sendMsgOne(area *libp2parea.Area, toAddr *nodeStore.AddressNet) error {
//发送p2p消息
index++
content := []byte(toAddr.B58String() + "__|__" + strconv.Itoa(index))
var recvProxyId *nodeStore.AddressNet
if targetProxyId != "" {
tProxyId := nodeStore.AddressFromB58String(targetProxyId)
recvProxyId = &tProxyId
}
engine.Log.Info("节点:%s 发送消息给:%s, content:%s, proxyId:%s", area.GetNetId().B58String(), toAddr.B58String(), content, targetProxyId)
engine.Log.Info("")
_, _, _, err := area.SendP2pMsgProxyWaitRequest(msg_id_p2p, toAddr, recvProxyId, nil, &content, time.Second*10)
if err != nil {
engine.Log.Error("发送P2p消息失败:%s", err.Error())
return err
}
return nil
}
const msg_id_p2p = 1001
const msg_id_p2p_recv = 1002 //加密消息
const msg_id_searchSuper = 1003
const msg_id_searchSuper_recv = 1004 //加密消息
const msg_id_vnode_p2p = 1005
const msg_id_vnode_p2p_recv = 1006 //加密消息
const msg_id_vnode_search = 1007 //搜索节点消息
const msg_id_vnode_search_recv = 1008 //搜索节点消息 返回
const msg_id_multicast = 1009 //
func (this *TestPeer) InitHandler(area *libp2parea.Area) {
area.Register_p2p(msg_id_p2p, this.RecvP2PMsgHandler)
area.Register_p2p(msg_id_p2p_recv, this.RecvP2PMsgHandler_recv)
area.Register_p2p(msg_id_searchSuper, this.SearchSuperHandler)
area.Register_p2p(msg_id_searchSuper_recv, this.SearchSuperHandler_recv)
area.Register_vnode_p2pHE(msg_id_vnode_p2p, this.RecvMsgHandler)
area.Register_vnode_p2pHE(msg_id_vnode_p2p_recv, this.RecvMsgHEHandler)
area.Register_vnode_search(msg_id_vnode_search, this.SearchVnodeHandler)
area.Register_vnode_p2pHE(msg_id_vnode_search_recv, this.SearchVnodeHandler_recv)
area.Register_multicast(msg_id_multicast, this.MulticastMsgHandler)
}
var MsgCountLock = new(sync.Mutex)
var MsgCount = make(map[string]uint64)
func (this *TestPeer) RecvP2PMsgHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
if !bytes.Equal(this.area.GetNetId(), *message.Head.RecvId) {
engine.Log.Error("收到错误p2p消息self:%s recv:%s", this.area.GetNetId().B58String(), message.Head.RecvId.B58String())
return
}
content := string(*message.Body.Content)
engine.Log.Error("收到P2P消息 from:%s self:%s, content:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String(), content)
engine.Log.Info("")
res := strings.Split(content, "||")
if len(res) > 1 {
targetProxyId = res[1]
}
this.area.SendP2pReplyMsg(message, msg_id_p2p_recv, nil)
}
func (this *TestPeer) RecvP2PMsgHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到P2P消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
}
func (this *TestPeer) SearchSuperHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// selfId := message.Head.RecvId.B58String()
// engine.Log.Info("收到SearchSuper消息 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
err := this.area.SendSearchSuperReplyMsg(message, msg_id_searchSuper_recv, nil)
if err != nil {
engine.Log.Error("SendSearchSuperReplyMsg error:%s", err.Error())
}
}
func (this *TestPeer) SearchSuperHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到SearchSuper消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
}
func (this *TestPeer) RecvMsgHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// selfId := message.Head.RecvId.B58String()
// vnodeinfo := vnc.FindInVnodeinfoSelf(*this.Head.RecvVnode)
if this.area.Vm.FindInVnodeSelf(*message.Head.RecvVnode) {
engine.Log.Info("收到vnode p2p消息self:%s from:%s to:%s", this.area.GetNetId().B58String(),
message.Head.Sender.B58String(), message.Head.RecvVnode.B58String())
} else {
engine.Log.Error("收到错误vnode p2p消息self:%s from:%s to:%s", this.area.GetNetId().B58String(),
message.Head.Sender.B58String(), message.Head.RecvVnode.B58String())
return
}
err := this.area.SendVnodeP2pReplyMsgHE(message, msg_id_vnode_p2p_recv, nil)
if err != nil {
engine.Log.Info("SendVnodeP2pReplyMsgHE error:%s", err.Error())
} else {
}
}
func (this *TestPeer) RecvMsgHEHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到vnode p2p消息返回 from:%s", message.Head.Sender.B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
}
func (this *TestPeer) SearchVnodeHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
// selfId := message.Head.RecvId.B58String()
selfId := message.Head.RecvVnode.B58String()
// engine.Log.Info("收到 VnodeSearch 消息 self:%s from:%s", this.area.GetNetId().B58String(), message.Head.Sender.B58String())
MsgCountLock.Lock()
count, ok := MsgCount[selfId]
if ok {
MsgCount[selfId] = count + 1
} else {
MsgCount[selfId] = 1
}
MsgCountLock.Unlock()
err := this.area.SendVnodeSearchReplyMsg(message, msg_id_vnode_search_recv, nil)
if err != nil {
engine.Log.Info("回复消息错误:%s", err.Error())
}
}
func (this *TestPeer) SearchVnodeHandler_recv(c engine.Controller, msg engine.Packet, message *mc.Message) {
// engine.Log.Info("收到 VnodeSearch 消息返回 from:%s self:%s", message.Head.Sender.B58String(), this.area.GetNetId().B58String())
flood.ResponseBytes(utils.Bytes2string(message.Body.Hash), message.Body.Content)
}
func (this *TestPeer) MulticastMsgHandler(c engine.Controller, msg engine.Packet, message *mc.Message) {
}
Go
1
https://gitee.com/prestonTao/libp2parea.git
git@gitee.com:prestonTao/libp2parea.git
prestonTao
libp2parea
libp2parea
58a2b3547940

搜索帮助