Unverified commit ff843afd authored by Jay Guo, committed by Artem Barger

[FAB-13178] A dumb version of etcdraft BlockCreator



This CR rewrites BlockCreator so that it never returns a nil block.

BEFORE:
blockCreator holds a channel of created blocks, buffered to
createdBlocksBuffersize (default 20). It also stores the hash and
number of the latest block.

When asked to create a new block, blockCreator assembles one based
on that hash and number and enqueues it onto the buffered channel.
If the channel is full, nil is returned.

When a block is committed, blockCreator drains the channel. If the
channel is empty, the blockCreator is being driven by a raft
follower, so it simply updates its hash and number.
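For illustration, a condensed sketch of that old mechanism (not the
verbatim code, which appears in the removed lines of the diff below;
error handling is elided):

    // old design (sketch): a bounded queue of optimistically created blocks
    type blockCreator struct {
        CreatedBlocks      chan *cb.Block // buffered to createdBlocksBuffersize (20)
        LastCreatedBlock   *cb.Block
        LastCommittedBlock *cb.Block
    }

    func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
        data := &cb.BlockData{Data: make([][]byte, len(envs))}
        for i, env := range envs {
            data.Data[i], _ = proto.Marshal(env) // error handling elided
        }

        block := cb.NewBlock(bc.LastCreatedBlock.Header.Number+1, bc.LastCreatedBlock.Header.Hash())
        block.Header.DataHash = data.Hash()
        block.Data = data

        select {
        case bc.CreatedBlocks <- block: // room in the queue: record the new tip
            bc.LastCreatedBlock = block
            return block
        default: // queue full: the caller gets nil and has to cope with it
            return nil
        }
    }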

NOW:
What we need is actually as simple as this: a blockCreator holds the
hash and number of the latest block. When asked to create a block, it
just uses that hash and number to assemble one. And ONLY the raft
leader holds a blockCreator. Followers blindly commit whatever comes
out of consensus. When a follower is elected as the new leader, it
simply looks up the ledger, finds the hash and number of the latest
block, and creates a new blockCreator.
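Roughly, the new flow (condensed from the added lines in the diff
below; error handling elided):

    // new design (sketch): just a cursor over the chain tip
    type blockCreator struct {
        hash   []byte
        number uint64
    }

    func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
        data := &cb.BlockData{Data: make([][]byte, len(envs))}
        for i, env := range envs {
            data.Data[i], _ = proto.Marshal(env) // error handling elided
        }

        block := cb.NewBlock(bc.number+1, bc.hash)
        block.Header.DataHash = data.Hash()
        block.Data = data

        bc.hash = block.Header.Hash() // advance the cursor past the block just made
        bc.number++
        return block
    }

    // On the follower -> leader transition (serveRequest in chain.go), the new
    // leader rebuilds the cursor from the tip of its own ledger:
    lastBlock := c.support.Block(c.support.Height() - 1)
    bc := &blockCreator{hash: lastBlock.Header.Hash(), number: lastBlock.Header.Number}

Because only the elected leader ever holds a blockCreator and there is
no bounded queue to overflow, createNextBlock can no longer return nil
by construction.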

Change-Id: I226ee34d666fbb1e8d034dc22ea6800df993f7a4
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent f28884a4
--- a/orderer/consensus/etcdraft/blockcreator.go
+++ b/orderer/consensus/etcdraft/blockcreator.go
@@ -7,114 +7,39 @@ SPDX-License-Identifier: Apache-2.0
 package etcdraft

 import (
-    "bytes"
-
     "github.com/golang/protobuf/proto"
     "github.com/hyperledger/fabric/common/flogging"
     cb "github.com/hyperledger/fabric/protos/common"
 )

-// This governs the max number of created blocks in-flight; i.e. blocks
-// that were created but not written.
-// CreateNextBlock returns nil once this number of blocks are in-flight.
-const createdBlocksBuffersize = 20
-
-// blockCreator optimistically creates blocks in a chain. The created
-// blocks may not be written out eventually. This enables us to pipeline
-// the creation of blocks with achieving consensus on them, leading to
-// performance improvements. The created chain is discarded if a
-// diverging block is committed.
-// To safely use blockCreator, only one thread should interact with it.
+// blockCreator holds the number and hash of the latest block
+// so that the next block is created on top of it.
 type blockCreator struct {
-    CreatedBlocks      chan *cb.Block
-    LastCreatedBlock   *cb.Block
-    LastCommittedBlock *cb.Block
-    logger             *flogging.FabricLogger
-}
-
-func newBlockCreator(lastBlock *cb.Block, logger *flogging.FabricLogger) *blockCreator {
-    if lastBlock == nil {
-        logger.Panic("block creator initialized with nil last block")
-    }
-
-    bc := &blockCreator{
-        CreatedBlocks:      make(chan *cb.Block, createdBlocksBuffersize),
-        LastCreatedBlock:   lastBlock,
-        LastCommittedBlock: lastBlock,
-        logger:             logger,
-    }
+    hash   []byte
+    number uint64

-    logger.Debugf("Initialized block creator with (lastblockNumber=%d)", lastBlock.Header.Number)
-    return bc
+    logger *flogging.FabricLogger
 }

-// CreateNextBlock creates a new block with the next block number and the
-// given contents.
-// Returns the created block if the block could be created, else nil.
-// It can fail when there are already `createdBlocksBuffersize` created
-// blocks and no more can be accommodated in the `CreatedBlocks` channel.
-func (bc *blockCreator) createNextBlock(messages []*cb.Envelope) *cb.Block {
-    previousBlockHash := bc.LastCreatedBlock.Header.Hash()
-
+func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
     data := &cb.BlockData{
-        Data: make([][]byte, len(messages)),
+        Data: make([][]byte, len(envs)),
     }

     var err error
-    for i, msg := range messages {
-        data.Data[i], err = proto.Marshal(msg)
+    for i, env := range envs {
+        data.Data[i], err = proto.Marshal(env)
         if err != nil {
             bc.logger.Panicf("Could not marshal envelope: %s", err)
         }
     }

-    block := cb.NewBlock(bc.LastCreatedBlock.Header.Number+1, previousBlockHash)
+    block := cb.NewBlock(bc.number+1, bc.hash)
     block.Header.DataHash = data.Hash()
     block.Data = data

-    select {
-    case bc.CreatedBlocks <- block:
-        bc.LastCreatedBlock = block
-        bc.logger.Debugf("Created block %d", bc.LastCreatedBlock.Header.Number)
-        return block
-    default:
-        return nil
-    }
-}
-
-// ResetCreatedBlocks resets the queue of created blocks.
-// Subsequent blocks will be created over the block that was last committed
-// using CommitBlock.
-func (bc *blockCreator) resetCreatedBlocks() {
-    // We should not recreate the CreatedBlocks channel since that can lead
-    // to data races on its access
-    for len(bc.CreatedBlocks) > 0 {
-        // empty the channel
-        <-bc.CreatedBlocks
-    }
-    bc.LastCreatedBlock = bc.LastCommittedBlock
-    bc.logger.Debug("Reset created blocks")
-}
+    bc.hash = block.Header.Hash()
+    bc.number++

-// commitBlock should be invoked for all blocks to let the blockCreator know
-// which blocks have been committed. If the committed block is divergent from
-// the stack of created blocks then the stack is reset.
-func (bc *blockCreator) commitBlock(block *cb.Block) {
-    bc.LastCommittedBlock = block
-
-    // check if the committed block diverges from the created blocks
-    select {
-    case b := <-bc.CreatedBlocks:
-        if !bytes.Equal(b.Header.Bytes(), block.Header.Bytes()) {
-            // the written block diverges from the createdBlocks stack;
-            // discard the created blocks
-            bc.resetCreatedBlocks()
-        }
-        // else the written block is part of the createdBlocks stack; nothing to be done
-    default:
-        // No created blocks; set this block as the last created block.
-        // This happens when calls to WriteBlock are made without a
-        // CreateNextBlock having been called. For example, in the case of
-        // etcd/raft, the leader proposes blocks and the followers only
-        // write the proposed blocks.
-        bc.LastCreatedBlock = block
-    }
-}
+    return block
+}
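For reference, stitching the added (+) lines together, the reworked
blockcreator.go reads as follows (license header omitted):

    package etcdraft

    import (
        "github.com/golang/protobuf/proto"
        "github.com/hyperledger/fabric/common/flogging"
        cb "github.com/hyperledger/fabric/protos/common"
    )

    // blockCreator holds the number and hash of the latest block
    // so that the next block is created on top of it.
    type blockCreator struct {
        hash   []byte
        number uint64

        logger *flogging.FabricLogger
    }

    func (bc *blockCreator) createNextBlock(envs []*cb.Envelope) *cb.Block {
        data := &cb.BlockData{
            Data: make([][]byte, len(envs)),
        }

        var err error
        for i, env := range envs {
            data.Data[i], err = proto.Marshal(env)
            if err != nil {
                bc.logger.Panicf("Could not marshal envelope: %s", err)
            }
        }

        block := cb.NewBlock(bc.number+1, bc.hash)
        block.Header.DataHash = data.Hash()
        block.Data = data

        bc.hash = block.Header.Hash()
        bc.number++

        return block
    }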
--- a/orderer/consensus/etcdraft/blockcreator_test.go
+++ b/orderer/consensus/etcdraft/blockcreator_test.go
@@ -7,11 +7,8 @@ SPDX-License-Identifier: Apache-2.0
 package etcdraft

 import (
-    "bytes"
-    "fmt"
     "testing"

-    "github.com/golang/protobuf/proto"
     "github.com/hyperledger/fabric/common/flogging"
     cb "github.com/hyperledger/fabric/protos/common"
     "github.com/stretchr/testify/assert"
@@ -24,169 +21,21 @@ func getSeedBlock() *cb.Block {
     return seedBlock
 }

-func TestValidCreatedBlocksQueue(t *testing.T) {
-    seedBlock := getSeedBlock()
-    logger := flogging.NewFabricLogger(zap.NewNop())
-    bc := newBlockCreator(seedBlock, logger)
-
-    t.Run("correct creation of a single block", func(t *testing.T) {
-        block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
-
-        assert.Equal(t, seedBlock.Header.Number+1, block.Header.Number)
-        assert.Equal(t, block.Data.Hash(), block.Header.DataHash)
-        assert.Equal(t, seedBlock.Header.Hash(), block.Header.PreviousHash)
-        // this created block should be part of the queue of created blocks
-        assert.Len(t, bc.CreatedBlocks, 1)
-
-        bc.commitBlock(block)
-        assert.Empty(t, bc.CreatedBlocks)
-        assert.Equal(t, bc.LastCommittedBlock.Header.Hash(), block.Header.Hash())
-    })
-
-    t.Run("ResetCreatedBlocks resets the queue of created blocks", func(t *testing.T) {
-        numBlocks := 10
-        blocks := []*cb.Block{}
-        for i := 0; i < numBlocks; i++ {
-            blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
-        }
-
-        bc.resetCreatedBlocks()
-
-        assert.True(t,
-            bytes.Equal(bc.LastCommittedBlock.Header.Bytes(), bc.LastCreatedBlock.Header.Bytes()),
-            "resetting the created blocks queue should leave the lastCommittedBlock and the lastCreatedBlock equal",
-        )
-        assert.Empty(t, bc.CreatedBlocks)
-    })
-
-    t.Run("commit of block without any created blocks sets the lastCreatedBlock correctly", func(t *testing.T) {
-        block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
-        bc.resetCreatedBlocks()
-
-        bc.commitBlock(block)
-
-        assert.True(t,
-            bytes.Equal(block.Header.Bytes(), bc.LastCommittedBlock.Header.Bytes()),
-            "committing a block should set the lastCommittedBlock",
-        )
-        assert.True(t,
-            bytes.Equal(block.Header.Bytes(), bc.LastCreatedBlock.Header.Bytes()),
-            "committing a block without created blocks should set the lastCreatedBlock as well",
-        )
-        assert.Empty(t, bc.CreatedBlocks)
-    })
-
-    t.Run("propose multiple blocks without having to commit them; tests the optimistic block creation", func(t *testing.T) {
-        /*
-         * Scenario:
-         * We create five blocks initially and then commit only two of them. We further create five more blocks
-         * and commit the remaining 8 blocks in the propose stack. This should succeed since the committed
-         * blocks are not divergent from the created blocks.
-         */
-        blocks := []*cb.Block{}
-
-        // Create five blocks without writing them out
-        for i := 0; i < 5; i++ {
-            blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
-        }
-        assert.Len(t, bc.CreatedBlocks, 5)
-
-        // Write two of these out
-        for i := 0; i < 2; i++ {
-            bc.commitBlock(blocks[i])
-        }
-        assert.Len(t, bc.CreatedBlocks, 3)
-
-        // Create five more blocks; these should be created over the previous five blocks
-        for i := 0; i < 5; i++ {
-            blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
-        }
-        assert.Len(t, bc.CreatedBlocks, 8)
-
-        // Write out the remaining eight blocks; this can only succeed if all the blocks
-        // were created in a single stack, else it will panic
-        for i := 2; i < 10; i++ {
-            bc.commitBlock(blocks[i])
-        }
-        assert.Empty(t, bc.CreatedBlocks)
-
-        // Assert that the blocks were indeed created in the correct sequence
-        for i := 0; i < 9; i++ {
-            assertNextBlock(t, blocks[i], blocks[i+1])
-        }
-    })
-
-    t.Run("createNextBlock returns nil after createdBlocksBuffersize blocks have been created", func(t *testing.T) {
-        numBlocks := createdBlocksBuffersize
-        blocks := []*cb.Block{}
-        for i := 0; i < numBlocks; i++ {
-            blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
-        }
-
-        block := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}})
-        assert.Nil(t, block)
-    })
-
-    t.Run("created blocks should always be over committed blocks", func(t *testing.T) {
-        /*
-         * Scenario:
-         * We will create
-         *   1. a propose stack with 5 blocks over baseLastCreatedBlock, and
-         *   2. an alternate block over baseLastCreatedBlock.
-         * We will write out this alternate block and verify that the subsequent block is created over
-         * this alternate block and not on the existing propose stack.
-         * This scenario fails if commitBlock does not flush the createdBlocks queue when the written
-         * block is divergent from the created blocks.
-         */
-        baseLastCreatedBlock := bc.LastCreatedBlock
-
-        // Create the stack of five blocks without writing them out
-        blocks := []*cb.Block{}
-        for i := 0; i < 5; i++ {
-            blocks = append(blocks, bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}}))
-        }
-
-        // create and write out the alternate block
-        alternateBlock := createBlockOverSpecifiedBlock(baseLastCreatedBlock, []*cb.Envelope{{Payload: []byte("alternate test envelope")}})
-        bc.commitBlock(alternateBlock)
-
-        // assert that createNextBlock creates the next block over this alternateBlock
-        createdBlockOverAlternateBlock := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("test envelope")}})
-        synthesizedBlockOverAlternateBlock := createBlockOverSpecifiedBlock(alternateBlock, []*cb.Envelope{{Payload: []byte("test envelope")}})
-        assert.True(t,
-            bytes.Equal(createdBlockOverAlternateBlock.Header.Bytes(), synthesizedBlockOverAlternateBlock.Header.Bytes()),
-            "created and synthesized blocks should be equal",
-        )
-        bc.commitBlock(createdBlockOverAlternateBlock)
-    })
-}
-
-func createBlockOverSpecifiedBlock(baseBlock *cb.Block, messages []*cb.Envelope) *cb.Block {
-    previousBlockHash := baseBlock.Header.Hash()
-
-    data := &cb.BlockData{
-        Data: make([][]byte, len(messages)),
-    }
-
-    var err error
-    for i, msg := range messages {
-        data.Data[i], err = proto.Marshal(msg)
-        if err != nil {
-            panic(fmt.Sprintf("Could not marshal envelope: %s", err))
-        }
-    }
-
-    block := cb.NewBlock(baseBlock.Header.Number+1, previousBlockHash)
-    block.Header.DataHash = data.Hash()
-    block.Data = data
-    return block
-}
-
-// assertNextBlock asserts that `nextBlock` is correctly formed as the next
-// block following `block` in a chain.
-func assertNextBlock(t *testing.T, block, nextBlock *cb.Block) {
-    assert.Equal(t, block.Header.Number+1, nextBlock.Header.Number)
-    assert.Equal(t, block.Header.Hash(), nextBlock.Header.PreviousHash)
-}
+func TestCreateNextBlock(t *testing.T) {
+    first := cb.NewBlock(0, []byte("firsthash"))
+    bc := &blockCreator{
+        hash:   first.Header.Hash(),
+        number: first.Header.Number,
+        logger: flogging.NewFabricLogger(zap.NewNop()),
+    }
+
+    second := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
+    assert.Equal(t, first.Header.Number+1, second.Header.Number)
+    assert.Equal(t, second.Data.Hash(), second.Header.DataHash)
+    assert.Equal(t, first.Header.Hash(), second.Header.PreviousHash)
+
+    third := bc.createNextBlock([]*cb.Envelope{{Payload: []byte("some other bytes")}})
+    assert.Equal(t, second.Header.Number+1, third.Header.Number)
+    assert.Equal(t, third.Data.Hash(), third.Header.DataHash)
+    assert.Equal(t, second.Header.Hash(), third.Header.PreviousHash)
+}
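To exercise the new test in isolation, something like the following
should work (assuming the package still lives at
orderer/consensus/etcdraft, as the types above suggest):

    go test ./orderer/consensus/etcdraft -run TestCreateNextBlock -v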
--- a/orderer/consensus/etcdraft/chain.go
+++ b/orderer/consensus/etcdraft/chain.go
@@ -112,8 +112,7 @@ type Chain struct {
     clock clock.Clock // Tests can inject a fake clock

-    support      consensus.ConsenterSupport
-    BlockCreator *blockCreator
+    support consensus.ConsenterSupport

     appliedIndex uint64
@@ -160,8 +159,6 @@ func NewChain(
         snapBlkNum = b.Header.Number
     }

-    lastBlock := support.Block(support.Height() - 1)
-
     c := &Chain{
         configurator: conf,
         rpc:          rpc,
@@ -176,7 +173,6 @@
         observeC:         observeC,
         support:          support,
         fresh:            fresh,
-        BlockCreator:     newBlockCreator(lastBlock, lg),
         appliedIndex:     opts.RaftMetadata.RaftIndex,
         lastSnapBlockNum: snapBlkNum,
         puller:           puller,
@@ -403,6 +399,7 @@ func (c *Chain) serveRequest() {
     var leader uint64
     submitC := c.submitC
+    var bc *blockCreator

     for {
         select {
@@ -429,7 +426,7 @@
                 stop()
             }

-            proposedConfigBlock := c.propose(batches...)
+            proposedConfigBlock := c.propose(bc, batches...)
             if proposedConfigBlock {
                 submitC = nil // stop accepting new envelopes
             }
@@ -452,14 +449,21 @@
             // follower -> leader
             if newLeader == c.raftID {
                 elected = true
+
+                lastBlock := c.support.Block(c.support.Height() - 1)
+                bc = &blockCreator{
+                    hash:   lastBlock.Header.Hash(),
+                    number: lastBlock.Header.Number,
+                    logger: c.logger,
+                }
             }

             // leader -> follower
             if leader == c.raftID {
                 _ = c.support.BlockCutter().Cut()
-                c.BlockCreator.resetCreatedBlocks()
                 stop()
                 submitC = c.submitC
+                bc = nil
             }

             leader = newLeader
@@ -502,7 +506,7 @@ func (c *Chain) serveRequest() {
             }

             c.logger.Debugf("Batch timer expired, creating block")
-            c.propose(batch) // we are certain this is a normal block, no need to block
+            c.propose(bc, batch) // we are certain this is a normal block, no need to block

         case sn := <-c.snapC:
             if sn.Metadata.Index <= c.appliedIndex {
@@ -530,7 +534,6 @@
 // writeBlock returns a bool indicating whether we
 // should unblock submitC to accept new envelopes
 func (c *Chain) writeBlock(block *common.Block, index uint64) bool {
-    c.BlockCreator.commitBlock(block)
     if utils.IsConfigBlock(block) {
         return c.writeConfigBlock(block, index)
     }
@@ -582,9 +585,9 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope
 // propose returns true if config block is in-flight and we should
 // block waiting for it to be committed before accepting new env
-func (c *Chain) propose(batches ...[]*common.Envelope) (configInFlight bool) {
+func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) (configInFlight bool) {
     for _, batch := range batches {
-        b := c.BlockCreator.createNextBlock(batch)
+        b := bc.createNextBlock(batch)
         data := utils.MarshalOrPanic(b)
         if err := c.node.Propose(context.TODO(), data); err != nil {
             c.logger.Errorf("Failed to propose block to raft: %s", err)
@@ -623,7 +626,6 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
         if block == nil {
             return errors.Errorf("failed to fetch block %d from cluster", next)
         }
-        c.BlockCreator.commitBlock(block)
         if utils.IsConfigBlock(block) {
             c.support.WriteConfigBlock(block, nil)
         } else {
--- a/orderer/consensus/etcdraft/chain_test.go
+++ b/orderer/consensus/etcdraft/chain_test.go
@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
 package etcdraft_test

 import (
-    "bytes"
     "encoding/pem"
     "fmt"
     "io/ioutil"
@@ -1862,30 +1861,12 @@ var _ = Describe("Chain", func() {
                 Expect(err).NotTo(HaveOccurred())
             }

-            Eventually(c1.BlockCreator.CreatedBlocks).Should(HaveLen(10))
-        })
-
-        It("calls BlockCreator.commitBlock on all the nodes' chains once a block is written", func() {
-            normalBlock := &common.Block{
-                Header:   &common.BlockHeader{},
-                Data:     &common.BlockData{Data: [][]byte{[]byte("foo")}},
-                Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
-            }
-            // to test that commitBlock is called on c2 (follower) as well; this block should be
-            // discarded after the call to commitBlock since it is a diverging block
-            c2.BlockCreator.CreatedBlocks <- normalBlock
-
-            c1.cutter.CutNext = true
-            err := c1.Order(env, 0)
-            Expect(err).ToNot(HaveOccurred())
-
             network.connect(1)
             c1.clock.Increment(interval)
             network.exec(
                 func(c *chain) {
-                    Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
-                    b, _ := c.support.WriteBlockArgsForCall(0)
-                    Eventually(c.BlockCreator.CreatedBlocks, LongEventualTimeout).Should(HaveLen(0)) // implies that BlockCreator.commitBlock was called
-                    // check that it updates the LastCreatedBlock correctly as well
-                    Eventually(bytes.Equal(b.Header.Bytes(), c.BlockCreator.LastCreatedBlock.Header.Bytes()), LongEventualTimeout).Should(BeTrue())
+                    Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(10))
                 })
         })
@@ -1914,22 +1895,38 @@ var _ = Describe("Chain", func() {
             // config block
             err := c1.Order(configEnv, 0)
             Expect(err).NotTo(HaveOccurred())
-            Eventually(c1.BlockCreator.CreatedBlocks).Should(HaveLen(1))

             // to avoid data races since we are accessing these within a goroutine
             tempEnv := env
             tempC1 := c1

-            // normal blocks
+            done := make(chan struct{})
+
+            // normal block
             go func() {
                 defer GinkgoRecover()
+                // this should be blocked if the config block is not committed
                 err := tempC1.Order(tempEnv, 0)
-                // since the chain is stopped after the Consistently test below passes
-                Expect(err).To(MatchError("chain is stopped"))
+                Expect(err).NotTo(HaveOccurred())
+
+                close(done)
             }()

-            // ensure that only one block is created since the config block is never written out
-            Consistently(c1.BlockCreator.CreatedBlocks).Should(HaveLen(1))
+            Consistently(done).ShouldNot(BeClosed())
+
+            network.connect(1)
+            c1.clock.Increment(interval)
+
+            network.exec(
+                func(c *chain) {
+                    Eventually(func() int { return c.support.WriteConfigBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
+                })
+
+            network.exec(
+                func(c *chain) {
+                    Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
+                })
         })

         It("continues creating blocks on leader after a config block has been successfully written out", func() {
@@ -2138,13 +2135,6 @@ var _ = Describe("Chain", func() {
         It("purges blockcutter, stops timer and discards created blocks if leadership is lost", func() {
-            // create one block on chain 1 to test for reset of the created blocks
             network.disconnect(1)
-            normalBlock := &common.Block{
-                Header:   &common.BlockHeader{},
-                Data:     &common.BlockData{Data: [][]byte{[]byte("foo")}},
-                Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
-            }
-            c1.BlockCreator.CreatedBlocks <- normalBlock
-            Expect(len(c1.BlockCreator.CreatedBlocks)).To(Equal(1))

             // enqueue one transaction into 1's blockcutter to test for purging of block cutter
             c1.cutter.CutNext = false
@@ -2162,7 +2152,6 @@ var _ = Describe("Chain", func() {
             Eventually(c1.clock.WatcherCount, LongEventualTimeout).Should(Equal(1)) // blockcutter timer is stopped
             Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(0))
-            // the created block should be discarded since there is a leadership change
-            Eventually(c1.BlockCreator.CreatedBlocks).Should(HaveLen(0))
             Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))

             network.disconnect(2)