Commit 2e9fbe46 authored by yacovm's avatar yacovm Committed by Yacov Manevich
Browse files

[FAB-12761] Attach onboarding to orderer



This change set attaches the onboarding replication code
to the orderer.

Change-Id: Ib442c19c89c6ac5b8f9cfa9366ce39f08683f8af
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
parent 1c3fa5e9
......@@ -353,6 +353,11 @@ func lastConfigFromBlock(block *common.Block) (uint64, error) {
return utils.GetLastConfigIndexFromBlock(block)
}
// Close closes the ChainInspector
func (ci *ChainInspector) Close() {
ci.Puller.Close()
}
// Channels returns the list of channels
// that exist in the chain
func (ci *ChainInspector) Channels() []string {
......
......@@ -91,10 +91,29 @@ func Main() {
// Start provides a layer of abstraction for benchmark test
func Start(cmd string, conf *localconfig.TopLevel) {
genesisBlock := extractGenesisBlock(conf)
clusterType := isClusterType(genesisBlock)
bootstrapBlock := extractBootstrapBlock(conf)
clusterType := isClusterType(bootstrapBlock)
signer := localmsp.NewSigner()
lf, _ := createLedgerFactory(conf)
clusterDialer := &cluster.PredicateDialer{}
clusterConfig := initializeClusterConfig(conf)
clusterDialer.SetConfig(clusterConfig)
// Only clusters that are equipped with a recent config block can replicate.
if clusterType && conf.General.GenesisMethod == "file" {
r := &replicationInitiator{
logger: logger,
secOpts: clusterConfig.SecOpts,
bootstrapBlock: bootstrapBlock,
conf: conf,
lf: &ledgerFactory{lf},
signer: signer,
}
r.replicateIfNeeded()
}
opsSystem := newOperationsSystem(conf.Operations)
err := opsSystem.Start()
if err != nil {
......@@ -111,10 +130,6 @@ func Start(cmd string, conf *localconfig.TopLevel) {
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}
clusterDialer := &cluster.PredicateDialer{}
clusterConfig := initializeClusterConfig(conf)
clusterDialer.SetConfig(clusterConfig)
tlsCallback := func(bundle *channelconfig.Bundle) {
// only need to do this if mutual TLS is required or if the orderer node is part of a cluster
if grpcServer.MutualTLSRequired() || clusterType {
......@@ -126,7 +141,7 @@ func Start(cmd string, conf *localconfig.TopLevel) {
}
}
manager := initializeMultichannelRegistrar(clusterType, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, tlsCallback)
manager := initializeMultichannelRegistrar(bootstrapBlock, clusterDialer, serverConfig, grpcServer, conf, signer, metricsProvider, lf, tlsCallback)
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS)
......@@ -317,20 +332,20 @@ func grpcLeveler(ctx context.Context, fullMethod string) zapcore.Level {
}
}
func extractGenesisBlock(conf *localconfig.TopLevel) *cb.Block {
var genesisBlock *cb.Block
func extractBootstrapBlock(conf *localconfig.TopLevel) *cb.Block {
var bootstrapBlock *cb.Block
// Select the bootstrapping mechanism
switch conf.General.GenesisMethod {
case "provisional":
genesisBlock = encoder.New(genesisconfig.Load(conf.General.GenesisProfile)).GenesisBlockForChannel(conf.General.SystemChannel)
bootstrapBlock = encoder.New(genesisconfig.Load(conf.General.GenesisProfile)).GenesisBlockForChannel(conf.General.SystemChannel)
case "file":
genesisBlock = file.New(conf.General.GenesisFile).GenesisBlock()
bootstrapBlock = file.New(conf.General.GenesisFile).GenesisBlock()
default:
logger.Panic("Unknown genesis method:", conf.General.GenesisMethod)
}
return genesisBlock
return bootstrapBlock
}
func initializeBootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) {
......@@ -391,16 +406,16 @@ func initializeLocalMsp(conf *localconfig.TopLevel) {
}
}
func initializeMultichannelRegistrar(isClusterType bool,
func initializeMultichannelRegistrar(bootstrapBlock *cb.Block,
clusterDialer *cluster.PredicateDialer,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
conf *localconfig.TopLevel,
signer crypto.LocalSigner,
metricsProvider metrics.Provider,
lf blockledger.Factory,
callbacks ...func(bundle *channelconfig.Bundle)) *multichannel.Registrar {
lf, _ := createLedgerFactory(conf)
genesisBlock := extractGenesisBlock(conf)
genesisBlock := extractBootstrapBlock(conf)
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
initializeBootstrapChannel(genesisBlock, lf)
......@@ -418,7 +433,7 @@ func initializeMultichannelRegistrar(isClusterType bool,
// Note, we pass a 'nil' channel here, we could pass a channel that
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType {
if isClusterType(bootstrapBlock) {
raftConsenter := etcdraft.New(clusterDialer, conf, srvConf, srv, registrar)
consenters["etcdraft"] = raftConsenter
}
......
......@@ -21,6 +21,7 @@ import (
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
......@@ -202,12 +203,12 @@ func TestInitializeBootstrapChannel(t *testing.T) {
if tc.panics {
assert.Panics(t, func() {
genesisBlock := extractGenesisBlock(bootstrapConfig)
genesisBlock := extractBootstrapBlock(bootstrapConfig)
initializeBootstrapChannel(genesisBlock, ledgerFactory)
})
} else {
assert.NotPanics(t, func() {
genesisBlock := extractGenesisBlock(bootstrapConfig)
genesisBlock := extractBootstrapBlock(bootstrapConfig)
initializeBootstrapChannel(genesisBlock, ledgerFactory)
})
}
......@@ -259,7 +260,9 @@ func TestInitializeMultiChainManager(t *testing.T) {
conf := genesisConfig(t)
assert.NotPanics(t, func() {
initializeLocalMsp(conf)
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{})
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, conf, localmsp.NewSigner(), &disabled.Provider{}, lf)
})
}
......@@ -320,7 +323,9 @@ func TestUpdateTrustedRoots(t *testing.T) {
updateTrustedRoots(grpcServer, caSupport, bundle)
}
}
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
lf, _ := createLedgerFactory(conf)
bootBlock := encoder.New(genesisconfig.Load(genesisconfig.SampleDevModeSoloProfile)).GenesisBlockForChannel("system")
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS not required so no updates should have occurred
......@@ -357,7 +362,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
updateClusterDialer(caSupport, predDialer, clusterConf.SecOpts.ServerRootCAs)
}
}
initializeMultichannelRegistrar(false, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, callback)
initializeMultichannelRegistrar(bootBlock, &cluster.PredicateDialer{}, comm.ServerConfig{}, nil, genesisConfig(t), localmsp.NewSigner(), &disabled.Provider{}, lf, callback)
t.Logf("# app CAs: %d", len(caSupport.AppRootCAsByChain[genesisconfig.TestChainID]))
t.Logf("# orderer CAs: %d", len(caSupport.OrdererRootCAsByChain[genesisconfig.TestChainID]))
// mutual TLS is required so updates should have occurred
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import (
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protos/common"
)
type replicationInitiator struct {
logger *flogging.FabricLogger
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
bootstrapBlock *common.Block
lf cluster.LedgerFactory
signer crypto.LocalSigner
}
func (ri *replicationInitiator) replicateIfNeeded() {
if ri.bootstrapBlock.Header.Number == 0 {
ri.logger.Debug("Booted with a genesis block, replication isn't an option")
return
}
consenterCert := etcdraft.ConsenterCertificate(ri.secOpts.Certificate)
pullerConfig := cluster.PullerConfigFromTopLevelConfig(ri.conf, ri.secOpts.Key, ri.secOpts.Certificate, ri.signer)
puller, err := cluster.BlockPullerFromConfigBlock(pullerConfig, ri.bootstrapBlock)
if err != nil {
ri.logger.Panicf("Failed creating puller config from bootstrap block: %v", err)
}
pullerLogger := flogging.MustGetLogger("orderer.common.cluster")
replicator := &cluster.Replicator{
LedgerFactory: ri.lf,
SystemChannel: ri.conf.General.SystemChannel,
BootBlock: ri.bootstrapBlock,
Logger: pullerLogger,
AmIPartOfChannel: consenterCert.IsConsenterOfChannel,
Puller: puller,
ChannelLister: &cluster.ChainInspector{
Logger: pullerLogger,
Puller: puller,
LastConfigBlock: ri.bootstrapBlock,
},
}
replicationNeeded, err := replicator.IsReplicationNeeded()
if err != nil {
ri.logger.Panicf("Failed determining whether replication is needed: %v", err)
}
if !replicationNeeded {
ri.logger.Info("Replication isn't needed")
return
}
ri.logger.Info("Will now replicate chains")
replicator.ReplicateChains()
}
type ledgerFactory struct {
blockledger.Factory
}
func (lf *ledgerFactory) GetOrCreate(chainID string) (cluster.LedgerWriter, error) {
return lf.Factory.GetOrCreate(chainID)
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import (
"fmt"
"io/ioutil"
"path/filepath"
"sync/atomic"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func newServerNode(t *testing.T, key, cert []byte) *deliverServer {
srv, err := comm.NewGRPCServer("127.0.0.1:0", comm.ServerConfig{
SecOpts: &comm.SecureOptions{
Key: key,
Certificate: cert,
UseTLS: true,
},
})
if err != nil {
panic(err)
}
ds := &deliverServer{
t: t,
blockResponses: make(chan *orderer.DeliverResponse, 100),
srv: srv,
}
orderer.RegisterAtomicBroadcastServer(srv.Server(), ds)
go srv.Start()
return ds
}
type deliverServer struct {
isConnected int32
t *testing.T
srv *comm.GRPCServer
blockResponses chan *orderer.DeliverResponse
}
func (*deliverServer) Broadcast(orderer.AtomicBroadcast_BroadcastServer) error {
panic("implement me")
}
func (ds *deliverServer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) error {
atomic.StoreInt32(&ds.isConnected, 1)
seekInfo, err := readSeekEnvelope(stream)
if err != nil {
panic(err)
}
if seekInfo.GetStart().GetSpecified() != nil {
return ds.deliverBlocks(stream)
}
if seekInfo.GetStart().GetNewest() != nil {
resp := <-ds.blockResponses
return stream.Send(resp)
}
panic(fmt.Sprintf("expected either specified or newest seek but got %v", seekInfo.GetStart()))
}
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, error) {
env, err := stream.Recv()
if err != nil {
return nil, err
}
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
}
seekInfo := &orderer.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
return nil, err
}
return seekInfo, nil
}
func (ds *deliverServer) deliverBlocks(stream orderer.AtomicBroadcast_DeliverServer) error {
for {
blockChan := ds.blockResponses
response := <-blockChan
if response == nil {
return nil
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func loadPEM(suffix string, t *testing.T) []byte {
b, err := ioutil.ReadFile(filepath.Join("testdata", "tls", suffix))
assert.NoError(t, err)
return b
}
func TestReplicateIfNeeded(t *testing.T) {
t.Parallel()
caCert := loadPEM("ca.crt", t)
key := loadPEM("server.key", t)
cert := loadPEM("server.crt", t)
deliverServer := newServerNode(t, key, cert)
defer deliverServer.srv.Stop()
flogging.ActivateSpec("testReplicateIfNeeded=debug")
cleanup := configtest.SetDevFabricConfigPath(t)
defer cleanup()
blockBytes, err := ioutil.ReadFile(filepath.Join("testdata", "genesis.block"))
assert.NoError(t, err)
bootBlock := &common.Block{}
assert.NoError(t, proto.Unmarshal(blockBytes, bootBlock))
bootBlock.Header.Number = 10
injectOrdererEndpoint(t, bootBlock, deliverServer.srv.Address())
copyBlock := func(block *common.Block, seq uint64) *common.Block {
res := &common.Block{}
proto.Unmarshal(utils.MarshalOrPanic(block), res)
res.Header.Number = seq
return res
}
deliverServer.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: bootBlock},
}
blocks := make([]*common.Block, 11)
// Genesis block can be anything... not important for channel traversal
// since it is skipped.
blocks[0] = &common.Block{Header: &common.BlockHeader{}}
for seq := uint64(1); seq <= uint64(10); seq++ {
block := copyBlock(bootBlock, seq)
block.Header.PreviousHash = blocks[seq-1].Header.Hash()
blocks[seq] = block
deliverServer.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: block},
}
}
// We close the block responses to mark the server side to return from
// the method dispatch.
close(deliverServer.blockResponses)
// We need to ensure the hash chain is valid with respect to the bootstrap block.
// Validating the hash chain itself when we traverse channels will be taken care
// of in FAB-12926.
bootBlock.Header.PreviousHash = blocks[9].Header.Hash()
var hooksActivated bool
for _, testCase := range []struct {
name string
panicValue string
systemLedgerHeight uint64
bootBlock *common.Block
secOpts *comm.SecureOptions
conf *localconfig.TopLevel
ledgerFactoryErr error
signer crypto.LocalSigner
zapHooks []func(zapcore.Entry) error
shouldConnect bool
}{
{
name: "Genesis block makes replication be skipped",
bootBlock: &common.Block{Header: &common.BlockHeader{Number: 0}},
systemLedgerHeight: 10,
zapHooks: []func(entry zapcore.Entry) error{
func(entry zapcore.Entry) error {
hooksActivated = true
assert.Equal(t, entry.Message, "Booted with a genesis block, replication isn't an option")
return nil
},
},
},
{
name: "Block puller initialization failure panics",
systemLedgerHeight: 10,
panicValue: "Failed creating puller config from bootstrap block: block data is nil",
bootBlock: &common.Block{Header: &common.BlockHeader{Number: 10}},
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{},
},
{
name: "Is Replication needed fails",
systemLedgerHeight: 10,
ledgerFactoryErr: errors.New("I/O error"),
panicValue: "Failed determining whether replication is needed: I/O error",
bootBlock: bootBlock,
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{
Certificate: cert,
Key: key,
},
},
{
name: "Replication isn't needed",
systemLedgerHeight: 11,
bootBlock: bootBlock,
conf: &localconfig.TopLevel{},
secOpts: &comm.SecureOptions{
Certificate: cert,
Key: key,
},
zapHooks: []func(entry zapcore.Entry) error{
func(entry zapcore.Entry) error {
hooksActivated = true
assert.Equal(t, entry.Message, "Replication isn't needed")
return nil
},
},
},
{
name: "Replication is needed",
shouldConnect: true,
systemLedgerHeight: 10,
bootBlock: bootBlock,
conf: &localconfig.TopLevel{
General: localconfig.General{
Cluster: localconfig.Cluster{
ReplicationPullTimeout: time.Millisecond * 100,
DialTimeout: time.Millisecond * 100,
RPCTimeout: time.Millisecond * 100,
ReplicationRetryTimeout: time.Millisecond * 100,
ReplicationBufferSize: 1,
},
},
},
secOpts: &comm.SecureOptions{
Certificate: cert,
Key: key,
UseTLS: true,
ServerRootCAs: [][]byte{caCert},
},
zapHooks: []func(entry zapcore.Entry) error{
func(entry zapcore.Entry) error {
hooksActivated = true
assert.Equal(t, entry.Message, "Will now replicate chains")
return nil
},
},
},
} {
t.Run(testCase.name, func(t *testing.T) {
lw := &mocks.LedgerWriter{}
lw.On("Height").Return(testCase.systemLedgerHeight).Once()
lf := &mocks.LedgerFactory{}
lf.On("GetOrCreate", mock.Anything).Return(lw, testCase.ledgerFactoryErr).Once()
lf.On("Close")
r := &replicationInitiator{
lf: lf,
logger: flogging.MustGetLogger("testReplicateIfNeeded"),
signer: testCase.signer,
conf: testCase.conf,
bootstrapBlock: testCase.bootBlock,
secOpts: testCase.secOpts,
}
if testCase.panicValue != "" {
assert.PanicsWithValue(t, testCase.panicValue, r.replicateIfNeeded)
return
}
// Else, we are not expected to panic.
r.logger = r.logger.WithOptions(zap.Hooks(testCase.zapHooks...))
// This is the method we're testing.
r.replicateIfNeeded()
// Ensure we ran the hooks for a test case that doesn't panic
assert.True(t, hooksActivated)
// Restore the flag for the next iteration
defer func() {
hooksActivated = false
}()
assert.Equal(t, testCase.shouldConnect, atomic.LoadInt32(&deliverServer.isConnected) == int32(1))
})
}
}
func TestLedgerFactory(t *testing.T) {
lf := &ledgerFactory{ramledger.New(1)}
lw, err := lf.GetOrCreate("mychannel")
assert.NoError(t, err)
assert.Equal(t, uint64(0), lw.Height())
}
func injectOrdererEndpoint(t *testing.T, block *common.Block, endpoint string) {
ordererAddresses := channelconfig.OrdererAddressesValue([]string{endpoint})
// Unwrap the layers until we reach the orderer addresses
env, err := utils.ExtractEnvelope(block, 0)
assert.NoError(t, err)
payload, err := utils.ExtractPayload(env)
assert.NoError(t, err)
confEnv, err := configtx.UnmarshalConfigEnvelope(payload.Data)
assert.NoError(t, err)
// Replace the orderer addresses
confEnv.Config.ChannelGroup.Values[ordererAddresses.Key()].Value = utils.MarshalOrPanic(ordererAddresses.Value())
// And put it back into the block
payload.Data = utils.MarshalOrPanic(confEnv)
env.Payload = utils.MarshalOrPanic(payload)
block.Data.Data[0] = utils.MarshalOrPanic(env)
block.Header.DataHash = block.Data.Hash()
}
-----BEGIN CERTIFICATE-----
MIIB8TCCAZegAwIBAgIQU59imQ+xl+FmwuiFyUgFezAKBggqhkjOPQQDAjBYMQsw
CQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEWMBQGA1UEBxMNU2FuIEZy
YW5jaXNjbzENMAsGA1UEChMET3JnMTENMAsGA1UEAxMET3JnMTAeFw0xNzA1MDgw
OTMwMzRaFw0yNzA1MDYwOTMwMzRaMFgxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpD
YWxpZm9ybmlhMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2NvMQ0wCwYDVQQKEwRPcmcx
MQ0wCwYDVQQDEwRPcmcxMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEFkpP6EqE
87ghFi25UWLvgPatxDiYKYaVSPvpo/XDJ0+9uUmK/C2r5Bvvxx1t8eTROwN77tEK
r+jbJIxX3ZYQMKNDMEEwDgYDVR0PAQH/BAQDAgGmMA8GA1UdJQQIMAYGBFUdJQAw
DwYDVR0TAQH/BAUwAwEB/zANBgNVHQ4EBgQEAQIDBDAKBggqhkjOPQQDAgNIADBF
AiEA1Xkrpq+wrmfVVuY12dJfMQlSx+v0Q3cYce9BE1i2mioCIAzqyduK/lHPI81b