Commit 1daabff7 authored by manish's avatar manish
Browse files

[FAB-6779] Allow rebuilding block storage indexes



This CR allows rebuilding of the block storage indexes.
To rebuild the indexes, the existing index folder needs to be dropped.
However, please note that this would drop (and rebuild) the indexes for all
the chains because they share the underlying leveldb.

Also, enabled flush/sync for batch writes to leveldb (statedb, block indexes, and historydb).

Change-Id: I6a926ab765df4bbb6543d6a3960359d95d60fd68
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent c8efd6a9
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package fsblkstorage
import (
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"github.com/davecgh/go-spew/spew"
"github.com/hyperledger/fabric/protos/common"
)
// constructCheckpointInfoFromBlockFiles builds the checkpoint info by scanning the block files under rootDir.
// Only the block file with the highest numeric suffix is scanned. When that file holds no complete block
// (nothing written yet, or only a partially written block, potentially because of a crash while writing),
// the file just before it (if any) is scanned to find the last complete block.
// If no block file exists at all, a checkpoint info with isChainEmpty set is returned.
func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, error) {
	logger.Debugf("Retrieving checkpoint info from block files")

	latestSuffix, err := retrieveLastFileSuffix(rootDir)
	if err != nil {
		return nil, err
	}
	logger.Debugf("Last file number found = %d", latestSuffix)

	if latestSuffix == -1 {
		// retrieveLastFileSuffix reports -1 when the directory contains no block file
		emptyInfo := &checkpointInfo{isChainEmpty: true}
		logger.Info("No block file found")
		return emptyInfo, nil
	}

	latestStat := getFileInfoOrPanic(rootDir, latestSuffix)
	logger.Infof("Last Block file info: FileName=[%s], FileSize=[%d]", latestStat.Name(), latestStat.Size())
	tailBlockBytes, endOffset, blockCount, err := scanForLastCompleteBlock(rootDir, latestSuffix, 0)
	if err != nil {
		logger.Errorf("Error while scanning last file [file num=%d]: %s", latestSuffix, err)
		return nil, err
	}

	if blockCount == 0 && latestSuffix > 0 {
		// The latest file has no complete block; the last complete block, if any, is in the previous file.
		// Only the block bytes are taken from it - offset and block count keep describing the latest file.
		prevSuffix := latestSuffix - 1
		prevStat := getFileInfoOrPanic(rootDir, prevSuffix)
		logger.Infof("Second last Block file info: FileName=[%s], FileSize=[%d]", prevStat.Name(), prevStat.Size())
		tailBlockBytes, _, _, err = scanForLastCompleteBlock(rootDir, prevSuffix, 0)
		if err != nil {
			logger.Errorf("Error while scanning second last file [file num=%d]: %s", prevSuffix, err)
			return nil, err
		}
	}

	var tipBlockNum uint64
	if tailBlockBytes != nil {
		var tipBlock *common.Block
		tipBlock, err = deserializeBlock(tailBlockBytes)
		if err != nil {
			logger.Errorf("Error deserializing last block: %s. Block bytes length = %d", err, len(tailBlockBytes))
			return nil, err
		}
		tipBlockNum = tipBlock.Header.Number
	}

	// The chain is considered empty only when the sole block file (suffix 0) has no complete block
	chainEmpty := latestSuffix == 0 && blockCount == 0
	result := &checkpointInfo{
		lastBlockNumber:          tipBlockNum,
		latestFileChunksize:      int(endOffset),
		latestFileChunkSuffixNum: latestSuffix,
		isChainEmpty:             chainEmpty,
	}
	logger.Debugf("Checkpoint info constructed from file system = %s", spew.Sdump(result))
	return result, nil
}
// retrieveLastFileSuffix lists rootDir and returns the largest numeric suffix among the block files
// found directly in it, or -1 when there is none. Directories and entries whose name does not start
// with the block file prefix are skipped. An error is returned when the directory cannot be read or
// when the part of a block file name following the prefix is not an integer.
func retrieveLastFileSuffix(rootDir string) (int, error) {
	logger.Debugf("retrieveLastFileSuffix()")

	entries, err := ioutil.ReadDir(rootDir)
	if err != nil {
		return -1, err
	}

	maxSuffix := -1
	for _, entry := range entries {
		entryName := entry.Name()
		isCandidate := !entry.IsDir() && isBlockFileName(entryName)
		if !isCandidate {
			logger.Debugf("Skipping File name = %s", entryName)
			continue
		}

		suffix, convErr := strconv.Atoi(strings.TrimPrefix(entryName, blockfilePrefix))
		if convErr != nil {
			return -1, convErr
		}
		if suffix > maxSuffix {
			maxSuffix = suffix
		}
	}

	logger.Debugf("retrieveLastFileSuffix() - biggestFileNum = %d", maxSuffix)
	return maxSuffix, nil
}
// isBlockFileName reports whether name starts with the block file prefix.
// Only the prefix is checked; whatever follows it is not validated here.
func isBlockFileName(name string) bool {
	hasBlockfilePrefix := strings.HasPrefix(name, blockfilePrefix)
	return hasBlockfilePrefix
}
// getFileInfoOrPanic returns the file info of the block file with number fileNum under rootDir.
// The info is obtained via os.Lstat, so a symbolic link is described itself rather than followed.
// It panics when the file info cannot be retrieved; the panic value is an error whose message
// includes the underlying cause so that the failure (missing file, permissions, ...) can be diagnosed.
func getFileInfoOrPanic(rootDir string, fileNum int) os.FileInfo {
	filePath := deriveBlockfilePath(rootDir, fileNum)
	fileInfo, err := os.Lstat(filePath)
	if err != nil {
		panic(fmt.Errorf("Error in retrieving file info for file num = %d: %s", fileNum, err))
	}
	return fileInfo
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package fsblkstorage
import (
"os"
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/common/ledger/util"
)
// TestConstructCheckpointInfoFromBlockFiles checks that the checkpoint info derived by scanning the block
// files matches the one maintained by the blockfile manager: on an empty block folder, after adding blocks,
// after moving to a new file, after a partially written block, and after dropping the index and restarting.
func TestConstructCheckpointInfoFromBlockFiles(t *testing.T) {
	testPath := "/tmp/tests/fabric/common/ledger/blkstorage/fsblkstorage"
	ledgerid := "testLedger"
	conf := NewConf(testPath, 0)
	blkStoreDir := conf.getLedgerBlockDir(ledgerid)
	env := newTestEnv(t, conf)
	util.CreateDirIfMissing(blkStoreDir)
	// env is replaced when the storage is restarted further down. Deferring a closure (instead of the
	// method call directly) makes the cleanup act on whichever env is current when the test exits.
	defer func() { env.Cleanup() }()

	// checkpoint constructed on an empty block folder should return CPInfo with isChainEmpty: true
	cpInfo, err := constructCheckpointInfoFromBlockFiles(blkStoreDir)
	testutil.AssertNoError(t, err, "")
	testutil.AssertEquals(t, cpInfo, &checkpointInfo{isChainEmpty: true, lastBlockNumber: 0, latestFileChunksize: 0, latestFileChunkSuffixNum: 0})

	w := newTestBlockfileWrapper(env, ledgerid)
	// Same reasoning as for env: w is reassigned after the restart, so close the current wrapper at exit
	defer func() { w.close() }()
	blockfileMgr := w.blockfileMgr
	bg, gb := testutil.NewBlockGenerator(t, ledgerid, false)

	// Add a few blocks and verify that cpinfo derived from filesystem should be same as from the blockfile manager
	testutil.AssertNoError(t, blockfileMgr.addBlock(gb), "")
	for _, blk := range bg.NextTestBlocks(3) {
		testutil.AssertNoError(t, blockfileMgr.addBlock(blk), "")
	}
	checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

	// Move the chain to new file and check cpinfo derived from file system
	blockfileMgr.moveToNextFile()
	checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

	// Add a few blocks that would go to new file and verify that cpinfo derived from filesystem should be same as from the blockfile manager
	for _, blk := range bg.NextTestBlocks(3) {
		testutil.AssertNoError(t, blockfileMgr.addBlock(blk), "")
	}
	checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

	// Write a partial block (to simulate a crash) and verify that cpinfo derived from filesystem should be same as from the blockfile manager.
	// The length prefix announces the full block size while only half of the block bytes follow it.
	lastTestBlk := bg.NextTestBlocks(1)[0]
	blockBytes, _, err := serializeBlock(lastTestBlk)
	testutil.AssertNoError(t, err, "")
	partialByte := append(proto.EncodeVarint(uint64(len(blockBytes))), blockBytes[len(blockBytes)/2:]...)
	blockfileMgr.currentFileWriter.append(partialByte, true)
	checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)

	// Close the block storage, drop the index and restart and verify
	cpInfoBeforeClose := blockfileMgr.cpInfo
	w.close()
	env.provider.Close()
	indexFolder := conf.getIndexDir()
	testutil.AssertNoError(t, os.RemoveAll(indexFolder), "")

	env = newTestEnv(t, conf)
	w = newTestBlockfileWrapper(env, ledgerid)
	blockfileMgr = w.blockfileMgr
	testutil.AssertEquals(t, blockfileMgr.cpInfo, cpInfoBeforeClose)

	// Seven complete blocks (numbers 0 to 6) were added before the restart, so the rebuilt index should end at 6
	lastBlkIndexed, err := blockfileMgr.index.getLastBlockIndexed()
	testutil.AssertNoError(t, err, "")
	testutil.AssertEquals(t, lastBlkIndexed, uint64(6))

	// Add the last block again after start and check cpinfo again
	testutil.AssertNoError(t, blockfileMgr.addBlock(lastTestBlk), "")
	checkCPInfoFromFile(t, blkStoreDir, blockfileMgr.cpInfo)
}
// checkCPInfoFromFile asserts that the checkpoint info constructed by scanning the block files in
// blkStoreDir is obtained without error and is equal to expectedCPInfo.
func checkCPInfoFromFile(t *testing.T, blkStoreDir string, expectedCPInfo *checkpointInfo) {
	actualCPInfo, scanErr := constructCheckpointInfoFromBlockFiles(blkStoreDir)
	testutil.AssertNoError(t, scanErr, "")
	testutil.AssertEquals(t, actualCPInfo, expectedCPInfo)
}
...@@ -22,6 +22,8 @@ import ( ...@@ -22,6 +22,8 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blkstorage" "github.com/hyperledger/fabric/common/ledger/blkstorage"
...@@ -112,16 +114,23 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, ...@@ -112,16 +114,23 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err)) panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
} }
if cpInfo == nil { //if no cpInfo stored in db initiate to zero if cpInfo == nil {
cpInfo = &checkpointInfo{0, 0, true, 0} logger.Info(`No info about blocks file found in the db.
This could happen if this is the first time the ledger is constructed or the index is dropped.
Scanning blocks dir for the latest info`)
if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
}
logger.Infof("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
} else {
logger.Info(`Synching the info about block files`)
syncCPInfoFromFS(rootDir, cpInfo)
}
err = mgr.saveCurrentInfo(cpInfo, true) err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err)) panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
} }
}
//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
// If not the same, sync the cpInfo and the file system
syncCPInfoFromFS(rootDir, cpInfo)
//Open a writer to the file identified by the number and truncate it to only contain the latest block //Open a writer to the file identified by the number and truncate it to only contain the latest block
// that was completely saved (file system, index, cpinfo, etc) // that was completely saved (file system, index, cpinfo, etc)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)) currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
...@@ -193,7 +202,7 @@ func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) { ...@@ -193,7 +202,7 @@ func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
return return
} }
//Scan the file system to verify that the checkpoint info stored in db is correct //Scan the file system to verify that the checkpoint info stored in db is correct
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock( _, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize)) rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err)) panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
...@@ -325,25 +334,36 @@ func (mgr *blockfileMgr) syncIndex() error { ...@@ -325,25 +334,36 @@ func (mgr *blockfileMgr) syncIndex() error {
} }
indexEmpty = true indexEmpty = true
} }
//initialize index to file number:zero, offset:zero and blockNum:0 //initialize index to file number:zero, offset:zero and blockNum:0
startFileNum := 0 startFileNum := 0
startOffset := 0 startOffset := 0
blockNum := uint64(0)
skipFirstBlock := false skipFirstBlock := false
//get the last file that blocks were added to using the checkpoint info //get the last file that blocks were added to using the checkpoint info
endFileNum := mgr.cpInfo.latestFileChunkSuffixNum endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
startingBlockNum := uint64(0)
//if the index stored in the db has value, update the index information with those values //if the index stored in the db has value, update the index information with those values
if !indexEmpty { if !indexEmpty {
if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
logger.Infof("Both the block files and indices are in sync.")
return nil
}
logger.Infof("Last block indexed [%d], Last block present in block files=[%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
var flp *fileLocPointer var flp *fileLocPointer
if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil { if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
return err return err
} }
startFileNum = flp.fileSuffixNum startFileNum = flp.fileSuffixNum
startOffset = flp.locPointer.offset startOffset = flp.locPointer.offset
blockNum = lastBlockIndexed
skipFirstBlock = true skipFirstBlock = true
startingBlockNum = lastBlockIndexed + 1
} else {
logger.Infof("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
} }
logger.Infof("Start building index from block [%d]", startingBlockNum)
//open a blockstream to the file location that was stored in the index //open a blockstream to the file location that was stored in the index
var stream *blockStream var stream *blockStream
if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil { if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
...@@ -365,6 +385,7 @@ func (mgr *blockfileMgr) syncIndex() error { ...@@ -365,6 +385,7 @@ func (mgr *blockfileMgr) syncIndex() error {
//Should be at the last block already, but go ahead and loop looking for next blockBytes. //Should be at the last block already, but go ahead and loop looking for next blockBytes.
//If there is another block, add it to the index. //If there is another block, add it to the index.
//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated. //This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
blockIdxInfo := &blockIdxInfo{}
for { for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil { if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err return err
...@@ -385,7 +406,6 @@ func (mgr *blockfileMgr) syncIndex() error { ...@@ -385,7 +406,6 @@ func (mgr *blockfileMgr) syncIndex() error {
} }
//Update the blockIndexInfo with what was actually stored in file system //Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
blockIdxInfo.blockHash = info.blockHeader.Hash() blockIdxInfo.blockHash = info.blockHeader.Hash()
blockIdxInfo.blockNum = info.blockHeader.Number blockIdxInfo.blockNum = info.blockHeader.Number
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum, blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
...@@ -397,8 +417,11 @@ func (mgr *blockfileMgr) syncIndex() error { ...@@ -397,8 +417,11 @@ func (mgr *blockfileMgr) syncIndex() error {
if err = mgr.index.indexBlock(blockIdxInfo); err != nil { if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
return err return err
} }
blockNum++ if blockIdxInfo.blockNum%10000 == 0 {
logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
}
} }
logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
return nil return nil
} }
...@@ -581,12 +604,13 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error { ...@@ -581,12 +604,13 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file // scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario). // after which there may lie a block partially written (towards the end of the file in a crash scenario).
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) { func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
//scan the passed file number suffix starting from the passed offset to find the last completed block //scan the passed file number suffix starting from the passed offset to find the last completed block
numBlocks := 0 numBlocks := 0
var lastBlockBytes []byte
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset) blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
if errOpen != nil { if errOpen != nil {
return 0, 0, errOpen return nil, 0, 0, errOpen
} }
defer blockStream.close() defer blockStream.close()
var errRead error var errRead error
...@@ -596,6 +620,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ...@@ -596,6 +620,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
if blockBytes == nil || errRead != nil { if blockBytes == nil || errRead != nil {
break break
} }
lastBlockBytes = blockBytes
numBlocks++ numBlocks++
} }
if errRead == ErrUnexpectedEndOfBlockfile { if errRead == ErrUnexpectedEndOfBlockfile {
...@@ -605,7 +630,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ...@@ -605,7 +630,7 @@ func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64)
errRead = nil errRead = nil
} }
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset) logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return blockStream.currentOffset, numBlocks, errRead return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
} }
// checkpointInfo // checkpointInfo
......
...@@ -43,10 +43,13 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) { ...@@ -43,10 +43,13 @@ func TestBlockFileScanSmallTxOnly(t *testing.T) {
_, fileSize, err := util.FileExists(filePath) _, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "") testutil.AssertNoError(t, err, "")
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0) lastBlockBytes, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "") testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)) testutil.AssertEquals(t, numBlocks, len(blocks))
testutil.AssertEquals(t, endOffsetLastBlock, fileSize) testutil.AssertEquals(t, endOffsetLastBlock, fileSize)
expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-1])
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
} }
func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) { func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
...@@ -72,7 +75,10 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) { ...@@ -72,7 +75,10 @@ func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
err = file.Truncate(fileSize - 1) err = file.Truncate(fileSize - 1)
testutil.AssertNoError(t, err, "") testutil.AssertNoError(t, err, "")
_, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0) lastBlockBytes, _, numBlocks, err := scanForLastCompleteBlock(env.provider.conf.getLedgerBlockDir(ledgerid), 0, 0)
testutil.AssertNoError(t, err, "") testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)-1) testutil.AssertEquals(t, numBlocks, len(blocks)-1)
expectedLastBlockBytes, _, err := serializeBlock(blocks[len(blocks)-2])
testutil.AssertEquals(t, lastBlockBytes, expectedLastBlockBytes)
} }
...@@ -156,7 +156,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error { ...@@ -156,7 +156,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
} }
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum)) batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil { // Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := index.db.WriteBatch(batch, true); err != nil {
return err return err
} }
return nil return nil
......
...@@ -169,7 +169,8 @@ func (historyDB *historyDB) Commit(block *common.Block) error { ...@@ -169,7 +169,8 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
dbBatch.Put(savePointKey, height.ToBytes()) dbBatch.Put(savePointKey, height.ToBytes())
// write the block's history records and savepoint to LevelDB // write the block's history records and savepoint to LevelDB
if err := historyDB.db.WriteBatch(dbBatch, false); err != nil { // Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
return err return err
} }
......
...@@ -154,7 +154,8 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version ...@@ -154,7 +154,8 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
} }
} }
dbBatch.Put(savePointKey, height.ToBytes()) dbBatch.Put(savePointKey, height.ToBytes())
if err := vdb.db.WriteBatch(dbBatch, false); err != nil { // Setting snyc to true as a precaution, false may be an ok optimization after further testing.
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
return err return err
} }
return nil return nil
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment