Commit 09a44eeb authored by Lucas Kuhring

Bug fix in ledger buffering

parent cddd278f
......@@ -252,6 +252,7 @@ func (mgr *blockfileMgr) moveToNextFile() {
}
func (mgr *blockfileMgr) addBlock(block *cached.Block) error {
bcInfo := mgr.getBlockchainInfo()
if block.Header.Number != bcInfo.Height {
return errors.Errorf(
......@@ -271,6 +272,7 @@ func (mgr *blockfileMgr) addBlock(block *cached.Block) error {
)
}
blockBytes, info, err := serializeBlock(block.Block)
if err != nil {
return errors.WithMessage(err, "error serializing block")
}
......@@ -302,7 +304,6 @@ func (mgr *blockfileMgr) addBlock(block *cached.Block) error {
}
return errors.WithMessage(err, "error appending block to file")
}
//Update the checkpoint info with the results of adding the new block
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
......@@ -327,13 +328,11 @@ func (mgr *blockfileMgr) addBlock(block *cached.Block) error {
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
//start := time.Now()
if err = mgr.index.indexBlock(&blockIdxInfo{
blockNum: block.Header.Number, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
return err
}
//logger.Errorf("Indexing: %.2f", time.Since(start).Seconds()*1000)
//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
......
......@@ -8,14 +8,11 @@ package fsblkstorage
import (
"os"
"sync"
"time"
"github.com/pkg/errors"
)
var lock = sync.RWMutex{}
type batchedBlockfileWriter struct {
batch int
bfw *blockfileWriter
......@@ -39,7 +36,11 @@ func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfil
}
func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
if w.batch == 0 {
w.bfw.close()
}
w.currentLen = 0
w.bfw = bfw
}
......@@ -86,15 +87,20 @@ func (w *batchedBlockfileWriter) finalWrite() {
}
}
func (w *batchedBlockfileWriter) close() {
w.bfw.close()
}
func (w *batchedBlockfileWriter) close() error {
if err := w.writeOut(true); err != nil {
return err
}
if err := w.bfw.close(); err != nil {
return err
}
return nil
}
func (w *batchedBlockfileWriter) writeOut(wait bool) error {
//lock.Lock()
//start := time.Now()
if wait {
go w.finalWrite()
......@@ -112,6 +118,9 @@ func (w *batchedBlockfileWriter) writeOut(wait bool) error {
if err = lastFile.Sync(); err != nil {
return err
}
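// The previous file is synced and no longer written to; close it to release the descriptor.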
if err := lastFile.Close(); err != nil {
return err
}
}
_, err = v.file.Write(v.data)
......@@ -129,12 +138,8 @@ func (w *batchedBlockfileWriter) writeOut(wait bool) error {
}
}
//logger.Errorf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)
w.buffer = w.buffer[:0]
//lock.Unlock()
return nil
}
......@@ -151,6 +156,10 @@ func (w *batchedBlockfileWriter) truncateFile(targetSize int) error {
w.currentLen = targetSize
}
if err := w.writeOut(false); err != nil {
return err
}
return nil
}
......@@ -197,6 +206,7 @@ func (w *blockfileWriter) open() error {
}
func (w *blockfileWriter) close() error {
return errors.WithStack(w.file.Close())
}
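The essence of the change to close() above is flush-before-close: drain the in-memory batch, then close the underlying file. A minimal self-contained sketch of that ordering, assuming hypothetical names (bufferedWriter, flushAll) in place of the real batchedBlockfileWriter internals:

package sketch

import "os"

// bufferedWriter stands in for batchedBlockfileWriter: writes
// accumulate in buf and only reach the file on flushAll.
type bufferedWriter struct {
	file *os.File
	buf  []byte
}

// flushAll writes the buffered bytes and syncs them to disk.
func (w *bufferedWriter) flushAll() error {
	if len(w.buf) == 0 {
		return nil
	}
	if _, err := w.file.Write(w.buf); err != nil {
		return err
	}
	w.buf = w.buf[:0]
	return w.file.Sync()
}

// close flushes first, then closes the file; closing first would
// silently drop whatever still sits in buf.
func (w *bufferedWriter) close() error {
	if err := w.flushAll(); err != nil {
		return err
	}
	return w.file.Close()
}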
......
......@@ -234,7 +234,6 @@ func (index *blockIndex) writeOut(data map[string][]byte) error {
if err := index.db.WriteBatch(&leveldbhelper.UpdateBatch{KVs: data}, true); err != nil {
return err
}
return nil
}
......
......@@ -183,6 +183,7 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvt
return &ErrIllegalCall{`A pending batch exists as a result of the last invoke to "Prepare" call.
Invoke "Commit" or "Rollback" on the pending batch before invoking "Prepare" function`}
}
expectedBlockNum := s.nextBlockNum()
if expectedBlockNum != blockNum {
return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, received block number=%d", expectedBlockNum, blockNum)}
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package parallel
import (
"sync"
"sync/atomic"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/gossip/util"
)
var logger = flogging.MustGetLogger("PayloadsBuffer")
// PayloadsBuffer is used to store payloads and to support the
// reordering of blocks according to their sequence numbers.
// It also provides the capability to signal whenever the
// expected block has arrived.
type PayloadsBuffer interface {
// Adds new block into the buffer
Push(payload *cached.Block)
// Returns next expected sequence number
Next() uint64
// Remove and return payload with given sequence number
Pop() *cached.Block
// Get current buffer size
Size() int
// Channel to indicate event when new payload pushed with sequence
// number equal to the next expected value.
Ready() chan struct{}
Close()
}
// PayloadsBufferImpl implements the PayloadsBuffer interface and
// stores the inner state of available payloads and sequence numbers
type PayloadsBufferImpl struct {
next uint64
buf map[uint64]*cached.Block
readyChan chan struct{}
mutex sync.RWMutex
logger util.Logger
}
// NewPayloadsBuffer is a factory function to create a new payloads buffer
func NewPayloadsBuffer(next uint64) PayloadsBuffer {
return &PayloadsBufferImpl{
buf: make(map[uint64]*cached.Block),
readyChan: make(chan struct{}, 1),
next: next,
logger: util.GetLogger(util.StateLogger, ""),
}
}
// Ready returns the channel which signals that the expected
// next block has arrived and one can safely pop the
// next sequence of blocks
func (b *PayloadsBufferImpl) Ready() chan struct{} {
return b.readyChan
}
// Push adds a new payload to the buffer. If the arriving payload's
// sequence number is below the next expected block number, the
// payload is thrown away.
// TODO return bool to indicate if payload was added or not, so that caller can log result.
func (b *PayloadsBufferImpl) Push(block *cached.Block) {
b.mutex.Lock()
defer b.mutex.Unlock()
defer b.checkForReady()
seqNum := block.Header.Number
if seqNum < b.next || b.buf[seqNum] != nil {
logger.Debugf("Payload with sequence number = %d has been already processed", seqNum)
return
}
b.buf[seqNum] = block
}
// Next function provides the number of the next expected block
func (b *PayloadsBufferImpl) Next() uint64 {
// Atomically read the value of the top sequence number
return atomic.LoadUint64(&b.next)
}
// Pop extracts the payload with the next expected block
// number; if that block has not arrived yet, it returns nil.
func (b *PayloadsBufferImpl) Pop() *cached.Block {
b.mutex.Lock()
defer b.mutex.Unlock()
defer b.checkForReady()
result := b.buf[b.Next()]
if result != nil {
// The sequence is in the buffer, so remove it
delete(b.buf, b.Next())
// Increment the next expected block index
atomic.AddUint64(&b.next, 1)
b.drainReadChannel()
}
return result
}
func (b *PayloadsBufferImpl) checkForReady() {
// Send notification that next sequence has arrived
if b.buf[b.next] != nil && len(b.readyChan) == 0 {
b.readyChan <- struct{}{}
}
}
// drainReadChannel empties the ready channel in case the last
// payload has been popped and there are still pending
// notifications in the channel
func (b *PayloadsBufferImpl) drainReadChannel() {
if len(b.buf) == 0 {
for len(b.readyChan) > 0 {
<-b.readyChan
}
}
}
// Size returns the current number of payloads stored within the buffer
func (b *PayloadsBufferImpl) Size() int {
b.mutex.RLock()
defer b.mutex.RUnlock()
return len(b.buf)
}
// Close cleans up resources and the maintained channels
func (b *PayloadsBufferImpl) Close() {
close(b.readyChan)
}
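A minimal consumer sketch, assuming only the interface above: Push may happen out of order from many goroutines, Ready() fires once the next expected block is present, and Pop returns nil as soon as the consecutive run is exhausted. consumeInOrder and commit are hypothetical names, not part of this package:

package sketch

import (
	"github.com/hyperledger/fabric/fastfabric-extensions/cached"
	"github.com/hyperledger/fabric/fastfabric-extensions/parallel"
)

// consumeInOrder drains blocks strictly in sequence-number order.
func consumeInOrder(buf parallel.PayloadsBuffer, commit func(*cached.Block)) {
	for range buf.Ready() {
		// Pop every block that is now consecutive; Pop returns nil
		// as soon as the next expected block is missing.
		for b := buf.Pop(); b != nil; b = buf.Pop() {
			commit(b)
		}
	}
}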
package parallel
import (
"os"
"strconv"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
)
var (
ReadyToCommit chan chan *cached.Block
ReadyForValidation chan *Pipeline
)
type Pipeline struct {
Channel chan *cached.Block
Block *cached.Block
}
func InitPipeline() {
strPipeline := os.Getenv("STREAMCHAIN_PIPELINE")
p := 0
if strPipeline != "" && strPipeline != "0" {
p, _ = strconv.Atoi(strPipeline)
}
ReadyToCommit = make(chan chan *cached.Block, p)
ReadyForValidation = make(chan *Pipeline, p)
}
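Both channels are buffered to the STREAMCHAIN_PIPELINE depth (unbuffered when the variable is unset or "0"). Under the contract implied by validate() and commit() further below, a producer must register the same promise channel with ReadyToCommit, in arrival order, and hand the block to the validators inside a Pipeline. A hedged sketch with the hypothetical name stageForValidation:

package sketch

import (
	"github.com/hyperledger/fabric/fastfabric-extensions/cached"
	"github.com/hyperledger/fabric/fastfabric-extensions/parallel"
)

// stageForValidation enqueues a block for parallel validation while
// preserving commit order: promises enter ReadyToCommit in arrival
// order, and validate fulfills each one (or closes it empty on failure).
func stageForValidation(block *cached.Block) {
	promise := make(chan *cached.Block, 1)
	parallel.ReadyToCommit <- promise
	parallel.ReadyForValidation <- &parallel.Pipeline{
		Channel: promise,
		Block:   block,
	}
}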
package state
import (
"sync"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/fastfabric-extensions/parallel"
common2 "github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/state"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/transientstore"
"github.com/pkg/errors"
)
var logger = util.GetLogger(util.ServiceLogger, "")
type ledgerResources interface {
VerifyBlock(block *cached.Block, pvtdata util.PvtDataCollections) (*cached.Block, error)
// StoreBlock delivers a new block with the underlying private data
// and returns missing transaction ids
StoreBlock(block *cached.Block, data util.PvtDataCollections) error
// StorePvtData is used to persist private data into the transient store
StorePvtData(txid string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blckHeight uint64) error
// GetPvtDataAndBlockByNum gets a block by number and also returns all related private data.
// The order of private data in the PvtDataCollections slice does not imply the order of
// transactions in the block related to these private data; to get the correct placement,
// read the TxPvtData.SeqInBlock field
GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common.SignedData) (*common.Block, util.PvtDataCollections, error)
// Get recent block sequence number
LedgerHeight() (uint64, error)
// Close ledgerResources
Close()
}
type GossipStateProviderImpl struct {
state.GossipStateProvider
chainID string
buffer parallel.PayloadsBuffer
mediator *state.ServicesMediator
ledgerResources
done sync.WaitGroup
once sync.Once
stopCh chan struct{}
}
func NewGossipStateProvider(chainID string, services *state.ServicesMediator, ledger ledgerResources) state.GossipStateProvider {
height, err := ledger.LedgerHeight()
if err != nil {
logger.Error("Could not read ledger info to obtain current ledger height due to: ", errors.WithStack(err))
// Exiting as without the ledger it will be impossible
// to deliver new blocks
return nil
}
if height == 0 {
// Panic here since this is an indication of an invalid situation which should not
// happen in a normal code path.
logger.Panic("Committer height cannot be zero, ledger should include at least one block (genesis).")
}
gsp := &GossipStateProviderImpl{
GossipStateProvider: state.NewGossipStateProvider(chainID, services, ledger),
chainID: chainID,
mediator: services,
ledgerResources: ledger,
buffer: parallel.NewPayloadsBuffer(height),
stopCh: make(chan struct{}, 1),
once: sync.Once{}}
gsp.done.Add(1)
parallel.InitPipeline()
go gsp.deliverPayloads()
return gsp
}
func (s *GossipStateProviderImpl) deliverPayloads() {
for pipelinedBlock := range parallel.ReadyForValidation {
go s.validate(pipelinedBlock)
}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
func (s *GossipStateProviderImpl) commit() {
go s.store()
for blockPromise := range parallel.ReadyToCommit {
block := <-blockPromise
if block != nil {
s.buffer.Push(block)
}
}
}
func (s *GossipStateProviderImpl) store() {
defer s.done.Done()
for {
select {
case <-s.buffer.Ready():
block := s.buffer.Pop()
// Commit block with available private transactions
if err := s.ledgerResources.StoreBlock(block, util.PvtDataCollections{}); err != nil {
logger.Errorf("Got error while committing(%+v)", errors.WithStack(err))
return
}
// Update ledger height
s.mediator.UpdateLedgerHeight(block.Header.Number+1, common2.ChainID(s.chainID))
logger.Debugf("[%s] Committed block [%d] with %d transaction(s)",
s.chainID, block.Header.Number, len(block.Data.Data))
case <-s.stopCh:
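// Put the stop token back so any other goroutine waiting on stopCh can also observe it.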
s.stopCh <- struct{}{}
logger.Debug("State provider has been stopped, finishing to push new blocks.")
return
}
}
}
func (s *GossipStateProviderImpl) validate(pipeline *parallel.Pipeline) {
defer close(pipeline.Channel)
if _, err := s.ledgerResources.VerifyBlock(pipeline.Block, util.PvtDataCollections{}); err != nil {
logger.Errorf("Validation failed: %+v", err)
return
}
pipeline.Channel <- pipeline.Block
}
func (s *GossipStateProviderImpl) Stop() {
// Make sure stop won't be executed twice
// and stop channel won't be used again
s.once.Do(func() {
s.stopCh <- struct{}{}
// Make sure all goroutines have finished
s.done.Wait()
})
s.GossipStateProvider.Stop()
}
......@@ -12,7 +12,8 @@ import (
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/deliverservice"
deliverclient "github.com/hyperledger/fabric/core/deliverservice"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
gossipCommon "github.com/hyperledger/fabric/gossip/common"
......
......@@ -144,10 +144,6 @@ func (n *node) run(campaign bool) {
// to the followers and them writing to their disks. Check 10.2.1 in thesis
n.send(rd.Messages)
//if config.Log.Ordering {
// fmt.Printf("ord1,%d,%d\n", time.Now().UnixNano(), rd.CommittedEntries[0].Index)
//}
case <-n.chain.haltC:
raftTicker.Stop()
n.Stop()
......
......@@ -44,6 +44,8 @@ import (
"google.golang.org/grpc/grpclog"
)
var viperMutex = &sync.Mutex{}
// checkSpec to see if chaincode resides within current package capture for language.
func checkSpec(spec *pb.ChaincodeSpec) error {
// Don't allow nil value
......@@ -210,9 +212,11 @@ func chaincodeInvokeOrQuery(cmd *cobra.Command, invoke bool, cf *ChaincodeCmdFac
if broadcastClient, ok := broadcastClientsMap[orderingEndpoint]; ok {
broadcastClients[i] = broadcastClient
} else {
viperMutex.Lock()
viper.Set("orderer.address", workloadEntry.orderingEndpoint)
viper.Set("orderer.tls.rootcert.file", workloadEntry.orderingCaFilePath)
broadcastClient, err = common.GetBroadcastClientFnc()
viperMutex.Unlock()
if err != nil {
res <- fmt.Errorf("Error getting broadcast client: %s", err)
......@@ -570,7 +574,9 @@ func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool)
}
logger.Infof("Retrieved channel (%s) orderer endpoint: %s", channelID, orderingEndpoints[0])
// override viper env
viperMutex.Lock()
viper.Set("orderer.address", orderingEndpoints[0])
viperMutex.Unlock()
}
broadcastClient, err = common.GetBroadcastClientFnc()
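viper keeps a single global configuration that is not safe for concurrent writers, which is what the added viperMutex serializes around the Set/GetBroadcastClientFnc pairs. The same pattern in isolation, with the hypothetical helper withOrderer (the viper keys are the ones used above; only viper.Set is taken from the real API):

package sketch

import (
	"sync"

	"github.com/spf13/viper"
)

var viperMutex sync.Mutex

// withOrderer serializes the viper.Set calls and the dependent client
// construction so that concurrent goroutines never observe a
// half-updated orderer configuration.
func withOrderer(address, caFile string, build func() error) error {
	viperMutex.Lock()
	defer viperMutex.Unlock()
	viper.Set("orderer.address", address)
	viper.Set("orderer.tls.rootcert.file", caFile)
	return build()
}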
......