blockfile_mgr.go 24.5 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
	"fmt"
denyeart's avatar
denyeart committed
21
	"math"
manish's avatar
manish committed
22
	"sync"
manish's avatar
manish committed
23
24
	"sync/atomic"

25
26
	"github.com/davecgh/go-spew/spew"

manish's avatar
manish committed
27
	"github.com/golang/protobuf/proto"
28
	"github.com/hyperledger/fabric/common/flogging"
29
30
31
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
32
	"github.com/hyperledger/fabric/protos/common"
33
	"github.com/hyperledger/fabric/protos/peer"
34
	putil "github.com/hyperledger/fabric/protos/utils"
manish's avatar
manish committed
35
36
)

37
// logger is the package-level logger, obtained from flogging under the
// name "fsblkstorage".
var logger = flogging.MustGetLogger("fsblkstorage")
manish's avatar
manish committed
38
39
40
41
42
43
44
45
46
47
48
49

const (
	// blockfilePrefix is the file-name prefix of every block file;
	// deriveBlockfilePath appends a zero-padded six-digit suffix number to it.
	blockfilePrefix = "blockfile_"
)

var (
	// blkMgrInfoKey is the db key under which the marshalled checkpointInfo
	// is stored (see loadCurrentInfo and saveCurrentInfo).
	blkMgrInfoKey = []byte("blkMgrInfo")
)

type blockfileMgr struct {
	rootDir           string
	conf              *Conf
manish's avatar
manish committed
50
	db                *leveldbhelper.DBHandle
manish's avatar
manish committed
51
	index             index
manish's avatar
manish committed
52
	cpInfo            *checkpointInfo
manish's avatar
manish committed
53
	cpInfoCond        *sync.Cond
manish's avatar
manish committed
54
55
56
57
	currentFileWriter *blockfileWriter
	bcInfo            atomic.Value
}

Mari Wade's avatar
Mari Wade committed
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
  -- the directory where the files are stored
  -- the individual files where the blocks are stored
  -- the checkpoint which tracks the latest file being persisted to
  -- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
if this start-up is the first time the system is coming up or is this a restart
of the system.

The blockfile manager stores blocks of data into a file system.  That file
storage is done by creating sequentially numbered files of a configured size
i.e blockfile_000000, blockfile_000001, etc..

Jay Guo's avatar
Jay Guo committed
73
Each transaction in a block is stored with information about the number of
Mari Wade's avatar
Mari Wade committed
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
bytes in that transaction
 Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
 Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.

Remember that these steps are only done once at start-up of the system.
At start up a new manager:
  *) Checks if the directory for storing files exists, if not creates the dir
  *) Checks if the key value database exists, if not creates one
       (will create a db dir)
  *) Determines the checkpoint information (cpinfo) used for storage
		-- Loads from db if exist, if not instantiate a new cpinfo
		-- If cpinfo was loaded from db, compares to FS
		-- If cpinfo and file system are not in sync, syncs cpInfo from FS
  *) Starts a new file writer
		-- truncates file per cpinfo to remove any excess past last block
  *) Determines the index information used to find tx and blocks in
  the file blkstorage
		-- Instantiates a new blockIdxInfo
		-- Loads the index from the db if exists
		-- syncIndex comparing the last block indexed to what is in the FS
		-- If index and file system are not in sync, syncs index from the FS
  *)  Updates blockchain info used by the APIs
*/
manish's avatar
manish committed
99
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
100
	logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
Mari Wade's avatar
Mari Wade committed
101
	//Determine the root directory for the blockfile storage, if it does not exist create it
manish's avatar
manish committed
102
	rootDir := conf.getLedgerBlockDir(id)
