Commit c7badc3e authored by manish's avatar manish
Browse files

Raw ledger implementation

Raw ledger provides basic functionality for storing and
retrieving blocks. This is intended to be used by an orderer service
https://jira.hyperledger.org/browse/FAB-56



Change-Id: I3fb733f5be53b6f630c20554ba4e362540b8f55a
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent bb413ce5
......@@ -716,16 +716,18 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) {
handler.putRangeQueryIterator(txContext, iterID, rangeIter)
hasNext := rangeIter.Next()
var keysAndValues []*pb.RangeQueryStateKeyValue
var i = uint32(0)
for ; hasNext && i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Get()
var qresult ledger.QueryResult
for ; i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Next()
if err != nil {
chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
return
}
if qresult == nil {
break
}
// PDMP - deliberately allow the following type assertion to panic if the query result is not a KV
kv := qresult.(ledger.KV)
// Decrypt the data if the confidential is enabled
......@@ -742,16 +744,14 @@ func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) {
}
keyAndValue := pb.RangeQueryStateKeyValue{Key: kv.Key, Value: decryptedValue}
keysAndValues = append(keysAndValues, &keyAndValue)
hasNext = rangeIter.Next()
}
if !hasNext {
if qresult != nil {
rangeIter.Close()
handler.deleteRangeQueryIterator(txContext, iterID)
}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: iterID}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, ID: iterID}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
rangeIter.Close()
......@@ -827,13 +827,18 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) {
var keysAndValues []*pb.RangeQueryStateKeyValue
var i = uint32(0)
hasNext := true
for ; hasNext && i < maxRangeQueryStateLimit; i++ {
qresult, err := rangeIter.Get()
var qresult ledger.QueryResult
var err error
for ; i < maxRangeQueryStateLimit; i++ {
	qresult, err = rangeIter.Next()
	if err != nil {
		chaincodeLogger.Errorf("Failed to get query result from iterator. Sending %s", pb.ChaincodeMessage_ERROR)
		return
	}
	// BUG FIX: terminate when the iterator is exhausted (qresult == nil), not
	// when a result IS present. The original `qresult != nil` inverted the
	// check — the loop would break before processing the very first result —
	// and is inconsistent with the identical loop in handleRangeQueryState.
	if qresult == nil {
		break
	}
//PDMP - let it panic if not KV
kv := qresult.(ledger.KV)
// Decrypt the data if the confidential is enabled
......@@ -850,16 +855,14 @@ func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) {
}
keyAndValue := pb.RangeQueryStateKeyValue{Key: kv.Key, Value: decryptedValue}
keysAndValues = append(keysAndValues, &keyAndValue)
hasNext = rangeIter.Next()
}
if !hasNext {
if qresult != nil {
rangeIter.Close()
handler.deleteRangeQueryIterator(txContext, rangeQueryStateNext.ID)
}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: rangeQueryStateNext.ID}
payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: qresult != nil, ID: rangeQueryStateNext.ID}
payloadBytes, err := proto.Marshal(payload)
if err != nil {
rangeIter.Close()
......
......@@ -17,15 +17,41 @@ limitations under the License.
package blkstorage
import (
"errors"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos"
)
// IndexableAttr represents an indexable attribute of a block (block number,
// block hash, or transaction ID) that a BlockStore implementation can
// maintain a lookup index for.
type IndexableAttr string
// constants for indexable attributes
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
)
// IndexConfig - a configuration that includes a list of attributes that should be indexed.
// Attributes not listed here are not indexed; lookups by an unindexed
// attribute return ErrAttrNotIndexed.
type IndexConfig struct {
// AttrsToIndex lists the attributes for which indexes are maintained.
AttrsToIndex []IndexableAttr
}
var (
// ErrNotFoundInIndex is used to indicate missing entry in the index
ErrNotFoundInIndex = errors.New("Entry not found in index")
// ErrAttrNotIndexed is used to indicate that an attribute is not indexed
ErrAttrNotIndexed = errors.New("Attribute not indexed")
)
// BlockStore - an interface for persisting and retrieving blocks
// An implementation of this interface is expected to take an argument
// of type `IndexConfig` which configures the block store on what items should be indexed
type BlockStore interface {
AddBlock(block *protos.Block2) error
GetBlockchainInfo() (*protos.BlockchainInfo, error)
RetrieveBlocks(startNum uint64, endNum uint64) (ledger.ResultsIterator, error)
RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error)
RetrieveBlockByHash(blockHash []byte) (*protos.Block2, error)
RetrieveBlockByNumber(blockNum uint64) (*protos.Block2, error)
RetrieveTxByID(txID string) (*protos.Transaction2, error)
......
......@@ -166,7 +166,7 @@ func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementI
return nil, nil, err
}
logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
if blockBytes == nil && s.currentFileNum < s.endFileNum {
if blockBytes == nil && (s.currentFileNum < s.endFileNum || s.endFileNum < 0) {
logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
if err = s.moveToNextBlockfileStream(); err != nil {
return nil, nil, err
......
......@@ -18,9 +18,11 @@ package fsblkstorage
import (
"fmt"
"sync"
"sync/atomic"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/hyperledger/fabric/protos"
......@@ -46,11 +48,12 @@ type blockfileMgr struct {
defaultCF *gorocksdb.ColumnFamilyHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
currentFileWriter *blockfileWriter
bcInfo atomic.Value
}
func newBlockfileMgr(conf *Conf) *blockfileMgr {
func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfileMgr {
rootDir := conf.blockfilesDir
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
......@@ -69,7 +72,7 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
}
updateCPInfo(conf, cpInfo)
syncCPInfoFromFS(conf, cpInfo)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
......@@ -79,9 +82,10 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}
mgr.index = newBlockIndex(db, db.GetCFHandle(blockIndexCF))
mgr.index = newBlockIndex(indexConfig, db, db.GetCFHandle(blockIndexCF))
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
mgr.syncIndex()
// init BlockchainInfo
......@@ -119,7 +123,7 @@ func initDB(conf *Conf) *db.DB {
return dbInst
}
func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) {
func syncCPInfoFromFS(conf *Conf, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
......@@ -156,23 +160,24 @@ func (mgr *blockfileMgr) close() {
}
func (mgr *blockfileMgr) moveToNextFile() {
nextFileInfo := &checkpointInfo{
cpInfo := &checkpointInfo{
latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
latestFileChunksize: 0}
latestFileChunksize: 0,
lastBlockNumber: mgr.cpInfo.lastBlockNumber}
nextFileWriter, err := newBlockfileWriter(
deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))
deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err))
}
mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(nextFileInfo, true)
err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
mgr.cpInfo = nextFileInfo
mgr.currentFileWriter = nextFileWriter
mgr.updateCheckpoint(cpInfo)
}
func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
......@@ -207,26 +212,30 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
return fmt.Errorf("Error while appending block to file: %s", err)
}
mgr.cpInfo.latestFileChunksize += totalBytesToAppend
mgr.cpInfo.lastBlockNumber++
err = mgr.saveCurrentInfo(mgr.cpInfo, false)
if err != nil {
mgr.cpInfo.latestFileChunksize -= totalBytesToAppend
truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
currentCPInfo := mgr.cpInfo
newCPInfo := &checkpointInfo{
latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
latestFileChunksize: currentCPInfo.latestFileChunksize + totalBytesToAppend,
lastBlockNumber: currentCPInfo.lastBlockNumber + 1}
if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
if truncateErr != nil {
panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}
blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for i := 0; i < len(txOffsets); i++ {
txOffsets[i] += len(blockBytesEncodedLen)
}
mgr.index.indexBlock(&blockIdxInfo{
blockNum: mgr.cpInfo.lastBlockNumber, blockHash: blockHash,
blockNum: newCPInfo.lastBlockNumber, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets})
mgr.updateCheckpoint(newCPInfo)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
......@@ -291,6 +300,14 @@ func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {
return mgr.bcInfo.Load().(*protos.BlockchainInfo)
}
// updateCheckpoint installs cpInfo as the manager's current checkpoint under
// the condition-variable lock, then broadcasts on cpInfoCond so that any
// block iterators blocked waiting for newer blocks re-check the latest
// available block number.
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
	cond := mgr.cpInfoCond
	cond.L.Lock()
	defer cond.L.Unlock()
	mgr.cpInfo = cpInfo
	logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
	cond.Broadcast()
}
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) {
currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &protos.BlockchainInfo{
......@@ -328,18 +345,8 @@ func (mgr *blockfileMgr) retrieveSerBlockByNumber(blockNum uint64) (*protos.SerB
return mgr.fetchSerBlock(loc)
}
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*BlocksItr, error) {
var lp *fileLocPointer
var err error
if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil {
return nil, err
}
var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, lp.fileSuffixNum,
int64(lp.offset), mgr.cpInfo.latestFileChunkSuffixNum); err != nil {
return nil, err
}
return newBlockItr(stream, int(endNum-startNum)+1), nil
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*BlocksItr, error) {
return newBlockItr(mgr, startNum), nil
}
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*protos.Transaction2, error) {
......
......@@ -103,14 +103,18 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum int, lastBlockNum int, expectedBlocks []*protos.Block2) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum), uint64(lastBlockNum))
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum))
defer itr.Close()
testutil.AssertNoError(t, err, "Error while getting blocks iterator")
numBlocksItrated := 0
for ; itr.Next(); numBlocksItrated++ {
block, err := itr.Get()
for {
block, err := itr.Next()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
numBlocksItrated++
if numBlocksItrated == lastBlockNum-firstBlockNum+1 {
break
}
}
testutil.AssertEquals(t, numBlocksItrated, lastBlockNum-firstBlockNum+1)
}
......
......@@ -17,10 +17,10 @@ limitations under the License.
package fsblkstorage
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/tecbot/gorocksdb"
......@@ -50,16 +50,21 @@ type blockIdxInfo struct {
txOffsets []int
}
// ErrNotFoundInIndex is used to indicate missing entry in the index
var ErrNotFoundInIndex = errors.New("Entry not found in index")
type blockIndex struct {
db *db.DB
blockIndexCF *gorocksdb.ColumnFamilyHandle
indexItemsMap map[blkstorage.IndexableAttr]bool
db *db.DB
blockIndexCF *gorocksdb.ColumnFamilyHandle
}
func newBlockIndex(db *db.DB, indexCFHandle *gorocksdb.ColumnFamilyHandle) *blockIndex {
return &blockIndex{db, indexCFHandle}
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *db.DB,
indexCFHandle *gorocksdb.ColumnFamilyHandle) *blockIndex {
indexItems := indexConfig.AttrsToIndex
logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
for _, indexItem := range indexItems {
indexItemsMap[indexItem] = true
}
return &blockIndex{indexItemsMap, db, indexCFHandle}
}
func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
......@@ -72,6 +77,11 @@ func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
}
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
// do not index anything
if len(index.indexItemsMap) == 0 {
logger.Debug("Not indexing block... as nothing to index")
return nil
}
logger.Debugf("Indexing block [%s]", blockIdxInfo)
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
......@@ -81,19 +91,29 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
if err != nil {
return err
}
batch.PutCF(index.blockIndexCF, constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
batch.PutCF(index.blockIndexCF, constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
for i := 0; i < len(txOffsets)-1; i++ {
txID := constructTxID(blockIdxInfo.blockNum, i)
txBytesLength := txOffsets[i+1] - txOffsets[i]
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, &locPointer{txOffsets[i], txBytesLength})
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.PutCF(index.blockIndexCF, constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.PutCF(index.blockIndexCF, constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
}
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
for i := 0; i < len(txOffsets)-1; i++ {
txID := constructTxID(blockIdxInfo.blockNum, i)
txBytesLength := txOffsets[i+1] - txOffsets[i]
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, &locPointer{txOffsets[i], txBytesLength})
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.PutCF(index.blockIndexCF, constructTxIDKey(txID), txFlpBytes)
}
batch.PutCF(index.blockIndexCF, constructTxIDKey(txID), txFlpBytes)
}
batch.PutCF(index.blockIndexCF, indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch); err != nil {
return err
......@@ -102,12 +122,15 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
}
func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructBlockHashKey(blockHash))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
return nil, blkstorage.ErrNotFoundInIndex
}
blkLoc := &fileLocPointer{}
blkLoc.unmarshal(b)
......@@ -115,12 +138,15 @@ func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, e
}
func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructBlockNumKey(blockNum))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
return nil, blkstorage.ErrNotFoundInIndex
}
blkLoc := &fileLocPointer{}
blkLoc.unmarshal(b)
......@@ -128,12 +154,15 @@ func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer
}
func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(index.blockIndexCF, constructTxIDKey(txID))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
return nil, blkstorage.ErrNotFoundInIndex
}
txFLP := &fileLocPointer{}
txFLP.unmarshal(b)
......
......@@ -20,7 +20,10 @@ import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/protos"
)
type noopIndex struct {
......@@ -77,7 +80,7 @@ func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncB
// The last set of blocks should not be present in the original index
for i := numBlocksToIndex + 1; i <= numBlocks; i++ {
_, err := blkfileMgr.retrieveBlockByNumber(uint64(i))
testutil.AssertSame(t, err, ErrNotFoundInIndex)
testutil.AssertSame(t, err, blkstorage.ErrNotFoundInIndex)
}
// perform index sync
......@@ -97,3 +100,53 @@ func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncB
testutil.AssertEquals(t, block, blocks[i-1])
}
}
// TestBlockIndexSelectiveIndexing verifies that the block index honors the
// configured IndexConfig: each attribute is retrievable iff it was configured
// for indexing. Each invocation below exercises a different combination of
// indexed attributes.
func TestBlockIndexSelectiveIndexing(t *testing.T) {
	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash})
	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum})
	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID})
	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum})
	// previously untested combination: all supported attributes indexed at once
	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum, blkstorage.IndexableAttrTxID})
}
// testBlockIndexSelectiveIndexing configures a block store to index only the
// attributes in indexItems, commits a few test blocks, and asserts that
// retrieval by a given attribute succeeds iff that attribute was configured
// for indexing (an unindexed attribute yields blkstorage.ErrAttrNotIndexed).
func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
	env := newTestEnv(t)
	env.indexConfig.AttrsToIndex = indexItems
	defer env.Cleanup()
	blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
	defer blkfileMgrWrapper.close()
	blocks := testutil.ConstructTestBlocks(t, 3)

	// add test blocks
	blkfileMgrWrapper.addBlocks(blocks)
	blockfileMgr := blkfileMgrWrapper.blockfileMgr

	// if index has been configured for an indexItem then the item should be indexed else not
	// test 'retrieveBlockByHash'
	block, err := blockfileMgr.retrieveBlockByHash(testutil.ComputeBlockHash(t, blocks[0]))
	if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockHash) {
		testutil.AssertNoError(t, err, "Error while retrieving block by hash")
		testutil.AssertEquals(t, block, blocks[0])
	} else {
		testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
	}

	// test 'retrieveBlockByNumber'
	block, err = blockfileMgr.retrieveBlockByNumber(1)
	if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNum) {
		testutil.AssertNoError(t, err, "Error while retrieving block by number")
		testutil.AssertEquals(t, block, blocks[0])
	} else {
		testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
	}

	// test 'retrieveTransactionByID'
	tx, err := blockfileMgr.retrieveTransactionByID(constructTxID(1, 0))
	if testutil.Contains(indexItems, blkstorage.IndexableAttrTxID) {
		testutil.AssertNoError(t, err, "Error while retrieving tx by id")
		txOrig := &protos.Transaction2{}
		// BUG FIX: the Unmarshal error was silently dropped before; surface it
		// so a corrupt fixture fails here instead of comparing against a
		// zero-valued Transaction2.
		testutil.AssertNoError(t, proto.Unmarshal(blocks[0].Transactions[0], txOrig), "Error while unmarshalling expected tx")
		testutil.AssertEquals(t, tx, txOrig)
	} else {
		testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
	}
}
......@@ -18,6 +18,7 @@ package fsblkstorage
import (
"fmt"
"sync"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos"
......@@ -45,36 +46,78 @@ func (bh *BlockHolder) GetBlockBytes() []byte {
// BlocksItr - an iterator for iterating over a sequence of blocks
type BlocksItr struct {
stream *blockStream
nextBlockBytes []byte
err error
numTotalBlocks int
numIteratedBlocks int
mgr *blockfileMgr
maxBlockNumAvailable uint64
blockNumToRetrieve uint64
stream *blockStream
closeMarker bool
closeMarkerLock *sync.Mutex
}
func newBlockItr(stream *blockStream, numTotalBlocks int) *BlocksItr {
return &BlocksItr{stream, nil, nil, numTotalBlocks, 0}
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *BlocksItr {
return &BlocksItr{mgr, mgr.cpInfo.lastBlockNumber, startBlockNum, nil, false, &sync.Mutex{}}
}
// Next moves the cursor to next block and returns true iff the iterator is not exhausted
func (itr *BlocksItr) Next() bool {
if itr.err != nil || itr.numIteratedBlocks == itr.numTotalBlocks {
return false
// waitForBlock blocks the caller until the store has committed a block with
// number >= blockNum, or until the iterator is asked to close (shouldClose).
// It returns the highest committed block number observed on wake-up. The wait
// is driven by mgr.cpInfoCond, which updateCheckpoint broadcasts on after
// each commit. Fixes the "maxAvailaBlockNumber" typo in the debug messages.
func (itr *BlocksItr) waitForBlock(blockNum uint64) uint64 {
	itr.mgr.cpInfoCond.L.Lock()
	defer itr.mgr.cpInfoCond.L.Unlock()
	// Standard condition-variable loop: re-check the predicate after every
	// wake-up because Broadcast does not guarantee the predicate now holds.
	for itr.mgr.cpInfo.lastBlockNumber < blockNum && !itr.shouldClose() {
		logger.Debugf("Going to wait for newer blocks. maxAvailableBlockNumber=[%d], waitForBlockNum=[%d]",
			itr.mgr.cpInfo.lastBlockNumber, blockNum)
		itr.mgr.cpInfoCond.Wait()
		logger.Debugf("Came out of wait. maxAvailableBlockNumber=[%d]", itr.mgr.cpInfo.lastBlockNumber)
	}
	return itr.mgr.cpInfo.lastBlockNumber
}
func (itr *BlocksItr) initStream() error {
var lp *fileLocPointer
var err error
if lp, err = itr.mgr.index.getBlockLocByBlockNum(itr.blockNumToRetrieve); err != nil {
return err