Commit c48af694 authored by Gari Singh's avatar Gari Singh Committed by Gerrit Code Review
Browse files

Merge "[FAB-12354] Optimistic chain creation in etcd/raft"

parents f6562293 c0f21330
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"bytes"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
)
// This governs the max number of created blocks in-flight; i.e. blocks
// that were created but not written.
// CreateNextBLock returns nil once this number of blocks are in-flight.
const createdBlocksBuffersize = 20
// blockCreator optimistically creates blocks in a chain. The created
// blocks may not be written out eventually. This enables us to pipeline
// the creation of blocks with achieving consensus on them leading to
// performance improvements. The created chain is discarded if a
// diverging block is committed
// To safely use blockCreator, only one thread should interact with it.
type blockCreator struct {
CreatedBlocks chan *cb.Block
LastCreatedBlock *cb.Block
LastCommittedBlock *cb.Block
logger *flogging.FabricLogger
}
func newBlockCreator(lastBlock *cb.Block, logger *flogging.FabricLogger) *blockCreator {
if lastBlock == nil {
logger.Panic("block creator initialized with nil last block")
}
bc := &blockCreator{
CreatedBlocks: make(chan *cb.Block, createdBlocksBuffersize),
LastCreatedBlock: lastBlock,
LastCommittedBlock: lastBlock,
logger: logger,
}
logger.Debugf("Initialized block creator with (lastblockNumber=%d)", lastBlock.Header.Number)
return bc
}
// CreateNextBlock creates a new block with the next block number, and the
// given contents.
// Returns the created block if the block could be created else nil.
// It can fail when the there are `createdBlocksBuffersize` blocks already
// created and no more can be accomodated in the `CreatedBlocks` channel.
func (bc *blockCreator) createNextBlock(messages []*cb.Envelope) *cb.Block {
previousBlockHash := bc.LastCreatedBlock.Header.Hash()
data := &cb.BlockData{
Data: make([][]byte, len(messages)),
}
var err error
for i, msg := range messages {
data.Data[i], err = proto.Marshal(msg)
if err != nil {
bc.logger.Panicf("Could not marshal envelope: %s", err)
}
}
block := cb.NewBlock(bc.LastCreatedBlock.Header.Number+1, previousBlockHash)
block.Header.DataHash = data.Hash()
block.Data = data
select {
case bc.CreatedBlocks <- block:
bc.LastCreatedBlock = block
bc.logger.Debugf("Created block %d", bc.LastCreatedBlock.Header.Number)
return block
default:
return nil
}
}
// ResetCreatedBlocks resets the queue of created blocks.
// Subsequent blocks will be created over the block that was last committed
// using CommitBlock.
func (bc *blockCreator) resetCreatedBlocks() {
// We should not recreate CreatedBlocks channel since it can lead to
// data races on its access
for len(bc.CreatedBlocks) > 0 {
// empties the channel
<-bc.CreatedBlocks
}
bc.LastCreatedBlock = bc.LastCommittedBlock
bc.logger.Debug("Reset created blocks")
}
// commitBlock should be invoked for all blocks to let the blockCreator know
// which blocks have been committed. If the committed block is divergent from
// the stack of created blocks then the stack is reset.
func (bc *blockCreator) commitBlock(block *cb.Block) {
bc.LastCommittedBlock = block
// check if the committed block diverges from the created blocks
select {
case b := <-bc.CreatedBlocks:
if !bytes.Equal(b.Header.Bytes(), block.Header.Bytes()) {
// the written block is diverging from the createBlocks stack
// discard the created blocks
bc.resetCreatedBlocks()
}
// else the written block is part of the createBlocks stack; nothing to be done
default:
// No created blocks; set this block as the last created block.
// This happens when calls to WriteBlock are being made without a CreateNextBlock being called.
// For example, in the case of etcd/raft, the leader proposes blocks and the followers
// only write the proposed blocks.
bc.LastCreatedBlock = block
}
}
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"bytes"
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func getSeedBlock() *cb.Block {
seedBlock := cb.NewBlock(0, []byte("firsthash"))
seedBlock.Data.Data = [][]byte{[]byte("somebytes")}
return seedBlock
}
func TestValidCreatedBlocksQueue(t *testing.T) {
seedBlock := getSeedBlock()
logger := flogging.NewFabricLogger(zap.NewNop())
bc := newBlockCreator(seedBlock, logger)
t.Run("correct creation of a single block", func(t *testing.T) {
block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
assert.Equal(t, seedBlock.Header.Number+1, block.Header.Number)
assert.Equal(t, block.Data.Hash(), block.Header.DataHash)
assert.Equal(t, seedBlock.Header.Hash(), block.Header.PreviousHash)
// this created block should be part of the queue of created blocks
assert.Len(t, bc.CreatedBlocks, 1)
bc.commitBlock(block)
assert.Empty(t, bc.CreatedBlocks)
assert.Equal(t, bc.LastCommittedBlock.Header.Hash(), block.Header.Hash())
})
t.Run("ResetCreatedBlocks resets the queue of created blocks", func(t *testing.T) {
numBlocks := 10
blocks := []*cb.Block{}
for i := 0; i < numBlocks; i++ {
blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
}
bc.resetCreatedBlocks()
assert.True(t,
bytes.Equal(bc.LastCommittedBlock.Header.Bytes(), bc.LastCreatedBlock.Header.Bytes()),
"resetting the created blocks queue should leave the lastCommittedBlock and the lastCreatedBlock equal",
)
assert.Empty(t, bc.CreatedBlocks)
})
t.Run("commit of block without any created blocks sets the lastCreatedBlock correctly", func(t *testing.T) {
block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
bc.resetCreatedBlocks()
bc.commitBlock(block)
assert.True(t,
bytes.Equal(block.Header.Bytes(), bc.LastCommittedBlock.Header.Bytes()),
"resetting the created blocks queue should leave the lastCommittedBlock and the lastCreatedBlock equal",
)
assert.True(t,
bytes.Equal(block.Header.Bytes(), bc.LastCreatedBlock.Header.Bytes()),
"resetting the created blocks queue should leave the lastCommittedBlock and the lastCreatedBlock equal",
)
assert.Empty(t, bc.CreatedBlocks)
})
t.Run("propose multiple blocks without having to commit them; tests the optimistic block creation", func(t *testing.T) {
/*
* Scenario:
* We create five blocks initially and then commit only two of them. We further create five more blocks
* and commit out the remaining 8 blocks in the propose stack. This should succeed since the committed
* blocks are not divergent from the created blocks.
*/
blocks := []*cb.Block{}
// Create five blocks without writing them out
for i := 0; i < 5; i++ {
blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
}
assert.Len(t, bc.CreatedBlocks, 5)
// Write two of these out
for i := 0; i < 2; i++ {
bc.commitBlock(blocks[i])
}
assert.Len(t, bc.CreatedBlocks, 3)
// Create five more blocks; these should be created over the previous five blocks created
for i := 0; i < 5; i++ {
blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
}
assert.Len(t, bc.CreatedBlocks, 8)
// Write out the remaining eight blocks; can only succeed if all the blocks were created in a single stack else will panic
for i := 2; i < 10; i++ {
bc.commitBlock(blocks[i])
}
assert.Empty(t, bc.CreatedBlocks)
// Assert that the block were indeed created in the correct sequence
for i := 0; i < 9; i++ {
assertNextBlock(t, blocks[i], blocks[i+1])
}
})
t.Run("createNextBlock returns nil after createdBlocksBuffersize blocks have been created", func(t *testing.T) {
numBlocks := createdBlocksBuffersize
blocks := []*cb.Block{}
for i := 0; i < numBlocks; i++ {
blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
}
block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}})
assert.Nil(t, block)
})
t.Run("created blocks should always be over committed blocks", func(t *testing.T) {
/*
* Scenario:
* We will create
* 1. a propose stack with 5 blocks over baseLastCreatedBlock, and
* 2. an alternate block over baseLastCreatedBlock.
* We will write out this alternate block and verify that the subsequent block is created over this alternate block
* and not on the existing propose stack.
* This scenario fails if commitBlock does not flush the createdBlocks queue when the written block is divergent from the
* created blocks.
*/
baseLastCreatedBlock := bc.LastCreatedBlock
// Create the stack of five blocks without writing them out
blocks := []*cb.Block{}
for i := 0; i < 5; i++ {
blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
}
// create and write out the alternate block
alternateBlock := createBlockOverSpecifiedBlock(baseLastCreatedBlock, []*cb.Envelope{{Payload: []byte("alternate test envelope")}})
bc.commitBlock(alternateBlock)
// assert that createNextBlock creates the next block over this alternateBlock
createdBlockOverAlternateBlock := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}})
synthesizedBlockOverAlternateBlock := createBlockOverSpecifiedBlock(alternateBlock, []*cb.Envelope{{Payload: []byte("test envelope")}})
assert.True(t,
bytes.Equal(createdBlockOverAlternateBlock.Header.Bytes(), synthesizedBlockOverAlternateBlock.Header.Bytes()),
"created and synthesized blocks should be equal",
)
bc.commitBlock(createdBlockOverAlternateBlock)
})
}
func createBlockOverSpecifiedBlock(baseBlock *cb.Block, messages []*cb.Envelope) *cb.Block {
previousBlockHash := baseBlock.Header.Hash()
data := &cb.BlockData{
Data: make([][]byte, len(messages)),
}
var err error
for i, msg := range messages {
data.Data[i], err = proto.Marshal(msg)
if err != nil {
panic(fmt.Sprintf("Could not marshal envelope: %s", err))
}
}
block := cb.NewBlock(baseBlock.Header.Number+1, previousBlockHash)
block.Header.DataHash = data.Hash()
block.Data = data
return block
}
// isNextBlock returns true if `nextBlock` is correctly formed as the next block
// following `block` in a chain.
func assertNextBlock(t *testing.T, block, nextBlock *cb.Block) {
assert.Equal(t, block.Header.Number+1, nextBlock.Header.Number)
assert.Equal(t, block.Header.Hash(), nextBlock.Header.PreviousHash)
}
......@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package etcdraft
import (
"bytes"
"context"
"encoding/pem"
"fmt"
......@@ -120,7 +121,8 @@ type Chain struct {
clock clock.Clock // Tests can inject a fake clock
support consensus.ConsenterSupport
support consensus.ConsenterSupport
BlockCreator *blockCreator
leader uint64
appliedIndex uint64
......@@ -173,6 +175,8 @@ func NewChain(
snapBlkNum = b.Header.Number
}
lastBlock := support.Block(support.Height() - 1)
return &Chain{
configurator: conf,
rpc: rpc,
......@@ -190,6 +194,7 @@ func NewChain(
observeC: observeC,
support: support,
fresh: fresh,
BlockCreator: newBlockCreator(lastBlock, lg),
appliedIndex: appliedi,
lastSnapBlockNum: snapBlkNum,
puller: puller,
......@@ -453,6 +458,7 @@ func (c *Chain) serveRequest() {
case <-c.resignC:
_ = c.support.BlockCutter().Cut()
c.BlockCreator.resetCreatedBlocks()
stop()
case <-timer.C():
......@@ -483,6 +489,7 @@ func (c *Chain) serveRequest() {
}
func (c *Chain) writeBlock(b block) {
c.BlockCreator.commitBlock(b.b)
if utils.IsConfigBlock(b.b) {
if err := c.writeConfigBlock(b); err != nil {
c.logger.Panicf("failed to write configuration block, %+v", err)
......@@ -536,20 +543,32 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelop
func (c *Chain) commitBatches(batches ...[]*common.Envelope) error {
for _, batch := range batches {
b := c.support.CreateNextBlock(batch)
b := c.BlockCreator.createNextBlock(batch)
data := utils.MarshalOrPanic(b)
if err := c.node.Propose(context.TODO(), data); err != nil {
return errors.Errorf("failed to propose data to Raft node: %s", err)
}
select {
case block := <-c.commitC:
c.writeBlock(block)
case <-c.resignC:
return errors.Errorf("aborted block committing: lost leadership")
// if it is config block, then wait for the commit of the block
if utils.IsConfigBlock(b) {
// we need the loop to account for the normal blocks that might be in-flight before the arrival of the config block
commitConfigLoop:
for {
select {
case block := <-c.commitC:
c.writeBlock(block)
// since this is the config block that have been looking for, we break out of the loop
if bytes.Equal(b.Header.Bytes(), block.b.Header.Bytes()) {
break commitConfigLoop
}
case <-c.doneC:
return nil
case <-c.resignC:
return errors.Errorf("aborted block committing: lost leadership")
case <-c.doneC:
return nil
}
}
}
}
......@@ -584,6 +603,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return errors.Errorf("failed to fetch block %d from cluster", next)
}
c.BlockCreator.commitBlock(block)
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
} else {
......
This diff is collapsed.
......@@ -56,6 +56,9 @@ var _ = Describe("Consenter", func() {
proto.Unmarshal(blockBytes, goodConfigBlock)
lastBlock := &common.Block{
Header: &common.BlockHeader{
Number: 1,
},
Data: goodConfigBlock.Data,
Metadata: &common.BlockMetadata{
Metadata: [][]byte{{}, utils.MarshalOrPanic(&common.Metadata{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment