Commit 910e496c authored by manish's avatar manish
Browse files

Sync block index with block storage



This commit adds checkpointing of block-index progress and syncs (updates)
the index with block storage when the block storage system starts.

Change-Id: Ib1a325add455bce47e510ccfc7af052db51117e6
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent a069514c
......@@ -34,6 +34,7 @@ var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file
type blockfileStream struct {
fileNum int
file *os.File
reader *bufio.Reader
currentOffset int64
......@@ -49,10 +50,19 @@ type blockStream struct {
currentFileStream *blockfileStream
}
// blockPlacementInfo captures the information related
// to block's placement in the file.
// It is produced by blockfileStream.nextBlockBytesAndPlacementInfo for every
// block that is read.
type blockPlacementInfo struct {
// fileNum is the number of the block file the block was read from.
fileNum int
// blockStartOffset is the stream offset at which the block's record begins,
// i.e. the position of its varint length prefix.
blockStartOffset int64
// blockBytesOffset is blockStartOffset plus the size of the varint length
// prefix, i.e. the position of the first byte of the serialized block.
blockBytesOffset int64
}
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, error) {
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
filePath := deriveBlockfilePath(rootDir, fileNum)
logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
var file *os.File
var err error
......@@ -68,41 +78,50 @@ func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, e
panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
filePath, startOffset, newPosition))
}
s := &blockfileStream{file, bufio.NewReader(file), startOffset}
s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset}
return s, nil
}
// nextBlockBytes returns the serialized bytes of the next block in the file.
// It is a convenience wrapper over nextBlockBytesAndPlacementInfo that drops
// the placement information.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	data, _, readErr := s.nextBlockBytesAndPlacementInfo()
	return data, readErr
}
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
var lenBytes []byte
var err error
if lenBytes, err = s.reader.Peek(8); err != nil {
// reader.Peek raises io.EOF error if enough bytes not available
if err == io.EOF {
if len(lenBytes) > 0 {
return nil, ErrUnexpectedEndOfBlockfile
return nil, nil, ErrUnexpectedEndOfBlockfile
}
return nil, nil
return nil, nil, nil
}
return nil, err
return nil, nil, err
}
len, n := proto.DecodeVarint(lenBytes)
if n == 0 {
panic(fmt.Errorf("Error in decoding varint bytes"))
}
if _, err = s.reader.Discard(n); err != nil {
return nil, err
return nil, nil, err
}
blockBytes := make([]byte, len)
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil {
// io.ReadAtLeast raises io.ErrUnexpectedEOF error if it is able to
// read a fewer (non-zero) bytes and io.EOF is encountered
if err == io.ErrUnexpectedEOF {
return nil, ErrUnexpectedEndOfBlockfile
return nil, nil, ErrUnexpectedEndOfBlockfile
}
return nil, err
return nil, nil, err
}
blockPlacementInfo := &blockPlacementInfo{
fileNum: s.fileNum,
blockStartOffset: s.currentOffset,
blockBytesOffset: s.currentOffset + int64(n)}
s.currentOffset += int64(n) + int64(len)
return blockBytes, nil
return blockBytes, blockPlacementInfo, nil
}
func (s *blockfileStream) close() error {
......@@ -113,8 +132,7 @@ func (s *blockfileStream) close() error {
// blockStream functions
////////////////////////////////////
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
startFile := deriveBlockfilePath(rootDir, startFileNum)
startFileStream, err := newBlockfileStream(startFile, startOffset)
startFileStream, err := newBlockfileStream(rootDir, startFileNum, startOffset)
if err != nil {
return nil, err
}
......@@ -127,30 +145,35 @@ func (s *blockStream) moveToNextBlockfileStream() error {
return err
}
s.currentFileNum++
nextFile := deriveBlockfilePath(s.rootDir, s.currentFileNum)
if s.currentFileStream, err = newBlockfileStream(nextFile, 0); err != nil {
if s.currentFileStream, err = newBlockfileStream(s.rootDir, s.currentFileNum, 0); err != nil {
return err
}
return nil
}
// nextBlockBytes returns the serialized bytes of the next block in the
// stream. It is a convenience wrapper over nextBlockBytesAndPlacementInfo
// that drops the placement information.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	data, _, readErr := s.nextBlockBytesAndPlacementInfo()
	return data, readErr
}
func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
var blockBytes []byte
var blockPlacementInfo *blockPlacementInfo
var err error
if blockBytes, err = s.currentFileStream.nextBlockBytes(); err != nil {
if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil {
logger.Debugf("current file [%d]", s.currentFileNum)
logger.Debugf("blockbytes [%d]. Err:%s", len(blockBytes), err)
return nil, err
return nil, nil, err
}
logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
if blockBytes == nil && s.currentFileNum < s.endFileNum {
logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
if err = s.moveToNextBlockfileStream(); err != nil {
return nil, err
return nil, nil, err
}
return s.nextBlockBytes()
return s.nextBlockBytesAndPlacementInfo()
}
return blockBytes, nil
return blockBytes, blockPlacementInfo, nil
}
func (s *blockStream) close() error {
......
......@@ -39,7 +39,7 @@ func testBlockfileStream(t *testing.T, numBlocks int) {
w.addBlocks(blocks)
w.close()
s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0)
defer s.close()
testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
......@@ -80,7 +80,7 @@ func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockB
w.addBlocks(blocks)
blockfileMgr.currentFileWriter.append(partialBlockBytes, true)
w.close()
s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
s, err := newBlockfileStream(blockfileMgr.rootDir, 0, 0)
defer s.close()
testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
......
......@@ -44,7 +44,7 @@ type blockfileMgr struct {
conf *Conf
db *db.DB
defaultCF *gorocksdb.ColumnFamilyHandle
index *blockIndex
index index
cpInfo *checkpointInfo
currentFileWriter *blockfileWriter
bcInfo atomic.Value
......@@ -79,9 +79,11 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}
mgr.index = newBlockIndex(db)
mgr.index = newBlockIndex(db, db.GetCFHandle(blockIndexCF))
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
mgr.syncIndex()
// init BlockchainInfo
bcInfo := &protos.BlockchainInfo{
Height: 0,
......@@ -130,7 +132,8 @@ func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) {
// check point info is in sync with the file on disk
return
}
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(filePath, int64(cpInfo.latestFileChunksize))
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
if err != nil {
panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
}
......@@ -217,11 +220,73 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
}
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, blockBytesLen, len(blockBytesEncodedLen), txOffsets)
// shift the txoffset because we prepend length of bytes before block bytes
for i := 0; i < len(txOffsets); i++ {
txOffsets[i] += len(blockBytesEncodedLen)
}
mgr.index.indexBlock(&blockIdxInfo{
blockNum: mgr.cpInfo.lastBlockNumber, blockHash: blockHash,
flp: blockFLP, txOffsets: txOffsets})
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
// syncIndex brings the block index up to date with the blocks present in the
// block files.
//
// It asks the index for the last block number it has recorded. If that is 0,
// scanning starts at the beginning of file 0 and the first block read is
// numbered 1. Otherwise scanning starts at the stored location of the last
// indexed block, so that block is indexed again before the ones that follow.
// Every block read up to the checkpointed latest file is passed to
// index.indexBlock.
//
// It returns the first error reported by the index, the block stream, or
// transaction-offset extraction.
func (mgr *blockfileMgr) syncIndex() error {
	lastBlockIndexed, err := mgr.index.getLastBlockIndexed()
	if err != nil {
		return err
	}

	startFileNum := 0
	startOffset := 0
	blockNum := uint64(1)
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
	if lastBlockIndexed != 0 {
		flp, err := mgr.index.getBlockLocByBlockNum(lastBlockIndexed)
		if err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
		blockNum = lastBlockIndexed
	}

	stream, err := newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum)
	if err != nil {
		return err
	}
	// Release the underlying file handle on every exit path.
	defer stream.close()

	for {
		blockBytes, placement, err := stream.nextBlockBytesAndPlacementInfo()
		if err != nil {
			return err
		}
		if blockBytes == nil {
			// No more blocks up to endFileNum.
			return nil
		}

		serBlock2 := protos.NewSerBlock2(blockBytes)
		txOffsets, err := serBlock2.GetTxOffsets()
		if err != nil {
			return err
		}
		// Shift the tx offsets past the length prefix only, exactly as
		// addBlock does. The block's own start offset is carried separately
		// in flp, so adding the absolute blockBytesOffset here would count
		// the block start twice for any block not located at offset 0.
		lenPrefixSize := int(placement.blockBytesOffset - placement.blockStartOffset)
		for i := range txOffsets {
			txOffsets[i] += lenPrefixSize
		}

		idxInfo := &blockIdxInfo{
			blockNum:  blockNum,
			blockHash: serBlock2.ComputeHash(),
			flp: &fileLocPointer{
				fileSuffixNum: placement.fileNum,
				locPointer:    locPointer{offset: int(placement.blockStartOffset)},
			},
			txOffsets: txOffsets,
		}
		if err = mgr.index.indexBlock(idxInfo); err != nil {
			return err
		}
		blockNum++
	}
}
// getBlockchainInfo returns the BlockchainInfo currently stored in the
// manager's atomic bcInfo value.
func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {
	current := mgr.bcInfo.Load()
	return current.(*protos.BlockchainInfo)
}
......@@ -320,8 +385,7 @@ func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transacti
}
func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
stream, err := newBlockfileStream(filePath, int64(lp.offset))
stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
if err != nil {
return nil, err
}
......@@ -378,10 +442,9 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
return nil
}
func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) {
logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset)
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
numBlocks := 0
blockStream, err := newBlockfileStream(filePath, startingOffset)
blockStream, err := newBlockfileStream(rootDir, fileNum, startingOffset)
if err != nil {
return 0, 0, err
}
......
......@@ -17,6 +17,7 @@ limitations under the License.
package fsblkstorage
import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
......@@ -25,37 +26,75 @@ import (
"github.com/tecbot/gorocksdb"
)
// Key layout of the block index. Each kind of entry is stored under a key
// that starts with a distinct one-byte prefix.
const (
// blockNumIdxKeyPrefix prefixes keys built from a block number.
blockNumIdxKeyPrefix = 'n'
// blockHashIdxKeyPrefix prefixes keys built from a block hash.
blockHashIdxKeyPrefix = 'h'
// txIDIdxKeyPrefix prefixes keys built from a transaction ID.
txIDIdxKeyPrefix = 't'
// indexCheckpointKeyStr is the text of the key under which the number of
// the last indexed block is stored.
indexCheckpointKeyStr = "indexCheckpointKey"
)
// indexCheckpointKey is indexCheckpointKeyStr as bytes, ready for use as a
// database key.
var indexCheckpointKey = []byte(indexCheckpointKeyStr)
// index is the lookup structure that blockfileMgr uses to map block numbers,
// block hashes and transaction IDs to locations in the block files.
// blockIndex is the database-backed implementation.
type index interface {
// getLastBlockIndexed returns the number of the last block recorded in
// the index; syncIndex treats 0 as "nothing indexed yet".
getLastBlockIndexed() (uint64, error)
// indexBlock records the location of a block and of its transactions.
indexBlock(blockIdxInfo *blockIdxInfo) error
// getBlockLocByHash returns the location of the block with the given hash.
// The blockIndex implementation returns ErrNotFoundInIndex when absent.
getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
// getBlockLocByBlockNum returns the location of the block with the given
// number. The blockIndex implementation returns ErrNotFoundInIndex when absent.
getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
// getTxLoc returns the location of the transaction with the given ID.
// The blockIndex implementation returns ErrNotFoundInIndex when absent.
getTxLoc(txID string) (*fileLocPointer, error)
}
// blockIdxInfo carries everything indexBlock needs to index a single block.
type blockIdxInfo struct {
// blockNum is the block's number; it is also written as the index checkpoint.
blockNum uint64
// blockHash is the block's hash, used to build the by-hash index key.
blockHash []byte
// flp is the location of the block in the block files.
flp *fileLocPointer
// txOffsets holds transaction boundary offsets: indexBlock treats
// txOffsets[i+1]-txOffsets[i] as the byte length of transaction i, so the
// slice has one more entry than the block has transactions.
txOffsets []int
}
// ErrNotFoundInIndex is used to indicate missing entry in the index
var ErrNotFoundInIndex = errors.New("Entry not found in index")
// blockIndex is the database-backed implementation of the index interface.
type blockIndex struct {
// db is the database handle used for all index reads and batched writes.
db *db.DB
// blockIndexCF is the column family under which every index entry,
// including the checkpoint, is stored.
blockIndexCF *gorocksdb.ColumnFamilyHandle
}
func newBlockIndex(db *db.DB) *blockIndex {
//TODO during init make sure that the index is in sync with block strorage
return &blockIndex{db, db.GetCFHandle(blockIndexCF)}
func newBlockIndex(db *db.DB, indexCFHandle *gorocksdb.ColumnFamilyHandle) *blockIndex {
return &blockIndex{db, indexCFHandle}
}
// getLastBlockIndexed returns the block number stored under the index
// checkpoint key, i.e. the number of the last block written by indexBlock.
//
// It returns 0 with a nil error when no checkpoint has been written yet.
// A failure to read from the database is returned to the caller rather than
// being reported as "nothing indexed", which would hide the failure and make
// the caller re-index from the very first block.
func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
	blockNumBytes, err := index.db.Get(index.blockIndexCF, indexCheckpointKey)
	if err != nil {
		return 0, err
	}
	if blockNumBytes == nil {
		// No checkpoint present: the index is empty.
		return 0, nil
	}
	return decodeBlockNum(blockNumBytes), nil
}
func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *fileLocPointer, blockLen int, skip int, txOffsets []int) error {
logger.Debugf("Adding blockLoc [%s] to index", flp)
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
logger.Debugf("Indexing block [%s]", blockIdxInfo)
flp := blockIdxInfo.flp
txOffsets := blockIdxInfo.txOffsets
batch := gorocksdb.NewWriteBatch()
defer batch.Destroy()
flpBytes, err := flp.marshal()
if err != nil {
return err
}
batch.PutCF(index.blockIndexCF, index.constructBlockHashKey(blockHash), flpBytes)
batch.PutCF(index.blockIndexCF, index.constructBlockNumKey(blockNum), flpBytes)
batch.PutCF(index.blockIndexCF, constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
batch.PutCF(index.blockIndexCF, constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
for i := 0; i < len(txOffsets)-1; i++ {
txID := constructTxID(blockNum, i)
txID := constructTxID(blockIdxInfo.blockNum, i)
txBytesLength := txOffsets[i+1] - txOffsets[i]
txFLP := newFileLocationPointer(flp.fileSuffixNum, flp.offset+skip, &locPointer{txOffsets[i], txBytesLength})
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFLP, txID)
txFLPBytes, marshalErr := txFLP.marshal()
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, &locPointer{txOffsets[i], txBytesLength})
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes)
batch.PutCF(index.blockIndexCF, constructTxIDKey(txID), txFlpBytes)
}
batch.PutCF(index.blockIndexCF, indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch); err != nil {
return err
}
......@@ -63,52 +102,70 @@ func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *file
}
func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
b, err := index.db.Get(index.blockIndexCF, index.constructBlockHashKey(blockHash))
b, err := index.db.Get(index.blockIndexCF, constructBlockHashKey(blockHash))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
}
blkLoc := &fileLocPointer{}
blkLoc.unmarshal(b)
return blkLoc, nil
}
func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
b, err := index.db.Get(index.blockIndexCF, index.constructBlockNumKey(blockNum))
b, err := index.db.Get(index.blockIndexCF, constructBlockNumKey(blockNum))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
}
blkLoc := &fileLocPointer{}
blkLoc.unmarshal(b)
return blkLoc, nil
}
func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
b, err := index.db.Get(index.blockIndexCF, index.constructTxIDKey(txID))
b, err := index.db.Get(index.blockIndexCF, constructTxIDKey(txID))
if err != nil {
return nil, err
}
if b == nil {
return nil, ErrNotFoundInIndex
}
txFLP := &fileLocPointer{}
txFLP.unmarshal(b)
return txFLP, nil
}
func (index *blockIndex) constructBlockNumKey(blockNum uint64) []byte {
func constructBlockNumKey(blockNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
return append([]byte{'n'}, blkNumBytes...)
return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
}
func (index *blockIndex) constructBlockHashKey(blockHash []byte) []byte {
return append([]byte{'b'}, blockHash...)
func constructBlockHashKey(blockHash []byte) []byte {
return append([]byte{blockHashIdxKeyPrefix}, blockHash...)
}
func (index *blockIndex) constructTxIDKey(txID string) []byte {
return append([]byte{'t'}, []byte(txID)...)
func constructTxIDKey(txID string) []byte {
return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
}
// constructTxID builds a transaction ID of the form "<blockNum>:<txNum>"
// from the block number and the transaction's position within the block.
func constructTxID(blockNum uint64, txNum int) string {
	const txIDFormat = "%d:%d"
	txID := fmt.Sprintf(txIDFormat, blockNum, txNum)
	return txID
}
// encodeBlockNum serializes a block number as a protobuf varint.
// decodeBlockNum is its inverse.
func encodeBlockNum(blockNum uint64) []byte {
	encoded := proto.EncodeVarint(blockNum)
	return encoded
}
// decodeBlockNum parses a block number from its protobuf varint encoding.
// The count of consumed bytes reported by the decoder is deliberately
// discarded, so input that cannot be decoded is not distinguished from an
// encoded zero.
func decodeBlockNum(blockNumBytes []byte) uint64 {
	decoded, _ := proto.DecodeVarint(blockNumBytes)
	return decoded
}
type locPointer struct {
offset int
bytesLength int
......@@ -173,3 +230,7 @@ func (flp *fileLocPointer) unmarshal(b []byte) error {
// String renders the pointer as its file suffix number followed by the
// string form of the embedded locPointer.
func (flp *fileLocPointer) String() string {
	loc := flp.locPointer.String()
	return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, loc)
}
// String renders the block number and the block hash (in Go-syntax form)
// for use in log messages.
func (info *blockIdxInfo) String() string {
	num, hash := info.blockNum, info.blockHash
	return fmt.Sprintf("blockNum=%d, blockHash=%#v", num, hash)
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"fmt"
"testing"
"github.com/hyperledger/fabric/core/ledgernext/testutil"
)
// noopIndex is an index implementation that stores nothing. The tests plug
// it into a blockfileMgr so that blocks can be appended to the block files
// without being recorded in the real index.
type noopIndex struct{}

// getLastBlockIndexed always reports block number 0 and no error.
func (*noopIndex) getLastBlockIndexed() (uint64, error) {
	return 0, nil
}

// indexBlock ignores the block and reports success.
func (*noopIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
	return nil
}

// getBlockLocByHash returns no location and no error.
func (*noopIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
	return nil, nil
}

// getBlockLocByBlockNum returns no location and no error.
func (*noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
	return nil, nil
}

// getTxLoc returns no location and no error.
func (*noopIndex) getTxLoc(txID string) (*fileLocPointer, error) {
	return nil, nil
}
// TestBlockIndexSync runs the index-sync scenario over 10 blocks with a
// varying number of blocks indexed up front, syncing either by calling
// syncIndex directly or by restarting the block file manager.
func TestBlockIndexSync(t *testing.T) {
	scenarios := []struct {
		numBlocksToIndex int
		syncByRestart    bool
	}{
		{numBlocksToIndex: 5, syncByRestart: false},
		{numBlocksToIndex: 5, syncByRestart: true},
		{numBlocksToIndex: 0, syncByRestart: true},
		{numBlocksToIndex: 10, syncByRestart: true},
	}
	for _, sc := range scenarios {
		testBlockIndexSync(t, 10, sc.numBlocksToIndex, sc.syncByRestart)
	}
}
// testBlockIndexSync verifies that the block index catches up with block
// storage.
//
// It writes numBlocks blocks, of which only the first numBlocksToIndex are
// recorded in the real index (the rest are written while a noopIndex is
// plugged in). It then checks that the un-indexed blocks cannot be retrieved,
// performs a sync, and checks that they can.
//
// When syncByRestart is true the sync is triggered by closing and re-creating
// the block file manager; otherwise syncIndex is called directly.
func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncByRestart bool) {
	env := newTestEnv(t)
	defer env.Cleanup()
	blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
	// Close whichever wrapper is current when the test exits. A plain
	// `defer blkfileMgrWrapper.close()` would bind the first wrapper and
	// close it a second time after the restart path has already closed it.
	defer func() { blkfileMgrWrapper.close() }()
	blkfileMgr := blkfileMgrWrapper.blockfileMgr
	origIndex := blkfileMgr.index

	// construct blocks for testing
	blocks := testutil.ConstructTestBlocks(t, numBlocks)
	// add a few blocks
	blkfileMgrWrapper.addBlocks(blocks[:numBlocksToIndex])

	// Plug-in a noop index and add remaining blocks
	blkfileMgr.index = &noopIndex{}
	blkfileMgrWrapper.addBlocks(blocks[numBlocksToIndex:])

	// Plug-in back the original index
	blkfileMgr.index = origIndex

	// The first set of blocks should be present in the original index
	for i := 1; i <= numBlocksToIndex; i++ {
		block, err := blkfileMgr.retrieveBlockByNumber(uint64(i))
		testutil.AssertNoError(t, err, fmt.Sprintf("block [%d] should have been present in the index", i))
		testutil.AssertEquals(t, block, blocks[i-1])
	}

	// The last set of blocks should not be present in the original index
	for i := numBlocksToIndex + 1; i <= numBlocks; i++ {
		_, err := blkfileMgr.retrieveBlockByNumber(uint64(i))
		testutil.AssertSame(t, err, ErrNotFoundInIndex)
	}

	// perform index sync
	if syncByRestart {
		blkfileMgrWrapper.close()
		blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
		blkfileMgr = blkfileMgrWrapper.blockfileMgr
	} else {
		// A failed sync must fail the test here rather than surface later as
		// a confusing "not found in index" assertion.
		testutil.AssertNoError(t, blkfileMgr.syncIndex(), "Error in syncing the block index")
	}

	// Now, last set of blocks should also be present in original index
	for i := numBlocksToIndex + 1; i <= numBlocks; i++ {
		block, err := blkfileMgr.retrieveBlockByNumber(uint64(i))
		testutil.AssertNoError(t, err, fmt.Sprintf("block [%d] should have been present in the index", i))
		testutil.AssertEquals(t, block, blocks[i-1])
	}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment