Commit 9557376e authored by Lucas Kuhring

Add minor latency improvements in validation

parent bf880107
......@@ -127,6 +127,7 @@ type coordinator struct {
    selfSignedData common.SignedData
    Support
    transientBlockRetention uint64
+   retryThresh time.Duration
}
// NewCoordinator creates a new instance of coordinator
......@@ -136,7 +137,10 @@ func NewCoordinator(support Support, selfSignedData common.SignedData) Coordinat
        logger.Warning("Configuration key", transientBlockRetentionConfigKey, "isn't set, defaulting to", transientBlockRetentionDefault)
        transientBlockRetention = transientBlockRetentionDefault
    }
-   return &coordinator{Support: support, selfSignedData: selfSignedData, transientBlockRetention: transientBlockRetention}
+   retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold")
+   return &coordinator{Support: support, selfSignedData: selfSignedData, transientBlockRetention: transientBlockRetention, retryThresh: retryThresh}
}
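
The idea behind this change is to read peer.gossip.pvtData.pullRetryThreshold once, when the coordinator is constructed, rather than querying viper on every StoreBlock call. A minimal standalone sketch of that pattern follows; coordinatorSketch, newCoordinatorSketch, and storeBlock are illustrative stand-ins, not Fabric's actual types.

package main

import (
    "fmt"
    "time"

    "github.com/spf13/viper"
)

// coordinatorSketch is a hypothetical stand-in for the gossip privdata
// coordinator: it caches the pull retry threshold once instead of asking
// viper for it on every block.
type coordinatorSketch struct {
    retryThresh time.Duration
}

func newCoordinatorSketch() *coordinatorSketch {
    // Read the configuration exactly once, at construction time.
    return &coordinatorSketch{
        retryThresh: viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold"),
    }
}

func (c *coordinatorSketch) storeBlock(blockNum uint64) {
    // The per-block path only touches the cached field.
    deadline := time.Now().Add(c.retryThresh)
    fmt.Printf("block %d: may pull private data until %s\n", blockNum, deadline.Format(time.RFC3339))
}

func main() {
    viper.SetDefault("peer.gossip.pvtData.pullRetryThreshold", 60*time.Second)
    c := newCoordinatorSketch()
    c.storeBlock(1)
}

Hoisting the lookup out of the per-block path avoids repeating the same configuration fetch for every block, which appears to be the small latency improvement the commit message refers to.
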
// StorePvtData used to persist private data into transient store
......@@ -145,6 +149,7 @@ func (c *coordinator) StorePvtData(txID string, privData *transientstore2.TxPvtR
}
func (c *coordinator) VerifyBlock(block *cached.Block, privateDataSets util.PvtDataCollections) (*cached.Block, error) {
    if block.Data == nil {
        return block, errors.New("Block data is empty")
    }
......@@ -192,17 +197,16 @@ func (c *coordinator) StoreBlock(block *cached.Block, privateDataSets util.PvtDa
        return err
    }
-   retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold")
    var bFetchFromPeers bool // defaults to false
    if len(privateInfo.missingKeys) == 0 {
        logger.Debugf("[%s] No missing collection private write sets to fetch from remote peers", c.ChainID)
    } else {
        bFetchFromPeers = true
        logger.Debugf("[%s] Could not find all collection private write sets in local peer transient store for block [%d].", c.ChainID, block.Header.Number)
-       logger.Debugf("[%s] Fetching %d collection private write sets from remote peers for a maximum duration of %s", c.ChainID, len(privateInfo.missingKeys), retryThresh)
+       logger.Debugf("[%s] Fetching %d collection private write sets from remote peers for a maximum duration of %s", c.ChainID, len(privateInfo.missingKeys), c.retryThresh)
    }
    startPull := time.Now()
-   limit := startPull.Add(retryThresh)
+   limit := startPull.Add(c.retryThresh)
    for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
        c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
        // If succeeded to fetch everything, no need to sleep before
......
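
For context, the loop shown above keeps pulling missing collection private write sets from remote peers until either nothing is missing or the cached threshold elapses. A self-contained sketch of that deadline-bounded retry pattern, with a placeholder fetch callback and key set rather than the real gossip fetcher (the 100 ms back-off is illustrative):

package main

import (
    "fmt"
    "time"
)

// pullUntilDeadline keeps invoking fetch until nothing is left to fetch or
// the retry threshold has elapsed. fetch removes from the map whatever it
// managed to retrieve in one round.
func pullUntilDeadline(missing map[string]struct{}, retryThresh time.Duration,
    fetch func(missing map[string]struct{})) {

    limit := time.Now().Add(retryThresh)
    for len(missing) > 0 && time.Now().Before(limit) {
        fetch(missing)
        if len(missing) > 0 {
            // Back off briefly before the next round, mirroring the sleep
            // the real loop takes between unsuccessful pull attempts.
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func main() {
    missing := map[string]struct{}{"ns1/coll1/tx42": {}}
    attempts := 0
    pullUntilDeadline(missing, 2*time.Second, func(m map[string]struct{}) {
        attempts++
        if attempts == 3 { // pretend the third pull succeeds
            delete(m, "ns1/coll1/tx42")
        }
    })
    fmt.Printf("attempts=%d, still missing=%d\n", attempts, len(missing))
}
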
......@@ -620,16 +620,26 @@ type PipelineData struct {
    seq uint64
}
-func (s *GossipStateProviderImpl) commitPayloads(chans []chan *PipelineData, maxOutstanding int) {
+func (s *GossipStateProviderImpl) updateLedgerHeight(cIn chan *PipelineData) {
+   for {
+       job := <-cIn
+       s.mediator.UpdateLedgerHeight(job.block.Header.Number+1, common2.ChainID(s.chainID))
+   }
+}
+func (s *GossipStateProviderImpl) commitPayloads(cIn []chan *PipelineData, cOut chan *PipelineData, maxOutstanding int) {
    seq := int(0)
    for {
-       job := <-chans[seq]
+       job := <-cIn[seq]
        seq++
        if seq == maxOutstanding {
            seq = 0
        }
        //logger.Errorf("SC: store ", job.seq)
        s.ledger.StoreBlock(job.block, job.privateDataSets)
+       cOut <- job
    }
}
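
The second part of the commit splits the commit stage in two: commitPayloads now only stores blocks and forwards each finished job on cOut, while a new updateLedgerHeight goroutine advertises the new height. The sketch below is not the Fabric code; job, commitStage, and heightStage are simplified stand-ins. It shows the same shape: the committer drains its per-worker inputs round-robin to preserve block order and hands completed jobs to a downstream stage, so the height update no longer sits on the commit path.

package main

import (
    "fmt"
    "sync"
)

// job is an illustrative stand-in for PipelineData.
type job struct {
    seq uint64
}

// commitStage drains the per-worker input channels round-robin (preserving
// block order), "commits" each job, and forwards it to the next stage
// instead of updating the ledger height inline.
func commitStage(in []chan job, out chan<- job) {
    seq := 0
    for {
        j, ok := <-in[seq]
        if !ok {
            close(out)
            return
        }
        seq = (seq + 1) % len(in)
        fmt.Printf("commit block %d\n", j.seq)
        out <- j // hand off to the height-update stage
    }
}

// heightStage advances the advertised ledger height after each commit,
// off the critical commit path.
func heightStage(in <-chan job, done *sync.WaitGroup) {
    defer done.Done()
    for j := range in {
        fmt.Printf("advertise ledger height %d\n", j.seq+1)
    }
}

func main() {
    const workers = 2
    in := make([]chan job, workers)
    for i := range in {
        in[i] = make(chan job, 1)
    }
    out := make(chan job, workers)

    var done sync.WaitGroup
    done.Add(1)
    go heightStage(out, &done)
    go commitStage(in, out)

    for i := uint64(0); i < 4; i++ {
        in[i%workers] <- job{seq: i}
    }
    for _, c := range in {
        close(c)
    }
    done.Wait()
}
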
......@@ -665,18 +675,19 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
        MaxNumOutstanding, _ = strconv.Atoi(strPipeline)
    }
-   var inChans []chan *PipelineData
-   var outChans []chan *PipelineData
-   inChans = make([]chan *PipelineData, MaxNumOutstanding)
-   outChans = make([]chan *PipelineData, MaxNumOutstanding)
+   inChans := make([]chan *PipelineData, MaxNumOutstanding)
+   intChans := make([]chan *PipelineData, MaxNumOutstanding)
+   outChans := make(chan *PipelineData, MaxNumOutstanding)
    if pipeline {
        for seq := 0; seq < MaxNumOutstanding; seq++ {
            inChans[seq] = make(chan *PipelineData)
-           outChans[seq] = make(chan *PipelineData)
-           go s.verifyPayloads(inChans[seq], outChans[seq])
+           intChans[seq] = make(chan *PipelineData)
+           go s.verifyPayloads(inChans[seq], intChans[seq])
        }
-       go s.commitPayloads(outChans, MaxNumOutstanding)
+       go s.commitPayloads(intChans, outChans, MaxNumOutstanding)
+       go s.updateLedgerHeight(outChans)
    }
    for {
......@@ -707,7 +718,6 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
                        continue
                    }
                }
                job := &PipelineData{cached.GetBlock(rawBlock), p, payload.SeqNum}
                if pipeline {
......
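
On the producer side, deliverPayloads wraps each popped block into a PipelineData job and, when pipelining is enabled, feeds it to one of the MaxNumOutstanding verification workers. A rough sketch of one way such a fan-out can look; the payloadJob type, the trivial worker standing in for verifyPayloads, and the round-robin choice are illustrative, since the actual dispatch inside the elided if-pipeline block is not shown in the diff.

package main

import "fmt"

// payloadJob is an illustrative stand-in for PipelineData.
type payloadJob struct {
    seq uint64
}

// dispatch feeds jobs round-robin into the per-worker input channels,
// then closes them so the workers can drain and exit.
func dispatch(jobs []payloadJob, inChans []chan payloadJob) {
    for i, j := range jobs {
        inChans[i%len(inChans)] <- j
    }
    for _, c := range inChans {
        close(c)
    }
}

func main() {
    const maxOutstanding = 3
    inChans := make([]chan payloadJob, maxOutstanding)
    done := make(chan struct{}, maxOutstanding)
    for i := range inChans {
        inChans[i] = make(chan payloadJob, 1)
        go func(c <-chan payloadJob) { // worker: stands in for verifyPayloads
            for j := range c {
                fmt.Printf("verify block %d\n", j.seq)
            }
            done <- struct{}{}
        }(inChans[i])
    }
    dispatch([]payloadJob{{0}, {1}, {2}, {3}, {4}}, inChans)
    for i := 0; i < maxOutstanding; i++ {
        <-done
    }
}
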