Unverified Commit f28884a4 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13178] Remove global leader var in etcdraft chain



This CR removes the global leader var in etcdraft chain because
it is racy in following case: several requests are to be enqued
into submitC while leader loses its leadership.

This also removes the lock on rpc.SendSubmit because it's guarded
by the channel.

Change-Id: If5e785e05dcf9bfc60e403f2d5813baf769ee103
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent ae55c51e
......@@ -85,18 +85,21 @@ type Options struct {
RaftMetadata *etcdraft.RaftMetadata
}
type submit struct {
req *orderer.SubmitRequest
errC chan error
}
// Chain implements consensus.Chain interface.
type Chain struct {
configurator Configurator
// access to `SendSubmit` should be serialzed because gRPC is not thread-safe
submitLock sync.Mutex
rpc RPC
rpc RPC
raftID uint64
channelID string
submitC chan *orderer.SubmitRequest
submitC chan *submit
applyC chan apply
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
......@@ -112,7 +115,6 @@ type Chain struct {
support consensus.ConsenterSupport
BlockCreator *blockCreator
leader uint64
appliedIndex uint64
// needed by snapshotting
......@@ -165,7 +167,7 @@ func NewChain(
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
submitC: make(chan *submit),
applyC: make(chan apply),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
......@@ -360,25 +362,13 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
return err
}
lead := atomic.LoadUint64(&c.leader)
if lead == raft.None {
return errors.Errorf("no Raft leader")
}
if lead == c.raftID {
select {
case c.submitC <- req:
return nil
case <-c.doneC:
return errors.Errorf("chain is stopped")
}
errC := make(chan error, 1)
select {
case c.submitC <- &submit{req, errC}:
return <-errC
case <-c.doneC:
return errors.Errorf("chain is stopped")
}
c.logger.Debugf("Forwarding submit request to Raft leader %d", lead)
c.submitLock.Lock()
defer c.submitLock.Unlock()
return c.rpc.SendSubmit(lead, req)
}
type apply struct {
......@@ -416,35 +406,48 @@ func (c *Chain) serveRequest() {
for {
select {
case msg := <-submitC:
if msg == nil {
case s := <-submitC:
if s == nil {
// polled by `WaitReady`
continue
}
batches, pending, err := c.ordered(msg)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
}
if pending {
start() // no-op if timer is already started
} else {
stop()
}
var err error
switch leader {
case raft.None: // no Raft leader
c.logger.Debugf("Request is dropped because there is no Raft leader")
err = errors.Errorf("no Raft leader")
proposedConfigBlock := c.propose(batches...)
if proposedConfigBlock {
submitC = nil // stop accepting new envelopes
case c.raftID: // this is leader
batches, pending, err := c.ordered(s.req)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
}
if pending {
start() // no-op if timer is already started
} else {
stop()
}
proposedConfigBlock := c.propose(batches...)
if proposedConfigBlock {
submitC = nil // stop accepting new envelopes
}
default: // this is follower
c.logger.Debugf("Forwarding submit request to raft leader %d", leader)
err = c.rpc.SendSubmit(leader, s.req)
}
s.errC <- err // send error back to submitter
case app := <-c.applyC:
var elected, ready bool
if app.soft != nil {
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
oldLeader := atomic.SwapUint64(&c.leader, newLeader)
if newLeader != oldLeader {
c.logger.Infof("Raft leader changed: %d -> %d", oldLeader, newLeader)
if newLeader != leader {
c.logger.Infof("Raft leader changed: %d -> %d", leader, newLeader)
// follower -> leader
if newLeader == c.raftID {
......@@ -452,7 +455,7 @@ func (c *Chain) serveRequest() {
}
// leader -> follower
if oldLeader == c.raftID {
if leader == c.raftID {
_ = c.support.BlockCutter().Cut()
c.BlockCreator.resetCreatedBlocks()
stop()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment