1 Star 0 Fork 0

陈文甲/fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
client.go 3.87 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"flag"
"fmt"
"os"
"sync"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/localmsp"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/orderer/common/localconfig"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"golang.org/x/net/context"
"google.golang.org/grpc"
"gopkg.in/cheggaaa/pb.v1"
)
// broadcastClient groups the state needed to submit messages to one channel
// over a single Broadcast stream.
type broadcastClient struct {
// client is the stream that broadcast sends envelopes on and getAck
// receives responses from.
client ab.AtomicBroadcast_BroadcastClient
// signer is passed to utils.CreateSignedEnvelope for every outgoing envelope.
signer crypto.LocalSigner
// channelID is the channel named in every envelope created by broadcast.
channelID string
}
// newBroadcastClient returns a broadcastClient struct that sends on the given
// Broadcast stream, addresses its envelopes to channelID and signs them with
// signer.
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient, channelID string, signer crypto.LocalSigner) *broadcastClient {
	bc := new(broadcastClient)
	bc.client = client
	bc.signer = signer
	bc.channelID = channelID
	return bc
}
// broadcast wraps transaction in a cb.ConfigValue, builds a signed envelope of
// type HeaderType_MESSAGE for the client's channel, and sends it on the
// Broadcast stream.
//
// It returns an error if the envelope cannot be created or if the stream
// rejects the send. Envelope-creation failures are returned rather than
// panicking so that the caller decides how to react, the same way it already
// does for send failures.
func (s *broadcastClient) broadcast(transaction []byte) error {
	env, err := utils.CreateSignedEnvelope(cb.HeaderType_MESSAGE, s.channelID, s.signer, &cb.ConfigValue{Value: transaction}, 0, 0)
	if err != nil {
		// %v rather than %w: nothing in this file relies on Go 1.13 error wrapping.
		return fmt.Errorf("creating signed envelope for channel %q: %v", s.channelID, err)
	}
	return s.client.Send(env)
}
// getAck receives one response from the Broadcast stream. It returns the
// receive error if there is one, an error describing the status and info of
// the response if the status is anything other than Status_SUCCESS, and nil
// otherwise.
func (s *broadcastClient) getAck() error {
	resp, recvErr := s.client.Recv()
	switch {
	case recvErr != nil:
		return recvErr
	case resp.Status == cb.Status_SUCCESS:
		return nil
	default:
		return fmt.Errorf("Got unexpected status: %v - %s", resp.Status, resp.Info)
	}
}
// main loads the orderer configuration and the local MSP, parses the
// command-line flags, dials the orderer, and broadcasts messages to it.
//
// The requested message count is split evenly across the requested number of
// goroutines (rounding down when it does not divide evenly). Each goroutine
// opens its own Broadcast stream on the shared connection, sends its share of
// messages, and waits until a response has been read for each of them. A
// progress bar advances once per successful acknowledgement.
//
// The process exits with status 1 if the MSP cannot be loaded, if the
// goroutine count is zero, or if the connection cannot be established.
func main() {
	// Named conf so the local does not shadow the imported config package.
	conf := config.Load()

	// Load local MSP
	if err := mspmgmt.LoadLocalMsp(conf.General.LocalMSPDir, conf.General.BCCSP, conf.General.LocalMSPID); err != nil {
		fmt.Println("Failed to initialize local MSP:", err)
		// Non-zero status so that callers can detect the failure.
		os.Exit(1)
	}

	signer := localmsp.NewSigner()

	var (
		channelID  string
		serverAddr string
		messages   uint64
		goroutines uint64
		msgSize    uint64
	)

	flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", conf.General.ListenAddress, conf.General.ListenPort), "The RPC server to connect to.")
	flag.StringVar(&channelID, "channelID", genesisconfig.TestChainID, "The channel ID to broadcast to.")
	flag.Uint64Var(&messages, "messages", 1, "The number of messages to broadcast.")
	flag.Uint64Var(&goroutines, "goroutines", 1, "The number of concurrent go routines to broadcast the messages on")
	flag.Uint64Var(&msgSize, "size", 1024, "The size in bytes of the data section for the payload")
	flag.Parse()

	// goroutines is used as a divisor below; reject zero instead of panicking
	// with a division-by-zero.
	if goroutines == 0 {
		fmt.Println("The number of goroutines must be at least 1")
		os.Exit(1)
	}

	conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
	if err != nil {
		fmt.Println("Error connecting:", err)
		os.Exit(1)
	}
	// Registered only after the error check so that Close is never invoked on
	// a connection that Dial failed to create.
	defer func() {
		_ = conn.Close()
	}()

	msgsPerGo := messages / goroutines
	roundMsgs := msgsPerGo * goroutines
	if roundMsgs != messages {
		fmt.Println("Rounding messages to", roundMsgs)
	}

	bar := pb.New64(int64(roundMsgs))
	bar.ShowPercent = true
	bar.ShowSpeed = true
	bar = bar.Start()

	// Every message carries the same zero-filled payload of the requested size.
	msgData := make([]byte, msgSize)

	var wg sync.WaitGroup
	wg.Add(int(goroutines))
	for i := uint64(0); i < goroutines; i++ {
		go func() {
			// Deferred so that every exit path, including a failure to open
			// the stream, releases the WaitGroup and wg.Wait cannot hang.
			defer wg.Done()

			client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
			if err != nil {
				fmt.Println("Error connecting:", err)
				return
			}

			s := newBroadcastClient(client, channelID, signer)

			// The reader goroutine consumes one response per message sent and
			// closes done when it has attempted all of them.
			done := make(chan struct{})
			go func() {
				defer close(done)

				var firstErr error
				var failed uint64
				for j := uint64(0); j < msgsPerGo; j++ {
					if ackErr := s.getAck(); ackErr != nil {
						// Keep reading so the stream is drained, but remember
						// the first failure so it is not lost when a later
						// acknowledgement succeeds.
						failed++
						if firstErr == nil {
							firstErr = ackErr
						}
						continue
					}
					bar.Increment()
				}
				if firstErr != nil {
					fmt.Printf("\nError: %v (%d of %d acks failed)\n", firstErr, failed, msgsPerGo)
				}
			}()

			for j := uint64(0); j < msgsPerGo; j++ {
				if err := s.broadcast(msgData); err != nil {
					// A failed send aborts the whole run, as before.
					panic(err)
				}
			}

			<-done
			// Close the send side before the deferred wg.Done lets main
			// return. The result is deliberately ignored: all responses have
			// already been read, so there is nothing left to act on.
			_ = client.CloseSend()
		}()
	}

	wg.Wait()
	bar.FinishPrint("----------------------broadcast message finish-------------------------------")
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/venjia/fabric.git
git@gitee.com:venjia/fabric.git
venjia
fabric
fabric
v1.1.0-preview

搜索帮助