Commit 75e19aa0 authored by Artem Barger's avatar Artem Barger Committed by Gerrit Code Review
Browse files

Merge "FAB-14620 Refactor detectConfChange" into release-1.4

parents 433db56d a2323d98
......@@ -938,13 +938,13 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
confChange, blockMetadata, rotate := c.detectConfChange(block)
configMembership := c.detectConfChange(block)
if confChange != nil || rotate != 0 {
if configMembership != nil && configMembership.Changed() {
c.logger.Infof("Config block %d changes consenter set, communication should be reconfigured", block.Header.Number)
c.raftMetadataLock.Lock()
c.opts.BlockMetadata = blockMetadata
c.opts.BlockMetadata = configMembership.NewBlockMetadata
c.raftMetadataLock.Unlock()
if err := c.configureComm(); err != nil {
......@@ -963,28 +963,31 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
func (c *Chain) detectConfChange(block *common.Block) (*raftpb.ConfChange, *etcdraft.BlockMetadata, uint64) {
func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
// If config is targeting THIS channel, inspect consenter set and
// propose raft ConfChange if it adds/removes node.
configMetadata, blockMetadata := c.newMetadata(block)
configMetadata := c.newConfigMetadata(block)
if configMetadata != nil && configMetadata.Options != nil && configMetadata.Options.SnapshotInterval != 0 {
if configMetadata == nil {
return nil
}
if configMetadata.Options != nil && configMetadata.Options.SnapshotInterval != 0 {
old := c.sizeLimit
c.sizeLimit = configMetadata.Options.SnapshotInterval
c.logger.Infof("Snapshot interval is updated to %d bytes (was %d)", c.sizeLimit, old)
}
var changes *MembershipChanges
if configMetadata != nil {
changes = ComputeMembershipChanges(blockMetadata.Consenters, configMetadata.Consenters)
changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, configMetadata.Consenters)
if err != nil {
c.logger.Panicf("illegal configuration change detected: %s", err)
}
confChange, rotate := changes.UpdateRaftMetadataAndConfChange(blockMetadata)
if rotate != 0 {
c.logger.Infof("Config block %d rotates TLS certificate of node %d", block.Header.Number, rotate)
if changes.Rotated() {
c.logger.Infof("Config block %d rotates TLS certificate of node %d", block.Header.Number, changes.RotatedNode)
}
return confChange, blockMetadata, rotate
return changes
}
func (c *Chain) apply(ents []raftpb.Entry) {
......@@ -1172,19 +1175,10 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
}
c.raftMetadataLock.RLock()
changes := ComputeMembershipChanges(c.opts.BlockMetadata.Consenters, updatedMetadata.Consenters)
_, err = ComputeMembershipChanges(c.opts.BlockMetadata, updatedMetadata.Consenters)
c.raftMetadataLock.RUnlock()
// Adding and removing 1 node is considered as certificate rotation, which is allowed.
if len(changes.AddedNodes) == 1 && len(changes.RemovedNodes) == 1 {
return nil
}
if changes.TotalChanges > 1 {
return errors.Errorf("update of more than one consenter at a time is not supported, requested changes: %s", changes)
}
return nil
return err
}
// writeConfigBlock writes configuration blocks into the ledger in
......@@ -1200,43 +1194,49 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
switch common.HeaderType(hdr.Type) {
case common.HeaderType_CONFIG:
confChange, blockMetadata, rotate := c.detectConfChange(block)
blockMetadata.RaftIndex = index
configMembership := c.detectConfChange(block)
blockMetadataBytes := utils.MarshalOrPanic(blockMetadata)
c.raftMetadataLock.Lock()
if configMembership != nil {
c.opts.BlockMetadata = configMembership.NewBlockMetadata
}
c.opts.BlockMetadata.RaftIndex = index
c.raftMetadataLock.Unlock()
blockMetadataBytes := utils.MarshalOrPanic(c.opts.BlockMetadata)
// write block with metadata
c.support.WriteConfigBlock(block, blockMetadataBytes)
c.raftMetadataLock.Lock()
c.opts.BlockMetadata = blockMetadata
c.raftMetadataLock.Unlock()
if configMembership == nil {
return
}
// update membership
if confChange != nil {
if configMembership.ConfChange != nil {
// We need to propose conf change in a go routine, because it may be blocked if raft node
// becomes leaderless, and we should not block `serveRequest` so it can keep consuming applyC,
// otherwise we have a deadlock.
go func() {
// ProposeConfChange returns error only if node being stopped.
// This proposal is dropped by followers because DisableProposalForwarding is enabled.
if err := c.Node.ProposeConfChange(context.TODO(), *confChange); err != nil {
if err := c.Node.ProposeConfChange(context.TODO(), *configMembership.ConfChange); err != nil {
c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
}
}()
c.confChangeInProgress = confChange
c.confChangeInProgress = configMembership.ConfChange
switch confChange.Type {
switch configMembership.ConfChange.Type {
case raftpb.ConfChangeAddNode:
c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", confChange.NodeID)
c.logger.Infof("Config block just committed adds node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
case raftpb.ConfChangeRemoveNode:
c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", confChange.NodeID)
c.logger.Infof("Config block just committed removes node %d, pause accepting transactions till config change is applied", configMembership.ConfChange.NodeID)
default:
c.logger.Panic("Programming error, encountered unsupported raft config change")
}
c.configInflight = true
} else if rotate != 0 {
} else if configMembership.Rotated() {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
......@@ -1296,19 +1296,13 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
return ConfChange(c.opts.BlockMetadata, confState)
}
// newMetadata extract raft metadata from the configuration block
func (c *Chain) newMetadata(block *common.Block) (*etcdraft.ConfigMetadata, *etcdraft.BlockMetadata) {
// newMetadata extract config metadata from the configuration block
func (c *Chain) newConfigMetadata(block *common.Block) *etcdraft.ConfigMetadata {
metadata, err := ConsensusMetadataFromConfigBlock(block)
if err != nil {
c.logger.Panicf("error reading consensus metadata: %s", err)
}
raftMetadata := proto.Clone(c.opts.BlockMetadata).(*etcdraft.BlockMetadata)
// proto.Clone doesn't copy an empty map, hence need to initialize it after
// cloning
if raftMetadata.Consenters == nil {
raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
return metadata, raftMetadata
return metadata
}
func (c *Chain) suspectEviction() bool {
......
......@@ -33,9 +33,11 @@ import (
// MembershipChanges keeps information about membership
// changes introduced during configuration update
type MembershipChanges struct {
AddedNodes []*etcdraft.Consenter
RemovedNodes []*etcdraft.Consenter
TotalChanges uint32
NewBlockMetadata *etcdraft.BlockMetadata
AddedNodes []*etcdraft.Consenter
RemovedNodes []*etcdraft.Consenter
ConfChange *raftpb.ConfChange
RotatedNode uint64
}
// Stringer implements fmt.Stringer interface
......@@ -43,54 +45,14 @@ func (mc *MembershipChanges) String() string {
return fmt.Sprintf("add %d node(s), remove %d node(s)", len(mc.AddedNodes), len(mc.RemovedNodes))
}
// UpdateRaftMetadataAndConfChange given the membership changes and RaftMetadata method calculates
// updates to be applied to the raft cluster configuration in addition updates mapping between
// consenter and its id within metadata. Adding and removing a node at the same time is considered
// to be certificate rotation, and the ID of rotated node is returned.
func (mc *MembershipChanges) UpdateRaftMetadataAndConfChange(raftMetadata *etcdraft.BlockMetadata) (cc *raftpb.ConfChange, rotate uint64) {
if mc == nil || mc.TotalChanges == 0 {
return nil, 0
}
var confChange *raftpb.ConfChange
if len(mc.AddedNodes) == 1 && len(mc.RemovedNodes) == 1 {
for id, node := range raftMetadata.Consenters {
if bytes.Equal(mc.RemovedNodes[0].ClientTlsCert, node.ClientTlsCert) {
raftMetadata.Consenters[id] = mc.AddedNodes[0]
return nil, id
}
}
}
// producing corresponding raft configuration changes
if len(mc.AddedNodes) > 0 {
nodeID := raftMetadata.NextConsenterId
raftMetadata.Consenters[nodeID] = mc.AddedNodes[0]
raftMetadata.NextConsenterId++
confChange = &raftpb.ConfChange{
NodeID: nodeID,
Type: raftpb.ConfChangeAddNode,
}
return confChange, 0
}
if len(mc.RemovedNodes) > 0 {
for _, c := range mc.RemovedNodes {
for nodeID, node := range raftMetadata.Consenters {
if bytes.Equal(c.ClientTlsCert, node.ClientTlsCert) {
delete(raftMetadata.Consenters, nodeID)
confChange = &raftpb.ConfChange{
NodeID: nodeID,
Type: raftpb.ConfChangeRemoveNode,
}
break
}
}
}
}
// Changed indicates whether these changes actually do anything
func (mc *MembershipChanges) Changed() bool {
return len(mc.AddedNodes) > 0 || len(mc.RemovedNodes) > 0
}
return confChange, 0
// Rotated indicates whether the change was a rotation
func (mc *MembershipChanges) Rotated() bool {
return len(mc.AddedNodes) == 1 && len(mc.RemovedNodes) == 1
}
// EndpointconfigFromFromSupport extracts TLS CA certificates and endpoints from the ConsenterSupport
......@@ -203,29 +165,64 @@ func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]stru
// ComputeMembershipChanges computes membership update based on information about new conseters, returns
// two slices: a slice of added consenters and a slice of consenters to be removed
func ComputeMembershipChanges(oldConsenters map[uint64]*etcdraft.Consenter, newConsenters []*etcdraft.Consenter) *MembershipChanges {
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, newConsenters []*etcdraft.Consenter) (*MembershipChanges, error) {
result := &MembershipChanges{
AddedNodes: []*etcdraft.Consenter{},
RemovedNodes: []*etcdraft.Consenter{},
NewBlockMetadata: proto.Clone(oldMetadata).(*etcdraft.BlockMetadata),
AddedNodes: []*etcdraft.Consenter{},
RemovedNodes: []*etcdraft.Consenter{},
}
if result.NewBlockMetadata.Consenters == nil {
// proto.Clone copies empty maps as nil
result.NewBlockMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
}
currentConsentersSet := MembershipByCert(oldConsenters)
currentConsentersSet := MembershipByCert(oldMetadata.Consenters)
for _, c := range newConsenters {
if _, exists := currentConsentersSet[string(c.ClientTlsCert)]; !exists {
result.AddedNodes = append(result.AddedNodes, c)
result.TotalChanges++
}
}
var deletedNodeID uint64
newConsentersSet := ConsentersToMap(newConsenters)
for _, c := range oldConsenters {
for nodeID, c := range oldMetadata.Consenters {
if _, exists := newConsentersSet[string(c.ClientTlsCert)]; !exists {
result.RemovedNodes = append(result.RemovedNodes, c)
result.TotalChanges++
deletedNodeID = nodeID
}
}
return result
switch {
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 1:
// cert rotation
result.RotatedNode = deletedNodeID
result.NewBlockMetadata.Consenters[deletedNodeID] = result.AddedNodes[0]
case len(result.AddedNodes) == 1 && len(result.RemovedNodes) == 0:
// new node
nodeID := result.NewBlockMetadata.NextConsenterId
result.NewBlockMetadata.Consenters[nodeID] = result.AddedNodes[0]
result.NewBlockMetadata.NextConsenterId++
result.ConfChange = &raftpb.ConfChange{
NodeID: nodeID,
Type: raftpb.ConfChangeAddNode,
}
case len(result.AddedNodes) == 0 && len(result.RemovedNodes) == 1:
// removed node
nodeID := deletedNodeID
result.ConfChange = &raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeID,
}
delete(result.NewBlockMetadata.Consenters, nodeID)
case len(result.AddedNodes) == 0 && len(result.RemovedNodes) == 0:
// no change
default:
// len(result.AddedNodes) > 1 || len(result.RemovedNodes) > 1 {
return nil, errors.Errorf("update of more than one consenter at a time is not supported, requested changes: %s", result)
}
return result, nil
}
// MetadataHasDuplication returns an error if the metadata has duplication of consenters.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment