Commit bf880107 authored by Lucas Kuhring's avatar Lucas Kuhring
Browse files

Delete unnecessary fastfabric code

parent 09a44eeb
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package parallel
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"sync"
"sync/atomic"
"github.com/hyperledger/fabric/gossip/util"
)
var logger = flogging.MustGetLogger("PayloadsBuffer")
// PayloadsBuffer is used to store payloads into which used to
// support payloads with blocks reordering according to the
// sequence numbers. It also will provide the capability
// to signal whenever expected block has arrived.
type PayloadsBuffer interface {
// Adds new block into the buffer
Push(payload *cached.Block)
// Returns next expected sequence number
Next() uint64
// Remove and return payload with given sequence number
Pop() *cached.Block
// Get current buffer size
Size() int
// Channel to indicate event when new payload pushed with sequence
// number equal to the next expected value.
Ready() chan struct{}
Close()
}
// PayloadsBufferImpl structure to implement PayloadsBuffer interface
// store inner state of available payloads and sequence numbers
type PayloadsBufferImpl struct {
next uint64
buf map[uint64]*cached.Block
readyChan chan struct{}
mutex sync.RWMutex
logger util.Logger
}
// NewPayloadsBuffer is factory function to create new payloads buffer
func NewPayloadsBuffer(next uint64) PayloadsBuffer {
return &PayloadsBufferImpl{
buf: make(map[uint64]*cached.Block),
readyChan: make(chan struct{}, 1),
next: next,
logger: util.GetLogger(util.StateLogger, ""),
}
}
// Ready function returns the channel which indicates whenever expected
// next block has arrived and one could safely pop out
// next sequence of blocks
func (b *PayloadsBufferImpl) Ready() chan struct{} {
return b.readyChan
}
// Push new payload into the buffer structure in case new arrived payload
// sequence number is below the expected next block number payload will be
// thrown away.
// TODO return bool to indicate if payload was added or not, so that caller can log result.
func (b *PayloadsBufferImpl) Push(block *cached.Block) {
b.mutex.Lock()
defer b.mutex.Unlock()
defer b.checkForReady()
seqNum := block.Header.Number
if seqNum < b.next || b.buf[seqNum] != nil {
logger.Debugf("Payload with sequence number = %d has been already processed", seqNum)
return
}
b.buf[seqNum] = block
}
// Next function provides the number of the next expected block
func (b *PayloadsBufferImpl) Next() uint64 {
// Atomically read the value of the top sequence number
return atomic.LoadUint64(&b.next)
}
// Pop function extracts the payload according to the next expected block
// number, if no next block arrived yet, function returns nil.
func (b *PayloadsBufferImpl) Pop() *cached.Block {
b.mutex.Lock()
defer b.mutex.Unlock()
defer b.checkForReady()
result := b.buf[b.Next()]
if result != nil {
// If there is such sequence in the buffer need to delete it
delete(b.buf, b.Next())
// Increment next expect block index
atomic.AddUint64(&b.next, 1)
b.drainReadChannel()
}
return result
}
func (b *PayloadsBufferImpl) checkForReady(){
// Send notification that next sequence has arrived
if b.buf[b.next] != nil && len(b.readyChan) == 0 {
b.readyChan <- struct{}{}
}
}
// drainReadChannel empties ready channel in case last
// payload has been poped up and there are still awaiting
// notifications in the channel
func (b *PayloadsBufferImpl) drainReadChannel() {
if len(b.buf) == 0 {
for {
if len(b.readyChan) > 0 {
<-b.readyChan
} else {
break
}
}
}
}
// Size returns current number of payloads stored within buffer
func (b *PayloadsBufferImpl) Size() int {
b.mutex.RLock()
defer b.mutex.RUnlock()
return len(b.buf)
}
// Close cleanups resources and channels in maintained
func (b *PayloadsBufferImpl) Close() {
close(b.readyChan)
}
package parallel
import (
"os"
"strconv"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
)
var (
ReadyToCommit chan chan *cached.Block
ReadyForValidation chan *Pipeline
)
type Pipeline struct {
Channel chan *cached.Block
Block *cached.Block
}
func InitPipeline() {
strPipeline := os.Getenv("STREAMCHAIN_PIPELINE")
p := 0
if strPipeline != "" && strPipeline != "0" {
p, _ = strconv.Atoi(strPipeline)
}
ReadyToCommit = make(chan chan *cached.Block, p)
ReadyForValidation = make(chan *Pipeline, p)
}
package state
import (
"sync"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/fastfabric-extensions/parallel"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/transientstore"
"github.com/pkg/errors"
)
var logger = util.GetLogger(util.ServiceLogger, "")
type ledgerResources interface {
VerifyBlock(block *cached.Block, pvtdata util.PvtDataCollections) (*cached.Block, error)
// StoreBlock deliver new block with underlined private data
// returns missing transaction ids
StoreBlock(block *cached.Block, data util.PvtDataCollections) error
// StorePvtData used to persist private date into transient store
StorePvtData(txid string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blckHeight uint64) error
// GetPvtDataAndBlockByNum get block by number and returns also all related private data
// the order of private data in slice of PvtDataCollections doesn't imply the order of
// transactions in the block related to these private data, to get the correct placement
// need to read TxPvtData.SeqInBlock field
GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common.SignedData) (*common.Block, util.PvtDataCollections, error)
// Get recent block sequence number
LedgerHeight() (uint64, error)
// Close ledgerResources
Close()
}
type GossipStateProviderImpl struct {
state.GossipStateProvider
chainID string
buffer parallel.PayloadsBuffer
mediator *state.ServicesMediator
ledgerResources
done sync.WaitGroup
once sync.Once
stopCh chan struct{}
}
func NewGossipStateProvider(chainID string, services *state.ServicesMediator, ledger ledgerResources) state.GossipStateProvider {
height, err := ledger.LedgerHeight()
if height == 0 {
// Panic here since this is an indication of invalid situation which should not happen in normal
// code path.
logger.Panic("Committer height cannot be zero, ledger should include at least one block (genesis).")
}
if err != nil {
logger.Error("Could not read ledger info to obtain current ledger height due to: ", errors.WithStack(err))
// Exiting as without ledger it will be impossible
// to deliver new blocks
return nil
}
gsp := &GossipStateProviderImpl{
GossipStateProvider: state.NewGossipStateProvider(chainID, services, ledger),
chainID: chainID,
mediator: services,
ledgerResources: ledger,
buffer: parallel.NewPayloadsBuffer(height),
stopCh: make(chan struct{}, 1),
once: sync.Once{}}
gsp.done.Add(1)
parallel.InitPipeline()
go gsp.deliverPayloads()
return gsp
}
func (s *GossipStateProviderImpl) deliverPayloads() {
for pipelinedBlock := range parallel.ReadyForValidation {
go s.validate(pipelinedBlock)
}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
func (s *GossipStateProviderImpl) commit() {
go s.store()
for blockPromise := range parallel.ReadyToCommit {
block, _ := <-blockPromise
if block != nil {
s.buffer.Push(block)
}
}
}
func (s *GossipStateProviderImpl) store() {
defer s.done.Done()
for {
select {
case <-s.buffer.Ready():
block := s.buffer.Pop()
// Commit block with available private transactions
if err := s.ledgerResources.StoreBlock(block, util.PvtDataCollections{}); err != nil {
logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
return
}
// Update ledger height
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
s.chainID, block.Header.Number, len(block.Data.Data))
case <-s.stopCh:
s.stopCh <- struct{}{}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
}
}
func (s *GossipStateProviderImpl) validate(pipeline *parallel.Pipeline) {
defer close(pipeline.Channel)
if _, err := s.ledgerResources.VerifyBlock(pipeline.Block, util.PvtDataCollections{}); err != nil {
logger.Errorf("Validation failed: %+v", err)
return
}
pipeline.Channel <- pipeline.Block
}
func (s *GossipStateProviderImpl) Stop() {
// Make sure stop won't be executed twice
// and stop channel won't be used again
s.once.Do(func() {
s.stopCh <- struct{}{}
// Make sure all go-routines has finished
s.done.Wait()
})
s.GossipStateProvider.Stop()
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment