Unverified Commit 4bc13c8e authored by yacovm, committed by Artem Barger

[FAB-13180] Orderer: auto-join existing inactive chains



This change set makes cluster-type OSNs autonomously detect channels that
exist and that they should be part of (the channel configuration lists their
public credentials as a consenter for the channel), but for which they do not
run chains or hold blocks in their ledger. (A sketch of such a membership
check appears after the list of reasons below.)

This can happen for several reasons:
- The OSN is added to an existing chain; since it hasn't participated
  in the chain so far, it never received the blocks that indicate it is
  now part of the channel.
- The OSN tried to detect whether it is part of a channel, but wasn't
  able to, because all OSNs of the system channel returned
  service-unavailable. This can happen if:
  - a leader election takes place
  - the network is acting up, so leadership was lost
  - a channel has been deserted (all OSNs left it).
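
For illustration, a minimal sketch of such a membership check, assuming a
hypothetical Consenter type and helper (this is not Fabric's actual API):

package main

import (
	"bytes"
	"fmt"
)

// Consenter is a hypothetical stand-in for a channel's consenter entry.
type Consenter struct {
	ServerTLSCert, ClientTLSCert []byte
}

// amIPartOfChannel reports whether the OSN's own TLS certificate appears
// among the channel's consenters.
func amIPartOfChannel(myCert []byte, consenters []Consenter) bool {
	for _, c := range consenters {
		if bytes.Equal(myCert, c.ServerTLSCert) || bytes.Equal(myCert, c.ClientTLSCert) {
			return true
		}
	}
	return false
}

func main() {
	me := []byte("my-tls-cert")
	fmt.Println(amIPartOfChannel(me, []Consenter{{ServerTLSCert: me}})) // true
}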

To take care of such use cases, all OSNs now:
- Track inactive chains that they know of but do not participate in.
- Periodically (*) probe the system channel OSNs to see whether they
  have since become part of these chains.
- If so, they replicate the chains, create instances of them, and
  replace the inactive chain instances in the registrar with the new
  instances of type etcdraft.

(*) - 10 seconds after boot, then after 20 seconds, then after 40 seconds,
      and so on, eventually settling at every 5 minutes; a sketch of such
      a schedule follows.
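
A minimal sketch of that doubling schedule, assuming nothing about the real
exponentialDurationSeries beyond the behavior described above:

package main

import (
	"fmt"
	"time"
)

// exponentialSeries yields initial, 2*initial, 4*initial, ... capped at max.
func exponentialSeries(initial, max time.Duration) func() time.Duration {
	next := initial
	return func() time.Duration {
		current := next
		next *= 2
		if next > max {
			next = max
		}
		return current
	}
}

func main() {
	tick := exponentialSeries(10*time.Second, 5*time.Minute)
	for i := 0; i < 7; i++ {
		fmt.Println(tick()) // 10s 20s 40s 1m20s 2m40s 5m0s 5m0s
	}
}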

Change-Id: I3c2a84e6f4f402e011e7a895345b3d3982247083
Signed-off-by: yacovm <yacovm@il.ibm.com>
Signed-off-by: Artem Barger <bartem@il.ibm.com>
parent e8514271
......@@ -316,8 +316,30 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
assertInvoke(network, peer, o4, mycc3.Name, "testchannel3", "channel testchannel3 is not serviced by me", 1)
Expect(string(orderer4Runner.Err().Contents())).To(ContainSubstring("I do not belong to channel testchannel2 or am forbidden pulling it (not in the channel), skipping chain retrieval"))
Expect(string(orderer4Runner.Err().Contents())).To(ContainSubstring("I do not belong to channel testchannel3 or am forbidden pulling it (forbidden), skipping chain retrieval"))
By("Adding orderer4 to testchannel2")
nwo.AddConsenter(network, peer, o1, "testchannel2", etcdraft.Consenter{
ServerTlsCert: orderer4Certificate,
ClientTlsCert: orderer4Certificate,
Host: "127.0.0.1",
Port: uint32(network.OrdererPort(o4, nwo.ListenPort)),
})
By("Waiting for orderer4 to replicate testchannel2")
assertBlockReception(map[string]int{
"testchannel2": 2,
}, []*nwo.Orderer{o4}, peer, network)
By("Ensuring that all orderers don't log errors to the log")
assertNoErrorsAreLogged(ordererRunners)
By("Submitting a transaction through orderer4")
assertInvoke(network, peer, o4, mycc2.Name, "testchannel2", "Chaincode invoke successful. result: status:200", 0)
By("And ensuring it is propagated amongst all orderers")
assertBlockReception(map[string]int{
"testchannel2": 3,
}, orderers, peer, network)
})
})
})
......@@ -467,14 +489,17 @@ func waitForBlockReception(o *nwo.Orderer, submitter *nwo.Peer, network *nwo.Net
Eventually(func() string {
sess, err := network.OrdererAdminSession(o, submitter, c)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit())
if sess.ExitCode() != 0 {
return fmt.Sprintf("exit code is %d: %s", sess.ExitCode(), string(sess.Err.Contents()))
}
sessErr := string(sess.Err.Contents())
expected := fmt.Sprintf("Received block: %d", blockSeq)
if strings.Contains(sessErr, expected) {
return ""
}
return sessErr
}, network.EventuallyTimeout).Should(BeEmpty())
}, time.Minute, time.Second).Should(BeEmpty())
}
func assertNoErrorsAreLogged(ordererRunners []*ginkgomon.Runner) {
......
......@@ -77,14 +77,15 @@ type ChannelLister interface {
// Replicator replicates chains
type Replicator struct {
Filter func(string) bool
SystemChannel string
ChannelLister ChannelLister
Logger *flogging.FabricLogger
Puller *BlockPuller
BootBlock *common.Block
AmIPartOfChannel selfMembershipPredicate
LedgerFactory LedgerFactory
DoNotPanicIfClusterNotReachable bool
Filter func(string) bool
SystemChannel string
ChannelLister ChannelLister
Logger *flogging.FabricLogger
Puller *BlockPuller
BootBlock *common.Block
AmIPartOfChannel selfMembershipPredicate
LedgerFactory LedgerFactory
}
// IsReplicationNeeded returns whether replication is needed,
......@@ -112,13 +113,20 @@ func (r *Replicator) IsReplicationNeeded() (bool, error) {
}
// ReplicateChains pulls chains and commits them.
func (r *Replicator) ReplicateChains() {
// Returns the names of the chains replicated successfully.
func (r *Replicator) ReplicateChains() []string {
var replicatedChains []string
channels := r.discoverChannels()
pullHints := r.channelsToPull(channels)
totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
r.Logger.Info("Found myself in", len(pullHints.channelsToPull), "channels out of", totalChannelCount, ":", pullHints)
for _, channel := range pullHints.channelsToPull {
r.PullChannel(channel.ChannelName)
err := r.PullChannel(channel.ChannelName)
if err == nil {
replicatedChains = append(replicatedChains, channel.ChannelName)
} else {
r.Logger.Warningf("Failed pulling channel %s: %v", channel.ChannelName, err)
}
}
// Next, just commit the genesis blocks of the channels we shouldn't pull.
for _, channel := range pullHints.channelsNotToPull {
......@@ -137,9 +145,10 @@ func (r *Replicator) ReplicateChains() {
}
// Last, pull the system chain
if err := r.PullChannel(r.SystemChannel); err != nil {
if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
r.Logger.Panicf("Failed pulling system channel: %v", err)
}
return replicatedChains
}
func (r *Replicator) discoverChannels() []ChannelGenesisBlock {
......@@ -156,7 +165,7 @@ func (r *Replicator) discoverChannels() []ChannelGenesisBlock {
func (r *Replicator) PullChannel(channel string) error {
if !r.Filter(channel) {
r.Logger.Info("Channel", channel, "shouldn't be pulled. Skipping it")
return nil
return ErrSkipped
}
r.Logger.Info("Pulling channel", channel)
puller := r.Puller.Clone()
......@@ -239,7 +248,7 @@ type channelPullHints struct {
}
func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
r.Logger.Info("Will now pull channels:", channels.Names())
r.Logger.Info("Will now attempt to pull channels:", channels.Names())
var channelsNotToPull []ChannelGenesisBlock
var channelsToPull []ChannelGenesisBlock
for _, channel := range channels {
......@@ -266,9 +275,12 @@ func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
continue
}
if err != nil {
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel.ChannelName, err)
if !r.DoNotPanicIfClusterNotReachable {
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel.ChannelName, err)
}
continue
}
r.Logger.Infof("I need to pull channel %s", channel.ChannelName)
channelsToPull = append(channelsToPull, channel)
}
return channelPullHints{
......@@ -360,6 +372,9 @@ type ChainInspector struct {
LastConfigBlock *common.Block
}
// ErrSkipped denotes that replicating a chain was skipped
var ErrSkipped = errors.New("skipped")
// ErrForbidden denotes that an ordering node refuses sending blocks due to access control.
var ErrForbidden = errors.New("forbidden")
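
Callers distinguish these sentinel errors by direct comparison, as the
ReplicateChains hunk above does with err != ErrSkipped. A self-contained
illustration of the pattern, using local stand-ins for the sentinels:

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for cluster.ErrSkipped and cluster.ErrForbidden.
var (
	errSkipped   = errors.New("skipped")
	errForbidden = errors.New("forbidden")
)

func classify(err error) string {
	switch err {
	case nil:
		return "replicated"
	case errSkipped:
		return "filtered out on purpose, not a failure"
	case errForbidden:
		return "access denied by the remote OSN"
	default:
		return "failed: " + err.Error()
	}
}

func main() {
	fmt.Println(classify(nil))          // replicated
	fmt.Println(classify(errSkipped))   // filtered out on purpose, not a failure
	fmt.Println(classify(errForbidden)) // access denied by the remote OSN
}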
......@@ -520,6 +535,7 @@ func ChannelCreationBlockToGenesisBlock(block *common.Block) (*common.Block, err
return nil, err
}
block.Data.Data = [][]byte{payload.Data}
block.Header.DataHash = block.Data.Hash()
block.Header.Number = 0
block.Header.PreviousHash = nil
metadata := &common.BlockMetadata{
......
......@@ -1399,5 +1399,5 @@ func TestFilter(t *testing.T) {
},
Logger: logger,
}
assert.Nil(t, r.PullChannel("foo"))
assert.Equal(t, cluster.ErrSkipped, r.PullChannel("foo"))
}
......@@ -58,18 +58,19 @@ type General struct {
}
type Cluster struct {
ListenAddress string
ListenPort uint16
ServerCertificate string
ServerPrivateKey string
ClientCertificate string
ClientPrivateKey string
RootCAs []string
DialTimeout time.Duration
RPCTimeout time.Duration
ReplicationBufferSize int
ReplicationPullTimeout time.Duration
ReplicationRetryTimeout time.Duration
ListenAddress string
ListenPort uint16
ServerCertificate string
ServerPrivateKey string
ClientCertificate string
ClientPrivateKey string
RootCAs []string
DialTimeout time.Duration
RPCTimeout time.Duration
ReplicationBufferSize int
ReplicationPullTimeout time.Duration
ReplicationRetryTimeout time.Duration
ReplicationBackgroundRefreshInterval time.Duration
}
// Keepalive contains configuration for gRPC servers.
......
......@@ -11,6 +11,7 @@ package multichannel
import (
"fmt"
"reflect"
"sync"
"github.com/hyperledger/fabric/common/channelconfig"
......@@ -104,7 +105,9 @@ type Registrar struct {
callbacks []func(bundle *channelconfig.Bundle)
}
func getConfigTx(reader blockledger.Reader) *cb.Envelope {
// ConfigBlock retrieves the last configuration block from the given ledger.
// Panics on failure.
func ConfigBlock(reader blockledger.Reader) *cb.Block {
lastBlock := blockledger.GetBlock(reader, reader.Height()-1)
index, err := utils.GetLastConfigIndexFromBlock(lastBlock)
if err != nil {
......@@ -115,7 +118,11 @@ func getConfigTx(reader blockledger.Reader) *cb.Envelope {
logger.Panicf("Config block does not exist")
}
return utils.ExtractEnvelopeOrPanic(configBlock, 0)
return configBlock
}
func configTx(reader blockledger.Reader) *cb.Envelope {
return utils.ExtractEnvelopeOrPanic(ConfigBlock(reader), 0)
}
// NewRegistrar produces an instance of a *Registrar.
......@@ -140,7 +147,7 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
if err != nil {
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
}
configTx := getConfigTx(rl)
configTx := configTx(rl)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
......@@ -275,12 +282,30 @@ func (r *Registrar) newLedgerResources(configTx *cb.Envelope) *ledgerResources {
}
}
// CreateChain makes the Registrar create a chain with the given name.
func (r *Registrar) CreateChain(chainName string) {
lf, err := r.ledgerFactory.GetOrCreate(chainName)
if err != nil {
logger.Panicf("Failed obtaining ledger factory for %s: %v", chainName, err)
}
chain := r.GetChain(chainName)
if chain != nil {
logger.Infof("A chain of type %v for channel %s already exists. "+
"Halting it.", reflect.TypeOf(chain.Chain), chainName)
chain.Halt()
}
r.newChain(configTx(lf))
}
func (r *Registrar) newChain(configtx *cb.Envelope) {
r.lock.Lock()
defer r.lock.Unlock()
ledgerResources := r.newLedgerResources(configtx)
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
// If we have no blocks, we need to create the genesis block ourselves.
if ledgerResources.Height() == 0 {
ledgerResources.Append(blockledger.CreateNextBlock(ledgerResources, []*cb.Envelope{configtx}))
}
// Copy the map to allow concurrent reads from broadcast/deliver while the new chainSupport is
newChains := make(map[string]*ChainSupport)
......
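
One reading of the copy-the-map comment above, sketched with hypothetical
names (not the Registrar's real structure): the writer publishes a brand-new
map under the lock, so any reader that already grabbed the old map keeps
iterating over a consistent snapshot.

package main

import (
	"fmt"
	"sync"
)

type chainSupport struct{ name string }

type registrar struct {
	lock   sync.RWMutex
	chains map[string]*chainSupport
}

// put builds and publishes a fresh map; published maps are never mutated.
func (r *registrar) put(name string, cs *chainSupport) {
	r.lock.Lock()
	defer r.lock.Unlock()
	next := make(map[string]*chainSupport, len(r.chains)+1)
	for k, v := range r.chains {
		next[k] = v
	}
	next[name] = cs
	r.chains = next
}

// snapshot hands out the current map for iteration.
func (r *registrar) snapshot() map[string]*chainSupport {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return r.chains
}

func main() {
	r := &registrar{chains: map[string]*chainSupport{}}
	r.put("mychannel", &chainSupport{name: "mychannel"})
	for name := range r.snapshot() {
		fmt.Println(name) // mychannel
	}
}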
......@@ -82,7 +82,7 @@ func TestGetConfigTx(t *testing.T) {
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = utils.MarshalOrPanic(&cb.Metadata{Value: utils.MarshalOrPanic(&cb.LastConfig{Index: 7})})
rl.Append(block)
pctx := getConfigTx(rl)
pctx := configTx(rl)
assert.True(t, proto.Equal(pctx, ctx), "Did not select most recent config transaction")
}
......@@ -96,11 +96,11 @@ func TestGetConfigTxFailure(t *testing.T) {
}))
}
rl.Append(blockledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(genesisconfig.TestChainID, 11)}))
assert.Panics(t, func() { getConfigTx(rl) }, "Should have panicked because there was no config tx")
assert.Panics(t, func() { configTx(rl) }, "Should have panicked because there was no config tx")
block := blockledger.CreateNextBlock(rl, []*cb.Envelope{makeNormalTx(genesisconfig.TestChainID, 12)})
block.Metadata.Metadata[cb.BlockMetadataIndex_LAST_CONFIG] = []byte("bad metadata")
assert.Panics(t, func() { getConfigTx(rl) }, "Should have panicked because of bad last config metadata")
assert.Panics(t, func() { configTx(rl) }, "Should have panicked because of bad last config metadata")
}
// This test checks to make sure the orderer refuses to come up if it cannot find a system channel
......@@ -169,6 +169,41 @@ func TestManagerImpl(t *testing.T) {
}
}
func TestCreateChain(t *testing.T) {
lf, _ := NewRAMLedgerAndFactory(10)
consenters := make(map[string]consensus.Consenter)
consenters[conf.Orderer.OrdererType] = &mockConsenter{}
manager := NewRegistrar(lf, mockCrypto(), &disabled.Provider{})
manager.Initialize(consenters)
ledger, err := lf.GetOrCreate("mychannel")
assert.NoError(t, err)
genesisBlock := encoder.New(conf).GenesisBlockForChannel("mychannel")
ledger.Append(genesisBlock)
// Before creating the chain, it doesn't exist
assert.Nil(t, manager.GetChain("mychannel"))
// After creating the chain, it exists
manager.CreateChain("mychannel")
chain := manager.GetChain("mychannel")
assert.NotNil(t, chain)
// A subsequent creation replaces the chain.
manager.CreateChain("mychannel")
chain2 := manager.GetChain("mychannel")
assert.NotNil(t, chain2)
// They are not the same
assert.NotEqual(t, chain, chain2)
// The old chain is halted
_, ok := <-chain.Chain.(*mockChain).queue
assert.False(t, ok)
// The new chain is not halted: Close the channel to prove that.
close(chain2.Chain.(*mockChain).queue)
}
// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
func TestNewChain(t *testing.T) {
expectedLastConfigBlockNumber := uint64(0)
......
......@@ -102,15 +102,15 @@ func Start(cmd string, conf *localconfig.TopLevel) {
clusterClientConfig := initializeClusterClientConfig(conf)
clusterDialer.SetConfig(clusterClientConfig)
r := &replicationInitiator{
logger: logger,
secOpts: clusterClientConfig.SecOpts,
conf: conf,
lf: &ledgerFactory{lf},
signer: signer,
}
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r := &replicationInitiator{
logger: logger,
secOpts: clusterClientConfig.SecOpts,
conf: conf,
lf: &ledgerFactory{lf},
signer: signer,
}
r.replicateIfNeeded(bootstrapBlock)
}
......@@ -156,7 +156,7 @@ func Start(cmd string, conf *localconfig.TopLevel) {
}
}
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
manager := initializeMultichannelRegistrar(bootstrapBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
......@@ -501,6 +501,7 @@ type healthChecker interface {
func initializeMultichannelRegistrar(
bootstrapBlock *cb.Block,
ri *replicationInitiator,
clusterDialer *cluster.PredicateDialer,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
......@@ -529,13 +530,60 @@ func initializeMultichannelRegistrar(
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType(bootstrapBlock) {
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar)
consenters["etcdraft"] = raftConsenter
initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar)
}
registrar.Initialize(consenters)
return registrar
}
func initializeEtcdraftConsenter(consenters map[string]consensus.Consenter,
conf *localconfig.TopLevel,
lf blockledger.Factory,
clusterDialer *cluster.PredicateDialer,
bootstrapBlock *cb.Block,
ri *replicationInitiator,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
registrar *multichannel.Registrar) {
replicationRefreshInterval := conf.General.Cluster.ReplicationBackgroundRefreshInterval
if replicationRefreshInterval == 0 {
replicationRefreshInterval = defaultReplicationBackgroundRefreshInterval
}
systemChannelName, err := utils.GetChainIDFromBlock(bootstrapBlock)
if err != nil {
ri.logger.Panicf("Failed extracting system channel name from bootstrap block: %v", err)
}
systemLedger, err := lf.GetOrCreate(systemChannelName)
if err != nil {
ri.logger.Panicf("Failed obtaining system channel (%s) ledger: %v", systemChannelName, err)
}
getConfigBlock := func() *cb.Block {
return multichannel.ConfigBlock(systemLedger)
}
exponentialSleep := exponentialDurationSeries(replicationBackgroundInitialRefreshInterval, replicationRefreshInterval)
icr := &inactiveChainReplicator{
logger: logger,
scheduleChan: makeTickChannel(exponentialSleep, time.Sleep),
quitChan: make(chan struct{}),
replicator: ri,
chains2CreationCallbacks: make(map[string]chainCreation),
retrieveLastSysChannelConfigBlock: getConfigBlock,
}
// Use the inactiveChainReplicator as a channel lister, since it has knowledge
// of all inactive chains.
// This is to prevent us from pulling the entire system chain when attempting
// to enumerate the channels in the system.
ri.channelLister = icr
go icr.run()
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar, icr)
consenters["etcdraft"] = raftConsenter
}
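
The diff does not show makeTickChannel itself; a plausible sketch of its
shape, given that it is fed a duration series and time.Sleep above (the
injected sleep lets tests substitute a fake clock):

package main

import (
	"fmt"
	"time"
)

// makeTickChan emits a tick after each duration the series yields.
func makeTickChan(series func() time.Duration, sleep func(time.Duration)) <-chan time.Time {
	ticks := make(chan time.Time)
	go func() {
		for {
			sleep(series())
			ticks <- time.Now()
		}
	}()
	return ticks
}

func main() {
	series := func() time.Duration { return 10 * time.Millisecond }
	tick := makeTickChan(series, time.Sleep)
	for i := 0; i < 3; i++ {
		fmt.Println("tick at", <-tick)
	}
}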
func newOperationsSystem(ops localconfig.Operations, metrics localconfig.Metrics) *operations.System {
return operations.NewSystem(operations.Options{
Logger: flogging.MustGetLogger("orderer.operations"),
......
......@@ -20,16 +20,20 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/flogging/floggingtest"
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/tools/configtxgen/configtxgentest"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
"github.com/hyperledger/fabric/orderer/common/server/mocks"
"github.com/hyperledger/fabric/orderer/consensus"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
......@@ -268,7 +272,7 @@ func TestInitializeMultiChainManager(t *testing.T) {
initializeLocalMsp(conf)
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf)
initializeMultichannelRegistrar(bootBlock, &replicationInitiator{}, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf)
})
}
......@@ -331,7 +335,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
}
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf, callback)
initializeMultichannelRegistrar(bootBlock, &replicationInitiator{}, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS not required so no updates should have occurred
......@@ -368,7 +372,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
updateClusterDialer(caSupport, predDialer, clusterConf.SecOpts.ServerRootCAs)
}
}
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf, callback)
initializeMultichannelRegistrar(bootBlock, &replicationInitiator{}, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, &mocks.HealthChecker{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS is required so updates should have occurred
......@@ -572,6 +576,34 @@ func TestConfigureClusterListener(t *testing.T) {
}
}
func TestInitializeEtcdraftConsenter(t *testing.T) {
consenters := make(map[string]consensus.Consenter)
rlf := ramledger.New(10)
conf := configtxgentest.Load(genesisconfig.SampleInsecureSoloProfile)
genesisBlock := encoder.New(conf).GenesisBlock()
ca, _ := tlsgen.NewCA()
crt, _ := ca.NewServerCertKeyPair("127.0.0.1")
srv, err := comm.NewGRPCServer("127.0.0.1:0", comm.ServerConfig{})
assert.NoError(t, err)
initializeEtcdraftConsenter(consenters,
&localconfig.TopLevel{},
rlf,
&cluster.PredicateDialer{},
genesisBlock, &replicationInitiator{},
comm.ServerConfig{
SecOpts: &comm.SecureOptions{
Certificate: crt.Cert,
Key: crt.Key,
UseTLS: true,
},
}, srv, &multichannel.Registrar{})
assert.NotNil(t, consenters["etcdraft"])
}
func genesisConfig(t *testing.T) *localconfig.TopLevel {
t.Helper()
localMSPDir, _ := configtest.GetDevMspDir()
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import common "github.com/hyperledger/fabric/protos/common"
import mock "github.com/stretchr/testify/mock"
// ChainReplicator is an autogenerated mock type for the ChainReplicator type
type ChainReplicator struct {
mock.Mock
}
// ReplicateChains provides a mock function with given fields: lastConfigBlock, chains
func (_m *ChainReplicator) ReplicateChains(lastConfigBlock *common.Block, chains []string) []string {
ret := _m.Called(lastConfigBlock, chains)
var r0 []string
if rf, ok := ret.Get(0).(func(*common.Block, []string) []string); ok {
r0 = rf(lastConfigBlock, chains)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
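
Typical testify usage of this generated mock (the block and channel values
are placeholders; the import path follows the server/mocks package seen in
the test imports above):

package server_test

import (
	"testing"

	"github.com/hyperledger/fabric/orderer/common/server/mocks"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/stretchr/testify/assert"
)

func TestChainReplicatorMock(t *testing.T) {
	block := &common.Block{}
	cr := &mocks.ChainReplicator{}
	// Program the expectation: one call with these arguments returns one name.
	cr.On("ReplicateChains", block, []string{"mychannel"}).
		Return([]string{"mychannel"})

	replicated := cr.ReplicateChains(block, []string{"mychannel"})
	assert.Equal(t, []string{"mychannel"}, replicated)
	cr.AssertExpectations(t)
}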
......@@ -7,6 +7,9 @@ SPDX-License-Identifier: Apache-2.0
package server
import (
"sync"
"time"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
......@@ -18,12 +21,18 @@ import (
"github.com/hyperledger/fabric/protos/utils"
)
const (
defaultReplicationBackgroundRefreshInterval = time.Minute * 5
replicationBackgroundInitialRefreshInterval = time.Second * 10
)
type replicationInitiator struct {
logger *flogging.FabricLogger
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
lf cluster.LedgerFactory
signer crypto.LocalSigner
channelLister cluster.ChannelLister
logger *flogging.FabricLogger
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
lf cluster.LedgerFactory
signer crypto.LocalSigner
}
func (ri *replicationInitiator) replicateIfNeeded(bootstrapBlock *common.Block) {
......@@ -63,6 +72,11 @@ func (ri *replicationInitiator) createReplicator(bootstrapBlock *common.Block, f
},
}
// If a custom channel lister is requested, use it
if ri.channelLister != nil {
replicator.ChannelLister = ri.channelLister
}
return replicator
}
......@@ -83,7 +97,9 @@ func (ri *replicationInitiator) replicateNeededChannels(bootstrapBlock *common.B
replicator.ReplicateChains()
}