Commit 343957a8 authored by Artem Barger's avatar Artem Barger Committed by Gerrit Code Review
Browse files

Merge "FAB-14618 Store only nodeIDs in metadata" into release-1.4

parents 00df45f9 9a124d65
......@@ -112,9 +112,13 @@ type Options struct {
MaxSizePerMsg uint64
MaxInflightMsgs int
// BlockMetdata and Consenters should only be modified while under lock
// of raftMetadataLock
BlockMetadata *etcdraft.BlockMetadata
Metrics *Metrics
Cert []byte
Consenters map[uint64]*etcdraft.Consenter
Metrics *Metrics
Cert []byte
EvictionSuspicion time.Duration
LeaderCheckInterval time.Duration
......@@ -313,7 +317,7 @@ func (c *Chain) MigrationStatus() migration.Status {
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
// all nodes start out as followers
c.Metrics.IsLeader.Set(float64(0))
if err := c.configureComm(); err != nil {
......@@ -719,7 +723,7 @@ func (c *Chain) serveRequest() {
select {
case <-c.errorC:
default:
nodeCount := len(c.opts.BlockMetadata.Consenters)
nodeCount := len(c.opts.BlockMetadata.ConsenterIds)
// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
if nodeCount > 2 {
......@@ -945,6 +949,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
c.raftMetadataLock.Lock()
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
c.raftMetadataLock.Unlock()
if err := c.configureComm(); err != nil {
......@@ -978,7 +983,7 @@ func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
c.logger.Infof("Snapshot interval is updated to %d bytes (was %d)", c.sizeLimit, old)
}
changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, configMetadata.Consenters)
changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMetadata.Consenters)
if err != nil {
c.logger.Panicf("illegal configuration change detected: %s", err)
}
......@@ -1054,7 +1059,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
......@@ -1130,7 +1135,7 @@ func (c *Chain) configureComm() error {
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
var nodes []cluster.RemoteNode
for raftID, consenter := range c.opts.BlockMetadata.Consenters {
for raftID, consenter := range c.opts.Consenters {
// No need to know yourself
if raftID == c.raftID {
continue
......@@ -1175,7 +1180,7 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
}
c.raftMetadataLock.RLock()
_, err = ComputeMembershipChanges(c.opts.BlockMetadata, updatedMetadata.Consenters)
_, err = ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, updatedMetadata.Consenters)
c.raftMetadataLock.RUnlock()
return err
......@@ -1196,11 +1201,12 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
case common.HeaderType_CONFIG:
configMembership := c.detectConfChange(block)
c.opts.BlockMetadata.RaftIndex = index
c.raftMetadataLock.Lock()
if configMembership != nil {
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
}
c.opts.BlockMetadata.RaftIndex = index
c.raftMetadataLock.Unlock()
blockMetadataBytes := utils.MarshalOrPanic(c.opts.BlockMetadata)
......@@ -1284,7 +1290,7 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
// extracting current Raft configuration state
confState := c.Node.ApplyConfChange(raftpb.ConfChange{})
if len(confState.Nodes) == len(c.opts.BlockMetadata.Consenters) {
if len(confState.Nodes) == len(c.opts.BlockMetadata.ConsenterIds) {
// since configuration change could only add one node or
// remove one node at a time, if raft nodes state size
// equal to membership stored in block metadata field,
......
......@@ -92,6 +92,7 @@ var _ = Describe("Chain", func() {
var (
configurator *mocks.Configurator
consenterMetadata *raftprotos.ConfigMetadata
consenters map[uint64]*raftprotos.Consenter
clock *fakeclock.FakeClock
opts etcdraft.Options
support *consensusmocks.FakeConsenterSupport
......@@ -134,15 +135,20 @@ var _ = Describe("Chain", func() {
support.BlockReturns(getSeedBlock())
meta := &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
ConsenterIds: make([]uint64, len(consenterMetadata.Consenters)),
NextConsenterId: 1,
}
for _, c := range consenterMetadata.Consenters {
meta.Consenters[meta.NextConsenterId] = c
for i := range meta.ConsenterIds {
meta.ConsenterIds[i] = meta.NextConsenterId
meta.NextConsenterId++
}
consenters = map[uint64]*raftprotos.Consenter{}
for i, c := range consenterMetadata.Consenters {
consenters[meta.ConsenterIds[i]] = c
}
fakeFields = newFakeMetricsFields()
opts = etcdraft.Options{
......@@ -154,6 +160,7 @@ var _ = Describe("Chain", func() {
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
BlockMetadata: meta,
Consenters: consenters,
Logger: logger,
MemoryStorage: storage,
WALDir: walDir,
......@@ -715,17 +722,8 @@ var _ = Describe("Chain", func() {
)
BeforeEach(func() {
tlsCA, _ := tlsgen.NewCA()
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
},
ConsenterIds: []uint64{1},
NextConsenterId: 2,
}
})
......@@ -762,7 +760,7 @@ var _ = Describe("Chain", func() {
})
It("replays blocks from committed entries", func() {
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
c.Start()
defer c.Halt()
......@@ -792,7 +790,7 @@ var _ = Describe("Chain", func() {
It("only replays blocks after Applied index", func() {
raftMetadata.RaftIndex = m1.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.init()
......@@ -818,7 +816,7 @@ var _ = Describe("Chain", func() {
It("does not replay any block if already in sync", func() {
raftMetadata.RaftIndex = m2.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
c.Start()
defer c.Halt()
......@@ -950,7 +948,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
signal := make(chan struct{})
......@@ -1013,7 +1011,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.opts.SnapInterval = 1
c.init()
......@@ -1042,7 +1040,7 @@ var _ = Describe("Chain", func() {
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
raftMetadata.RaftIndex = m.RaftIndex
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
cx.init()
cx.Start()
......@@ -1106,7 +1104,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
cnt := support.WriteBlockCallCount()
for i := 0; i < cnt; i++ {
c.support.WriteBlock(support.WriteBlockArgsForCall(i))
......@@ -1164,7 +1162,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
// replay block 1&2
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.support.WriteBlock(support.WriteBlockArgsForCall(1))
......@@ -1299,6 +1297,7 @@ var _ = Describe("Chain", func() {
dataDir string
c1, c2, c3 *chain
raftMetadata *raftprotos.BlockMetadata
consenters map[uint64]*raftprotos.Consenter
)
BeforeEach(func() {
......@@ -1311,30 +1310,32 @@ var _ = Describe("Chain", func() {
Expect(err).NotTo(HaveOccurred())
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
3: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
},
ConsenterIds: []uint64{1, 2, 3},
NextConsenterId: 4,
}
network = createNetwork(timeout, channelID, dataDir, raftMetadata)
consenters = map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
3: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
}
network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters)
c1 = network.chains[1]
c2 = network.chains[2]
c3 = network.chains[3]
......@@ -1429,7 +1430,7 @@ var _ = Describe("Chain", func() {
var (
addConsenterConfigValue = func() map[string]*common.ConfigValue {
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range raftMetadata.Consenters {
for _, consenter := range consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1451,11 +1452,11 @@ var _ = Describe("Chain", func() {
}
}
removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue {
newRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
delete(newRaftMetadata.Consenters, id)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range newRaftMetadata.Consenters {
for nodeID, consenter := range consenters {
if nodeID == id {
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1499,13 +1500,12 @@ var _ = Describe("Chain", func() {
Context("reconfiguration", func() {
It("cannot change consenter set by more than 1 node", func() {
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
// remove second & third consenter
delete(updatedRaftMetadata.Consenters, 2)
delete(updatedRaftMetadata.Consenters, 3)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range updatedRaftMetadata.Consenters {
for id, consenter := range consenters {
if id == 2 || id == 3 {
// remove second & third consenter
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1528,12 +1528,12 @@ var _ = Describe("Chain", func() {
})
It("can rotate certificate by adding and removing 1 node in one config update", func() {
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
// remove second consenter
delete(updatedRaftMetadata.Consenters, 2)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range updatedRaftMetadata.Consenters {
for id, consenter := range consenters {
if id == 2 {
// remove second consenter
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1612,7 +1612,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1729,7 +1729,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1820,7 +1820,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1897,7 +1897,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -2073,7 +2073,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -2957,7 +2957,7 @@ type chain struct {
*etcdraft.Chain
}
func newChain(timeout time.Duration, channel string, dataDir string, id uint64, raftMetadata *raftprotos.BlockMetadata) *chain {
func newChain(timeout time.Duration, channel string, dataDir string, id uint64, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter) *chain {
rpc := &mocks.FakeRPC{}
clock := fakeclock.NewFakeClock(time.Now())
storage := raft.NewMemoryStorage()
......@@ -2973,6 +2973,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
BlockMetadata: raftMetadata,
Consenters: consenters,
Logger: flogging.NewFabricLogger(zap.NewExample()),
MemoryStorage: storage,
WALDir: path.Join(dataDir, "wal"),
......@@ -3266,19 +3267,19 @@ func (n *network) addChain(c *chain) {
n.chains[c.id] = c
}
func createNetwork(timeout time.Duration, channel string, dataDir string, raftMetadata *raftprotos.BlockMetadata) *network {
func createNetwork(timeout time.Duration, channel string, dataDir string, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter) *network {
n := &network{
chains: make(map[uint64]*chain),
connectivity: make(map[uint64]bool),
links: make(map[uint64]map[uint64]bool),
}
for nodeID := range raftMetadata.Consenters {
for _, nodeID := range raftMetadata.ConsenterIds {
dir, err := ioutil.TempDir(dataDir, fmt.Sprintf("node-%d-", nodeID))
Expect(err).NotTo(HaveOccurred())
m := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
n.addChain(newChain(timeout, channel, dir, nodeID, m))
n.addChain(newChain(timeout, channel, dir, nodeID, m, consenters))
}
return n
......
......@@ -156,7 +156,12 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return nil, errors.Wrapf(err, "failed to read Raft metadata")
}
id, err := c.detectSelfID(blockMetadata.Consenters)
consenters := map[uint64]*etcdraft.Consenter{}
for i, consenter := range m.Consenters {
consenters[blockMetadata.ConsenterIds[i]] = consenter
}
id, err := c.detectSelfID(consenters)
if err != nil {
c.InactiveChainRegistry.TrackChain(support.ChainID(), support.Block(0), func() {
c.CreateChain(support.ChainID())
......@@ -194,6 +199,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
SnapInterval: m.Options.SnapshotInterval,
BlockMetadata: blockMetadata,
Consenters: consenters,
WALDir: path.Join(c.EtcdRaftConfig.WALDir, support.ChainID()),
SnapDir: path.Join(c.EtcdRaftConfig.SnapDir, support.ChainID()),
......@@ -222,20 +228,21 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
// ReadBlockMetadata attempts to read raft metadata from block metadata, if available.
// otherwise, it reads raft metadata from config metadata supplied.
func ReadBlockMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.ConfigMetadata) (*etcdraft.BlockMetadata, error) {
m := &etcdraft.BlockMetadata{
Consenters: map[uint64]*etcdraft.Consenter{},
NextConsenterId: 1,
}
if blockMetadata != nil && len(blockMetadata.Value) != 0 { // we have consenters mapping from block
m := &etcdraft.BlockMetadata{}
if err := proto.Unmarshal(blockMetadata.Value, m); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal block's metadata")
}
return m, nil
}
m := &etcdraft.BlockMetadata{
NextConsenterId: 1,
ConsenterIds: make([]uint64, len(configMetadata.Consenters)),
}
// need to read consenters from the configuration
for _, consenter := range configMetadata.Consenters {
m.Consenters[m.NextConsenterId] = consenter
for i := range m.ConsenterIds {
m.ConsenterIds[i] = m.NextConsenterId
m.NextConsenterId++
}
......
......@@ -47,7 +47,7 @@ type node struct {
}
func (n *node) start(fresh, join, migration bool) {
raftPeers := RaftPeers(n.metadata.Consenters)
raftPeers := RaftPeers(n.metadata.ConsenterIds)
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))
var campaign bool
......
......@@ -34,6 +34,7 @@ import (
// changes introduced during configuration update
type MembershipChanges struct {
NewBlockMetadata *etcdraft.BlockMetadata
NewConsenters map[uint64]*etcdraft.Consenter
AddedNodes []*etcdraft.Consenter
RemovedNodes []*etcdraft.Consenter
ConfChange *raftpb.ConfChange
......@@ -135,10 +136,10 @@ func newBlockPuller(support consensus.ConsenterSupport,
}
// RaftPeers maps consenters to slice of raft.Peer
func RaftPeers(consenters map[uint64]*etcdraft.Consenter) []raft.Peer {
func RaftPeers(consenterIDs []uint64) []raft.Peer {
var peers []raft.Peer
for raftID := range consenters {
for _, raftID := range consenterIDs {
peers = append(peers, raft.Peer{ID: raftID})
}
return peers
......@@ -155,38 +156,41 @@ func ConsentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
// MembershipByCert convert consenters map into set encapsulated by map
// where key is client TLS certificate
func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]struct{} {
set := map[string]struct{}{}
for _, c := range consenters {
set[string(c.ClientTlsCert)] = struct{}{}
func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]uint64 {
set := map[string]uint64{}
for nodeID, c := range consenters {
set[string(c.ClientTlsCert)] = nodeID
}
return set
}
// ComputeMembershipChanges computes membership update based on information about new conseters, returns
// two slices: a slice of added consenters and a slice of consenters to be removed
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters []*etcdraft.Consenter) (*MembershipChanges, error) {
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, oldConsenters map[uint64]*etcdraft.Consenter, newConsenters []*etcdraft.Consenter) (mc *MembershipChanges, err error) {
result := &MembershipChanges{
NewConsenters: map[uint64]*etcdraft.Consenter{},
NewBlockMetadata: proto.Clone(oldMetadata).(*etcdraft.BlockMetadata),
AddedNodes: []*etcdraft.Consenter{},
RemovedNodes: []*etcdraft.Consenter{},
}
if result.NewBlockMetadata.Consenters == nil {
// proto.Clone copies empty maps as nil
result.NewBlockMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
result.NewBlockMetadata.ConsenterIds = make([]uint64, len(newConsenters))
currentConsentersSet := MembershipByCert(oldMetadata.Consenters)
for _, c := range newConsenters {
if _, exists := currentConsentersSet[string(c.ClientTlsCert)]; !exists {
result.AddedNodes = append(result.AddedNodes, c)
var addedNodeIndex int
currentConsentersSet := MembershipByCert(oldConsenters)
for i, c := range newConsenters {
if nodeID, exists := currentConsentersSet[string(c.ClientTlsCert)]; exists {
result.NewBlockMetadata.ConsenterIds[i] = nodeID
result.NewConsenters[nodeID] = c
continue
}
addedNodeIndex = i
result.AddedNodes = append(result.AddedNodes, c)
}
var deletedNodeID uint64
newConsentersSet := ConsentersToMap(newConsenters)
for nodeID, c := range oldMetadata.Consenters {
for nodeID, c := range oldConsenters {
if _, exists := newConsentersSet[string(c.ClientTlsCert)]; !exists {
result.RemovedNodes = append(result.RemovedNodes, c)
deletedNodeID = nodeID
......@@ -197,11 +201,13 @@ func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 1:
// cert rotation
result.RotatedNode = deletedNodeID
result.NewBlockMetadata.Consenters[deletedNodeID] = result.AddedNodes[0]
result.NewBlockMetadata.ConsenterIds[addedNodeIndex] = deletedNodeID
result.NewConsenters[deletedNodeID] = result.AddedNodes[0]
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 0:
// new node
nodeID := result.NewBlockMetadata.NextConsenterId
result.NewBlockMetadata.Consenters[nodeID] = result.AddedNodes[0]
result.NewConsenters[nodeID] = result.AddedNodes[0]
result.NewBlockMetadata.ConsenterIds[addedNodeIndex] = nodeID
result.NewBlockMetadata.NextConsenterId++
result.ConfChange = &raftpb.ConfChange{
NodeID: nodeID,
......@@ -214,7 +220,7 @@ func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
delete(result.NewBlockMetadata.Consenters, nodeID)
delete(result.NewConsenters, nodeID)
case len(result.AddedNodes) == 0 && len(result.RemovedNodes) == 0:
// no change
default:
......@@ -418,14 +424,14 @@ func NodeExists(id uint64, nodes []uint64) bool {
// ConfChange computes Raft configuration changes based on current Raft configuration state and
// consenters mapping stored in RaftMetadata
func ConfChange(raftMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange {
func ConfChange(blockMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange {