1 Star 0 Fork 0

祝雨昕 / go-bitswap

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
session.go 15.12 KB
一键复制 编辑 原始数据 按行查看 历史
祝雨昕 提交于 2022-04-05 15:58 . change
package session
import (
"context"
"time"
bsbpm "gitee.com/yuviazhu/go-bitswap/internal/blockpresencemanager"
bsgetter "gitee.com/yuviazhu/go-bitswap/internal/getter"
notifications "gitee.com/yuviazhu/go-bitswap/internal/notifications"
bspm "gitee.com/yuviazhu/go-bitswap/internal/peermanager"
bssim "gitee.com/yuviazhu/go-bitswap/internal/sessioninterestmanager"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
loggables "github.com/libp2p/go-libp2p-loggables"
"go.uber.org/zap"
)
var log = logging.Logger("bs:sess")
var sflog = log.Desugar()
const (
broadcastLiveWantsLimit = 64
)
// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
// RegisterSession tells the PeerManager that the session is interested
// in a peer's connection state
RegisterSession(peer.ID, bspm.Session)
// UnregisterSession tells the PeerManager that the session is no longer
// interested in a peer's connection state
UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid)
}
// SessionManager manages all the sessions
type SessionManager interface {
// Remove a session (called when the session shuts down)
RemoveSession(sesid uint64)
// Cancel wants (called when a call to GetBlocks() is cancelled)
CancelSessionWants(sid uint64, wants []cid.Cid)
}
// SessionPeerManager keeps track of peers in the session
type SessionPeerManager interface {
// PeersDiscovered indicates if any peers have been discovered yet
PeersDiscovered() bool
// Shutdown the SessionPeerManager
Shutdown()
// Adds a peer to the session, returning true if the peer is new
AddPeer(peer.ID) bool
// Removes a peer from the session, returning true if the peer existed
RemovePeer(peer.ID) bool
// All peers in the session
Peers() []peer.ID
// Whether there are any peers in the session
HasPeers() bool
// Protect connection from being pruned by the connection manager
ProtectConnection(peer.ID)
}
// ProviderFinder is used to find providers for a given key
type ProviderFinder interface {
// FindProvidersAsync searches for peers that provide the given CID
FindProvidersAsync(ctx context.Context, k cid.Cid) <-chan peer.ID
}
// opType is the kind of operation that is being processed by the event loop
type opType int
const (
// Receive blocks
opReceive opType = iota
// Want blocks
opWant
// Cancel wants
opCancel
// Broadcast want-haves
opBroadcast
// Wants sent to peers
opWantsSent
)
type op struct {
op opType
keys []cid.Cid
}
// Session holds state for an individual bitswap transfer operation.
// This allows bitswap to make smarter decisions about who to send wantlist
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
shutdown func()
sm SessionManager
pm PeerManager
sprm SessionPeerManager
providerFinder ProviderFinder
sim *bssim.SessionInterestManager
sw sessionWants
sws sessionWantSender
latencyTrkr latencyTracker
// channels
incoming chan op
tickDelayReqs chan time.Duration
// do not touch outside run loop
idleTick *time.Timer
periodicSearchTimer *time.Timer
baseTickDelay time.Duration
consecutiveTicks int
initialSearchDelay time.Duration
periodicSearchDelay delay.D
// identifiers
notif notifications.PubSub
uuid logging.Loggable
id uint64
self peer.ID
}
// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(
ctx context.Context,
sm SessionManager,
id uint64,
sprm SessionPeerManager,
providerFinder ProviderFinder,
sim *bssim.SessionInterestManager,
pm PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID) *Session {
ctx, cancel := context.WithCancel(ctx)
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
shutdown: cancel,
sm: sm,
pm: pm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
incoming: make(chan op, 128),
latencyTrkr: latencyTracker{},
notif: notif,
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: id,
initialSearchDelay: initialSearchDelay,
periodicSearchDelay: periodicSearchDelay,
self: self,
}
s.sws = newSessionWantSender(id, pm, sprm, sm, bpm, s.onWantsSent, s.onPeersExhausted)
go s.run(ctx)
return s
}
func (s *Session) ID() uint64 {
return s.id
}
func (s *Session) Shutdown() {
s.shutdown()
}
// ReceiveFrom receives incoming blocks from the given peer.
func (s *Session) ReceiveFrom(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// The SessionManager tells each Session about all keys that it may be
// interested in. Here the Session filters the keys to the ones that this
// particular Session is interested in.
interestedRes := s.sim.FilterSessionInterested(s.id, ks, haves, dontHaves)
ks = interestedRes[0]
haves = interestedRes[1]
dontHaves = interestedRes[2]
s.logReceiveFrom(from, ks, haves, dontHaves)
// Inform the session want sender that a message has been received
s.sws.Update(from, ks, haves, dontHaves)
if len(ks) == 0 {
return
}
// Inform the session that blocks have been received
select {
case s.incoming <- op{op: opReceive, keys: ks}:
case <-s.ctx.Done():
}
}
func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
// Save some CPU cycles if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "Bitswap <- rcv message"); ce == nil {
return
}
for _, c := range interestedKs {
log.Debugw("Bitswap <- block", "local", s.self, "from", from, "cid", c, "session", s.id)
}
for _, c := range haves {
log.Debugw("Bitswap <- HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
}
for _, c := range dontHaves {
log.Debugw("Bitswap <- DONT_HAVE", "local", s.self, "from", from, "cid", c, "session", s.id)
}
}
// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
}
// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx = logging.ContextWithLoggable(ctx, s.uuid)
return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
func(ctx context.Context, keys []cid.Cid) {
select {
case s.incoming <- op{op: opWant, keys: keys}:
case <-ctx.Done():
case <-s.ctx.Done():
}
},
func(keys []cid.Cid) {
select {
case s.incoming <- op{op: opCancel, keys: keys}:
case <-s.ctx.Done():
}
},
)
}
// SetBaseTickDelay changes the rate at which ticks happen.
func (s *Session) SetBaseTickDelay(baseTickDelay time.Duration) {
select {
case s.tickDelayReqs <- baseTickDelay:
case <-s.ctx.Done():
}
}
// onWantsSent is called when wants are sent to a peer by the session wants sender
func (s *Session) onWantsSent(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
allBlks := append(wantBlocks[:len(wantBlocks):len(wantBlocks)], wantHaves...)
s.nonBlockingEnqueue(op{op: opWantsSent, keys: allBlks})
}
// onPeersExhausted is called when all available peers have sent DONT_HAVE for
// a set of cids (or all peers become unavailable)
func (s *Session) onPeersExhausted(ks []cid.Cid) {
s.nonBlockingEnqueue(op{op: opBroadcast, keys: ks})
}
// We don't want to block the sessionWantSender if the incoming channel
// is full. So if we can't immediately send on the incoming channel spin
// it off into a go-routine.
func (s *Session) nonBlockingEnqueue(o op) {
select {
case s.incoming <- o:
default:
go func() {
select {
case s.incoming <- o:
case <-s.ctx.Done():
}
}()
}
}
// Session run loop -- everything in this function should not be called
// outside of this loop
func (s *Session) run(ctx context.Context) {
go s.sws.Run()
s.idleTick = time.NewTimer(s.initialSearchDelay)
s.periodicSearchTimer = time.NewTimer(s.periodicSearchDelay.NextWaitTime())
for {
select {
case oper := <-s.incoming:
switch oper.op {
case opReceive:
// Received blocks
s.handleReceive(oper.keys)
case opWant:
// Client wants blocks
s.wantBlocks(ctx, oper.keys)
case opCancel:
// Wants were cancelled
s.sw.CancelPending(oper.keys)
s.sws.Cancel(oper.keys)
case opWantsSent:
// Wants were sent to a peer
s.sw.WantsSent(oper.keys)
case opBroadcast:
// Broadcast want-haves to all peers
s.broadcast(ctx, oper.keys)
default:
panic("unhandled operation")
}
case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast
s.broadcast(ctx, nil)
case <-s.periodicSearchTimer.C:
// Periodically search for a random live want
s.handlePeriodicSearch(ctx)
case baseTickDelay := <-s.tickDelayReqs:
// Set the base tick delay
s.baseTickDelay = baseTickDelay
case <-ctx.Done():
// Shutdown
s.handleShutdown()
return
}
}
}
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
// If this broadcast is because of an idle timeout (we haven't received
// any blocks for a while) then broadcast all pending wants
if wants == nil {
wants = s.sw.PrepareBroadcast()
}
// Broadcast a want-have for the live wants to everyone we're connected to
s.broadcastWantHaves(ctx, wants)
// do not find providers on consecutive ticks
// -- just rely on periodic search widening
if len(wants) > 0 && (s.consecutiveTicks == 0) {
// Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have
// the rest of the blocks also.
log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
s.findMorePeers(ctx, wants[0])
}
s.resetIdleTick()
// If we have live wants record a consecutive tick
if s.sw.HasLiveWants() {
s.consecutiveTicks++
}
}
// handlePeriodicSearch is called periodically to search for providers of a
// randomly chosen CID in the sesssion.
func (s *Session) handlePeriodicSearch(ctx context.Context) {
randomWant := s.sw.RandomLiveWant()
if !randomWant.Defined() {
return
}
// TODO: come up with a better strategy for determining when to search
// for new providers for blocks.
s.findMorePeers(ctx, randomWant)
s.broadcastWantHaves(ctx, []cid.Cid{randomWant})
s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
// findMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
go func(k cid.Cid) {
for p := range s.providerFinder.FindProvidersAsync(ctx, k) {
// When a provider indicates that it has a cid, it's equivalent to
// the providing peer sending a HAVE
s.sws.Update(p, nil, []cid.Cid{c}, nil)
}
}(c)
}
// handleShutdown is called when the session shuts down
func (s *Session) handleShutdown() {
// Stop the idle timer
s.idleTick.Stop()
// Shut down the session peer manager
s.sprm.Shutdown()
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Signal to the SessionManager that the session has been shutdown
// and can be cleaned up
s.sm.RemoveSession(s.id)
}
// handleReceive is called when the session receives blocks from a peer
func (s *Session) handleReceive(ks []cid.Cid) {
// Record which blocks have been received and figure out the total latency
// for fetching the blocks
wanted, totalLatency := s.sw.BlocksReceived(ks)
if len(wanted) == 0 {
return
}
// Record latency
s.latencyTrkr.receiveUpdate(len(wanted), totalLatency)
// Inform the SessionInterestManager that this session is no longer
// expecting to receive the wanted keys
s.sim.RemoveSessionWants(s.id, wanted)
s.idleTick.Stop()
// We've received new wanted blocks, so reset the number of ticks
// that have occurred since the last new block
s.consecutiveTicks = 0
s.resetIdleTick()
}
// wantBlocks is called when blocks are requested by the client
func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
if len(newks) > 0 {
// Inform the SessionInterestManager that this session is interested in the keys
s.sim.RecordSessionInterest(s.id, newks)
// Tell the sessionWants tracker that that the wants have been requested
s.sw.BlocksRequested(newks)
// Tell the sessionWantSender that the blocks have been requested
s.sws.Add(newks)
}
// If we have discovered peers already, the sessionWantSender will
// send wants to them
if s.sprm.PeersDiscovered() {
return
}
// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
s.broadcastWantHaves(ctx, ks)
}
}
// Send want-haves to all connected peers
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
s.pm.BroadcastWantHaves(ctx, wants)
}
// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
// - initially
// as a fixed delay
// - once some blocks are received
// from a base delay and average latency, with a backoff
func (s *Session) resetIdleTick() {
var tickDelay time.Duration
if !s.latencyTrkr.hasLatency() {
tickDelay = s.initialSearchDelay
} else {
avLat := s.latencyTrkr.averageLatency()
tickDelay = s.baseTickDelay + (3 * avLat)
}
tickDelay = tickDelay * time.Duration(1+s.consecutiveTicks)
s.idleTick.Reset(tickDelay)
}
// latencyTracker keeps track of the average latency between sending a want
// and receiving the corresponding block
type latencyTracker struct {
totalLatency time.Duration
count int
}
func (lt *latencyTracker) hasLatency() bool {
return lt.totalLatency > 0 && lt.count > 0
}
func (lt *latencyTracker) averageLatency() time.Duration {
return lt.totalLatency / time.Duration(lt.count)
}
func (lt *latencyTracker) receiveUpdate(count int, totalLatency time.Duration) {
lt.totalLatency += totalLatency
lt.count += count
}
1
https://gitee.com/yuviazhu/go-bitswap.git
git@gitee.com:yuviazhu/go-bitswap.git
yuviazhu
go-bitswap
go-bitswap
f20a32d77e37

搜索帮助

53164aa7 5694891 3bd8fe86 5694891