blockindex.go 11.2 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
20
	"bytes"
21
	"errors"
manish's avatar
manish committed
22
23
24
	"fmt"

	"github.com/golang/protobuf/proto"
25
26
27
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
28
29
30
	ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/peer"
manish's avatar
manish committed
31
32
)

manish's avatar
manish committed
33
// Key-space layout of the index database. Every index entry is stored under a
// one-byte prefix identifying the kind of lookup it serves; the checkpoint is
// stored under a fixed string key.
const (
	blockNumIdxKeyPrefix           = 'n' // block number -> block location
	blockHashIdxKeyPrefix          = 'h' // block hash -> block location
	txIDIdxKeyPrefix               = 't' // transaction ID -> transaction location
	blockNumTranNumIdxKeyPrefix    = 'a' // (block number, tx number) -> transaction location
	blockTxIDIdxKeyPrefix          = 'b' // transaction ID -> location of the containing block
	txValidationResultIdxKeyPrefix = 'v' // transaction ID -> validation code byte
	indexCheckpointKeyStr          = "indexCheckpointKey"
)

var indexCheckpointKey = []byte(indexCheckpointKeyStr)
44
var errIndexEmpty = errors.New("NoBlockIndexed")
manish's avatar
manish committed
45
46
47
48
49
50
51

type index interface {
	getLastBlockIndexed() (uint64, error)
	indexBlock(blockIdxInfo *blockIdxInfo) error
	getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
	getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
	getTxLoc(txID string) (*fileLocPointer, error)
52
	getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
53
	getBlockLocByTxID(txID string) (*fileLocPointer, error)
54
	getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
manish's avatar
manish committed
55
56
57
58
59
60
}

type blockIdxInfo struct {
	blockNum  uint64
	blockHash []byte
	flp       *fileLocPointer
61
	txOffsets []*txindexInfo
62
	metadata  *common.BlockMetadata
manish's avatar
manish committed
63
64
}

manish's avatar
manish committed
65
type blockIndex struct {
manish's avatar
manish committed
66
	indexItemsMap map[blkstorage.IndexableAttr]bool
manish's avatar
manish committed
67
	db            *leveldbhelper.DBHandle
manish's avatar
manish committed
68
69
}

manish's avatar
manish committed
70
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
manish's avatar
manish committed
71
72
73
74
75
76
	indexItems := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
	for _, indexItem := range indexItems {
		indexItemsMap[indexItem] = true
	}
manish's avatar
manish committed
77
	return &blockIndex{indexItemsMap, db}
manish's avatar
manish committed
78
79
80
81
82
}

func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
	var blockNumBytes []byte
	var err error
manish's avatar
manish committed
83
	if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
84
		return 0, err
manish's avatar
manish committed
85
	}
86
87
88
	if blockNumBytes == nil {
		return 0, errIndexEmpty
	}
manish's avatar
manish committed
89
	return decodeBlockNum(blockNumBytes), nil
manish's avatar
manish committed
90
91
}

manish's avatar
manish committed
92
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
93
	// do not index anything
manish's avatar
manish committed
94
95
96
97
	if len(index.indexItemsMap) == 0 {
		logger.Debug("Not indexing block... as nothing to index")
		return nil
	}
manish's avatar
manish committed
98
99
100
	logger.Debugf("Indexing block [%s]", blockIdxInfo)
	flp := blockIdxInfo.flp
	txOffsets := blockIdxInfo.txOffsets
101
	txsfltr := ledgerUtil.TxValidationFlags(blockIdxInfo.metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
manish's avatar
manish committed
102
	batch := leveldbhelper.NewUpdateBatch()
manish's avatar
manish committed
103
104
105
106
	flpBytes, err := flp.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
107

108
	//Index1
manish's avatar
manish committed
109
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
manish's avatar
manish committed
110
		batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
manish's avatar
manish committed
111
112
	}

113
	//Index2
manish's avatar
manish committed
114
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
manish's avatar
manish committed
115
		batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
manish's avatar
manish committed
116
117
	}

118
	//Index3 Used to find a transaction by it's transaction id
manish's avatar
manish committed
119
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
120
121
122
		for _, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
manish's avatar
manish committed
123
124
125
126
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
127
			batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
manish's avatar
manish committed
128
129
		}
	}
130
131
132
133
134

	//Index4 - Store BlockNumTranNum will be used to query history data
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
		for txIterator, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
135
			logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator, txoffset.txID)
136
137
138
139
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
140
			batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator)), txFlpBytes)
141
142
143
		}
	}

144
145
146
147
148
149
150
	// Index5 - Store BlockNumber will be used to find block by transaction id
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok {
		for _, txoffset := range txOffsets {
			batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes)
		}
	}

151
152
153
154
155
156
157
	// Index6 - Store transaction validation result by transaction id
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; ok {
		for idx, txoffset := range txOffsets {
			batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
		}
	}

manish's avatar
manish committed
158
	batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
159
160
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := index.db.WriteBatch(batch, true); err != nil {
manish's avatar
manish committed
161
162
163
164
165
166
		return err
	}
	return nil
}

func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
manish's avatar
manish committed
167
168
169
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
170
	b, err := index.db.Get(constructBlockHashKey(blockHash))
manish's avatar
manish committed
171
172
173
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
174
	if b == nil {
manish's avatar
manish committed
175
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
176
	}
manish's avatar
manish committed
177
178
179
180
181
182
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
manish's avatar
manish committed
183
184
185
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
186
	b, err := index.db.Get(constructBlockNumKey(blockNum))
manish's avatar
manish committed
187
188
189
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
190
	if b == nil {
manish's avatar
manish committed
191
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
192
	}
manish's avatar
manish committed
193
194
195
196
197
198
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
manish's avatar
manish committed
199
200
201
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
202
	b, err := index.db.Get(constructTxIDKey(txID))
manish's avatar
manish committed
203
204
205
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
206
	if b == nil {
manish's avatar
manish committed
207
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
208
	}
manish's avatar
manish committed
209
210
211
212
213
	txFLP := &fileLocPointer{}
	txFLP.unmarshal(b)
	return txFLP, nil
}

214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
// getBlockLocByTxID returns the location of the block that contains the
// transaction with the given ID. It returns blkstorage.ErrAttrNotIndexed when
// the block-by-transaction-ID index is not enabled,
// blkstorage.ErrNotFoundInIndex when no entry exists for the ID, and any error
// raised while reading or decoding the stored entry.
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockTxIDKey(txID))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An undecodable entry must surface as an error instead of a partially
	// filled pointer paired with a nil error.
	if err := txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

230
// getTXLocByBlockNumTranNum returns the location of the transaction stored
// under the (blockNum, tranNum) key. It returns blkstorage.ErrAttrNotIndexed
// when that index is not enabled, blkstorage.ErrNotFoundInIndex when no entry
// exists for the pair, and any error raised while reading or decoding the
// stored entry.
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An undecodable entry must surface as an error instead of a partially
	// filled pointer paired with a nil error.
	if err := txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

246
247
248
249
250
251
252
253
254
255
// getTxValidationCodeByTxID returns the validation code stored for the
// transaction with the given ID. On every failure path the returned code is
// peer.TxValidationCode(-1): blkstorage.ErrAttrNotIndexed when the
// validation-code index is not enabled, blkstorage.ErrNotFoundInIndex when no
// entry exists, an error when the stored value is not exactly one byte, and
// any error from the database read.
func (index *blockIndex) getTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; !ok {
		return peer.TxValidationCode(-1), blkstorage.ErrAttrNotIndexed
	}

	raw, err := index.db.Get(constructTxValidationCodeIDKey(txID))
	if err != nil {
		return peer.TxValidationCode(-1), err
	}
	if raw == nil {
		return peer.TxValidationCode(-1), blkstorage.ErrNotFoundInIndex
	}
	// indexBlock writes the code as a single byte; anything else is corrupt.
	if len(raw) != 1 {
		return peer.TxValidationCode(-1), errors.New("Invalid value in indexItems")
	}

	return peer.TxValidationCode(int32(raw[0])), nil
}

manish's avatar
manish committed
266
func constructBlockNumKey(blockNum uint64) []byte {
manish's avatar
manish committed
267
	blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
manish's avatar
manish committed
268
	return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
manish's avatar
manish committed
269
270
}

manish's avatar
manish committed
271
272
func constructBlockHashKey(blockHash []byte) []byte {
	return append([]byte{blockHashIdxKeyPrefix}, blockHash...)
manish's avatar
manish committed
273
274
}

manish's avatar
manish committed
275
276
func constructTxIDKey(txID string) []byte {
	return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
manish's avatar
manish committed
277
278
}

279
280
281
282
// constructBlockTxIDKey builds the key of the block-by-transaction-ID index:
// the blockTxIDIdxKeyPrefix byte followed by the bytes of txID.
func constructBlockTxIDKey(txID string) []byte {
	key := make([]byte, 0, 1+len(txID))
	key = append(key, blockTxIDIdxKeyPrefix)
	return append(key, txID...)
}

283
284
285
286
// constructTxValidationCodeIDKey builds the key of the validation-code index:
// the txValidationResultIdxKeyPrefix byte followed by the bytes of txID.
func constructTxValidationCodeIDKey(txID string) []byte {
	key := make([]byte, 0, 1+len(txID))
	key = append(key, txValidationResultIdxKeyPrefix)
	return append(key, txID...)
}

287
288
289
290
291
292
293
// constructBlockNumTranNumKey builds the key of the (block number, transaction
// number) index: the blockNumTranNumIdxKeyPrefix byte, then the
// order-preserving encoding of blockNum, then that of txNum.
func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
	encodedBlockNum := util.EncodeOrderPreservingVarUint64(blockNum)
	encodedTxNum := util.EncodeOrderPreservingVarUint64(txNum)
	out := make([]byte, 0, 1+len(encodedBlockNum)+len(encodedTxNum))
	out = append(out, blockNumTranNumIdxKeyPrefix)
	out = append(out, encodedBlockNum...)
	return append(out, encodedTxNum...)
}

manish's avatar
manish committed
294
295
296
297
298
299
300
301
302
// encodeBlockNum serializes blockNum as a protobuf varint; this is the form in
// which the index checkpoint value is stored.
func encodeBlockNum(blockNum uint64) []byte {
	encoded := proto.EncodeVarint(blockNum)
	return encoded
}

// decodeBlockNum reads a block number from its protobuf varint encoding.
// The second result of proto.DecodeVarint is discarded, so this function
// cannot report a malformed input to its caller.
// NOTE(review): presumably a failed decode yields 0 here — confirm against the
// proto package documentation.
func decodeBlockNum(blockNumBytes []byte) uint64 {
	decoded, _ := proto.DecodeVarint(blockNumBytes)
	return decoded
}

manish's avatar
manish committed
303
304
305
306
307
308
309
310
311
312
// locPointer describes a span of bytes by its starting offset and its length.
type locPointer struct {
	offset      int
	bytesLength int
}

// String renders the pointer as "offset=<offset>, bytesLength=<bytesLength>".
func (lp *locPointer) String() string {
	const layout = "offset=%d, bytesLength=%d"
	return fmt.Sprintf(layout, lp.offset, lp.bytesLength)
}

manish's avatar
manish committed
313
// fileLocPointer
manish's avatar
manish committed
314
315
316
317
318
type fileLocPointer struct {
	fileSuffixNum int
	locPointer
}

319
func newFileLocationPointer(fileSuffixNum int, beginningOffset int, relativeLP *locPointer) *fileLocPointer {
manish's avatar
manish committed
320
	flp := &fileLocPointer{fileSuffixNum: fileSuffixNum}
321
	flp.offset = beginningOffset + relativeLP.offset
manish's avatar
manish committed
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
	flp.bytesLength = relativeLP.bytesLength
	return flp
}

// marshal serializes the pointer as three consecutive varints, in the order
// fileSuffixNum, offset, bytesLength. It returns the first encoding error
// encountered, with a nil byte slice.
func (flp *fileLocPointer) marshal() ([]byte, error) {
	buf := proto.NewBuffer([]byte{})
	fields := []uint64{
		uint64(flp.fileSuffixNum),
		uint64(flp.offset),
		uint64(flp.bytesLength),
	}
	for _, field := range fields {
		if err := buf.EncodeVarint(field); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

// unmarshal fills the pointer from three consecutive varints in b, read in the
// order fileSuffixNum, offset, bytesLength. On a decoding error it returns that
// error immediately; fields decoded before the failure keep their new values.
func (flp *fileLocPointer) unmarshal(b []byte) error {
	buf := proto.NewBuffer(b)
	targets := []*int{&flp.fileSuffixNum, &flp.offset, &flp.bytesLength}
	for _, target := range targets {
		decoded, err := buf.DecodeVarint()
		if err != nil {
			return err
		}
		*target = int(decoded)
	}
	return nil
}

// String renders the pointer as "fileSuffixNum=<n>, " followed by the string
// form of the embedded locPointer.
func (flp *fileLocPointer) String() string {
	suffix := flp.fileSuffixNum
	loc := flp.locPointer.String()
	return fmt.Sprintf("fileSuffixNum=%d, %s", suffix, loc)
}
manish's avatar
manish committed
367
368

func (blockIdxInfo *blockIdxInfo) String() string {
369
370
371
372
373
374
375
376
377
378
379
380

	var buffer bytes.Buffer
	for _, txOffset := range blockIdxInfo.txOffsets {
		buffer.WriteString("txId=")
		buffer.WriteString(txOffset.txID)
		buffer.WriteString(" locPointer=")
		buffer.WriteString(txOffset.loc.String())
		buffer.WriteString("\n")
	}
	txOffsetsString := buffer.String()

	return fmt.Sprintf("blockNum=%d, blockHash=%#v txOffsets=\n%s", blockIdxInfo.blockNum, blockIdxInfo.blockHash, txOffsetsString)
manish's avatar
manish committed
381
}