blockfile_mgr.go 21.4 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
	"fmt"
denyeart's avatar
denyeart committed
21
	"math"
manish's avatar
manish committed
22
	"sync"
manish's avatar
manish committed
23
24
25
	"sync/atomic"

	"github.com/golang/protobuf/proto"
26
27
28
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
29
30
31
	"github.com/hyperledger/fabric/protos/common"
	putil "github.com/hyperledger/fabric/protos/utils"
	"github.com/op/go-logging"
manish's avatar
manish committed
32
33
34
35
36
37
38
39
40
41
42
43
)

// logger is the package-level logger, registered under the "kvledger" module name.
var logger = logging.MustGetLogger("kvledger")

const (
	// blockfilePrefix is the file-name prefix of every block file;
	// deriveBlockfilePath appends a zero-padded six-digit suffix number to it.
	blockfilePrefix = "blockfile_"
)

var (
	// blkMgrInfoKey is the database key under which the checkpoint info is
	// read and written (see loadCurrentInfo and saveCurrentInfo).
	blkMgrInfoKey = []byte("blkMgrInfo")
)

manish's avatar
manish committed
44
45
46
47
48
// conf groups two block-storage settings.
// NOTE(review): blockfileMgr and newBlockfileMgr in this file use *Conf
// (capitalized, declared elsewhere), not this type; this lowercase struct may
// be a leftover — confirm before relying on it.
type conf struct {
	// presumably the directory holding the block files — TODO confirm
	blockfilesDir    string
	// presumably the size limit of a single block file, in bytes — TODO confirm
	maxBlockfileSize int
}

manish's avatar
manish committed
49
50
51
type blockfileMgr struct {
	rootDir           string
	conf              *Conf
manish's avatar
manish committed
52
	db                *leveldbhelper.DBHandle
manish's avatar
manish committed
53
	index             index
manish's avatar
manish committed
54
	cpInfo            *checkpointInfo
manish's avatar
manish committed
55
	cpInfoCond        *sync.Cond
manish's avatar
manish committed
56
57
58
59
	currentFileWriter *blockfileWriter
	bcInfo            atomic.Value
}

Mari Wade's avatar
Mari Wade committed
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
Creates a new manager that will manage the files used for block persistence.
This manager manages the file system FS including
  -- the directory where the files are stored
  -- the individual files where the blocks are stored
  -- the checkpoint which tracks the latest file being persisted to
  -- the index which tracks what block and transaction is in what file
When a new blockfile manager is started (i.e. only on start-up), it checks
if this start-up is the first time the system is coming up or is this a restart
of the system.

The blockfile manager stores blocks of data into a file system.  That file
storage is done by creating sequentially numbered files of a configured size
i.e blockfile_000000, blockfile_000001, etc..

Each transcation in a block is stored with information about the number of
bytes in that transaction
 Adding txLoc [fileSuffixNum=0, offset=3, bytesLength=104] for tx [1:0] to index
 Adding txLoc [fileSuffixNum=0, offset=107, bytesLength=104] for tx [1:1] to index
Each block is stored with the total encoded length of that block as well as the
tx location offsets.

Remember that these steps are only done once at start-up of the system.
At start up a new manager:
  *) Checks if the directory for storing files exists, if not creates the dir
  *) Checks if the key value database exists, if not creates one
       (will create a db dir)
  *) Determines the checkpoint information (cpinfo) used for storage
		-- Loads from db if exist, if not instantiate a new cpinfo
		-- If cpinfo was loaded from db, compares to FS
		-- If cpinfo and file system are not in sync, syncs cpInfo from FS
  *) Starts a new file writer
		-- truncates file per cpinfo to remove any excess past last block
  *) Determines the index information used to find tx and blocks in
  the file blkstorage
		-- Instantiates a new blockIdxInfo
		-- Loads the index from the db if exists
		-- syncIndex comparing the last block indexed to what is in the FS
		-- If index and file system are not in sync, syncs index from the FS
  *)  Updates blockchain info used by the APIs
*/
manish's avatar
manish committed
101
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
Mari Wade's avatar
Mari Wade committed
102
	//Determine the root directory for the blockfile storage, if it does not exist create it
manish's avatar
manish committed
103
	rootDir := conf.getLedgerBlockDir(id)
manish's avatar
manish committed
104
105
106
107
	_, err := util.CreateDirIfMissing(rootDir)
	if err != nil {
		panic(fmt.Sprintf("Error: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
108
	// Instantiate the manager, i.e. blockFileMgr structure
manish's avatar
manish committed
109
	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
Mari Wade's avatar
Mari Wade committed
110
111
112
113

	// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
	// It also retrieves the current size of that file and the last block number that was written to that file.
	// At init checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
114
115
116
117
	cpInfo, err := mgr.loadCurrentInfo()
	if err != nil {
		panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
118
	if cpInfo == nil { //if no cpInfo stored in db initiate to zero
manish's avatar
manish committed
119
		cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0}
manish's avatar
manish committed
120
		err = mgr.saveCurrentInfo(cpInfo, true)
manish's avatar
manish committed
121
122
123
124
		if err != nil {
			panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
		}
	}
Mari Wade's avatar
Mari Wade committed
125
126
	//Verify that the checkpoint stored in db is accurate with what is actually stored in block file system
	// If not the same, sync the cpInfo and the file system
manish's avatar
manish committed
127
	syncCPInfoFromFS(rootDir, cpInfo)
Mari Wade's avatar
Mari Wade committed
128
129
	//Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc)
manish's avatar
manish committed
130
131
132
133
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
134
	//Truncate the file to remove excess past last block
manish's avatar
manish committed
135
136
137
138
139
	err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
	if err != nil {
		panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
	}

Mari Wade's avatar
Mari Wade committed
140
	// Create a new KeyValue store database handler for the blocks index in the keyvalue database
manish's avatar
manish committed
141
	mgr.index = newBlockIndex(indexConfig, indexStore)
Mari Wade's avatar
Mari Wade committed
142
143

	// Update the manager with the checkpoint info and the file writer
manish's avatar
manish committed
144
145
	mgr.cpInfo = cpInfo
	mgr.currentFileWriter = currentFileWriter
Mari Wade's avatar
Mari Wade committed
146
147
	// Create a checkpoint condition (event) variable, for the  goroutine waiting for
	// or announcing the occurrence of an event.
manish's avatar
manish committed
148
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
Mari Wade's avatar
Mari Wade committed
149
150
151

	// Verify that the index stored in db is accurate with what is actually stored in block file system
	// If not the same, sync the index and the file system
manish's avatar
manish committed
152
153
	mgr.syncIndex()

Mari Wade's avatar
Mari Wade committed
154
	// init BlockchainInfo for external API's
155
	bcInfo := &common.BlockchainInfo{
manish's avatar
manish committed
156
157
158
159
		Height:            0,
		CurrentBlockHash:  nil,
		PreviousBlockHash: nil}

Mari Wade's avatar
Mari Wade committed
160
	//If start up is a restart of an existing storage, update BlockchainInfo for external API's
manish's avatar
manish committed
161
	if cpInfo.lastBlockNumber > 0 {
162
		lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
manish's avatar
manish committed
163
		if err != nil {
164
			panic(fmt.Sprintf("Could not retrieve header of the last block form file: %s", err))
manish's avatar
manish committed
165
		}
166
167
		lastBlockHash := lastBlockHeader.Hash()
		previousBlockHash := lastBlockHeader.PreviousHash
168
		bcInfo = &common.BlockchainInfo{
manish's avatar
manish committed
169
170
171
172
173
			Height:            cpInfo.lastBlockNumber,
			CurrentBlockHash:  lastBlockHash,
			PreviousBlockHash: previousBlockHash}
	}
	mgr.bcInfo.Store(bcInfo)
Mari Wade's avatar
Mari Wade committed
174
	//return the new manager (blockfileMgr)
manish's avatar
manish committed
175
176
177
	return mgr
}

Mari Wade's avatar
Mari Wade committed
178
179
180
181
//cp = checkpointInfo, from the database gets the file suffix and the size of
// the file of where the last block was written.  Also retrieves contains the
// last block number that was written.  At init
//checkpointInfo:latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
manish's avatar
manish committed
182
func syncCPInfoFromFS(rootDir string, cpInfo *checkpointInfo) {
manish's avatar
manish committed
183
	logger.Debugf("Starting checkpoint=%s", cpInfo)
Mari Wade's avatar
Mari Wade committed
184
	//Checks if the file suffix of where the last block was written exists
manish's avatar
manish committed
185
186
187
188
189
190
	filePath := deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum)
	exists, size, err := util.FileExists(filePath)
	if err != nil {
		panic(fmt.Sprintf("Error in checking whether file [%s] exists: %s", filePath, err))
	}
	logger.Debugf("status of file [%s]: exists=[%t], size=[%d]", filePath, exists, size)
Mari Wade's avatar
Mari Wade committed
191
192
193
	//Test is !exists because when file number is first used the file does not exist yet
	//checks that the file exists and that the size of the file is what is stored in cpinfo
	//status of file [/tmp/tests/ledger/blkstorage/fsblkstorage/blocks/blockfile_000000]: exists=[false], size=[0]
manish's avatar
manish committed
194
195
196
197
	if !exists || int(size) == cpInfo.latestFileChunksize {
		// check point info is in sync with the file on disk
		return
	}
Mari Wade's avatar
Mari Wade committed
198
	//Scan the file system to verify that the checkpoint info stored in db is correct
manish's avatar
manish committed
199
200
	endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(
		rootDir, cpInfo.latestFileChunkSuffixNum, int64(cpInfo.latestFileChunksize))
manish's avatar
manish committed
201
202
203
	if err != nil {
		panic(fmt.Sprintf("Could not open current file for detecting last block in the file: %s", err))
	}
Mari Wade's avatar
Mari Wade committed
204
	//Updates the checkpoint info for the actual last block number stored and it's end location
manish's avatar
manish committed
205
206
207
208
209
	cpInfo.lastBlockNumber += uint64(numBlocks)
	cpInfo.latestFileChunksize = int(endOffsetLastBlock)
	logger.Debugf("Checkpoint after updates by scanning the last file segment:%s", cpInfo)
}

manish's avatar
manish committed
210
211
212
213
214
215
216
217
218
219
220
221
222
// deriveBlockfilePath returns the path of the block file with the given suffix
// number inside rootDir: rootDir, a "/" separator, blockfilePrefix, and the
// suffix number zero-padded to at least six digits.
func deriveBlockfilePath(rootDir string, suffixNum int) string {
	return fmt.Sprintf("%s/%s%06d", rootDir, blockfilePrefix, suffixNum)
}

// open opens the manager's current block file writer and returns whatever
// error the writer reports.
func (mgr *blockfileMgr) open() error {
	writer := mgr.currentFileWriter
	return writer.open()
}

// close closes the manager's current block file writer. Anything the writer's
// close returns is not propagated.
func (mgr *blockfileMgr) close() {
	writer := mgr.currentFileWriter
	writer.close()
}

func (mgr *blockfileMgr) moveToNextFile() {
manish's avatar
manish committed
223
	cpInfo := &checkpointInfo{
manish's avatar
manish committed
224
		latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1,
manish's avatar
manish committed
225
226
		latestFileChunksize:      0,
		lastBlockNumber:          mgr.cpInfo.lastBlockNumber}
manish's avatar
manish committed
227
228

	nextFileWriter, err := newBlockfileWriter(
manish's avatar
manish committed
229
		deriveBlockfilePath(mgr.rootDir, cpInfo.latestFileChunkSuffixNum))
manish's avatar
manish committed
230

manish's avatar
manish committed
231
232
233
234
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to next file: %s", err))
	}
	mgr.currentFileWriter.close()
manish's avatar
manish committed
235
	err = mgr.saveCurrentInfo(cpInfo, true)
manish's avatar
manish committed
236
237
238
239
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
	mgr.currentFileWriter = nextFileWriter
manish's avatar
manish committed
240
	mgr.updateCheckpoint(cpInfo)
manish's avatar
manish committed
241
242
}