manish's avatar
manish committed
103
104
105
106
	_, err := util.CreateDirIfMissing(rootDir)
	if err != nil {
		panic(fmt.Sprintf("Error: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
107
	// Instantiate the manager, i.e. blockFileMgr structure
manish's avatar
manish committed
108
	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
Mari Wade's avatar
Mari Wade committed
109
110
111
112

	// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
	// It also retrieves the current size of that file and the last block number that was written to that file.
	// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
113
114
115
116
	cpInfo, err := mgr.loadCurrentInfo()
	if err != nil {
		panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
	}
117
118
119
120
121
122
	if cpInfo == nil {
		logger.Info(`No info about blocks file found in the db. 
			This could happen if this is the first time the ledger is constructed or the index is dropped.
			Scanning blocks dir for the latest info`)
		if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
			panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
manish's avatar
manish committed
123
		}
124
125
126
127
128
129
130
131
		logger.Infof("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
	} else {
		logger.Info(`Synching the info about block files`)
		syncCPInfoFromFS(rootDir, cpInfo)
	}
	err = mgr.saveCurrentInfo(cpInfo, true)
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
manish's avatar
manish committed
132
	}
133

Mari Wade's avatar
Mari Wade committed
134
135
	//Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc)
manish's avatar
manish committed
136
137
138
139
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
140
	//Truncate the file to remove excess past last block
manish's avatar
manish committed
141
142
143
144
145
	err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
	if err != nil {
		panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
	}

Mari Wade's avatar
Mari Wade committed
146
	// Create a new KeyValue store database handler for the blocks index in the keyvalue database
manish's avatar
manish committed
147
	mgr.index = newBlockIndex(indexConfig, indexStore)
Mari Wade's avatar
Mari Wade committed
148
149

	// Update the manager with the checkpoint info and the file writer
manish's avatar
manish committed
150
151
	mgr.cpInfo = cpInfo
	mgr.currentFileWriter = currentFileWriter
Mari Wade's avatar
Mari Wade committed
152
153
	// Create a checkpoint condition (event) variable, for the  goroutine waiting for
	// or announcing the occurrence of an event.
manish's avatar
manish committed
154
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
Mari Wade's avatar
Mari Wade committed
155
156
157

	// Verify that the index stored in db is accurate with what is actually stored in block file system
	// If not the same, sync the index and the file system
manish's avatar
manish committed
158
159
	mgr.syncIndex()

Mari Wade's avatar
Mari Wade committed
160
	// init BlockchainInfo for external API's
161
	bcInfo := &common.BlockchainInfo{
manish's avatar
manish committed
162
163
164
165
		Height:            0,
		CurrentBlockHash:  nil,
		PreviousBlockHash: nil}

Mari Wade's avatar
Mari Wade committed
166
	//If start up is a restart of an existing storage, update BlockchainInfo for external API's
167
	if !cpInfo.isChainEmpty {
168
		lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
manish's avatar
manish committed
169
		if err != nil {
170
			panic(fmt.Sprintf("Could not retrieve header of the last block form file: %s", err))
manish's avatar
manish committed
171
		}
172
173
		lastBlockHash := lastBlockHeader.Hash()
		previousBlockHash := lastBlockHeader.PreviousHash
174
		bcInfo = &common.BlockchainInfo{
175
			Height:            cpInfo.lastBlockNumber + 1,
manish's avatar
manish committed
176
177
178
179
			CurrentBlockHash:  lastBlockHash,
			PreviousBlockHash: previousBlockHash}
	}
	mgr.bcInfo.Store(bcInfo)
Mari Wade's avatar
Mari Wade committed
180
	//return the new manager (blockfileMgr)
manish's avatar
manish committed
181
182
183
	return mgr
}

Mari Wade's avatar
Mari Wade committed
184
185
186
187
//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file of where the last block was written.  Also retrieves contains the
// last block number that was written.  At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
188
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
manish's avatar
manish committed
189
	logger.Debugf("Starting checkpoint=%s", cpInfo)
Mari Wade's avatar
Mari Wade committed
190
	//Checks if the file suffix of where the last block was written exists
manish's avatar
manish committed
191
192
193
194
195
196
	filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
	exists, size, err := util.FileExists(filePath)
	if err != nil {
		panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
	}
	logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
Mari Wade's avatar
Mari Wade committed
197
198
199
	//Test is !exists because when file number is first used the file does not exist yet
	//checks that the file exists and that the size of the file is what is stored in cpinfo
	//status of file [/tmp/tests/ledger/blkstorage/fsblkstorage/blocks/blockfile_000000]: exists=[false], size=[0]
manish's avatar
manish committed
200
201
202
203
	if !exists || int(size) == cpInfo.latestFileChunksize {
		// check point info is in sync with the file on disk
		return
	}
Mari Wade's avatar
Mari Wade committed
204
	//Scan the file system to verify that the checkpoint info stored in db is correct
205
	_, endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
manish's avatar
manish committed
206
		rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
manish's avatar
manish committed
207
208
209
210
	if err != nil {
		panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
	}
	cpInfo.latestFileChunksize = int(endOffsetLastBlock)
211
212
213
214
215
216
217
218
219
220
	if numBlocks == 0 {
		return
	}
	//Updates the checkpoint info for the actual last block number stored and it's end location
	if cpInfo.isChainEmpty {
		cpInfo.lastBlockNumber = uint64(numBlocks - 1)
	} else {
		cpInfo.lastBlockNumber += uint64(numBlocks)
	}
	cpInfo.isChainEmpty = false
manish's avatar
manish committed
221
222
223
	logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}

manish's avatar
manish committed
224
225
226
227
228
229
230
231
232
// deriveBlockfilePath returns the path of the block file with the given suffix
// number inside rootDir: rootDir, a "/", blockfilePrefix and the suffix number
// zero-padded to six digits.
func deriveBlockfilePath(rootDir string, suffixNum int) string {
	return fmt.Sprintf("%s/%s%06d", rootDir, blockfilePrefix, suffixNum)
}

// close closes the writer of the current block file. Whatever the writer's
// close returns is not propagated.
func (mgr *blockfileMgr) close() {
	writer := mgr.currentFileWriter
	writer.close()
}

func (mgr *blockfileMgr) moveToNextFile() {
manish's avatar
manish committed
233
	cpInfo := &checkpointInfo{
manish's avatar
manish committed
234
		latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
manish's avatar
manish committed
235
236
		latestFileChunksize:      0,
		lastBlockNumber:          mgr.cpInfo.lastBlockNumber}
manish's avatar
manish committed
237
238

	nextFileWriter, err := newBlockfileWriter(
manish's avatar
manish committed
239
		deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))
manish's avatar
manish committed
240

manish's avatar
manish committed
241
242
243
244
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to next file: %s", err))
	}
	mgr.currentFileWriter.close()
manish's avatar
manish committed
245
	err = mgr.saveCurrentInfo(cpInfo, true)
manish's avatar
manish committed
246
247
248
249
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
	mgr.currentFileWriter = nextFileWriter
manish's avatar
manish committed
250
	mgr.updateCheckpoint(cpInfo)
manish's avatar
manish committed
251
252
}

253
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
254
255
256
	if block.Header.Number != mgr.getBlockchainInfo().Height {
		return fmt.Errorf("Block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
	}
257
	blockBytes, info, err := serializeBlock(block)
manish's avatar
manish committed
258
259
260
	if err != nil {
		return fmt.Errorf("Error while serializing block: %s", err)
	}
261
	blockHash := block.Header.Hash()
Mari Wade's avatar
Mari Wade committed
262
	//Get the location / offset where each transaction starts in the block and where the block ends
263
	txOffsets := info.txOffsets
manish's avatar
manish committed
264
	currentOffset := mgr.cpInfo.latestFileChunksize
manish's avatar
manish committed
265
266
267
	if err != nil {
		return fmt.Errorf("Error while serializing block: %s", err)
	}
manish's avatar
manish committed
268
269
270
271
	blockBytesLen := len(blockBytes)
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

Mari Wade's avatar
Mari Wade committed
272
273
	//Determine if we need to start a new file since the size of this block
	//exceeds the amount of space left in the current file
manish's avatar
manish committed
274
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
manish's avatar
manish committed
275
276
277
		mgr.moveToNextFile()
		currentOffset = 0
	}
Mari Wade's avatar
Mari Wade committed
278
	//append blockBytesEncodedLen to the file
manish's avatar
manish committed
279
280
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
Mari Wade's avatar
Mari Wade committed
281
		//append the actual block bytes to the file
manish's avatar
manish committed
282
		err = mgr.currentFileWriter.append(blockBytes, true)
manish's avatar
manish committed
283
284
	}
	if err != nil {
manish's avatar
manish committed
285
286
287
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
manish's avatar
manish committed
288
289
290
291
		}
		return fmt.Errorf("Error while appending block to file: %s", err)
	}

Mari Wade's avatar
Mari Wade committed
292
	//Update the checkpoint info with the results of adding the new block
manish's avatar
manish committed
293
294
295
296
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
297
		isChainEmpty:             false,
298
		lastBlockNumber:          block.Header.Number}
Mari Wade's avatar
Mari Wade committed
299
	//save the checkpoint information in the database
manish's avatar
manish committed
300
301
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
manish's avatar
manish committed
302
303
		if truncateErr != nil {
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
manish's avatar
manish committed
304
305
306
		}
		return fmt.Errorf("Error while saving current file info to db: %s", err)
	}
manish's avatar
manish committed
307

Mari Wade's avatar
Mari Wade committed
308
	//Index block file location pointer updated with file suffex and offset for the new block
manish's avatar
manish committed
309
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
manish's avatar
manish committed
310
	blockFLP.offset = currentOffset
manish's avatar
manish committed
311
	// shift the txoffset because we prepend length of bytes before block bytes
312
	for _, txOffset := range txOffsets {
313
		txOffset.loc.offset += len(blockBytesEncodedLen)
manish's avatar
manish committed
314
	}
Mari Wade's avatar
Mari Wade committed
315
	//save the index in the database
manish's avatar
manish committed
316
	mgr.index.indexBlock(&blockIdxInfo{
317
		blockNum: block.Header.Number, blockHash: blockHash,
318
		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
manish's avatar
manish committed
319

Mari Wade's avatar
Mari Wade committed
320
	//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
manish's avatar
manish committed
321
	mgr.updateCheckpoint(newCPInfo)
manish's avatar
manish committed
322
323
324
325
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}

manish's avatar
manish committed
326
327
func (mgr *blockfileMgr) syncIndex() error {
	var lastBlockIndexed uint64
328
	var indexEmpty bool
manish's avatar
manish committed
329
	var err error
Mari Wade's avatar
Mari Wade committed
330
	//from the database, get the last block that was indexed
manish's avatar
manish committed
331
	if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
332
333
334
335
		if err != errIndexEmpty {
			return err
		}
		indexEmpty = true
manish's avatar
manish committed
336
	}
337

338
	//initialize index to file number:zero, offset:zero and blockNum:0
manish's avatar
manish committed
339
340
	startFileNum := 0
	startOffset := 0
341
	skipFirstBlock := false
Mari Wade's avatar
Mari Wade committed
342
	//get the last file that blocks were added to using the checkpoint info
manish's avatar
manish committed
343
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
344
345
	startingBlockNum := uint64(0)

Mari Wade's avatar
Mari Wade committed
346
	//if the index stored in the db has value, update the index information with those values
347
	if !indexEmpty {
348
349
350
351
352
		if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
			logger.Infof("Both the block files and indices are in sync.")
			return nil
		}
		logger.Infof("Last block indexed [%d], Last block present in block files=[%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
manish's avatar
manish committed
353
354
355
356
357
358
		var flp *fileLocPointer
		if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
359
		skipFirstBlock = true
360
361
362
		startingBlockNum = lastBlockIndexed + 1
	} else {
		logger.Infof("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
manish's avatar
manish committed
363
364
	}

365
366
	logger.Infof("Start building index from block [%d]", startingBlockNum)

Mari Wade's avatar
Mari Wade committed
367
	//open a blockstream to the file location that was stored in the index
manish's avatar
manish committed
368
369
370
371
372
373
374
	var stream *blockStream
	if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
		return err
	}
	var blockBytes []byte
	var blockPlacementInfo *blockPlacementInfo

375
376
377
378
379
380
381
382
383
384
	if skipFirstBlock {
		if blockBytes, _, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			return fmt.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
				lastBlockIndexed)
		}
	}

385
386
387
	//Should be at the last block already, but go ahead and loop looking for next blockBytes.
	//If there is another block, add it to the index.
	//This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
388
	blockIdxInfo := &blockIdxInfo{}
manish's avatar
manish committed
389
390
391
392
393
394
395
	for {
		if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			break
		}
396
397
		info, err := extractSerializedBlockInfo(blockBytes)
		if err != nil {
manish's avatar
manish committed
398
399
			return err
		}
400
401
402
403

		//The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
		//therefore just shift by the difference between blockBytesOffset and blockStartOffset
		numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
404
		for _, offset := range info.txOffsets {
405
			offset.loc.offset += numBytesToShift
manish's avatar
manish committed
406
		}
407

Mari Wade's avatar
Mari Wade committed
408
		//Update the blockIndexInfo with what was actually stored in file system
409
410
		blockIdxInfo.blockHash = info.blockHeader.Hash()
		blockIdxInfo.blockNum = info.blockHeader.Number
manish's avatar
manish committed
411
412
		blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
			locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
413
		blockIdxInfo.txOffsets = info.txOffsets
414
415
		blockIdxInfo.metadata = info.metadata

416
		logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
manish's avatar
manish committed
417
418
419
		if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
			return err
		}
420
421
422
		if blockIdxInfo.blockNum%10000 == 0 {
			logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
		}
manish's avatar
manish committed
423
	}
424
	logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
manish's avatar
manish committed
425
426
427
	return nil
}

428
429
func (mgr *blockfileMgr) getBlockchainInfo() *common.BlockchainInfo {
	return mgr.bcInfo.Load().(*common.BlockchainInfo)
manish's avatar
manish committed
430
431
}

manish's avatar
manish committed
432
433
434
435
436
437
438
439
// updateCheckpoint replaces the manager's in-memory checkpoint with cpInfo
// while holding the lock of cpInfoCond, and broadcasts on the condition so
// that goroutines waiting on it are woken up.
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
	lock := mgr.cpInfoCond.L
	lock.Lock()
	defer lock.Unlock()

	mgr.cpInfo = cpInfo
	logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
	mgr.cpInfoCond.Broadcast()
}

440
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *common.Block) {
manish's avatar
manish committed
441
	currentBCInfo := mgr.getBlockchainInfo()
442
	newBCInfo := &common.BlockchainInfo{
manish's avatar
manish committed
443
444
		Height:            currentBCInfo.Height + 1,
		CurrentBlockHash:  latestBlockHash,
445
		PreviousBlockHash: latestBlock.Header.PreviousHash}
manish's avatar
manish committed
446

manish's avatar
manish committed
447
448
449
	mgr.bcInfo.Store(newBCInfo)
}

450
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*common.Block, error) {
manish's avatar
manish committed
451
452
453
454
455
456
457
458
	logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
	loc, err := mgr.index.getBlockLocByHash(blockHash)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

459
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
manish's avatar
manish committed
460
	logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)
denyeart's avatar
denyeart committed
461
462
463

	// interpret math.MaxUint64 as a request for last block
	if blockNum == math.MaxUint64 {
464
		blockNum = mgr.getBlockchainInfo().Height - 1
denyeart's avatar
denyeart committed
465
466
	}

manish's avatar
manish committed
467
468
469
470
471
472
473
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

474
475
476
477
478
479
480
481
482
483
484
// retrieveBlockByTxID looks up, via the index, the location of the block that
// is indexed under the given transaction ID and reads that block from the
// block files.
func (mgr *blockfileMgr) retrieveBlockByTxID(txID string) (*common.Block, error) {
	logger.Debugf("retrieveBlockByTxID() - txID = [%s]", txID)

	loc, err := mgr.index.getBlockLocByTxID(txID)
	if err == nil {
		return mgr.fetchBlock(loc)
	}
	return nil, err
}

