Unverified Commit 9b78a9d8 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13178] Use MaxInflightMsgs to throttle requests



If there are MaxInflightMsgs blocks proposed but not
committed, chain blocks further incoming requests.

Change-Id: I58c84e23c882ccc152e5c9a248434e466a8b5266
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 0276480c
......@@ -114,6 +114,7 @@ type Chain struct {
confChangeInProgress *raftpb.ConfChange
justElected bool // this is true when node has just been elected
configInflight bool // this is true when there is config block or ConfChange in flight
blockInflight int // number of in flight blocks
clock clock.Clock // Tests can inject a fake clock
......@@ -415,6 +416,7 @@ func (c *Chain) serveRequest() {
var bc *blockCreator
becomeLeader := func() {
c.blockInflight = 0
c.justElected = true
submitC = nil
......@@ -431,6 +433,7 @@ func (c *Chain) serveRequest() {
}
becomeFollower := func() {
c.blockInflight = 0
_ = c.support.BlockCutter().Cut()
stop()
submitC = c.submitC
......@@ -468,7 +471,7 @@ func (c *Chain) serveRequest() {
}
c.propose(bc, batches...)
if c.configInflight {
if c.configInflight || c.blockInflight >= c.opts.MaxInflightMsgs {
submitC = nil // stop accepting new envelopes
}
......@@ -541,7 +544,7 @@ func (c *Chain) serveRequest() {
} else if c.configInflight {
c.logger.Debugf("Config block or ConfChange in flight, pause accepting transaction")
submitC = nil
} else {
} else if c.blockInflight < c.opts.MaxInflightMsgs {
submitC = c.submitC
}
......@@ -587,6 +590,10 @@ func (c *Chain) serveRequest() {
}
func (c *Chain) writeBlock(block *common.Block, index uint64) {
if c.blockInflight > 0 {
c.blockInflight-- // only reduce on leader
}
if utils.IsConfigBlock(block) {
c.writeConfigBlock(block, index)
return
......@@ -649,6 +656,8 @@ func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) {
if utils.IsConfigBlock(b) {
c.configInflight = true
}
c.blockInflight++
}
return
......
......@@ -1830,6 +1830,75 @@ var _ = Describe("Chain", func() {
})
})
When("MaxInflightMsgs is reached", func() {
BeforeEach(func() {
network.exec(func(c *chain) { c.opts.MaxInflightMsgs = 1 })
})
It("waits for in flight blocks to be committed", func() {
c1.cutter.CutNext = true
// disconnect c1 to disrupt consensus
network.disconnect(1)
Expect(c1.Order(env, 0)).To(Succeed())
doneProp := make(chan struct{})
go func() {
Expect(c1.Order(env, 0)).To(Succeed())
close(doneProp)
}()
// expect second `Order` to block
Consistently(doneProp).ShouldNot(BeClosed())
network.exec(func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
})
network.connect(1)
c1.clock.Increment(interval)
Eventually(doneProp).Should(BeClosed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
})
})
It("resets block in flight when steps down from leader", func() {
c1.cutter.CutNext = true
c2.cutter.CutNext = true
// disconnect c1 to disrupt consensus
network.disconnect(1)
Expect(c1.Order(env, 0)).To(Succeed())
doneProp := make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(c1.Order(env, 0)).To(Succeed())
close(doneProp)
}()
// expect second `Order` to block
Consistently(doneProp).ShouldNot(BeClosed())
network.exec(func(c *chain) {
Consistently(c.support.WriteBlockCallCount).Should(BeZero())
})
network.elect(2)
Expect(c3.Order(env, 0)).To(Succeed())
Eventually(c1.support.WriteBlockCallCount).Should(Equal(0))
Eventually(c2.support.WriteBlockCallCount).Should(Equal(1))
Eventually(c3.support.WriteBlockCallCount).Should(Equal(1))
network.connect(1)
c2.clock.Increment(interval)
Eventually(doneProp).Should(BeClosed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount).Should(Equal(2))
})
})
})
When("follower is disconnected", func() {
It("should return error when receiving an env", func() {
network.disconnect(2)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment