Unverified Commit 2755580a authored by Jay Guo, committed by Artem Barger

[FAB-13456] Use empty peer list to join raft cluster



When joining a fresh node to an existing etcdraft cluster, it
should use an empty peer list when calling `StartNode`.

Change-Id: Ib6acf6fd9b2956680c99d5d7370ce439228d3bfa
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 48bd0ee9
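In sketch form, the start-up decision this change introduces looks roughly like the following (simplified from the `Chain.Start` hunk below; the package name, import path, and the `fresh`/`height` parameters are stand-ins for the real Chain fields, not the committed code):

```go
package sketch

import "github.com/coreos/etcd/raft"

// startRaftNode is a minimal sketch of the joining logic described above:
// a node with no WAL ("fresh") that already has blocks beyond the genesis
// block is joining an existing channel, so it must pass an empty peer list
// to raft.StartNode and learn the membership from replicated config entries.
func startRaftNode(config *raft.Config, peers []raft.Peer, fresh bool, height uint64) raft.Node {
	if !fresh {
		// A node with an existing WAL simply restarts from its persisted state.
		return raft.RestartNode(config)
	}
	if height > 1 {
		// Fresh WAL but an existing ledger: join an existing cluster.
		peers = nil
	}
	return raft.StartNode(config, peers)
}
```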
@@ -155,12 +155,7 @@ func NewChain(
lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
height := support.Height()
fresh := !wal.Exist(opts.WALDir)
if fresh && height > 1 {
lg.Infof("Etcdraft chain is booted as fresh raft node with existing ledger at block %d", height-1)
}
storage, err := CreateStorage(lg, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
if err != nil {
return nil, errors.Errorf("failed to restore persisted raft data: %s", err)
@@ -179,7 +174,7 @@ func NewChain(
snapBlkNum = b.Header.Number
}
lastBlock := support.Block(height - 1)
lastBlock := support.Block(support.Height() - 1)
return &Chain{
configurator: conf,
@@ -238,10 +233,15 @@ func (c *Chain) Start() {
raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)
if c.fresh {
c.logger.Info("starting new raft node")
if c.support.Height() > 1 {
raftPeers = nil
c.logger.Info("Starting raft node to join an existing channel")
} else {
c.logger.Info("Starting raft node as part of a new channel")
}
c.node = raft.StartNode(config, raftPeers)
} else {
c.logger.Info("restarting raft node")
c.logger.Info("Restarting raft node")
c.node = raft.RestartNode(config)
}
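For context, an empty peer list works because etcd/raft expects a joining node's membership to arrive as a replicated ConfChange rather than through StartNode's bootstrap peers. A rough, non-Fabric sketch of that flow (the helper names and node IDs are illustrative, not taken from this change):

```go
package sketch

import (
	"context"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// proposeAddNode sketches how an existing member admits a new node: it
// proposes a ConfChange that the cluster replicates like any other entry.
func proposeAddNode(ctx context.Context, existing raft.Node, newID uint64) error {
	cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: newID}
	return existing.ProposeConfChange(ctx, cc)
}

// applyCommittedConfChange is what every node (including the one started
// with nil peers) calls once the committed ConfChange entry comes back
// through Ready(); this is how the joining node learns the membership.
func applyCommittedConfChange(n raft.Node, cc raftpb.ConfChange) *raftpb.ConfState {
	return n.ApplyConfChange(cc)
}
```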
@@ -1373,16 +1373,6 @@ var _ = Describe("Chain", func() {
})
It("adding node to the cluster", func() {
c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
})
c4.init()
By("adding new node to the network")
Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
@@ -1394,6 +1384,19 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
})
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadRaftMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4.init()
// If we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
c4.Start()
@@ -1428,15 +1431,6 @@ var _ = Describe("Chain", func() {
// disconnect second node
network.disconnect(2)
c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
})
c4.init()
By("adding new node to the network")
Eventually(c4.support.WriteBlockCallCount, defaultTimeout).Should(Equal(0))
Eventually(c4.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(0))
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
@@ -1449,6 +1443,19 @@ var _ = Describe("Chain", func() {
Eventually(c2.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(0))
Eventually(c3.support.WriteConfigBlockCallCount, defaultTimeout).Should(Equal(1))
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadRaftMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4.init()
// If we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
c4.Start()
@@ -1502,15 +1509,6 @@ var _ = Describe("Chain", func() {
// re-configuration. Later we connect c1 back and make sure it is capable of catching up with
// the new configuration and successfully rejoins the replica set.
c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
})
c4.init()
By("adding new node to the network")
Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
@@ -1533,6 +1531,19 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadRaftMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4.init()
// If we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
c4.Start()
// ConfChange is applied to etcd/raft asynchronously, meaning node 4 is not added
@@ -1574,15 +1585,6 @@ var _ = Describe("Chain", func() {
// configure chain support mock to stop the cluster after the config block is committed.
// Restart the cluster and ensure it picks up updates and is capable of finishing reconfiguration.
c4 := newChain(timeout, channelID, dataDir, 4, &raftprotos.RaftMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
})
c4.init()
By("adding new node to the network")
Expect(c4.support.WriteBlockCallCount()).Should(Equal(0))
Expect(c4.support.WriteConfigBlockCallCount()).Should(Equal(0))
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
@@ -1613,6 +1615,19 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
})
_, raftmetabytes := c1.support.WriteConfigBlockArgsForCall(0)
meta := &common.Metadata{Value: raftmetabytes}
raftmeta, err := etcdraft.ReadRaftMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4.init()
// If we join a node to an existing network, it MUST have already obtained blocks
// up to the config block that adds this node to the cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
c4.support.WriteConfigBlock(c1.support.WriteConfigBlockArgsForCall(0))
network.addChain(c4)
By("reconnecting nodes back")
@@ -117,7 +117,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
// In case the chain has been restarted, we restore raft metadata
// information from the recently committed block's metadata
// field.
raftMetadata, err := readRaftMetadata(metadata, m)
raftMetadata, err := ReadRaftMetadata(metadata, m)
if err != nil {
return nil, errors.Wrapf(err, "failed to read Raft metadata")
}
@@ -159,7 +159,9 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return NewChain(support, opts, c.Communication, rpc, bp, nil)
}
func readRaftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.Metadata) (*etcdraft.RaftMetadata, error) {
// ReadRaftMetadata attempts to read raft metadata from block metadata, if available.
// Otherwise, it reads raft metadata from the supplied config metadata.
func ReadRaftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.Metadata) (*etcdraft.RaftMetadata, error) {
m := &etcdraft.RaftMetadata{
Consenters: map[uint64]*etcdraft.Consenter{},
NextConsenterId: 1,
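Exporting ReadRaftMetadata matters because the tests above now reconstruct a joining node's metadata from a committed config block rather than from an empty consenter map. A minimal usage sketch, with import paths as in the Fabric tree of this era and the input bytes standing in for what WriteConfigBlock recorded:

```go
package sketch

import (
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
	"github.com/hyperledger/fabric/protos/common"
	raftprotos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
)

// metadataFromConfigBlock recovers the channel's raft metadata from the
// metadata bytes committed alongside a config block, so a new node can be
// constructed with the up-to-date consenter set before it joins.
func metadataFromConfigBlock(raftmetabytes []byte) (*raftprotos.RaftMetadata, error) {
	meta := &common.Metadata{Value: raftmetabytes}
	// Passing nil config metadata prefers what the block carries, as the
	// new doc comment on ReadRaftMetadata describes.
	return etcdraft.ReadRaftMetadata(meta, nil)
}
```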