blockindex.go 9.55 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
20
	"bytes"
manish's avatar
manish committed
21
22
23
	"fmt"

	"github.com/golang/protobuf/proto"
24
25
26
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
manish's avatar
manish committed
27
28
)

manish's avatar
manish committed
29
// Key-space layout of the index database. Each index prepends its own
// single-character prefix to the keys it writes; indexCheckpointKeyStr names
// the key under which the number of the last indexed block is stored.
const (
	// blockNumIdxKeyPrefix prefixes keys of the block-number index.
	blockNumIdxKeyPrefix = 'n'
	// blockHashIdxKeyPrefix prefixes keys of the block-hash index.
	blockHashIdxKeyPrefix = 'h'
	// txIDIdxKeyPrefix prefixes keys of the transaction-ID index.
	txIDIdxKeyPrefix = 't'
	// blockNumTranNumIdxKeyPrefix prefixes keys of the
	// (block number, transaction number) index.
	blockNumTranNumIdxKeyPrefix = 'a'
	// blockTxIDIdxKeyPrefix prefixes keys of the index that maps a
	// transaction ID to the location of its block.
	blockTxIDIdxKeyPrefix = 'b'
	// indexCheckpointKeyStr is the string form of the checkpoint key.
	indexCheckpointKeyStr = "indexCheckpointKey"
)

// indexCheckpointKey is the byte form of indexCheckpointKeyStr. indexBlock
// stores the number of the block it has just indexed under this key, and
// getLastBlockIndexed reads it back.
var indexCheckpointKey = []byte(indexCheckpointKeyStr)

type index interface {
	getLastBlockIndexed() (uint64, error)
	indexBlock(blockIdxInfo *blockIdxInfo) error
	getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
	getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
	getTxLoc(txID string) (*fileLocPointer, error)
46
	getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
47
	getBlockLocByTxID(txID string) (*fileLocPointer, error)
manish's avatar
manish committed
48
49
50
51
52
53
}

type blockIdxInfo struct {
	blockNum  uint64
	blockHash []byte
	flp       *fileLocPointer
54
	txOffsets []*txindexInfo
manish's avatar
manish committed
55
56
}

manish's avatar
manish committed
57
type blockIndex struct {
manish's avatar
manish committed
58
	indexItemsMap map[blkstorage.IndexableAttr]bool
manish's avatar
manish committed
59
	db            *leveldbhelper.DBHandle
manish's avatar
manish committed
60
61
}

manish's avatar
manish committed
62
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
manish's avatar
manish committed
63
64
65
66
67
68
	indexItems := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
	for _, indexItem := range indexItems {
		indexItemsMap[indexItem] = true
	}
manish's avatar
manish committed
69
	return &blockIndex{indexItemsMap, db}
manish's avatar
manish committed
70
71
72
73
74
}

func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
	var blockNumBytes []byte
	var err error
manish's avatar
manish committed
75
	if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
manish's avatar
manish committed
76
77
78
		return 0, nil
	}
	return decodeBlockNum(blockNumBytes), nil
manish's avatar
manish committed
79
80
}

manish's avatar
manish committed
81
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
82
	// do not index anything
manish's avatar
manish committed
83
84
85
86
	if len(index.indexItemsMap) == 0 {
		logger.Debug("Not indexing block... as nothing to index")
		return nil
	}
manish's avatar
manish committed
87
88
89
	logger.Debugf("Indexing block [%s]", blockIdxInfo)
	flp := blockIdxInfo.flp
	txOffsets := blockIdxInfo.txOffsets
manish's avatar
manish committed
90
	batch := leveldbhelper.NewUpdateBatch()
manish's avatar
manish committed
91
92
93
94
	flpBytes, err := flp.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
95

96
	//Index1
manish's avatar
manish committed
97
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
manish's avatar
manish committed
98
		batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
manish's avatar
manish committed
99
100
	}

101
	//Index2
manish's avatar
manish committed
102
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
manish's avatar
manish committed
103
		batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
manish's avatar
manish committed
104
105
	}

106
	//Index3 Used to find a transaction by it's transaction id
manish's avatar
manish committed
107
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
108
109
110
		for _, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
manish's avatar
manish committed
111
112
113
114
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
115
			batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
manish's avatar
manish committed
116
117
		}
	}