243
244
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
	blockBytes, info, err := serializeBlock(block)
manish's avatar
manish committed
245
246
247
	if err != nil {
		return fmt.Errorf("Error while serializing block: %s", err)
	}
248
	blockHash := block.Header.Hash()
Mari Wade's avatar
Mari Wade committed
249
	//Get the location / offset where each transaction starts in the block and where the block ends
250
	txOffsets := info.txOffsets
manish's avatar
manish committed
251
	currentOffset := mgr.cpInfo.latestFileChunksize
manish's avatar
manish committed
252
253
254
	if err != nil {
		return fmt.Errorf("Error while serializing block: %s", err)
	}
manish's avatar
manish committed
255
256
257
258
	blockBytesLen := len(blockBytes)
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)

Mari Wade's avatar
Mari Wade committed
259
260
	//Determine if we need to start a new file since the size of this block
	//exceeds the amount of space left in the current file
manish's avatar
manish committed
261
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
manish's avatar
manish committed
262
263
264
		mgr.moveToNextFile()
		currentOffset = 0
	}
Mari Wade's avatar
Mari Wade committed
265
	//append blockBytesEncodedLen to the file
manish's avatar
manish committed
266
267
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
Mari Wade's avatar
Mari Wade committed
268
		//append the actual block bytes to the file
manish's avatar
manish committed
269
		err = mgr.currentFileWriter.append(blockBytes, true)
manish's avatar
manish committed
270
271
	}
	if err != nil {
manish's avatar
manish committed
272
273
274
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
manish's avatar
manish committed
275
276
277
278
		}
		return fmt.Errorf("Error while appending block to file: %s", err)
	}

Mari Wade's avatar
Mari Wade committed
279
	//Update the checkpoint info with the results of adding the new block
manish's avatar
manish committed
280
281
282
283
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
284
		lastBlockNumber:          block.Header.Number}
Mari Wade's avatar
Mari Wade committed
285
	//save the checkpoint information in the database
manish's avatar
manish committed
286
287
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
manish's avatar
manish committed
288
289
		if truncateErr != nil {
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
manish's avatar
manish committed
290
291
292
		}
		return fmt.Errorf("Error while saving current file info to db: %s", err)
	}
manish's avatar
manish committed
293

Mari Wade's avatar
Mari Wade committed
294
	//Index block file location pointer updated with file suffex and offset for the new block
manish's avatar
manish committed
295
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
manish's avatar
manish committed
296
	blockFLP.offset = currentOffset
manish's avatar
manish committed
297
	// shift the txoffset because we prepend length of bytes before block bytes
298
	for _, txOffset := range txOffsets {
299
		txOffset.loc.offset += len(blockBytesEncodedLen)
manish's avatar
manish committed
300
	}
Mari Wade's avatar
Mari Wade committed
301
	//save the index in the database
manish's avatar
manish committed
302
	mgr.index.indexBlock(&blockIdxInfo{
303
		blockNum: block.Header.Number, blockHash: blockHash,
manish's avatar
manish committed
304
		flp: blockFLP, txOffsets: txOffsets})
manish's avatar
manish committed
305

Mari Wade's avatar
Mari Wade committed
306
	//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
manish's avatar
manish committed
307
	mgr.updateCheckpoint(newCPInfo)
manish's avatar
manish committed
308
309
310
311
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}

