Commit 42f0e647 authored by Artem Barger, committed by Gerrit Code Review

Merge "FAB-14540 transfer leader if cert of it is rotated" into release-1.4

parents d55833d2 7e440c73
......@@ -191,9 +191,14 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
Eventually(ordererProc.Ready()).Should(BeClosed())
}
By("Finding leader")
leader := findLeader(ordererRunners)
leaderIndex := leader - 1
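// findLeader returns the 1-based raft node ID of the elected leader,
// so subtract one to index into the orderer slices.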
blockSeq := 0
By("Checking that all orderers are online")
assertBlockReception(map[string]int{
"systemchannel": 0,
"systemchannel": blockSeq,
}, orderers, peer, network)
By("Preparing new certificates for the orderer nodes")
......@@ -213,14 +218,33 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
metadata.Consenters = newConsenters
})
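// Each cert swap commits one config update, i.e. one more block on the system channel.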
blockSeq++
}
for i, rotation := range certificateRotations {
o := network.Orderers[i]
port := network.OrdererPort(o, nwo.ListenPort)
rotate := func(target int) {
// Submit a config tx that rotates the cert of an orderer.
// The orderer being rotated will eventually become
// unavailable, so the tx is submitted through a different
// orderer than the target; this lets the configuration
// update be verified reliably.
submitter := (target + 1) % 3
rotation := certificateRotations[target]
targetOrderer := network.Orderers[target]
remainder := func() []*nwo.Orderer {
var ret []*nwo.Orderer
for i, o := range network.Orderers {
if i == target {
continue
}
ret = append(ret, o)
}
return ret
}()
submitterOrderer := network.Orderers[submitter]
port := network.OrdererPort(targetOrderer, nwo.ListenPort)
fmt.Fprintf(GinkgoWriter, "Adding the future certificate of orderer node %d", i+1)
swap(o, rotation.oldCert, etcdraft.Consenter{
fmt.Fprintf(GinkgoWriter, "Rotating certificate of orderer node %d\n", target+1)
swap(submitterOrderer, rotation.oldCert, etcdraft.Consenter{
ServerTlsCert: rotation.newCert,
ClientTlsCert: rotation.newCert,
Host: "127.0.0.1",
......@@ -229,25 +253,57 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
By("Waiting for all orderers to sync")
assertBlockReception(map[string]int{
"systemchannel": i + 1,
}, orderers, peer, network)
"systemchannel": blockSeq,
}, remainder, peer, network)
By("Waiting for rotated node to be unavailable")
c := commands.ChannelFetch{
ChannelID: network.SystemChannel.Name,
Block: "newest",
OutputFile: "/dev/null",
Orderer: network.OrdererAddress(targetOrderer, nwo.ListenPort),
}
Eventually(func() string {
sess, err := network.OrdererAdminSession(targetOrderer, peer, c)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit())
if sess.ExitCode() != 0 {
return fmt.Sprintf("exit code is %d: %s", sess.ExitCode(), string(sess.Err.Contents()))
}
sessErr := string(sess.Err.Contents())
expected := fmt.Sprintf("Received block: %d", blockSeq)
if strings.Contains(sessErr, expected) {
return ""
}
return sessErr
}, network.EventuallyTimeout, time.Second).ShouldNot(BeEmpty())
By("Killing the orderer")
- ordererProcesses[i].Signal(syscall.SIGTERM)
- Eventually(ordererProcesses[i].Wait(), network.EventuallyTimeout).Should(Receive())
ordererProcesses[target].Signal(syscall.SIGTERM)
Eventually(ordererProcesses[target].Wait(), network.EventuallyTimeout).Should(Receive())
By("Starting the orderer again")
- ordererRunner := network.OrdererRunner(orderers[i])
ordererRunner := network.OrdererRunner(targetOrderer)
ordererRunners = append(ordererRunners, ordererRunner)
- ordererProcesses[i] = ifrit.Invoke(grouper.Member{Name: orderers[i].ID(), Runner: ordererRunner})
- Eventually(ordererProcesses[i].Ready()).Should(BeClosed())
ordererProcesses[target] = ifrit.Invoke(grouper.Member{Name: orderers[target].ID(), Runner: ordererRunner})
Eventually(ordererProcesses[target].Ready()).Should(BeClosed())
By("And waiting for it to stabilize")
assertBlockReception(map[string]int{
"systemchannel": i + 1,
"systemchannel": blockSeq,
}, orderers, peer, network)
}
By(fmt.Sprintf("Rotating cert on leader %d", leader))
rotate(leaderIndex)
By("Rotating certificates of other orderer nodes")
for i := range certificateRotations {
if i != leaderIndex {
rotate(i)
}
}
})
It("is still possible to onboard new orderers", func() {
......
......@@ -169,21 +169,32 @@ func CurrentConfigBlockNumber(n *Network, peer *Peer, orderer *Orderer, channel
// fetch the config block
output := filepath.Join(tempDir, "config_block.pb")
- sess, err := n.OrdererAdminSession(orderer, peer, commands.ChannelFetch{
- ChannelID: channel,
- Block: "config",
- Orderer: n.OrdererAddress(orderer, ListenPort),
- OutputFile: output,
- })
- Expect(err).NotTo(HaveOccurred())
- Eventually(sess, n.EventuallyTimeout).Should(gexec.Exit(0))
- Expect(sess.Err).To(gbytes.Say("Received block: "))
FetchConfigBlock(n, peer, orderer, channel, output)
// unmarshal the config block bytes
configBlock := UnmarshalBlockFromFile(output)
return configBlock.Header.Number
}
// FetchConfigBlock fetches the latest config block of the given channel, retrying until the fetch succeeds.
func FetchConfigBlock(n *Network, peer *Peer, orderer *Orderer, channel string, output string) {
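// The fetch may transiently fail while the orderer is reconfiguring,
// so a non-zero exit code is treated as retryable.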
fetch := func() int {
sess, err := n.OrdererAdminSession(orderer, peer, commands.ChannelFetch{
ChannelID: channel,
Block: "config",
Orderer: n.OrdererAddress(orderer, ListenPort),
OutputFile: output,
})
Expect(err).NotTo(HaveOccurred())
code := sess.Wait(n.EventuallyTimeout).ExitCode()
if code == 0 {
Expect(sess.Err).To(gbytes.Say("Received block: "))
}
return code
}
Eventually(fetch, n.EventuallyTimeout).Should(Equal(0))
}
// UpdateOrdererConfigFail computes, signs, and submits a configuration update that requires the orderers' signatures
// and waits for the update to FAIL.
func UpdateOrdererConfigFail(n *Network, orderer *Orderer, channel string, current, updated *common.Config, submitter *Peer, additionalSigners ...*Orderer) {
......
......@@ -60,7 +60,7 @@ const (
DefaultLeaderlessCheckInterval = time.Second * 10
)
- //go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
//go:generate counterfeiter -o mocks/configurator.go . Configurator
// Configurator is used to configure the communication layer
// when the chain starts.
......@@ -683,7 +683,7 @@ func (c *Chain) serveRequest() {
c.propose(propC, bc, batches...)
if c.configInflight {
c.logger.Info("Received config block, pause accepting transaction till it is committed")
c.logger.Info("Received config transaction, pause accepting transaction till it is committed")
submitC = nil
} else if c.blockInflight >= c.opts.MaxInflightBlocks {
c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
......@@ -1136,6 +1136,9 @@ func (c *Chain) configureComm() error {
}
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
c.raftMetadataLock.RLock()
defer c.raftMetadataLock.RUnlock()
var nodes []cluster.RemoteNode
for raftID, consenter := range c.opts.Consenters {
// No need to know yourself
......@@ -1245,8 +1248,19 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
c.configInflight = true
} else if configMembership.Rotated() {
- if err := c.configureComm(); err != nil {
- c.logger.Panicf("Failed to configure communication: %s", err)
lead := atomic.LoadUint64(&c.lastKnownLeader)
if configMembership.RotatedNode == lead {
c.logger.Infof("Certificate of Raft leader is being rotated, attempt leader transfer before reconfiguring communication")
go func() {
c.Node.abdicateLeader(lead)
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}()
} else {
if err := c.configureComm(); err != nil {
c.logger.Panicf("Failed to configure communication: %s", err)
}
}
}
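The chain code above calls c.Node.abdicateLeader(lead), whose body is elided from this excerpt. Below is a minimal sketch of what such a helper could look like on top of etcd/raft, assuming a node type that embeds raft.Node and knows its own raft ID; apart from Status, TransferLeadership, and the raft state constants (which are real etcd/raft API), the names, timeouts, and transferee selection are illustrative, not the actual Fabric implementation.

package sketch // illustrative only; not the code merged by this commit

import (
	"context"
	"time"

	"go.etcd.io/etcd/raft"
)

type node struct {
	raft.Node        // embedded etcd/raft node (assumption)
	id        uint64 // this node's raft ID (assumption)
}

// abdicateLeader asks raft to hand leadership to another voter and blocks
// until the transfer is observed or a timeout elapses.
func (n *node) abdicateLeader(currentLead uint64) {
	status := n.Status()
	if status.Lead != currentLead || status.RaftState != raft.StateLeader {
		return // leadership already changed, or this node is not the leader
	}
	// Pick any other known voter as the transferee (selection policy is
	// an assumption; the real code may choose differently).
	var transferee uint64
	for id := range status.Progress {
		if id != n.id {
			transferee = id
			break
		}
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	n.TransferLeadership(ctx, status.Lead, transferee)

	// Wait until a new leader is observed, so the caller reconfigures
	// communication only after the transfer attempt has settled.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // timed out; the caller proceeds regardless
		case <-ticker.C:
			if n.Status().Lead != currentLead {
				return
			}
		}
	}
}

Blocking until the transfer settles matches the goroutine in writeConfigBlock above, which reconfigures communication only after abdicateLeader returns.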
......
......@@ -35,14 +35,13 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.uber.org/zap"
)
const (
- interval = time.Second
interval = 100 * time.Millisecond
LongEventualTimeout = 10 * time.Second
// 10 is the default setting of ELECTION_TICK.
......@@ -90,7 +89,7 @@ var _ = Describe("Chain", func() {
Describe("Single Raft node", func() {
var (
- configurator *mocks.Configurator
configurator *mocks.FakeConfigurator
consenterMetadata *raftprotos.ConfigMetadata
consenters map[uint64]*raftprotos.Consenter
clock *fakeclock.FakeClock
......@@ -108,8 +107,7 @@ var _ = Describe("Chain", func() {
)
BeforeEach(func() {
configurator = &mocks.Configurator{}
configurator.On("Configure", mock.Anything, mock.Anything)
configurator = &mocks.FakeConfigurator{}
clock = fakeclock.NewFakeClock(time.Now())
storage = raft.NewMemoryStorage()
......@@ -209,7 +207,9 @@ var _ = Describe("Chain", func() {
Context("when a node starts up", func() {
It("properly configures the communication layer", func() {
expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
- configurator.AssertCalled(testingInstance, "Configure", channelID, expectedNodeConfig)
Eventually(configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(1))
_, arg2 := configurator.ConfigureArgsForCall(0)
Expect(arg2).To(Equal(expectedNodeConfig))
})
It("correctly sets the metrics labels and publishes requisite metrics", func() {
......@@ -1561,6 +1561,178 @@ var _ = Describe("Chain", func() {
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
It("rotates leader certificate and triggers leadership transfer", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// skip consenter 1 (the raft leader); it is appended below with a new certificate
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
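// Rotating the leader's own certificate triggers a leadership transfer:
// c1 observes itself stepping down to follower (leader unknown until a
// new election completes).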
Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(0, raft.StateFollower)))
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
When("Leader is disconnected after cert rotation", func() {
It("still configures communication after failed leader transfer attempt", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// skip consenter 1 (the raft leader); it is appended below with a new certificate
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
step1 := c1.getStepFunc()
count := c1.rpc.SendConsensusCallCount() // record current step call count
c1.setStepFunc(func(dest uint64, msg *orderer.ConsensusRequest) error {
// disconnect network after 4 MsgApp messages are sent by c1:
// - 2 MsgApp to c2 & c3 that replicate data to raft followers
// - 2 MsgApp to c2 & c3 that instruct followers to commit data
if c1.rpc.SendConsensusCallCount() == count+4 {
defer network.disconnect(1)
}
return step1(dest, msg)
})
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
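// The leader was just disconnected, so the transfer attempt cannot
// succeed and no leadership change should be observed.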
Consistently(c1.observe).ShouldNot(Receive())
network.exec(func(c *chain) {
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
})
})
})
When("Follower is disconnected while leader cert is being rotated", func() {
It("still configures communication and transfer leader", func() {
metadata := &raftprotos.ConfigMetadata{}
for id, consenter := range consenters {
if id == 1 {
// skip consenter 1 (the raft leader); it is appended below with a new certificate
continue
}
metadata.Consenters = append(metadata.Consenters, consenter)
}
// add new consenter
newConsenter := &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
ServerTlsCert: serverTLSCert(tlsCA),
ClientTlsCert: clientTLSCert(tlsCA),
}
metadata.Consenters = append(metadata.Consenters, newConsenter)
value := map[string]*common.ConfigValue{
"ConsensusType": {
Version: 1,
Value: marshalOrPanic(&orderer.ConsensusType{
Metadata: marshalOrPanic(metadata),
}),
},
}
cnt := c1.rpc.SendConsensusCallCount()
network.disconnect(3)
// Trigger some heartbeats so the leader notices the failed
// message delivery to node 3 and marks it as Paused. This
// ensures leadership is transferred to node 2.
Eventually(func() int {
c1.clock.Increment(interval)
return c1.rpc.SendConsensusCallCount()
}, LongEventualTimeout).Should(BeNumerically(">=", cnt+5))
By("creating new configuration with removed node and new one")
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, value))
c1.cutter.CutNext = true
By("sending config transaction")
Expect(c1.Configure(configEnv, 0)).To(Succeed())
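// c1 steps down and observes node 2 as the new leader.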
Eventually(c1.observe, LongEventualTimeout).Should(Receive(StateEqual(2, raft.StateFollower)))
network.Lock()
network.leader = 2 // manually set network leader
network.Unlock()
network.disconnect(1)
network.exec(func(c *chain) {
Eventually(c.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
}, 1, 2)
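// Reconnect node 3; it catches up, commits the config block, and
// reconfigures its communication layer like the others.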
network.join(3, true)
Eventually(c3.support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(c3.configurator.ConfigureCallCount, LongEventualTimeout).Should(Equal(2))
By("Ordering normal transaction")
c2.cutter.CutNext = true
Expect(c3.Order(env, 0)).To(Succeed())
network.exec(func(c *chain) {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
}, 2, 3)
})
})
When("two type B config are sent back-to-back", func() {
......@@ -2934,7 +3106,7 @@ type chain struct {
support *consensusmocks.FakeConsenterSupport
cutter *mockblockcutter.Receiver
- configurator *mocks.Configurator
configurator *mocks.FakeConfigurator
rpc *mocks.FakeRPC
storage *raft.MemoryStorage
walDir string
......@@ -2999,9 +3171,7 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
// sent on this chan, so we need size to be 2
observe := make(chan raft.SoftState, 2)
configurator := &mocks.Configurator{}
configurator.On("Configure", mock.Anything, mock.Anything)
configurator := &mocks.FakeConfigurator{}
puller := &mocks.FakeBlockPuller{}
ch := make(chan struct{})
......@@ -3250,11 +3420,7 @@ func (n *network) addChain(c *chain) {
return map[string]uint64{"leader": leader.ledgerHeight}, nil
}
- *c.configurator = mocks.Configurator{}
- c.configurator.On("Configure", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
- nodes, ok := args[1].([]cluster.RemoteNode)
- Expect(ok).To(BeTrue())
c.configurator.ConfigureCalls(func(channel string, nodes []cluster.RemoteNode) {
var ids []uint64
for _, node := range nodes {
ids = append(ids, node.ID)
......
- // Code generated by mockery v1.0.0. DO NOT EDIT.
// Code generated by counterfeiter. DO NOT EDIT.
package mocks
- import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import (
"sync"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
)
type FakeConfigurator struct {
ConfigureStub func(string, []cluster.RemoteNode)
configureMutex sync.RWMutex
configureArgsForCall []struct {
arg1 string
arg2 []cluster.RemoteNode
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeConfigurator) Configure(arg1 string, arg2 []cluster.RemoteNode) {
var arg2Copy []cluster.RemoteNode
if arg2 != nil {
arg2Copy = make([]cluster.RemoteNode, len(arg2))
copy(arg2Copy, arg2)
}
fake.configureMutex.Lock()
fake.configureArgsForCall = append(fake.configureArgsForCall, struct {
arg1 string
arg2 []cluster.RemoteNode
}{arg1, arg2Copy})
fake.recordInvocation("Configure", []interface{}{arg1, arg2Copy})
fake.configureMutex.Unlock()
if fake.ConfigureStub != nil {
fake.ConfigureStub(arg1, arg2)
}
}
func (fake *FakeConfigurator) ConfigureCallCount() int {
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
return len(fake.configureArgsForCall)
}
func (fake *FakeConfigurator) ConfigureCalls(stub func(string, []cluster.RemoteNode)) {
fake.configureMutex.Lock()
defer fake.configureMutex.Unlock()
fake.ConfigureStub = stub
}
- import mock "github.com/stretchr/testify/mock"
func (fake *FakeConfigurator) ConfigureArgsForCall(i int) (string, []cluster.RemoteNode) {
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
argsForCall := fake.configureArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
- // Configurator is an autogenerated mock type for the Configurator type
- type Configurator struct {
- mock.Mock
func (fake *FakeConfigurator) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.configureMutex.RLock()
defer fake.configureMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
- // Configure provides a mock function with given fields: channel, newNodes
- func (_m *Configurator) Configure(channel string, newNodes []cluster.RemoteNode) {
- _m.Called(channel, newNodes)
func (fake *FakeConfigurator) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ etcdraft.Configurator = new(FakeConfigurator)
......@@ -9,21 +9,15 @@ import (
)
type FakeBlockPuller struct {
- PullBlockStub func(seq uint64) *common.Block
- pullBlockMutex sync.RWMutex
- pullBlockArgsForCall []struct {
- seq uint64
- }
- pullBlockReturns struct {
- result1 *common.Block
- }
- pullBlockReturnsOnCall map[int]struct {
- result1 *common.Block
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct {
}
HeightsByEndpointsStub func() (map[string]uint64, error)
heightsByEndpointsMutex sync.RWMutex
- heightsByEndpointsArgsForCall []struct{}
- heightsByEndpointsReturns struct {
heightsByEndpointsArgsForCall []struct {
}
heightsByEndpointsReturns struct {
result1 map[string]uint64
result2 error
}
......@@ -31,65 +25,49 @@ type FakeBlockPuller struct {
result1 map[string]uint64