blockindex.go 12.5 KB
Newer Older
manish's avatar
manish committed
1
/*
2
Copyright IBM Corp. All Rights Reserved.
manish's avatar
manish committed
3

4
SPDX-License-Identifier: Apache-2.0
manish's avatar
manish committed
5
6
7
8
9
*/

package fsblkstorage

import (
10
	"bytes"
manish's avatar
manish committed
11
12
13
	"fmt"

	"github.com/golang/protobuf/proto"
14
15
16
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
17
18
19
	ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/peer"
20
	"github.com/pkg/errors"
manish's avatar
manish committed
21
22
)

manish's avatar
manish committed
23
// Key prefixes used to keep the different index entries apart inside the
// single underlying db, plus the name of the checkpoint key.
const (
	blockNumIdxKeyPrefix           = 'n' // block number -> block location
	blockHashIdxKeyPrefix          = 'h' // block hash -> block location
	txIDIdxKeyPrefix               = 't' // txid -> transaction location
	blockNumTranNumIdxKeyPrefix    = 'a' // (block number, tx number) -> transaction location
	blockTxIDIdxKeyPrefix          = 'b' // txid -> location of the containing block
	txValidationResultIdxKeyPrefix = 'v' // txid -> one-byte validation code
	indexCheckpointKeyStr          = "indexCheckpointKey"
)

var (
	// indexCheckpointKey is the db key under which indexBlock stores the number
	// of the block it has just indexed.
	indexCheckpointKey = []byte(indexCheckpointKeyStr)

	// errIndexEmpty is returned by getLastBlockIndexed when no checkpoint has
	// been stored yet.
	errIndexEmpty = errors.New("NoBlockIndexed")
)
manish's avatar
manish committed
35
36
37
38
39
40
41

type index interface {
	getLastBlockIndexed() (uint64, error)
	indexBlock(blockIdxInfo *blockIdxInfo) error
	getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
	getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
	getTxLoc(txID string) (*fileLocPointer, error)
42
	getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
43
	getBlockLocByTxID(txID string) (*fileLocPointer, error)
44
	getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
manish's avatar
manish committed
45
46
47
48
49
50
}

type blockIdxInfo struct {
	blockNum  uint64
	blockHash []byte
	flp       *fileLocPointer
51
	txOffsets []*txindexInfo
52
	metadata  *common.BlockMetadata
manish's avatar
manish committed
53
54
}

manish's avatar
manish committed
55
type blockIndex struct {
manish's avatar
manish committed
56
	indexItemsMap map[blkstorage.IndexableAttr]bool
manish's avatar
manish committed
57
	db            *leveldbhelper.DBHandle
manish's avatar
manish committed
58
59
}

60
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
manish's avatar
manish committed
61
62
63
64
65
66
	indexItems := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
	for _, indexItem := range indexItems {
		indexItemsMap[indexItem] = true
	}
67
68
69
70
71
	// This dependency is needed because the index 'IndexableAttrTxID' is used for detecting the duplicate txid
	// and the results are reused in the other two indexes. Ideally, all three index should be merged into one
	// for efficiency purpose - [FAB-10587]
	if (indexItemsMap[blkstorage.IndexableAttrTxValidationCode] || indexItemsMap[blkstorage.IndexableAttrBlockTxID]) &&
		!indexItemsMap[blkstorage.IndexableAttrTxID] {
72
		return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
73
74
75
			blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
	}
	return &blockIndex{indexItemsMap, db}, nil
manish's avatar
manish committed
76
77
78
79
80
}

func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
	var blockNumBytes []byte
	var err error
manish's avatar
manish committed
81
	if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
82
		return 0, err
manish's avatar
manish committed
83
	}
84
85
86
	if blockNumBytes == nil {
		return 0, errIndexEmpty
	}
manish's avatar
manish committed
87
	return decodeBlockNum(blockNumBytes), nil
manish's avatar
manish committed
88
89
}

manish's avatar
manish committed
90
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
91
	// do not index anything
manish's avatar
manish committed
92
93
94
95
	if len(index.indexItemsMap) == 0 {
		logger.Debug("Not indexing block... as nothing to index")
		return nil
	}
manish's avatar
manish committed
96
97
98
	logger.Debugf("Indexing block [%s]", blockIdxInfo)
	flp := blockIdxInfo.flp
	txOffsets := blockIdxInfo.txOffsets
99
	txsfltr := ledgerUtil.TxValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
manish's avatar
manish committed
100
	batch := leveldbhelper.NewUpdateBatch()
manish's avatar
manish committed
101
102
103
104
	flpBytes, err := flp.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
105

106
	//Index1
manish's avatar
manish committed
107
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
manish's avatar
manish committed
108
		batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
manish's avatar
manish committed
109
110
	}

111
	//Index2
manish's avatar
manish committed
112
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
manish's avatar
manish committed
113
		batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
manish's avatar
manish committed
114
115
	}

116
	//Index3 Used to find a transaction by it's transaction id
manish's avatar
manish committed
117
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
118
		if err = index.markDuplicateTxids(blockIdxInfo); err != nil {
119
120
			logger.Errorf("error detecting duplicate txids: %s", err)
			return errors.WithMessage(err, "error detecting duplicate txids")
121
		}
122
		for _, txoffset := range txOffsets {
123
124
125
126
			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
				logger.Debugf("txid [%s] is a duplicate of a previous tx. Not indexing in txid-index", txoffset.txID)
				continue
			}
127
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
128
			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to txid-index", txFlp, txoffset.txID)
manish's avatar
manish committed
129
130
131
132
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
133
			batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
manish's avatar
manish committed
134
135
		}
	}
136
137
138
139
140

	//Index4 - Store BlockNumTranNum will be used to query history data
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
		for txIterator, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
141
			logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator, txoffset.txID)
142
143
144
145
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
146
			batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)), txFlpBytes)
147
148
149
		}
	}

150
151
152
	// Index5 - Store BlockNumber will be used to find block by transaction id
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok {
		for _, txoffset := range txOffsets {
153
154
155
			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
				continue
			}
156
157
158
159
			batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes)
		}
	}

160
161
162
	// Index6 - Store transaction validation result by transaction id
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; ok {
		for idx, txoffset := range txOffsets {
163
164
165
			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
				continue
			}
166
167
168
169
			batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
		}
	}

manish's avatar
manish committed
170
	batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
171
172
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := index.db.WriteBatch(batch, true); err != nil {
manish's avatar
manish committed
173
174
175
176
177
		return err
	}
	return nil
}

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// markDuplicateTxids sets isDuplicate on every entry of blockIdxInfo.txOffsets
// whose txid has already been seen, either earlier in the same block or in the
// txid index. Any lookup error other than blkstorage.ErrNotFoundInIndex aborts
// the scan and is returned.
func (index *blockIndex) markDuplicateTxids(blockIdxInfo *blockIdxInfo) error {
	seenInBlock := map[string]bool{}
	for _, info := range blockIdxInfo.txOffsets {
		id := info.txID
		if seenInBlock[id] {
			// repeats a txid from an earlier transaction of this block
			info.isDuplicate = true
			continue
		}

		existing, lookupErr := index.getTxLoc(id)
		switch {
		case existing != nil:
			// the txid was indexed by an earlier block
			info.isDuplicate = true
		case lookupErr != blkstorage.ErrNotFoundInIndex:
			return lookupErr
		default:
			seenInBlock[id] = true
		}
	}
	return nil
}

manish's avatar
manish committed
200
func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
manish's avatar
manish committed
201
202
203
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
204
	b, err := index.db.Get(constructBlockHashKey(blockHash))
manish's avatar
manish committed
205
206
207
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
208
	if b == nil {
manish's avatar
manish committed
209
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
210
	}
manish's avatar
manish committed
211
212
213
214
215
216
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
manish's avatar
manish committed
217
218
219
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
220
	b, err := index.db.Get(constructBlockNumKey(blockNum))
manish's avatar
manish committed
221
222
223
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
224
	if b == nil {
manish's avatar
manish committed
225
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
226
	}
manish's avatar
manish committed
227
228
229
230
231
232
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
manish's avatar
manish committed
233
234
235
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
236
	b, err := index.db.Get(constructTxIDKey(txID))
manish's avatar
manish committed
237
238
239
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
240
	if b == nil {
manish's avatar
manish committed
241
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
242
	}
manish's avatar
manish committed
243
244
245
246
247
	txFLP := &fileLocPointer{}
	txFLP.unmarshal(b)
	return txFLP, nil
}

248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
// getBlockLocByTxID returns the location of the block that contains the
// transaction with the given id.
//
// It returns blkstorage.ErrAttrNotIndexed when the block-by-txid index is not
// enabled, blkstorage.ErrNotFoundInIndex when there is no entry for the id, and
// otherwise any error from the db read or from decoding the stored value.
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockTxIDKey(txID))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An undecodable entry must be reported, not returned as a half-filled pointer.
	if err := txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

264
// getTXLocByBlockNumTranNum returns the location of the transaction stored
// under the (blockNum, tranNum) key.
//
// It returns blkstorage.ErrAttrNotIndexed when the blockNum/tranNum index is
// not enabled, blkstorage.ErrNotFoundInIndex when there is no entry for the
// pair, and otherwise any error from the db read or from decoding the stored
// value.
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An undecodable entry must be reported, not returned as a half-filled pointer.
	if err := txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

280
281
282
283
284
285
286
287
288
289
// getTxValidationCodeByTxID returns the validation code stored for the
// transaction with the given id.
//
// On every failure the returned code is peer.TxValidationCode(-1). The error is
// blkstorage.ErrAttrNotIndexed when the validation-code index is not enabled,
// blkstorage.ErrNotFoundInIndex when there is no entry for the id, the db error
// when the read fails, and "invalid value in indexItems" when the stored value
// is not exactly one byte long.
func (index *blockIndex) getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; !ok {
		return peer.TxValidationCode(-1), blkstorage.ErrAttrNotIndexed
	}

	raw, err := index.db.Get(constructTxValidationCodeIDKey(txID))
	switch {
	case err != nil:
		return peer.TxValidationCode(-1), err
	case raw == nil:
		return peer.TxValidationCode(-1), blkstorage.ErrNotFoundInIndex
	case len(raw) != 1:
		// indexBlock always writes a single byte, so anything else is corrupt
		return peer.TxValidationCode(-1), errors.New("invalid value in indexItems")
	}

	return peer.TxValidationCode(int32(raw[0])), nil
}

manish's avatar
manish committed
300
func constructBlockNumKey(blockNum uint64) []byte {
manish's avatar
manish committed
301
	blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
manish's avatar
manish committed
302
	return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
manish's avatar
manish committed
303
304
}

manish's avatar
manish committed
305
306
func constructBlockHashKey(blockHash []byte) []byte {
	return append([]byte{blockHashIdxKeyPrefix}, blockHash...)
manish's avatar
manish committed
307
308
}

manish's avatar
manish committed
309
310
func constructTxIDKey(txID string) []byte {
	return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
manish's avatar
manish committed
311
312
}

313
314
315
316
// constructBlockTxIDKey builds the block-by-txid index key: the
// blockTxIDIdxKeyPrefix byte followed by the bytes of txID.
func constructBlockTxIDKey(txID string) []byte {
	key := []byte{blockTxIDIdxKeyPrefix}
	return append(key, txID...)
}

317
318
319
320
// constructTxValidationCodeIDKey builds the validation-code index key: the
// txValidationResultIdxKeyPrefix byte followed by the bytes of txID.
func constructTxValidationCodeIDKey(txID string) []byte {
	key := []byte{txValidationResultIdxKeyPrefix}
	return append(key, txID...)
}

321
322
323
324
325
326
327
// constructBlockNumTranNumKey builds the blockNum/tranNum index key: the
// blockNumTranNumIdxKeyPrefix byte, then blockNum, then txNum, both numbers
// encoded with util.EncodeOrderPreservingVarUint64.
func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
	encodedBlockNum := util.EncodeOrderPreservingVarUint64(blockNum)
	encodedTxNum := util.EncodeOrderPreservingVarUint64(txNum)

	key := make([]byte, 0, 1+len(encodedBlockNum)+len(encodedTxNum))
	key = append(key, blockNumTranNumIdxKeyPrefix)
	key = append(key, encodedBlockNum...)
	return append(key, encodedTxNum...)
}

