Unverified Commit 0276480c authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13438] Errored should reflect correct state



This CR changes Errored to return a channel that is
closed when node becomes candidate.

Change-Id: Ibd0ece763b9d93c4da93825d1b302ecc55a9b32e
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 21a49bad
......@@ -107,6 +107,9 @@ type Chain struct {
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
errorCLock sync.RWMutex
errorC chan struct{} // returned by Errored()
raftMetadataLock sync.RWMutex
confChangeInProgress *raftpb.ConfChange
justElected bool // this is true when node has just been elected
......@@ -172,6 +175,7 @@ func NewChain(
doneC: make(chan struct{}),
startC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
errorC: make(chan struct{}),
observeC: observeC,
support: support,
fresh: fresh,
......@@ -226,6 +230,7 @@ func (c *Chain) Start() {
c.node.start(c.fresh, c.support.Height() > 1)
close(c.startC)
close(c.errorC)
go c.serveRequest()
}
......@@ -297,7 +302,9 @@ func (c *Chain) WaitReady() error {
// Errored returns a channel that closes when the chain stops.
func (c *Chain) Errored() <-chan struct{} {
return c.doneC
c.errorCLock.RLock()
defer c.errorCLock.RUnlock()
return c.errorC
}
// Halt stops the chain.
......@@ -374,6 +381,10 @@ type apply struct {
soft *raft.SoftState
}
func isCandidate(state raft.StateType) bool {
return state == raft.StatePreCandidate || state == raft.StateCandidate
}
func (c *Chain) serveRequest() {
ticking := false
timer := c.clock.NewTimer(time.Second)
......@@ -483,6 +494,23 @@ func (c *Chain) serveRequest() {
}
}
foundLeader := soft.Lead == raft.None && newLeader != raft.None
quitCandidate := isCandidate(soft.RaftState) && !isCandidate(app.soft.RaftState)
if foundLeader || quitCandidate {
c.errorCLock.Lock()
c.errorC = make(chan struct{})
c.errorCLock.Unlock()
}
if isCandidate(app.soft.RaftState) || newLeader == raft.None {
select {
case <-c.errorC:
default:
close(c.errorC)
}
}
soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}
// notify external observer
......@@ -546,6 +574,12 @@ func (c *Chain) serveRequest() {
}
case <-c.doneC:
select {
case <-c.errorC: // avoid closing closed channel
default:
close(c.errorC)
}
c.logger.Infof("Stop serving requests")
return
}
......
......@@ -356,9 +356,10 @@ var _ = Describe("Chain", func() {
})
It("unblocks Errored if chain is halted", func() {
Expect(chain.Errored()).NotTo(Receive())
errorC := chain.Errored()
Expect(errorC).NotTo(BeClosed())
chain.Halt()
Expect(chain.Errored()).Should(BeClosed())
Eventually(errorC).Should(BeClosed())
})
Describe("Config updates", func() {
......@@ -1738,6 +1739,7 @@ var _ = Describe("Chain", func() {
// Assert c1 has exited
Eventually(c1.Errored, LongEventualTimeout).Should(BeClosed())
close(c1.stopped)
By("making sure remaining two nodes will elect new leader")
......@@ -1832,14 +1834,24 @@ var _ = Describe("Chain", func() {
It("should return error when receiving an env", func() {
network.disconnect(2)
errorC := c2.Errored()
Consistently(errorC).ShouldNot(BeClosed()) // assert that errorC is not closed
By("Ticking node 2 until it becomes pre-candidate")
Eventually(func() <-chan raft.SoftState {
c2.clock.Increment(interval)
return c2.observe
}, LongEventualTimeout).Should(Receive(Equal(raft.SoftState{Lead: 1, RaftState: raft.StatePreCandidate})))
Eventually(errorC).Should(BeClosed())
err := c2.Order(env, 0)
Expect(err).To(HaveOccurred())
network.connect(2)
c1.clock.Increment(interval)
Expect(errorC).To(BeClosed())
Eventually(c2.Errored).ShouldNot(BeClosed())
})
})
......@@ -2412,6 +2424,7 @@ type chain struct {
observe chan raft.SoftState
unstarted chan struct{}
stopped chan struct{}
*etcdraft.Chain
}
......@@ -2467,6 +2480,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
clock: clock,
opts: opts,
unstarted: ch,
stopped: make(chan struct{}),
configurator: configurator,
puller: puller,
ledger: map[uint64]*common.Block{
......@@ -2673,9 +2687,15 @@ func (n *network) stop(ids ...uint64) {
}
}
for _, c := range nodes {
n.chains[c].Halt()
<-n.chains[c].Errored()
for _, id := range nodes {
c := n.chains[id]
c.Halt()
<-c.Errored()
select {
case <-c.stopped:
default:
close(c.stopped)
}
}
}
......@@ -2751,7 +2771,7 @@ func (n *network) elect(id uint64) (tick int) {
}
select {
case <-c.Errored(): // skip if node is exit
case <-c.stopped: // skip check if node n is stopped
case <-n.connectivity[c.id]: // skip check if node n is disconnected
case <-c.unstarted: // skip check if node is not started yet
default:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment