blockindex.go 8.35 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
	"fmt"

	"github.com/golang/protobuf/proto"
23
24
25
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
manish's avatar
manish committed
26
27
)

manish's avatar
manish committed
28
// Key-space layout of the index database: every index entry is stored under
// a key that starts with one of the single-byte prefixes below, and the
// checkpoint is stored under a fixed, un-prefixed key.
const (
	// blockNumIdxKeyPrefix starts the keys built by constructBlockNumKey.
	blockNumIdxKeyPrefix = 'n'
	// blockHashIdxKeyPrefix starts the keys built by constructBlockHashKey.
	blockHashIdxKeyPrefix = 'h'
	// txIDIdxKeyPrefix starts the keys built by constructTxIDKey.
	txIDIdxKeyPrefix = 't'
	// blockNumTranNumIdxKeyPrefix starts the keys built by constructBlockNumTranNumKey.
	blockNumTranNumIdxKeyPrefix = 'a'
	// indexCheckpointKeyStr is the textual form of the checkpoint key.
	indexCheckpointKeyStr = "indexCheckpointKey"
)

// indexCheckpointKey is the database key under which indexBlock stores the
// number of the block it has just indexed and from which
// getLastBlockIndexed reads it back.
var indexCheckpointKey = []byte(indexCheckpointKeyStr)

type index interface {
	getLastBlockIndexed() (uint64, error)
	indexBlock(blockIdxInfo *blockIdxInfo) error
	getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
	getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
	getTxLoc(txID string) (*fileLocPointer, error)
44
	getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
manish's avatar
manish committed
45
46
47
48
49
50
}

type blockIdxInfo struct {
	blockNum  uint64
	blockHash []byte
	flp       *fileLocPointer
51
	txOffsets []*txindexInfo
manish's avatar
manish committed
52
53
}

manish's avatar
manish committed
54
type blockIndex struct {
manish's avatar
manish committed
55
	indexItemsMap map[blkstorage.IndexableAttr]bool
manish's avatar
manish committed
56
	db            *leveldbhelper.DBHandle
manish's avatar
manish committed
57
58
}

manish's avatar
manish committed
59
func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
manish's avatar
manish committed
60
61
62
63
64
65
	indexItems := indexConfig.AttrsToIndex
	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
	for _, indexItem := range indexItems {
		indexItemsMap[indexItem] = true
	}
manish's avatar
manish committed
66
	return &blockIndex{indexItemsMap, db}
manish's avatar
manish committed
67
68
69
70
71
}

func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
	var blockNumBytes []byte
	var err error
manish's avatar
manish committed
72
	if blockNumBytes, err = index.db.Get(indexCheckpointKey); err != nil {
manish's avatar
manish committed
73
74
75
		return 0, nil
	}
	return decodeBlockNum(blockNumBytes), nil
manish's avatar
manish committed
76
77
}

manish's avatar
manish committed
78
func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
79
	// do not index anything
manish's avatar
manish committed
80
81
82
83
	if len(index.indexItemsMap) == 0 {
		logger.Debug("Not indexing block... as nothing to index")
		return nil
	}
manish's avatar
manish committed
84
85
86
	logger.Debugf("Indexing block [%s]", blockIdxInfo)
	flp := blockIdxInfo.flp
	txOffsets := blockIdxInfo.txOffsets
manish's avatar
manish committed
87
	batch := leveldbhelper.NewUpdateBatch()
manish's avatar
manish committed
88
89
90
91
	flpBytes, err := flp.marshal()
	if err != nil {
		return err
	}
manish's avatar
manish committed
92

93
	//Index1
manish's avatar
manish committed
94
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
manish's avatar
manish committed
95
		batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
manish's avatar
manish committed
96
97
	}

98
	//Index2
manish's avatar
manish committed
99
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
manish's avatar
manish committed
100
		batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
manish's avatar
manish committed
101
102
	}

103
	//Index3 Used to find a transaction by it's transaction id
manish's avatar
manish committed
104
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
105
106
107
		for _, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
manish's avatar
manish committed
108
109
110
111
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
112
			batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
manish's avatar
manish committed
113
114
		}
	}
115
116
117
118
119
120
121
122
123
124
125
126
127
128

	//Index4 - Store BlockNumTranNum will be used to query history data
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
		for txIterator, txoffset := range txOffsets {
			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
			logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
			txFlpBytes, marshalErr := txFlp.marshal()
			if marshalErr != nil {
				return marshalErr
			}
			batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
		}
	}

manish's avatar
manish committed
129
130
	batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
	if err := index.db.WriteBatch(batch, false); err != nil {
manish's avatar
manish committed
131
132
133
134
135
136
		return err
	}
	return nil
}

func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
manish's avatar
manish committed
137
138
139
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
140
	b, err := index.db.Get(constructBlockHashKey(blockHash))
manish's avatar
manish committed
141
142
143
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
144
	if b == nil {
manish's avatar
manish committed
145
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
146
	}
manish's avatar
manish committed
147
148
149
150
151
152
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error) {
manish's avatar
manish committed
153
154
155
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
156
	b, err := index.db.Get(constructBlockNumKey(blockNum))
manish's avatar
manish committed
157
158
159
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
160
	if b == nil {
manish's avatar
manish committed
161
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
162
	}
manish's avatar
manish committed
163
164
165
166
167
168
	blkLoc := &fileLocPointer{}
	blkLoc.unmarshal(b)
	return blkLoc, nil
}

func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
manish's avatar
manish committed
169
170
171
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
manish's avatar
manish committed
172
	b, err := index.db.Get(constructTxIDKey(txID))
manish's avatar
manish committed
173
174
175
	if err != nil {
		return nil, err
	}
manish's avatar
manish committed
176
	if b == nil {
manish's avatar
manish committed
177
		return nil, blkstorage.ErrNotFoundInIndex
manish's avatar
manish committed
178
	}
manish's avatar
manish committed
179
180
181
182
183
	txFLP := &fileLocPointer{}
	txFLP.unmarshal(b)
	return txFLP, nil
}

184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// getTXLocForBlockNumTranNum looks up the location stored for the
// transaction at position tranNum of block blockNum. indexBlock writes
// these entries with transaction numbers starting at 1.
//
// It returns blkstorage.ErrAttrNotIndexed when the blockNum/tranNum
// attribute is not being indexed, blkstorage.ErrNotFoundInIndex when the
// database holds no value for the key, and otherwise any database or
// decoding error. A stored value that cannot be decoded is reported as an
// error rather than returned as a partially filled pointer.
func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
		return nil, blkstorage.ErrAttrNotIndexed
	}
	b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
	if err != nil {
		return nil, err
	}
	if b == nil {
		return nil, blkstorage.ErrNotFoundInIndex
	}
	txFLP := &fileLocPointer{}
	if err = txFLP.unmarshal(b); err != nil {
		return nil, err
	}
	return txFLP, nil
}

manish's avatar
manish committed
200
func constructBlockNumKey(blockNum uint64) []byte {
manish's avatar
manish committed
201
	blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
manish's avatar
manish committed
202
	return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
manish's avatar
manish committed
203
204
}

manish's avatar
manish committed
205
206
func constructBlockHashKey(blockHash []byte) []byte {
	return append([]byte{blockHashIdxKeyPrefix}, blockHash...)
manish's avatar
manish committed
207
208
}

manish's avatar
manish committed
209
210
func constructTxIDKey(txID string) []byte {
	return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
manish's avatar
manish committed
211
212
}

213
214
215
216
217
218
219
// constructBlockNumTranNumKey returns the blockNum/tranNum index key:
// blockNumTranNumIdxKeyPrefix, then the encoded block number, then the
// encoded transaction number (both via util.EncodeOrderPreservingVarUint64).
func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
	encodedBlk := util.EncodeOrderPreservingVarUint64(blockNum)
	encodedTx := util.EncodeOrderPreservingVarUint64(txNum)

	// Assemble prefix + block part + transaction part in one buffer.
	key := make([]byte, 0, 1+len(encodedBlk)+len(encodedTx))
	key = append(key, blockNumTranNumIdxKeyPrefix)
	key = append(key, encodedBlk...)
	return append(key, encodedTx...)
}

manish's avatar
manish committed
220
221
222
223
224
225
226
227
228
// encodeBlockNum serialises blockNum with proto.EncodeVarint; this is the
// form in which the checkpoint value is stored.
func encodeBlockNum(blockNum uint64) []byte {
	encoded := proto.EncodeVarint(blockNum)
	return encoded
}

// decodeBlockNum is the inverse of encodeBlockNum: it reads one varint from
// the front of blockNumBytes using proto.DecodeVarint.
func decodeBlockNum(blockNumBytes []byte) uint64 {
	// Only the decoded value is kept; the second result of DecodeVarint is
	// discarded, so undecodable input is not reported to the caller.
	decoded, _ := proto.DecodeVarint(blockNumBytes)
	return decoded
}

manish's avatar
manish committed
229
230
231
232
233
234
235
236
237
238
// locPointer describes a byte range as a starting offset plus a length.
type locPointer struct {
	offset      int
	bytesLength int
}

// String renders the range as "offset=<n>, bytesLength=<n>" for log output.
func (lp *locPointer) String() string {
	const layout = "offset=%d, bytesLength=%d"
	start, size := lp.offset, lp.bytesLength
	return fmt.Sprintf(layout, start, size)
}

manish's avatar
manish committed
239
// fileLocPointer
manish's avatar
manish committed
240
241
242
243
244
type fileLocPointer struct {
	fileSuffixNum int
	locPointer
}

245
func newFileLocationPointer(fileSuffixNum int, beginningOffset int, relativeLP *locPointer) *fileLocPointer {
manish's avatar
manish committed
246
	flp := &fileLocPointer{fileSuffixNum: fileSuffixNum}
247
	flp.offset = beginningOffset + relativeLP.offset
manish's avatar
manish committed
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
	flp.bytesLength = relativeLP.bytesLength
	return flp
}

// marshal serialises the pointer as three consecutive varints, in this
// order: fileSuffixNum, offset, bytesLength. Each field is converted to
// uint64 before encoding. The first encoding error aborts and is returned.
func (flp *fileLocPointer) marshal() ([]byte, error) {
	out := proto.NewBuffer([]byte{})
	// The field order here defines the stored format; unmarshal reads the
	// values back in the same order.
	for _, field := range []int{flp.fileSuffixNum, flp.offset, flp.bytesLength} {
		if encodeErr := out.EncodeVarint(uint64(field)); encodeErr != nil {
			return nil, encodeErr
		}
	}
	return out.Bytes(), nil
}

// unmarshal fills the pointer from b, reading three consecutive varints in
// the order written by marshal: fileSuffixNum, offset, bytesLength. It
// stops at the first decoding error and returns it; fields decoded before
// the failure keep their new values.
func (flp *fileLocPointer) unmarshal(b []byte) error {
	in := proto.NewBuffer(b)
	targets := []*int{&flp.fileSuffixNum, &flp.offset, &flp.bytesLength}
	for _, target := range targets {
		value, decodeErr := in.DecodeVarint()
		if decodeErr != nil {
			return decodeErr
		}
		*target = int(value)
	}
	return nil
}

// String renders the pointer as "fileSuffixNum=<n>, " followed by the
// embedded locPointer's own String output.
func (flp *fileLocPointer) String() string {
	rangeDesc := flp.locPointer.String()
	return fmt.Sprintf("fileSuffixNum=%d, %s", flp.fileSuffixNum, rangeDesc)
}
manish's avatar
manish committed
293
294
295
296

// String renders the block number and the block hash (in Go-syntax form)
// for log output; the location and transaction offsets are not included.
func (info *blockIdxInfo) String() string {
	num, hash := info.blockNum, info.blockHash
	return fmt.Sprintf("blockNum=%d, blockHash=%#v", num, hash)
}