118
119
120
121
122
123
124
125
126
127
128
129
130
131

	//Index4 - Store BlockNumTranNum will be used to query history data
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
		for txIterator, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
			logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
			batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
		}
	}

132
133
134
135
136
137
138
	// Index5 - Store BlockNumber will be used to find block by transaction id
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok {
		for _, txoffset := range txOffsets {
			batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes)
		}
	}

manish's avatar
manish committed
139
140
	batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
	if err := index.db.WriteBatch(batch, false); err != nil {
manish's avatar
manish committed
141
142
143
144
145
146
		return err
	}
	return nil
}

func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
manish's avatar
manish committed
147
148
149
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
150
	b, err := index.db.Get(constructBlockHashKey(blockHash))
manish's avatar
manish committed
151
152
153
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
154
	if b == nil {
manish's avatar
manish committed
155
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
156
	}
manish's avatar
manish committed
157
158
159
160
161
162
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
manish's avatar
manish committed
163
164
165
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
166
	b, err := index.db.Get(constructBlockNumKey(blockNum))
manish's avatar
manish committed
167
168
169
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
170
	if b == nil {
manish's avatar
manish committed
171
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
172
	}
manish's avatar
manish committed
173
174
175
176
177
178
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
manish's avatar
manish committed
179
180
181
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
182
	b, err := index.db.Get(constructTxIDKey(txID))
manish's avatar
manish committed
183
184
185
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
186
	if b == nil {
manish's avatar
manish committed
187
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
188
	}
manish's avatar
manish committed
189
190
191
192
193
	txFLP := &fileLocPointer{}
	txFLP.unmarshal(b)
	return txFLP, nil
}

194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
// getBlockLocByTxID returns the location of the block that contains the
// transaction with the given ID.
//
// It returns blkstorage.ErrAttrNotIndexed when the block-by-transaction-ID
// attribute is not being indexed, blkstorage.ErrNotFoundInIndex when the index
// has no entry for the ID, and otherwise any error raised while reading or
// decoding the entry.
func (index *blockIndex) getBlockLocByTxID(txID string) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockTxIDKey(txID))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An entry that fails to decode must not be handed back as a valid,
	// partially populated location.
	if err = txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

210
// getTXLocByBlockNumTranNum returns the location of the transaction identified
// by a block number and a transaction number within that block. indexBlock
// numbers the transactions of a block starting at 1.
//
// It returns blkstorage.ErrAttrNotIndexed when the block-number/tx-number
// attribute is not being indexed, blkstorage.ErrNotFoundInIndex when the index
// has no entry for the pair, and otherwise any error raised while reading or
// decoding the entry.
func (index *blockIndex) getTXLocByBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	// An entry that fails to decode must not be handed back as a valid,
	// partially populated location.
	if err = txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

manish's avatar
manish committed
226
func constructBlockNumKey(blockNum uint64) []byte {
manish's avatar
manish committed
227
	blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
manish's avatar
manish committed
228
	return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
manish's avatar
manish committed
229
230
}

manish's avatar
manish committed
231
232
func constructBlockHashKey(blockHash []byte) []byte {
	return append([]byte{blockHashIdxKeyPrefix}, blockHash...)
manish's avatar
manish committed
233
234
}

manish's avatar
manish committed
235
236
func constructTxIDKey(txID string) []byte {
	return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
manish's avatar
manish committed
237
238
}

239
240
241
242
// constructBlockTxIDKey builds the key under which the location of the block
// containing transaction txID is stored: blockTxIDIdxKeyPrefix followed by the
// bytes of the ID.
func constructBlockTxIDKey(txID string) []byte {
	key := []byte{blockTxIDIdxKeyPrefix}
	// append accepts a string as the variadic source for a byte slice.
	return append(key, txID...)
}

243
244
245
246
247
248
249
func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
	blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
	tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum)
	key := append(blkNumBytes, tranNumBytes...)
	return append([]byte{blockNumTranNumIdxKeyPrefix}, key...)
}

manish's avatar
manish committed
250
251
252
253
254
255
256
257
258
// encodeBlockNum serializes blockNum with proto.EncodeVarint; decodeBlockNum
// is its counterpart.
func encodeBlockNum(blockNum uint64) []byte {
	encoded := proto.EncodeVarint(blockNum)
	return encoded
}

// decodeBlockNum reads a block number from blockNumBytes with
// proto.DecodeVarint. The decoder's second result is deliberately dropped
// because this function has no way to report it.
// NOTE(review): that result is presumably the number of bytes consumed, with
// 0 signalling undecodable input — confirm against the protobuf docs.
func decodeBlockNum(blockNumBytes []byte) uint64 {
	decoded, _ := proto.DecodeVarint(blockNumBytes)
	return decoded
}

manish's avatar
manish committed
259
260
261
262
263
264
265
266
267
268
// locPointer describes a run of bytes by where it starts and how long it is.
type locPointer struct {
	// offset is the position at which the run begins.
	offset int
	// bytesLength is the number of bytes in the run.
	bytesLength int
}

// String renders the pointer as "offset=<offset>, bytesLength=<bytesLength>".
func (lp *locPointer) String() string {
	const layout = "offset=%d, bytesLength=%d"
	return fmt.Sprintf(layout, lp.offset, lp.bytesLength)
}

manish's avatar
manish committed
269
// fileLocPointer
manish's avatar
manish committed
270
271
272
273
274
// fileLocPointer pairs a locPointer with a file suffix number. It is the value
// stored in the index for blocks and transactions (see marshal/unmarshal).
type fileLocPointer struct {
	// NOTE(review): presumably the numeric suffix of the block file that the
	// embedded offset refers to — confirm against the block file manager.
	fileSuffixNum int
	// Embedded, so offset and bytesLength are accessed directly on the
	// fileLocPointer.
	locPointer
}

275
func newFileLocationPointer(fileSuffixNum int, beginningOffset int, relativeLP *locPointer) *fileLocPointer {
manish's avatar
manish committed
276
	flp := &fileLocPointer{fileSuffixNum: fileSuffixNum}
277
	flp.offset = beginningOffset + relativeLP.offset
manish's avatar
manish committed
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
	flp.bytesLength = relativeLP.bytesLength
	return flp
}

// marshal serializes the pointer as three consecutive varints, in this order:
// fileSuffixNum, offset, bytesLength. The first encoding error, if any, is
// returned and no bytes are produced.
func (flp *fileLocPointer) marshal() ([]byte, error) {
	buffer := proto.NewBuffer([]byte{})
	// The field order here defines the wire layout that unmarshal reads back.
	for _, field := range [...]int{flp.fileSuffixNum, flp.offset, flp.bytesLength} {
		if err := buffer.EncodeVarint(uint64(field)); err != nil {
			return nil, err
		}
	}
	return buffer.Bytes(), nil
}

// unmarshal populates the pointer from b, reading three consecutive varints in
// the order written by marshal: fileSuffixNum, offset, bytesLength. Decoding
// stops at the first error, which is returned; fields decoded before the
// failure have already been assigned.
func (flp *fileLocPointer) unmarshal(b []byte) error {
	buffer := proto.NewBuffer(b)

	suffixNum, err := buffer.DecodeVarint()
	if err != nil {
		return err
	}
	flp.fileSuffixNum = int(suffixNum)

	offset, err := buffer.DecodeVarint()
	if err != nil {
		return err
	}
	flp.offset = int(offset)

	length, err := buffer.DecodeVarint()
	if err != nil {
		return err
	}
	flp.bytesLength = int(length)

	return nil
}

// String renders the pointer as "fileSuffixNum=<n>, " followed by the text
// form of the embedded locPointer.
func (flp *fileLocPointer) String() string {
	loc := flp.locPointer.String()
	return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, loc)
}
manish's avatar
manish committed
323
324

func (blockIdxInfo *blockIdxInfo) String() string {
325
326
327
328
329
330
331
332
333
334
335
336

	var buffer bytes.Buffer
	for _, txOffset := range blockIdxInfo.txOffsets {
		buffer.WriteString("txId=")
		buffer.WriteString(txOffset.txID)
		buffer.WriteString(" locPointer=")
		buffer.WriteString(txOffset.loc.String())
		buffer.WriteString("\n")
	}
	txOffsetsString := buffer.String()

	return fmt.Sprintf("blockNum=%d, blockHash=%#v txOffsets=\n%s", blockIdxInfo.blockNum, blockIdxInfo.blockHash, txOffsetsString)
manish's avatar
manish committed
337
}