Commit cd6e2c61 authored by Jonathan Levi (HACERA), committed by Gerrit Code Review

Merge "[FAB-5313] Leader election yield if deliver unavailable"

parents e7b20bd3 6962ee36
@@ -34,12 +34,20 @@ var (
reConnectBackoffThreshold = float64(time.Hour)
)
// SetReconnectTotalTimeThreshold sets the total time the delivery service
// may spend in reconnection attempts until its retry logic gives up
// and returns an error
func SetReconnectTotalTimeThreshold(duration time.Duration) {
reConnectTotalTimeThreshold = duration
}
// DeliverService used to communicate with orderers to obtain
// new blocks and send them to the committer service
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error
// When the delivery finishes, the finalizer func is called
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
// StopDeliverForChannel dynamically stops delivery of new blocks from ordering service
// to channel peers.
@@ -117,7 +125,7 @@ func (d *deliverServiceImpl) validateConfiguration() error {
// initializes the gRPC stream for the given chainID and creates a blocks provider instance
// that spawns a goroutine to read new blocks, starting from the position provided by the
// ledger info instance.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.stopping {
@@ -133,7 +141,10 @@ func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo b
client := d.newClient(chainID, ledgerInfo)
logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID)
d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc)
go d.blockProviders[chainID].DeliverBlocks()
go func() {
d.blockProviders[chainID].DeliverBlocks()
finalizer()
}()
}
return nil
}
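The new finalizer argument gives the caller a hook that runs once DeliverBlocks returns, e.g. after the reconnect retry logic gives up on the ordering service. A minimal caller-side sketch of the intended wiring, for illustration only: deliverSvc, electionSvc, ledgerInfo and chainID are placeholder variables, and the production hook-up is done by gossipServiceImpl.onStatusChangeFactory later in this change.

// Sketch only, not part of the diff. Assumed imports from hyperledger/fabric:
// core/deliverservice (package deliverclient), core/deliverservice/blocksprovider,
// and gossip/election.
func startDelivery(chainID string, deliverSvc deliverclient.DeliverService,
	electionSvc election.LeaderElectionService, ledgerInfo blocksprovider.LedgerInfo) error {
	finalizer := func() {
		// DeliverBlocks returned: the ordering service stayed unreachable past
		// the reconnect threshold, so relinquish leadership and let another
		// peer try to pull blocks.
		electionSvc.Yield()
	}
	return deliverSvc.StartDeliverForChannel(chainID, ledgerInfo, finalizer)
}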
@@ -113,10 +113,10 @@ func TestNewDeliverService(t *testing.T) {
ConnFactory: connFactory,
})
assert.NoError(t, err)
assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}))
assert.NoError(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}))
// Let's start delivery twice
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "can't start delivery")
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "can't start delivery")
// Let's stop a delivery that was not started
assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID2"), "can't stop delivery")
@@ -130,7 +130,7 @@ func TestNewDeliverService(t *testing.T) {
assert.Equal(t, 0, connNumber)
assertBlockDissemination(0, gossipServiceAdapter.GossipBlockDisseminations, t)
assert.Equal(t, atomic.LoadInt32(&blocksDeliverer.RecvCnt), atomic.LoadInt32(&gossipServiceAdapter.AddPayloadsCnt))
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}), "Delivery service is stopping")
assert.Error(t, service.StartDeliverForChannel("TEST_CHAINID", &mocks.MockLedgerInfo{0}, func() {}), "Delivery service is stopping")
assert.Error(t, service.StopDeliverForChannel("TEST_CHAINID"), "Delivery service is stopping")
}
@@ -157,7 +157,7 @@ func TestDeliverServiceRestart(t *testing.T) {
li := &mocks.MockLedgerInfo{Height: uint64(100)}
os.SetNextExpectedSeek(uint64(100))
err = service.StartDeliverForChannel("TEST_CHAINID", li)
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")
// Check that delivery client requests blocks in order
go os.SendBlock(uint64(100))
@@ -203,7 +203,7 @@ func TestDeliverServiceFailover(t *testing.T) {
os1.SetNextExpectedSeek(uint64(100))
os2.SetNextExpectedSeek(uint64(100))
err = service.StartDeliverForChannel("TEST_CHAINID", li)
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")
// We need to discover which instance the client connected to
go os1.SendBlock(uint64(100))
@@ -278,7 +278,7 @@ func TestDeliverServiceServiceUnavailable(t *testing.T) {
os1.SetNextExpectedSeek(li.Height)
os2.SetNextExpectedSeek(li.Height)
err = service.StartDeliverForChannel("TEST_CHAINID", li)
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")
waitForConnectionToSomeOSN := func() (*mocks.Orderer, *mocks.Orderer) {
@@ -367,7 +367,7 @@ func TestDeliverServiceShutdown(t *testing.T) {
li := &mocks.MockLedgerInfo{Height: uint64(100)}
os.SetNextExpectedSeek(uint64(100))
err = service.StartDeliverForChannel("TEST_CHAINID", li)
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")
// Check that delivery service requests blocks in order
@@ -48,7 +48,7 @@ type mockDeliveryClient struct {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
return nil
}
@@ -55,7 +55,7 @@ type mockDeliveryClient struct {
// StartDeliverForChannel dynamically starts delivery of new blocks from ordering service
// to channel peers.
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
func (ds *mockDeliveryClient) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, f func()) error {
return nil
}
@@ -93,6 +93,10 @@ type LeaderElectionService interface {
// Stop stops the LeaderElectionService
Stop()
// Yield relinquishes the leadership until a new leader is elected,
// or a timeout expires
Yield()
}
type peerID []byte
@@ -150,10 +154,12 @@ type leaderElectionSvcImpl struct {
isLeader int32
toDie int32
leaderExists int32
yield int32
sleeping bool
adapter LeaderElectionAdapter
logger *logging.Logger
callback leadershipCallback
yieldTimer *time.Timer
}
func (le *leaderElectionSvcImpl) start() {
@@ -239,6 +245,11 @@ func (le *leaderElectionSvcImpl) run() {
if !le.isLeaderExists() {
le.leaderElection()
}
// If we are yielding and some leader has been elected,
// stop yielding
if le.isLeaderExists() && le.isYielding() {
le.stopYielding()
}
if le.shouldStop() {
return
}
@@ -253,7 +264,14 @@ func (le *leaderElectionSvcImpl) leaderElection() {
func (le *leaderElectionSvcImpl) leaderElection() {
le.logger.Debug(le.id, ": Entering")
defer le.logger.Debug(le.id, ": Exiting")
// If we're yielding to other peers, do not participate
// in leader election
if le.isYielding() {
return
}
// Propose ourselves as a leader
le.propose()
// Collect other proposals
le.waitForInterrupt(getLeaderElectionDuration())
// If someone declared itself as a leader, give up
// on trying to become a leader too
@@ -261,6 +279,11 @@ func (le *leaderElectionSvcImpl) leaderElection() {
le.logger.Debug(le.id, ": Some peer is already a leader")
return
}
if le.isYielding() {
le.logger.Debug(le.id, ": Aborting leader election because yielding")
return
}
// Leader doesn't exist, let's see if there is a better candidate than us
// for being a leader
for _, o := range le.proposals.ToArray() {
@@ -364,6 +387,38 @@ func (le *leaderElectionSvcImpl) shouldStop() bool {
return atomic.LoadInt32(&le.toDie) == int32(1)
}
func (le *leaderElectionSvcImpl) isYielding() bool {
return atomic.LoadInt32(&le.yield) == int32(1)
}
func (le *leaderElectionSvcImpl) stopYielding() {
le.logger.Debug("Stopped yielding")
le.Lock()
defer le.Unlock()
atomic.StoreInt32(&le.yield, int32(0))
le.yieldTimer.Stop()
}
// Yield relinquishes the leadership until a new leader is elected,
// or a timeout expires
func (le *leaderElectionSvcImpl) Yield() {
le.Lock()
defer le.Unlock()
if !le.IsLeader() || le.isYielding() {
return
}
// Turn on the yield flag
atomic.StoreInt32(&le.yield, int32(1))
// Stop being a leader
le.stopBeingLeader()
// Clear the leader exists flag since it could be that we are the leader
atomic.StoreInt32(&le.leaderExists, int32(0))
// Clear the yield flag in any case afterwards
le.yieldTimer = time.AfterFunc(getLeaderAliveThreshold()*6, func() {
atomic.StoreInt32(&le.yield, int32(0))
})
}
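Yield is bounded in time: the time.AfterFunc above re-enables election participation after six leader-alive intervals if no other peer steps up, and run() cancels the timer earlier via stopYielding as soon as it observes a new leader. A minimal sketch of that bounded-yield pattern in isolation (assuming sync/atomic and time are imported; the threshold is whatever getLeaderAliveThreshold() returns):

// Sketch only, not part of the diff.
// boundedYield marks the peer as yielding and returns a timer that clears the
// flag again after six leader-alive intervals, so the group can never stay
// leaderless forever. The caller cancels the timer (and clears the flag) as
// soon as another peer is observed to be the leader.
func boundedYield(yield *int32, leaderAliveThreshold time.Duration) *time.Timer {
	atomic.StoreInt32(yield, 1) // stop competing in elections
	return time.AfterFunc(6*leaderAliveThreshold, func() {
		atomic.StoreInt32(yield, 0) // safety valve: give up yielding
	})
}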
// Stop stops the LeaderElectionService
func (le *leaderElectionSvcImpl) Stop() {
le.logger.Debug(le.id, ": Entering")
@@ -308,6 +308,70 @@ func TestLeadershipTakeover(t *testing.T) {
assert.Equal(t, "p2", leaders[0])
}
func TestYield(t *testing.T) {
t.Parallel()
// Scenario: Peers spawn and a leader is elected.
// After a while, the leader yields.
// (Call yield twice to ensure only one callback is called)
// Expected outcome:
// (1) A new leader is elected
// (2) The old leader doesn't take back its leadership
peers := createPeers(0, 0, 1, 2, 3, 4, 5)
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
peers[0].Yield()
// Ensure the callback was called with 'false'
assert.True(t, peers[0].isCallbackInvoked())
assert.False(t, peers[0].isLeaderFromCallback())
// Clear the callback invoked flag
peers[0].lock.Lock()
peers[0].callbackInvoked = false
peers[0].lock.Unlock()
// Yield again and ensure it isn't called again
peers[0].Yield()
assert.False(t, peers[0].isCallbackInvoked())
ensureP0isNotAleader := func() bool {
leaders := waitForLeaderElection(t, peers)
return len(leaders) == 1 && leaders[0] != "p0"
}
// A new leader is elected, and it is not p0
waitForBoolFunc(t, ensureP0isNotAleader, true)
time.Sleep(getLeaderAliveThreshold() * 2)
// After a while, p0 doesn't restore its leadership status
waitForBoolFunc(t, ensureP0isNotAleader, true)
}
func TestYieldSinglePeer(t *testing.T) {
t.Parallel()
// Scenario: spawn a single peer and have it yield.
// Ensure it recovers its leadership after a while.
peers := createPeers(0, 0)
waitForLeaderElection(t, peers)
peers[0].Yield()
assert.False(t, peers[0].IsLeader())
waitForLeaderElection(t, peers)
}
func TestYieldAllPeers(t *testing.T) {
t.Parallel()
// Scenario: spawn 2 peers and have them all yield after regaining leadership.
// Ensure the first peer is the leader in the end after both peers yield
peers := createPeers(0, 0, 1)
leaders := waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
peers[0].Yield()
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p1", leaders[0])
peers[1].Yield()
leaders = waitForLeaderElection(t, peers)
assert.Len(t, leaders, 1, "Only 1 leader should have been elected")
assert.Equal(t, "p0", leaders[0])
}
func TestPartition(t *testing.T) {
t.Parallel()
// Scenario: peers spawn together, and then after a while a network partition occurs
@@ -196,7 +196,7 @@ func (g *gossipServiceImpl) InitializeChannel(chainID string, committer committe
g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, committer))
} else if isStaticOrgLeader {
logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
g.deliveryService.StartDeliverForChannel(chainID, committer)
g.deliveryService.StartDeliverForChannel(chainID, committer, func() {})
} else {
logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
}
@@ -282,8 +282,12 @@ func (g *gossipServiceImpl) amIinChannel(myOrg string, config Config) bool {
func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
return func(isLeader bool) {
if isLeader {
yield := func() {
le := g.leaderElection[chainID]
le.Yield()
}
logger.Info("Elected as a leader, starting delivery service for channel", chainID)
if err := g.deliveryService.StartDeliverForChannel(chainID, committer); err != nil {
if err := g.deliveryService.StartDeliverForChannel(chainID, committer, yield); err != nil {
logger.Error("Delivery service is not able to start blocks delivery for chain, due to", err)
}
} else {
@@ -276,7 +276,7 @@ type mockDeliverService struct {
running map[string]bool
}
func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo) error {
func (ds *mockDeliverService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
ds.running[chainID] = true
return nil
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package service
import (
"fmt"
"sync"
"testing"
"time"
"github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/election"
"github.com/hyperledger/fabric/gossip/identity"
"github.com/hyperledger/fabric/gossip/state"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)
type embeddingDeliveryService struct {
deliverclient.DeliverService
startSignal sync.WaitGroup
stopSignal sync.WaitGroup
}
func newEmbeddingDeliveryService(ds deliverclient.DeliverService) *embeddingDeliveryService {
eds := &embeddingDeliveryService{
DeliverService: ds,
}
eds.startSignal.Add(1)
eds.stopSignal.Add(1)
return eds
}
func (eds *embeddingDeliveryService) waitForDeliveryServiceActivation() {
eds.startSignal.Wait()
}
func (eds *embeddingDeliveryService) waitForDeliveryServiceTermination() {
eds.stopSignal.Wait()
}
func (eds *embeddingDeliveryService) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
eds.startSignal.Done()
return eds.DeliverService.StartDeliverForChannel(chainID, ledgerInfo, finalizer)
}
func (eds *embeddingDeliveryService) StopDeliverForChannel(chainID string) error {
eds.stopSignal.Done()
return eds.DeliverService.StopDeliverForChannel(chainID)
}
func (eds *embeddingDeliveryService) Stop() {
eds.DeliverService.Stop()
}
type embeddingDeliveryServiceFactory struct {
DeliveryServiceFactory
}
func (edsf *embeddingDeliveryServiceFactory) Service(g GossipService, endpoints []string, mcs api.MessageCryptoService) (deliverclient.DeliverService, error) {
ds, _ := edsf.DeliveryServiceFactory.Service(g, endpoints, mcs)
return newEmbeddingDeliveryService(ds), nil
}
func TestLeaderYield(t *testing.T) {
// Scenario: Spawn 2 peers and wait for the first one to be the leader
// There isn't any orderer present so the leader peer won't be able to
// connect to the orderer, and should relinquish its leadership after a while.
// Make sure the other peer declares itself as the leader soon after.
deliverclient.SetReconnectTotalTimeThreshold(time.Second * 5)
viper.Set("peer.gossip.useLeaderElection", true)
viper.Set("peer.gossip.orgLeader", false)
n := 2
portPrefix := 30000
gossips := startPeers(t, n, portPrefix)
defer stopPeers(gossips)
channelName := "channelA"
peerIndexes := []int{0, 1}
// Add peers to the channel
addPeersToChannel(t, n, portPrefix, channelName, gossips, peerIndexes)
// Prime the membership view of the peers
waitForFullMembership(t, gossips, n, time.Second*30, time.Second*2)
mcs := &naiveCryptoService{}
// Helper function that creates a gossipService instance
newGossipService := func(i int) *gossipServiceImpl {
peerIdentity := api.PeerIdentityType(fmt.Sprintf("localhost:%d", portPrefix+i))
gs := &gossipServiceImpl{
mcs: mcs,
gossipSvc: gossips[i],
chains: make(map[string]state.GossipStateProvider),
leaderElection: make(map[string]election.LeaderElectionService),
deliveryFactory: &embeddingDeliveryServiceFactory{&deliveryFactoryImpl{}},
idMapper: identity.NewIdentityMapper(mcs, peerIdentity),
peerIdentity: peerIdentity,
secAdv: &secAdvMock{},
}
gossipServiceInstance = gs
gs.InitializeChannel(channelName, &mockLedgerInfo{1}, []string{"localhost:7050"})
return gs
}
p0 := newGossipService(0)
p1 := newGossipService(1)
// Returns index of the leader or -1 if no leader elected
getLeader := func() int {
if p0.leaderElection[channelName].IsLeader() {
// Ensure p1 isn't a leader at the same time
assert.False(t, p1.leaderElection[channelName].IsLeader())
return 0
}
if p1.leaderElection[channelName].IsLeader() {
return 1
}
return -1
}
ds0 := p0.deliveryService.(*embeddingDeliveryService)
ds1 := p1.deliveryService.(*embeddingDeliveryService)
// Wait for p0 to connect to the ordering service
ds0.waitForDeliveryServiceActivation()
t.Log("p0 started its delivery service")
// Ensure it's a leader
assert.Equal(t, 0, getLeader())
// Wait for p0 to lose its leadership
ds0.waitForDeliveryServiceTermination()
t.Log("p0 stopped its delivery service")
// Ensure there is no leader
assert.Equal(t, -1, getLeader())
// Wait for p1 to take over
ds1.waitForDeliveryServiceActivation()
t.Log("p1 started its delivery service")
// Ensure it's a leader now
assert.Equal(t, 1, getLeader())
p0.chains[channelName].Stop()
p1.chains[channelName].Stop()
p0.deliveryService.Stop()
p1.deliveryService.Stop()
}