manish's avatar
manish committed
312
313
314
func (mgr *blockfileMgr) syncIndex() error {
	var lastBlockIndexed uint64
	var err error
Mari Wade's avatar
Mari Wade committed
315
	//from the database, get the last block that was indexed
manish's avatar
manish committed
316
317
318
	if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
		return err
	}
Mari Wade's avatar
Mari Wade committed
319
	//initialize index to file number:zero, offset:zero and block:1
manish's avatar
manish committed
320
321
322
	startFileNum := 0
	startOffset := 0
	blockNum := uint64(1)
Mari Wade's avatar
Mari Wade committed
323
	//get the last file that blocks were added to using the checkpoint info
manish's avatar
manish committed
324
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
Mari Wade's avatar
Mari Wade committed
325
	//if the index stored in the db has value, update the index information with those values
manish's avatar
manish committed
326
327
328
329
330
331
332
333
334
335
	if lastBlockIndexed != 0 {
		var flp *fileLocPointer
		if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
		blockNum = lastBlockIndexed
	}

Mari Wade's avatar
Mari Wade committed
336
	//open a blockstream to the file location that was stored in the index
manish's avatar
manish committed
337
338
339
340
341
342
343
	var stream *blockStream
	if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
		return err
	}
	var blockBytes []byte
	var blockPlacementInfo *blockPlacementInfo

Mari Wade's avatar
Mari Wade committed
344
345
	//Should be at the last block, but go ahead and loop looking for next blockBytes
	//If there is another block, add it to the index
manish's avatar
manish committed
346
347
348
349
350
351
352
	for {
		if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			break
		}
353
354
		info, err := extractSerializedBlockInfo(blockBytes)
		if err != nil {
manish's avatar
manish committed
355
356
			return err
		}
357
		for _, offset := range info.txOffsets {
358
			offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
manish's avatar
manish committed
359
		}
Mari Wade's avatar
Mari Wade committed
360
		//Update the blockIndexInfo with what was actually stored in file system
manish's avatar
manish committed
361
		blockIdxInfo := &blockIdxInfo{}
362
363
		blockIdxInfo.blockHash = info.blockHeader.Hash()
		blockIdxInfo.blockNum = info.blockHeader.Number
manish's avatar
manish committed
364
365
		blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
			locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
366
		blockIdxInfo.txOffsets = info.txOffsets
manish's avatar
manish committed
367
368
369
370
371
372
373
374
		if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
			return err
		}
		blockNum++
	}
	return nil
}

375
376
func (mgr *blockfileMgr) getBlockchainInfo() *common.BlockchainInfo {
	return mgr.bcInfo.Load().(*common.BlockchainInfo)
manish's avatar
manish committed
377
378
}

manish's avatar
manish committed
379
380
381
382
383
384
385
386
// updateCheckpoint installs cpInfo as the manager's current checkpoint while
// holding the condition variable's lock, then broadcasts on the condition so
// that any waiters are woken.
func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
	cond := mgr.cpInfoCond
	cond.L.Lock()
	defer cond.L.Unlock()

	mgr.cpInfo = cpInfo
	logger.Debugf("Broadcasting about update checkpointInfo: %s", cpInfo)
	cond.Broadcast()
}

387
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *common.Block) {
manish's avatar
manish committed
388
	currentBCInfo := mgr.getBlockchainInfo()
389
	newBCInfo := &common.BlockchainInfo{
manish's avatar
manish committed
390
391
		Height:            currentBCInfo.Height + 1,
		CurrentBlockHash:  latestBlockHash,
392
		PreviousBlockHash: latestBlock.Header.PreviousHash}
manish's avatar
manish committed
393

manish's avatar
manish committed
394
395
396
	mgr.bcInfo.Store(newBCInfo)
}

397
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*common.Block, error) {
manish's avatar
manish committed
398
399
400
401
402
403
404
405
	logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
	loc, err := mgr.index.getBlockLocByHash(blockHash)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

406
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*common.Block, error) {
manish's avatar
manish committed
407
	logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)
denyeart's avatar
denyeart committed
408
409
410
411
412
413

	// interpret math.MaxUint64 as a request for last block
	if blockNum == math.MaxUint64 {
		blockNum = mgr.getBlockchainInfo().Height
	}

manish's avatar
manish committed
414
415
416
417
418
419
420
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
	return mgr.fetchBlock(loc)
}

421
422
func (mgr *blockfileMgr) retrieveBlockHeaderByNumber(blockNum uint64) (*common.BlockHeader, error) {
	logger.Debugf("retrieveBlockHeaderByNumber() - blockNum = [%d]", blockNum)
manish's avatar
manish committed
423
424
425
426
	loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
	if err != nil {
		return nil, err
	}
427
428
429
430
431
432
433
434
435
	blockBytes, err := mgr.fetchBlockBytes(loc)
	if err != nil {
		return nil, err
	}
	info, err := extractSerializedBlockInfo(blockBytes)
	if err != nil {
		return nil, err
	}
	return info.blockHeader, nil
manish's avatar
manish committed
436
437
}

manish's avatar
manish committed
438
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64) (*blocksItr, error) {
manish's avatar
manish committed
439
	return newBlockItr(mgr, startNum), nil
manish's avatar
manish committed
440
441
}

442
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*common.Envelope, error) {
manish's avatar
manish committed
443
444
445
446
447
	logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
	loc, err := mgr.index.getTxLoc(txID)
	if err != nil {
		return nil, err
	}
448
	return mgr.fetchTransactionEnvelope(loc)
manish's avatar
manish committed
449
450
}

451
// retrieveTransactionForBlockNumTranNum looks up the file location of the
// transaction identified by block number and transaction number in the index
// and fetches its envelope from there. Any index or fetch error is returned
// as is.
func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error) {
	logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
	loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum)
	if err != nil {
		return nil, err
	}
	return mgr.fetchTransactionEnvelope(loc)
}

460
461
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
	blockBytes, err := mgr.fetchBlockBytes(lp)
manish's avatar
manish committed
462
463
464
	if err != nil {
		return nil, err
	}
465
	block, err := deserializeBlock(blockBytes)
manish's avatar
manish committed
466
467
468
469
470
471
	if err != nil {
		return nil, err
	}
	return block, nil
}

472
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
473
474
475
	var err error
	var txEnvelopeBytes []byte
	if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
manish's avatar
manish committed
476
477
		return nil, err
	}
478
	_, n := proto.DecodeVarint(txEnvelopeBytes)
479
	return putil.GetEnvelopeFromBlock(txEnvelopeBytes[n:])
manish's avatar
manish committed
480
481
482
}

func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
manish's avatar
manish committed
483
	stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
manish's avatar
manish committed
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
	if err != nil {
		return nil, err
	}
	defer stream.close()
	b, err := stream.nextBlockBytes()
	if err != nil {
		return nil, err
	}
	return b, nil
}

// fetchRawBytes opens a reader on the block file named by lp and returns the
// lp.bytesLength bytes found at lp.offset. The reader is closed before
// returning; nil is returned together with any error.
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
	reader, err := newBlockfileReader(deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum))
	if err != nil {
		return nil, err
	}
	defer reader.close()

	raw, readErr := reader.read(lp.offset, lp.bytesLength)
	if readErr != nil {
		return nil, readErr
	}
	return raw, nil
}

Mari Wade's avatar
Mari Wade committed
509
//Get the current checkpoint information that is stored in the database
manish's avatar
manish committed
510
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
manish's avatar
manish committed
511
512
	var b []byte
	var err error
manish's avatar
manish committed
513
	if b, err = mgr.db.Get(blkMgrInfoKey); b == nil || err != nil {
manish's avatar
manish committed
514
515
516
517
518
519
		return nil, err
	}
	i := &checkpointInfo{}
	if err = i.unmarshal(b); err != nil {
		return nil, err
	}
manish's avatar
manish committed
520
	logger.Debugf("loaded checkpointInfo:%s", i)
