Unverified commit 5dadb3a5, authored by Jay Guo and committed by Artem Barger
Browse files

[FAB-13613] Fix race in etcdraft chain UT



Add a lock to guard manipulation of `StepStub`.

Change-Id: Icaadb1f5aea0cb7f266f24ed6756c4f6541768bd
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 0d247c1d
......@@ -18,6 +18,7 @@ import (
"code.cloudfoundry.org/clock/fakeclock"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/bccsp/factory"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
......@@ -1590,9 +1591,9 @@ var _ = Describe("Chain", func() {
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
step := c1.rpc.StepStub
step1 := c1.getStepFunc()
count := c1.rpc.StepCallCount() // record current step call count
c1.rpc.StepStub = func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
c1.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instructs followers to commit data
......@@ -1604,8 +1605,8 @@ var _ = Describe("Chain", func() {
}()
}
return step(dest, msg)
}
return step1(dest, msg)
})
By("sending config transaction")
err := c1.Configure(configEnv, 0)
......@@ -1870,6 +1871,86 @@ var _ = Describe("Chain", func() {
})
})
It("new leader should wait for in-fight blocks to commit before accepting new env", func() {
	// Scenario: when a node is elected as new leader and there are still in-flight blocks,
	// it should not immediately start accepting new envelopes. Instead it should wait for
	// those in-flight blocks to be committed, otherwise we may create an uncle block, which
	// forks the chain and causes a panic.
	//
	// Steps:
	// - start raft cluster with three nodes and genesis block0
	// - order env1 on c1, which creates block1
	// - drop MsgApp from 1 to 3
	// - drop second round of MsgApp sent from 1 to 2, so that block1 is only committed on c1
	// - disconnect c1 and elect c2
	// - order env2 on c2. This env must NOT be immediately accepted, otherwise c2 would create
	//   an uncle block1 based on block0.
	// - c2 commits block1
	// - c2 accepts env2, and creates block2
	// - c2 commits block2
	c1.cutter.CutNext = true
	c2.cutter.CutNext = true

	// Wrap c1's current step function: every message addressed to node 3 is dropped,
	// and so is any MsgApp that carries no entries, regardless of destination.
	// Everything else is forwarded to the original step function.
	step1 := c1.getStepFunc()
	c1.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
		stepMsg := &raftpb.Message{}
		Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())

		if dest == 3 {
			return nil, nil
		}

		if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) == 0 {
			return nil, nil
		}

		return step1(dest, msg)
	})

	Expect(c1.Order(env, 0)).NotTo(HaveOccurred())

	// Only c1 is expected to write block1 at this point.
	Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
	Consistently(c2.support.WriteBlockCallCount).Should(Equal(0))
	Consistently(c3.support.WriteBlockCallCount).Should(Equal(0))

	network.disconnect(1)

	// Wrap c2's current step function: a MsgApp addressed to node 3 is dropped when
	// any of its entries has non-empty data. All other messages are forwarded.
	step2 := c2.getStepFunc()
	c2.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
		stepMsg := &raftpb.Message{}
		Expect(proto.Unmarshal(msg.Payload, stepMsg)).NotTo(HaveOccurred())

		if stepMsg.Type == raftpb.MsgApp && len(stepMsg.Entries) != 0 && dest == 3 {
			for _, ent := range stepMsg.Entries {
				if len(ent.Data) != 0 {
					return nil, nil
				}
			}
		}

		return step2(dest, msg)
	})

	network.elect(2)

	// Order is issued from a separate goroutine. GinkgoRecover lets a failed
	// assertion in that goroutine be reported as a spec failure instead of an
	// unrecovered panic.
	go func() {
		defer GinkgoRecover()
		Expect(c2.Order(env, 0)).NotTo(HaveOccurred())
	}()

	// While the filter on c2 is installed, neither c2 nor c3 should write a block.
	Consistently(c2.support.WriteBlockCallCount).Should(Equal(0))
	Consistently(c3.support.WriteBlockCallCount).Should(Equal(0))

	// Restore c2's original step function and advance its fake clock by interval.
	c2.setStepFunc(step2)
	c2.clock.Increment(interval)

	Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
	Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

	// c2 must have written block 1 first and block 2 second.
	b, _ := c2.support.WriteBlockArgsForCall(0)
	Expect(b.Header.Number).To(Equal(uint64(1)))
	b, _ = c2.support.WriteBlockArgsForCall(1)
	Expect(b.Header.Number).To(Equal(uint64(2)))
})
Context("handling config blocks", func() {
var configEnv *common.Envelope
BeforeEach(func() {
......@@ -2291,9 +2372,14 @@ func marshalOrPanic(pb proto.Message) []byte {
}
// helpers to facilitate tests
// stepFunc is the signature of a function that handles a StepRequest addressed
// to dest. It matches the signature of the function assigned to rpc.StepStub.
type stepFunc func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error)
type chain struct {
id uint64
stepLock sync.RWMutex
step stepFunc
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
configurator *mocks.Configurator
......@@ -2415,6 +2501,18 @@ func (c *chain) init() {
c.Chain = ch
}
// setStepFunc replaces the chain's step function with f. The write lock on
// stepLock is held while the field is assigned.
func (c *chain) setStepFunc(f stepFunc) {
	c.stepLock.Lock()
	defer c.stepLock.Unlock()

	c.step = f
}
// getStepFunc returns the chain's current step function. The field is read
// while holding the read lock on stepLock.
func (c *chain) getStepFunc() stepFunc {
	c.stepLock.RLock()
	current := c.step
	c.stepLock.RUnlock()

	return current
}
type network struct {
leader uint64
chains map[uint64]*chain
......@@ -2443,7 +2541,7 @@ func (n *network) addConnection(id uint64) {
func (n *network) addChain(c *chain) {
n.addConnection(c.id)
c.rpc.StepStub = func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
c.step = func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
n.connLock.RLock()
defer n.connLock.RUnlock()
......@@ -2457,6 +2555,12 @@ func (n *network) addChain(c *chain) {
return nil, nil
}
c.rpc.StepStub = func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
c.stepLock.RLock()
defer c.stepLock.RUnlock()
return c.step(dest, msg)
}
c.rpc.SendSubmitStub = func(dest uint64, msg *orderer.SubmitRequest) error {
n.connLock.RLock()
defer n.connLock.RUnlock()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment