Commit 8cdd0f4d authored by manish

SingleLevelDB for block index

https://jira.hyperledger.org/browse/FAB-1664



This changeset:
- Renames package ledger/util/db to ledger/util/leveldbhelper
- Implements a leveldb provider
  (that enables using the same leveldb instance as multiple logical dbs;
  see the sketch below)
  in the util package so that it can be reused across statedb, index,
  and later for historydb
- Implements a provider as a single point of invocation
  for managing multiple block stores
- Uses a single leveldb instance for the block storage index
- Makes the structures other than the providers private
  to their respective packages

Change-Id: I5f0b3b9aa8ef3ac1ccdce4f3c6fa6d842b5318c1
Signed-off-by: manish <manish.sethi@gmail.com>
parent 1642e880
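
To make the "single leveldb instance as multiple logical dbs" idea concrete, here is a minimal sketch of the key-prefixing approach such a provider can take. The logicalDB type, its prefix scheme, and the /tmp path are illustrative assumptions for this note, not the actual leveldbhelper code; only the underlying goleveldb calls are the real library API.

// Sketch: one physical goleveldb instance backing several logical databases
// by prefixing every key with the logical database name.
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

// logicalDB is a handle scoped to one logical database; its prefix keeps the
// keys separate from those of other handles sharing the same instance.
type logicalDB struct {
	db     *leveldb.DB
	prefix []byte
}

func (h *logicalDB) key(k []byte) []byte {
	return append(append([]byte{}, h.prefix...), k...)
}

func (h *logicalDB) Put(k, v []byte) error        { return h.db.Put(h.key(k), v, nil) }
func (h *logicalDB) Get(k []byte) ([]byte, error) { return h.db.Get(h.key(k), nil) }

func main() {
	db, err := leveldb.OpenFile("/tmp/example-leveldb", nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Two logical dbs (say, a state db and a block index) share one instance.
	stateDB := &logicalDB{db: db, prefix: []byte("statedb/")}
	indexDB := &logicalDB{db: db, prefix: []byte("index/")}

	stateDB.Put([]byte("key1"), []byte("state value"))
	indexDB.Put([]byte("key1"), []byte("index value"))

	v, _ := stateDB.Get([]byte("key1"))
	fmt.Printf("statedb key1 = %s\n", v) // prints the state value, not the index value
}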
......@@ -48,6 +48,15 @@ var (
ErrAttrNotIndexed = errors.New("Attribute not indexed")
)
// BlockStoreProvider provides a handle to a BlockStore
type BlockStoreProvider interface {
CreateBlockStore(ledgerid string) (BlockStore, error)
OpenBlockStore(ledgerid string) (BlockStore, error)
Exists(ledgerid string) (bool, error)
List() ([]string, error)
Close()
}
// BlockStore - an interface for persisting and retrieving blocks
// An implementation of this interface is expected to take an argument
// of type `IndexConfig`, which specifies which items the block store should index
......
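
A minimal usage sketch for the BlockStoreProvider interface above. The openOrCreateBlockStore helper and its package are hypothetical, added only for illustration; the interface methods themselves come from this change.

package example // hypothetical package, for illustration only

import "github.com/hyperledger/fabric/core/ledger/blkstorage"

// openOrCreateBlockStore shows the intended flow: check whether a block store
// already exists for the ledger, then open it, otherwise create it.
func openOrCreateBlockStore(p blkstorage.BlockStoreProvider, ledgerid string) (blkstorage.BlockStore, error) {
	exists, err := p.Exists(ledgerid)
	if err != nil {
		return nil, err
	}
	if exists {
		return p.OpenBlockStore(ledgerid)
	}
	return p.CreateBlockStore(ledgerid)
}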
......@@ -30,15 +30,15 @@ func TestBlockfileStream(t *testing.T) {
}
func testBlockfileStream(t *testing.T, numBlocks int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
blockfileMgr := w.blockfileMgr
ledgerid := "testledger"
w := newTestBlockfileWrapper(env, ledgerid)
blocks := testutil.ConstructTestBlocks(t, numBlocks)
w.addBlocks(blocks)
w.close()
s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0)
s, err := newBlockfileStream(w.blockfileMgr.rootDir, 0, 0)
defer s.close()
testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
......@@ -71,9 +71,9 @@ func TestBlockFileStreamUnexpectedEOF(t *testing.T) {
}
func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockBytes []byte) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
w := newTestBlockfileWrapper(env, "testLedger")
blockfileMgr := w.blockfileMgr
blocks := testutil.ConstructTestBlocks(t, numBlocks)
w.addBlocks(blocks)
......@@ -100,9 +100,9 @@ func TestBlockStream(t *testing.T) {
}
func testBlockStream(t *testing.T, numFiles int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
w := newTestBlockfileWrapper(t, env)
w := newTestBlockfileWrapper(env, "testLedger")
defer w.close()
blockfileMgr := w.blockfileMgr
......
......@@ -25,7 +25,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
putil "github.com/hyperledger/fabric/protos/utils"
......@@ -43,10 +43,15 @@ var (
blkMgrInfoKey = []byte("blkMgrInfo")
)
type conf struct {
blockfilesDir string
maxBlockfileSize int
}
type blockfileMgr struct {
rootDir string
conf *Conf
db *db.DB
db *leveldbhelper.DBHandle
index index
cpInfo *checkpointInfo
cpInfoCond *sync.Cond
......@@ -95,17 +100,15 @@ At start up a new manager:
-- If index and file system are not in sync, syncs index from the FS
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfileMgr {
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.blockfilesDir
rootDir := conf.getLedgerBlockDir(id)
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
panic(fmt.Sprintf("Error: %s", err))
}
//Determine the key value db instance; if it does not exist, create the directory and instantiate the database.
db := initDB(conf)
// Instantiate the manager, i.e. blockFileMgr structure
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db}
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
// It also retrieves the current size of that file and the last block number that was written to that file.
......@@ -123,7 +126,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
}
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
// If not the same, sync the cpInfo and the file system
syncCPInfoFromFS(conf, cpInfo)
syncCPInfoFromFS(rootDir, cpInfo)
//Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
......@@ -137,7 +140,7 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
}
// Create a new KeyValue store database handler for the blocks index in the keyvalue database
mgr.index = newBlockIndex(indexConfig, db)
mgr.index = newBlockIndex(indexConfig, indexStore)
// Update the manager with the checkpoint info and the file writer
mgr.cpInfo = cpInfo
......@@ -174,21 +177,13 @@ func newBlockfileMgr(conf *Conf, indexConfig *blkstorage.IndexConfig) *blockfile
return mgr
}
func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{
DBPath: conf.dbPath})
dbInst.Open()
return dbInst
}
//cp = checkpointInfo; retrieves from the database the file suffix and the size of
// the file where the last block was written. It also retrieves the
// last block number that was written. At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
func syncCPInfoFromFS(conf *Conf, cpInfo *checkpointInfo) {
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
logger.Debugf("Starting checkpoint=%s", cpInfo)
//Checks if the file suffix of where the last block was written exists
rootDir := conf.blockfilesDir
filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
exists, size, err := util.FileExists(filePath)
if err != nil {
......@@ -224,7 +219,6 @@ func (mgr *blockfileMgr) open() error {
func (mgr *blockfileMgr) close() {
mgr.currentFileWriter.close()
mgr.db.Close()
}
func (mgr *blockfileMgr) moveToNextFile() {
......@@ -443,7 +437,7 @@ func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.B
return info.blockHeader, nil
}
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*BlocksItr, error) {
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
return newBlockItr(mgr, startNum), nil
}
......
......@@ -28,9 +28,9 @@ import (
)
func TestBlockfileMgrBlockReadWrite(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
......@@ -47,9 +47,10 @@ func TestBlockfileMgrCrashDuringWriting(t *testing.T) {
func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint int,
numBlocksAfterCheckpoint int, numLastBlockBytes int, numPartialBytesToWrite int) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocksBeforeCP := bg.NextTestBlocks(numBlocksBeforeCheckpoint)
blkfileMgrWrapper.addBlocks(blocksBeforeCP)
......@@ -75,7 +76,7 @@ func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint
blkfileMgrWrapper.close()
// simulate a start after a crash
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
cpInfo3 := blkfileMgrWrapper.blockfileMgr.cpInfo
testutil.AssertEquals(t, cpInfo3, cpInfo2)
......@@ -91,9 +92,9 @@ func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint
}
func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
......@@ -109,7 +110,7 @@ func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
for {
block, err := itr.Next()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
testutil.AssertEquals(t, block.(*blockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
numBlocksItrated++
if numBlocksItrated == lastBlockNum-firstBlockNum+1 {
break
......@@ -119,9 +120,9 @@ func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
}
func TestBlockfileMgrBlockchainInfo(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
bcInfo := blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
......@@ -134,9 +135,9 @@ func TestBlockfileMgrBlockchainInfo(t *testing.T) {
}
func TestBlockfileMgrGetTxById(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
......@@ -155,21 +156,21 @@ func TestBlockfileMgrGetTxById(t *testing.T) {
}
func TestBlockfileMgrRestart(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
testutil.AssertEquals(t, int(blkfileMgrWrapper.blockfileMgr.cpInfo.lastBlockNumber), 10)
blkfileMgrWrapper.testGetBlockByHash(blocks)
}
func TestBlockfileMgrFileRolling(t *testing.T) {
env := newTestEnv(t)
blocks := testutil.ConstructTestBlocks(t, 100)
size := 0
for _, block := range blocks {
......@@ -180,18 +181,17 @@ func TestBlockfileMgrFileRolling(t *testing.T) {
size += blockBytesSize + len(encodedLen)
}
env.conf.maxBlockfileSize = int(0.75 * float64(size))
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
maxFileSize := int(0.75 * float64(size))
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", maxFileSize))
defer env.Cleanup()
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
blkfileMgrWrapper.addBlocks(blocks)
testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 1)
blkfileMgrWrapper.testGetBlockByHash(blocks)
blkfileMgrWrapper.close()
env.Cleanup()
env = newTestEnv(t)
defer env.Cleanup()
env.conf.maxBlockfileSize = int(0.40 * float64(size))
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfileMgrWrapper.addBlocks(blocks)
testutil.AssertEquals(t, blkfileMgrWrapper.blockfileMgr.cpInfo.latestFileChunkSuffixNum, 2)
......
......@@ -27,9 +27,10 @@ import (
)
func TestBlockFileScanSmallTxOnly(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocks := []*common.Block{}
blocks = append(blocks, bg.NextTestBlock(0, 0))
......@@ -38,20 +39,21 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) {
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()
filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
filePath := deriveBlockfilePath(env.provider.conf.getLedgerBlockDir(ledgerid), 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks))
testutil.AssertEquals(t, endOffsetLastBlock, fileSize)
}
func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testLedger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
bg := testutil.NewBlockGenerator(t)
blocks := []*common.Block{}
blocks = append(blocks, bg.NextTestBlock(0, 0))
......@@ -60,7 +62,7 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()
filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
filePath := deriveBlockfilePath(env.provider.conf.getLedgerBlockDir(ledgerid), 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")
......@@ -70,7 +72,7 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
err = file.Truncate(fileSize - 1)
testutil.AssertNoError(t, err, "")
_, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
_, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)-1)
}
......@@ -22,8 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/core/ledger/util/db"
"github.com/syndtr/goleveldb/leveldb"
"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
)
const (
......@@ -54,10 +53,10 @@ type blockIdxInfo struct {
type blockIndex struct {
indexItemsMap map[blkstorage.IndexableAttr]bool
db *db.DB
db *leveldbhelper.DBHandle
}
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *db.DB) *blockIndex {
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
indexItems := indexConfig.AttrsToIndex
logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
......@@ -85,7 +84,7 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
logger.Debugf("Indexing block [%s]", blockIdxInfo)
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
batch := &leveldb.Batch{}
batch := leveldbhelper.NewUpdateBatch()
flpBytes, err := flp.marshal()
if err != nil {
return err
......
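
A hedged sketch of the batched index write path seen above: NewUpdateBatch appears in this change, while the Put and WriteBatch signatures below are assumptions about the leveldbhelper API, shown only to illustrate the flow.

package example // illustrative only

import "github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"

// writeIndexEntry collects an index write in an UpdateBatch and commits it
// through the shared DB handle.
func writeIndexEntry(db *leveldbhelper.DBHandle, key, value []byte) error {
	batch := leveldbhelper.NewUpdateBatch()
	batch.Put(key, value)             // assumed UpdateBatch method
	return db.WriteBatch(batch, true) // assumed DBHandle method; true requests a synced write
}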
......@@ -54,9 +54,10 @@ func TestBlockIndexSync(t *testing.T) {
}
func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncByRestart bool) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
ledgerid := "testledger"
blkfileMgrWrapper := newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfileMgr := blkfileMgrWrapper.blockfileMgr
origIndex := blkfileMgr.index
......@@ -87,7 +88,7 @@ func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncB
// perform index sync
if syncByRestart {
blkfileMgrWrapper.close()
blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
blkfileMgrWrapper = newTestBlockfileWrapper(env, ledgerid)
defer blkfileMgrWrapper.close()
blkfileMgr = blkfileMgrWrapper.blockfileMgr
} else {
......@@ -112,10 +113,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) {
}
func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
env := newTestEnv(t)
env.indexConfig.AttrsToIndex = indexItems
env := newTestEnvSelectiveIndexing(t, NewConf("/tmp/fabric/ledgertests", 0), indexItems)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testledger")
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 3)
......
......@@ -25,13 +25,13 @@ import (
"github.com/hyperledger/fabric/protos/common"
)
// BlockHolder holds block bytes
type BlockHolder struct {
// blockHolder holds block bytes
type blockHolder struct {
blockBytes []byte
}
// GetBlock serializes Block from block bytes
func (bh *BlockHolder) GetBlock() *common.Block {
func (bh *blockHolder) GetBlock() *common.Block {
block, err := deserializeBlock(bh.blockBytes)
if err != nil {
panic(fmt.Errorf("Problem in deserializing block: %s", err))
......@@ -40,12 +40,12 @@ func (bh *BlockHolder) GetBlock() *common.Block {
}
// GetBlockBytes returns block bytes
func (bh *BlockHolder) GetBlockBytes() []byte {
func (bh *blockHolder) GetBlockBytes() []byte {
return bh.blockBytes
}
// BlocksItr - an iterator for iterating over a sequence of blocks
type BlocksItr struct {
// blocksItr - an iterator for iterating over a sequence of blocks
type blocksItr struct {
mgr *blockfileMgr
maxBlockNumAvailable uint64
blockNumToRetrieve uint64
......@@ -54,11 +54,11 @@ type BlocksItr struct {
closeMarkerLock *sync.Mutex
}
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *BlocksItr {
return &BlocksItr{mgr, mgr.cpInfo.lastBlockNumber, startBlockNum, nil, false, &sync.Mutex{}}
func newBlockItr(mgr *blockfileMgr, startBlockNum uint64) *blocksItr {
return &blocksItr{mgr, mgr.cpInfo.lastBlockNumber, startBlockNum, nil, false, &sync.Mutex{}}
}
func (itr *BlocksItr) waitForBlock(blockNum uint64) uint64 {
func (itr *blocksItr) waitForBlock(blockNum uint64) uint64 {
itr.mgr.cpInfoCond.L.Lock()
defer itr.mgr.cpInfoCond.L.Unlock()
for itr.mgr.cpInfo.lastBlockNumber < blockNum && !itr.shouldClose() {
......@@ -70,7 +70,7 @@ func (itr *BlocksItr) waitForBlock(blockNum uint64) uint64 {
return itr.mgr.cpInfo.lastBlockNumber
}
func (itr *BlocksItr) initStream() error {
func (itr *blocksItr) initStream() error {
var lp *fileLocPointer
var err error
if lp, err = itr.mgr.index.getBlockLocByBlockNum(itr.blockNumToRetrieve); err != nil {
......@@ -82,14 +82,14 @@ func (itr *BlocksItr) initStream() error {
return nil
}
func (itr *BlocksItr) shouldClose() bool {
func (itr *blocksItr) shouldClose() bool {
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
return itr.closeMarker
}
// Next moves the cursor to next block and returns true iff the iterator is not exhausted
func (itr *BlocksItr) Next() (ledger.QueryResult, error) {
func (itr *blocksItr) Next() (ledger.QueryResult, error) {
if itr.maxBlockNumAvailable < itr.blockNumToRetrieve {
itr.maxBlockNumAvailable = itr.waitForBlock(itr.blockNumToRetrieve)
}
......@@ -108,11 +108,11 @@ func (itr *BlocksItr) Next() (ledger.QueryResult, error) {
return nil, err
}
itr.blockNumToRetrieve++
return &BlockHolder{nextBlockBytes}, nil
return &blockHolder{nextBlockBytes}, nil
}
// Close releases any resources held by the iterator
func (itr *BlocksItr) Close() {
func (itr *blocksItr) Close() {
itr.closeMarkerLock.Lock()
defer itr.closeMarkerLock.Unlock()
itr.closeMarker = true
......
......@@ -25,9 +25,9 @@ import (
)
func TestBlocksItrBlockingNext(t *testing.T) {
env := newTestEnv(t)
env := newTestEnv(t, NewConf("/tmp/fabric/ledgertests", 0))
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blkfileMgrWrapper := newTestBlockfileWrapper(env, "testLedger")
defer blkfileMgrWrapper.close()
blkfileMgr := blkfileMgrWrapper.blockfileMgr
......@@ -52,12 +52,12 @@ func TestBlocksItrBlockingNext(t *testing.T) {
<-doneChan
}
func testIterateAndVerify(t *testing.T, itr *BlocksItr, blocks []*common.Block, doneChan chan bool) {
func testIterateAndVerify(t *testing.T, itr *blocksItr, blocks []*common.Block, doneChan chan bool) {
blocksIterated := 0
for {
blockHolder, err := itr.Next()
bh, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, blockHolder.(*BlockHolder).GetBlock(), blocks[blocksIterated])
testutil.AssertEquals(t, bh.(*blockHolder).GetBlock(), blocks[blocksIterated])
blocksIterated++
if blocksIterated == len(blocks) {
break
......
......@@ -16,7 +16,7 @@ limitations under the License.
package fsblkstorage
import "strings"
import "path/filepath"
const (
defaultMaxBlockfileSize = 64 * 1024 * 1024
......@@ -24,19 +24,27 @@ const (
// Conf encapsulates all the configurations for `FsBlockStore`
type Conf struct {
blockfilesDir string
dbPath string
blockStorageDir string
maxBlockfileSize int
}
// NewConf constructs new `Conf`.
// filesystemPath is the top level folder under which `FsBlockStore` manages its data
func NewConf(filesystemPath string, maxBlockfileSize int) *Conf {
if !strings.HasSuffix(filesystemPath, "/") {
filesystemPath = filesystemPath + "/"
}
// blockStorageDir is the top level folder under which `FsBlockStore` manages its data
func NewConf(blockStorageDir string, maxBlockfileSize int) *Conf {
if maxBlockfileSize <= 0 {
maxBlockfileSize = defaultMaxBlockfileSize
}
return &Conf{filesystemPath + "blocks", filesystemPath + "db", maxBlockfileSize}
return &Conf{blockStorageDir, maxBlockfileSize}
}
func (conf *Conf) getIndexDir() string {
return filepath.Join(conf.blockStorageDir, "index")
}
func (conf *Conf) getBlocksDir() string {
return filepath.Join(conf.blockStorageDir, "blocks")
}
func (conf *Conf) getLedgerBlockDir(ledgerid string) string {
return filepath.Join(conf.getBlocksDir(), ledgerid)
}
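
For orientation, a small sketch of how the new Conf helpers resolve paths, reusing the test directory and ledger id that appear in this change. Since the helpers are unexported, the example would have to live inside package fsblkstorage; the function name is hypothetical.

package fsblkstorage

import "fmt"

// exampleConfDirs prints the directories derived from a Conf rooted at the
// test path used elsewhere in this change.
func exampleConfDirs() {
	conf := NewConf("/tmp/fabric/ledgertests", 0)
	fmt.Println(conf.getIndexDir())                   // /tmp/fabric/ledgertests/index
	fmt.Println(conf.getBlocksDir())                  // /tmp/fabric/ledgertests/blocks
	fmt.Println(conf.getLedgerBlockDir("testLedger")) // /tmp/fabric/ledgertests/blocks/testLedger
}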
......@@ -19,34 +19,38 @@ package fsblkstorage
import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"