/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
	"fmt"
denyeart's avatar
denyeart committed
11
	"math"
manish's avatar
manish committed
12
	"sync"
manish's avatar
manish committed
13
14
	"sync/atomic"

15
16
	"github.com/davecgh/go-spew/spew"

manish's avatar
manish committed
17
	"github.com/golang/protobuf/proto"
18
	"github.com/hyperledger/fabric/common/flogging"
19
20
21
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
22
	"github.com/hyperledger/fabric/protos/common"
23
	"github.com/hyperledger/fabric/protos/peer"
24
	putil "github.com/hyperledger/fabric/protos/utils"
25
	"github.com/pkg/errors"
manish's avatar
manish committed
26
27
)

28
// logger is the package-level logger, registered under the name "fsblkstorage".
var logger = flogging.MustGetLogger("fsblkstorage")
manish's avatar
manish committed
29
30
31
32
33
34
35
36
37
38
39
40

const (
	// blockfilePrefix is the file-name prefix of every block file; the
	// zero-padded file number is appended to it (see deriveBlockfilePath).
	blockfilePrefix = "blockfile_"
)

var (
	// blkMgrInfoKey is the db key under which the serialized checkpoint info
	// is stored (see loadCurrentInfo and saveCurrentInfo).
	blkMgrInfoKey = []byte("blkMgrInfo")
)

type blockfileMgr struct {
	rootDir           string
	conf              *Conf
manish's avatar
manish committed
41
	db                *leveldbhelper.DBHandle
manish's avatar
manish committed
42
	index             index
manish's avatar
manish committed
43
	cpInfo            *checkpointInfo
manish's avatar
manish committed
44
	cpInfoCond        *sync.Cond
manish's avatar
manish committed
45
46
47
48
	currentFileWriter *blockfileWriter
	bcInfo            atomic.Value
}

Mari Wade's avatar
Mari Wade committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
  -- the directory where the files are stored
  -- the individual files where the blocks are stored
  -- the checkpoint which tracks the latest file being persisted to
  -- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
if this start-up is the first time the system is coming up or is this a restart
of the system.

The blockfile manager stores blocks of data into a file system.  That file
storage is done by creating sequentially numbered files of a configured size
i.e blockfile_000000, blockfile_000001, etc..

Jay Guo's avatar
Jay Guo committed
64
Each transaction in a block is stored with information about the number of
Mari Wade's avatar
Mari Wade committed
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
bytes in that transaction
 Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
 Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.

Remember that these steps are only done once at start-up of the system.
At start up a new manager:
  *) Checks if the directory for storing files exists, if not creates the dir
  *) Checks if the key value database exists, if not creates one
       (will create a db dir)
  *) Determines the checkpoint information (cpinfo) used for storage
		-- Loads from db if exist, if not instantiate a new cpinfo
		-- If cpinfo was loaded from db, compares to FS
		-- If cpinfo and file system are not in sync, syncs cpInfo from FS
  *) Starts a new file writer
		-- truncates file per cpinfo to remove any excess past last block
  *) Determines the index information used to find tx and blocks in
  the file blkstorage
		-- Instantiates a new blockIdxInfo
		-- Loads the index from the db if exists
		-- syncIndex comparing the last block indexed to what is in the FS
		-- If index and file system are not in sync, syncs index from the FS
  *)  Updates blockchain info used by the APIs
*/
manish's avatar
manish committed
90
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
91
	logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
Mari Wade's avatar
Mari Wade committed
92
	//Determine the root directory for the blockfile storage, if it does not exist create it
manish's avatar
manish committed
93
	rootDir := conf.getLedgerBlockDir(id)
manish's avatar
manish committed
94
95
	_, err := util.CreateDirIfMissing(rootDir)
	if err != nil {
96
		panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
manish's avatar
manish committed
97
	}
Mari Wade's avatar
Mari Wade committed
98
	// Instantiate the manager, i.e. blockFileMgr structure
manish's avatar
manish committed
99
	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
Mari Wade's avatar
Mari Wade committed
100
101
102
103

	// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
	// It also retrieves the current size of that file and the last block number that was written to that file.
	// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
104
105
106
107
	cpInfo, err := mgr.loadCurrentInfo()
	if err != nil {
		panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
	}
108
	if cpInfo == nil {
109
		logger.Info(`Getting block information from block storage`)
110
111
		if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
			panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
manish's avatar
manish committed
112
		}
113
		logger.Debugf("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
114
	} else {
115
		logger.Debug(`Synching block information from block storage (if needed)`)
116
117
118
119
120
		syncCPInfoFromFS(rootDir, cpInfo)
	}
	err = mgr.saveCurrentInfo(cpInfo, true)
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
manish's avatar
manish committed
121
	}
122

Mari Wade's avatar
Mari Wade committed
123
124
	//Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc)
manish's avatar
manish committed
125
126
127
128
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
129
	//Truncate the file to remove excess past last block
manish's avatar
manish committed
130
131
132
133
134
	err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
	if err != nil {
		panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
	}

Mari Wade's avatar
Mari Wade committed
135
	// Create a new KeyValue store database handler for the blocks index in the keyvalue database
136
137
138
	if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
		panic(fmt.Sprintf("error in block index: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
139
140

	// Update the manager with the checkpoint info and the file writer
manish's avatar
manish committed
141
142
	mgr.cpInfo = cpInfo
	mgr.currentFileWriter = currentFileWriter
Mari Wade's avatar
Mari Wade committed
143
144
	// Create a checkpoint condition (event) variable, for the  goroutine waiting for
	// or announcing the occurrence of an event.
manish's avatar
manish committed
145
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
Mari Wade's avatar
Mari Wade committed
146
147

	// init BlockchainInfo for external API's
148
	bcInfo := &common.BlockchainInfo{
manish's avatar
manish committed
149
150
151
152
		Height:            0,
		CurrentBlockHash:  nil,
		PreviousBlockHash: nil}

153
	if !cpInfo.isChainEmpty {
154
155
		//If start up is a restart of an existing storage, sync the index from block storage and update BlockchainInfo for external API's
		mgr.syncIndex()
156
		lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
manish's avatar
manish committed
157
		if err != nil {
158
			panic(fmt.Sprintf("Could not retrieve header of the last block form file: %s", err))
manish's avatar
manish committed
159
		}
160
161
		lastBlockHash := lastBlockHeader.Hash()
		previousBlockHash := lastBlockHeader.PreviousHash
162
		bcInfo = &common.BlockchainInfo{
163
			Height:            cpInfo.lastBlockNumber + 1,
manish's avatar
manish committed
164
165
166
167
168
169
170
			CurrentBlockHash:  lastBlockHash,
			PreviousBlockHash: previousBlockHash}
	}
	mgr.bcInfo.Store(bcInfo)
	return mgr
}

Mari Wade's avatar
Mari Wade committed
171
172
173
174
//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file of where the last block was written.  Also retrieves contains the
// last block number that was written.  At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
175
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
manish's avatar
manish committed
176
	logger.Debugf("Starting checkpoint=%s", cpInfo)
Mari Wade's avatar
Mari Wade committed
177
	//Checks if the file suffix of where the last block was written exists
manish's avatar
manish committed
178
179
180
181
182
183
	filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
	exists, size, err := util.FileExists(filePath)
	if err != nil {
		panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
	}
	logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
Mari Wade's avatar
Mari Wade committed
184
185
186
	//Test is !exists because when file number is first used the file does not exist yet
	//checks that the file exists and that the size of the file is what is stored in cpinfo
	//status of file [/tmp/tests/ledger/blkstorage/fsblkstorage/blocks/blockfile_000000]: exists=[false], size=[0]
manish's avatar
manish committed
187
188
189
190
	if !exists || int(size) == cpInfo.latestFileChunksize {
		// check point info is in sync with the file on disk
		return
	}
Mari Wade's avatar
Mari Wade committed
191
	//Scan the file system to verify that the checkpoint info stored in db is correct
192
	_, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
manish's avatar
manish committed
193
		rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
manish's avatar
manish committed
194
195
196
197
	if err != nil {
		panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
	}
	cpInfo.latestFileChunksize = int(endOffsetLastBlock)
198
199
200
201
202
203
204
205
206
207
	if numBlocks == 0 {
		return
	}
	//Updates the checkpoint info for the actual last block number stored and it's end location
	if cpInfo.isChainEmpty {
		cpInfo.lastBlockNumber = uint64(numBlocks - 1)
	} else {
		cpInfo.lastBlockNumber += uint64(numBlocks)
	}
	cpInfo.isChainEmpty = false
manish's avatar
manish committed
208
209
210
	logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}

manish's avatar
manish committed
211
212
213
214
215
216
217
218
219
// deriveBlockfilePath returns the path of the block file with the given
// number inside rootDir: rootDir, a slash, the block file prefix and the
// number zero-padded to at least six digits.
func deriveBlockfilePath(rootDir string, suffixNum int) string {
	return fmt.Sprintf("%s/%s%06d", rootDir, blockfilePrefix, suffixNum)
}

// close closes the writer of the block file currently being written.
func (mgr *blockfileMgr) close() {
	mgr.currentFileWriter.close()
}

func (mgr *blockfileMgr) moveToNextFile() {
manish's avatar
manish committed
220
	cpInfo := &checkpointInfo{
manish's avatar
manish committed
221
		latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
manish's avatar
manish committed
222
223
		latestFileChunksize:      0,
		lastBlockNumber:          mgr.cpInfo.lastBlockNumber}
manish's avatar
manish committed
224
225

	nextFileWriter, err := newBlockfileWriter(
manish's avatar
manish committed
226
		deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))
manish's avatar
manish committed
227

manish's avatar
manish committed
228
229
230
231
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to next file: %s", err))
	}
	mgr.currentFileWriter.close()
manish's avatar
manish committed
232
	err = mgr.saveCurrentInfo(cpInfo, true)
manish's avatar
manish committed
233
234
235
236
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
	mgr.currentFileWriter = nextFileWriter
manish's avatar
manish committed
237
	mgr.updateCheckpoint(cpInfo)
manish's avatar
manish committed
238
239
}

240
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
241
	if block.Header.Number != mgr.getBlockchainInfo().Height {
242
		return errors.Errorf("block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
243
	}
244
	blockBytes, info, err := serializeBlock(block)
manish's avatar
manish committed
245
	if err != nil {
246
		return errors.WithMessage(err, "error serializing block")
manish's avatar
manish committed
247
	}
248
	blockHash := block.Header.Hash()
Mari Wade's avatar
Mari Wade committed
249
	//Get the location / offset where each transaction starts in the block and where the block ends
250
	txOffsets := info.txOffsets
manish's avatar
manish committed
251
	currentOffset := mgr.cpInfo.latestFileChunksize
252

manish's avatar
manish committed
253
254
255
256
	blockBytesLen := len(blockBytes)
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

Mari Wade's avatar
Mari Wade committed
257
258
	//Determine if we need to start a new file since the size of this block
	//exceeds the amount of space left in the current file
manish's avatar
manish committed
259
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
manish's avatar
manish committed
260
261
262
		mgr.moveToNextFile()
		currentOffset = 0
	}
Mari Wade's avatar
Mari Wade committed
263
	//append blockBytesEncodedLen to the file
manish's avatar
manish committed
264
265
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
Mari Wade's avatar
Mari Wade committed
266
		//append the actual block bytes to the file
manish's avatar
manish committed
267
		err = mgr.currentFileWriter.append(blockBytes, true)
manish's avatar
manish committed
268
269
	}
	if err != nil {
manish's avatar
manish committed
270
271
272
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
manish's avatar
manish committed
273
		}
274
		return errors.WithMessage(err, "error appending block to file")
manish's avatar
manish committed
275
276
	}

Mari Wade's avatar
Mari Wade committed
277
	//Update the checkpoint info with the results of adding the new block
manish's avatar
manish committed
278
279
280
281
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
282
		isChainEmpty:             false,
283
		lastBlockNumber:          block.Header.Number}
Mari Wade's avatar
Mari Wade committed
284
	//save the checkpoint information in the database
manish's avatar
manish committed
285
286
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
manish's avatar
manish committed
287
288
		if truncateErr != nil {
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
manish's avatar
manish committed
289
		}
290
		return errors.WithMessage(err, "error saving current file info to db")
manish's avatar
manish committed
291
	}
manish's avatar
manish committed
292

Mari Wade's avatar
Mari Wade committed
293
	//Index block file location pointer updated with file suffex and offset for the new block
manish's avatar
manish committed
294
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
manish's avatar
manish committed
295
	blockFLP.offset = currentOffset
manish's avatar
manish committed
296
	// shift the txoffset because we prepend length of bytes before block bytes
297
	for _, txOffset := range txOffsets {
298
		txOffset.loc.offset += len(blockBytesEncodedLen)
manish's avatar
manish committed
299
	}
Mari Wade's avatar
Mari Wade committed
300
	//save the index in the database
301
	if err = mgr.index.indexBlock(&blockIdxInfo{
302
		blockNum: block.Header.Number, blockHash: blockHash,
303
304
305
		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
		return err
	}
manish's avatar
manish committed
306

Mari Wade's avatar
Mari Wade committed
307
	//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
manish's avatar
manish committed
308
	mgr.updateCheckpoint(newCPInfo)
manish's avatar
manish committed
309
310
311
312
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}

manish's avatar
manish committed
313
314
func (mgr *blockfileMgr) syncIndex() error {
	var lastBlockIndexed uint64
315
	var indexEmpty bool
manish's avatar
manish committed
316
	var err error
Mari Wade's avatar
Mari Wade committed
317
	//from the database, get the last block that was indexed
manish's avatar
manish committed
318
	if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
319
320
321
322
		if err != errIndexEmpty {
			return err
		}
		indexEmpty = true
manish's avatar
manish committed
323
	}
324

325
	//initialize index to file number:zero, offset:zero and blockNum:0
manish's avatar
manish committed
326
327
	startFileNum := 0
	startOffset := 0
328
	skipFirstBlock := false
Mari Wade's avatar
Mari Wade committed
329
	//get the last file that blocks were added to using the checkpoint info
manish's avatar
manish committed
330
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
331
332
	startingBlockNum := uint64(0)

Mari Wade's avatar
Mari Wade committed
333
	//if the index stored in the db has value, update the index information with those values
334
	if !indexEmpty {
335
		if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
336
			logger.Debug("Both the block files and indices are in sync.")
337
338
			return nil
		}
339
		logger.Debugf("Last block indexed [%d], Last block present in block files [%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
manish's avatar
manish committed
340
341
342
343
344
345
		var flp *fileLocPointer
		if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
346
		skipFirstBlock = true
347
348
		startingBlockNum = lastBlockIndexed + 1
	} else {
349
		logger.Debugf("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
manish's avatar
manish committed
350
351
	}

352
	logger.Infof("Start building index from block [%d] to last block [%d]", startingBlockNum, mgr.cpInfo.lastBlockNumber)
353

Mari Wade's avatar
Mari Wade committed
354
	//open a blockstream to the file location that was stored in the index
manish's avatar
manish committed
355
356
357
358
359
360
361
	var stream *blockStream
	if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
		return err
	}
	var blockBytes []byte
	var blockPlacementInfo *blockPlacementInfo

362
363
364
365
366
	if skipFirstBlock {
		if blockBytes, _, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
367
			return errors.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
368
369
370
371
				lastBlockIndexed)
		}
	}

372
373
374
	//Should be at the last block already, but go ahead and loop looking for next blockBytes.
	//If there is another block, add it to the index.
	//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
375
	blockIdxInfo := &blockIdxInfo{}
manish's avatar
manish committed
376
377
378
379
380
381
382
	for {
		if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			break
		}
383
384
		info, err := extractSerializedBlockInfo(blockBytes)
		if err != nil {
manish's avatar
manish committed
385
386
			return err
		}
387
388
389
390

		//The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
		//therefore just shift by the difference between blockBytesOffset and blockStartOffset
		numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
391
		for _, offset := range info.txOffsets {
392
			offset.loc.offset += numBytesToShift
manish's avatar
manish committed
393
		}
394

Mari Wade's avatar
Mari Wade committed
395
		//Update the blockIndexInfo with what was actually stored in file system
396
397
		blockIdxInfo.blockHash = info.blockHeader.Hash()
		blockIdxInfo.blockNum = info.blockHeader.Number
manish's avatar
manish committed
398
399
		blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
			locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
400
		blockIdxInfo.txOffsets = info.txOffsets
401
402
		blockIdxInfo.metadata = info.metadata

403
		logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
manish's avatar
manish committed
404
405
406
		if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
			return err
		}
407
408
409
		if blockIdxInfo.blockNum%10000 == 0 {
			logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
		}
manish's avatar
manish committed
410
	}
411
	logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
manish's avatar
manish committed
412
413
414
	return nil
}

415
416
func (mgr *blockfileMgr) getBlockchainInfo() *common.BlockchainInfo {
	return mgr.bcInfo.Load().(*common.BlockchainInfo)
manish's avatar
manish committed
417
418
}

manish's avatar
manish committed
419
420
421
422
423
424
425
426
// updateCheckpoint replaces the manager's checkpoint info with cpInfo while
// holding the cpInfoCond lock and then broadcasts on cpInfoCond so that any
// goroutine waiting on it is woken up.
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
	cond := mgr.cpInfoCond
	cond.L.Lock()
	defer cond.L.Unlock()

	mgr.cpInfo = cpInfo
	logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
	cond.Broadcast()
}

427
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *common.Block) {
manish's avatar
manish committed
428
	currentBCInfo := mgr.getBlockchainInfo()
429
	newBCInfo := &common.BlockchainInfo{
manish's avatar
manish committed
430
431
		Height:            currentBCInfo.Height + 1,
		CurrentBlockHash:  latestBlockHash,
432
		PreviousBlockHash: latestBlock.Header.PreviousHash}
manish's avatar
manish committed
433

manish's avatar
manish committed
434
435
436
	mgr.bcInfo.Store(newBCInfo)
}

437
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*common.Block, error) {
manish's avatar
manish committed
438
439
440
441
442
443
444
445
	logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
	loc, err := mgr.index.getBlockLocByHash(blockHash)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

446
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
manish's avatar
manish committed
447
	logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)
denyeart's avatar
denyeart committed
448
449
450

	// interpret math.MaxUint64 as a request for last block
	if blockNum == math.MaxUint64 {
451
		blockNum = mgr.getBlockchainInfo().Height - 1
denyeart's avatar
denyeart committed
452
453
	}

manish's avatar
manish committed
454
455
456
457
458
459
460
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

461
462
463
464
465
466
467
468
469
470
471
// retrieveBlockByTxID looks up, via the index, the location of the block that
// holds transaction txID and reads that block from the block files.
func (mgr *blockfileMgr) retrieveBlockByTxID(txID string) (*common.Block, error) {
	logger.Debugf("retrieveBlockByTxID() - txID = [%s]", txID)
	blockLoc, err := mgr.index.getBlockLocByTxID(txID)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(blockLoc)
}

472
473
474
475
476
// retrieveTxValidationCodeByTxID returns the validation code that the index
// holds for transaction txID; the lookup is delegated entirely to the index.
func (mgr *blockfileMgr) retrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	logger.Debugf("retrieveTxValidationCodeByTxID() - txID = [%s]", txID)
	return mgr.index.getTxValidationCodeByTxID(txID)
}

477
478
func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.BlockHeader, error) {
	logger.Debugf("retrieveBlockHeaderByNumber() - blockNum = [%d]", blockNum)
manish's avatar
manish committed
479
480
481
482
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
483
484
485
486
487
488
489
490
491
	blockBytes, err := mgr.fetchBlockBytes(loc)
	if err != nil {
		return nil, err
	}
	info, err := extractSerializedBlockInfo(blockBytes)
	if err != nil {
		return nil, err
	}
	return info.blockHeader, nil
manish's avatar
manish committed
492
493
}

manish's avatar
manish committed
494
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
manish's avatar
manish committed
495
	return newBlockItr(mgr, startNum), nil
manish's avatar
manish committed
496
497
}

498
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*common.Envelope, error) {
manish's avatar
manish committed
499
500
501
502
503
	logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
	loc, err := mgr.index.getTxLoc(txID)
	if err != nil {
		return nil, err
	}
504
	return mgr.fetchTransactionEnvelope(loc)
manish's avatar
manish committed
505
506
}

507
508
509
// retrieveTransactionByBlockNumTranNum looks up, via the index, the location
// of transaction number tranNum within block blockNum and reads its envelope
// from the block files.
func (mgr *blockfileMgr) retrieveTransactionByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
	logger.Debugf("retrieveTransactionByBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
	txLoc, err := mgr.index.getTXLocByBlockNumTranNum(blockNum, tranNum)
	if err != nil {
		return nil, err
	}
	return mgr.fetchTransactionEnvelope(txLoc)
}

516
517
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
	blockBytes, err := mgr.fetchBlockBytes(lp)
manish's avatar
manish committed
518
519
520
	if err != nil {
		return nil, err
	}
521
	block, err := deserializeBlock(blockBytes)
manish's avatar
manish committed
522
523
524
525
526
527
	if err != nil {
		return nil, err
	}
	return block, nil
}

528
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
529
	logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)
530
531
532
	var err error
	var txEnvelopeBytes []byte
	if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
manish's avatar
manish committed
533
534
		return nil, err
	}
535
	_, n := proto.DecodeVarint(txEnvelopeBytes)
536
	return putil.GetEnvelopeFromBlock(txEnvelopeBytes[n:])
manish's avatar
manish committed
537
538
539
}

func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
manish's avatar
manish committed
540
	stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
manish's avatar
manish committed
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
	if err != nil {
		return nil, err
	}
	defer stream.close()
	b, err := stream.nextBlockBytes()
	if err != nil {
		return nil, err
	}
	return b, nil
}

// fetchRawBytes reads lp.bytesLength bytes starting at lp.offset from the
// block file named by lp. The reader is closed before returning.
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
	fileReader, err := newBlockfileReader(deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum))
	if err != nil {
		return nil, err
	}
	defer fileReader.close()

	raw, err := fileReader.read(lp.offset, lp.bytesLength)
	if err != nil {
		return nil, err
	}
	return raw, nil
}

Mari Wade's avatar
Mari Wade committed
566
//Get the current checkpoint information that is stored in the database
manish's avatar
manish committed
567
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
manish's avatar
manish committed
568
569
	var b []byte
	var err error
manish's avatar
manish committed
570
	if b, err = mgr.db.Get(blkMgrInfoKey); b == nil || err != nil {
manish's avatar
manish committed
571
572
573
574
575
576
		return nil, err
	}
	i := &checkpointInfo{}
	if err = i.unmarshal(b); err != nil {
		return nil, err
	}
manish's avatar
manish committed
577
	logger.Debugf("loaded checkpointInfo:%s", i)
manish's avatar
manish committed
578
579
580
	return i, nil
}

manish's avatar
manish committed
581
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
manish's avatar
manish committed
582
583
584
585
	b, err := i.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
586
	if err = mgr.db.Put(blkMgrInfoKey, b, sync); err != nil {
manish's avatar
manish committed
587
588
589
590
591
		return err
	}
	return nil
}

592
593
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
594
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
Mari Wade's avatar
Mari Wade committed
595
	//scan the passed file number suffix starting from the passed offset to find the last completed block
manish's avatar
manish committed
596
	numBlocks := 0
597
	var lastBlockBytes []byte
598
599
	blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
	if errOpen != nil {
600
		return nil, 0, 0, errOpen
manish's avatar
manish committed
601
602
	}
	defer blockStream.close()
603
604
	var errRead error
	var blockBytes []byte
manish's avatar
manish committed
605
	for {
606
607
		blockBytes, errRead = blockStream.nextBlockBytes()
		if blockBytes == nil || errRead != nil {
manish's avatar
manish committed
608
609
			break
		}
610
		lastBlockBytes = blockBytes
manish's avatar
manish committed
611
612
		numBlocks++
	}
613
614
615
616
617
618
	if errRead == ErrUnexpectedEndOfBlockfile {
		logger.Debugf(`Error:%s
		The error may happen if a crash has happened during block appending.
		Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
		errRead = nil
	}
manish's avatar
manish committed
619
	logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
620
	return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
manish's avatar
manish committed
621
622
623
}

// checkpointInfo records how far block storage has progressed: which block
// file is being written, how many bytes of it are valid and which block was
// written last.
type checkpointInfo struct {
	// latestFileChunkSuffixNum is the number of the block file currently written to.
	latestFileChunkSuffixNum int
	// latestFileChunksize is the size in bytes of that file up to the end of
	// the last block recorded in the checkpoint.
	latestFileChunksize int
	// isChainEmpty is true while no block has been stored yet.
	isChainEmpty bool
	// lastBlockNumber is the number of the last block written.
	lastBlockNumber uint64
}

func (i *checkpointInfo) marshal() ([]byte, error) {
	buffer := proto.NewBuffer([]byte{})
	var err error
	if err = buffer.EncodeVarint(uint64(i.latestFileChunkSuffixNum)); err != nil {
		return nil, err
	}
	if err = buffer.EncodeVarint(uint64(i.latestFileChunksize)); err != nil {
		return nil, err
	}
	if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil {
		return nil, err
	}
643
644
645
646
647
648
649
	var chainEmptyMarker uint64
	if i.isChainEmpty {
		chainEmptyMarker = 1
	}
	if err = buffer.EncodeVarint(chainEmptyMarker); err != nil {
		return nil, err
	}
manish's avatar
manish committed
650
651
652
653
654
655
	return buffer.Bytes(), nil
}

func (i *checkpointInfo) unmarshal(b []byte) error {
	buffer := proto.NewBuffer(b)
	var val uint64
656
	var chainEmptyMarker uint64
manish's avatar
manish committed
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
	var err error

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.latestFileChunkSuffixNum = int(val)

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.latestFileChunksize = int(val)

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.lastBlockNumber = val
673
674
675
676
	if chainEmptyMarker, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.isChainEmpty = chainEmptyMarker == 1
manish's avatar
manish committed
677
678
	return nil
}
manish's avatar
manish committed
679
680

func (i *checkpointInfo) String() string {
681
682
	return fmt.Sprintf("latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], isChainEmpty=[%t], lastBlockNumber=[%d]",
		i.latestFileChunkSuffixNum, i.latestFileChunksize, i.isChainEmpty, i.lastBlockNumber)
manish's avatar
manish committed
683
}