Unverified Commit 657b8095 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13438] pass SoftState on observe channel



This CR changes type of etcdraft observe channel from uint64
to raft.SoftState, so that chain_test can assert not only leader
id, but also the state of node.

Change-Id: Ia0c5f8c9060c234ceb84133e0c5598ed064dd1ee
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 5dadb3a5
......@@ -101,7 +101,7 @@ type Chain struct {
submitC chan *submit
applyC chan apply
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
startC chan struct{} // Closes when the node is started
......@@ -138,7 +138,7 @@ func NewChain(
conf Configurator,
rpc RPC,
puller BlockPuller,
observeC chan<- uint64) (*Chain, error) {
observeC chan<- raft.SoftState) (*Chain, error) {
lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
......@@ -478,12 +478,12 @@ func (c *Chain) serveRequest() {
}
leader = newLeader
}
// notify external observer
select {
case c.observeC <- leader:
default:
}
// notify external observer
select {
case c.observeC <- raft.SoftState{Lead: leader, RaftState: app.soft.RaftState}:
default:
}
}
......
......@@ -89,7 +89,7 @@ var _ = Describe("Chain", func() {
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
storage *raft.MemoryStorage
observeC chan uint64
observeC chan raft.SoftState
chain *etcdraft.Chain
dataDir string
walDir string
......@@ -108,7 +108,7 @@ var _ = Describe("Chain", func() {
walDir = path.Join(dataDir, "wal")
snapDir = path.Join(dataDir, "snapshot")
observeC = make(chan uint64, 1)
observeC = make(chan raft.SoftState, 1)
support = &consensusmocks.FakeConsenterSupport{}
support.ChainIDReturns(channelID)
......@@ -150,12 +150,12 @@ var _ = Describe("Chain", func() {
}
})
campaign := func(clock *fakeclock.FakeClock, observeC <-chan uint64) {
campaign := func(clock *fakeclock.FakeClock, observeC <-chan raft.SoftState) {
Eventually(func() bool {
clock.Increment(interval)
select {
case <-observeC:
return true
case s := <-observeC:
return s.RaftState == raft.StateLeader
default:
return false
}
......@@ -1407,10 +1407,10 @@ var _ = Describe("Chain", func() {
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan uint64 {
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}, defaultTimeout).Should(Receive(Equal(uint64(1))))
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
......@@ -1466,10 +1466,10 @@ var _ = Describe("Chain", func() {
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan uint64 {
Eventually(func() <-chan raft.SoftState {
c1.clock.Increment(interval)
return c4.observe
}).Should(Receive(Equal(uint64(1))))
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
......@@ -1486,10 +1486,10 @@ var _ = Describe("Chain", func() {
network.connect(2)
// make sure second node see 4th as a leader
Eventually(func() <-chan uint64 {
Eventually(func() <-chan raft.SoftState {
c4.clock.Increment(interval)
return c2.observe
}, defaultTimeout).Should(Receive(Equal(uint64(4))))
}, defaultTimeout).Should(Receive(Equal(raft.SoftState{Lead: 4, RaftState: raft.StateFollower})))
By("submitting new transaction to re-connected node")
c4.cutter.CutNext = true
......@@ -1553,10 +1553,10 @@ var _ = Describe("Chain", func() {
// to leader's node list right away. An immediate tick does not trigger a heartbeat
// being sent to node 4. Therefore, we repeatedly tick the leader until node 4 joins
// the cluster successfully.
Eventually(func() <-chan uint64 {
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
......@@ -1579,7 +1579,7 @@ var _ = Describe("Chain", func() {
c2.clock.Increment(interval)
// check that former leader didn't get stuck and actually got resign signal,
// and once connected capable of communicating with rest of the replicas set
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
......@@ -1644,10 +1644,10 @@ var _ = Describe("Chain", func() {
network.elect(2)
By("confirm new node observed a leader node")
Eventually(func() <-chan uint64 {
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c4.observe
}, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c4.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
......@@ -2305,7 +2305,7 @@ var _ = Describe("Chain", func() {
c2.clock.Increment(interval)
// this check guarantees that signal on resignC is consumed in commitBatches method.
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 2, RaftState: raft.StateFollower})))
})
})
})
......@@ -2395,7 +2395,7 @@ type chain struct {
ledgerHeight uint64
ledgerLock sync.RWMutex
observe chan uint64
observe chan raft.SoftState
unstarted chan struct{}
*etcdraft.Chain
......@@ -2432,7 +2432,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
// upon leader change, lead is reset to 0 before set to actual
// new leader, i.e. 1 -> 0 -> 2. Therefore 2 numbers will be
// sent on this chan, so we need size to be 2
observe := make(chan uint64, 2)
observe := make(chan raft.SoftState, 2)
configurator := &mocks.Configurator{}
configurator.On("Configure", mock.Anything, mock.Anything)
......@@ -2685,7 +2685,7 @@ func (n *network) rejoin(id uint64, wasLeader bool) {
n.chains[n.leader].clock.Increment(interval)
if wasLeader {
Eventually(n.chains[id].observe).Should(Receive(Equal(n.leader)))
Eventually(n.chains[id].observe).Should(Receive(Equal(raft.SoftState{Lead: n.leader, RaftState: raft.StateFollower})))
} else {
Consistently(n.chains[id].observe).ShouldNot(Receive())
}
......@@ -2719,21 +2719,11 @@ func (n *network) elect(id uint64) (tick int) {
select {
case <-time.After(t):
// this tick does not trigger leader change within t, continue
case n := <-c.observe: // leadership change occurs
if n == 0 {
// in etcd/raft, if there's already a leader,
// lead in softstate goes through X -> 0 -> Y.
// therefore, we might observe 0 first. In this
// situation, no more tick is needed because an
// leader election is already underway.
Eventually(c.observe, LongEventualTimeout).Should(Receive(Equal(id)))
} else {
// if there's no leader (fresh cluster), we have 0 -> Y
// therefore we should observe Y directly.
Expect(n).To(Equal(id))
case s := <-c.observe: // leadership change occurs
if s.RaftState == raft.StateLeader {
elected = true
break
}
elected = true
break
}
}
......@@ -2750,7 +2740,7 @@ func (n *network) elect(id uint64) (tick int) {
case <-n.connectivity[c.id]: // skip check if node n is disconnected
case <-c.unstarted: // skip check if node is not started yet
default:
Eventually(c.observe).Should(Receive(Equal(id)))
Eventually(c.observe).Should(Receive(Equal(raft.SoftState{Lead: id, RaftState: raft.StateFollower})))
}
}
n.connLock.RUnlock()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment