Unverified Commit 07b7309c authored by Artem Barger's avatar Artem Barger
Browse files

[FAB-12945] add raft reconfiguration unit-tests



Change-Id: Ib77c866a30ed5108ad53908b0ca25a60a89e9a7c
Signed-off-by: default avatarArtem Barger <bartem@il.ibm.com>
parent c0450096
......@@ -743,6 +743,13 @@ func (c *Chain) apply(ents []raftpb.Entry) {
// set flag back
c.configChangeInProgress = false
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
// calling goroutine, since otherwise it will be blocked
// trying to write into haltC
go c.Halt()
}
}
if ents[i].Index > c.appliedIndex {
......
......@@ -1265,6 +1265,24 @@ var _ = Describe("Chain", func() {
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
return map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
}
removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue {
newRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.RaftMetadata)
delete(newRaftMetadata.Consenters, id)
metadata := &raftprotos.Metadata{}
for _, consenter := range newRaftMetadata.Consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
return map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
......@@ -1290,7 +1308,7 @@ var _ = Describe("Chain", func() {
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, defaultTimeout).Should(Equal(1))
Eventually(c.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
})
})
......@@ -1328,7 +1346,7 @@ var _ = Describe("Chain", func() {
},
}
By("adding new consenter into configuration")
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
......@@ -1345,8 +1363,8 @@ var _ = Describe("Chain", func() {
c4.init()
By("adding new node to the network")
Eventually(func() int { return c4.support.WriteBlockCallCount() }, defaultTimeout).Should(Equal(0))
Eventually(func() int { return c4.support.WriteConfigBlockCallCount() }, defaultTimeout).Should(Equal(0))
Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
......@@ -1436,8 +1454,6 @@ var _ = Describe("Chain", func() {
err = c4.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
c1.clock.Increment(interval)
// elect newly added node to be the leader
network.elect(4)
......@@ -1525,15 +1541,13 @@ var _ = Describe("Chain", func() {
err = c4.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
c2.clock.Increment(interval)
// rest nodes are alive include a newly added, hence should write 2 blocks
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c4.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// node 1 has been stopped should not write any block
Consistently(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Consistently(c1.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
network.connect(1)
......@@ -1543,6 +1557,108 @@ var _ = Describe("Chain", func() {
Eventually(c1.observe, LongEventualTimeout).Should(Receive(Equal(uint64(2))))
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
})
It("ensures that despite leader failure cluster continue to process configuration to remove the leader", func() {
// Scenario: Starting replica set of 3 nodes, electing nodeID = 1 to be the leader.
// Prepare config update transaction which removes leader (nodeID = 1), then leader
// fails right after it commits configuration block.
configEnv := newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, removeConsenterConfigValue(1))) // remove nodeID == 1
c1.cutter.CutNext = true
configBlock := &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}
c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
// disconnect leader after block being committed
network.disconnect(1)
}
// mock Block method to return recent configuration block
c2.support.BlockReturns(configBlock)
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
// every node has written config block to the OSN ledger
network.exec(
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
// electing new leader
network.elect(2)
By("submitting new transaction to follower")
c2.cutter.CutNext = true
err = c3.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
// rest nodes are alive include a newly added, hence should write 2 blocks
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// node 1 has been removed from replica set should not be getting any blocks
Consistently(c1.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
})
It("removes leader from replica set", func() {
// Scenario: Starting replica set of 3 nodes, electing nodeID = 1 to be the leader.
// Prepare config update transaction which removes leader (nodeID = 1), this to
// ensure we handle re-configuration of node removal correctly and remaining two
// nodes still capable to form functional quorum and Raft capable of making further progress.
// Moreover test asserts that removed node stops Rafting with rest of the cluster, i.e.
// should not be able to get updates or forward transactions.
configEnv := newConfigEnv(channelID,
common.HeaderType_CONFIG,
newConfigUpdateEnv(channelID, removeConsenterConfigValue(1))) // remove nodeID == 1
c1.cutter.CutNext = true
c1.support.WriteConfigBlockStub = func(configBlock *common.Block, _ []byte) {
// make sure c2 will get recent configuration block
c2.support.BlockReturns(configBlock)
}
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
// every node has written config block to the OSN ledger
network.exec(
func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
By("by making sure remaining two nodes will elect new leader")
// deterministically select nodeID == 2 to be a leader
network.elect(2)
By("submitting transaction to new leader")
c2.cutter.CutNext = true
err = c2.Order(env, 0)
Expect(err).ToNot(HaveOccurred())
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// node 1 has been stopped should not write any block
Consistently(c1.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
By("trying to submit to new node, expected to fail")
c1.cutter.CutNext = true
err = c1.Order(env, 0)
Expect(err).To(HaveOccurred())
// number of block writes should remain the same
Consistently(c2.support.WriteBlockCallCount, defaultTimeout).Should(Equal(2))
Consistently(c3.support.WriteBlockCallCount, defaultTimeout).Should(Equal(2))
Consistently(c1.support.WriteBlockCallCount, defaultTimeout).Should(Equal(1))
})
})
})
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment