3 Star 6 Fork 7

Gitee 极速下载/Hyperledger fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
registrar.go 9.79 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

// Package multichannel tracks the channel resources for the orderer. It initially
// loads the set of existing channels, and provides an interface for users of these
// channels to retrieve them, or create new ones.
package multichannel
import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
const (
// pkgLogID is the name passed to flogging.MustGetLogger for this package's logger.
// NOTE(review): "commmon" has three m's and looks like a typo for "common"; the
// string is used at runtime as the logger name, so confirm nothing depends on it
// before correcting.
pkgLogID = "orderer/commmon/multichannel"
// msgVersion and epoch are not referenced in the visible part of this file.
msgVersion = int32(0)
epoch = 0
// logger is the package-level logger, named by pkgLogID.
var logger = flogging.MustGetLogger(pkgLogID)
// checkResources verifies that the channel config is compatible with this binary.
// It returns an error when res has no orderer config, or when the orderer or
// channel capabilities report themselves as unsupported; otherwise it returns nil.
// NOTE(review): the earlier comment also mentioned logging sanity checks, but no
// logging call is visible in this listing — confirm against the upstream file.
func checkResources(res channelconfig.Resources) error {
oc, ok := res.OrdererConfig()
if !ok {
return errors.New("config does not contain orderer config")
// NOTE(review): err is passed both as the wrapped cause and as the %s argument; if
// errors.Wrapf appends the cause text (as github.com/pkg/errors does), the cause
// shows up twice in the message — confirm which errors package is imported.
if err := oc.Capabilities().Supported(); err != nil {
return errors.Wrapf(err, "config requires unsupported orderer capabilities: %s", err)
if err := res.ChannelConfig().Capabilities().Supported(); err != nil {
return errors.Wrapf(err, "config requires unsupported channel capabilities: %s", err)
return nil
// checkResourcesOrPanic runs checkResources on res and, if it reports an error,
// calls logger.Panicf with the channel ID taken from
// res.ConfigtxValidator().ChainID() and the error.
func checkResourcesOrPanic(res channelconfig.Resources) {
if err := checkResources(res); err != nil {
logger.Panicf("[channel %s] %s", res.ConfigtxValidator().ChainID(), err)
// mutableResources is the type of the configResources field that newLedgerResources
// fills with the result of channelconfig.NewBundleSource.
// NOTE(review): the interface's method list is not visible in this listing.
type mutableResources interface {
// configResources is built in newLedgerResources with a mutableResources field; its
// methods call OrdererConfig and ConfigtxValidator on the receiver.
// NOTE(review): the struct's field list is not visible in this listing.
type configResources struct {
// CreateBundle builds a channelconfig bundle for channelID from config by delegating
// to channelconfig.NewBundle, returning its result and error unchanged.
func (cr *configResources) CreateBundle(channelID string, config *cb.Config) (*channelconfig.Bundle, error) {
return channelconfig.NewBundle(channelID, config)
// Update receives a *channelconfig.Bundle.
// NOTE(review): the method body is not visible in this listing, so what it does with
// bndl cannot be stated here — confirm against the upstream file.
func (cr *configResources) Update(bndl *channelconfig.Bundle) {
// SharedConfig returns the orderer configuration reported by cr.OrdererConfig().
// If none is present it calls logger.Panicf, naming the channel ID obtained from
// cr.ConfigtxValidator().ChainID().
func (cr *configResources) SharedConfig() channelconfig.Orderer {
oc, ok := cr.OrdererConfig()
if !ok {
logger.Panicf("[channel %s] has no orderer configuration", cr.ConfigtxValidator().ChainID())
return oc
// ledgerResources is what newLedgerResources returns: it is populated there with a
// configResources value and a ReadWriter set to the channel's ledger.
// NOTE(review): the struct's field list is not visible in this listing.
type ledgerResources struct {
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
// lock is intended to guard chains: GetChain, ChannelsCount and newChain each
// defer an unlock of it.
lock sync.RWMutex
// chains maps a chain ID to its ChainSupport.
chains map[string]*ChainSupport
// consenters is supplied to NewRegistrar and handed to newChainSupport in newChain.
consenters map[string]consensus.Consenter
// ledgerFactory lists existing chain IDs and returns the ledger for a chain ID.
ledgerFactory blockledger.Factory
// signer is supplied to NewRegistrar and handed to newChainSupport in newChain.
signer crypto.LocalSigner
// systemChannelID and systemChannel are set by NewRegistrar once it finds the
// chain whose config contains a consortiums config.
systemChannelID string
systemChannel *ChainSupport
// templator is created from the system channel in NewRegistrar and used by
// NewChannelConfig.
templator msgprocessor.ChannelConfigTemplator
// callbacks are passed to channelconfig.NewBundleSource for every
// ledgerResources created by newLedgerResources.
callbacks []func(bundle *channelconfig.Bundle)
// getConfigTx returns the envelope at index 0 of the config block that the ledger's
// latest block names as its last config. It calls logger.Panicf when the last
// config index cannot be decoded or when the referenced config block is nil.
func getConfigTx(reader blockledger.Reader) *cb.Envelope {
// The latest block sits at Height()-1.
// NOTE(review): presumably Height() is never 0 here — confirm callers only pass
// ledgers that already contain at least one block.
lastBlock := blockledger.GetBlock(reader, reader.Height()-1)
index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
configBlock := blockledger.GetBlock(reader, index)
if configBlock == nil {
logger.Panicf("Config block does not exist")
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
// NewRegistrar produces an instance of a *Registrar.
// It walks every chain ID reported by ledgerFactory, loads each chain's latest
// config transaction, and registers a ChainSupport for it in r.chains. The chain
// whose config contains a consortiums config is treated as the system channel;
// finding two such chains, or none, is reported through logger.Panicf.
// NOTE(review): the argument lists of both newChainSupport calls and all closing
// braces are missing from this listing.
func NewRegistrar(ledgerFactory blockledger.Factory, consenters map[string]consensus.Consenter,
signer crypto.LocalSigner, callbacks ...func(bundle *channelconfig.Bundle)) *Registrar {
r := &Registrar{
chains: make(map[string]*ChainSupport),
ledgerFactory: ledgerFactory,
consenters: consenters,
signer: signer,
callbacks: callbacks,
existingChains := ledgerFactory.ChainIDs()
for _, chainID := range existingChains {
rl, err := ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
configTx := getConfigTx(rl)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
ledgerResources := r.newLedgerResources(configTx)
// From here on chainID is the ID recorded in the channel config, shadowing the
// loop variable that came from the ledger factory.
chainID := ledgerResources.ConfigtxValidator().ChainID()
// A consortiums config marks the system channel; only one is allowed.
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
if r.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
chain := newChainSupport(
r.templator = msgprocessor.NewDefaultTemplator(chain)
chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain))
// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
// Deferred inside the loop, so the iterator is closed when NewRegistrar returns.
defer iter.Close()
if pos != uint64(0) {
logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
genesisBlock, status := iter.Next()
if status != cb.Status_SUCCESS {
logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s", chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(
r.chains[chainID] = chain
// NOTE(review): the log line above says the chain is starting, but no start call
// for non-system chains is visible in this listing — confirm against upstream.
if r.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
return r
// SystemChannelID returns the ChannelID for the system channel, i.e. the value
// stored in r.systemChannelID by NewRegistrar.
func (r *Registrar) SystemChannelID() string {
return r.systemChannelID
// BroadcastChannelSupport returns the message's channel header, whether the message is
// a config update, and the channel resources that should handle it. It returns an error
// when the channel header cannot be extracted, or when the message classifies as
// msgprocessor.ConfigMsg and so cannot be processed directly (like CONFIG and
// ORDERER_TRANSACTION messages).
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
chdr, err := utils.ChannelHeader(msg)
if err != nil {
return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
cs, ok := r.GetChain(chdr.ChannelId)
// Messages for a channel that is not registered are handed to the system channel.
if !ok {
cs = r.systemChannel
isConfig := false
switch cs.ClassifyMsg(chdr) {
case msgprocessor.ConfigUpdateMsg:
isConfig = true
case msgprocessor.ConfigMsg:
return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
return chdr, isConfig, cs, nil
// GetChain retrieves the chain support for a chain (and whether it exists)
func (r *Registrar) GetChain(chainID string) (*ChainSupport, bool) {
defer r.lock.RUnlock()
cs, ok := r.chains[chainID]
return cs, ok
// newLedgerResources decodes configTx (payload, channel header, config envelope),
// builds a channelconfig bundle for the channel named in the header, fetches that
// channel's ledger from r.ledgerFactory, and returns both wrapped in a
// ledgerResources. Every decoding or lookup failure is reported via logger.Panicf.
func (r *Registrar) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
payload, err := utils.UnmarshalPayload(configTx.Payload)
if err != nil {
logger.Panicf("Error umarshaling envelope to payload: %s", err)
if payload.Header == nil {
// NOTE(review): err was already checked just above, so it is presumably nil here
// and the %s verb has nothing useful to print — confirm and consider dropping it.
logger.Panicf("Missing channel header: %s", err)
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Panicf("Error unmarshaling channel header: %s", err)
configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
if err != nil {
logger.Panicf("Error umarshaling config envelope from payload data: %s", err)
bundle, err := channelconfig.NewBundle(chdr.ChannelId, configEnvelope.Config)
if err != nil {
logger.Panicf("Error creating channelconfig bundle: %s", err)
ledger, err := r.ledgerFactory.GetOrCreate(chdr.ChannelId)
if err != nil {
// NOTE(review): unlike the other Panicf calls here, this message omits err.
logger.Panicf("Error getting ledger for %s", chdr.ChannelId)
// The bundle is wrapped in a bundle source together with r.callbacks.
return &ledgerResources{
configResources: &configResources{
mutableResources: channelconfig.NewBundleSource(bundle, r.callbacks...),
ReadWriter: ledger,
// newChain creates the resources for a new chain from its config transaction:
// it builds ledgerResources, appends a first block containing configtx to the
// ledger, creates a ChainSupport, and publishes it in a fresh copy of r.chains.
// NOTE(review): this listing defers r.lock.Unlock() but shows no r.lock.Lock();
// unlocking an unlocked mutex is a fatal runtime error, so the acquire is
// presumably a dropped line — confirm against the upstream file.
func (r *Registrar) newChain(configtx *cb.Envelope) {
defer r.lock.Unlock()
ledgerResources := r.newLedgerResources(configtx)
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
// Copy the map rather than mutating it in place, to allow concurrent reads from
// broadcast/deliver while the new chainSupport is being added.
newChains := make(map[string]*ChainSupport)
for key, value := range r.chains {
newChains[key] = value
cs := newChainSupport(r, ledgerResources, r.consenters, r.signer)
chainID := ledgerResources.ConfigtxValidator().ChainID()
// NOTE(review): the message says the chain is starting, but no call that starts
// cs is visible in this listing — confirm against the upstream file.
logger.Infof("Created and starting new chain %s", chainID)
newChains[string(chainID)] = cs
r.chains = newChains
// ChannelsCount returns the count of the current total number of channels.
func (r *Registrar) ChannelsCount() int {
defer r.lock.RUnlock()
return len(r.chains)
// NewChannelConfig produces a new template channel configuration based on the system
// channel's current config. It delegates to r.templator, which NewRegistrar creates
// from the system channel, and returns that call's result and error unchanged.
func (r *Registrar) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
return r.templator.NewChannelConfig(envConfigUpdate)
// CreateBundle calls channelconfig.NewBundle with channelID and config and returns
// its result as a channelconfig.Resources together with any error.
func (r *Registrar) CreateBundle(channelID string, config *cb.Config) (channelconfig.Resources, error) {
return channelconfig.NewBundle(channelID, config)
马建仓 AI 助手
Hyperledger fabric
