Unverified Commit 100e1ad7 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13178] Move `SendSubmit` out of serveRequest



When gRPC buffer of `Submit` stream is full, `SendSubmit` would
block, which freezes the `serveRequest` go routine. This CR moves
this out of go routine, and clients should be blocked on waiting
for room in buffer.

Change-Id: I62cd261b9419bd8df3fa1bfaeff14551168d2e65
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent e0e3ddbb
......@@ -86,8 +86,8 @@ type Options struct {
}
type submit struct {
req *orderer.SubmitRequest
errC chan error
req *orderer.SubmitRequest
leader chan uint64
}
// Chain implements consensus.Chain interface.
......@@ -368,13 +368,23 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
return err
}
errC := make(chan error, 1)
leadC := make(chan uint64, 1)
select {
case c.submitC <- &submit{req, errC}:
return <-errC
case c.submitC <- &submit{req, leadC}:
lead := <-leadC
if lead == raft.None {
return errors.Errorf("no Raft leader")
}
if lead != c.raftID {
return c.rpc.SendSubmit(lead, req)
}
case <-c.doneC:
return errors.Errorf("chain is stopped")
}
return nil
}
type apply struct {
......@@ -449,38 +459,29 @@ func (c *Chain) serveRequest() {
}
if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
s.errC <- errors.Errorf("no Raft leader")
s.leader <- raft.None
continue
}
var err error
switch soft.Lead {
case raft.None: // no Raft leader
c.logger.Debugf("Request is dropped because there is no Raft leader")
err = errors.Errorf("no Raft leader")
case c.raftID: // this is leader
batches, pending, err := c.ordered(s.req)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
}
if pending {
start() // no-op if timer is already started
} else {
stop()
}
c.propose(bc, batches...)
if c.configInflight || c.blockInflight >= c.opts.MaxInflightMsgs {
submitC = nil // stop accepting new envelopes
}
s.leader <- soft.Lead
if soft.Lead != c.raftID {
continue
}
default: // this is follower
c.logger.Debugf("Forwarding submit request to raft leader %d", soft.Lead)
err = c.rpc.SendSubmit(soft.Lead, s.req)
batches, pending, err := c.ordered(s.req)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
}
if pending {
start() // no-op if timer is already started
} else {
stop()
}
s.errC <- err // send error back to submitter
c.propose(bc, batches...)
if c.configInflight || c.blockInflight >= c.opts.MaxInflightMsgs {
submitC = nil // stop accepting new envelopes
}
case app := <-c.applyC:
if app.soft != nil {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment