Unverified Commit 5c3e2fce authored by Jay Guo, committed by Artem Barger

[FAB-12709] Use another way to elect leader in UT



In etcdraft UT, we often need to deterministically elect a leader.
This was done by ticking ONLY one node in the network, so it is
the only node that starts a campaign.

HOWEVER, there are several problems with this approach:
1. it's slow. We need a real time interval between ticks due to the
   way the fake clock is implemented: it drops a tick on the floor in
   the case of a slow consumer.
2. there is a random factor in the election timeout of etcd/raft. It is
   calculated as follows:
```
randomElectionTimeout = electionTimeout + rand.Intn(electionTimeout)
```
   in other words, sending electionTimeout ticks is not guaranteed
   to trigger a leader election (see the sketch after this list)
3. if CheckQuorum is enabled, a lease is imposed on follower nodes,
   which expires when
      electionTimeout <= elapsedTicks < randomElectionTimeout
   (once elapsedTicks reaches randomElectionTimeout, it is reset to 0
   and the node starts a campaign)
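
For illustration only (not part of this CR), a small standalone Go sketch of
point 2, using a hypothetical electionTimeout of 10 ticks; it shows why sending
exactly electionTimeout ticks only occasionally starts a campaign:
```
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// hypothetical value; etcd/raft takes this from Config.ElectionTick
	electionTimeout := 10

	// the formula quoted above: uniformly distributed in
	// [electionTimeout, 2*electionTimeout)
	randomElectionTimeout := electionTimeout + rand.Intn(electionTimeout)

	// the old tests sent exactly electionTimeout ticks, but a campaign
	// only starts once the elapsed ticks reach the randomized timeout,
	// so this prints "true" only when rand.Intn happened to return 0
	ticksSent := electionTimeout
	fmt.Println("campaign triggered:", ticksSent >= randomElectionTimeout)
}
```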

In this CR, we send an artificial MsgTimeoutNow to the node to be
elected. This message reliably triggers a campaign and skips the
lease check.
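
A minimal sketch of that path, assuming the chain test wrapper, its observe
channel, and the proto helpers that appear elsewhere in this diff (the helper
name is hypothetical; it mirrors the reworked elect() further down):
```
// sketch only: mirrors the reworked elect() helper in this CR
// c is the test wrapper around a chain; id is the raft node ID to be elected
func electByTimeoutNow(c *chain, id uint64) {
	// an artificial MsgTimeoutNow makes the node campaign immediately,
	// bypassing both the randomized election timeout and the CheckQuorum lease
	c.Step(&orderer.StepRequest{
		Payload: utils.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow}),
	}, 0)

	// the node should then report itself as leader on its observe channel
	Eventually(c.observe, LongEventualTimeout).Should(
		Receive(Equal(raft.SoftState{Lead: id, RaftState: raft.StateLeader})))
}
```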

This CR also fixes several potential data races and test flakes.

Change-Id: I3c8e0bcadbb8cfa1ae3393de2ea711fdd0d8b7aa
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 20dc27fc
@@ -985,6 +985,7 @@
"github.com/onsi/gomega/gbytes",
"github.com/onsi/gomega/gexec",
"github.com/onsi/gomega/ghttp",
"github.com/onsi/gomega/types",
"github.com/op/go-logging",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
@@ -32,8 +32,10 @@ import (
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
raftprotos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
@@ -1461,16 +1463,9 @@ var _ = Describe("Chain", func() {
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower})))
c4.start()
Expect(c4.WaitReady()).To(Succeed())
network.join(4, true)
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
@@ -1479,6 +1474,7 @@ var _ = Describe("Chain", func() {
c1.cutter.CutNext = true
err = c4.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(2))
// elect newly added node to be the leader
network.elect(4)
@@ -1516,14 +1512,18 @@ var _ = Describe("Chain", func() {
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
stub := c1.support.WriteConfigBlockStub
c1.support.WriteConfigBlockStub = func(block *common.Block, metadata []byte) {
stub(block, metadata)
// disconnect leader after block being committed
network.disconnect(1)
// electing new leader
network.elect(2)
}
step1 := c1.getStepFunc()
count := c1.rpc.StepCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.StepCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
By("sending config transaction")
err := c1.Configure(configEnv, 0)
@@ -1534,6 +1534,8 @@ var _ = Describe("Chain", func() {
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
c1.setStepFunc(step1)
network.elect(2)
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
@@ -1549,15 +1551,9 @@ var _ = Describe("Chain", func() {
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
c4.start()
Expect(c4.WaitReady()).To(Succeed())
network.join(4, true)
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
@@ -1639,16 +1635,12 @@ var _ = Describe("Chain", func() {
network.connect(i)
}
c4.Start()
By("re-elect node 2 to be a leader")
network.elect(2)
By("confirm new node observed a leader node")
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
c4.start()
Expect(c4.WaitReady()).To(Succeed())
network.join(4, false)
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
@@ -1678,29 +1670,30 @@ var _ = Describe("Chain", func() {
c1.cutter.CutNext = true
stub := c1.support.WriteConfigBlockStub
c1.support.WriteConfigBlockStub = func(block *common.Block, metadata []byte) {
stub(block, metadata)
// disconnect leader after block being committed
network.disconnect(1)
}
step1 := c1.getStepFunc()
count := c1.rpc.StepCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.StepCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
Eventually(c1.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
// Defer assertions on c2 & c3 until after c2 is elected, because we may disconnect c1 too
// quickly, in which case the MsgApps it sends to c2/c3 are dropped and this config block
// has not been committed on c2/c3 yet. Once c2 is elected, the new leader will continue
// the effort to commit the config block, if it is not done yet.
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
// electing new leader
network.elect(2)
Eventually(c2.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c3.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
By("submitting new transaction to follower")
c2.cutter.CutNext = true
err = c3.Order(env, 0)
@@ -1709,8 +1702,6 @@ var _ = Describe("Chain", func() {
// the rest of the nodes are alive, including the newly added one, hence should write 2 blocks
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// node 1 has been removed from the replica set and should not be getting any blocks
Consistently(c1.support.WriteBlockCallCount).Should(Equal(1))
})
It("removes leader from replica set", func() {
@@ -1844,6 +1835,7 @@ var _ = Describe("Chain", func() {
doneProp := make(chan struct{})
go func() {
defer GinkgoRecover()
Expect(c1.Order(env, 0)).To(Succeed())
close(doneProp)
}()
@@ -2029,6 +2021,7 @@ var _ = Describe("Chain", func() {
network.elect(2)
go func() {
defer GinkgoRecover()
Expect(c2.Order(env, 0)).NotTo(HaveOccurred())
}()
@@ -2191,7 +2184,7 @@ var _ = Describe("Chain", func() {
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
network.rejoin(2, false)
network.join(2, false)
Eventually(c2.puller.PullBlockCallCount, LongEventualTimeout).Should(Equal(10))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
@@ -2302,7 +2295,7 @@ var _ = Describe("Chain", func() {
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
network.rejoin(2, false)
network.join(2, false)
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
network.disconnect(1)
@@ -2310,21 +2303,19 @@ var _ = Describe("Chain", func() {
})
It("purges blockcutter, stops timer and discards created blocks if leadership is lost", func() {
// create one block on chain 1 to test for reset of the created blocks
network.disconnect(1)
// enqueue one transaction into 1's blockcutter to test for purging of block cutter
c1.cutter.CutNext = false
err := c1.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
// the created block should not be written since the leader should not be able to get votes
// for the block due to the network disconnectivity.
// no block should be written because env has not been cut into a block yet
c1.clock.WaitForNWatchersAndIncrement(interval, 2)
Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))
network.disconnect(1)
network.elect(2)
network.rejoin(1, true)
network.join(1, true)
Eventually(c1.clock.WatcherCount, LongEventualTimeout).Should(Equal(1)) // blockcutter timer is stopped
Eventually(c1.cutter.CurBatch, LongEventualTimeout).Should(HaveLen(0))
@@ -2332,7 +2323,7 @@ var _ = Describe("Chain", func() {
Consistently(c1.support.WriteBlockCallCount).Should(Equal(0))
network.disconnect(2)
n := network.elect(1) // advances 1's clock by n intervals
network.elect(1)
err = c1.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
@@ -2360,11 +2351,11 @@ var _ = Describe("Chain", func() {
// at this point of time it should fire
// timer should not fire at this point
c1.clock.WaitForNWatchersAndIncrement(timeout-time.Duration(n*int(interval/time.Millisecond)), 2)
c1.clock.WaitForNWatchersAndIncrement(timeout-interval, 2)
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(0))
c1.clock.Increment(time.Duration(n * int(interval/time.Millisecond)))
c1.clock.Increment(interval)
Eventually(func() int { return c1.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
Eventually(func() int { return c3.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(1))
})
@@ -2599,6 +2590,11 @@ func (c *chain) init() {
c.Chain = ch
}
func (c *chain) start() {
c.unstarted = nil
c.Start()
}
func (c *chain) setStepFunc(f stepFunc) {
c.stepLock.Lock()
c.step = f
@@ -2647,7 +2643,13 @@ func (n *network) addChain(c *chain) {
case <-n.connectivity[dest]:
case <-n.connectivity[c.id]:
default:
go n.chains[dest].Step(msg, c.id)
// get a reference to the chain while
// n.chains is still RLock'ed
target := n.chains[dest]
go func() {
defer GinkgoRecover()
target.Step(msg, c.id)
}()
}
return nil, nil
@@ -2667,7 +2669,13 @@ func (n *network) addChain(c *chain) {
case <-n.connectivity[dest]:
case <-n.connectivity[c.id]:
default:
go n.chains[dest].Submit(msg, c.id)
// get a reference to the chain while
// n.chains is still RLock'ed
target := n.chains[dest]
go func() {
defer GinkgoRecover()
target.Submit(msg, c.id)
}()
}
return nil
@@ -2782,57 +2790,60 @@ func (n *network) exec(f func(c *chain), ids ...uint64) {
}
}
// connect a node to network and tick on leader to trigger
// connect a node to network and tick leader to trigger
// a heartbeat so newly joined node can detect leader.
func (n *network) rejoin(id uint64, wasLeader bool) {
//
// expectLeaderChange controls whether leader change should
// be observed on newly joined node.
// - it should be true if newly joined node was leader
// - it should be false if newly joined node was follower, and
// already knows the leader.
func (n *network) join(id uint64, expectLeaderChange bool) {
n.connect(id)
n.chains[n.leader].clock.Increment(interval)
if wasLeader {
Eventually(n.chains[id].observe).Should(Receive(Equal(raft.SoftState{Lead: n.leader, RaftState: raft.StateFollower})))
} else {
Consistently(n.chains[id].observe).ShouldNot(Receive())
leader, follower := n.chains[n.leader], n.chains[id]
step := leader.getStepFunc()
signal := make(chan struct{})
leader.setStepFunc(func(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error) {
if dest == id {
// close signal channel when a message targeting newly
// joined node is observed on wire.
select {
case <-signal:
default:
close(signal)
}
}
return step(dest, msg)
})
leader.clock.Increment(interval)
Eventually(signal, LongEventualTimeout).Should(BeClosed())
leader.setStepFunc(step)
if expectLeaderChange {
Eventually(follower.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: n.leader, RaftState: raft.StateFollower})))
}
// wait for newly joined node to catch up with leader
i, err := n.chains[n.leader].opts.MemoryStorage.LastIndex()
Expect(err).NotTo(HaveOccurred())
Eventually(n.chains[id].opts.MemoryStorage.LastIndex).Should(Equal(i))
Eventually(n.chains[id].opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(i))
}
// elect deterministically elects a node as leader
// by only ticking timer on that node. It returns
// the actual number of ticks in case test needs it.
func (n *network) elect(id uint64) (tick int) {
// Also, due to the way fake clock is implemented,
// a slow consumer MAY skip a tick, which could
// result in nondeterministic behavior. Therefore
// we are going to wait for enough time after each
// tick so it could take effect.
t := 10 * time.Millisecond
func (n *network) elect(id uint64) {
n.connLock.RLock()
c := n.chains[id]
n.connLock.RUnlock()
var elected bool
for !elected {
c.clock.Increment(interval)
tick++
select {
case <-time.After(t):
// this tick does not trigger leader change within t, continue
case s := <-c.observe: // leadership change occurs
if s.RaftState == raft.StateLeader {
elected = true
break
}
}
}
// Send node an artificial MsgTimeoutNow to emulate leadership transfer.
c.Step(&orderer.StepRequest{Payload: utils.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow})}, 0)
Eventually(c.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateLeader)))
// now observe leader change on other nodes
n.connLock.RLock()
for _, c := range n.chains {
if c.id == id {
@@ -2844,13 +2855,12 @@ func (n *network) elect(id uint64) (tick int) {
case <-n.connectivity[c.id]: // skip check if node n is disconnected
case <-c.unstarted: // skip check if node is not started yet
default:
Eventually(c.observe).Should(Receive(Equal(raft.SoftState{Lead: id, RaftState: raft.StateFollower})))
Eventually(c.observe, LongEventualTimeout).Should(Receive(StateEqual(id, raft.StateFollower)))
}
}
n.connLock.RUnlock()
n.leader = id
return tick
}
func (n *network) disconnect(i uint64) {
@@ -2915,3 +2925,7 @@ func getSeedBlock() *common.Block {
Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
}
}
func StateEqual(lead uint64, state raft.StateType) types.GomegaMatcher {
return Equal(raft.SoftState{Lead: lead, RaftState: state})
}