Commit 71838920 authored by Lucas Kuhring's avatar Lucas Kuhring
Browse files

Fix validation pipeline

parent 82acef52
......@@ -55,6 +55,17 @@ const (
enqueueRetryInterval = time.Millisecond * 100
)
// Configuration keeps state transfer configuration parameters
type Configuration struct {
AntiEntropyInterval time.Duration
AntiEntropyStateResponseTimeout time.Duration
AntiEntropyBatchSize uint64
MaxBlockDistance int
AntiEntropyMaxRetries int
ChannelBufferSize int
EnableStateTransfer bool
}
// GossipAdapter defines gossip/communication required interface for state provider
type GossipAdapter interface {
// Send sends a message to remote peers
......@@ -151,10 +162,76 @@ type GossipStateProviderImpl struct {
once sync.Once
stateTransferActive int32
requestValidator *stateRequestValidator
blockingMode bool
config *Configuration
}
var logger = util.GetLogger(util.StateLogger, "")
// stateRequestValidator facilitates validation of the state request messages
type stateRequestValidator struct {
}
// validate checks for RemoteStateRequest message validity
func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batchSize uint64) error {
if request.StartSeqNum > request.EndSeqNum {
return errors.Errorf("Invalid sequence interval [%d...%d).", request.StartSeqNum, request.EndSeqNum)
}
if request.EndSeqNum > batchSize+request.StartSeqNum {
return errors.Errorf("Requesting blocks range [%d-%d) greater than configured allowed"+
" (%d) batching size for anti-entropy.", request.StartSeqNum, request.EndSeqNum, batchSize)
}
return nil
}
// readConfiguration reading state configuration
func readConfiguration() *Configuration {
config := &Configuration{
AntiEntropyInterval: defAntiEntropyInterval,
AntiEntropyStateResponseTimeout: defAntiEntropyStateResponseTimeout,
AntiEntropyBatchSize: defAntiEntropyBatchSize,
MaxBlockDistance: defMaxBlockDistance,
AntiEntropyMaxRetries: defAntiEntropyMaxRetries,
ChannelBufferSize: defChannelBufferSize,
EnableStateTransfer: true,
}
if viper.IsSet("peer.gossip.state.checkInterval") {
config.AntiEntropyInterval = viper.GetDuration("peer.gossip.state.checkInterval")
}
if viper.IsSet("peer.gossip.state.responseTimeout") {
config.AntiEntropyStateResponseTimeout = viper.GetDuration("peer.gossip.state.responseTimeout")
}
if viper.IsSet("peer.gossip.state.batchSize") {
config.AntiEntropyBatchSize = uint64(viper.GetInt("peer.gossip.state.batchSize"))
}
if viper.IsSet("peer.gossip.state.blockBufferSize") {
config.MaxBlockDistance = viper.GetInt("peer.gossip.state.blockBufferSize")
}
if viper.IsSet("peer.gossip.state.maxRetries") {
config.AntiEntropyMaxRetries = viper.GetInt("peer.gossip.state.maxRetries")
}
if viper.IsSet("peer.gossip.state.channelSize") {
config.ChannelBufferSize = viper.GetInt("peer.gossip.state.channelSize")
}
if viper.IsSet("peer.gossip.state.enabled") {
config.EnableStateTransfer = viper.GetBool("peer.gossip.state.enabled")
}
return config
}
// NewGossipStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider {
......@@ -201,6 +278,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
return nil
}
// Reading state configuration
config := readConfiguration()
s := &GossipStateProviderImpl{
// MessageCryptoService
mediator: services,
......@@ -219,15 +299,19 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
ledger: ledger,
stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateResponseCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stopCh: make(chan struct{}, 1),
stateTransferActive: 0,
once: sync.Once{},
requestValidator: &stateRequestValidator{},
config: config,
}
logger.Infof("Updating metadata information, "+
......@@ -241,8 +325,10 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
if s.config.EnableStateTransfer {
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
}
// Taking care of state request messages
go s.processStateRequests()
......@@ -349,7 +435,7 @@ func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
incoming := msg.GetGossipMessage()
if incoming.GetStateRequest() != nil {
if len(s.stateRequestCh) < defChannelBufferSize {
if len(s.stateRequestCh) < s.config.ChannelBufferSize {
// Forward state request to the channel, if there are too
// many message of state request ignore to avoid flooding.
s.stateRequestCh <- msg
......@@ -386,15 +472,8 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
}
request := msg.GetGossipMessage().GetStateRequest()
batchSize := request.EndSeqNum - request.StartSeqNum
if batchSize > defAntiEntropyBatchSize {
logger.Errorf("Requesting blocks batchSize size (%d) greater than configured allowed"+
" (%d) batching for anti-entropy. Ignoring request...", batchSize, defAntiEntropyBatchSize)
return
}
if request.StartSeqNum > request.EndSeqNum {
logger.Errorf("Invalid sequence interval [%d...%d], ignoring request...", request.StartSeqNum, request.EndSeqNum)
if err := s.requestValidator.validate(request, s.config.AntiEntropyBatchSize); err != nil {
logger.Errorf("State request validation failed, %s. Ignoring request...", err)
return
}
......@@ -663,7 +742,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
case <-s.stopCh:
s.stopCh <- struct{}{}
return
case <-time.After(defAntiEntropyInterval):
case <-time.After(s.config.AntiEntropyInterval):
ourHeight, err := s.ledger.LedgerHeight()
if err != nil {
// Unable to read from ledger continue to the next round
......@@ -708,7 +787,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
defer atomic.StoreInt32(&s.stateTransferActive, 0)
for prev := start; prev <= end; {
next := min(end, prev+defAntiEntropyBatchSize)
next := min(end, prev+s.config.AntiEntropyBatchSize)
gossipMsg := s.stateRequestMessage(prev, next)
......@@ -716,7 +795,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
tryCounts := 0
for !responseReceived {
if tryCounts > defAntiEntropyMaxRetries {
if tryCounts > s.config.AntiEntropyMaxRetries {
logger.Warningf("Wasn't able to get blocks in range [%d...%d), after %d retries",
prev, next, tryCounts)
return
......@@ -750,7 +829,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
}
prev = index + 1
responseReceived = true
case <-time.After(defAntiEntropyStateResponseTimeout):
case <-time.After(s.config.AntiEntropyStateResponseTimeout):
case <-s.stopCh:
s.stopCh <- struct{}{}
return
......@@ -836,15 +915,16 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
return errors.Wrap(err, "Failed obtaining ledger height")
}
if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance {
if !blockingMode && payload.SeqNum-height >= uint64(s.config.MaxBlockDistance) {
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}
for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 {
for blockingMode && s.payloads.Size() > s.config.MaxBlockDistance*2 {
time.Sleep(enqueueRetryInterval)
}
s.payloads.Push(payload)
logger.Debugf("Blocks payloads buffer size for channel [%s] is %d blocks", s.chainID, s.payloads.Size())
return nil
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment