Commit 9b1490ed authored by Jason Yellick's avatar Jason Yellick
Browse files

[FAB-5264] Move interfaces from multichannel



The multichannel (formerly multichain) package became a bit of a dumping
ground for all of the interface definitions used in the orderer system.
this makes the multichannel package clunky, difficult to mock, and
encourages poor separation between components.

This CR extracts the interface definitions and moves them to more
sensible locations based on the natural separation of function within
the orderer codebase.

It is also fixed the multichannel package to generally return pointers
to structs, rather than to interfaces, following the golang best
practice of accepting interfaces and returning structs.

Change-Id: Iaf004e1dadf7bf92d106bd7c90f244e0089b9924
Signed-off-by: default avatarJason Yellick <jyellick@us.ibm.com>
parent d6b54c89
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
// Package msgprocessor provides the implementations for processing of the assorted message
// types which may arrive in the system through Broadcast.
package msgprocessor
import (
cb "github.com/hyperledger/fabric/protos/common"
)
// Classification represents the possible message types for the system.
type Classification int
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg Classification = iota
// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)
// Processor provides the methods necessary to classify and process any message which
// arrives through the Broadcast interface.
type Processor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary
ClassifyMsg(env *cb.Envelope) (Classification, error)
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
// ProcessConfigUpdateMsg will attempt to apply the config update to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}
......@@ -19,111 +19,25 @@ package multichannel
import (
"fmt"
"github.com/hyperledger/fabric/common/config"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/broadcast"
"github.com/hyperledger/fabric/orderer/common/configtxfilter"
"github.com/hyperledger/fabric/orderer/common/deliver"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
"github.com/hyperledger/fabric/orderer/common/sigfilter"
"github.com/hyperledger/fabric/orderer/common/sizefilter"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
)
// MsgClassification represents the types of possible messages.
type MsgClassification int
const (
// NormalMsg is the class of standard (endorser or otherwise non-config) messages.
// Messages of this type should be processed by ProcessNormalMsg.
NormalMsg MsgClassification = iota
// ConfigUpdateMsg is the class of configuration related messages.
// Messages of this type should be processed by ProcessConfigUpdateMsg.
ConfigUpdateMsg
)
// Consenter defines the backing ordering mechanism
type Consenter interface {
// HandleChain should create and return a reference to a Chain for the given set of resources
// It will only be invoked for a given chain once per process. In general, errors will be treated
// as irrecoverable and cause system shutdown. See the description of Chain for more details
// The second argument to HandleChain is a pointer to the metadata stored on the `ORDERER` slot of
// the last block committed to the ledger of this Chain. For a new chain, this metadata will be
// nil, as this field is not set on the genesis block
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}
// Chain defines a way to inject messages for ordering
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This flow allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure
Enqueue(env *cb.Envelope) bool
// Errored returns a channel which will close when an error has occurred
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date
Errored() <-chan struct{}
// Start should allocate whatever resources are needed for staying up to date with the chain
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger
Start()
// Halt frees the resources which were allocated for this Chain
Halt()
}
// ConsenterSupport provides the resources available to a Consenter implementation
type ConsenterSupport interface {
crypto.LocalSigner
MsgProcessor
BlockCutter() blockcutter.Receiver
SharedConfig() config.Orderer
CreateNextBlock(messages []*cb.Envelope) *cb.Block
WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block
ChainID() string // ChainID returns the chain ID this specific consenter instance is associated with
Height() uint64 // Returns the number of blocks on the chain this specific consenter instance is associated with
}
// MsgProcessor defines the methods necessary to interact with Broadcast messages.
type MsgProcessor interface {
// ClassifyMsg inspects the message to determine which type of processing is necessary.
ClassifyMsg(env *cb.Envelope) (MsgClassification, error)
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error)
// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error)
}
// ChainSupport provides a wrapper for the resources backing a chain
type ChainSupport interface {
broadcast.Support
deliver.Support
ConsenterSupport
// ProposeConfigUpdate applies a CONFIG_UPDATE to an existing config to produce a *cb.ConfigEnvelope
ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error)
}
type chainSupport struct {
// ChainSupport holds the resources for a particular channel.
type ChainSupport struct {
*ledgerResources
chain Chain
chain consensus.Chain
cutter blockcutter.Receiver
filters *filter.RuleSet
signer crypto.LocalSigner
......@@ -134,9 +48,9 @@ type chainSupport struct {
func newChainSupport(
filters *filter.RuleSet,
ledgerResources *ledgerResources,
consenters map[string]Consenter,
consenters map[string]consensus.Consenter,
signer crypto.LocalSigner,
) *chainSupport {
) *ChainSupport {
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
consenterType := ledgerResources.SharedConfig().ConsensusType()
......@@ -145,7 +59,7 @@ func newChainSupport(
logger.Fatalf("Error retrieving consenter of type: %s", consenterType)
}
cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: ledgerResources,
cutter: cutter,
filters: filters,
......@@ -196,39 +110,43 @@ func createStandardFilters(ledgerResources *ledgerResources) *filter.RuleSet {
}
// createSystemChainFilters creates the set of filters for the ordering system chain
func createSystemChainFilters(ml *multiLedger, ledgerResources *ledgerResources) *filter.RuleSet {
func createSystemChainFilters(r *Registrar, ledgerResources *ledgerResources) *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
sizefilter.MaxBytesRule(ledgerResources.SharedConfig()),
sigfilter.New(policies.ChannelWriters, ledgerResources.PolicyManager()),
newSystemChainFilter(ledgerResources, ml),
newSystemChainFilter(ledgerResources, r),
configtxfilter.NewFilter(ledgerResources),
filter.AcceptRule,
})
}
func (cs *chainSupport) start() {
func (cs *ChainSupport) start() {
cs.chain.Start()
}
func (cs *chainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
// NewSignatureHeader passes through to the signer NewSignatureHeader method.
func (cs *ChainSupport) NewSignatureHeader() (*cb.SignatureHeader, error) {
return cs.signer.NewSignatureHeader()
}
func (cs *chainSupport) Sign(message []byte) ([]byte, error) {
// Sign passes through to the signer Sign method.
func (cs *ChainSupport) Sign(message []byte) ([]byte, error) {
return cs.signer.Sign(message)
}
func (cs *chainSupport) Filters() *filter.RuleSet {
// Filters returns the set of filters created for this channel.
func (cs *ChainSupport) Filters() *filter.RuleSet {
return cs.filters
}
func (cs *chainSupport) BlockCutter() blockcutter.Receiver {
// BlockCutter returns the blockcutter.Receiver instance for this channel.
func (cs *ChainSupport) BlockCutter() blockcutter.Receiver {
return cs.cutter
}
// ClassifyMsg inspects the message to determine which type of processing is necessary
func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error) {
func (cs *ChainSupport) ClassifyMsg(env *cb.Envelope) (msgprocessor.Classification, error) {
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return 0, fmt.Errorf("bad payload: %s", err)
......@@ -245,23 +163,23 @@ func (cs *chainSupport) ClassifyMsg(env *cb.Envelope) (MsgClassification, error)
switch chdr.Type {
case int32(cb.HeaderType_CONFIG_UPDATE):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
case int32(cb.HeaderType_ORDERER_TRANSACTION):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type ORDERER_TRANSACTION cannot be Broadcast")
case int32(cb.HeaderType_CONFIG):
return ConfigUpdateMsg, nil
return msgprocessor.ConfigUpdateMsg, nil
// XXX Eventually, these types cannot be allowed to be submitted directly
// return 0, fmt.Errorf("Transactions of type CONFIG cannot be Broadcast")
default:
return NormalMsg, nil
return msgprocessor.NormalMsg, nil
}
}
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
func (cs *ChainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = cs.Sequence()
_, err = cs.filters.Apply(env)
return
......@@ -270,27 +188,31 @@ func (cs *chainSupport) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, er
// ProcessConfigUpdateMsg will attempt to apply the config update msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config update message
// is invalid, an error is returned.
func (cs *chainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
func (cs *ChainSupport) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
return nil, cs.Sequence(), fmt.Errorf("Config update message not yet implemented")
}
func (cs *chainSupport) Reader() ledger.Reader {
// Reader returns a reader for the underlying ledger.
func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}
func (cs *chainSupport) Enqueue(env *cb.Envelope) bool {
// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
}
func (cs *chainSupport) Errored() <-chan struct{} {
// Errored returns whether the backing consenter has errored
func (cs *ChainSupport) Errored() <-chan struct{} {
return cs.chain.Errored()
}
func (cs *chainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
// CreateNextBlock creates a new block with the next block number, and the given contents.
func (cs *ChainSupport) CreateNextBlock(messages []*cb.Envelope) *cb.Block {
return ledger.CreateNextBlock(cs.ledger, messages)
}
func (cs *chainSupport) addBlockSignature(block *cb.Block) {
func (cs *ChainSupport) addBlockSignature(block *cb.Block) {
logger.Debugf("%+v", cs)
logger.Debugf("%+v", cs.signer)
......@@ -312,7 +234,7 @@ func (cs *chainSupport) addBlockSignature(block *cb.Block) {
})
}
func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
func (cs *ChainSupport) addLastConfigSignature(block *cb.Block) {
configSeq := cs.Sequence()
if configSeq > cs.lastConfigSeq {
logger.Debugf("[channel: %s] Detected lastConfigSeq transitioning from %d to %d, setting lastConfig from %d to %d", cs.ChainID(), cs.lastConfigSeq, configSeq, cs.lastConfig, block.Header.Number)
......@@ -337,7 +259,8 @@ func (cs *chainSupport) addLastConfigSignature(block *cb.Block) {
})
}
func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteConfigBlock should be invoked for blocks which contain a config transaction.
func (cs *ChainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// XXX This hacky path is temporary and will be removed by the end of this change series
// The panics here are just fine
committer, err := cs.filters.Apply(utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]))
......@@ -349,7 +272,8 @@ func (cs *chainSupport) WriteConfigBlock(block *cb.Block, encodedMetadataValue [
return cs.WriteBlock(block, encodedMetadataValue)
}
func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// WriteBlock should be invoked for blocks which contain normal transactions.
func (cs *ChainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte) *cb.Block {
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
......@@ -366,6 +290,7 @@ func (cs *chainSupport) WriteBlock(block *cb.Block, encodedMetadataValue []byte)
return block
}
func (cs *chainSupport) Height() uint64 {
// Height passes through to the underlying ledger's Height.
func (cs *ChainSupport) Height() uint64 {
return cs.Reader().Height()
}
......@@ -66,7 +66,7 @@ func (mc *mockCommitter) Commit() {
func TestCommitConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{
cs := &ChainSupport{
ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml},
filters: filter.NewRuleSet([]filter.Rule{filter.AcceptRule}),
signer: mockCrypto(),
......@@ -89,7 +89,7 @@ func TestCommitConfig(t *testing.T) {
func TestWriteBlockSignatures(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
actual := utils.GetMetadataFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil), cb.BlockMetadataIndex_SIGNATURES)
assert.NotNil(t, actual, "Block should have block signature")
......@@ -98,7 +98,7 @@ func TestWriteBlockSignatures(t *testing.T) {
func TestWriteBlockOrdererMetadata(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
value := []byte("foo")
expected := &cb.Metadata{Value: value}
......@@ -110,7 +110,7 @@ func TestWriteBlockOrdererMetadata(t *testing.T) {
func TestSignature(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
message := []byte("Darth Vader")
signed, _ := cs.Sign(message)
......@@ -124,7 +124,7 @@ func TestSignature(t *testing.T) {
func TestWriteLastConfig(t *testing.T) {
ml := &mockLedgerReadWriter{}
cm := &mockconfigtx.Manager{}
cs := &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs := &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
expected := uint64(0)
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(0, nil), nil))
......@@ -144,7 +144,7 @@ func TestWriteLastConfig(t *testing.T) {
cm.SequenceVal = 2
expected = uint64(4)
cs = &chainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
cs = &ChainSupport{ledgerResources: &ledgerResources{configResources: &configResources{Manager: cm}, ledger: ml}, signer: mockCrypto()}
lc := utils.GetLastConfigIndexFromBlockOrPanic(cs.WriteBlock(cb.NewBlock(4, nil), nil))
assert.Equal(t, expected, lc, "Second block should have config block index of %d, but got %d", expected, lc)
......
......@@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/orderer/common/ledger"
ramledger "github.com/hyperledger/fabric/orderer/common/ledger/ram"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
......@@ -132,10 +133,10 @@ func TestGetConfigTxFailure(t *testing.T) {
func TestNoSystemChain(t *testing.T) {
lf := ramledger.New(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
assert.Panics(t, func() { NewManagerImpl(lf, consenters, mockCrypto()) }, "Should have panicked when starting without a system chain")
assert.Panics(t, func() { NewRegistrar(lf, consenters, mockCrypto()) }, "Should have panicked when starting without a system chain")
}
// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
......@@ -150,20 +151,20 @@ func TestMultiSystemChannel(t *testing.T) {
assert.NoError(t, err)
}
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
assert.Panics(t, func() { NewManagerImpl(lf, consenters, mockCrypto()) }, "Two system channels should have caused panic")
assert.Panics(t, func() { NewRegistrar(lf, consenters, mockCrypto()) }, "Two system channels should have caused panic")
}
// This test essentially brings the entire system up and is ultimately what main.go will replicate
func TestManagerImpl(t *testing.T) {
lf, rl := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
_, ok := manager.GetChain("Fake")
assert.False(t, ok, "Should not have found a chain that was not created")
......@@ -197,9 +198,9 @@ func TestManagerImpl(t *testing.T) {
func TestNewChannelConfig(t *testing.T) {
lf, _ := NewRAMLedgerAndFactoryWithMSP()
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
t.Run("BadPayload", func(t *testing.T) {
_, err := manager.NewChannelConfig(&cb.Envelope{Payload: []byte("bad payload")})
......@@ -447,10 +448,10 @@ func TestMismatchedChannelIDs(t *testing.T) {
lf, _ := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
_, err = manager.NewChannelConfig(createTx)
assert.Error(t, err, "Mismatched channel IDs")
......@@ -465,10 +466,10 @@ func TestNewChain(t *testing.T) {
lf, rl := NewRAMLedgerAndFactory(10)
consenters := make(map[string]Consenter)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewManagerImpl(lf, consenters, mockCrypto())
manager := NewRegistrar(lf, consenters, mockCrypto())
envConfigUpdate, err := configtx.MakeChainCreationTransaction(newChainID, genesisconfig.SampleConsortiumName, mockSigningIdentity)
assert.NoError(t, err, "Constructing chain creation tx")
......@@ -560,10 +561,8 @@ func TestNewChain(t *testing.T) {
testRestartedChainSupport(t, chainSupport, consenters, expectedLastConfigSeq)
}
func testRestartedChainSupport(t *testing.T, cs ChainSupport, consenters map[string]Consenter, expectedLastConfigSeq uint64) {
ccs, ok := cs.(*chainSupport)
assert.True(t, ok, "Casting error")
rcs := newChainSupport(ccs.filters, ccs.ledgerResources, consenters, mockCrypto())
func testRestartedChainSupport(t *testing.T, cs *ChainSupport, consenters map[string]consensus.Consenter, expectedLastConfigSeq uint64) {
rcs := newChainSupport(cs.filters, cs.ledgerResources, consenters, mockCrypto())
assert.Equal(t, expectedLastConfigSeq, rcs.lastConfigSeq, "On restart, incorrect lastConfigSeq")
}
......
......@@ -17,6 +17,7 @@ import (
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/ledger"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
......@@ -32,19 +33,6 @@ const (
epoch = 0
)
// Manager coordinates the creation and access of chains
type Manager interface {
// GetChain retrieves the chain support for a chain (and whether it exists)
GetChain(chainID string) (ChainSupport, bool)
// SystemChannelID returns the channel ID for the system channel
SystemChannelID() string
// NewChannelConfig returns a bare bones configuration ready for channel
// creation request to be applied on top of it
NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error)
}
type configResources struct {
configtxapi.Manager
}
......@@ -62,13 +50,14 @@ type ledgerResources struct {
ledger ledger.ReadWriter
}
type multiLedger struct {
chains map[string]*chainSupport
consenters map[string]Consenter
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
chains map[string]*ChainSupport
consenters map[string]consensus.Consenter
ledgerFactory ledger.Factory
signer crypto.LocalSigner
systemChannelID string
systemChannel *chainSupport
systemChannel *ChainSupport
}
func getConfigTx(reader ledger.Reader) *cb.Envelope {
......@@ -85,10 +74,10 @@ func getConfigTx(reader ledger.Reader) *cb.Envelope {
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
}
// NewManagerImpl produces an instance of a Manager
func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consenter, signer crypto.LocalSigner) Manager {
ml := &multiLedger{
chains: make(map[string]*chainSupport),
// NewRegistrar produces an instance of a *Registrar.
func NewRegistrar(ledgerFactory ledger.Factory, consenters map[string]consensus.Consenter, signer crypto.LocalSigner) *Registrar {
r := &Registrar{
chains: make(map[string]*ChainSupport),
ledgerFactory: ledgerFactory,
consenters: consenters,
signer: signer,
......@@ -104,21 +93,21 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
ledgerResources := ml.newLedgerResources(configTx)
ledgerResources := r.newLedgerResources(configTx)
chainID := ledgerResources.ChainID()
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
if ml.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", ml.systemChannelID, chainID)
if r.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
}
chain := newChainSupport(createSystemChainFilters(ml, ledgerResources),
chain := newChainSupport(createSystemChainFilters(r, ledgerResources),
ledgerResources,
consenters,
signer)
logger.Infof("Starting with system channel %s and orderer type %s", chainID, chain.SharedConfig().ConsensusType())
ml.chains[chainID] = chain
ml.systemChannelID = chainID
ml.systemChannel = chain
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
defer chain.start()
} else {
......@@ -127,30 +116,31 @@ func NewManagerImpl(ledgerFactory ledger.Factory, consenters map[string]Consente
ledgerResources,
consenters,
signer)
ml.chains[chainID] = chain
r.chains[chainID] = chain
chain.start()
}
}
if ml.systemChannelID == "" {
if r.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
}
return ml
return r
}
func (ml *multiLedger) SystemChannelID() string {