1 Star 0 Fork 0

melody / go-ipfs

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
p2p.go 14.25 KB
一键复制 编辑 原始数据 按行查看 历史
Steven Allen 提交于 2019-03-01 12:17 . gx: update cmds and flatfs
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
package commands
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"text/tabwriter"
"time"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
p2p "github.com/ipfs/go-ipfs/p2p"
cmds "gx/ipfs/QmQkW9fnCsg9SLHdViiAh6qfBppodsPZVpU92dZLqYtEfs/go-ipfs-cmds"
ma "gx/ipfs/QmTZBfrPJmjWsCvHEtX5FE6KimVJhsJg5sBbqEFYf4UZtL/go-multiaddr"
madns "gx/ipfs/QmU98UaAEh4WJAcir2qjfztU77JQ14kAwHNFkjUXHZA3Vy/go-multiaddr-dns"
protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmaCTz9RkrU13bm9kMB54f7atgqM4qkjDZpRwRoJiWXEqs/go-libp2p-peerstore"
cmdkit "gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
ipfsaddr "gx/ipfs/Qmdf1djucJ1jX5RMF1bDbFg5ybZnupmSAeETQQ3ZV7z6dU/go-ipfs-addr"
)
// P2PProtoPrefix is the default required prefix for protocol names
const P2PProtoPrefix = "/x/"
// P2PListenerInfoOutput is output type of ls command
type P2PListenerInfoOutput struct {
Protocol string
ListenAddress string
TargetAddress string
}
// P2PStreamInfoOutput is output type of streams command
type P2PStreamInfoOutput struct {
HandlerID string
Protocol string
OriginAddress string
TargetAddress string
}
// P2PLsOutput is output type of ls command
type P2PLsOutput struct {
Listeners []P2PListenerInfoOutput
}
// P2PStreamsOutput is output type of streams command
type P2PStreamsOutput struct {
Streams []P2PStreamInfoOutput
}
const (
allowCustomProtocolOptionName = "allow-custom-protocol"
reportPeerIDOptionName = "report-peer-id"
)
var resolveTimeout = 10 * time.Second
// P2PCmd is the 'ipfs p2p' command
var P2PCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Libp2p stream mounting.",
ShortDescription: `
Create and use tunnels to remote peers over libp2p
Note: this command is experimental and subject to change as usecases and APIs
are refined`,
},
Subcommands: map[string]*cmds.Command{
"stream": p2pStreamCmd,
"forward": p2pForwardCmd,
"listen": p2pListenCmd,
"close": p2pCloseCmd,
"ls": p2pLsCmd,
},
}
var p2pForwardCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Forward connections to libp2p service",
ShortDescription: `
Forward connections made to <listen-address> to <target-address>.
<protocol> specifies the libp2p protocol name to use for libp2p
connections and/or handlers. It must be prefixed with '` + P2PProtoPrefix + `'.
Example:
ipfs p2p forward ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/4567 /ipfs/QmPeer
- Forward connections to 127.0.0.1:4567 to '` + P2PProtoPrefix + `myproto' service on /ipfs/QmPeer
`,
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("protocol", true, false, "Protocol name."),
cmdkit.StringArg("listen-address", true, false, "Listening endpoint."),
cmdkit.StringArg("target-address", true, false, "Target endpoint."),
},
Options: []cmdkit.Option{
cmdkit.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
protoOpt := req.Arguments[0]
listenOpt := req.Arguments[1]
targetOpt := req.Arguments[2]
proto := protocol.ID(protoOpt)
listen, err := ma.NewMultiaddr(listenOpt)
if err != nil {
return err
}
targets, err := parseIpfsAddr(targetOpt)
if err != nil {
return err
}
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
return forwardLocal(n.Context(), n.P2P, n.Peerstore, proto, listen, targets)
},
}
// parseIpfsAddr is a function that takes in addr string and return ipfsAddrs
func parseIpfsAddr(addr string) ([]ipfsaddr.IPFSAddr, error) {
mutiladdr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
if _, err := mutiladdr.ValueForProtocol(ma.P_IPFS); err == nil {
iaddrs := make([]ipfsaddr.IPFSAddr, 1)
iaddrs[0], err = ipfsaddr.ParseMultiaddr(mutiladdr)
if err != nil {
return nil, err
}
return iaddrs, nil
}
// resolve mutiladdr whose protocol is not ma.P_IPFS
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
addrs, err := madns.Resolve(ctx, mutiladdr)
cancel()
if len(addrs) == 0 {
return nil, errors.New("fail to resolve the multiaddr:" + mutiladdr.String())
}
iaddrs := make([]ipfsaddr.IPFSAddr, len(addrs))
for i, addr := range addrs {
iaddrs[i], err = ipfsaddr.ParseMultiaddr(addr)
if err != nil {
return nil, err
}
}
return iaddrs, nil
}
var p2pListenCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Create libp2p service",
ShortDescription: `
Create libp2p service and forward connections made to <target-address>.
<protocol> specifies the libp2p handler name. It must be prefixed with '` + P2PProtoPrefix + `'.
Example:
ipfs p2p listen ` + P2PProtoPrefix + `myproto /ip4/127.0.0.1/tcp/1234
- Forward connections to 'myproto' libp2p service to 127.0.0.1:1234
`,
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("protocol", true, false, "Protocol name."),
cmdkit.StringArg("target-address", true, false, "Target endpoint."),
},
Options: []cmdkit.Option{
cmdkit.BoolOption(allowCustomProtocolOptionName, "Don't require /x/ prefix"),
cmdkit.BoolOption(reportPeerIDOptionName, "r", "Send remote base58 peerid to target when a new connection is established"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
protoOpt := req.Arguments[0]
targetOpt := req.Arguments[1]
proto := protocol.ID(protoOpt)
target, err := ma.NewMultiaddr(targetOpt)
if err != nil {
return err
}
// port can't be 0
if err := checkPort(target); err != nil {
return err
}
allowCustom, _ := req.Options[allowCustomProtocolOptionName].(bool)
reportPeerID, _ := req.Options[reportPeerIDOptionName].(bool)
if !allowCustom && !strings.HasPrefix(string(proto), P2PProtoPrefix) {
return errors.New("protocol name must be within '" + P2PProtoPrefix + "' namespace")
}
_, err = n.P2P.ForwardRemote(n.Context(), proto, target, reportPeerID)
return err
},
}
// checkPort checks whether target multiaddr contains tcp or udp protocol
// and whether the port is equal to 0
func checkPort(target ma.Multiaddr) error {
// get tcp or udp port from multiaddr
getPort := func() (string, error) {
sport, _ := target.ValueForProtocol(ma.P_TCP)
if sport != "" {
return sport, nil
}
sport, _ = target.ValueForProtocol(ma.P_UDP)
if sport != "" {
return sport, nil
}
return "", fmt.Errorf("address does not contain tcp or udp protocol")
}
sport, err := getPort()
if err != nil {
return err
}
port, err := strconv.Atoi(sport)
if err != nil {
return err
}
if port == 0 {
return fmt.Errorf("port can not be 0")
}
return nil
}
// forwardLocal forwards local connections to a libp2p service
func forwardLocal(ctx context.Context, p *p2p.P2P, ps pstore.Peerstore, proto protocol.ID, bindAddr ma.Multiaddr, addrs []ipfsaddr.IPFSAddr) error {
for _, addr := range addrs {
ps.AddAddr(addr.ID(), addr.Multiaddr(), pstore.TempAddrTTL)
}
// TODO: return some info
// the length of the addrs must large than 0
// peerIDs in addr must be the same and choose addr[0] to connect
_, err := p.ForwardLocal(ctx, addrs[0].ID(), proto, bindAddr)
return err
}
const (
p2pHeadersOptionName = "headers"
)
var p2pLsCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "List active p2p listeners.",
},
Options: []cmdkit.Option{
cmdkit.BoolOption(p2pHeadersOptionName, "v", "Print table headers (Protocol, Listen, Target)."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
output := &P2PLsOutput{}
n.P2P.ListenersLocal.Lock()
for _, listener := range n.P2P.ListenersLocal.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.ListenersLocal.Unlock()
n.P2P.ListenersP2P.Lock()
for _, listener := range n.P2P.ListenersP2P.Listeners {
output.Listeners = append(output.Listeners, P2PListenerInfoOutput{
Protocol: string(listener.Protocol()),
ListenAddress: listener.ListenAddress().String(),
TargetAddress: listener.TargetAddress().String(),
})
}
n.P2P.ListenersP2P.Unlock()
return cmds.EmitOnce(res, output)
},
Type: P2PLsOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PLsOutput) error {
headers, _ := req.Options[p2pHeadersOptionName].(bool)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, listener := range out.Listeners {
if headers {
fmt.Fprintln(tw, "Protocol\tListen Address\tTarget Address")
}
fmt.Fprintf(tw, "%s\t%s\t%s\n", listener.Protocol, listener.ListenAddress, listener.TargetAddress)
}
tw.Flush()
return nil
}),
},
}
const (
p2pAllOptionName = "all"
p2pProtocolOptionName = "protocol"
p2pListenAddressOptionName = "listen-address"
p2pTargetAddressOptionName = "target-address"
)
var p2pCloseCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Stop listening for new connections to forward.",
},
Options: []cmdkit.Option{
cmdkit.BoolOption(p2pAllOptionName, "a", "Close all listeners."),
cmdkit.StringOption(p2pProtocolOptionName, "p", "Match protocol name"),
cmdkit.StringOption(p2pListenAddressOptionName, "l", "Match listen address"),
cmdkit.StringOption(p2pTargetAddressOptionName, "t", "Match target address"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
closeAll, _ := req.Options[p2pAllOptionName].(bool)
protoOpt, p := req.Options[p2pProtocolOptionName].(string)
listenOpt, l := req.Options[p2pListenAddressOptionName].(string)
targetOpt, t := req.Options[p2pTargetAddressOptionName].(string)
proto := protocol.ID(protoOpt)
listen, err := ma.NewMultiaddr(listenOpt)
if err != nil {
return err
}
target, err := ma.NewMultiaddr(targetOpt)
if err != nil {
return err
}
if !(closeAll || p || l || t) {
return errors.New("no matching options given")
}
if closeAll && (p || l || t) {
return errors.New("can't combine --all with other matching options")
}
match := func(listener p2p.Listener) bool {
if closeAll {
return true
}
if p && proto != listener.Protocol() {
return false
}
if l && !listen.Equal(listener.ListenAddress()) {
return false
}
if t && !target.Equal(listener.TargetAddress()) {
return false
}
return true
}
done := n.P2P.ListenersLocal.Close(match)
done += n.P2P.ListenersP2P.Close(match)
return cmds.EmitOnce(res, done)
},
Type: int(0),
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out int) error {
fmt.Fprintf(w, "Closed %d stream(s)\n", out)
return nil
}),
},
}
///////
// Stream
//
// p2pStreamCmd is the 'ipfs p2p stream' command
var p2pStreamCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "P2P stream management.",
ShortDescription: "Create and manage p2p streams",
},
Subcommands: map[string]*cmds.Command{
"ls": p2pStreamLsCmd,
"close": p2pStreamCloseCmd,
},
}
var p2pStreamLsCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "List active p2p streams.",
},
Options: []cmdkit.Option{
cmdkit.BoolOption(p2pHeadersOptionName, "v", "Print table headers (ID, Protocol, Local, Remote)."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
output := &P2PStreamsOutput{}
n.P2P.Streams.Lock()
for id, s := range n.P2P.Streams.Streams {
output.Streams = append(output.Streams, P2PStreamInfoOutput{
HandlerID: strconv.FormatUint(id, 10),
Protocol: string(s.Protocol),
OriginAddress: s.OriginAddr.String(),
TargetAddress: s.TargetAddr.String(),
})
}
n.P2P.Streams.Unlock()
return cmds.EmitOnce(res, output)
},
Type: P2PStreamsOutput{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *P2PStreamsOutput) error {
headers, _ := req.Options[p2pHeadersOptionName].(bool)
tw := tabwriter.NewWriter(w, 1, 2, 1, ' ', 0)
for _, stream := range out.Streams {
if headers {
fmt.Fprintln(tw, "ID\tProtocol\tOrigin\tTarget")
}
fmt.Fprintf(tw, "%s\t%s\t%s\t%s\n", stream.HandlerID, stream.Protocol, stream.OriginAddress, stream.TargetAddress)
}
tw.Flush()
return nil
}),
},
}
var p2pStreamCloseCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Close active p2p stream.",
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("id", false, false, "Stream identifier"),
},
Options: []cmdkit.Option{
cmdkit.BoolOption(p2pAllOptionName, "a", "Close all streams."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := p2pGetNode(env)
if err != nil {
return err
}
closeAll, _ := req.Options[p2pAllOptionName].(bool)
var handlerID uint64
if !closeAll {
if len(req.Arguments) == 0 {
return errors.New("no id specified")
}
handlerID, err = strconv.ParseUint(req.Arguments[0], 10, 64)
if err != nil {
return err
}
}
toClose := make([]*p2p.Stream, 0, 1)
n.P2P.Streams.Lock()
for id, stream := range n.P2P.Streams.Streams {
if !closeAll && handlerID != id {
continue
}
toClose = append(toClose, stream)
if !closeAll {
break
}
}
n.P2P.Streams.Unlock()
for _, s := range toClose {
n.P2P.Streams.Reset(s)
}
return nil
},
}
func p2pGetNode(env cmds.Environment) (*core.IpfsNode, error) {
nd, err := cmdenv.GetNode(env)
if err != nil {
return nil, err
}
config, err := nd.Repo.Config()
if err != nil {
return nil, err
}
if !config.Experimental.Libp2pStreamMounting {
return nil, errors.New("libp2p stream mounting not enabled")
}
if !nd.OnlineMode() {
return nil, ErrNotOnline
}
return nd, nil
}
Go
1
https://gitee.com/Crazyrw/go-ipfs.git
git@gitee.com:Crazyrw/go-ipfs.git
Crazyrw
go-ipfs
go-ipfs
v0.4.19

搜索帮助