Commit 4445fa12 authored by Yacov Manevich, committed by Gerrit Code Review

Merge changes I3aa68e4b,Idf10bff7,I5db2adbd,If1ce27b2,Ica00d5e6, ... into release-1.4

* changes:
  [FAB-13331] Refactor metadata updates in nwo
  [FAB-13298] Fix test flake on MacOS
  [FAB-13332] Add cryptogen extend to integration tests
  [FAB-13334] Onboarding: Allow empty channels
  [FAB-13330] Rename GetConfigBlock to GetConfig in nwo
  [FAB-13349] Add more assertion to etcdraft UT.
  [FAB-13095] fix UT flake RPC timeout
  [FAB-13350] Fix etcdraft flaky test
  [FAB-13298] Fix TestConfigureClusterListener in MacOS
  [FAB-13299] Onboarding: Skip committing existing blocks
  [FAB-12579] Separate TLS listener for intra-cluster
  [FAB-13262] typo in configblock.go
  [FAB-13053] Add an UT to assert retransmission.
  [FAB-12949] Fix etcdraft reconfiguration UT
  [FAB-12729] Support subset of system channel OSNs
  [FAB-13150] Re-enable etcdraft for v2.0 development
  [FAB-13225] address code review comments
  [FAB-13057] Remove applied index check in storage
  [FAB-13199] Reduce etcdraft test time.
  [FAB-12949] finish reconfiguration after restart
parents 011ac856 4f802d51
......@@ -256,7 +256,6 @@ func TestNewOrdererGroup(t *testing.T) {
})
t.Run("etcd/raft-based Orderer", func(t *testing.T) {
t.Skip()
config := configtxgentest.Load(genesisconfig.SampleDevModeEtcdRaftProfile)
group, _ := NewOrdererGroup(config.Orderer)
consensusType := group.GetValues()[channelconfig.ConsensusTypeKey]
......
......@@ -13,7 +13,7 @@ import (
"path/filepath"
"syscall"
"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/cauthdsl"
"github.com/hyperledger/fabric/integration/nwo"
......@@ -177,7 +177,7 @@ var _ = Describe("DiscoveryService", func() {
Expect(discovered[0].Layouts[0].QuantitiesByGroup).To(ConsistOf(uint32(1), uint32(1), uint32(1)))
By("changing the channel policy")
currentConfig := nwo.GetConfigBlock(network, network.Peer("org3", "peer0"), orderer, "testchannel")
currentConfig := nwo.GetConfig(network, network.Peer("org3", "peer0"), orderer, "testchannel")
updatedConfig := proto.Clone(currentConfig).(*common.Config)
updatedConfig.ChannelGroup.Groups["Application"].Groups["org3"].Policies["Writers"].Policy.Value = utils.MarshalOrPanic(cauthdsl.SignedByMspAdmin("Org3MSP"))
nwo.UpdateConfig(network, orderer, "testchannel", currentConfig, updatedConfig, network.Peer("org3", "peer0"))
......
......@@ -248,7 +248,7 @@ func SetACLPolicy(network *nwo.Network, channel, policyName, policy string, orde
submitter := network.Peer("Org1", "peer0")
signer := network.Peer("Org2", "peer0")
config := nwo.GetConfigBlock(network, submitter, orderer, channel)
config := nwo.GetConfig(network, submitter, orderer, channel)
updatedConfig := proto.Clone(config).(*common.Config)
// set the policy
......
......@@ -25,7 +25,7 @@ import (
"github.com/tedsuo/ifrit/grouper"
)
var _ = PDescribe("EndToEnd Crash Fault Tolerance", func() {
var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
var (
testDir string
client *docker.Client
......
......@@ -20,16 +20,13 @@ import (
"syscall"
"time"
"github.com/fsouza/go-dockerclient"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-lib-go/healthz"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
"github.com/hyperledger/fabric/integration/nwo"
"github.com/hyperledger/fabric/integration/nwo/commands"
"github.com/hyperledger/fabric/protos/common"
protosorderer "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
......@@ -151,7 +148,7 @@ var _ = Describe("EndToEnd", func() {
})
})
PDescribe("basic single node etcdraft network with 2 orgs", func() {
Describe("basic single node etcdraft network with 2 orgs", func() {
BeforeEach(func() {
network = nwo.New(nwo.BasicEtcdRaft(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
......@@ -172,7 +169,7 @@ var _ = Describe("EndToEnd", func() {
})
})
PDescribe("three node etcdraft network with 2 orgs", func() {
Describe("three node etcdraft network with 2 orgs", func() {
BeforeEach(func() {
network = nwo.New(nwo.MultiNodeEtcdRaft(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
......@@ -250,7 +247,7 @@ var _ = Describe("EndToEnd", func() {
})
})
PDescribe("etcd raft, checking valid configuration update of type B", func() {
Describe("etcd raft, checking valid configuration update of type B", func() {
BeforeEach(func() {
network = nwo.New(nwo.BasicEtcdRaft(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
......@@ -270,36 +267,24 @@ var _ = Describe("EndToEnd", func() {
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
config := nwo.GetConfigBlock(network, peer, orderer, channel)
updatedConfig := proto.Clone(config).(*common.Config)
consensusTypeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["ConsensusType"]
consensusTypeValue := &protosorderer.ConsensusType{}
err := proto.Unmarshal(consensusTypeConfigValue.Value, consensusTypeValue)
Expect(err).NotTo(HaveOccurred())
metadata := &etcdraft.Metadata{}
err = proto.Unmarshal(consensusTypeValue.Metadata, metadata)
Expect(err).NotTo(HaveOccurred())
// update max in flight messages
metadata.Options.MaxInflightMsgs = 1000
metadata.Options.MaxSizePerMsg = 512
nwo.UpdateConsensusMetadata(network, peer, orderer, channel, func(originalMetadata []byte) []byte {
metadata := &etcdraft.Metadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
// write metadata back
consensusTypeValue.Metadata, err = proto.Marshal(metadata)
Expect(err).NotTo(HaveOccurred())
// update max in flight messages
metadata.Options.MaxInflightMsgs = 1000
metadata.Options.MaxSizePerMsg = 512
updatedConfig.ChannelGroup.Groups["Orderer"].Values["ConsensusType"] = &common.ConfigValue{
ModPolicy: "Admins",
Value: utils.MarshalOrPanic(consensusTypeValue),
}
nwo.UpdateOrdererConfig(network, orderer, channel, config, updatedConfig, peer, orderer)
// write metadata back
newMetadata, err := proto.Marshal(metadata)
Expect(err).NotTo(HaveOccurred())
return newMetadata
})
})
})
PDescribe("basic single node etcdraft network with 2 orgs and 2 channels", func() {
Describe("basic single node etcdraft network with 2 orgs and 2 channels", func() {
BeforeEach(func() {
network = nwo.New(nwo.MultiChannelEtcdRaft(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
......
......@@ -22,3 +22,20 @@ func (c Generate) Args() []string {
"--output", c.Output,
}
}
type Extend struct {
Config string
Input string
}
func (c Extend) SessionName() string {
return "cryptogen-extend"
}
func (c Extend) Args() []string {
return []string{
"extend",
"--config", c.Config,
"--input", c.Input,
}
}
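The Extend command wraps the new cryptogen extend subcommand, which tops up an existing crypto material tree (for example after adding a node to crypto-config.yaml) rather than regenerating it from scratch. A minimal usage sketch for a Ginkgo test body, assuming the existing nwo helpers Network.Cryptogen, CryptoConfigPath and CryptoPath are in scope:

	sess, err := network.Cryptogen(commands.Extend{
		Config: network.CryptoConfigPath(), // crypto-config.yaml, e.g. with an extra node added
		Input:  network.CryptoPath(),       // existing crypto material directory, extended in place
	})
	Expect(err).NotTo(HaveOccurred())
	Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(0))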
......@@ -11,19 +11,22 @@ import (
"os"
"path/filepath"
"bytes"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/tools/configtxlator/update"
"github.com/hyperledger/fabric/integration/nwo/commands"
"github.com/hyperledger/fabric/protos/common"
protosorderer "github.com/hyperledger/fabric/protos/orderer"
ectdraft_protos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
)
// GetConfigBlock retrieves the current config block for a channel and
// unmarshals it.
func GetConfigBlock(n *Network, peer *Peer, orderer *Orderer, channel string) *common.Config {
// GetConfigBlock retrieves the current config block for a channel
func GetConfigBlock(n *Network, peer *Peer, orderer *Orderer, channel string) *common.Block {
tempDir, err := ioutil.TempDir("", "getConfigBlock")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(tempDir)
......@@ -42,7 +45,12 @@ func GetConfigBlock(n *Network, peer *Peer, orderer *Orderer, channel string) *c
// unmarshal the config block bytes
configBlock := UnmarshalBlockFromFile(output)
return configBlock
}
// GetConfig retrieves the last config of the given channel
func GetConfig(n *Network, peer *Peer, orderer *Orderer, channel string) *common.Config {
configBlock := GetConfigBlock(n, peer, orderer, channel)
// unmarshal the envelope bytes
envelope, err := utils.GetEnvelopeFromBlock(configBlock.Data.Data[0])
Expect(err).NotTo(HaveOccurred())
......@@ -164,7 +172,7 @@ func UpdateOrdererConfig(n *Network, orderer *Orderer, channel string, current,
}
// CurrentConfigBlockNumber retrieves the block number from the header of the
// current config block. This can be used to detect whena configuration change
// current config block. This can be used to detect when a configuration change
// has completed.
func CurrentConfigBlockNumber(n *Network, peer *Peer, orderer *Orderer, channel string) uint64 {
tempDir, err := ioutil.TempDir("", "currentConfigBlock")
......@@ -198,3 +206,62 @@ func UnmarshalBlockFromFile(blockFile string) *common.Block {
return block
}
// AddConsenter adds a new consenter to the given channel
func AddConsenter(n *Network, peer *Peer, orderer *Orderer, channel string, consenter ectdraft_protos.Consenter) {
UpdateConsensusMetadata(n, peer, orderer, channel, func(originalMetadata []byte) []byte {
metadata := &ectdraft_protos.Metadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
metadata.Consenters = append(metadata.Consenters, &consenter)
newMetadata, err := proto.Marshal(metadata)
Expect(err).NotTo(HaveOccurred())
return newMetadata
})
}
// RemoveConsenter removes a consenter with the given certificate in PEM format from the given channel
func RemoveConsenter(n *Network, peer *Peer, orderer *Orderer, channel string, certificate []byte) {
UpdateConsensusMetadata(n, peer, orderer, channel, func(originalMetadata []byte) []byte {
metadata := &ectdraft_protos.Metadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
var newConsenters []*ectdraft_protos.Consenter
for _, consenter := range metadata.Consenters {
if bytes.Equal(consenter.ClientTlsCert, certificate) || bytes.Equal(consenter.ServerTlsCert, certificate) {
continue
}
newConsenters = append(newConsenters, consenter)
}
metadata.Consenters = newConsenters
newMetadata, err := proto.Marshal(metadata)
Expect(err).NotTo(HaveOccurred())
return newMetadata
})
}
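AddConsenter and RemoveConsenter are thin wrappers around UpdateConsensusMetadata below; both identify consenters by their raw TLS certificate bytes. A hedged usage sketch for a Ginkgo test body, where o4, the channel name, and the helper calls OrdererLocalTLSDir and OrdererPort are illustrative assumptions, and the etcdraft protos are imported under the alias etcdraft:

	// read the TLS server certificate of the orderer we want to add
	cert, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(o4), "server.crt"))
	Expect(err).NotTo(HaveOccurred())

	// add o4 to the consenter set of the system channel
	nwo.AddConsenter(network, peer, orderer, "systemchannel", etcdraft.Consenter{
		Host:          "127.0.0.1",
		Port:          uint32(network.OrdererPort(o4, nwo.ListenPort)), // or the dedicated cluster port, if one is configured
		ClientTlsCert: cert,
		ServerTlsCert: cert,
	})

	// later, evict it again by passing the same certificate bytes
	nwo.RemoveConsenter(network, peer, orderer, "systemchannel", cert)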
// ConsensusMetadataMutator receives ConsensusType.Metadata and mutates it
type ConsensusMetadataMutator func([]byte) []byte
// UpdateConsensusMetadata executes a config update that updates the consensus metadata according to the given ConsensusMetadataMutator
func UpdateConsensusMetadata(network *Network, peer *Peer, orderer *Orderer, channel string, mutateMetadata ConsensusMetadataMutator) {
config := GetConfig(network, peer, orderer, channel)
updatedConfig := proto.Clone(config).(*common.Config)
consensusTypeConfigValue := updatedConfig.ChannelGroup.Groups["Orderer"].Values["ConsensusType"]
consensusTypeValue := &protosorderer.ConsensusType{}
err := proto.Unmarshal(consensusTypeConfigValue.Value, consensusTypeValue)
Expect(err).NotTo(HaveOccurred())
consensusTypeValue.Metadata = mutateMetadata(consensusTypeValue.Metadata)
updatedConfig.ChannelGroup.Groups["Orderer"].Values["ConsensusType"] = &common.ConfigValue{
ModPolicy: "Admins",
Value: utils.MarshalOrPanic(consensusTypeValue),
}
UpdateOrdererConfig(network, orderer, channel, config, updatedConfig, peer, orderer)
}
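For reference, this is the caller shape the refactored e2e test above now uses: the mutator receives the raw ConsensusType.Metadata bytes, unmarshals them into etcdraft.Metadata, adjusts the options, and returns the re-marshalled bytes, while UpdateConsensusMetadata wraps the change in a config update submitted via UpdateOrdererConfig:

	nwo.UpdateConsensusMetadata(network, peer, orderer, channel, func(originalMetadata []byte) []byte {
		metadata := &etcdraft.Metadata{}
		err := proto.Unmarshal(originalMetadata, metadata)
		Expect(err).NotTo(HaveOccurred())

		// raise the flow-control limits for this channel
		metadata.Options.MaxInflightMsgs = 1000
		metadata.Options.MaxSizePerMsg = 512

		newMetadata, err := proto.Marshal(metadata)
		Expect(err).NotTo(HaveOccurred())
		return newMetadata
	})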
......@@ -447,7 +447,7 @@ func TestAbortRPC(t *testing.T) {
},
{
name: "RPC timeout",
rpcTimeout: time.Millisecond * 100,
rpcTimeout: time.Second,
abortFunc: func(*cluster.RemoteContext) {},
},
}
......
......@@ -92,7 +92,7 @@ func (p *BlockPuller) PullBlock(seq uint64) *common.Block {
// HeightsByEndpoints returns the block heights by endpoints of orderers
func (p *BlockPuller) HeightsByEndpoints() map[string]uint64 {
res := make(map[string]uint64)
for endpoint, endpointInfo := range p.probeEndpoints(1).byEndpoints() {
for endpoint, endpointInfo := range p.probeEndpoints(0).byEndpoints() {
endpointInfo.conn.Close()
res[endpoint] = endpointInfo.lastBlockSeq + 1
}
......
......@@ -159,7 +159,7 @@ func (r *Replicator) pullChannelBlocks(channel string, puller ChainPuller, lates
}
// Pull the genesis block and remember its hash.
genesisBlock := puller.PullBlock(0)
r.appendBlock(genesisBlock, ledger)
r.appendBlockIfNeeded(genesisBlock, ledger, channel)
actualPrevHash := genesisBlock.Header.Hash()
for seq := uint64(1); seq < latestHeight; seq++ {
......@@ -172,19 +172,26 @@ func (r *Replicator) pullChannelBlocks(channel string, puller ChainPuller, lates
actualPrevHash = block.Header.Hash()
if channel == r.SystemChannel && block.Header.Number == r.BootBlock.Header.Number {
r.compareBootBlockWithSystemChannelLastConfigBlock(block)
r.appendBlock(block, ledger)
r.appendBlockIfNeeded(block, ledger, channel)
// No need to pull further blocks from the system channel
return nil
}
r.appendBlock(block, ledger)
r.appendBlockIfNeeded(block, ledger, channel)
}
return nil
}
func (r *Replicator) appendBlock(block *common.Block, ledger LedgerWriter) {
func (r *Replicator) appendBlockIfNeeded(block *common.Block, ledger LedgerWriter, channel string) {
currHeight := ledger.Height()
if currHeight >= block.Header.Number+1 {
r.Logger.Infof("Already at height %d for channel %s, skipping commit of block %d",
currHeight, channel, block.Header.Number)
return
}
if err := ledger.Append(block); err != nil {
r.Logger.Panicf("Failed to write block %d: %v", block.Header.Number, err)
}
r.Logger.Infof("Committed block %d for channel %s", block.Header.Number, channel)
}
func (r *Replicator) compareBootBlockWithSystemChannelLastConfigBlock(block *common.Block) {
......@@ -344,7 +351,7 @@ func latestHeightAndEndpoint(puller ChainPuller) (string, uint64) {
var maxHeight uint64
var mostUpToDateEndpoint string
for endpoint, height := range puller.HeightsByEndpoints() {
if height > maxHeight {
if height >= maxHeight {
maxHeight = height
mostUpToDateEndpoint = endpoint
}
......
......@@ -101,6 +101,7 @@ func TestReplicateChainsFailures(t *testing.T) {
name: "no block received",
expectedPanic: "Failed pulling system channel: " +
"failed obtaining the latest block for channel system",
isProbeResponseDelayed: true,
},
{
name: "latest block seq is less than boot block seq",
......@@ -157,6 +158,7 @@ func TestReplicateChainsFailures(t *testing.T) {
lw := &mocks.LedgerWriter{}
lw.On("Append", mock.Anything).Return(testCase.appendBlockError)
lw.On("Height").Return(uint64(0))
lf := &mocks.LedgerFactory{}
lf.On("GetOrCreate", "system").Return(lw, testCase.ledgerFactoryError)
......@@ -188,9 +190,12 @@ func TestReplicateChainsFailures(t *testing.T) {
osn.addExpectProbeAssert()
osn.addExpectProbeAssert()
osn.addExpectPullAssert(0)
for _, block := range systemChannelBlocks {
osn.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: block},
if !testCase.isProbeResponseDelayed {
for _, block := range systemChannelBlocks {
osn.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: block},
}
}
}
......@@ -290,6 +295,11 @@ func TestReplicateChainsGreenPath(t *testing.T) {
// Scenario: There are 2 channels in the system: A and B.
// We are in channel A but not in channel B, therefore
// we should pull channel A and then the system channel.
// However, this is not the first attempt of replication for
// our node, but the second.
// In the past, the node pulled 10 blocks of channel A and crashed.
// Therefore, it should pull all blocks, but for channel A it should
// commit only the blocks starting from block number 10.
systemChannelBlocks := createBlockChain(0, 21)
block30WithConfigBlockOf21 := common.NewBlock(30, nil)
......@@ -311,23 +321,41 @@ func TestReplicateChainsGreenPath(t *testing.T) {
amIPartOfChannelMock := &mock.Mock{}
// For channel A
amIPartOfChannelMock.On("func2").Return(nil).Once()
amIPartOfChannelMock.On("func5").Return(nil).Once()
// For channel B
amIPartOfChannelMock.On("func2").Return(cluster.ErrNotInChannel).Once()
amIPartOfChannelMock.On("func5").Return(cluster.ErrNotInChannel).Once()
// 22 is for the system channel, and 31 is for channel A
blocksCommittedToLedger := make(chan *common.Block, 22+31)
blocksCommittedToLedgerA := make(chan *common.Block, 31)
blocksCommittedToSystemLedger := make(chan *common.Block, 22)
// Put 10 blocks in the ledger of channel A, to simulate
// that the ledger had blocks when the node started.
for seq := 0; seq < 10; seq++ {
blocksCommittedToLedgerA <- &common.Block{
Header: &common.BlockHeader{Number: uint64(seq)},
}
}
lw := &mocks.LedgerWriter{}
lw.On("Append", mock.Anything).Return(nil).Run(func(arg mock.Arguments) {
blocksCommittedToLedger <- arg.Get(0).(*common.Block)
lwA := &mocks.LedgerWriter{}
lwA.On("Append", mock.Anything).Return(nil).Run(func(arg mock.Arguments) {
blocksCommittedToLedgerA <- arg.Get(0).(*common.Block)
})
lwA.On("Height").Return(func() uint64 {
return uint64(len(blocksCommittedToLedgerA))
})
lwSystem := &mocks.LedgerWriter{}
lwSystem.On("Append", mock.Anything).Return(nil).Run(func(arg mock.Arguments) {
blocksCommittedToSystemLedger <- arg.Get(0).(*common.Block)
})
lwSystem.On("Height").Return(func() uint64 {
return uint64(len(blocksCommittedToSystemLedger))
})
lf := &mocks.LedgerFactory{}
lf.On("Close")
lf.On("GetOrCreate", "A").Return(lw, nil)
lf.On("GetOrCreate", "B").Return(lw, nil)
lf.On("GetOrCreate", "system").Return(lw, nil)
lf.On("GetOrCreate", "A").Return(lwA, nil)
lf.On("GetOrCreate", "system").Return(lwSystem, nil)
r := cluster.Replicator{
LedgerFactory: lf,
......@@ -430,21 +458,20 @@ func TestReplicateChainsGreenPath(t *testing.T) {
// We replicated the chains, so all that is left is to ensure
// the blocks were committed in order, and all blocks we expected
// to be committed (for channel A and the system channel) were committed.
close(blocksCommittedToLedger)
assert.Len(t, blocksCommittedToLedger, cap(blocksCommittedToLedger))
close(blocksCommittedToLedgerA)
close(blocksCommittedToSystemLedger)
assert.Len(t, blocksCommittedToLedgerA, cap(blocksCommittedToLedgerA))
assert.Len(t, blocksCommittedToSystemLedger, cap(blocksCommittedToSystemLedger))
// Count the blocks for channel A
var expectedSequence uint64
for block := range blocksCommittedToLedger {
for block := range blocksCommittedToLedgerA {
assert.Equal(t, expectedSequence, block.Header.Number)
expectedSequence++
if expectedSequence == 31 {
break
}
}
// Count the blocks for the system channel
expectedSequence = uint64(0)
for block := range blocksCommittedToLedger {
for block := range blocksCommittedToSystemLedger {
assert.Equal(t, expectedSequence, block.Header.Number)
expectedSequence++
}
......
......@@ -58,9 +58,13 @@ type General struct {
}
type Cluster struct {
RootCAs []string
ListenAddress string
ListenPort uint16
ServerCertificate string
ServerPrivateKey string
ClientCertificate string
ClientPrivateKey string
RootCAs []string
DialTimeout time.Duration
RPCTimeout time.Duration
ReplicationBufferSize int
......
......@@ -21,7 +21,6 @@ import (
)
func TestSpawnEtcdRaft(t *testing.T) {
t.Skip()
gt := NewGomegaWithT(t)
cwd, err := filepath.Abs(".")
......@@ -59,9 +58,10 @@ func TestSpawnEtcdRaft(t *testing.T) {
// Launch the OSN
ordererProcess := launchOrderer(gt, cmd, orderer, tempDir, genesisBlockPath, fabricRootDir)
defer ordererProcess.Kill()
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Starting cluster listener on 127.0.0.1:5612"))
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("Beginning to serve requests"))
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("becomeLeader"))
ordererProcess.Kill()
}
func launchOrderer(gt *GomegaWithT, cmd *exec.Cmd, orderer, tempDir, genesisBlockPath, fabricRootDir string) *gexec.Session {
......@@ -79,6 +79,10 @@ func launchOrderer(gt *GomegaWithT, cmd *exec.Cmd, orderer, tempDir, genesisBloc
"ORDERER_OPERATIONS_TLS_ENABLED=false",
fmt.Sprintf("ORDERER_FILELEDGER_LOCATION=%s", filepath.Join(tempDir, "ledger")),
fmt.Sprintf("ORDERER_GENERAL_GENESISFILE=%s", genesisBlockPath),
"ORDERER_GENERAL_CLUSTER_LISTENPORT=5612",
"ORDERER_GENERAL_CLUSTER_LISTENADDRESS=127.0.0.1",
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_SERVERCERTIFICATE=%s", filepath.Join(cwd, "testdata", "tls", "server.crt")),
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_SERVERPRIVATEKEY=%s", filepath.Join(cwd, "testdata", "tls", "server.key")),
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_CLIENTCERTIFICATE=%s", filepath.Join(cwd, "testdata", "tls", "server.crt")),
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_CLIENTPRIVATEKEY=%s", filepath.Join(cwd, "testdata", "tls", "server.key")),
fmt.Sprintf("ORDERER_GENERAL_CLUSTER_ROOTCAS=[%s]", filepath.Join(cwd, "testdata", "tls", "ca.crt")),
......
......@@ -19,6 +19,7 @@ import (
"syscall"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-lib-go/healthz"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
......@@ -51,12 +52,12 @@ import (
"github.com/hyperledger/fabric/protos/utils"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"
)
var logger = flogging.MustGetLogger("orderer.common.server")
//command line flags
// command line flags
var (
app = kingpin.New("orderer", "Hyperledger Fabric orderer node")
......@@ -98,14 +99,14 @@ func Start(cmd string, conf *localconfig.TopLevel) {
lf, _ := createLedgerFactory(conf)
clusterDialer := &cluster.PredicateDialer{}
clusterConfig := initializeClusterConfig(conf)
clusterDialer.SetConfig(clusterConfig)
clusterClientConfig := initializeClusterClientConfig(conf)
clusterDialer.SetConfig(clusterClientConfig)
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r := &replicationInitiator{
logger: logger,
secOpts: clusterConfig.SecOpts,
secOpts: clusterClientConfig.SecOpts,
bootstrapBlock: bootstrapBlock,
conf: conf,
lf: &ledgerFactory{lf},
......@@ -132,25 +133,49 @@ func Start(cmd string, conf *localconfig.TopLevel) {
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}
clusterServerConfig := serverConfig
clusterGRPCServer := grpcServer
if clusterType {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, grpcServer, ioutil.ReadFile)
}
var servers = []*comm.GRPCServer{grpcServer}
// If we have a separate gRPC server for the cluster, we need to update its TLS
// CA certificate pool too.
if clusterGRPCServer != grpcServer {
servers = append(servers, clusterGRPCServer)
}
tlsCallback := func(bundle *channelconfig.Bundle) {
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
if grpcServer.MutualTLSRequired() || clusterType {
logger.Debug("Executing callback to update root CAs")
updateTrustedRoots(grpcServer, caSupport, bundle)
updateTrustedRoots(caSupport, bundle, servers...)
if clusterType {
updateClusterDialer(caSupport, clusterDialer, clusterConfig.SecOpts.ServerRootCAs)
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
}
}
}
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
logger.Infof("Starting %s", metadata.GetVersionInfo())
go handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGTERM: func() { grpcServer.Stop() },
syscall.SIGTERM: func() {
grpcServer.Stop()
if clusterGRPCServer != grpcServer {
clusterGRPCServer.Stop()
}
},