manish's avatar
manish committed
521
522
523
	return i, nil
}

manish's avatar
manish committed
524
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, sync bool) error {
manish's avatar
manish committed
525
526
527
528
	b, err := i.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
529
	if err = mgr.db.Put(blkMgrInfoKey, b, sync); err != nil {
manish's avatar
manish committed
530
531
532
533
534
		return err
	}
	return nil
}

535
536
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
manish's avatar
manish committed
537
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
Mari Wade's avatar
Mari Wade committed
538
	//scan the passed file number suffix starting from the passed offset to find the last completed block
manish's avatar
manish committed
539
	numBlocks := 0
540
541
542
	blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
	if errOpen != nil {
		return 0, 0, errOpen
manish's avatar
manish committed
543
544
	}
	defer blockStream.close()
545
546
	var errRead error
	var blockBytes []byte
manish's avatar
manish committed
547
	for {
548
549
		blockBytes, errRead = blockStream.nextBlockBytes()
		if blockBytes == nil || errRead != nil {
manish's avatar
manish committed
550
551
552
553
			break
		}
		numBlocks++
	}
554
555
556
557
558
559
	if errRead == ErrUnexpectedEndOfBlockfile {
		logger.Debugf(`Error:%s
		The error may happen if a crash has happened during block appending.
		Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
		errRead = nil
	}
manish's avatar
manish committed
560
	logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
561
	return blockStream.currentOffset, numBlocks, errRead
manish's avatar
manish committed
562
563
564
}

// checkpointInfo
manish's avatar
manish committed
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
// checkpointInfo records how far block persistence has progressed. It is
// stored in the database as three consecutive varints (see marshal/unmarshal).
type checkpointInfo struct {
	// suffix number of the block file currently being written to
	latestFileChunkSuffixNum int
	// number of bytes of that file known to hold complete blocks; addBlock
	// uses it as the offset of the next block
	latestFileChunksize      int
	// number of the last block written (taken from the block header in addBlock)
	lastBlockNumber          uint64
}

// marshal encodes the checkpoint as three consecutive varints, in the order
// latestFileChunkSuffixNum, latestFileChunksize, lastBlockNumber, and returns
// the resulting bytes. It returns nil and the error if any encoding step fails.
func (i *checkpointInfo) marshal() ([]byte, error) {
	fields := [...]uint64{
		uint64(i.latestFileChunkSuffixNum),
		uint64(i.latestFileChunksize),
		i.lastBlockNumber,
	}
	encoder := proto.NewBuffer([]byte{})
	for _, field := range fields {
		if err := encoder.EncodeVarint(field); err != nil {
			return nil, err
		}
	}
	return encoder.Bytes(), nil
}

// unmarshal decodes three consecutive varints from b into
// latestFileChunkSuffixNum, latestFileChunksize and lastBlockNumber, in that
// order. Fields are assigned as they are decoded, so on error the fields
// decoded before the failure have already been overwritten.
func (i *checkpointInfo) unmarshal(b []byte) error {
	decoder := proto.NewBuffer(b)

	suffixNum, err := decoder.DecodeVarint()
	if err != nil {
		return err
	}
	i.latestFileChunkSuffixNum = int(suffixNum)

	chunkSize, err := decoder.DecodeVarint()
	if err != nil {
		return err
	}
	i.latestFileChunksize = int(chunkSize)

	lastBlock, err := decoder.DecodeVarint()
	if err != nil {
		return err
	}
	i.lastBlockNumber = lastBlock

	return nil
}
manish's avatar
manish committed
608
609
610
611
612

// String renders the three checkpoint fields in a single human-readable line,
// as used in the debug log messages of this file.
func (i *checkpointInfo) String() string {
	const layout = "latestFileChunkSuffixNum=[%d], latestFileChunksize=[%d], lastBlockNumber=[%d]"
	return fmt.Sprintf(layout, i.latestFileChunkSuffixNum, i.latestFileChunksize, i.lastBlockNumber)
}