Unverified Commit 7b44924b authored by Artem Barger's avatar Artem Barger
Browse files

FAB-14616 externalize gossip state transfer config



Externalize gossip state transfer parameters so they can
be controlled via peer configuration.

Change-Id: Ie5b26941d5e97c605c704298e97eed5ed178f0ca
Signed-off-by: default avatarArtem Barger <bartem@il.ibm.com>
parent c458d3ae
......@@ -124,6 +124,30 @@ peer:
# Time between the peer sending a propose message and declaring itself as a leader (sending a declaration message) (unit: second)
leaderElectionDuration: 5s
# Gossip state transfer related configuration
state:
# indicates whether state transfer is enabled or not
# default value is true, i.e. state transfer is active
# and takes care of syncing up missing blocks, allowing
# a lagging peer to catch up with the rest of the network
enabled: true
# checkInterval interval to check whether peer is lagging behind enough to
# request blocks via state transfer from another peer.
checkInterval: 10s
# responseTimeout amount of time to wait for state transfer response from
# other peers
responseTimeout: 3s
# batchSize the number of blocks to request via state transfer from another peer
batchSize: 10
# blockBufferSize reflects the maximum distance between the lowest and
# highest block sequence numbers held in the state buffer, to avoid holes.
# In order to ensure the absence of holes, the actual buffer size
# is twice this distance
blockBufferSize: 100
# maxRetries maximum number of retries for a
# single state transfer request
maxRetries: 3
# Sync related configuration
sync:
blocks:
......
......@@ -53,6 +53,17 @@ const (
enqueueRetryInterval = time.Millisecond * 100
)
// Configuration keeps state transfer configuration parameters.
// Values are populated by readConfiguration, which starts from the package
// defaults and overrides them with peer.gossip.state.* settings when present.
type Configuration struct {
	// AntiEntropyInterval is the wait between anti-entropy rounds
	// (peer.gossip.state.checkInterval).
	AntiEntropyInterval time.Duration
	// AntiEntropyStateResponseTimeout is how long to wait for a state
	// transfer response (peer.gossip.state.responseTimeout).
	AntiEntropyStateResponseTimeout time.Duration
	// AntiEntropyBatchSize bounds the block range of a single state request
	// (peer.gossip.state.batchSize).
	AntiEntropyBatchSize uint64
	// MaxBlockDistance is the block distance threshold applied when
	// enqueueing payloads (peer.gossip.state.blockBufferSize).
	MaxBlockDistance int
	// AntiEntropyMaxRetries is the retry limit for a single state request
	// (peer.gossip.state.maxRetries).
	AntiEntropyMaxRetries int
	// ChannelBufferSize is the capacity of the state request and state
	// response channels (peer.gossip.state.channelSize).
	ChannelBufferSize int
	// EnableStateTransfer controls whether the anti-entropy routine is
	// started (peer.gossip.state.enabled).
	EnableStateTransfer bool
}
// GossipAdapter defines gossip/communication required interface for state provider
type GossipAdapter interface {
// Send sends a message to remote peers
......@@ -147,10 +158,76 @@ type GossipStateProviderImpl struct {
once sync.Once
stateTransferActive int32
requestValidator *stateRequestValidator
blockingMode bool
config *Configuration
}
// logger is the package-level logger, obtained from util.GetLogger
// for the util.StateLogger module.
var logger = util.GetLogger(util.StateLogger, "")
// stateRequestValidator facilitates validation of the state request messages.
// It carries no fields; all inputs are passed to validate directly.
type stateRequestValidator struct {
}
// validate checks a RemoteStateRequest message for validity.
//
// The request is rejected when its start sequence number is greater than its
// end sequence number, or when the distance between the two exceeds batchSize.
// It returns nil when the request is acceptable and a descriptive error otherwise.
func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batchSize uint64) error {
	if request.StartSeqNum > request.EndSeqNum {
		return errors.Errorf("Invalid sequence interval [%d...%d).", request.StartSeqNum, request.EndSeqNum)
	}

	// StartSeqNum <= EndSeqNum is guaranteed at this point, so the subtraction
	// cannot underflow. Comparing the difference (rather than computing
	// batchSize+StartSeqNum) avoids unsigned wrap-around for sequence numbers
	// near the top of the uint64 range, which would reject valid requests.
	if request.EndSeqNum-request.StartSeqNum > batchSize {
		return errors.Errorf("Requesting blocks range [%d-%d) greater than configured allowed"+
			" (%d) batching size for anti-entropy.", request.StartSeqNum, request.EndSeqNum, batchSize)
	}

	return nil
}
// readConfiguration builds the state transfer configuration.
//
// It starts from the package defaults and overrides each parameter with the
// corresponding peer.gossip.state.* setting when that key is set in viper.
// Negative values for integer settings are rejected with a warning and the
// default is kept: a negative batchSize would wrap around when converted to
// uint64, and a negative channelSize would make channel creation panic.
func readConfiguration() *Configuration {
	config := &Configuration{
		AntiEntropyInterval:             defAntiEntropyInterval,
		AntiEntropyStateResponseTimeout: defAntiEntropyStateResponseTimeout,
		AntiEntropyBatchSize:            defAntiEntropyBatchSize,
		MaxBlockDistance:                defMaxBlockDistance,
		AntiEntropyMaxRetries:           defAntiEntropyMaxRetries,
		ChannelBufferSize:               defChannelBufferSize,
		EnableStateTransfer:             true,
	}

	// nonNegativeInt reads an integer setting; ok is false when the key is
	// not set or holds a negative value, in which case the default stays.
	nonNegativeInt := func(key string) (value int, ok bool) {
		if !viper.IsSet(key) {
			return 0, false
		}
		value = viper.GetInt(key)
		if value < 0 {
			logger.Warningf("Ignoring negative value %d configured for %s, keeping default", value, key)
			return 0, false
		}
		return value, true
	}

	if viper.IsSet("peer.gossip.state.checkInterval") {
		config.AntiEntropyInterval = viper.GetDuration("peer.gossip.state.checkInterval")
	}

	if viper.IsSet("peer.gossip.state.responseTimeout") {
		config.AntiEntropyStateResponseTimeout = viper.GetDuration("peer.gossip.state.responseTimeout")
	}

	if value, ok := nonNegativeInt("peer.gossip.state.batchSize"); ok {
		config.AntiEntropyBatchSize = uint64(value)
	}

	if value, ok := nonNegativeInt("peer.gossip.state.blockBufferSize"); ok {
		config.MaxBlockDistance = value
	}

	if value, ok := nonNegativeInt("peer.gossip.state.maxRetries"); ok {
		config.AntiEntropyMaxRetries = value
	}

	if value, ok := nonNegativeInt("peer.gossip.state.channelSize"); ok {
		config.ChannelBufferSize = value
	}

	if viper.IsSet("peer.gossip.state.enabled") {
		config.EnableStateTransfer = viper.GetBool("peer.gossip.state.enabled")
	}

	return config
}
// NewGossipStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider {
......@@ -197,6 +274,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
return nil
}
// Reading state configuration
config := readConfiguration()
s := &GossipStateProviderImpl{
// MessageCryptoService
mediator: services,
......@@ -215,15 +295,19 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
ledger: ledger,
stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateResponseCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stopCh: make(chan struct{}, 1),
stateTransferActive: 0,
once: sync.Once{},
requestValidator: &stateRequestValidator{},
config: config,
}
logger.Infof("Updating metadata information, "+
......@@ -237,8 +321,10 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
if s.config.EnableStateTransfer {
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
}
// Taking care of state request messages
go s.processStateRequests()
......@@ -345,7 +431,7 @@ func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
incoming := msg.GetGossipMessage()
if incoming.GetStateRequest() != nil {
if len(s.stateRequestCh) < defChannelBufferSize {
if len(s.stateRequestCh) < s.config.ChannelBufferSize {
// Forward state request to the channel, if there are too
// many message of state request ignore to avoid flooding.
s.stateRequestCh <- msg
......@@ -382,15 +468,8 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
}
request := msg.GetGossipMessage().GetStateRequest()
batchSize := request.EndSeqNum - request.StartSeqNum
if batchSize > defAntiEntropyBatchSize {
logger.Errorf("Requesting blocks batchSize size (%d) greater than configured allowed"+
" (%d) batching for anti-entropy. Ignoring request...", batchSize, defAntiEntropyBatchSize)
return
}
if request.StartSeqNum > request.EndSeqNum {
logger.Errorf("Invalid sequence interval [%d...%d], ignoring request...", request.StartSeqNum, request.EndSeqNum)
if err := s.requestValidator.validate(request, s.config.AntiEntropyBatchSize); err != nil {
logger.Errorf("State request validation failed, %s. Ignoring request...", err)
return
}
......@@ -580,7 +659,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
case <-s.stopCh:
s.stopCh <- struct{}{}
return
case <-time.After(defAntiEntropyInterval):
case <-time.After(s.config.AntiEntropyInterval):
ourHeight, err := s.ledger.LedgerHeight()
if err != nil {
// Unable to read from ledger continue to the next round
......@@ -625,7 +704,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
defer atomic.StoreInt32(&s.stateTransferActive, 0)
for prev := start; prev <= end; {
next := min(end, prev+defAntiEntropyBatchSize)
next := min(end, prev+s.config.AntiEntropyBatchSize)
gossipMsg := s.stateRequestMessage(prev, next)
......@@ -633,7 +712,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
tryCounts := 0
for !responseReceived {
if tryCounts > defAntiEntropyMaxRetries {
if tryCounts > s.config.AntiEntropyMaxRetries {
logger.Warningf("Wasn't able to get blocks in range [%d...%d), after %d retries",
prev, next, tryCounts)
return
......@@ -667,7 +746,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
}
prev = index + 1
responseReceived = true
case <-time.After(defAntiEntropyStateResponseTimeout):
case <-time.After(s.config.AntiEntropyStateResponseTimeout):
case <-s.stopCh:
s.stopCh <- struct{}{}
return
......@@ -753,15 +832,16 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
return errors.Wrap(err, "Failed obtaining ledger height")
}
if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance {
if !blockingMode && payload.SeqNum-height >= uint64(s.config.MaxBlockDistance) {
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}
for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 {
for blockingMode && s.payloads.Size() > s.config.MaxBlockDistance*2 {
time.Sleep(enqueueRetryInterval)
}
s.payloads.Push(payload)
logger.Debugf("Blocks payloads buffer size for channel [%s] is %d blocks", s.chainID, s.payloads.Size())
return nil
}
......
......@@ -1642,6 +1642,29 @@ func TestTransferOfPvtDataBetweenPeers(t *testing.T) {
}
}
// TestStateRequestValidator verifies that stateRequestValidator rejects an
// inverted interval and a range expected to exceed the default batch size,
// and accepts a range expected to fit within it.
func TestStateRequestValidator(t *testing.T) {
	validator := &stateRequestValidator{}

	// Start sequence number greater than end sequence number.
	err := validator.validate(&proto.RemoteStateRequest{
		StartSeqNum: 10,
		EndSeqNum:   5,
	}, defAntiEntropyBatchSize)
	// Assert the error is present before inspecting its message; calling
	// err.Error() on a nil error would panic instead of failing the test.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "Invalid sequence interval [10...5).")
	}

	// Requested range expected to be rejected as larger than the batch size.
	err = validator.validate(&proto.RemoteStateRequest{
		StartSeqNum: 10,
		EndSeqNum:   30,
	}, defAntiEntropyBatchSize)
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "Requesting blocks range [10-30) greater than configured")
	}

	// Requested range expected to be accepted.
	err = validator.validate(&proto.RemoteStateRequest{
		StartSeqNum: 10,
		EndSeqNum:   20,
	}, defAntiEntropyBatchSize)
	assert.NoError(t, err)
}
func waitUntilTrueOrTimeout(t *testing.T, predicate func() bool, timeout time.Duration) {
ch := make(chan struct{})
go func() {
......
......@@ -197,6 +197,30 @@ peer:
# reconciliationEnabled is a flag that indicates whether private data reconciliation is enable or not.
reconciliationEnabled: true
# Gossip state transfer related configuration
state:
# indicates whether state transfer is enabled or not
# default value is true, i.e. state transfer is active
# and takes care of syncing up missing blocks, allowing
# a lagging peer to catch up with the rest of the network
enabled: true
# checkInterval interval to check whether peer is lagging behind enough to
# request blocks via state transfer from another peer.
checkInterval: 10s
# responseTimeout amount of time to wait for state transfer response from
# other peers
responseTimeout: 3s
# batchSize the number of blocks to request via state transfer from another peer
batchSize: 10
# blockBufferSize reflects the maximum distance between the lowest and
# highest block sequence numbers held in the state buffer, to avoid holes.
# In order to ensure the absence of holes, the actual buffer size
# is twice this distance
blockBufferSize: 100
# maxRetries maximum number of retries for a
# single state transfer request
maxRetries: 3
# TLS Settings
# Note that peer-chaincode connections through chaincodeListenAddress is
# not mutual TLS auth. See comments on chaincodeListenAddress for more info
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment