Commit bf0f8423 authored by Artem Barger's avatar Artem Barger Committed by Gerrit Code Review
Browse files

Merge "[FAB-12760] Pull chains for onboarding"

parents 07ceffb7 97243c77
......@@ -96,6 +96,7 @@ func (p *BlockPuller) HeightsByEndpoints() map[string]uint64 {
endpointInfo.conn.Close()
res[endpoint] = endpointInfo.lastBlockSeq + 1
}
p.Logger.Info("Returning the heights of OSNs mapped by endpoints", res)
return res
}
......@@ -371,6 +372,9 @@ func extractBlockFromResponse(resp *orderer.DeliverResponse) (*common.Block, err
if block == nil {
return nil, errors.New("block is nil")
}
if block.Data == nil {
return nil, errors.New("block data is nil")
}
if block.Header == nil {
return nil, errors.New("block header is nil")
}
......
......@@ -111,20 +111,24 @@ func noopBlockVerifierf(_ []*common.Block) error {
return nil
}
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, error) {
func readSeekEnvelope(stream orderer.AtomicBroadcast_DeliverServer) (*orderer.SeekInfo, string, error) {
env, err := stream.Recv()
if err != nil {
return nil, err
return nil, "", err
}
payload, err := utils.UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
return nil, "", err
}
seekInfo := &orderer.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
return nil, err
return nil, "", err
}
return seekInfo, nil
chdr := &common.ChannelHeader{}
if err = proto.Unmarshal(payload.Header.ChannelHeader, chdr); err != nil {
return nil, "", err
}
return seekInfo, chdr.ChannelId, nil
}
type deliverServer struct {
......@@ -132,7 +136,7 @@ type deliverServer struct {
sync.Mutex
err error
srv *comm.GRPCServer
seekAssertions chan func(*orderer.SeekInfo)
seekAssertions chan func(*orderer.SeekInfo, string)
blockResponses chan *orderer.DeliverResponse
}
......@@ -150,33 +154,58 @@ func (ds *deliverServer) Deliver(stream orderer.AtomicBroadcast_DeliverServer) e
ds.Lock()
err := ds.err
ds.Unlock()
if err != nil {
return ds.err
return err
}
seekInfo, err := readSeekEnvelope(stream)
seekInfo, channel, err := readSeekEnvelope(stream)
if err != nil {
panic(err)
}
// Get the next seek assertion and ensure the next seek is of the expected type
seekAssert := <-ds.seekAssertions
seekAssert(seekInfo)
seekAssert(seekInfo, channel)
if seekInfo.GetStart().GetSpecified() != nil {
for resp := range ds.blockResponses {
if err := stream.Send(resp); err != nil {
return nil
}
}
return nil
return ds.deliverBlocks(stream)
}
if seekInfo.GetStart().GetNewest() != nil {
resp := <-ds.blockResponses
resp := <-ds.blocks()
return stream.Send(resp)
}
panic(fmt.Sprintf("expected either specified or newest seek but got %v", seekInfo.GetStart()))
}
func (ds *deliverServer) deliverBlocks(stream orderer.AtomicBroadcast_DeliverServer) error {
for {
blockChan := ds.blocks()
response := <-blockChan
// A nil response is a signal from the test to close the stream.
// This is needed to avoid reading from the block buffer, hence
// consuming by accident a block that is tabled to be pulled
// later in the test.
if response == nil {
return nil
}
if err := stream.Send(response); err != nil {
return err
}
}
}
func (ds *deliverServer) blocks() chan *orderer.DeliverResponse {
ds.Lock()
defer ds.Unlock()
blockChan := ds.blockResponses
return blockChan
}
func (ds *deliverServer) setBlocks(blocks chan *orderer.DeliverResponse) {
ds.Lock()
defer ds.Unlock()
ds.blockResponses = blocks
}
func (ds *deliverServer) port() int {
_, portStr, err := net.SplitHostPort(ds.srv.Address())
assert.NoError(ds.t, err)
......@@ -190,7 +219,7 @@ func (ds *deliverServer) resurrect() {
var err error
// copy the responses channel into a fresh one
respChan := make(chan *orderer.DeliverResponse, 100)
for resp := range ds.blockResponses {
for resp := range ds.blocks() {
respChan <- resp
}
ds.blockResponses = respChan
......@@ -204,23 +233,23 @@ func (ds *deliverServer) resurrect() {
func (ds *deliverServer) stop() {
ds.srv.Stop()
close(ds.blockResponses)
close(ds.blocks())
}
func (ds *deliverServer) enqueueResponse(seq uint64) {
ds.blockResponses <- &orderer.DeliverResponse{
ds.blocks() <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{Block: common.NewBlock(seq, nil)},
}
}
func (ds *deliverServer) addExpectProbeAssert() {
ds.seekAssertions <- func(info *orderer.SeekInfo) {
ds.seekAssertions <- func(info *orderer.SeekInfo, _ string) {
assert.NotNil(ds.t, info.GetStart().GetNewest())
}
}
func (ds *deliverServer) addExpectPullAssert(seq uint64) {
ds.seekAssertions <- func(info *orderer.SeekInfo) {
ds.seekAssertions <- func(info *orderer.SeekInfo, _ string) {
assert.NotNil(ds.t, info.GetStart().GetSpecified())
assert.Equal(ds.t, seq, info.GetStart().GetSpecified().Number)
}
......@@ -233,7 +262,7 @@ func newClusterNode(t *testing.T) *deliverServer {
}
ds := &deliverServer{
t: t,
seekAssertions: make(chan func(*orderer.SeekInfo), 100),
seekAssertions: make(chan func(*orderer.SeekInfo, string), 100),
blockResponses: make(chan *orderer.DeliverResponse, 100),
srv: srv,
}
......@@ -704,8 +733,8 @@ func TestBlockPullerFailures(t *testing.T) {
malformBlockSignatureAndRecreateOSNBuffer := func(osn *deliverServer, bp *cluster.BlockPuller) {
bp.VerifyBlockSequence = func([]*common.Block) error {
close(osn.blockResponses)
osn.blockResponses = make(chan *orderer.DeliverResponse, 100)
close(osn.blocks())
osn.setBlocks(make(chan *orderer.DeliverResponse, 100))
osn.enqueueResponse(1)
osn.enqueueResponse(2)
osn.enqueueResponse(3)
......@@ -834,6 +863,11 @@ func TestBlockPullerBadBlocks(t *testing.T) {
return resp
}
removeData := func(resp *orderer.DeliverResponse) *orderer.DeliverResponse {
resp.GetBlock().Data = nil
return resp
}
removeMetadata := func(resp *orderer.DeliverResponse) *orderer.DeliverResponse {
resp.GetBlock().Metadata = nil
return resp
......@@ -861,6 +895,11 @@ func TestBlockPullerBadBlocks(t *testing.T) {
corruptBlock: removeHeader,
expectedErrMsg: "block header is nil",
},
{
name: "nil data",
corruptBlock: removeData,
expectedErrMsg: "block data is nil",
},
{
name: "nil metadata",
corruptBlock: removeMetadata,
......@@ -910,9 +949,9 @@ func TestBlockPullerBadBlocks(t *testing.T) {
if strings.Contains(entry.Message, fmt.Sprintf("Failed pulling blocks: %s", testCase.expectedErrMsg)) {
detectedBadBlock.Done()
// Close the channel to make the current server-side deliver stream close
close(osn.blockResponses)
close(osn.blocks())
// Ane reset the block buffer to be able to write into it again
osn.blockResponses = make(chan *orderer.DeliverResponse, 100)
osn.setBlocks(make(chan *orderer.DeliverResponse, 100))
// Put a correct block after it, 1 for the probing and 1 for the fetch
osn.enqueueResponse(10)
osn.enqueueResponse(10)
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import mock "github.com/stretchr/testify/mock"
// ChannelLister is an autogenerated mock type for the ChannelLister type
type ChannelLister struct {
mock.Mock
}
// Channels provides a mock function with given fields:
func (_m *ChannelLister) Channels() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
// Close provides a mock function with given fields:
func (_m *ChannelLister) Close() {
_m.Called()
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import mock "github.com/stretchr/testify/mock"
// LedgerFactory is an autogenerated mock type for the LedgerFactory type
type LedgerFactory struct {
mock.Mock
}
// ChainIDs provides a mock function with given fields:
func (_m *LedgerFactory) ChainIDs() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
// Close provides a mock function with given fields:
func (_m *LedgerFactory) Close() {
_m.Called()
}
// GetOrCreate provides a mock function with given fields: chainID
func (_m *LedgerFactory) GetOrCreate(chainID string) (cluster.LedgerWriter, error) {
ret := _m.Called(chainID)
var r0 cluster.LedgerWriter
if rf, ok := ret.Get(0).(func(string) cluster.LedgerWriter); ok {
r0 = rf(chainID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cluster.LedgerWriter)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(chainID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import common "github.com/hyperledger/fabric/protos/common"
import mock "github.com/stretchr/testify/mock"
// LedgerWriter is an autogenerated mock type for the LedgerWriter type
type LedgerWriter struct {
mock.Mock
}
// Append provides a mock function with given fields: block
func (_m *LedgerWriter) Append(block *common.Block) error {
ret := _m.Called(block)
var r0 error
if rf, ok := ret.Get(0).(func(*common.Block) error); ok {
r0 = rf(block)
} else {
r0 = ret.Error(0)
}
return r0
}
// Height provides a mock function with given fields:
func (_m *LedgerWriter) Height() uint64 {
ret := _m.Called()
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
}
return r0
}
......@@ -7,13 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package cluster
import (
"bytes"
"encoding/base64"
"encoding/hex"
"encoding/pem"
"time"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
......@@ -24,6 +27,201 @@ const (
RetryTimeout = time.Second * 10
)
// PullerConfigFromTopLevelConfig creates a PullerConfig from a TopLevel config,
// and from a signer and TLS key cert pair.
// The PullerConfig's channel is initialized to be the system channel.
func PullerConfigFromTopLevelConfig(conf *localconfig.TopLevel, tlsKey, tlsCert []byte, signer crypto.LocalSigner) PullerConfig {
return PullerConfig{
Channel: conf.General.SystemChannel,
MaxTotalBufferBytes: conf.General.Cluster.ReplicationBufferSize,
Timeout: conf.General.Cluster.RPCTimeout,
TLSKey: tlsKey,
TLSCert: tlsCert,
Signer: signer,
}
}
//go:generate mockery -dir . -name LedgerWriter -case underscore -output mocks/
// LedgerWriter allows the caller to write blocks and inspect the height
type LedgerWriter interface {
// Append a new block to the ledger
Append(block *common.Block) error
// Height returns the number of blocks on the ledger
Height() uint64
}
//go:generate mockery -dir . -name LedgerFactory -case underscore -output mocks/
// LedgerFactory retrieves or creates new ledgers by chainID
type LedgerFactory interface {
// GetOrCreate gets an existing ledger (if it exists)
// or creates it if it does not
GetOrCreate(chainID string) (LedgerWriter, error)
// Close releases all resources acquired by the factory
Close()
}
//go:generate mockery -dir . -name ChannelLister -case underscore -output mocks/
// ChannelLister returns a list of channels
type ChannelLister interface {
// Channels returns a list of channels
Channels() []string
// Close closes the ChannelLister
Close()
}
// Replicator replicates chains
type Replicator struct {
SystemChannel string
ChannelLister ChannelLister
Logger *flogging.FabricLogger
Puller *BlockPuller
BootBlock *common.Block
AmIPartOfChannel selfMembershipPredicate
LedgerFactory LedgerFactory
}
// IsReplicationNeeded returns whether replication is needed,
// or the cluster node can resume standard boot flow.
func (r *Replicator) IsReplicationNeeded() (bool, error) {
defer r.LedgerFactory.Close()
systemChannelLedger, err := r.LedgerFactory.GetOrCreate(r.SystemChannel)
if err != nil {
return false, err
}
lastBlockSeq := systemChannelLedger.Height() - 1
if r.BootBlock.Header.Number > lastBlockSeq {
return true, nil
}
return false, nil
}
// ReplicateChains pulls chains and commits them.
func (r *Replicator) ReplicateChains() {
channels := r.discoverChannels()
channels2Pull := r.channelsToPull(channels)
r.Logger.Info("Found myself in", len(channels2Pull), "channels:", channels2Pull)
for _, channel := range channels2Pull {
r.PullChannel(channel)
}
// Last, pull the system chain
r.PullChannel(r.SystemChannel)
r.LedgerFactory.Close()
}
func (r *Replicator) discoverChannels() []string {
r.Logger.Debug("Entering")
defer r.Logger.Debug("Exiting")
channels := r.ChannelLister.Channels()
r.Logger.Info("Discovered", len(channels), "channels:", channels)
r.ChannelLister.Close()
return channels
}
// PullChannel pulls the given channel from some orderer,
// and commits it to the ledger.
func (r *Replicator) PullChannel(channel string) error {
r.Logger.Info("Pulling channel", channel)
puller := r.Puller.Clone()
defer puller.Close()
puller.Channel = channel
endpoint, latestHeight := latestHeightAndEndpoint(puller)
if endpoint == "" {
return errors.Errorf("failed obtaining the latest block for channel %s", channel)
}
r.Logger.Info("Latest block height for channel", channel, "is", latestHeight)
// Ensure that if we pull the system channel, the latestHeight is bigger or equal to the
// bootstrap block of the system channel.
// Otherwise, we'd be left with a block gap.
if channel == r.SystemChannel && latestHeight-1 < r.BootBlock.Header.Number {
return errors.Errorf("latest height found among system channel(%s) orderers is %d, but the boot block's "+
"sequence is %d", r.SystemChannel, latestHeight, r.BootBlock.Header.Number)
}
return r.pullChannelBlocks(channel, puller, latestHeight)
}
func (r *Replicator) pullChannelBlocks(channel string, puller ChainPuller, latestHeight uint64) error {
ledger, err := r.LedgerFactory.GetOrCreate(channel)
if err != nil {
r.Logger.Panicf("Failed to create a ledger for channel %s: %v", channel, err)
}
// Pull the genesis block and remember its hash.
genesisBlock := puller.PullBlock(0)
r.appendBlock(genesisBlock, ledger)
actualPrevHash := genesisBlock.Header.Hash()
for seq := uint64(1); seq < latestHeight; seq++ {
block := puller.PullBlock(seq)
reportedPrevHash := block.Header.PreviousHash
if !bytes.Equal(reportedPrevHash, actualPrevHash) {
return errors.Errorf("block header mismatch on sequence %d, expected %x, got %x",
block.Header.Number, actualPrevHash, reportedPrevHash)
}
actualPrevHash = block.Header.Hash()
if channel == r.SystemChannel && block.Header.Number == r.BootBlock.Header.Number {
r.compareBootBlockWithSystemChannelLastConfigBlock(block)
r.appendBlock(block, ledger)
// No need to pull further blocks from the system channel
return nil
}
r.appendBlock(block, ledger)
}
return nil
}
func (r *Replicator) appendBlock(block *common.Block, ledger LedgerWriter) {
if err := ledger.Append(block); err != nil {
r.Logger.Panicf("Failed to write block %d: %v", block.Header.Number, err)
}
}
func (r *Replicator) compareBootBlockWithSystemChannelLastConfigBlock(block *common.Block) {
// Overwrite the received block's data hash
block.Header.DataHash = block.Data.Hash()
bootBlockHash := r.BootBlock.Header.Hash()
retrievedBlockHash := block.Header.Hash()
if bytes.Equal(bootBlockHash, retrievedBlockHash) {
return
}
r.Logger.Panicf("Block header mismatch on last system channel block, expected %s, got %s",
hex.EncodeToString(bootBlockHash), hex.EncodeToString(retrievedBlockHash))
}
func (r *Replicator) channelsToPull(channels []string) []string {
r.Logger.Info("Will now pull channels:", channels)
var channelsToPull []string
for _, channel := range channels {
r.Logger.Info("Pulling chain for", channel)
puller := r.Puller.Clone()
puller.Channel = channel
// Disable puller buffering when we check whether we are in the channel,
// as we only need to know about a single block.
bufferSize := puller.MaxTotalBufferBytes
puller.MaxTotalBufferBytes = 1
err := Participant(puller, r.AmIPartOfChannel)
puller.Close()
// Restore the previous buffer size
puller.MaxTotalBufferBytes = bufferSize
if err == NotInChannelError {
r.Logger.Info("I do not belong to channel", channel, ", skipping chain retrieval")
continue
}
if err != nil {
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel, err)
continue
}
channelsToPull = append(channelsToPull, channel)
}
return channelsToPull
}
// PullerConfig configures a BlockPuller.
type PullerConfig struct {
TLSKey []byte
......@@ -123,13 +321,15 @@ func Participant(puller ChainPuller, analyzeLastConfBlock selfMembershipPredicat
if endpoint == "" {
return errors.New("no available orderer")
}
lastBlock := puller.PullBlock(latestHeight - 1)
lastConfNumber, err := lastConfigFromBlock(lastBlock)
if err != nil {
return err
}
// The last config block is smaller than the latest height,
// and a block iterator on the server side is a sequenced one.
// So we need to reset the puller if we wish to pull an earlier block.
puller.Close()
lastConfigBlock := puller.PullBlock(lastConfNumber)
return analyzeLastConfBlock(lastConfigBlock)
}
......
......@@ -20,13 +20,430 @@ import (
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/msp"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestIsReplicationNeeded(t *testing.T) {
for _, testCase := range []struct {
name string
bootBlock *common.Block
systemChannelHeight uint64
systemChannelError error
expectedError string
replicationNeeded bool
}{
{