Commit 4c7ffa33 authored by Yacov Manevich's avatar Yacov Manevich Committed by Gerrit Code Review
Browse files

Merge changes from topics 'FAB-12944', '(detached'

* changes:
  [FAB-12944] Pulling system chain failure should panic
  [FAB-12761] Attach onboarding to orderer
parents 83dc3bc2 b4954f7b
......@@ -110,7 +110,9 @@ func (r *Replicator) ReplicateChains() {
r.PullChannel(channel)
}
// Last, pull the system chain
r.PullChannel(r.SystemChannel)
if err := r.PullChannel(r.SystemChannel); err != nil {
r.Logger.Panicf("Failed pulling system channel: %v", err)
}
r.LedgerFactory.Close()
}
......@@ -353,6 +355,11 @@ func lastConfigFromBlock(block *common.Block) (uint64, error) {
return utils.GetLastConfigIndexFromBlock(block)
}
// Close closes the ChainInspector
func (ci *ChainInspector) Close() {
ci.Puller.Close()
}
// Channels returns the list of channels
// that exist in the chain
func (ci *ChainInspector) Channels() []string {
......
......@@ -87,8 +87,7 @@ func TestIsReplicationNeeded(t *testing.T) {
func TestReplicateChainsFailures(t *testing.T) {
for _, testCase := range []struct {
name string
blocks []*common.Block
expectedError string
isProbeResponseDelayed bool
latestBlockSeqInOrderer uint64
ledgerFactoryError error
appendBlockError error
......@@ -96,18 +95,21 @@ func TestReplicateChainsFailures(t *testing.T) {
mutateBlocks func([]*common.Block)
}{
{
name: "no block received",
expectedError: "failed obtaining the latest block for channel system",
name: "no block received",
expectedPanic: "Failed pulling system channel: " +
"failed obtaining the latest block for channel system",
},
{
name: "latest block seq is less than boot block seq",
expectedError: "latest height found among system channel(system) orderers is 19," +
expectedPanic: "Failed pulling system channel: " +
"latest height found among system channel(system) orderers is 19," +
" but the boot block's sequence is 21",
latestBlockSeqInOrderer: 18,
},
{
name: "hash chain mismatch",
expectedError: "block header mismatch on sequence 11, " +
expectedPanic: "Failed pulling system channel: " +
"block header mismatch on sequence 11, " +
"expected 9cd61b7e9a5ea2d128cc877e5304e7205888175a8032d40b97db7412dca41d9e, got 010203",
latestBlockSeqInOrderer: 21,
mutateBlocks: func(systemChannelBlocks []*common.Block) {
......@@ -136,6 +138,13 @@ func TestReplicateChainsFailures(t *testing.T) {
appendBlockError: errors.New("IO error"),
expectedPanic: "Failed to write block 0: IO error",
},
{
name: "failure pulling the system chain",
latestBlockSeqInOrderer: 21,
expectedPanic: "Failed pulling system channel: " +
"failed obtaining the latest block for channel system",
isProbeResponseDelayed: true,
},
} {
t.Run(testCase.name, func(t *testing.T) {
systemChannelBlocks := createBlockChain(0, 21)
......@@ -156,18 +165,25 @@ func TestReplicateChainsFailures(t *testing.T) {
bp := newBlockPuller(dialer, osn.srv.Address())
bp.FetchTimeout = time.Millisecond * 100
cl := &mocks.ChannelLister{}
cl.On("Channels").Return(nil)
cl.On("Close")
r := cluster.Replicator{
Logger: flogging.MustGetLogger("test"),
BootBlock: systemChannelBlocks[21],
SystemChannel: "system",
LedgerFactory: lf,
Puller: bp,
ChannelLister: cl,
}
if !testCase.isProbeResponseDelayed {
osn.enqueueResponse(testCase.latestBlockSeqInOrderer)
osn.enqueueResponse(testCase.latestBlockSeqInOrderer)
}
osn.addExpectProbeAssert()
osn.enqueueResponse(testCase.latestBlockSeqInOrderer)
osn.addExpectProbeAssert()
osn.enqueueResponse(testCase.latestBlockSeqInOrderer)
osn.addExpectPullAssert(0)
for _, block := range systemChannelBlocks {
osn.blockResponses <- &orderer.DeliverResponse{
......@@ -175,15 +191,7 @@ func TestReplicateChainsFailures(t *testing.T) {
}
}
if testCase.expectedPanic == "" {
err := r.PullChannel("system")
assert.EqualError(t, err, testCase.expectedError)
} else {
assert.PanicsWithValue(t, testCase.expectedPanic, func() {
r.PullChannel("system")
})
}
assert.PanicsWithValue(t, testCase.expectedPanic, r.ReplicateChains)
bp.Close()
dialer.assertAllConnectionsClosed(t)
})
......
......@@ -91,10 +91,29 @@ func Main() {
// Start provides a layer of abstraction for benchmark test
func Start(cmd string, conf *localconfig.TopLevel) {
genesisBlock := extractGenesisBlock(conf)
clusterType := isClusterType(genesisBlock)
bootstrapBlock := extractBootstrapBlock(conf)
clusterType := isClusterType(bootstrapBlock)
signer := localmsp.NewSigner()
lf, _ := createLedgerFactory(conf)
clusterDialer := &cluster.PredicateDialer{}
clusterConfig := initializeClusterConfig(conf)
clusterDialer.SetConfig(clusterConfig)
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r := &replicationInitiator{
logger: logger,
secOpts: clusterConfig.SecOpts,
bootstrapBlock: bootstrapBlock,
conf: conf,
lf: &ledgerFactory{lf},
signer: signer,
}
r.replicateIfNeeded()
}
opsSystem := newOperationsSystem(conf.Operations)
err := opsSystem.Start()
if err != nil {
......@@ -111,10 +130,6 @@ func Start(cmd string, conf *localconfig.TopLevel) {
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}
clusterDialer := &cluster.PredicateDialer{}
clusterConfig := initializeClusterConfig(conf)
clusterDialer.SetConfig(clusterConfig)
tlsCallback := func(bundle *channelconfig.Bundle) {
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
if grpcServer.MutualTLSRequired() || clusterType {
......@@ -126,7 +141,7 @@ func Start(cmd string, conf *localconfig.TopLevel) {
}
}
manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, tlsCallback)
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, lf, tlsCallback)
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
......@@ -317,20 +332,20 @@ func grpcLeveler(ctx context.Context, fullMethod string) zapcore.Level {
}
}
func extractGenesisBlock(conf *localconfig.TopLevel) *cb.Block {
var genesisBlock *cb.Block
func extractBootstrapBlock(conf *localconfig.TopLevel) *cb.Block {
var bootstrapBlock *cb.Block
// Select the bootstrapping mechanism
switch conf.General.GenesisMethod {
case "provisional":
genesisBlock = encoder.New(genesisconfig.Load(conf.General.GenesisProfile)).GenesisBlockForChannel(conf.General.SystemChannel)
bootstrapBlock = encoder.New(genesisconfig.Load(conf.General.GenesisProfile)).GenesisBlockForChannel(conf.General.SystemChannel)
case "file":
genesisBlock = file.New(conf.General.GenesisFile).GenesisBlock()
bootstrapBlock = file.New(conf.General.GenesisFile).GenesisBlock()
default:
logger.Panic("Unknown genesis method:", conf.General.GenesisMethod)
}
return genesisBlock
return bootstrapBlock
}
func initializeBootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) {
......@@ -391,16 +406,16 @@ func initializeLocalMsp(conf *localconfig.TopLevel) {
}
}
func initializeMultichannelRegistrar(isClusterType bool,
func initializeMultichannelRegistrar(bootstrapBlock *cb.Block,
clusterDialer *cluster.PredicateDialer,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
conf *localconfig.TopLevel,
signer crypto.LocalSigner,
metricsProvider metrics.Provider,
lf blockledger.Factory,
callbacks ...func(bundle *channelconfig.Bundle)) *multichannel.Registrar {
lf, _ := createLedgerFactory(conf)
genesisBlock := extractGenesisBlock(conf)
genesisBlock := extractBootstrapBlock(conf)
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
initializeBootstrapChannel(genesisBlock, lf)
......@@ -418,7 +433,7 @@ func initializeMultichannelRegistrar(isClusterType bool,
// Note, we pass a 'nil' channel here, we could pass a channel that
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType {
if isClusterType(bootstrapBlock) {
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar)
consenters["etcdraft"] = raftConsenter
}
......
......@@ -21,6 +21,7 @@ import (
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
......@@ -202,12 +203,12 @@ func TestInitializeBootstrapChannel(t *testing.T) {
if tc.panics {
assert.Panics(t, func() {
genesisBlock := extractGenesisBlock(bootstrapConfig)
genesisBlock := extractBootstrapBlock(bootstrapConfig)
initializeBootstrapChannel(genesisBlock, ledgerFactory)
})
} else {
assert.NotPanics(t, func() {
genesisBlock := extractGenesisBlock(bootstrapConfig)
genesisBlock := extractBootstrapBlock(bootstrapConfig)
initializeBootstrapChannel(genesisBlock, ledgerFactory)
})
}
......@@ -259,7 +260,9 @@ func TestInitializeMultiChainManager(t *testing.T) {
conf := genesisConfig(t)
assert.NotPanics(t, func() {
initializeLocalMsp(conf)
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{})
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{}, lf)
})
}
......@@ -320,7 +323,9 @@ func TestUpdateTrustedRoots(t *testing.T) {
updateTrustedRoots(grpcServer, caSupport, bundle)
}
}
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS not required so no updates should have occurred
......@@ -357,7 +362,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
updateClusterDialer(caSupport, predDialer, clusterConf.SecOpts.ServerRootCAs)
}
}
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS is required so updates should have occurred
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import (
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protos/common"
)
type replicationInitiator struct {
logger *flogging.FabricLogger
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
bootstrapBlock *common.Block
lf cluster.LedgerFactory
signer crypto.LocalSigner
}
func (ri *replicationInitiator) replicateIfNeeded() {
if ri.bootstrapBlock.Header.Number == 0 {
ri.logger.Debug("Booted with a genesis block, replication isn't an option")
return
}
consenterCert := etcdraft.ConsenterCertificate(ri.secOpts.Certificate)
pullerConfig := cluster.PullerConfigFromTopLevelConfig(ri.conf, ri.secOpts.Key, ri.secOpts.Certificate, ri.signer)
puller, err := cluster.BlockPullerFromConfigBlock(pullerConfig, ri.bootstrapBlock)
if err != nil {
ri.logger.Panicf("Failed creating puller config from bootstrap block: %v", err)
}
pullerLogger := flogging.MustGetLogger("orderer.common.cluster")
replicator := &cluster.Replicator{
LedgerFactory: ri.lf,
SystemChannel: ri.conf.General.SystemChannel,
BootBlock: ri.bootstrapBlock,
Logger: pullerLogger,
AmIPartOfChannel: consenterCert.IsConsenterOfChannel,
Puller: puller,
ChannelLister: &cluster.ChainInspector{
Logger: pullerLogger,
Puller: puller,
LastConfigBlock: ri.bootstrapBlock,
},
}
replicationNeeded, err := replicator.IsReplicationNeeded()
if err != nil {
ri.logger.Panicf("Failed determining whether replication is needed: %v", err)
}
if !replicationNeeded {
ri.logger.Info("Replication isn't needed")
return
}
ri.logger.Info("Will now replicate chains")
replicator.ReplicateChains()
}
type ledgerFactory struct {
blockledger.Factory
}
func (lf *ledgerFactory) GetOrCreate(chainID string) (cluster.LedgerWriter, error) {
return lf.Factory.GetOrCreate(chainID)
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import (
"fmt"
"io/ioutil"
"path/filepath"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func newServerNode(t *testing.T, key, cert []byte) *deliverServer {
srv, err := comm.NewGRPCServer("127.0.0.1:0", comm.ServerConfig{
SecOpts: &comm.SecureOptions{
Key: key,
Certificate: cert,
UseTLS: true,
},
})
if err != nil {
panic(err)
}
ds := &deliverServer{
t: t,
blockResponses: make(chan *orderer.DeliverResponse, 100),
srv: srv,
}
orderer.RegisterAtomicBroadcastServer(srv.Server(), ds)
go srv.Start()
return ds
}
type deliverServer struct {
isConnected int32
t *testing.T
srv *comm.GRPCServer
blockResponses chan *orderer.DeliverResponse
}
func (*deliverServer) Broadcast(orderer.AtomicBroadcast_BroadcastServer) error {
panic("implement me")
}
func (ds *deliverServer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
atomic.StoreInt32(&ds.isConnected, 1)
seekInfo, err := readSeekEnvelope(stream)
if err != nil {
panic(err)
}
if seekInfo.GetStart().GetSpecified() != nil {
return ds.deliverBlocks(stream)
}
if seekInfo.GetStart().GetNewest() != nil {
resp := <-ds.blockResponses
return stream.Send(resp)
}
panic(fmt.Sprintf("expected either specified or newest seek but got %v", seekInfo.GetStart()))
}
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, error) {
env, err := stream.Recv()
if err != nil {
return nil, err
}
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
}
seekInfo := &orderer.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
return nil, err
}
return seekInfo, nil
}
func (ds *deliverServer) deliverBlocks(stream orderer.AtomicBroadcast_DeliverServer) error {
for {
blockChan := ds.blockResponses
response := <-blockChan
if response == nil {
return nil
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func loadPEM(suffix string, t *testing.T) []byte {
b, err := ioutil.ReadFile(filepath.Join("testdata", "tls", suffix))
assert.NoError(t, err)
return b
}
func TestReplicateIfNeeded(t *testing.T) {
t.Parallel()
caCert := loadPEM("ca.crt", t)
key := loadPEM("server.key", t)
cert := loadPEM("server.crt", t)
deliverServer := newServerNode(t, key, cert)
defer deliverServer.srv.Stop()
flogging.ActivateSpec("testReplicateIfNeeded=debug")
cleanup := configtest.SetDevFabricConfigPath(t)
defer cleanup()
blockBytes, err := ioutil.ReadFile(filepath.Join("testdata", "genesis.block"))
assert.NoError(t, err)
bootBlock := &common.Block{}
assert.NoError(t, proto.Unmarshal(blockBytes, bootBlock))
bootBlock.Header.Number = 10
injectOrdererEndpoint(t, bootBlock, deliverServer.srv.Address())
copyBlock := func(block *common.Block, seq uint64) *common.Block {
res := &common.Block{}
proto.Unmarshal(utils.MarshalOrPanic(block), res)
res.Header.Number = seq
return res
}
deliverServer.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: bootBlock},
}
blocks := make([]*common.Block, 11)
// Genesis block can be anything... not important for channel traversal
// since it is skipped.
blocks[0] = &common.Block{Header: &common.BlockHeader{}}
for seq := uint64(1); seq <= uint64(10); seq++ {
block := copyBlock(bootBlock, seq)
block.Header.PreviousHash = blocks[seq-1].Header.Hash()
blocks[seq] = block
deliverServer.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: block},
}
}
// We close the block responses to mark the server side to return from
// the method dispatch.
close(deliverServer.blockResponses)
// We need to ensure the hash chain is valid with respect to the bootstrap block.
// Validating the hash chain itself when we traverse channels will be taken care
// of in FAB-12926.
bootBlock.Header.PreviousHash = blocks[9].Header.Hash()
var hooksActivated bool
for _, testCase := range []struct {
name string
panicValue string
systemLedgerHeight uint64
bootBlock *common.Block
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
ledgerFactoryErr error
signer crypto.LocalSigner
zapHooks []func(zapcore.Entry) error
shouldConnect bool
}{
{
name: "Genesis block makes replication be skipped",
bootBlock: &common.Block{Header: &common.BlockHeader{Number: 0}},
systemLedgerHeight: 10,
zapHooks: []func(entry zapcore.Entry) error{
func(entry zapcore.Entry) error {
hooksActivated = true
assert.Equal(t, entry.Message, "Booted with a genesis block, replication isn't an option")
return nil
},
},
},
{
name: "Block puller initialization failure panics",
systemLedgerHeight: 10,
panicValue: "Failed creating puller config from bootstrap block: block data is nil",
bootBlock: &common.Block{Header: &common.BlockHeader{Number: 10}},
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{},
},
{
name: "Is Replication needed fails",
systemLedgerHeight: 10,
ledgerFactoryErr: errors.New("I/O error"),
panicValue: "Failed determining whether replication is needed: I/O error",
bootBlock: bootBlock,
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{
Certificate: cert,
Key: key,
},
},
{
name: "Replication isn't needed",
systemLedgerHeight: 11,
bootBlock: bootBlock,
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{
Certificate: cert,
Key: key,
},
zapHooks: []func(entry zapcore.Entry) error{
func(entry zapcore.Entry) error {
hooksActivated = true
assert.Equal(t, entry.Message, "Replication isn't needed")
return nil
},
},
},
{
name: "Replication is needed, but pulling fails",
panicValue: "Failed pulling system channel: " +
"failed obtaining the latest block for channel system",
shouldConnect: true,
systemLedgerHeight: 10,
bootBlock: bootBlock,
conf: &localconfig.TopLevel{
General: localconfig.General{