Commit 9a124d65 authored by Jason Yellick's avatar Jason Yellick Committed by Jay Guo
Browse files

FAB-14618 Store only nodeIDs in metadata



Presently, the block metadata encodes the TLS certificates of all of the
Raft consenters in the system for each block.  Because these TLS certs
are non-trivial in size, and there may be a large set of consenters,
this actually creates a significant amount of waste on the filesystem.

As a small optimization, this CR modifies the block metadata to only
store the nodeIDs instead of the full set of consenter info.  It then
correlates the consenter slice found in the channel config data with
this slice of nodeIDs to build a mapping between the two (which was
previously persisted).

Change-Id: Iaa66dacbcc48a041318c8a718099a873b9626240
Signed-off-by: default avatarJason Yellick <jyellick@us.ibm.com>
parent 75e19aa0
......@@ -112,9 +112,13 @@ type Options struct {
MaxSizePerMsg uint64
MaxInflightMsgs int
// BlockMetdata and Consenters should only be modified while under lock
// of raftMetadataLock
BlockMetadata *etcdraft.BlockMetadata
Metrics *Metrics
Cert []byte
Consenters map[uint64]*etcdraft.Consenter
Metrics *Metrics
Cert []byte
EvictionSuspicion time.Duration
LeaderCheckInterval time.Duration
......@@ -313,7 +317,7 @@ func (c *Chain) MigrationStatus() migration.Status {
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
// all nodes start out as followers
c.Metrics.IsLeader.Set(float64(0))
if err := c.configureComm(); err != nil {
......@@ -719,7 +723,7 @@ func (c *Chain) serveRequest() {
select {
case <-c.errorC:
default:
nodeCount := len(c.opts.BlockMetadata.Consenters)
nodeCount := len(c.opts.BlockMetadata.ConsenterIds)
// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
if nodeCount > 2 {
......@@ -945,6 +949,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
c.raftMetadataLock.Lock()
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
c.raftMetadataLock.Unlock()
if err := c.configureComm(); err != nil {
......@@ -978,7 +983,7 @@ func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
c.logger.Infof("Snapshot interval is updated to %d bytes (was %d)", c.sizeLimit, old)
}
changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, configMetadata.Consenters)
changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMetadata.Consenters)
if err != nil {
c.logger.Panicf("illegal configuration change detected: %s", err)
}
......@@ -1054,7 +1059,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
......@@ -1130,7 +1135,7 @@ func (c *Chain) configureComm() error {
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
var nodes []cluster.RemoteNode
for raftID, consenter := range c.opts.BlockMetadata.Consenters {
for raftID, consenter := range c.opts.Consenters {
// No need to know yourself
if raftID == c.raftID {
continue
......@@ -1175,7 +1180,7 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
}
c.raftMetadataLock.RLock()
_, err = ComputeMembershipChanges(c.opts.BlockMetadata, updatedMetadata.Consenters)
_, err = ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, updatedMetadata.Consenters)
c.raftMetadataLock.RUnlock()
return err
......@@ -1196,11 +1201,12 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
case common.HeaderType_CONFIG:
configMembership := c.detectConfChange(block)
c.opts.BlockMetadata.RaftIndex = index
c.raftMetadataLock.Lock()
if configMembership != nil {
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.opts.Consenters = configMembership.NewConsenters
}
c.opts.BlockMetadata.RaftIndex = index
c.raftMetadataLock.Unlock()
blockMetadataBytes := utils.MarshalOrPanic(c.opts.BlockMetadata)
......@@ -1284,7 +1290,7 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
// extracting current Raft configuration state
confState := c.Node.ApplyConfChange(raftpb.ConfChange{})
if len(confState.Nodes) == len(c.opts.BlockMetadata.Consenters) {
if len(confState.Nodes) == len(c.opts.BlockMetadata.ConsenterIds) {
// since configuration change could only add one node or
// remove one node at a time, if raft nodes state size
// equal to membership stored in block metadata field,
......
......@@ -92,6 +92,7 @@ var _ = Describe("Chain", func() {
var (
configurator *mocks.Configurator
consenterMetadata *raftprotos.ConfigMetadata
consenters map[uint64]*raftprotos.Consenter
clock *fakeclock.FakeClock
opts etcdraft.Options
support *consensusmocks.FakeConsenterSupport
......@@ -134,15 +135,20 @@ var _ = Describe("Chain", func() {
support.BlockReturns(getSeedBlock())
meta := &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
ConsenterIds: make([]uint64, len(consenterMetadata.Consenters)),
NextConsenterId: 1,
}
for _, c := range consenterMetadata.Consenters {
meta.Consenters[meta.NextConsenterId] = c
for i := range meta.ConsenterIds {
meta.ConsenterIds[i] = meta.NextConsenterId
meta.NextConsenterId++
}
consenters = map[uint64]*raftprotos.Consenter{}
for i, c := range consenterMetadata.Consenters {
consenters[meta.ConsenterIds[i]] = c
}
fakeFields = newFakeMetricsFields()
opts = etcdraft.Options{
......@@ -154,6 +160,7 @@ var _ = Describe("Chain", func() {
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
BlockMetadata: meta,
Consenters: consenters,
Logger: logger,
MemoryStorage: storage,
WALDir: walDir,
......@@ -715,17 +722,8 @@ var _ = Describe("Chain", func() {
)
BeforeEach(func() {
tlsCA, _ := tlsgen.NewCA()
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
},
ConsenterIds: []uint64{1},
NextConsenterId: 2,
}
})
......@@ -762,7 +760,7 @@ var _ = Describe("Chain", func() {
})
It("replays blocks from committed entries", func() {
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
c.Start()
defer c.Halt()
......@@ -792,7 +790,7 @@ var _ = Describe("Chain", func() {
It("only replays blocks after Applied index", func() {
raftMetadata.RaftIndex = m1.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.init()
......@@ -818,7 +816,7 @@ var _ = Describe("Chain", func() {
It("does not replay any block if already in sync", func() {
raftMetadata.RaftIndex = m2.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
c.Start()
defer c.Halt()
......@@ -950,7 +948,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.init()
signal := make(chan struct{})
......@@ -1013,7 +1011,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
c.opts.SnapInterval = 1
c.init()
......@@ -1042,7 +1040,7 @@ var _ = Describe("Chain", func() {
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
raftMetadata.RaftIndex = m.RaftIndex
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
cx.init()
cx.Start()
......@@ -1106,7 +1104,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
cnt := support.WriteBlockCallCount()
for i := 0; i < cnt; i++ {
c.support.WriteBlock(support.WriteBlockArgsForCall(i))
......@@ -1164,7 +1162,7 @@ var _ = Describe("Chain", func() {
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
// replay block 1&2
c.support.WriteBlock(support.WriteBlockArgsForCall(0))
c.support.WriteBlock(support.WriteBlockArgsForCall(1))
......@@ -1299,6 +1297,7 @@ var _ = Describe("Chain", func() {
dataDir string
c1, c2, c3 *chain
raftMetadata *raftprotos.BlockMetadata
consenters map[uint64]*raftprotos.Consenter
)
BeforeEach(func() {
......@@ -1311,30 +1310,32 @@ var _ = Describe("Chain", func() {
Expect(err).NotTo(HaveOccurred())
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
3: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
},
ConsenterIds: []uint64{1, 2, 3},
NextConsenterId: 4,
}
network = createNetwork(timeout, channelID, dataDir, raftMetadata)
consenters = map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
2: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
3: {
Host: "localhost",
Port: 7051,
ClientTlsCert: clientTLSCert(tlsCA),
ServerTlsCert: serverTLSCert(tlsCA),
},
}
network = createNetwork(timeout, channelID, dataDir, raftMetadata, consenters)
c1 = network.chains[1]
c2 = network.chains[2]
c3 = network.chains[3]
......@@ -1429,7 +1430,7 @@ var _ = Describe("Chain", func() {
var (
addConsenterConfigValue = func() map[string]*common.ConfigValue {
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range raftMetadata.Consenters {
for _, consenter := range consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1451,11 +1452,11 @@ var _ = Describe("Chain", func() {
}
}
removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue {
newRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
delete(newRaftMetadata.Consenters, id)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range newRaftMetadata.Consenters {
for nodeID, consenter := range consenters {
if nodeID == id {
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1499,13 +1500,12 @@ var _ = Describe("Chain", func() {
Context("reconfiguration", func() {
It("cannot change consenter set by more than 1 node", func() {
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
// remove second & third consenter
delete(updatedRaftMetadata.Consenters, 2)
delete(updatedRaftMetadata.Consenters, 3)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range updatedRaftMetadata.Consenters {
for id, consenter := range consenters {
if id == 2 || id == 3 {
// remove second & third consenter
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1528,12 +1528,12 @@ var _ = Describe("Chain", func() {
})
It("can rotate certificate by adding and removing 1 node in one config update", func() {
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
// remove second consenter
delete(updatedRaftMetadata.Consenters, 2)
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range updatedRaftMetadata.Consenters {
for id, consenter := range consenters {
if id == 2 {
// remove second consenter
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1612,7 +1612,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1729,7 +1729,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1820,7 +1820,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -1897,7 +1897,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -2073,7 +2073,7 @@ var _ = Describe("Chain", func() {
raftmeta, err := etcdraft.ReadBlockMetadata(meta, nil)
Expect(err).NotTo(HaveOccurred())
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta)
c4 := newChain(timeout, channelID, dataDir, 4, raftmeta, consenters)
// if we join a node to existing network, it MUST already obtained blocks
// till the config block that adds this node to cluster.
c4.support.WriteBlock(c1.support.WriteBlockArgsForCall(0))
......@@ -2957,7 +2957,7 @@ type chain struct {
*etcdraft.Chain
}
func newChain(timeout time.Duration, channel string, dataDir string, id uint64, raftMetadata *raftprotos.BlockMetadata) *chain {
func newChain(timeout time.Duration, channel string, dataDir string, id uint64, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter) *chain {
rpc := &mocks.FakeRPC{}
clock := fakeclock.NewFakeClock(time.Now())
storage := raft.NewMemoryStorage()
......@@ -2973,6 +2973,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
BlockMetadata: raftMetadata,
Consenters: consenters,
Logger: flogging.NewFabricLogger(zap.NewExample()),
MemoryStorage: storage,
WALDir: path.Join(dataDir, "wal"),
......@@ -3266,19 +3267,19 @@ func (n *network) addChain(c *chain) {
n.chains[c.id] = c
}
func createNetwork(timeout time.Duration, channel string, dataDir string, raftMetadata *raftprotos.BlockMetadata) *network {
func createNetwork(timeout time.Duration, channel string, dataDir string, raftMetadata *raftprotos.BlockMetadata, consenters map[uint64]*raftprotos.Consenter) *network {
n := &network{
chains: make(map[uint64]*chain),
connectivity: make(map[uint64]bool),
links: make(map[uint64]map[uint64]bool),
}
for nodeID := range raftMetadata.Consenters {
for _, nodeID := range raftMetadata.ConsenterIds {
dir, err := ioutil.TempDir(dataDir, fmt.Sprintf("node-%d-", nodeID))
Expect(err).NotTo(HaveOccurred())
m := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
n.addChain(newChain(timeout, channel, dir, nodeID, m))
n.addChain(newChain(timeout, channel, dir, nodeID, m, consenters))
}
return n
......
......@@ -156,7 +156,12 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return nil, errors.Wrapf(err, "failed to read Raft metadata")
}
id, err := c.detectSelfID(blockMetadata.Consenters)
consenters := map[uint64]*etcdraft.Consenter{}
for i, consenter := range m.Consenters {
consenters[blockMetadata.ConsenterIds[i]] = consenter
}
id, err := c.detectSelfID(consenters)
if err != nil {
c.InactiveChainRegistry.TrackChain(support.ChainID(), support.Block(0), func() {
c.CreateChain(support.ChainID())
......@@ -194,6 +199,7 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
SnapInterval: m.Options.SnapshotInterval,
BlockMetadata: blockMetadata,
Consenters: consenters,
WALDir: path.Join(c.EtcdRaftConfig.WALDir, support.ChainID()),
SnapDir: path.Join(c.EtcdRaftConfig.SnapDir, support.ChainID()),
......@@ -222,20 +228,21 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
// ReadBlockMetadata attempts to read raft metadata from block metadata, if available.
// otherwise, it reads raft metadata from config metadata supplied.
func ReadBlockMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.ConfigMetadata) (*etcdraft.BlockMetadata, error) {
m := &etcdraft.BlockMetadata{
Consenters: map[uint64]*etcdraft.Consenter{},
NextConsenterId: 1,
}
if blockMetadata != nil && len(blockMetadata.Value) != 0 { // we have consenters mapping from block
m := &etcdraft.BlockMetadata{}
if err := proto.Unmarshal(blockMetadata.Value, m); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal block's metadata")
}
return m, nil
}
m := &etcdraft.BlockMetadata{
NextConsenterId: 1,
ConsenterIds: make([]uint64, len(configMetadata.Consenters)),
}
// need to read consenters from the configuration
for _, consenter := range configMetadata.Consenters {
m.Consenters[m.NextConsenterId] = consenter
for i := range m.ConsenterIds {
m.ConsenterIds[i] = m.NextConsenterId
m.NextConsenterId++
}
......
......@@ -47,7 +47,7 @@ type node struct {
}
func (n *node) start(fresh, join, migration bool) {
raftPeers := RaftPeers(n.metadata.Consenters)
raftPeers := RaftPeers(n.metadata.ConsenterIds)
n.logger.Debugf("Starting raft node: #peers: %v", len(raftPeers))
var campaign bool
......
......@@ -34,6 +34,7 @@ import (
// changes introduced during configuration update
type MembershipChanges struct {
NewBlockMetadata *etcdraft.BlockMetadata
NewConsenters map[uint64]*etcdraft.Consenter
AddedNodes []*etcdraft.Consenter
RemovedNodes []*etcdraft.Consenter
ConfChange *raftpb.ConfChange
......@@ -135,10 +136,10 @@ func newBlockPuller(support consensus.ConsenterSupport,
}
// RaftPeers maps consenters to slice of raft.Peer
func RaftPeers(consenters map[uint64]*etcdraft.Consenter) []raft.Peer {
func RaftPeers(consenterIDs []uint64) []raft.Peer {
var peers []raft.Peer
for raftID := range consenters {
for _, raftID := range consenterIDs {
peers = append(peers, raft.Peer{ID: raftID})
}
return peers
......@@ -155,38 +156,41 @@ func ConsentersToMap(consenters []*etcdraft.Consenter) map[string]struct{} {
// MembershipByCert convert consenters map into set encapsulated by map
// where key is client TLS certificate
func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]struct{} {
set := map[string]struct{}{}
for _, c := range consenters {
set[string(c.ClientTlsCert)] = struct{}{}
func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]uint64 {
set := map[string]uint64{}
for nodeID, c := range consenters {
set[string(c.ClientTlsCert)] = nodeID
}
return set
}
// ComputeMembershipChanges computes membership update based on information about new conseters, returns
// two slices: a slice of added consenters and a slice of consenters to be removed
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters []*etcdraft.Consenter) (*MembershipChanges, error) {
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, oldConsenters map[uint64]*etcdraft.Consenter, newConsenters []*etcdraft.Consenter) (mc *MembershipChanges, err error) {
result := &MembershipChanges{
NewConsenters: map[uint64]*etcdraft.Consenter{},
NewBlockMetadata: proto.Clone(oldMetadata).(*etcdraft.BlockMetadata),
AddedNodes: []*etcdraft.Consenter{},
RemovedNodes: []*etcdraft.Consenter{},
}
if result.NewBlockMetadata.Consenters == nil {
// proto.Clone copies empty maps as nil
result.NewBlockMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
result.NewBlockMetadata.ConsenterIds = make([]uint64, len(newConsenters))
currentConsentersSet := MembershipByCert(oldMetadata.Consenters)
for _, c := range newConsenters {
if _, exists := currentConsentersSet[string(c.ClientTlsCert)]; !exists {
result.AddedNodes = append(result.AddedNodes, c)
var addedNodeIndex int
currentConsentersSet := MembershipByCert(oldConsenters)
for i, c := range newConsenters {
if nodeID, exists := currentConsentersSet[string(c.ClientTlsCert)]; exists {
result.NewBlockMetadata.ConsenterIds[i] = nodeID
result.NewConsenters[nodeID] = c
continue
}
addedNodeIndex = i
result.AddedNodes = append(result.AddedNodes, c)
}
var deletedNodeID uint64
newConsentersSet := ConsentersToMap(newConsenters)
for nodeID, c := range oldMetadata.Consenters {
for nodeID, c := range oldConsenters {
if _, exists := newConsentersSet[string(c.ClientTlsCert)]; !exists {
result.RemovedNodes = append(result.RemovedNodes, c)
deletedNodeID = nodeID
......@@ -197,11 +201,13 @@ func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 1:
// cert rotation
result.RotatedNode = deletedNodeID
result.NewBlockMetadata.Consenters[deletedNodeID] = result.AddedNodes[0]
result.NewBlockMetadata.ConsenterIds[addedNodeIndex] = deletedNodeID
result.NewConsenters[deletedNodeID] = result.AddedNodes[0]
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 0:
// new node
nodeID := result.NewBlockMetadata.NextConsenterId
result.NewBlockMetadata.Consenters[nodeID] = result.AddedNodes[0]
result.NewConsenters[nodeID] = result.AddedNodes[0]
result.NewBlockMetadata.ConsenterIds[addedNodeIndex] = nodeID
result.NewBlockMetadata.NextConsenterId++
result.ConfChange = &raftpb.ConfChange{
NodeID: nodeID,
......@@ -214,7 +220,7 @@ func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
delete(result.NewBlockMetadata.Consenters, nodeID)
delete(result.NewConsenters, nodeID)
case len(result.AddedNodes) == 0 && len(result.RemovedNodes) == 0:
// no change
default:
......@@ -418,14 +424,14 @@ func NodeExists(id uint64, nodes []uint64) bool {
// ConfChange computes Raft configuration changes based on current Raft configuration state and
// consenters mapping stored in RaftMetadata
func ConfChange(raftMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange {
func ConfChange(blockMetadata *etcdraft.BlockMetadata, confState *raftp