Commit 7e440c73 authored by Jay Guo

FAB-14540 transfer leader if cert of it is rotated



When the certificate of the leader is rotated, the leader will certainly be
disconnected once communication is reconfigured. Instead of waiting for
ElectionTimeout to elect a new leader, the old leader should be more
cooperative and transfer its leadership to another node.

Note that proposals sent during this transition are automatically dropped
by etcd/raft; however, the transition should be fairly short.

Change-Id: Iabd005d00864afe09b4738f1ed36b939b1d83eed
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 23ddcebb
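For context on the mechanism this change relies on: etcd/raft already ships a leadership-transfer primitive, and a leader with a transfer in flight rejects new proposals with ErrProposalDropped, which is what the commit message refers to. A minimal, hypothetical Go sketch of that primitive in isolation (transferAndWait, proposeOrRetryLater, and the polling interval are illustrative, not part of this change):

package raftutil

import (
	"context"
	"time"

	"go.etcd.io/etcd/raft"
)

// transferAndWait asks raft to hand leadership from lead to transferee and
// polls until a new leader is observed or the timeout expires. Internally,
// raft sends MsgTimeoutNow to the transferee so it campaigns immediately
// instead of waiting for ElectionTimeout.
func transferAndWait(n raft.Node, lead, transferee uint64, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	n.TransferLeadership(ctx, lead, transferee)

	for n.Status().Lead == lead {
		select {
		case <-ctx.Done():
			return false // transfer did not complete in time
		case <-time.After(10 * time.Millisecond):
		}
	}
	return true
}

// proposeOrRetryLater shows the behavior the commit message notes: while a
// transfer is pending, Propose fails fast with ErrProposalDropped.
func proposeOrRetryLater(ctx context.Context, n raft.Node, data []byte) error {
	err := n.Propose(ctx, data)
	if err == raft.ErrProposalDropped {
		// the transition window is short; callers may simply retry later
	}
	return err
}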
@@ -191,9 +191,14 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
Eventually(ordererProc.Ready()).Should(BeClosed())
}
By("Finding leader")
leader := findLeader(ordererRunners)
leaderIndex := leader - 1
blockSeq := 0
By("Checking that all orderers are online")
assertBlockReception(map[string]int{
"systemchannel": 0,
"systemchannel": blockSeq,
}, orderers, peer, network)
By("Preparing new certificates for the orderer nodes")
@@ -213,14 +218,33 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
metadata.Consenters = newConsenters
})
blockSeq++
}
for i, rotation := range certificateRotations {
o := network.Orderers[i]
port := network.OrdererPort(o, nwo.ListenPort)
rotate := func(target int) {
// submit a config tx to rotate the cert of an orderer.
// The orderer being rotated will eventually become
// unavailable, so the tx is submitted via a node other
// than the target; that way the new configuration can
// be reliably checked.
submitter := (target + 1) % 3
rotation := certificateRotations[target]
targetOrderer := network.Orderers[target]
remainder := func() []*nwo.Orderer {
var ret []*nwo.Orderer
for i, o := range network.Orderers {
if i == target {
continue
}
ret = append(ret, o)
}
return ret
}()
submitterOrderer := network.Orderers[submitter]
port := network.OrdererPort(targetOrderer, nwo.ListenPort)
fmt.Fprintf(GinkgoWriter, "Adding the future certificate of orderer node %d", i+1)
swap(o, rotation.oldCert, etcdraft.Consenter{
fmt.Fprintf(GinkgoWriter, "Rotating certificate of orderer node %d\n", target+1)
swap(submitterOrderer, rotation.oldCert, etcdraft.Consenter{
ServerTlsCert: rotation.newCert,
ClientTlsCert: rotation.newCert,
Host: "127.0.0.1",
@@ -229,25 +253,57 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
By("Waiting for all orderers to sync")
assertBlockReception(map[string]int{
"systemchannel": i + 1,
}, orderers, peer, network)
"systemchannel": blockSeq,
}, remainder, peer, network)
By("Waiting for rotated node to be unavailable")
c := commands.ChannelFetch{
ChannelID: network.SystemChannel.Name,
Block: "newest",
OutputFile: "/dev/null",
Orderer: network.OrdererAddress(targetOrderer, nwo.ListenPort),
}
Eventually(func() string {
sess, err := network.OrdererAdminSession(targetOrderer, peer, c)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit())
if sess.ExitCode() != 0 {
return fmt.Sprintf("exit code is %d: %s", sess.ExitCode(), string(sess.Err.Contents()))
}
sessErr := string(sess.Err.Contents())
expected := fmt.Sprintf("Received block: %d", blockSeq)
if strings.Contains(sessErr, expected) {
return ""
}
return sessErr
}, network.EventuallyTimeout, time.Second).ShouldNot(BeEmpty())
By("Killing the orderer")
ordererProcesses[i].Signal(syscall.SIGTERM)
Eventually(ordererProcesses[i].Wait(), network.EventuallyTimeout).Should(Receive())
ordererProcesses[target].Signal(syscall.SIGTERM)
Eventually(ordererProcesses[target].Wait(), network.EventuallyTimeout).Should(Receive())
By("Starting the orderer again")
ordererRunner := network.OrdererRunner(orderers[i])
ordererRunner := network.OrdererRunner(targetOrderer)
ordererRunners = append(ordererRunners, ordererRunner)
ordererProcesses[i] = ifrit.Invoke(grouper.Member{Name: orderers[i].ID(), Runner: ordererRunner})
Eventually(ordererProcesses[i].Ready()).Should(BeClosed())
ordererProcesses[target] = ifrit.Invoke(grouper.Member{Name: orderers[target].ID(), Runner: ordererRunner})
Eventually(ordererProcesses[target].Ready()).Should(BeClosed())
By("And waiting for it to stabilize")
assertBlockReception(map[string]int{
"systemchannel": i + 1,
"systemchannel": blockSeq,
}, orderers, peer, network)
}
By(fmt.Sprintf("Rotating cert on leader %d", leader))
rotate(leaderIndex)
By("Rotating certificates of other orderer nodes")
for i := range certificateRotations {
if i != leaderIndex {
rotate(i)
}
}
})
It("is still possible to onboard new orderers", func() {
......
@@ -169,21 +169,32 @@ func CurrentConfigBlockNumber(n *Network, peer *Peer, orderer *Orderer, channel
// fetch the config block
output := filepath.Join(tempDir, "config_block.pb")
sess, err := n.OrdererAdminSession(orderer, peer, commands.ChannelFetch{
ChannelID: channel,
Block: "config",
Orderer: n.OrdererAddress(orderer, ListenPort),
OutputFile: output,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
Expect(sess.Err).To(gbytes.Say("Received block: "))
FetchConfigBlock(n, peer, orderer, channel, output)
// unmarshal the config block bytes
configBlock := UnmarshalBlockFromFile(output)
return configBlock.Header.Number
}
// FetchConfigBlock fetches the latest config block, retrying until the fetch succeeds.
func FetchConfigBlock(n *Network, peer *Peer, orderer *Orderer, channel string, output string) {
fetch := func() int {
sess, err := n.OrdererAdminSession(orderer, peer, commands.ChannelFetch{
ChannelID: channel,
Block: "config",
Orderer: n.OrdererAddress(orderer, ListenPort),
OutputFile: output,
})
Expect(err).NotTo(HaveOccurred())
code := sess.Wait(n.EventuallyTimeout).ExitCode()
if code == 0 {
Expect(sess.Err).To(gbytes.Say("Received block: "))
}
return code
}
Eventually(fetch, n.EventuallyTimeout).Should(Equal(0))
}
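The retry loop exists because the target orderer may briefly refuse connections while a certificate rotation takes effect, exactly the window the integration test above exercises. A hedged usage sketch (network, peer, orderer, and testDir come from the surrounding test setup; the channel name is an assumption):

// Illustrative only: fetch the latest config block even if the orderer
// drops the first few connection attempts mid-rotation.
output := filepath.Join(testDir, "config_block.pb")
nwo.FetchConfigBlock(network, peer, orderer, "systemchannel", output)
block := nwo.UnmarshalBlockFromFile(output)
fmt.Fprintf(GinkgoWriter, "config block number: %d\n", block.Header.Number)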
// UpdateOrdererConfigFail computes, signs, and submits a configuration update which requires orderer signatures
// and waits for the update to FAIL.
func UpdateOrdererConfigFail(n *Network, orderer *Orderer, channel string, current, updated *common.Config, submitter *Peer, additionalSigners ...*Orderer) {
......
@@ -60,7 +60,7 @@ const (
DefaultLeaderlessCheckInterval = time.Second * 10
)
//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
//go:generate counterfeiter -o mocks/configurator.go . Configurator
// Configurator is used to configure the communication layer
// when the chain starts.
@@ -683,7 +683,7 @@ func (c *Chain) serveRequest() {
c.propose(propC, bc, batches...)
if c.configInflight {
c.logger.Info("Received config block, pause accepting transaction till it is committed")
c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
submitC = nil
} else if c.blockInflight >= c.opts.MaxInflightBlocks {
c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
@@ -1136,6 +1136,9 @@ func (c *Chain) configureComm() error {
}
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
c.raftMetadataLock.RLock()
defer c.raftMetadataLock.RUnlock()
var nodes []cluster.RemoteNode
for raftID, consenter := range c.opts.Consenters {
// No need to know yourself
@@ -1245,8 +1248,19 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
c.configInflight = true
} else if configMembership.Rotated() {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
lead := atomic.LoadUint64(&c.lastKnownLeader)
if configMembership.RotatedNode == lead {
c.logger.Infof("Certificate of Raft leader is being rotated, attempt leader transfer before reconfiguring communication")
go func() {
c.Node.abdicateLeader(lead)
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}()
} else {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
}
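The abdicateLeader helper called above is defined elsewhere in this change and is not shown in this excerpt. A hypothetical reconstruction of what such a helper could look like on top of the etcd/raft API; the node scaffolding, the transferee-selection policy, and the timeout are all assumptions:

package sketch

import (
	"context"
	"time"

	"go.etcd.io/etcd/raft"
)

// node is assumed to embed etcd/raft's Node, roughly as the etcdraft
// package's own wrapper does; this is illustrative scaffolding only.
type node struct {
	raft.Node
}

// abdicateLeader is a hypothetical sketch, not the actual implementation.
func (n *node) abdicateLeader(currentLead uint64) {
	status := n.Status()
	if status.Lead != currentLead || status.RaftState != raft.StateLeader {
		return // leadership has already moved on; nothing to hand off
	}

	// Assumed policy: transfer to the most caught-up follower.
	var transferee, bestMatch uint64
	for id, pr := range status.Progress {
		if id != status.ID && pr.Match >= bestMatch {
			transferee, bestMatch = id, pr.Match
		}
	}
	if transferee == raft.None {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // assumed timeout
	defer cancel()
	n.TransferLeadership(ctx, currentLead, transferee)

	// Wait until a new leader is observed or the timeout fires, so that
	// configureComm runs only after the hand-off had a chance to finish.
	for n.Status().Lead == currentLead {
		select {
		case <-ctx.Done():
			return
		case <-time.After(100 * time.Millisecond):
		}
	}
}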
......
@@ -35,14 +35,13 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
const (
interval = time.Second
interval = 100 * time.Millisecond
LongEventualTimeout = 10 * time.Second
// 10 is the default setting of ELECTION_TICK.
@@ -90,7 +89,7 @@ var _ = Describe("Chain", func() {
Describe("Single Raft node", func() {
var (
configurator *mocks.Configurator
configurator *mocks.FakeConfigurator
consenterMetadata *raftprotos.ConfigMetadata
consenters map[uint64]*raftprotos.Consenter
clock *fakeclock.FakeClock
@@ -108,8 +107,7 @@ var _ = Describe("Chain", func() {
)
BeforeEach(func() {
configurator = &mocks.Configurator{}
configurator.On("Configure", mock.Anything, mock.Anything)
configurator = &mocks.FakeConfigurator{}
clock = fakeclock.NewFakeClock(time.Now())
storage = raft.NewMemoryStorage()
@@ -209,7 +207,9 @@ var _ = Describe("Chain", func() {
Context("when a node starts up", func() {
It("properly configures the communication layer", func() {
expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
configurator.AssertCalled(testingInstance, "Configure", channelID, expectedNodeConfig)
Eventually(configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(1))
_, arg2 := configurator.ConfigureArgsForCall(0)
Expect(arg2).To(Equal(expectedNodeConfig))
})
It("correctly sets the metrics labels and publishes requisite metrics", func() {
@@ -1561,6 +1561,178 @@ var _ = Describe("Chain", func() {
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
It("rotates leader certificate and triggers leadership transfer", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// omit consenter 1 (the leader); it is re-added below with a rotated cert
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(0, raft.StateFollower)))
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
When("Leader is disconnected after cert rotation", func() {
It("still configures communication after failed leader transfer attempt", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// omit consenter 1 (the leader); it is re-added below with a rotated cert
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
step1 := c1.getStepFunc()
count := c1.rpc.SendConsensusCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
// disconnect network after 4 MsgApp are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instructs followers to commit data
if c1.rpc.SendConsensusCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Consistently(c1.observe).ShouldNot(Receive())
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
})
When("Follower is disconnected while leader cert is being rotated", func() {
It("still configures communication and transfer leader", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// omit consenter 1 (the leader); it is re-added below with a rotated cert
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
cnt := c1.rpc.SendConsensusCallCount()
network.disconnect(3)
// Trigger some heartbeats so that the leader notices the
// failed message delivery to 3 and marks it as Paused.
// This ensures leadership is transferred to 2.
Eventually(func() int {
c1.clock.Increment(interval)
return c1.rpc.SendConsensusCallCount()
}, LongEventualTimeout).Should(BeNumerically(">=", cnt+5))
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateFollower)))
network.Lock()
network.leader = 2 // manually set network leader
network.Unlock()
network.disconnect(1)
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
}, 1, 2)
network.join(3, true)
Eventually(c3.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c3.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
By("Ordering normal transaction")
c2.cutter.CutNext = true
Expect(c3.Order(env, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
}, 2, 3)
})
})
When("two type B config are sent back-to-back", func() {
@@ -2934,7 +3106,7 @@ type chain struct {
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
configurator *mocks.Configurator
configurator *mocks.FakeConfigurator
rpc *mocks.FakeRPC
storage *raft.MemoryStorage
walDir string
@@ -2999,9 +3171,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
// sent on this chan, so we need size to be 2
observe := make(chan raft.SoftState, 2)
configurator := &mocks.Configurator{}
configurator.On("Configure", mock.Anything, mock.Anything)
configurator := &mocks.FakeConfigurator{}
puller := &mocks.FakeBlockPuller{}
ch := make(chan struct{})
@@ -3250,11 +3420,7 @@ func (n *network) addChain(c *chain) {
return map[string]uint64{"leader": leader.ledgerHeight}, nil
}
*c.configurator = mocks.Configurator{}
c.configurator.On("Configure", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
nodes, ok := args[1].([]cluster.RemoteNode)
Expect(ok).To(BeTrue())
c.configurator.ConfigureCalls(func(channel string, nodes []cluster.RemoteNode) {
var ids []uint64
for _, node := range nodes {
ids = append(ids, node.ID)
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
// Code generated by counterfeiter. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import (
"sync"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
)
type FakeConfigurator struct {
ConfigureStub func(string, []cluster.RemoteNode)
configureMutex sync.RWMutex
configureArgsForCall []struct {
arg1 string
arg2 []cluster.RemoteNode
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeConfigurator) Configure(arg1 string, arg2 []cluster.RemoteNode) {
var arg2Copy []cluster.RemoteNode
if arg2 != nil {
arg2Copy = make([]cluster.RemoteNode, len(arg2))
copy(arg2Copy, arg2)
}
fake.configureMutex.Lock()
fake.configureArgsForCall = append(fake.configureArgsForCall, struct {
arg1 string
arg2 []cluster.RemoteNode
}{arg1, arg2Copy})
fake.recordInvocation("Configure", []interface{}{arg1, arg2Copy})
fake.configureMutex.Unlock()
if fake.ConfigureStub != nil {
fake.ConfigureStub(arg1, arg2)
}
}
func (fake *FakeConfigurator) ConfigureCallCount() int {
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
return len(fake.configureArgsForCall)
}
func (fake *FakeConfigurator) ConfigureCalls(stub func(string, []cluster.RemoteNode)) {
fake.configureMutex.Lock()
defer fake.configureMutex.Unlock()
fake.ConfigureStub = stub
}
import mock "github.com/stretchr/testify/mock"
func (fake *FakeConfigurator) ConfigureArgsForCall(i int) (string, []cluster.RemoteNode) {
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
argsForCall := fake.configureArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
// Configurator is an autogenerated mock type for the Configurator type
type Configurator struct {
mock.Mock
func (fake *FakeConfigurator) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
// Configure provides a mock function with given fields: channel, newNodes
func (_m *Configurator) Configure(channel string, newNodes []cluster.RemoteNode) {
_m.Called(channel, newNodes)
func (fake *FakeConfigurator) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ etcdraft.Configurator = new(FakeConfigurator)
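Since the mockery-generated mock is replaced by this counterfeiter fake, call assertions in the tests change style accordingly. A small illustrative sketch of the new style used above (the channel name and the direct Configure call are assumptions; in the real tests the chain under test invokes Configure):

// Counterfeiter style: assert on recorded calls instead of mockery's
// .On(...)/AssertCalled(...).
fake := &mocks.FakeConfigurator{}
fake.ConfigureCalls(func(channel string, nodes []cluster.RemoteNode) {
	// optional stub body, the analogue of mockery's .Run(...)
})

fake.Configure("testchannel", nil) // normally invoked by the code under test

Expect(fake.ConfigureCallCount()).To(Equal(1))
channel, nodes := fake.ConfigureArgsForCall(0)
Expect(channel).To(Equal("testchannel"))
Expect(nodes).To(BeNil())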
@@ -9,21 +9,15 @@ import (
)
type FakeBlockPuller struct {
PullBlockStub func(seq uint64) *common.Block
pullBlockMutex sync.RWMutex
pullBlockArgsForCall []struct {
seq uint64
}
pullBlockReturns struct {
result1 *common.Block
}
pullBlockReturnsOnCall map[int]struct {
result1 *common.Block
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct {
}
HeightsByEndpointsStub func() (map[string]uint64, error)
heightsByEndpointsMutex sync.RWMutex
heightsByEndpointsArgsForCall []struct{}
heightsByEndpointsReturns struct {
heightsByEndpointsArgsForCall []struct {
}
heightsByEndpointsReturns struct {
result1 map[string]uint64
result2 error
}
......@@ -31,65 +25,49 @@ type FakeBlockPuller struct {