Commit 97243c77 authored by yacovm's avatar yacovm
Browse files

[FAB-12760] Pull chains for onboarding



This change set implements logic for pulling all needed chains
the newly joined orderer is part of.

It adds a Replicator struct which has the following methods:
- IsReplicationNeeded: Expected to run at boot time to decide whether
  to actiate the onboarding code or skip to resume normal boot.
- ReplicateChains: Orchestrates replication of all (needed) chains by
  iterating the system channel and inspecting all channel creation
  transactions (by using previously code in past commits),
  and calling PullChannel on each needed channel.
- PullChannel: Pulls a channel and commits the blocks to the ledger.

Unit tests with full code coverage are included.

Change-Id: Ice450f5702e6316e8f3aa9aabdeb952749a9ff66
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
parent 61656ce8
......@@ -96,6 +96,7 @@ func (p *BlockPuller) HeightsByEndpoints() map[string]uint64 {
endpointInfo.conn.Close()
res[endpoint] = endpointInfo.lastBlockSeq + 1
}
p.Logger.Info("Returning the heights of OSNs mapped by endpoints", res)
return res
}
......@@ -371,6 +372,9 @@ func extractBlockFromResponse(resp *orderer.DeliverResponse) (*common.Block, err
if block == nil {
return nil, errors.New("block is nil")
}
if block.Data == nil {
return nil, errors.New("block data is nil")
}
if block.Header == nil {
return nil, errors.New("block header is nil")
}
......
......@@ -111,20 +111,24 @@ func noopBlockVerifierf(_ []*common.Block) error {
return nil
}
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, error) {
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, string, error) {
env, err := stream.Recv()
if err != nil {
return nil, err
return nil, "", err
}
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
return nil, "", err
}
seekInfo := &orderer.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
return nil, err
return nil, "", err
}
return seekInfo, nil
chdr := &common.ChannelHeader{}
if err = proto.Unmarshal(payload.Header.ChannelHeader, chdr); err != nil {
return nil, "", err
}
return seekInfo, chdr.ChannelId, nil
}
type deliverServer struct {
......@@ -132,7 +136,7 @@ type deliverServer struct {
sync.Mutex
err error
srv *comm.GRPCServer
seekAssertions chan func(*orderer.SeekInfo)
seekAssertions chan func(*orderer.SeekInfo, string)
blockResponses chan *orderer.DeliverResponse
}
......@@ -150,33 +154,58 @@ func (ds *deliverServer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) e
ds.Lock()
err := ds.err
ds.Unlock()
if err != nil {
return ds.err
return err
}
seekInfo, err := readSeekEnvelope(stream)
seekInfo, channel, err := readSeekEnvelope(stream)
if err != nil {
panic(err)
}
// Get the next seek assertion and ensure the next seek is of the expected type
seekAssert := <-ds.seekAssertions
seekAssert(seekInfo)
seekAssert(seekInfo, channel)
if seekInfo.GetStart().GetSpecified() != nil {
for resp := range ds.blockResponses {
if err := stream.Send(resp); err != nil {
return nil
}
}
return nil
return ds.deliverBlocks(stream)
}
if seekInfo.GetStart().GetNewest() != nil {
resp := <-ds.blockResponses
resp := <-ds.blocks()
return stream.Send(resp)
}
panic(fmt.Sprintf("expected either specified or newest seek but got %v", seekInfo.GetStart()))
}
func (ds *deliverServer) deliverBlocks(stream orderer.AtomicBroadcast_DeliverServer) error {
for {
blockChan := ds.blocks()
response := <-blockChan
// A nil response is a signal from the test to close the stream.
// This is needed to avoid reading from the block buffer, hence
// consuming by accident a block that is tabled to be pulled
// later in the test.
if response == nil {
return nil
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func (ds *deliverServer) blocks() chan *orderer.DeliverResponse {
ds.Lock()
defer ds.Unlock()
blockChan := ds.blockResponses
return blockChan
}
func (ds *deliverServer) setBlocks(blocks chan *orderer.DeliverResponse) {
ds.Lock()
defer ds.Unlock()
ds.blockResponses = blocks
}
func (ds *deliverServer) port() int {
_, portStr, err := net.SplitHostPort(ds.srv.Address())
assert.NoError(ds.t, err)
......@@ -190,7 +219,7 @@ func (ds *deliverServer) resurrect() {
var err error
// copy the responses channel into a fresh one
respChan := make(chan *orderer.DeliverResponse, 100)
for resp := range ds.blockResponses {
for resp := range ds.blocks() {
respChan <- resp
}
ds.blockResponses = respChan
......@@ -204,23 +233,23 @@ func (ds *deliverServer) resurrect() {
func (ds *deliverServer) stop() {
ds.srv.Stop()
close(ds.blockResponses)
close(ds.blocks())
}
func (ds *deliverServer) enqueueResponse(seq uint64) {
ds.blockResponses <- &orderer.DeliverResponse{
ds.blocks() <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: common.NewBlock(seq, nil)},
}
}
func (ds *deliverServer) addExpectProbeAssert() {
ds.seekAssertions <- func(info *orderer.SeekInfo) {
ds.seekAssertions <- func(info *orderer.SeekInfo, _ string) {
assert.NotNil(ds.t, info.GetStart().GetNewest())
}
}
func (ds *deliverServer) addExpectPullAssert(seq uint64) {
ds.seekAssertions <- func(info *orderer.SeekInfo) {
ds.seekAssertions <- func(info *orderer.SeekInfo, _ string) {
assert.NotNil(ds.t, info.GetStart().GetSpecified())
assert.Equal(ds.t, seq, info.GetStart().GetSpecified().Number)
}
......@@ -233,7 +262,7 @@ func newClusterNode(t *testing.T) *deliverServer {
}
ds := &deliverServer{
t: t,
seekAssertions: make(chan func(*orderer.SeekInfo), 100),
seekAssertions: make(chan func(*orderer.SeekInfo, string), 100),
blockResponses: make(chan *orderer.DeliverResponse, 100),
srv: srv,
}
......@@ -704,8 +733,8 @@ func TestBlockPullerFailures(t *testing.T) {
malformBlockSignatureAndRecreateOSNBuffer := func(osn *deliverServer, bp *cluster.BlockPuller) {
bp.VerifyBlockSequence = func([]*common.Block) error {
close(osn.blockResponses)
osn.blockResponses = make(chan *orderer.DeliverResponse, 100)
close(osn.blocks())
osn.setBlocks(make(chan *orderer.DeliverResponse, 100))
osn.enqueueResponse(1)
osn.enqueueResponse(2)
osn.enqueueResponse(3)
......@@ -834,6 +863,11 @@ func TestBlockPullerBadBlocks(t *testing.T) {
return resp
}
removeData := func(resp *orderer.DeliverResponse) *orderer.DeliverResponse {
resp.GetBlock().Data = nil
return resp
}
removeMetadata := func(resp *orderer.DeliverResponse) *orderer.DeliverResponse {
resp.GetBlock().Metadata = nil
return resp
......@@ -861,6 +895,11 @@ func TestBlockPullerBadBlocks(t *testing.T) {
corruptBlock: removeHeader,
expectedErrMsg: "block header is nil",
},
{
name: "nil data",
corruptBlock: removeData,
expectedErrMsg: "block data is nil",
},
{
name: "nil metadata",
corruptBlock: removeMetadata,
......@@ -910,9 +949,9 @@ func TestBlockPullerBadBlocks(t *testing.T) {
if strings.Contains(entry.Message, fmt.Sprintf("Failed pulling blocks: %s", testCase.expectedErrMsg)) {
detectedBadBlock.Done()
// Close the channel to make the current server-side deliver stream close
close(osn.blockResponses)
close(osn.blocks())
// Ane reset the block buffer to be able to write into it again
osn.blockResponses = make(chan *orderer.DeliverResponse, 100)
osn.setBlocks(make(chan *orderer.DeliverResponse, 100))
// Put a correct block after it, 1 for the probing and 1 for the fetch
osn.enqueueResponse(10)
osn.enqueueResponse(10)
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import mock "github.com/stretchr/testify/mock"
// ChannelLister is an autogenerated mock type for the ChannelLister type
type ChannelLister struct {
mock.Mock
}
// Channels provides a mock function with given fields:
func (_m *ChannelLister) Channels() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
// Close provides a mock function with given fields:
func (_m *ChannelLister) Close() {
_m.Called()
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import mock "github.com/stretchr/testify/mock"
// LedgerFactory is an autogenerated mock type for the LedgerFactory type
type LedgerFactory struct {
mock.Mock
}
// ChainIDs provides a mock function with given fields:
func (_m *LedgerFactory) ChainIDs() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
// Close provides a mock function with given fields:
func (_m *LedgerFactory) Close() {
_m.Called()
}
// GetOrCreate provides a mock function with given fields: chainID
func (_m *LedgerFactory) GetOrCreate(chainID string) (cluster.LedgerWriter, error) {
ret := _m.Called(chainID)
var r0 cluster.LedgerWriter
if rf, ok := ret.Get(0).(func(string) cluster.LedgerWriter); ok {
r0 = rf(chainID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cluster.LedgerWriter)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(chainID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import common "github.com/hyperledger/fabric/protos/common"
import mock "github.com/stretchr/testify/mock"
// LedgerWriter is an autogenerated mock type for the LedgerWriter type
type LedgerWriter struct {
mock.Mock
}
// Append provides a mock function with given fields: block
func (_m *LedgerWriter) Append(block *common.Block) error {
ret := _m.Called(block)
var r0 error
if rf, ok := ret.Get(0).(func(*common.Block) error); ok {
r0 = rf(block)
} else {
r0 = ret.Error(0)
}
return r0
}
// Height provides a mock function with given fields:
func (_m *LedgerWriter) Height() uint64 {
ret := _m.Called()
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
}
return r0
}
......@@ -7,13 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package cluster
import (
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/pem"
"time"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
......@@ -24,6 +27,201 @@ const (
RetryTimeout = time.Second * 10
)
// PullerConfigFromTopLevelConfig creates a PullerConfig from a TopLevel config,
// and from a signer and TLS key cert pair.
// The PullerConfig's channel is initialized to be the system channel.
func PullerConfigFromTopLevelConfig(conf *localconfig.TopLevel, tlsKey, tlsCert []byte, signer crypto.LocalSigner) PullerConfig {
return PullerConfig{
Channel: conf.General.SystemChannel,
MaxTotalBufferBytes: conf.General.Cluster.ReplicationBufferSize,
Timeout: conf.General.Cluster.RPCTimeout,
TLSKey: tlsKey,
TLSCert: tlsCert,
Signer: signer,
}
}
//go:generate mockery -dir . -name LedgerWriter -case underscore -output mocks/
// LedgerWriter allows the caller to write blocks and inspect the height
type LedgerWriter interface {
// Append a new block to the ledger
Append(block *common.Block) error
// Height returns the number of blocks on the ledger
Height() uint64
}
//go:generate mockery -dir . -name LedgerFactory -case underscore -output mocks/
// LedgerFactory retrieves or creates new ledgers by chainID
type LedgerFactory interface {
// GetOrCreate gets an existing ledger (if it exists)
// or creates it if it does not
GetOrCreate(chainID string) (LedgerWriter, error)
// Close releases all resources acquired by the factory
Close()
}
//go:generate mockery -dir . -name ChannelLister -case underscore -output mocks/
// ChannelLister returns a list of channels
type ChannelLister interface {
// Channels returns a list of channels
Channels() []string
// Close closes the ChannelLister
Close()
}
// Replicator replicates chains
type Replicator struct {
SystemChannel string
ChannelLister ChannelLister
Logger *flogging.FabricLogger
Puller *BlockPuller
BootBlock *common.Block
AmIPartOfChannel selfMembershipPredicate
LedgerFactory LedgerFactory
}
// IsReplicationNeeded returns whether replication is needed,
// or the cluster node can resume standard boot flow.
func (r *Replicator) IsReplicationNeeded() (bool, error) {
defer r.LedgerFactory.Close()
systemChannelLedger, err := r.LedgerFactory.GetOrCreate(r.SystemChannel)
if err != nil {
return false, err
}
lastBlockSeq := systemChannelLedger.Height() - 1
if r.BootBlock.Header.Number > lastBlockSeq {
return true, nil
}
return false, nil
}
// ReplicateChains pulls chains and commits them.
func (r *Replicator) ReplicateChains() {
channels := r.discoverChannels()
channels2Pull := r.channelsToPull(channels)
r.Logger.Info("Found myself in", len(channels2Pull), "channels:", channels2Pull)
for _, channel := range channels2Pull {
r.PullChannel(channel)
}
// Last, pull the system chain
r.PullChannel(r.SystemChannel)
r.LedgerFactory.Close()
}
func (r *Replicator) discoverChannels() []string {
r.Logger.Debug("Entering")
defer r.Logger.Debug("Exiting")
channels := r.ChannelLister.Channels()
r.Logger.Info("Discovered", len(channels), "channels:", channels)
r.ChannelLister.Close()
return channels
}
// PullChannel pulls the given channel from some orderer,
// and commits it to the ledger.
func (r *Replicator) PullChannel(channel string) error {
r.Logger.Info("Pulling channel", channel)
puller := r.Puller.Clone()
defer puller.Close()
puller.Channel = channel
endpoint, latestHeight := latestHeightAndEndpoint(puller)
if endpoint == "" {
return errors.Errorf("failed obtaining the latest block for channel %s", channel)
}
r.Logger.Info("Latest block height for channel", channel, "is", latestHeight)
// Ensure that if we pull the system channel, the latestHeight is bigger or equal to the
// bootstrap block of the system channel.
// Otherwise, we'd be left with a block gap.
if channel == r.SystemChannel && latestHeight-1 < r.BootBlock.Header.Number {
return errors.Errorf("latest height found among system channel(%s) orderers is %d, but the boot block's "+
"sequence is %d", r.SystemChannel, latestHeight, r.BootBlock.Header.Number)
}
return r.pullChannelBlocks(channel, puller, latestHeight)
}
func (r *Replicator) pullChannelBlocks(channel string, puller ChainPuller, latestHeight uint64) error {
ledger, err := r.LedgerFactory.GetOrCreate(channel)
if err != nil {
r.Logger.Panicf("Failed to create a ledger for channel %s: %v", channel, err)
}
// Pull the genesis block and remember its hash.
genesisBlock := puller.PullBlock(0)
r.appendBlock(genesisBlock, ledger)
actualPrevHash := genesisBlock.Header.Hash()
for seq := uint64(1); seq < latestHeight; seq++ {
block := puller.PullBlock(seq)
reportedPrevHash := block.Header.PreviousHash
if !bytes.Equal(reportedPrevHash, actualPrevHash) {
return errors.Errorf("block header mismatch on sequence %d, expected %x, got %x",
block.Header.Number, actualPrevHash, reportedPrevHash)
}
actualPrevHash = block.Header.Hash()
if channel == r.SystemChannel && block.Header.Number == r.BootBlock.Header.Number {
r.compareBootBlockWithSystemChannelLastConfigBlock(block)
r.appendBlock(block, ledger)
// No need to pull further blocks from the system channel
return nil
}
r.appendBlock(block, ledger)
}
return nil
}
func (r *Replicator) appendBlock(block *common.Block, ledger LedgerWriter) {
if err := ledger.Append(block); err != nil {
r.Logger.Panicf("Failed to write block %d: %v", block.Header.Number, err)
}
}
func (r *Replicator) compareBootBlockWithSystemChannelLastConfigBlock(block *common.Block) {
// Overwrite the received block's data hash
block.Header.DataHash = block.Data.Hash()
bootBlockHash := r.BootBlock.Header.Hash()
retrievedBlockHash := block.Header.Hash()
if bytes.Equal(bootBlockHash, retrievedBlockHash) {
return
}
r.Logger.Panicf("Block header mismatch on last system channel block, expected %s, got %s",
hex.EncodeToString(bootBlockHash), hex.EncodeToString(retrievedBlockHash))
}
func (r *Replicator) channelsToPull(channels []string) []string {
r.Logger.Info("Will now pull channels:", channels)
var channelsToPull []string
for _, channel := range channels {
r.Logger.Info("Pulling chain for", channel)
puller := r.Puller.Clone()
puller.Channel = channel
// Disable puller buffering when we check whether we are in the channel,
// as we only need to know about a single block.
bufferSize := puller.MaxTotalBufferBytes
puller.MaxTotalBufferBytes = 1
err := Participant(puller, r.AmIPartOfChannel)
puller.Close()
// Restore the previous buffer size
puller.MaxTotalBufferBytes = bufferSize
if err == NotInChannelError {
r.Logger.Info("I do not belong to channel", channel, ", skipping chain retrieval")
continue
}
if err != nil {
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel, err)
continue
}
channelsToPull = append(channelsToPull, channel)
}
return channelsToPull
}
// PullerConfig configures a BlockPuller.
type PullerConfig struct {
TLSKey []byte
......@@ -123,13 +321,15 @@ func Participant(puller ChainPuller, analyzeLastConfBlock selfMembershipPredicat
if endpoint == "" {
return errors.New("no available orderer")
}
lastBlock := puller.PullBlock(latestHeight - 1)
lastConfNumber, err := lastConfigFromBlock(lastBlock)
if err != nil {
return err
}
// The last config block is smaller than the latest height,
// and a block iterator on the server side is a sequenced one.
// So we need to reset the puller if we wish to pull an earlier block.
puller.Close()
lastConfigBlock := puller.PullBlock(lastConfNumber)
return analyzeLastConfBlock(lastConfigBlock)
}
......
......@@ -20,13 +20,430 @@ import (
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/msp"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestIsReplicationNeeded(t *testing.T) {
for _, testCase := range []struct {
name string
bootBlock *common.Block
systemChannelHeight uint64
systemChannelError error
expectedError string
replicationNeeded bool
}{