manish's avatar
manish committed
328
329
330
331
332
333
334
335
336
// encodeBlockNum serializes blockNum with proto.EncodeVarint; this is the form
// in which the checkpoint value is stored.
func encodeBlockNum(blockNum uint64) []byte {
	encoded := proto.EncodeVarint(blockNum)
	return encoded
}

// decodeBlockNum reverses encodeBlockNum using proto.DecodeVarint.
// Only the decoded value is used; the second result of proto.DecodeVarint is
// discarded, so this function cannot report malformed input.
func decodeBlockNum(blockNumBytes []byte) uint64 {
	decoded, _ := proto.DecodeVarint(blockNumBytes)
	return decoded
}

manish's avatar
manish committed
337
338
339
340
341
342
343
344
345
346
// locPointer describes a contiguous run of bytes by where it starts and how
// long it is.
type locPointer struct {
	offset, bytesLength int
}

// String renders the pointer as "offset=<offset>, bytesLength=<bytesLength>".
func (lp *locPointer) String() string {
	const layout = "offset=%d, bytesLength=%d"
	return fmt.Sprintf(layout, lp.offset, lp.bytesLength)
}

manish's avatar
manish committed
347
// fileLocPointer
manish's avatar
manish committed
348
349
350
351
352
// fileLocPointer is a locPointer qualified by a file number: the embedded
// locPointer supplies offset and bytesLength, and fileSuffixNum is serialized
// ahead of them by marshal.
// NOTE(review): fileSuffixNum presumably identifies the block file by its
// numeric suffix - confirm against the block file manager.
type fileLocPointer struct {
	fileSuffixNum int
	locPointer
}

353
func newFileLocationPointer(fileSuffixNum int, beginningOffset int, relativeLP *locPointer) *fileLocPointer {
manish's avatar
manish committed
354
	flp := &fileLocPointer{fileSuffixNum: fileSuffixNum}
355
	flp.offset = beginningOffset + relativeLP.offset
manish's avatar
manish committed
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
	flp.bytesLength = relativeLP.bytesLength
	return flp
}

// marshal serializes the pointer as three consecutive varints, in the order
// fileSuffixNum, offset, bytesLength. The first encoding error, if any, is
// returned together with a nil slice.
func (flp *fileLocPointer) marshal() ([]byte, error) {
	out := proto.NewBuffer([]byte{})
	// The field order here must match the order unmarshal reads them back in.
	for _, field := range []int{flp.fileSuffixNum, flp.offset, flp.bytesLength} {
		if err := out.EncodeVarint(uint64(field)); err != nil {
			return nil, err
		}
	}
	return out.Bytes(), nil
}

// unmarshal populates the pointer from b, reading three varints in the order
// fileSuffixNum, offset, bytesLength. It stops at the first decoding error and
// returns it; fields decoded before the failure keep their new values.
func (flp *fileLocPointer) unmarshal(b []byte) error {
	in := proto.NewBuffer(b)
	targets := []*int{&flp.fileSuffixNum, &flp.offset, &flp.bytesLength}
	for _, target := range targets {
		decoded, err := in.DecodeVarint()
		if err != nil {
			return err
		}
		*target = int(decoded)
	}
	return nil
}

// String renders the pointer as "fileSuffixNum=<n>, " followed by the string
// form of the embedded locPointer.
func (flp *fileLocPointer) String() string {
	span := flp.locPointer.String()
	return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, span)
}
manish's avatar
manish committed
401
402

func (blockIdxInfo *blockIdxInfo) String() string {
403
404
405
406
407
408
409
410
411
412
413
414

	var buffer bytes.Buffer
	for _, txOffset := range blockIdxInfo.txOffsets {
		buffer.WriteString("txId=")
		buffer.WriteString(txOffset.txID)
		buffer.WriteString(" locPointer=")
		buffer.WriteString(txOffset.loc.String())
		buffer.WriteString("\n")
	}
	txOffsetsString := buffer.String()

	return fmt.Sprintf("blockNum=%d, blockHash=%#v txOffsets=\n%s", blockIdxInfo.blockNum, blockIdxInfo.blockHash, txOffsetsString)
manish's avatar
manish committed
415
}