Commit 258b25c8 authored by Kostas Christidis's avatar Kostas Christidis Committed by Gerrit Code Review
Browse files

Merge "[FAB-5264] Move interfaces from multichannel"

parents f5e9647d 9b1490ed
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
// Package msgprocessor provides the implementations for processing of the assorted message
// types which may arrive in the system through Broadcast.
package msgprocessor
import (
cb "github.com/hyperledger/fabric/protos/common"
)
// Classification represents the possible message types for the system.
type Classification int
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg Classification = iota
// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)
// Processor provides the methods necessary to classify and process any message which
// arrives through the Broadcast interface.
type Processor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary
ClassifyMsg(env *cb.Envelope) (Classification, error)
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
// ProcessConfigUpdateMsg will attempt to apply the config update to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}
......@@ -19,111 +19,25 @@ package multichannel
import (
"fmt"
"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/configtxfilter"
"github.com/hyperledger/fabric/orderer/common/deliver"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
"github.com/hyperledger/fabric/orderer/common/sizefilter"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
// MsgClassification represents the types of possible messages.
type MsgClassification int
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg MsgClassification = iota
// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)
// Consenter defines the backing ordering mechanism
type Consenter interface {
// HandleChain should create and return a reference to a Chain for the given set of resources
// It will only be invoked for a given chain once per process. In general, errors will be treated
// as irrecoverable and cause system shutdown. See the description of Chain for more details
// The second argument to HandleChain is a pointer to the metadata stored on the `ORDERER` slot of
// the last block committed to the ledger of this Chain. For a new chain, this metadata will be
// nil, as this field is not set on the genesis block
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}
// Chain defines a way to inject messages for ordering
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure
Enqueue(env *cb.Envelope) bool
// Errored returns a channel which will close when an error has occurred
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date
Errored() <-chan struct{}
// Start should allocate whatever resources are needed for staying up to date with the chain
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger
Start()
// Halt frees the resources which were allocated for this Chain
Halt()
}
// ConsenterSupport provides the resources available to a Consenter implementation
type ConsenterSupport interface {
crypto.LocalSigner
MsgProcessor
BlockCutter() blockcutter.Receiver
SharedConfig() config.Orderer
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}
// MsgProcessor defines the methods necessary to interact with Broadcast messages.
type MsgProcessor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary.
ClassifyMsg(env *cb.Envelope) (MsgClassification, error)
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}
// ChainSupport provides a wrapper for the resources backing a chain
type ChainSupport interface {
broadcast.Support
deliver.Support
ConsenterSupport
// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}
type chainSupport struct {
// ChainSupport holds the resources for a particular channel.
type ChainSupport struct {
*ledgerResources
chain Chain
chain consensus.Chain
cutter blockcutter.Receiver
filters *filter.RuleSet
signer crypto.LocalSigner
......@@ -134,9 +48,9 @@ type chainSupport struct {
func newChainSupport(
filters *filter.RuleSet,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
consenters map[string]consensus.Consenter,
signer crypto.LocalSigner,
) *chainSupport {
) *ChainSupport {
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
consenterType := ledgerResources.SharedConfig().ConsensusType()
......@@ -145,7 +59,7 @@ func newChainSupport(
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}
cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
......@@ -196,39 +110,43 @@ func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
}
// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
func createSystemChainFilters(r *Registrar, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig()),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
newSystemChainFilter(ledgerResources, ml),
newSystemChainFilter(ledgerResources, r),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
func (cs *chainSupport) start() {
func (cs *ChainSupport) start() {
cs.chain.Start()
}
func (cs *chainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
// NewSignatureHeader passes through to the signer NewSignatureHeader method.
func (cs *ChainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
return cs.signer.NewSignatureHeader()
}
func (cs *chainSupport) Sign(message []byte) ([]byte, error) {
// Sign passes through to the signer Sign method.
func (cs *ChainSupport) Sign(message []byte) ([]byte, error) {
return cs.signer.Sign(message)
}
func (cs *chainSupport) Filters() *filter.RuleSet {
// Filters returns the set of filters created for this channel.
func (cs *ChainSupport) Filters() *filter.RuleSet {
return cs.filters
}
func (cs *chainSupport) BlockCutter() blockcutter.Receiver {
// BlockCutter returns the blockcutter.Receiver instance for this channel.
func (cs *ChainSupport) BlockCutter() blockcutter.Receiver {
return cs.cutter
}
// ClassifyMsg inspects the message to determine which type of processing is necessary
func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error) {
func (cs *ChainSupport) ClassifyMsg(env *cb.Envelope) (msgprocessor.Classification, error) {
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return 0, fmt.Errorf("bad payload: %s", err)
......@@ -245,23 +163,23 @@ func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error)
switch chdr.Type {
case int32(cb.HeaderType_CONFIG_UPDATE):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
case int32(cb.HeaderType_ORDERER_TRANSACTION):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type ORDERER_TRANSACTION cannot be Broadcast")
case int32(cb.HeaderType_CONFIG):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type CONFIG cannot be Broadcast")
default:
return NormalMsg, nil
return msgprocessor.NormalMsg, nil
}
}
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
func (cs *ChainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = cs.Sequence()
_, err = cs.filters.Apply(env)
return
......@@ -270,27 +188,31 @@ func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, er
// ProcessConfigUpdateMsg will attempt to apply the config update msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
func (cs *chainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
func (cs *ChainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
return nil, cs.Sequence(), fmt.Errorf("Config update message not yet implemented")
}
func (cs *chainSupport) Reader() ledger.Reader {
// Reader returns a reader for the underlying ledger.
func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}
func (cs *chainSupport) Enqueue(env *cb.Envelope) bool {
// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
}
func (cs *chainSupport) Errored() <-chan struct{} {
// Errored returns whether the backing consenter has errored
func (cs *ChainSupport) Errored() <-chan struct{} {
return cs.chain.Errored()
}
func (cs *chainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
// CreateNextBlock creates a new block with the next block number, and the given contents.
func (cs *ChainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
return ledger.CreateNextBlock(cs.ledger, messages)
}
func (cs *chainSupport) addBlockSignature(block *cb.Block) {
func (cs *ChainSupport) addBlockSignature(block *cb.Block) {
logger.Debugf("%+v", cs)
logger.Debugf("%+v", cs.signer)
......@@ -312,7 +234,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) {
})
}
func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
func (cs *ChainSupport) addLastConfigSignature(block *cb.Block) {
configSeq := cs.Sequence()
if configSeq > cs.lastConfigSeq {
logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfig from %d to %d", cs.ChainID(), cs.lastConfigSeq, configSeq, cs.lastConfig, block.Header.Number)
......@@ -337,7 +259,8 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
})
}
func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteConfigBlock should be invoked for blocks which contain a config transaction.
func (cs *ChainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// XXX This hacky path is temporary and will be removed by the end of this change series
// The panics here are just fine
committer, err := cs.filters.Apply(utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]))
......@@ -349,7 +272,8 @@ func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue [
return cs.WriteBlock(block, encodedMetadataValue)
}
func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteBlock should be invoked for blocks which contain normal transactions.
func (cs *ChainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
......@@ -366,6 +290,7 @@ func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte)
return block
}
func (cs *chainSupport) Height() uint64 {
// Height passes through to the underlying ledger's Height.
func (cs *ChainSupport) Height() uint64 {
return cs.Reader().Height()
}
......@@ -66,7 +66,7 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
signer: mockCrypto(),
......@@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) {
func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil), cb.BlockMetadataIndex_SIGNATURES)
assert.NotNil(t, actual, "Block should have block signature")
......@@ -98,7 +98,7 @@ func TestWriteBlockSignatures(t *testing.T) {
func TestWriteBlockOrdererMetadata(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
value := []byte("foo")
expected := &cb.Metadata{Value: value}
......@@ -110,7 +110,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {
func TestSignature(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
message := []byte("Darth Vader")
signed, _ := cs.Sign(message)
......@@ -124,7 +124,7 @@ func TestSignature(t *testing.T) {
func TestWriteLastConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
expected := uint64(0)
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil))
......@@ -144,7 +144,7 @@ func TestWriteLastConfig(t *testing.T) {
cm.SequenceVal = 2
expected = uint64(4)
cs = &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs = &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
......
......@@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/orderer/common/ledger"
ramledger "github.com/hyperledger/fabric/orderer/common/ledger/ram"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
......@@ -132,10 +133,10 @@ func TestGetConfigTxFailure(t *testing.T) {
func TestNoSystemChain(t *testing.T) {
lf := ramledger.New(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
assert.Panics(t, func() { NewManagerImpl(lf, consenters, mockCrypto()) }, "Should have panicked when starting without a system chain")
assert.Panics(t, func() { NewRegistrar(lf, consenters, mockCrypto()) }, "Should have panicked when starting without a system chain")
}
// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
......@@ -150,20 +151,20 @@ func TestMultiSystemChannel(t *testing.T) {
assert.NoError(t, err)
}
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
assert.Panics(t, func() { NewManagerImpl(lf, consenters, mockCrypto()) }, "Two system channels should have caused panic")
assert.Panics(t, func() { NewRegistrar(lf, consenters, mockCrypto()) }, "Two system channels should have caused panic")
}
// This test essentially brings the entire system up and is ultimately what main.go will replicate
func TestManagerImpl(t *testing.T) {
lf, rl := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
_, ok := manager.GetChain("Fake")
assert.False(t, ok, "Should not have found a chain that was not created")
......@@ -197,9 +198,9 @@ func TestManagerImpl(t *testing.T) {
func TestNewChannelConfig(t *testing.T) {
lf, _ := NewRAMLedgerAndFactoryWithMSP()
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
t.Run("BadPayload", func(t *testing.T) {
_, err := manager.NewChannelConfig(&cb.Envelope{Payload: []byte("bad payload")})
......@@ -447,10 +448,10 @@ func TestMismatchedChannelIDs(t *testing.T) {
lf, _ := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
_, err = manager.NewChannelConfig(createTx)
assert.Error(t, err, "Mismatched channel IDs")
......@@ -465,10 +466,10 @@ func TestNewChain(t *testing.T) {
lf, rl := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
envConfigUpdate, err := configtx.MakeChainCreationTransaction(newChainID, genesisconfig.SampleConsortiumName, mockSigningIdentity)
assert.NoError(t, err, "Constructing chain creation tx")
......@@ -560,10 +561,8 @@ func TestNewChain(t *testing.T) {
testRestartedChainSupport(t, chainSupport, consenters, expectedLastConfigSeq)
}
func testRestartedChainSupport(t *testing.T, cs ChainSupport, consenters map[string]Consenter, expectedLastConfigSeq uint64) {
ccs, ok := cs.(*chainSupport)
assert.True(t, ok, "Casting error")
rcs := newChainSupport(ccs.filters, ccs.ledgerResources, consenters, mockCrypto())
func testRestartedChainSupport(t *testing.T, cs *ChainSupport, consenters map[string]consensus.Consenter, expectedLastConfigSeq uint64) {
rcs := newChainSupport(cs.filters, cs.ledgerResources, consenters, mockCrypto())
assert.Equal(t, expectedLastConfigSeq, rcs.lastConfigSeq, "On restart, incorrect lastConfigSeq")
}
......
......@@ -17,6 +17,7 @@ import (
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
......@@ -32,19 +33,6 @@ const (
epoch = 0
)
// Manager coordinates the creation and access of chains
type Manager interface {
// GetChain retrieves the chain support for a chain (and whether it exists)
GetChain(chainID string) (ChainSupport, bool)
// SystemChannelID returns the channel ID for the system channel
SystemChannelID() string
// NewChannelConfig returns a bare bones configuration ready for channel
// creation request to be applied on top of it
NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error)
}
type configResources struct {
configtxapi.Manager
}
......@@ -62,13 +50,14 @@ type ledgerResources struct {
ledger ledger.ReadWriter
}
type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
chains map[string]*ChainSupport
consenters map[string]consensus.Consenter
ledgerFactory ledger.Factory
signer crypto.LocalSigner
systemChannelID string
systemChannel *chainSupport
systemChannel *ChainSupport
}
func getConfigTx(reader ledger.Reader) *cb.Envelope {
......@@ -85,10 +74,10 @@ func getConfigTx(reader ledger.Reader) *cb.Envelope {
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}
// NewManagerImpl produces an instance of a Manager
func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager {
ml := &multiLedger{
chains: make(map[string]*chainSupport),
// NewRegistrar produces an instance of a *Registrar.
func NewRegistrar(ledgerFactory ledger.Factory, consenters map[string]consensus.Consenter, signer crypto.LocalSigner) *Registrar {
r := &Registrar{
chains: make(map[string]*ChainSupport),
ledgerFactory: ledgerFactory,
consenters: consenters,
signer: signer,
......@@ -104,21 +93,21 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
ledgerResources := ml.newLedgerResources(configTx)
ledgerResources := r.newLedgerResources(configTx)
chainID := ledgerResources.ChainID()
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
if ml.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
if r.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
}
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
chain := newChainSupport(createSystemChainFilters(r, ledgerResources),
ledgerResources,
consenters,
signer)
logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
ml.chains[chainID] = chain
ml.systemChannelID = chainID
ml.systemChannel = chain
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
......@@ -127,30 +116,31 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
ledgerResources,
consenters,
signer)
ml.chains[chainID] = chain
r.chains[chainID] = chain
chain.start()
}
}
if ml.systemChannelID == "" {
if r.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
}
return ml
return r
}
func (ml *multiLedger) SystemChannelID() string {
return ml.systemChannelID