Unverified Commit d735a06c authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13447] Streamline the code



Instead of returning status several levels up, several methods
in etcdraft chain can just set member var to store current state.

Change-Id: I67612917bf3bb3225f1507c8b7376d730b18e9f4
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent ff843afd
......@@ -109,6 +109,8 @@ type Chain struct {
raftMetadataLock sync.RWMutex
confChangeInProgress *raftpb.ConfChange
justElected bool // this is true when node has just been elected
configInflight bool // this is true when there is config block or ConfChange in flight
clock clock.Clock // Tests can inject a fake clock
......@@ -401,6 +403,36 @@ func (c *Chain) serveRequest() {
submitC := c.submitC
var bc *blockCreator
becomeLeader := func() {
c.justElected = true
submitC = nil
lastBlock := c.support.Block(c.support.Height() - 1)
bc = &blockCreator{
hash: lastBlock.Header.Hash(),
number: lastBlock.Header.Number,
logger: c.logger,
}
// if there is unfinished ConfChange, we should resume the effort to propose it as
// new leader, and wait for it to be committed before start serving new requests.
if cc := c.getInFlightConfChange(); cc != nil {
if err := c.node.ProposeConfChange(context.TODO(), *cc); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
c.confChangeInProgress = cc
c.configInflight = true
}
}
becomeFollower := func() {
_ = c.support.BlockCutter().Cut()
stop()
submitC = c.submitC
bc = nil
}
for {
select {
case s := <-submitC:
......@@ -426,8 +458,8 @@ func (c *Chain) serveRequest() {
stop()
}
proposedConfigBlock := c.propose(bc, batches...)
if proposedConfigBlock {
c.propose(bc, batches...)
if c.configInflight {
submitC = nil // stop accepting new envelopes
}
......@@ -439,31 +471,17 @@ func (c *Chain) serveRequest() {
s.errC <- err // send error back to submitter
case app := <-c.applyC:
var elected, ready bool
if app.soft != nil {
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
if newLeader != leader {
c.logger.Infof("Raft leader changed: %d -> %d", leader, newLeader)
// follower -> leader
if newLeader == c.raftID {
elected = true
lastBlock := c.support.Block(c.support.Height() - 1)
bc = &blockCreator{
hash: lastBlock.Header.Hash(),
number: lastBlock.Header.Number,
logger: c.logger,
}
becomeLeader()
}
// leader -> follower
if leader == c.raftID {
_ = c.support.BlockCutter().Cut()
stop()
submitC = c.submitC
bc = nil
becomeFollower()
}
leader = newLeader
......@@ -476,23 +494,9 @@ func (c *Chain) serveRequest() {
}
}
if elected {
// if there is unfinished ConfChange, we should resume the effort to propose it as
// new leader, and wait for it to be committed before start serving new requests.
if cc := c.getInFlightConfChange(); cc != nil {
if err := c.node.ProposeConfChange(context.TODO(), *cc); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
c.apply(app.entries)
c.confChangeInProgress = cc
} else {
ready = true // no ConfChange in flight, ready to serve requests
}
}
appliedConfig := c.apply(app.entries)
if (appliedConfig || ready) && c.raftID == leader {
c.logger.Infof("Start accepting requests as Raft leader")
if !c.configInflight {
submitC = c.submitC
}
......@@ -531,11 +535,10 @@ func (c *Chain) serveRequest() {
}
}
// writeBlock returns a bool indicating whether we
// should unblock submitC to accept new envelopes
func (c *Chain) writeBlock(block *common.Block, index uint64) bool {
func (c *Chain) writeBlock(block *common.Block, index uint64) {
if utils.IsConfigBlock(block) {
return c.writeConfigBlock(block, index)
c.writeConfigBlock(block, index)
return
}
c.raftMetadataLock.Lock()
......@@ -544,7 +547,6 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) bool {
c.raftMetadataLock.Unlock()
c.support.WriteBlock(block, m)
return false
}
// Orders the envelope in the `msg` content. SubmitRequest.
......@@ -583,9 +585,7 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelop
}
// propose returns true if config block is in-flight and we should
// block waiting for it to be committed before accepting new env
func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) (configInFlight bool) {
func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) {
for _, batch := range batches {
b := bc.createNextBlock(batch)
data := utils.MarshalOrPanic(b)
......@@ -596,7 +596,7 @@ func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) (config
// if it is config block, then we should wait for the commit of the block
if utils.IsConfigBlock(b) {
configInFlight = true
c.configInflight = true
}
}
......@@ -639,10 +639,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
// apply applies data to ledger, and return true when:
// - comitting a config block that does not add/remove nodes, or
// - applying a raft ConfChange that was introduced by previous config block
func (c *Chain) apply(ents []raftpb.Entry) (unblocking bool) {
func (c *Chain) apply(ents []raftpb.Entry) {
if len(ents) == 0 {
return
}
......@@ -663,7 +660,7 @@ func (c *Chain) apply(ents []raftpb.Entry) (unblocking bool) {
}
block := utils.UnmarshalBlockOrPanic(ents[i].Data)
unblocking = c.writeBlock(block, ents[i].Index)
c.writeBlock(block, ents[i].Index)
appliedb = block.Header.Number
position = i
......@@ -688,7 +685,7 @@ func (c *Chain) apply(ents []raftpb.Entry) (unblocking bool) {
}
c.confChangeInProgress = nil
unblocking = true
c.configInflight = false
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
......@@ -794,8 +791,7 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
// writeConfigBlock writes configuration blocks into the ledger in
// addition extracts updates about raft replica set and if there
// are changes updates cluster membership as well
// it returns false if this config block adds/removes raft node.
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) bool {
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
metadata, raftMetadata := c.newRaftMetadata(block)
var changes *MembershipChanges
......@@ -809,6 +805,7 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) bool {
raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
// write block with metadata
c.support.WriteConfigBlock(block, raftMetadataBytes)
c.configInflight = false
// update membership
if confChange != nil {
......@@ -824,10 +821,8 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) bool {
c.opts.RaftMetadata = raftMetadata
c.raftMetadataLock.Unlock()
return false
c.configInflight = true
}
return true
}
// getInFlightConfChange returns ConfChange in-flight if any.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment