Commit fa3b2d0f authored by Lucas Kuhring's avatar Lucas Kuhring
Browse files

Finish batch writing and update logging points

parent 1b0b9e88
......@@ -8,7 +8,6 @@ package deliver
import (
"context"
"fmt"
"io"
"math"
"strconv"
......@@ -20,7 +19,6 @@ import (
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/comm"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -322,10 +320,6 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
if config.Log.Ordering {
fmt.Printf("ord1,%d,%d\n", time.Now().UnixNano(), block.Header.Number)
}
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
......
......@@ -128,15 +128,14 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
strBatchSize := os.Getenv("STREAMCHAIN_WRITEBATCH")
var bfw *batchedBlockfileWriter
bsz := 0
if strBatchSize == "" || strBatchSize == "0" {
bfw = newBatchedBlockFileWriter(currentFileWriter, 0)
} else {
bsz, _ := strconv.Atoi(strBatchSize)
bfw = newBatchedBlockFileWriter(currentFileWriter, bsz)
if strBatchSize != "" && strBatchSize != "0" {
bsz, _ = strconv.Atoi(strBatchSize)
}
bfw := newBatchedBlockFileWriter(currentFileWriter, bsz)
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
}
......@@ -147,7 +146,7 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
}
// Create a new KeyValue store database handler for the blocks index in the keyvalue database
if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
if mgr.index, err = newBlockIndex(indexConfig, indexStore, bsz); err != nil {
panic(fmt.Sprintf("error in block index: %s", err))
}
......@@ -327,12 +326,13 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
//start := time.Now()
if err = mgr.index.indexBlock(&blockIdxInfo{
blockNum: block.Header.Number, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
return err
}
//logger.Errorf("Indexing: %.2f", time.Since(start).Seconds()*1000)
//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
......
......@@ -7,19 +7,22 @@ SPDX-License-Identifier: Apache-2.0
package fsblkstorage
import (
"fmt"
"os"
"sync"
"time"
"github.com/pkg/errors"
)
var lock = sync.RWMutex{}
type batchedBlockfileWriter struct {
batch int
bfw *blockfileWriter
buffer []writeInfo
currentLen int
//updated chan struct{}
batch int
bfw *blockfileWriter
buffer []writeInfo
currentLen int
currentBuffer []byte
updated chan struct{}
}
type writeInfo struct {
......@@ -28,8 +31,11 @@ type writeInfo struct {
}
// newBatchedBlockFileWriter creates a batching wrapper around bfw that
// accumulates up to batch appends before flushing them to disk, and starts
// the background finalWrite goroutine that force-flushes stale partial
// batches after a timeout.
//
// Fix: diff residue left the old one-line constructor (an unconditional
// return without the updated channel) above the new implementation, making
// the new code unreachable. Only the new version is kept, since the rest of
// the file (writeOut, finalWrite) relies on the updated channel existing.
func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfileWriter {
	b := &batchedBlockfileWriter{
		bfw:     bfw,
		batch:   batch,
		buffer:  make([]writeInfo, 0, batch),
		updated: make(chan struct{}),
	}
	go b.finalWrite()
	return b
}
func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
......@@ -44,23 +50,21 @@ func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {
return w.bfw.append(b, sync)
}
if w.currentBuffer == nil {
w.currentBuffer = make([]byte, 0, len(b))
}
w.currentBuffer = append(w.currentBuffer, b...)
if sync {
w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: b})
} else {
if len(w.buffer) > 0 {
last := w.buffer[len(w.buffer)-1]
last.data = append(last.data, b...)
} else {
w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: b})
}
w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: append([]byte(nil), w.currentBuffer...)})
w.currentBuffer = w.currentBuffer[:0]
}
if len(w.buffer) == w.batch {
if err := w.writeOut(true); err != nil {
return err
}
//go w.writeOut(true)
}
w.currentLen += len(b)
......@@ -68,33 +72,35 @@ func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {
return nil
}
/*
func (w *batchedBlockfileWriter) finalWrite() {
for {
select {
case <-time.After(time.Second * 10):
if err := w.writeOut(false); err != nil {
logger.Errorf("Error in batched write")
logger.Errorf("Error in batched write: %v", err)
}
case <-w.updated:
return
}
}
}
*/
// close shuts down the underlying blockfile writer.
// NOTE(review): the updated channel created in newBatchedBlockFileWriter is
// intentionally not closed here (see the commented-out line below). If a
// finalWrite watchdog goroutine is still blocked in its select at shutdown,
// it may be leaked — confirm against the writeOut/finalWrite handshake
// before re-enabling close(w.updated), since writeOut also sends on that
// channel and a send on a closed channel panics.
func (w *batchedBlockfileWriter) close() {
	w.bfw.close()
	//close(w.updated)
}
func (w *batchedBlockfileWriter) writeOut(wait bool) error {
start := time.Now()
//lock.Lock()
//start := time.Now()
//if wait {
// w.updated <- struct{}{}
//}
if wait {
go w.finalWrite()
}
w.updated <- struct{}{}
var err error
......@@ -102,7 +108,7 @@ func (w *batchedBlockfileWriter) writeOut(wait bool) error {
for _, v := range w.buffer {
if lastFile != nil && lastFile != v.file {
if lastFile != nil && lastFile.Name() != v.file.Name() {
if err = lastFile.Sync(); err != nil {
return err
}
......@@ -117,18 +123,18 @@ func (w *batchedBlockfileWriter) writeOut(wait bool) error {
lastFile = v.file
}
if err = lastFile.Sync(); err != nil {
return err
if lastFile != nil {
if err = lastFile.Sync(); err != nil {
return err
}
}
//if wait {
// go w.finalWrite()
//}
fmt.Printf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)
//logger.Errorf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)
w.buffer = w.buffer[:0]
//lock.Unlock()
return nil
}
......
......@@ -9,6 +9,7 @@ package fsblkstorage
import (
"bytes"
"fmt"
"sync"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
......@@ -32,6 +33,7 @@ const (
var indexCheckpointKey = []byte(indexCheckpointKeyStr)
var errIndexEmpty = errors.New("NoBlockIndexed")
var rwLock = &sync.RWMutex{}
type index interface {
getLastBlockIndexed() (uint64, error)
......@@ -55,9 +57,12 @@ type blockIdxInfo struct {
type blockIndex struct {
indexItemsMap map[blkstorage.IndexableAttr]bool
db *leveldbhelper.DBHandle
cache map[string][]byte
pendingBatch int
batchSize int
}
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle, batchSize int) (*blockIndex, error) {
indexItems := indexConfig.AttrsToIndex
logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
......@@ -72,13 +77,37 @@ func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHand
return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
}
return &blockIndex{indexItemsMap, db}, nil
return &blockIndex{indexItemsMap: indexItemsMap, db: db, cache: make(map[string][]byte), batchSize: batchSize, pendingBatch: 0}, nil
}
// getFromCacheOrDB returns the value stored under key, consulting the
// in-memory index cache first (under the shared read lock) and falling back
// to the underlying LevelDB handle on a cache miss.
func (index *blockIndex) getFromCacheOrDB(key []byte) ([]byte, error) {
	rwLock.RLock()
	cached, hit := index.cache[string(key)]
	rwLock.RUnlock()

	if hit {
		return cached, nil
	}
	return index.db.Get(key)
}
func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
var blockNumBytes []byte
var err error
if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
if blockNumBytes, err = index.getFromCacheOrDB(indexCheckpointKey); err != nil {
return 0, err
}
if blockNumBytes == nil {
......@@ -97,20 +126,21 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
txsfltr := ledgerUtil.TxValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
batch := leveldbhelper.NewUpdateBatch()
flpBytes, err := flp.marshal()
if err != nil {
return err
}
update := make(map[string][]byte)
//Index1
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
update[string(constructBlockHashKey(blockIdxInfo.blockHash))] = flpBytes
}
//Index2
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
update[string(constructBlockNumKey(blockIdxInfo.blockNum))] = flpBytes
}
//Index3 Used to find a transaction by it's transaction id
......@@ -130,7 +160,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if marshalErr != nil {
return marshalErr
}
batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
update[string(constructTxIDKey(txoffset.txID))] = txFlpBytes
}
}
......@@ -143,7 +174,7 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if marshalErr != nil {
return marshalErr
}
batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)), txFlpBytes)
update[string(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)))] = txFlpBytes
}
}
......@@ -153,7 +184,7 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
continue
}
batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes)
update[string(constructBlockTxIDKey(txoffset.txID))] = flpBytes
}
}
......@@ -163,15 +194,47 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
continue
}
batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
update[string(constructTxValidationCodeIDKey(txoffset.txID))] = []byte{byte(txsfltr.Flag(idx))}
}
}
update[string(indexCheckpointKey)] = encodeBlockNum(blockIdxInfo.blockNum)
rwLock.Lock()
for k, v := range update {
index.cache[k] = v
}
index.pendingBatch++
if index.pendingBatch >= index.batchSize {
toBeWritten := make(map[string][]byte)
for k, v := range index.cache {
toBeWritten[k] = v
}
index.pendingBatch = 0
index.cache = make(map[string][]byte)
rwLock.Unlock()
return index.writeOut(toBeWritten)
}
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
rwLock.Unlock()
return nil
}
func (index *blockIndex) writeOut(data map[string][]byte) error {
// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := index.db.WriteBatch(batch, true); err != nil {
if err := index.db.WriteBatch(&leveldbhelper.UpdateBatch{KVs: data}, true); err != nil {
return err
}
return nil
}
......@@ -201,7 +264,8 @@ func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, e
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockHashKey(blockHash))
b, err := index.getFromCacheOrDB(constructBlockHashKey(blockHash))
if err != nil {
return nil, err
}
......@@ -214,10 +278,11 @@ func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, e
}
func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockNumKey(blockNum))
b, err := index.getFromCacheOrDB(constructBlockNumKey(blockNum))
if err != nil {
return nil, err
}
......@@ -230,10 +295,11 @@ func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer
}
func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructTxIDKey(txID))
b, err := index.getFromCacheOrDB(constructTxIDKey(txID))
if err != nil {
return nil, err
}
......@@ -246,10 +312,11 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
}
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockTxIDKey(txID))
b, err := index.getFromCacheOrDB(constructBlockTxIDKey(txID))
if err != nil {
return nil, err
}
......@@ -262,10 +329,11 @@ func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error)
}
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
b, err := index.getFromCacheOrDB(constructBlockNumTranNumKey(blockNum, tranNum))
if err != nil {
return nil, err
}
......@@ -278,11 +346,12 @@ func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint
}
func (index *blockIndex) getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; !ok {
return peer.TxValidationCode(-1), blkstorage.ErrAttrNotIndexed
}
raw, err := index.db.Get(constructTxValidationCodeIDKey(txID))
raw, err := index.getFromCacheOrDB(constructTxValidationCodeIDKey(txID))
if err != nil {
return peer.TxValidationCode(-1), err
......
......@@ -447,7 +447,7 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro
}
if config.Log.Endorsement {
fmt.Printf("end,%d,%d\n", time.Now().UnixNano()/1000000, time.Since(startTime).Nanoseconds())
fmt.Printf("end,%d,%d\n", time.Now().UnixNano(), time.Since(startTime).Nanoseconds())
}
endorserLogger.Debug("Exit: request from", addr)
......
......@@ -311,7 +311,6 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
}
elapsedBlockProcessing := time.Since(startBlockProcessing)
startCommitBlockStorage := time.Now()
logger.Debugf("[%s] Committing block [%d] to storage", l.ledgerID, blockNo)
l.blockAPIsRWLock.Lock()
......@@ -320,14 +319,12 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
return err
}
elapsedCommitBlockStorage := time.Since(startCommitBlockStorage)
startCommitState := time.Now()
logger.Debugf("[%s] Committing block [%d] transactions to state database", l.ledgerID, blockNo)
if err = l.txtmgmt.Commit(); err != nil {
panic(errors.WithMessage(err, "error during commit to txmgr"))
}
elapsedCommitState := time.Since(startCommitState)
// History database could be written in parallel with state and/or async as a future optimization,
// although it has not been a bottleneck...no need to clutter the log with elapsed duration.
if ledgerconfig.IsHistoryDBEnabled() {
......@@ -336,7 +333,6 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}
elapsedCommitWithPvtData := time.Since(startBlockProcessing)
logger.Infof("[%s] Committed block [%d] with %d transaction(s) in %dms (state_validation=%dms block_commit=%dms state_commit=%dms)",
......
......@@ -26,6 +26,7 @@ const confPeerFileSystemPath = "peer.fileSystemPath"
const confLedgersData = "ledgersData"
const confLedgerProvider = "ledgerProvider"
const confStateleveldb = "stateLeveldb"
const confStateleveldbPath = "peer.stateDBPath"
const confHistoryLeveldb = "historyLeveldb"
const confBookkeeper = "bookkeeper"
const confConfigHistory = "configHistory"
......@@ -55,7 +56,8 @@ func GetLedgerProviderPath() string {
// GetStateLevelDBPath returns the filesystem path that is used to maintain the state level db
func GetStateLevelDBPath() string {
return filepath.Join(GetRootPath(), confStateleveldb)
path := config.GetPath(confStateleveldbPath)
return filepath.Join(path, confStateleveldb)
}
// GetHistoryLevelDBPath returns the filesystem path that is used to maintain the history level db
......
......@@ -129,7 +129,7 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
} else {
logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtBlkStoreHt)
}
//start := time.Now()
strAddBlock := os.Getenv("STREAMCHAIN_ADDBLOCK")
if blockAndPvtdata.Block.Header.Number < 10 || strAddBlock == "" || strAddBlock == "true" {
......@@ -138,9 +138,10 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
return err
}
}
//logger.Errorf("Block Add: %f", time.Since(start).Seconds()*1000)
if writtenToPvtStore {
return s.pvtdataStore.Commit()
err := s.pvtdataStore.Commit()
return err
}
return nil
}
......
......@@ -8,7 +8,9 @@ package pvtdatastorage
import (
"fmt"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
......@@ -37,6 +39,8 @@ type store struct {
isEmpty bool
lastCommittedBlock uint64
batchPending bool
currentBatch int
batchSize int
purgerLock sync.Mutex
collElgProcSync *collElgProcSync
// After committing the pvtdata of old blocks,
......@@ -49,6 +53,7 @@ type store struct {
// in the stateDB needs to be updated before finishing the
// recovery operation.
isLastUpdatedOldBlocksSet bool
batchToWrite bool
}
type blkTranNumKey []byte
......@@ -127,6 +132,13 @@ func (p *provider) OpenStore(ledgerid string) (Store, error) {
s.launchCollElgProc()
logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]",
s.isEmpty, s.lastCommittedBlock, s.batchPending)
strBatchSize := os.Getenv("STREAMCHAIN_WRITEBATCH")
if strBatchSize == "" || strBatchSize == "0" {
s.batchSize, _ = strconv.Atoi(strBatchSize)
}
return s, nil
}
......@@ -161,6 +173,10 @@ func (s *store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
s.btlPolicy = btlPolicy
}
// isEmpty reports whether this storeEntries holds no data entries, no
// expiry entries, and no missing-data entries.
//
// Bug fix: the original combined the three lengths with bitwise AND
// (len(a)&len(b)&len(c) == 0), which wrongly reports "empty" whenever the
// lengths merely share no common set bit — e.g. lengths 1, 2 and 4 give
// 1&2&4 == 0. Each collection must be checked for zero length individually.
func (s *storeEntries) isEmpty() bool {
	return len(s.dataEntries) == 0 &&
		len(s.expiryEntries) == 0 &&
		len(s.missingDataEntries) == 0
}
// Prepare implements the function in the interface `Store`
func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtData ledger.TxMissingPvtDataMap) error {
if s.batchPending {
......@@ -172,7 +188,6 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvt
return &ErrIllegalArgs{fmt.Sprintf("Expected block number=%d, recived block number=%d", expectedBlockNum, blockNum)}
}
batch := leveldbhelper.NewUpdateBatch()