Unverified Commit 0da0ecee authored by yacovm, committed by Artem Barger

[FAB-13465] Max retry attempts for orderer replication



This change set adds an option to configure the block puller
used for replication with a maximum number of retry attempts.

It is needed because, during onboarding, a specific application channel
might become unavailable, but that should not block onboarding now that
we have dynamic periodic onboarding for channels we were unable to join.
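
A minimal sketch of how a caller might use the new option. The
MaxPullBlockRetries field and the nil return of PullBlock are taken from
the diff below; the package qualifier and the other field values are only
illustrative:

	puller := &cluster.BlockPuller{
		MaxPullBlockRetries: 5,           // give up after 5 consecutive failed attempts
		MaxTotalBufferBytes: 1 << 20,
		RetryTimeout:        time.Second, // wait between attempts
		// ... Signer, TLSCert, Endpoints, Logger, etc. as before
	}

	if block := puller.PullBlock(seq); block == nil {
		// With MaxPullBlockRetries > 0, PullBlock returns nil once the retry
		// budget is exhausted instead of blocking forever.
		return cluster.ErrRetryCountExhausted
	}

Setting MaxPullBlockRetries to 0 keeps the previous behaviour of retrying
indefinitely.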

Change-Id: I12f4247040c258809885f0e5fdc07d60914a56e2
Signed-off-by: yacovm <yacovm@il.ibm.com>
parent 4bc13c8e
......@@ -29,6 +29,7 @@ import (
// Its operations are not thread safe.
type BlockPuller struct {
// Configuration
MaxPullBlockRetries uint64
MaxTotalBufferBytes int
Signer crypto.LocalSigner
TLSCert []byte
......@@ -80,13 +81,21 @@ func (p *BlockPuller) Close() {
}
// PullBlock blocks until a block with the given sequence is fetched
// from some remote ordering node.
// from some remote ordering node, or until the number of consecutive
// failures to fetch the block exceeds MaxPullBlockRetries.
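// A MaxPullBlockRetries of 0 means the number of retries is unlimited.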
func (p *BlockPuller) PullBlock(seq uint64) *common.Block {
retriesLeft := p.MaxPullBlockRetries
for {
block := p.tryFetchBlock(seq)
if block != nil {
return block
}
retriesLeft--
if retriesLeft == 0 && p.MaxPullBlockRetries > 0 {
p.Logger.Errorf("Failed pulling block %d: retry count exhausted(%d)", seq, p.MaxPullBlockRetries)
return nil
}
time.Sleep(p.RetryTimeout)
}
}
......
......@@ -1021,3 +1021,74 @@ func TestImpatientStreamFailure(t *testing.T) {
_, err = stream.Recv()
assert.Error(t, err)
}
func TestBlockPullerMaxRetriesExhausted(t *testing.T) {
// Scenario:
// The block puller is expected to pull blocks 1 to 3.
// But the orderer only has blocks 1 and 2, and for some reason
// it sends back block 2 twice (we do this so that we
// don't rely on a timeout, because timeouts are flaky in tests).
// It should attempt to re-connect and to send requests
// until the retry attempts are exhausted, after which
// it gives up, and nil is returned.
osn := newClusterNode(t)
defer osn.stop()
// We report having up to block 3.
osn.enqueueResponse(3)
osn.addExpectProbeAssert()
// We send block 1,
osn.addExpectPullAssert(1)
osn.enqueueResponse(1)
// And 2, twice.
osn.enqueueResponse(2)
osn.enqueueResponse(2)
// A nil message signals that the deliver stream is being closed.
// This tells the server side to prepare for a new deliver
// stream that the client is about to open.
osn.blockResponses <- nil
for i := 0; i < 2; i++ {
// Therefore, the block puller should disconnect and reconnect.
osn.addExpectProbeAssert()
// We report having up to block 3.
osn.enqueueResponse(3)
// And we expect to be asked for block 3, since blocks 1, 2
// have already been passed to the caller.
osn.addExpectPullAssert(3)
// Once again, we send 2 instead of 3
osn.enqueueResponse(2)
// The client disconnects again
osn.blockResponses <- nil
}
dialer := newCountingDialer()
bp := newBlockPuller(dialer, osn.srv.Address())
var exhaustedRetryAttemptsLogged bool
bp.Logger = bp.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if entry.Message == "Failed pulling block 3: retry count exhausted(2)" {
exhaustedRetryAttemptsLogged = true
}
return nil
}))
bp.MaxPullBlockRetries = 2
// We don't expect to time out in this test, so make the timeout large
// to prevent flakes due to CPU starvation.
bp.FetchTimeout = time.Hour
// Make the buffer tiny, only a single byte - in order to deliver blocks
// to the caller one by one and not store them in the buffer.
bp.MaxTotalBufferBytes = 1
// Assert reception of blocks 1 to 3
assert.Equal(t, uint64(1), bp.PullBlock(uint64(1)).Header.Number)
assert.Equal(t, uint64(2), bp.PullBlock(uint64(2)).Header.Number)
assert.Nil(t, bp.PullBlock(uint64(3)))
bp.Close()
dialer.assertAllConnectionsClosed(t)
assert.True(t, exhaustedRetryAttemptsLogged)
}
......@@ -194,11 +194,17 @@ func (r *Replicator) pullChannelBlocks(channel string, puller ChainPuller, lates
}
// Pull the genesis block and remember its hash.
genesisBlock := puller.PullBlock(0)
if genesisBlock == nil {
return ErrRetryCountExhausted
}
r.appendBlockIfNeeded(genesisBlock, ledger, channel)
actualPrevHash := genesisBlock.Header.Hash()
for seq := uint64(1); seq < latestHeight; seq++ {
block := puller.PullBlock(seq)
if block == nil {
return ErrRetryCountExhausted
}
reportedPrevHash := block.Header.PreviousHash
if !bytes.Equal(reportedPrevHash, actualPrevHash) {
return errors.Errorf("block header mismatch on sequence %d, expected %x, got %x",
......@@ -274,6 +280,12 @@ func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
channelsNotToPull = append(channelsNotToPull, channel)
continue
}
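// Failing to pull the blocks due to exhausted retry attempts should not
// abort the onboarding; the channel is skipped and can be picked up later
// by the periodic onboarding of channels we were unable to join.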
if err == ErrRetryCountExhausted {
r.Logger.Warningf("Could not obtain blocks needed for classifying whether I am in the channel, "+
"skipping the retrieval of the chain %s", channel.ChannelName)
channelsNotToPull = append(channelsNotToPull, channel)
continue
}
if err != nil {
if !r.DoNotPanicIfClusterNotReachable {
r.Logger.Panicf("Failed classifying whether I belong to channel %s: %v, skipping chain retrieval", channel.ChannelName, err)
......@@ -384,6 +396,8 @@ var ErrServiceUnavailable = errors.New("service unavailable")
// ErrNotInChannel denotes that an ordering node is not in the channel
var ErrNotInChannel = errors.New("not in the channel")
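// ErrRetryCountExhausted denotes that the block puller failed to fetch a block
// because its retry attempts were exhausted.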
var ErrRetryCountExhausted = errors.New("retry attempts exhausted")
// selfMembershipPredicate determines whether the caller is found in the given config block
type selfMembershipPredicate func(configBlock *common.Block) error
......@@ -398,9 +412,12 @@ func Participant(puller ChainPuller, analyzeLastConfBlock selfMembershipPredicat
return err
}
if endpoint == "" {
return errors.New("no available orderer")
return ErrRetryCountExhausted
}
lastBlock := puller.PullBlock(latestHeight - 1)
if lastBlock == nil {
return ErrRetryCountExhausted
}
lastConfNumber, err := lastConfigFromBlock(lastBlock)
if err != nil {
return err
......@@ -410,6 +427,9 @@ func Participant(puller ChainPuller, analyzeLastConfBlock selfMembershipPredicat
// So we need to reset the puller if we wish to pull an earlier block.
puller.Close()
lastConfigBlock := puller.PullBlock(lastConfNumber)
if lastConfigBlock == nil {
return ErrRetryCountExhausted
}
return analyzeLastConfBlock(lastConfigBlock)
}
......@@ -469,6 +489,9 @@ func (ci *ChainInspector) Channels() []ChannelGenesisBlock {
var prevHash []byte
for seq := uint64(1); seq < lastConfigBlockNum; seq++ {
block = ci.Puller.PullBlock(seq)
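// A nil block means the puller exhausted its retry attempts; for the
// system channel this is fatal, hence the panic below.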
if block == nil {
ci.Logger.Panicf("Failed pulling block %d from the system channel", seq)
}
ci.validateHashPointer(block, prevHash)
channel, err := IsNewChannelBlock(block)
if err != nil {
......
......@@ -276,6 +276,78 @@ func TestReplicateChainsFailures(t *testing.T) {
}
}
func TestPullChannelFailure(t *testing.T) {
blockchain := createBlockChain(0, 5)
for _, testcase := range []struct {
name string
genesisBlockSequence int
thirdBlockSequence int
}{
{
name: "Failed to pull genesis block",
genesisBlockSequence: 1,
},
{
name: "Failed to pull some non genesis block",
genesisBlockSequence: 0,
thirdBlockSequence: 0,
},
} {
t.Run(testcase.name, func(t *testing.T) {
lw := &mocks.LedgerWriter{}
lw.On("Append", mock.Anything).Return(nil)
lw.On("Height").Return(uint64(0))
lf := &mocks.LedgerFactory{}
lf.On("GetOrCreate", "mychannel").Return(lw, nil)
osn := newClusterNode(t)
defer osn.stop()
enqueueBlock := func(seq int) {
osn.blockResponses <- &orderer.DeliverResponse{
Type: &orderer.DeliverResponse_Block{
Block: blockchain[seq],
},
}
}
dialer := newCountingDialer()
bp := newBlockPuller(dialer, osn.srv.Address())
// Use a large timeout to reduce the chance of flakes when the server gets stuck
// and we would otherwise hit an unwanted timeout.
bp.FetchTimeout = time.Hour
bp.MaxPullBlockRetries = 1
// Do not buffer blocks in memory
bp.MaxTotalBufferBytes = 1
r := cluster.Replicator{
Filter: cluster.AnyChannel,
AmIPartOfChannel: func(configBlock *common.Block) error {
return nil
},
Logger: flogging.MustGetLogger("test"),
SystemChannel: "system",
LedgerFactory: lf,
Puller: bp,
}
osn.addExpectProbeAssert()
enqueueBlock(5)
osn.addExpectProbeAssert()
enqueueBlock(5)
osn.addExpectPullAssert(0)
enqueueBlock(testcase.genesisBlockSequence)
enqueueBlock(1)
enqueueBlock(testcase.thirdBlockSequence)
err := r.PullChannel("mychannel")
assert.Equal(t, cluster.ErrRetryCountExhausted, err)
})
}
}
func TestPullerConfigFromTopLevelConfig(t *testing.T) {
signer := &crypto.LocalSigner{}
expected := cluster.PullerConfig{
......@@ -363,7 +435,7 @@ func TestReplicateChainsChannelClassificationFailure(t *testing.T) {
}
func TestReplicateChainsGreenPath(t *testing.T) {
// Scenario: There are 4 channels in the system: A and B.
// Scenario: There are 5 channels in the system: A-E.
// We are in channel A but not in channel B, therefore
// we should pull channel A and then the system channel.
// However, this is not the first attempt of replication for
......@@ -374,6 +446,8 @@ func TestReplicateChainsGreenPath(t *testing.T) {
// For channel C - we are forbidden from pulling any blocks.
// Channel D is a deserted channel - all OSNs have left it,
// therefore we should not pull it at all.
// Channel E cannot be pulled at all, due to the OSN being unavailable
// at that time.
systemChannelBlocks := createBlockChain(0, 21)
block30WithConfigBlockOf21 := common.NewBlock(30, nil)
......@@ -388,9 +462,11 @@ func TestReplicateChainsGreenPath(t *testing.T) {
dialer := newCountingDialer()
bp := newBlockPuller(dialer, osn.srv.Address())
bp.FetchTimeout = time.Hour
bp.MaxPullBlockRetries = 1
channelLister := &mocks.ChannelLister{}
channelLister.On("Channels").Return([]cluster.ChannelGenesisBlock{
{ChannelName: "E", GenesisBlock: fakeGB},
{ChannelName: "D", GenesisBlock: fakeGB}, {ChannelName: "C", GenesisBlock: fakeGB},
{ChannelName: "A"}, {ChannelName: "B", GenesisBlock: fakeGB},
})
......@@ -398,15 +474,16 @@ func TestReplicateChainsGreenPath(t *testing.T) {
amIPartOfChannelMock := &mock.Mock{}
// For channel A
amIPartOfChannelMock.On("func11").Return(nil).Once()
amIPartOfChannelMock.On("func13").Return(nil).Once()
// For channel B
amIPartOfChannelMock.On("func11").Return(cluster.ErrNotInChannel).Once()
amIPartOfChannelMock.On("func13").Return(cluster.ErrNotInChannel).Once()
// 22 is for the system channel, 31 is for channel A, and for channel B we only need 1 block (the GB).
blocksCommittedToLedgerA := make(chan *common.Block, 31)
blocksCommittedToLedgerB := make(chan *common.Block, 1)
blocksCommittedToLedgerC := make(chan *common.Block, 1)
blocksCommittedToLedgerD := make(chan *common.Block, 1)
blocksCommittedToLedgerE := make(chan *common.Block, 1)
blocksCommittedToSystemLedger := make(chan *common.Block, 22)
// Put 10 blocks in the ledger of channel A, to simulate
// that the ledger had blocks when the node started.
......@@ -448,6 +525,14 @@ func TestReplicateChainsGreenPath(t *testing.T) {
blocksCommittedToLedgerD <- arg.Get(0).(*common.Block)
})
lwE := &mocks.LedgerWriter{}
lwE.On("Height").Return(func() uint64 {
return uint64(len(blocksCommittedToLedgerE))
})
lwE.On("Append", mock.Anything).Return(nil).Run(func(arg mock.Arguments) {
blocksCommittedToLedgerE <- arg.Get(0).(*common.Block)
})
lwSystem := &mocks.LedgerWriter{}
lwSystem.On("Append", mock.Anything).Return(nil).Run(func(arg mock.Arguments) {
blocksCommittedToSystemLedger <- arg.Get(0).(*common.Block)
......@@ -462,6 +547,7 @@ func TestReplicateChainsGreenPath(t *testing.T) {
lf.On("GetOrCreate", "B").Return(lwB, nil)
lf.On("GetOrCreate", "C").Return(lwC, nil)
lf.On("GetOrCreate", "D").Return(lwD, nil)
lf.On("GetOrCreate", "E").Return(lwE, nil)
lf.On("GetOrCreate", "system").Return(lwSystem, nil)
r := cluster.Replicator{
......@@ -477,7 +563,17 @@ func TestReplicateChainsGreenPath(t *testing.T) {
BootBlock: systemChannelBlocks[21],
}
// The first thing the orderer gets is a seek to channel D,
// The first thing the orderer gets is a seek to channel E.
// Unfortunately, it's not available!
osn.seekAssertions <- func(info *orderer.SeekInfo, actualChannel string) {
// Ensure the seek came to the right channel
assert.NotNil(osn.t, info.GetStart().GetNewest())
assert.Equal(t, "E", actualChannel)
}
// Send an EOF down the stream.
osn.blockResponses <- nil
// The second thing the orderer gets is a seek to channel D,
// which is followed by a response of service unavailable
osn.seekAssertions <- func(info *orderer.SeekInfo, actualChannel string) {
// Ensure the seek came to the right channel
......@@ -490,7 +586,7 @@ func TestReplicateChainsGreenPath(t *testing.T) {
},
}
// The second thing the orderer gets is a seek to channel C,
// The third thing the orderer gets is a seek to channel C,
// which is followed by a response of forbidden
osn.seekAssertions <- func(info *orderer.SeekInfo, actualChannel string) {
// Ensure the seek came to the right channel
......@@ -600,6 +696,7 @@ func TestReplicateChainsGreenPath(t *testing.T) {
assert.Len(t, blocksCommittedToLedgerB, 1)
assert.Len(t, blocksCommittedToLedgerC, 1)
assert.Len(t, blocksCommittedToLedgerD, 1)
assert.Len(t, blocksCommittedToLedgerE, 1)
// Count the blocks for channel A
var expectedSequence uint64
for block := range blocksCommittedToLedgerA {
......@@ -632,7 +729,7 @@ func TestParticipant(t *testing.T) {
}{
{
name: "No available orderer",
expectedError: "no available orderer",
expectedError: cluster.ErrRetryCountExhausted.Error(),
},
{
name: "Unauthorized for the channel",
......@@ -699,6 +796,34 @@ func TestParticipant(t *testing.T) {
latestConfigBlock: &common.Block{Header: &common.BlockHeader{Number: 42}},
predicateReturns: cluster.ErrNotInChannel,
},
{
name: "Failed pulling last block",
expectedError: cluster.ErrRetryCountExhausted.Error(),
heightsByEndpoints: map[string]uint64{
"orderer.example.com:7050": 100,
},
latestBlockSeq: uint64(99),
latestBlock: nil,
},
{
name: "Failed pulling last config block",
expectedError: cluster.ErrRetryCountExhausted.Error(),
heightsByEndpoints: map[string]uint64{
"orderer.example.com:7050": 100,
},
latestBlockSeq: uint64(99),
latestBlock: &common.Block{
Metadata: &common.BlockMetadata{
Metadata: [][]byte{{1, 2, 3}, utils.MarshalOrPanic(&common.Metadata{
Value: utils.MarshalOrPanic(&common.LastConfig{
Index: 42,
}),
})},
},
},
latestConfigBlockSeq: 42,
latestConfigBlock: nil,
},
} {
t.Run(testCase.name, func(t *testing.T) {
configBlocks := make(chan *common.Block, 1)
......@@ -1259,6 +1384,21 @@ func TestChannels(t *testing.T) {
})
},
},
{
name: "bad path - failed pulling blocks",
prepareSystemChain: func(systemChain []*common.Block) {
assignHashes(systemChain)
// Setting a block to nil makes the block puller return nil,
// which signals failure of pulling a block.
systemChain[len(systemChain)/2] = nil
},
assertion: func(t *testing.T, ci *cluster.ChainInspector) {
panicValue := "Failed pulling block 3 from the system channel"
assert.PanicsWithValue(t, panicValue, func() {
ci.Channels()
})
},
},
} {
t.Run(testCase.name, func(t *testing.T) {
systemChain := []*common.Block{
......