485
486
487
488
489
// retrieveTxValidationCodeByTxID returns the validation code that the index
// reports for the given transaction ID.
func (mgr *blockfileMgr) retrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	logger.Debugf("retrieveTxValidationCodeByTxID() - txID = [%s]", txID)
	code, err := mgr.index.getTxValidationCodeByTxID(txID)
	return code, err
}

490
491
func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.BlockHeader, error) {
	logger.Debugf("retrieveBlockHeaderByNumber() - blockNum = [%d]", blockNum)
manish's avatar
manish committed
492
493
494
495
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
496
497
498
499
500
501
502
503
504
	blockBytes, err := mgr.fetchBlockBytes(loc)
	if err != nil {
		return nil, err
	}
	info, err := extractSerializedBlockInfo(blockBytes)
	if err != nil {
		return nil, err
	}
	return info.blockHeader, nil
manish's avatar
manish committed
505
506
}

manish's avatar
manish committed
507
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
manish's avatar
manish committed
508
	return newBlockItr(mgr, startNum), nil
manish's avatar
manish committed
509
510
}

511
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*common.Envelope, error) {
manish's avatar
manish committed
512
513
514
515
516
	logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
	loc, err := mgr.index.getTxLoc(txID)
	if err != nil {
		return nil, err
	}
517
	return mgr.fetchTransactionEnvelope(loc)
manish's avatar
manish committed
518
519
}

520
521
522
// retrieveTransactionByBlockNumTranNum looks up, via the index, the location
// of the transaction identified by block number and transaction number, and
// reads its envelope from the block files.
func (mgr *blockfileMgr) retrieveTransactionByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
	logger.Debugf("retrieveTransactionByBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
	loc, err := mgr.index.getTXLocByBlockNumTranNum(blockNum, tranNum)
	if err == nil {
		return mgr.fetchTransactionEnvelope(loc)
	}
	return nil, err
}

529
530
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
	blockBytes, err := mgr.fetchBlockBytes(lp)
manish's avatar
manish committed
531
532
533
	if err != nil {
		return nil, err
	}
534
	block, err := deserializeBlock(blockBytes)
manish's avatar
manish committed
535
536
537
538
539
540
	if err != nil {
		return nil, err
	}
	return block, nil
}

541
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
542
	logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)
543
544
545
	var err error
	var txEnvelopeBytes []byte
	if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
manish's avatar
manish committed
546
547
		return nil, err
	}
548
	_, n := proto.DecodeVarint(txEnvelopeBytes)
549
	return putil.GetEnvelopeFromBlock(txEnvelopeBytes[n:])
manish's avatar
manish committed
550
551
552
}

func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
manish's avatar
manish committed
553
	stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
manish's avatar
manish committed
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
	if err != nil {
		return nil, err
	}
	defer stream.close()
	b, err := stream.nextBlockBytes()
	if err != nil {
		return nil, err
	}
	return b, nil
}

// fetchRawBytes opens a reader on the block file named by lp, reads
// lp.bytesLength bytes starting at lp.offset and closes the reader again.
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
	fileReader, err := newBlockfileReader(deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum))
	if err != nil {
		return nil, err
	}
	defer fileReader.close()

	raw, err := fileReader.read(lp.offset, lp.bytesLength)
	if err != nil {
		return nil, err
	}
	return raw, nil
}

Mari Wade's avatar
Mari Wade committed
579
//Get the current checkpoint information that is stored in the database
manish's avatar
manish committed
580
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
manish's avatar
manish committed
581
582
	var b []byte
	var err error
manish's avatar
manish committed
583
	if b, err = mgr.db.Get(blkMgrInfoKey); b == nil || err != nil {
manish's avatar
manish committed
584
585
586
587
588
589
		return nil, err
	}
	i := &checkpointInfo{}
	if err = i.unmarshal(b); err != nil {
		return nil, err
	}
manish's avatar
manish committed
590
	logger.Debugf("loaded checkpointInfo:%s", i)
manish's avatar
manish committed
591
592
593
	return i, nil
}

manish's avatar
manish committed
594
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
manish's avatar
manish committed
595
596
597
598
	b, err := i.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
599
	if err = mgr.db.Put(blkMgrInfoKey, b, sync); err != nil {
manish's avatar
manish committed
600
601
602
603
604
		return err
	}
	return nil
}

605
606
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
607
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) ([]byte, int64, int, error) {
Mari Wade's avatar
Mari Wade committed
608
	//scan the passed file number suffix starting from the passed offset to find the last completed block
manish's avatar
manish committed
609
	numBlocks := 0
610
	var lastBlockBytes []byte
611
612
	blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
	if errOpen != nil {
613
		return nil, 0, 0, errOpen
manish's avatar
manish committed
614
615
	}
	defer blockStream.close()
616
617
	var errRead error
	var blockBytes []byte
manish's avatar
manish committed
618
	for {
619
620
		blockBytes, errRead = blockStream.nextBlockBytes()
		if blockBytes == nil || errRead != nil {
manish's avatar
manish committed
621
622
			break
		}
623
		lastBlockBytes = blockBytes
manish's avatar
manish committed
624
625
		numBlocks++
	}
626
627
628
629
630
631
	if errRead == ErrUnexpectedEndOfBlockfile {
		logger.Debugf(`Error:%s
		The error may happen if a crash has happened during block appending.
		Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
		errRead = nil
	}
manish's avatar
manish committed
632
	logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
633
	return lastBlockBytes, blockStream.currentOffset, numBlocks, errRead
manish's avatar
manish committed
634
635
636
}

// checkpointInfo records how far block persistence has got: which block file
// is the latest one, how many bytes of it are in use, and which block was
// written last. It is stored in the db via marshal and unmarshal.
type checkpointInfo struct {
	// latestFileChunkSuffixNum is the suffix number of the latest block file.
	latestFileChunkSuffixNum int
	// latestFileChunksize is the size in bytes of the latest block file up to
	// the end of the last block recorded in this checkpoint.
	latestFileChunksize int
	// isChainEmpty is true while no block has been recorded yet.
	isChainEmpty bool
	// lastBlockNumber is the number of the last block recorded.
	lastBlockNumber uint64
}

func (i *checkpointInfo) marshal() ([]byte, error) {
	buffer := proto.NewBuffer([]byte{})
	var err error
	if err = buffer.EncodeVarint(uint64(i.latestFileChunkSuffixNum)); err != nil {
		return nil, err
	}
	if err = buffer.EncodeVarint(uint64(i.latestFileChunksize)); err != nil {
		return nil, err
	}
	if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil {
		return nil, err
	}
656
657
658
659
660
661
662
	var chainEmptyMarker uint64
	if i.isChainEmpty {
		chainEmptyMarker = 1
	}
	if err = buffer.EncodeVarint(chainEmptyMarker); err != nil {
		return nil, err
	}
manish's avatar
manish committed
663
664
665
666
667
668
	return buffer.Bytes(), nil
}

func (i *checkpointInfo) unmarshal(b []byte) error {
	buffer := proto.NewBuffer(b)
	var val uint64
669
	var chainEmptyMarker uint64
manish's avatar
manish committed
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
	var err error

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.latestFileChunkSuffixNum = int(val)

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.latestFileChunksize = int(val)

	if val, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.lastBlockNumber = val
686
687
688
689
	if chainEmptyMarker, err = buffer.DecodeVarint(); err != nil {
		return err
	}
	i.isChainEmpty = chainEmptyMarker == 1
manish's avatar
manish committed
690
691
	return nil
}
manish's avatar
manish committed
692
693

func (i *checkpointInfo) String() string {
694
695
	return fmt.Sprintf("latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], isChainEmpty=[%t], lastBlockNumber=[%d]",
		i.latestFileChunkSuffixNum, i.latestFileChunksize, i.isChainEmpty, i.lastBlockNumber)
manish's avatar
manish committed
696
}