Commit 0df6a8d1 authored by manish's avatar manish
Browse files

Disable WAL for block storage DB



(Rocks) DB WAL adds overhead while using the DB for saving checkpoints
for block storage. Avoiding writes to the WAL translates the write workload
(appending blocks to the blockfile) into sequential disk writes.

This commit changes the way checkpoints are saved - checkpoints are
written to DB as before, however since WAL is disabled, the checkpoint
stays in-memory only. The in-memory checkpoint is flushed explicitly to
disk (via DB flush) at the time of new block file creation. The in-memory
checkpoint would implicitly also be flushed to disk at the time of DB
shutdown. However, if a crash takes place, the checkpoint available in the
DB would be behind the actual block file status. In order to handle this
crash scenario, this commit also adds code to perform a scan of the block
file beyond the last saved checkpoint and update the checkpoint.

Change-Id: Ie88646b225abaa50b595833f5e7ed8d4facae920
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 9ec48736
...@@ -64,11 +64,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr { ...@@ -64,11 +64,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
} }
if cpInfo == nil { if cpInfo == nil {
cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0} cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0}
err = mgr.saveCurrentInfo(cpInfo) err = mgr.saveCurrentInfo(cpInfo, true)
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err)) panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
} }
} }
updateCPInfo(conf, cpInfo)
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)) currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err)) panic(fmt.Sprintf("Could not open writer to current file: %s", err))
...@@ -81,9 +82,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr { ...@@ -81,9 +82,12 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
mgr.index = newBlockIndex(db) mgr.index = newBlockIndex(db)
mgr.cpInfo = cpInfo mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter mgr.currentFileWriter = currentFileWriter
// init BlockchainInfo // init BlockchainInfo
bcInfo := &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil} bcInfo := &protos.BlockchainInfo{
Height: 0,
CurrentBlockHash: nil,
PreviousBlockHash: nil}
if cpInfo.lastBlockNumber > 0 { if cpInfo.lastBlockNumber > 0 {
lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber) lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber)
if err != nil { if err != nil {
...@@ -104,11 +108,37 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr { ...@@ -104,11 +108,37 @@ func newBlockfileMgr(conf *Conf) *blockfileMgr {
} }
func initDB(conf *Conf) *db.DB { func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{DBPath: conf.dbPath, CFNames: []string{blockIndexCF}}) dbInst := db.CreateDB(&db.Conf{
DBPath: conf.dbPath,
CFNames: []string{blockIndexCF},
DisableWAL: true})
dbInst.Open() dbInst.Open()
return dbInst return dbInst
} }
// updateCPInfo brings the in-memory checkpoint info in sync with the actual
// contents of the latest block file on disk. Because the DB WAL is disabled,
// a crash can leave the persisted checkpoint behind the block file; this
// function scans the file segment beyond the saved checkpoint for complete
// blocks and advances the checkpoint (last block number and file chunk size)
// accordingly. Panics on any file-system error, since the block store cannot
// safely operate with an unverifiable checkpoint.
func updateCPInfo(conf *Conf, cpInfo *checkpointInfo) {
	logger.Debugf("Starting checkpoint=%s", cpInfo)
	rootDir := conf.blockfilesDir
	filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
	exists, size, err := util.FileExists(filePath)
	if err != nil {
		panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
	}
	logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
	if !exists || int(size) == cpInfo.latestFileChunksize {
		// checkpoint info is in sync with the file on disk; nothing to recover
		return
	}
	// The file is larger than the checkpoint records: a crash occurred after
	// blocks were appended but before the checkpoint was flushed. Scan the
	// tail of the file to find the end offset of the last complete block.
	endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(filePath, int64(cpInfo.latestFileChunksize))
	if err != nil {
		panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
	}
	cpInfo.lastBlockNumber += uint64(numBlocks)
	cpInfo.latestFileChunksize = int(endOffsetLastBlock)
	logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}
func deriveBlockfilePath(rootDir string, suffixNum int) string { func deriveBlockfilePath(rootDir string, suffixNum int) string {
return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum) return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum)
} }
...@@ -123,13 +153,18 @@ func (mgr *blockfileMgr) close() { ...@@ -123,13 +153,18 @@ func (mgr *blockfileMgr) close() {
} }
func (mgr *blockfileMgr) moveToNextFile() { func (mgr *blockfileMgr) moveToNextFile() {
nextFileInfo := &checkpointInfo{latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1, latestFileChunksize: 0} nextFileInfo := &checkpointInfo{
nextFileWriter, err := newBlockfileWriter(deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum)) latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
latestFileChunksize: 0}
nextFileWriter, err := newBlockfileWriter(
deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err)) panic(fmt.Sprintf("Could not open writer to next file: %s", err))
} }
mgr.currentFileWriter.close() mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(nextFileInfo) err = mgr.saveCurrentInfo(nextFileInfo, true)
if err != nil { if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err)) panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
} }
...@@ -145,49 +180,44 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error { ...@@ -145,49 +180,44 @@ func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
blockBytes := serBlock.GetBytes() blockBytes := serBlock.GetBytes()
blockHash := serBlock.ComputeHash() blockHash := serBlock.ComputeHash()
txOffsets, err := serBlock.GetTxOffsets() txOffsets, err := serBlock.GetTxOffsets()
currentOffset := mgr.cpInfo.latestFileChunksize
if err != nil { if err != nil {
return fmt.Errorf("Error while serializing block: %s", err) return fmt.Errorf("Error while serializing block: %s", err)
} }
currentOffset := mgr.cpInfo.latestFileChunksize blockBytesLen := len(blockBytes)
length := len(blockBytes) blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
encodedLen := proto.EncodeVarint(uint64(length)) totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
totalLen := length + len(encodedLen)
if currentOffset+totalLen > mgr.conf.maxBlockfileSize { if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile() mgr.moveToNextFile()
currentOffset = 0 currentOffset = 0
} }
err = mgr.currentFileWriter.append(encodedLen) err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
if err != nil { if err == nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) err = mgr.currentFileWriter.append(blockBytes, true)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
} }
err = mgr.currentFileWriter.append(blockBytes)
if err != nil { if err != nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil { if truncateErr != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err)) panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
} }
return fmt.Errorf("Error while appending block to file: %s", err) return fmt.Errorf("Error while appending block to file: %s", err)
} }
mgr.cpInfo.latestFileChunksize += totalLen mgr.cpInfo.latestFileChunksize += totalBytesToAppend
mgr.cpInfo.lastBlockNumber++ mgr.cpInfo.lastBlockNumber++
err = mgr.saveCurrentInfo(mgr.cpInfo) err = mgr.saveCurrentInfo(mgr.cpInfo, false)
if err != nil { if err != nil {
mgr.cpInfo.latestFileChunksize -= totalLen mgr.cpInfo.latestFileChunksize -= totalBytesToAppend
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize) truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil { if truncateErr != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err)) panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
} }
return fmt.Errorf("Error while saving current file info to db: %s", err) return fmt.Errorf("Error while saving current file info to db: %s", err)
} }
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum} blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset blockFLP.offset = currentOffset
mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, length, len(encodedLen), txOffsets) mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, blockBytesLen, len(blockBytesEncodedLen), txOffsets)
mgr.updateBlockchainInfo(blockHash, block) mgr.updateBlockchainInfo(blockHash, block)
return nil return nil
} }
...@@ -198,7 +228,11 @@ func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo { ...@@ -198,7 +228,11 @@ func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) { func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) {
currentBCInfo := mgr.getBlockchainInfo() currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &protos.BlockchainInfo{Height: currentBCInfo.Height + 1, CurrentBlockHash: latestBlockHash, PreviousBlockHash: latestBlock.PreviousBlockHash} newBCInfo := &protos.BlockchainInfo{
Height: currentBCInfo.Height + 1,
CurrentBlockHash: latestBlockHash,
PreviousBlockHash: latestBlock.PreviousBlockHash}
mgr.bcInfo.Store(newBCInfo) mgr.bcInfo.Store(newBCInfo)
} }
...@@ -315,33 +349,59 @@ func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) { ...@@ -315,33 +349,59 @@ func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
} }
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) { func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
b, err := mgr.db.Get(mgr.defaultCF, blkMgrInfoKey) var b []byte
if err != nil { var err error
return nil, err if b, err = mgr.db.Get(mgr.defaultCF, blkMgrInfoKey); b == nil || err != nil {
}
if b == nil {
return nil, err return nil, err
} }
i := &checkpointInfo{} i := &checkpointInfo{}
if err = i.unmarshal(b); err != nil { if err = i.unmarshal(b); err != nil {
return nil, err return nil, err
} }
logger.Debugf("loaded checkpointInfo:%s", i)
return i, nil return i, nil
} }
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo) error { func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
b, err := i.marshal() b, err := i.marshal()
if err != nil { if err != nil {
return err return err
} }
err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b) if err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b); err != nil {
if err != nil { return err
}
if flush {
if err = mgr.db.Flush(true); err != nil {
return err return err
} }
logger.Debugf("saved checkpointInfo:%s", i)
}
return nil return nil
} }
// blkMgrInfo func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) {
logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset)
numBlocks := 0
blockStream, err := newBlockStream(filePath, startingOffset)
if err != nil {
return 0, 0, err
}
defer blockStream.close()
for {
blockBytes, err := blockStream.nextBlockBytes()
if blockBytes == nil || err != nil {
logger.Debugf(`scanForLastCompleteBlock(): error=[%s].
This may happen if a crash has happened during block appending.
Returning current offset as a last complete block's end offset`, err)
break
}
numBlocks++
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentFileOffset)
return blockStream.currentFileOffset, numBlocks, err
}
// checkpointInfo
type checkpointInfo struct { type checkpointInfo struct {
latestFileChunkSuffixNum int latestFileChunkSuffixNum int
latestFileChunksize int latestFileChunksize int
...@@ -385,3 +445,8 @@ func (i *checkpointInfo) unmarshal(b []byte) error { ...@@ -385,3 +445,8 @@ func (i *checkpointInfo) unmarshal(b []byte) error {
return nil return nil
} }
// String returns a human-readable representation of the checkpoint info;
// used by the debug log statements in this package.
func (i *checkpointInfo) String() string {
	return fmt.Sprintf("latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], lastBlockNumber=[%d]",
		i.latestFileChunkSuffixNum, i.latestFileChunksize, i.lastBlockNumber)
}
...@@ -36,6 +36,61 @@ func TestBlockfileMgrBlockReadWrite(t *testing.T) { ...@@ -36,6 +36,61 @@ func TestBlockfileMgrBlockReadWrite(t *testing.T) {
blkfileMgrWrapper.testGetBlockByNumber(blocks, 1) blkfileMgrWrapper.testGetBlockByNumber(blocks, 1)
} }
// TestBlockfileMgrCrashDuringWriting exercises crash recovery for several
// partial-write shapes: trailing bytes beyond the length varint, within the
// varint, none at all, and the case with no blocks before the checkpoint.
func TestBlockfileMgrCrashDuringWriting(t *testing.T) {
	testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 10)
	testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 1)
	testBlockfileMgrCrashDuringWriting(t, 10, 2, 1000, 0)
	testBlockfileMgrCrashDuringWriting(t, 0, 0, 1000, 10)
}
// testBlockfileMgrCrashDuringWriting simulates a crash that happens after some
// blocks were appended past the last flushed checkpoint (and possibly a
// partially written block at the tail of the file), then restarts the manager
// and verifies that the checkpoint recovered by the startup scan matches the
// pre-crash state and that all blocks remain iterable.
func testBlockfileMgrCrashDuringWriting(t *testing.T, numBlocksBeforeCheckpoint int,
	numBlocksAfterCheckpoint int, numLastBlockBytes int, numPartialBytesToWrite int) {
	env := newTestEnv(t)
	defer env.Cleanup()
	blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
	blocksBeforeCP := testutil.ConstructTestBlocks(t, numBlocksBeforeCheckpoint)
	blkfileMgrWrapper.addBlocks(blocksBeforeCP)
	// snapshot the checkpoint as of the "flushed" state
	currentCPInfo := blkfileMgrWrapper.blockfileMgr.cpInfo
	cpInfo1 := &checkpointInfo{
		currentCPInfo.latestFileChunkSuffixNum,
		currentCPInfo.latestFileChunksize,
		currentCPInfo.lastBlockNumber}
	blocksAfterCP := testutil.ConstructTestBlocks(t, numBlocksAfterCheckpoint)
	blkfileMgrWrapper.addBlocks(blocksAfterCP)
	cpInfo2 := blkfileMgrWrapper.blockfileMgr.cpInfo
	// simulate a crash scenario: write only a prefix of a final block's bytes
	// and roll the persisted checkpoint back to the earlier snapshot
	lastBlockBytes := []byte{}
	encodedLen := proto.EncodeVarint(uint64(numLastBlockBytes))
	randomBytes := testutil.ConstructRandomBytes(t, numLastBlockBytes)
	lastBlockBytes = append(lastBlockBytes, encodedLen...)
	lastBlockBytes = append(lastBlockBytes, randomBytes...)
	partialBytes := lastBlockBytes[:numPartialBytesToWrite]
	blkfileMgrWrapper.blockfileMgr.currentFileWriter.append(partialBytes, true)
	blkfileMgrWrapper.blockfileMgr.saveCurrentInfo(cpInfo1, true)
	blkfileMgrWrapper.close()
	// simulate a start after a crash: recovery should rebuild cpInfo2
	blkfileMgrWrapper = newTestBlockfileWrapper(t, env)
	defer blkfileMgrWrapper.close()
	cpInfo3 := blkfileMgrWrapper.blockfileMgr.cpInfo
	testutil.AssertEquals(t, cpInfo3, cpInfo2)
	// add fresh blocks after restart
	numBlocksAfterRestart := 2
	blocksAfterRestart := testutil.ConstructTestBlocks(t, numBlocksAfterRestart)
	blkfileMgrWrapper.addBlocks(blocksAfterRestart)
	// iterate over all blocks (before/after checkpoint, after restart)
	allBlocks := []*protos.Block2{}
	allBlocks = append(allBlocks, blocksBeforeCP...)
	allBlocks = append(allBlocks, blocksAfterCP...)
	allBlocks = append(allBlocks, blocksAfterRestart...)
	numTotalBlocks := len(allBlocks)
	testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 1, numTotalBlocks, allBlocks)
}
func TestBlockfileMgrBlockIterator(t *testing.T) { func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t) env := newTestEnv(t)
defer env.Cleanup() defer env.Cleanup()
...@@ -43,16 +98,21 @@ func TestBlockfileMgrBlockIterator(t *testing.T) { ...@@ -43,16 +98,21 @@ func TestBlockfileMgrBlockIterator(t *testing.T) {
defer blkfileMgrWrapper.close() defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10) blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks) blkfileMgrWrapper.addBlocks(blocks)
itr, err := blkfileMgrWrapper.blockfileMgr.retrieveBlocks(1, 8) testBlockfileMgrBlockIterator(t, blkfileMgrWrapper.blockfileMgr, 1, 8, blocks[0:8])
}
func testBlockfileMgrBlockIterator(t *testing.T, blockfileMgr *blockfileMgr,
firstBlockNum int, lastBlockNum int, expectedBlocks []*protos.Block2) {
itr, err := blockfileMgr.retrieveBlocks(uint64(firstBlockNum), uint64(lastBlockNum))
defer itr.Close() defer itr.Close()
testutil.AssertNoError(t, err, "Error while getting blocks iterator") testutil.AssertNoError(t, err, "Error while getting blocks iterator")
numBlocksItrated := 0 numBlocksItrated := 0
for ; itr.Next(); numBlocksItrated++ { for ; itr.Next(); numBlocksItrated++ {
block, err := itr.Get() block, err := itr.Get()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated)) testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksItrated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), blocks[numBlocksItrated]) testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), expectedBlocks[numBlocksItrated])
} }
testutil.AssertEquals(t, numBlocksItrated, 8) testutil.AssertEquals(t, numBlocksItrated, lastBlockNum-firstBlockNum+1)
} }
func TestBlockfileMgrBlockchainInfo(t *testing.T) { func TestBlockfileMgrBlockchainInfo(t *testing.T) {
......
...@@ -47,12 +47,14 @@ func (w *blockfileWriter) truncateFile(targetSize int) error { ...@@ -47,12 +47,14 @@ func (w *blockfileWriter) truncateFile(targetSize int) error {
return nil return nil
} }
func (w *blockfileWriter) append(b []byte) error { func (w *blockfileWriter) append(b []byte, sync bool) error {
_, err := w.file.Write(b) _, err := w.file.Write(b)
if err != nil { if err != nil {
return err return err
} }
w.file.Sync() if sync {
return w.file.Sync()
}
return nil return nil
} }
...@@ -99,6 +101,7 @@ func (r *blockfileReader) close() error { ...@@ -99,6 +101,7 @@ func (r *blockfileReader) close() error {
type blockStream struct { type blockStream struct {
file *os.File file *os.File
reader *bufio.Reader reader *bufio.Reader
currentFileOffset int64
} }
func newBlockStream(filePath string, offset int64) (*blockStream, error) { func newBlockStream(filePath string, offset int64) (*blockStream, error) {
...@@ -115,7 +118,7 @@ func newBlockStream(filePath string, offset int64) (*blockStream, error) { ...@@ -115,7 +118,7 @@ func newBlockStream(filePath string, offset int64) (*blockStream, error) {
if newPosition != offset { if newPosition != offset {
panic(fmt.Sprintf("Could not seek file [%s] to given offset [%d]. New position = [%d]", filePath, offset, newPosition)) panic(fmt.Sprintf("Could not seek file [%s] to given offset [%d]. New position = [%d]", filePath, offset, newPosition))
} }
s := &blockStream{file, bufio.NewReader(file)} s := &blockStream{file, bufio.NewReader(file), offset}
return s, nil return s, nil
} }
...@@ -133,6 +136,7 @@ func (s *blockStream) nextBlockBytes() ([]byte, error) { ...@@ -133,6 +136,7 @@ func (s *blockStream) nextBlockBytes() ([]byte, error) {
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil { if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil {
return nil, err return nil, err
} }
s.currentFileOffset += int64(n) + int64(len)
return blockBytes, nil return blockBytes, nil
} }
......
...@@ -56,24 +56,7 @@ func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *file ...@@ -56,24 +56,7 @@ func (index *blockIndex) indexBlock(blockNum uint64, blockHash []byte, flp *file
} }
batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes) batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes)
} }
// for txNum, txOffset := range txOffsets { if err := index.db.WriteBatch(batch); err != nil {
// txID := constructTxID(blockNum, txNum)
// txBytesLength := 0
// if txNum < len(txOffsets)-1 {
// txBytesLength = txOffsets[txNum+1] - txOffsets[txNum]
// } else {
// txBytesLength = blockLen - txOffsets[txNum]
// }
// txFLP := newFileLocationPointer(flp.fileSuffixNum, flp.offset+skip, &locPointer{txOffset, txBytesLength})
// logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFLP, txID)
// txFLPBytes, marshalErr := txFLP.marshal()
// if marshalErr != nil {
// return marshalErr
// }
// batch.PutCF(index.blockIndexCF, index.constructTxIDKey(txID), txFLPBytes)
// }
err = index.db.WriteBatch(batch)
if err != nil {
return err return err
} }
return nil return nil
...@@ -136,7 +119,7 @@ func (lp *locPointer) String() string { ...@@ -136,7 +119,7 @@ func (lp *locPointer) String() string {
lp.offset, lp.bytesLength) lp.offset, lp.bytesLength)
} }
// locPointer // fileLocPointer
type fileLocPointer struct { type fileLocPointer struct {
fileSuffixNum int fileSuffixNum int
locPointer locPointer
......
...@@ -39,6 +39,7 @@ const ( ...@@ -39,6 +39,7 @@ const (
type Conf struct { type Conf struct {
DBPath string DBPath string
CFNames []string CFNames []string
DisableWAL bool
} }
// DB - a rocksDB instance // DB - a rocksDB instance
...@@ -48,12 +49,23 @@ type DB struct { ...@@ -48,12 +49,23 @@ type DB struct {
cfHandlesMap map[string]*gorocksdb.ColumnFamilyHandle cfHandlesMap map[string]*gorocksdb.ColumnFamilyHandle
dbState dbState dbState dbState
mux sync.Mutex mux sync.Mutex
readOpts *gorocksdb.ReadOptions
writeOpts *gorocksdb.WriteOptions
} }
// CreateDB constructs a `DB` // CreateDB constructs a `DB`
func CreateDB(conf *Conf) *DB { func CreateDB(conf *Conf) *DB {
conf.CFNames = append(conf.CFNames, defaultCFName) conf.CFNames = append(conf.CFNames, defaultCFName)
return &DB{conf: conf, cfHandlesMap: make(map[string]*gorocksdb.ColumnFamilyHandle), dbState: closed} readOpts := gorocksdb.NewDefaultReadOptions()
writeOpts := gorocksdb.NewDefaultWriteOptions()
writeOpts.DisableWAL(conf.DisableWAL)
return &DB{
conf: conf,
cfHandlesMap: make(map[string]*gorocksdb.ColumnFamilyHandle),
dbState: closed,
readOpts: readOpts,
writeOpts: writeOpts}
} }
// Open open underlying rocksdb // Open open underlying rocksdb
...@@ -73,7 +85,6 @@ func (dbInst *DB) Open() { ...@@ -73,7 +85,6 @@ func (dbInst *DB) Open() {
} }
opts := gorocksdb.NewDefaultOptions() opts := gorocksdb.NewDefaultOptions()
defer opts.Destroy() defer opts.Destroy()