Commit c4432943 authored by Jay Guo
Browse files

FAB-14764 Halt with delay if the node is removed as leader



If the leader halts too quickly after being removed, followers might not
be able to pick up the config change in time. This CR introduces a delay
before halting the chain on the leader.

Change-Id: I384c06001bf31b2816d463e06bf6199a77a64f25
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 8735e589
......@@ -381,7 +381,7 @@ func findLeader(ordererRunners []*ginkgomon.Runner) int {
wg.Add(len(ordererRunners))
findLeader := func(runner *ginkgomon.Runner) int {
Eventually(runner.Err(), time.Minute, time.Second).Should(gbytes.Say("Raft leader changed: 0 -> "))
Eventually(runner.Err(), time.Minute, time.Second).Should(gbytes.Say("Raft leader changed: [0-9] -> "))
idBuff := make([]byte, 1)
runner.Err().Read(idBuff)
......@@ -398,8 +398,13 @@ func findLeader(ordererRunners []*ginkgomon.Runner) int {
defer GinkgoRecover()
defer wg.Done()
leader := findLeader(runner)
leaders <- leader
for {
leader := findLeader(runner)
if leader != 0 {
leaders <- leader
break
}
}
}(runner)
}
......
......@@ -684,27 +684,56 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
orderers := []*nwo.Orderer{o1, o2, o3}
By("Waiting for them to elect a leader")
evictedNode := findLeader(ordererRunners) - 1
firstEvictedNode := findLeader(ordererRunners) - 1
By("Removing the leader from system channel")
serverCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(network.Orderers[evictedNode]), "server.crt"))
By("Removing the leader from 3-node channel")
serverCertBytes, err := ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[firstEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))
By("Removing the leader from both system channel and application channel")
nwo.RemoveConsenter(network, peer, network.Orderers[(evictedNode+1)%3], "systemchannel", serverCertBytes)
nwo.RemoveConsenter(network, peer, network.Orderers[(firstEvictedNode+1)%3], "systemchannel", serverCertBytes)
fmt.Fprintln(GinkgoWriter, "Ensuring the other orderers detect the eviction of the node on channel", "systemchannel")
Eventually(ordererRunners[(evictedNode+1)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
Eventually(ordererRunners[(evictedNode+2)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
Eventually(ordererRunners[(firstEvictedNode+1)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
Eventually(ordererRunners[(firstEvictedNode+2)%3].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say("Deactivated node"))
fmt.Fprintln(GinkgoWriter, "Ensuring the evicted orderer stops rafting on channel", "systemchannel")
stopMSg := fmt.Sprintf("Raft node stopped channel=%s", "systemchannel")
Eventually(ordererRunners[evictedNode].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(stopMSg))
Eventually(ordererRunners[firstEvictedNode].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(stopMSg))
By("Ensuring the evicted orderer now doesn't serve clients")
ensureEvicted(orderers[evictedNode], peer, network, "systemchannel")
ensureEvicted(orderers[firstEvictedNode], peer, network, "systemchannel")
By("Ensuring that all orderers don't log errors to the log")
assertNoErrorsAreLogged(ordererRunners)
var survivedOrdererRunners []*ginkgomon.Runner
for i := range orderers {
if i == firstEvictedNode {
continue
}
survivedOrdererRunners = append(survivedOrdererRunners, ordererRunners[i])
}
secondEvictedNode := findLeader(survivedOrdererRunners) - 1
var surviver int
for i := range orderers {
if i != firstEvictedNode && i != secondEvictedNode {
surviver = i
break
}
}
By("Removing the leader from 2-node channel")
serverCertBytes, err = ioutil.ReadFile(filepath.Join(network.OrdererLocalTLSDir(orderers[secondEvictedNode]), "server.crt"))
Expect(err).To(Not(HaveOccurred()))
nwo.RemoveConsenter(network, peer, orderers[surviver], "systemchannel", serverCertBytes)
fmt.Fprintln(GinkgoWriter, "Ensuring the other orderer detect the eviction of the node on channel", "systemchannel")
Eventually(ordererRunners[secondEvictedNode].Err(), network.EventuallyTimeout, time.Second).Should(gbytes.Say(stopMSg))
By("Ensuring the evicted orderer now doesn't serve clients")
ensureEvicted(orderers[secondEvictedNode], peer, network, "systemchannel")
By("Asserting the only remaining node elects itself")
findLeader([]*ginkgomon.Runner{ordererRunners[surviver]})
})
It("notices it even if it is down at the time of its eviction", func() {
......
......@@ -1071,7 +1071,21 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
// calling goroutine, since otherwise it will be blocked
// trying to write into haltC
go c.Halt()
lead := atomic.LoadUint64(&c.lastKnownLeader)
if lead == c.raftID {
c.logger.Info("This node is being removed as current leader, halt with delay")
c.configInflight = true // toggle the flag so this node does not accept further tx
go func() {
select {
case <-c.clock.After(time.Duration(c.opts.ElectionTick) * c.opts.TickInterval):
case <-c.doneC:
}
c.Halt()
}()
} else {
go c.Halt()
}
}
}
......@@ -1177,11 +1191,11 @@ func (c *Chain) checkConsentersSet(updatedMetadata *etcdraft.ConfigMetadata) err
// sanity check of certificates
for _, consenter := range updatedMetadata.Consenters {
if bl, _ := pem.Decode(consenter.ServerTlsCert); bl == nil {
return errors.Errorf("Invalid server TLS cert: %s", string(consenter.ServerTlsCert))
return errors.Errorf("invalid server TLS cert: %s", string(consenter.ServerTlsCert))
}
if bl, _ := pem.Decode(consenter.ClientTlsCert); bl == nil {
return errors.Errorf("Invalid client TLS cert: %s", string(consenter.ClientTlsCert))
return errors.Errorf("invalid client TLS cert: %s", string(consenter.ClientTlsCert))
}
}
......
......@@ -665,7 +665,7 @@ var _ = Describe("Chain", func() {
It("should fail, since consenters set change is not supported", func() {
err := chain.Configure(configEnv, configSeq)
Expect(err).To(MatchError("update of more than one consenter at a time is not supported, requested changes: add 3 node(s), remove 1 node(s)"))
Expect(err).To(MatchError(ContainSubstring("update of more than one consenter at a time is not supported, requested changes: add 3 node(s), remove 1 node(s)")))
Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
......@@ -1334,10 +1334,113 @@ var _ = Describe("Chain", func() {
})
})
})
})
// Two-node cluster scenarios: each spec submits a config update that leaves
// consenter 2 as the only member of the consenter set, removing node 1 while
// it is either the leader or a follower.
Describe("2-node Raft cluster", func() {
var (
network *network
channelID string
timeout time.Duration
dataDir string
c1, c2 *chain
raftMetadata *raftprotos.BlockMetadata
consenters map[uint64]*raftprotos.Consenter
configEnv *common.Envelope
)
BeforeEach(func() {
var err error
channelID = "multi-node-channel"
timeout = 10 * time.Second
dataDir, err = ioutil.TempDir("", "raft-test-")
Expect(err).NotTo(HaveOccurred())
// Both nodes (IDs 1 and 2) start out as consenters.
raftMetadata = &raftprotos.BlockMetadata{
ConsenterIds: []uint64{1, 2},
NextConsenterId: 3,
}
consenters = map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
}
// The updated metadata lists only consenter 2.
metadata := &raftprotos.ConfigMetadata{Consenters: []*raftprotos.Consenter{consenters[2]}}
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
// prepare config update to remove 1
configEnv = newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, nil, value))
network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters)
c1, c2 = network.chains[1], network.chains[2]
// NOTE(review): presumably makes c1's block cutter cut a batch on the next
// message — confirm against the test cutter implementation.
c1.cutter.CutNext = true
network.init()
network.start()
})
AfterEach(func() {
// Stop the chains, then delete the temporary data directory created in BeforeEach.
network.stop()
os.RemoveAll(dataDir)
})
It("can remove leader by reconfiguring cluster", func() {
// Make node 1 the leader, then submit the update that removes it.
network.elect(1)
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
// NOTE(review): a count of 2 presumably covers the initial configuration at
// start plus this reconfiguration — confirm.
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
// The removed leader must not halt right away: Errored stays open until
// c1's clock has been advanced by ELECTION_TICK*interval.
Consistently(c1.Chain.Errored).ShouldNot(BeClosed())
c1.clock.WaitForNWatchersAndIncrement(ELECTION_TICK*interval, 2)
Eventually(c1.Chain.Errored, LongEventualTimeout).Should(BeClosed())
close(c1.stopped) // mark c1 stopped in network
By("Electing 2 as new leader")
network.elect(2)
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.CutNext = true
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
It("can remove follower by reconfiguring cluster", func() {
// Node 2 leads here, so the removed node 1 is a follower; the spec expects
// c1 to halt without any clock advance.
network.elect(2)
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
Eventually(c1.Chain.Errored, LongEventualTimeout).Should(BeClosed())
By("Asserting leader can still serve requests as single-node cluster")
c2.cutter.CutNext = true
Expect(c2.Order(env, 0)).To(Succeed())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
})
Describe("Multiple Raft nodes", func() {
Describe("3-node Raft cluster", func() {
var (
network *network
channelID string
......@@ -1572,7 +1675,7 @@ var _ = Describe("Chain", func() {
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).To(MatchError("update of more than one consenter at a time is not supported, requested changes: add 0 node(s), remove 2 node(s)"))
Expect(err).To(MatchError(ContainSubstring("update of more than one consenter at a time is not supported, requested changes: add 0 node(s), remove 2 node(s)")))
})
It("rejects invalid certificates", func() {
......@@ -1595,7 +1698,7 @@ var _ = Describe("Chain", func() {
c1.cutter.CutNext = true
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(MatchError("Invalid server TLS cert: hello"))
Expect(c1.Configure(configEnv, 0)).To(MatchError("invalid server TLS cert: hello"))
})
It("can rotate certificate by adding and removing 1 node in one config update", func() {
......@@ -2250,6 +2353,7 @@ var _ = Describe("Chain", func() {
})
// Assert c1 has exited
c1.clock.WaitForNWatchersAndIncrement(ELECTION_TICK*interval, 2)
Eventually(c1.Errored, LongEventualTimeout).Should(BeClosed())
close(c1.stopped)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment