stateleveldb.go 6.77 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stateleveldb

import (
	"bytes"
21
	"fmt"
22
23
24

	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
manish's avatar
manish committed
25
26
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
	"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
27
28
29
30
	logging "github.com/op/go-logging"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

31
// logger is the package-level logger, registered under the module name "stateleveldb".
var logger = logging.MustGetLogger("stateleveldb")
32
33
34
35
36
37
38

var (
	// compositeKeySep is placed between the namespace and the key when a
	// composite db key is built (see constructCompositeKey).
	compositeKeySep = []byte{0x00}

	// lastKeyIndicator replaces the trailing separator of the composite end key
	// when a range scan is requested with an empty end key.
	lastKeyIndicator = byte(0x01)

	// savePointKey is the db key under which ApplyUpdates stores the height
	// bytes and from which GetLatestSavePoint reads them back.
	savePointKey = []byte{0x00}
)

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
manish's avatar
manish committed
39
	dbProvider *leveldbhelper.Provider
40
41
42
}

// NewVersionedDBProvider instantiates VersionedDBProvider
43
func NewVersionedDBProvider() *VersionedDBProvider {
manish's avatar
manish committed
44
	dbPath := ledgerconfig.GetStateLevelDBPath()
45
	logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath)
manish's avatar
manish committed
46
47
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
	return &VersionedDBProvider{dbProvider}
48
49
50
}

// GetDBHandle gets the handle to a named database
51
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
manish's avatar
manish committed
52
	return newVersionedDB(provider.dbProvider.GetDBHandle(dbName), dbName), nil
53
54
55
56
}

// Close closes the underlying db
func (provider *VersionedDBProvider) Close() {
manish's avatar
manish committed
57
	provider.dbProvider.Close()
58
59
60
}

// VersionedDB implements VersionedDB interface
manish's avatar
manish committed
61
62
type versionedDB struct {
	db     *leveldbhelper.DBHandle
63
	dbName string
64
65
66
}

// newVersionedDB constructs an instance of VersionedDB
manish's avatar
manish committed
67
68
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
	return &versionedDB{db, dbName}
69
70
71
}

// Open implements method in VersionedDB interface
manish's avatar
manish committed
72
func (vdb *versionedDB) Open() error {
73
	// do nothing because shared db is used
74
75
76
77
	return nil
}

// Close implements method in VersionedDB interface
manish's avatar
manish committed
78
func (vdb *versionedDB) Close() {
79
	// do nothing because shared db is used
80
81
82
}

// GetState implements method in VersionedDB interface
manish's avatar
manish committed
83
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
84
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
manish's avatar
manish committed
85
	compositeKey := constructCompositeKey(namespace, key)
86
87
88
89
90
91
92
93
94
95
96
97
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	}
	if dbVal == nil {
		return nil, nil
	}
	val, ver := decodeValue(dbVal)
	return &statedb.VersionedValue{Value: val, Version: ver}, nil
}

// GetStateMultipleKeys implements method in VersionedDB interface
manish's avatar
manish committed
98
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
99
100
101
102
103
104
105
106
107
108
109
110
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		}
		vals[i] = val
	}
	return vals, nil
}

// GetStateRangeScanIterator implements method in VersionedDB interface
111
112
// startKey is inclusive
// endKey is exclusive
manish's avatar
manish committed
113
114
115
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
116
117
118
119
120
121
122
123
	if endKey == "" {
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	}
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil
}

// ExecuteQuery implements method in VersionedDB interface
manish's avatar
manish committed
124
func (vdb *versionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, error) {
125
126
127
128
	panic("Method not supported for leveldb")
}

// ApplyUpdates implements method in VersionedDB interface
manish's avatar
manish committed
129
130
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
	namespaces := batch.GetUpdatedNamespaces()
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
			// trace the first 200 characters of versioned value only, in case it is huge
			if logger.IsEnabledFor(logging.DEBUG) {
				versionedValueDump := fmt.Sprintf("%#v", vv)
				if len(versionedValueDump) > 200 {
					versionedValueDump = versionedValueDump[0:200] + "..."
				}
				logger.Debugf("Applying key=%#v, versionedValue=%s", compositeKey, versionedValueDump)
			}
			if vv.Value == nil {
				dbBatch.Delete(compositeKey)
			} else {
				dbBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version))
148
			}
149
150
		}
	}
manish's avatar
manish committed
151
152
	dbBatch.Put(savePointKey, height.ToBytes())
	if err := vdb.db.WriteBatch(dbBatch, false); err != nil {
153
154
155
156
157
158
		return err
	}
	return nil
}

// GetLatestSavePoint implements method in VersionedDB interface
manish's avatar
manish committed
159
160
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	versionBytes, err := vdb.db.Get(savePointKey)
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
	if err != nil {
		return nil, err
	}
	version, _ := version.NewHeightFromBytes(versionBytes)
	return version, nil
}

// encodeValue serializes a value for storage: the bytes of version come
// first, followed by the raw value bytes. A nil or empty value contributes
// nothing after the version bytes.
func encodeValue(value []byte, version *version.Height) []byte {
	return append(version.ToBytes(), value...)
}

// decodeValue is the inverse of encodeValue: it decodes the leading height
// from encodedValue and returns the remaining bytes (from the offset reported
// by NewHeightFromBytes onward) as the value, together with that height.
func decodeValue(encodedValue []byte) ([]byte, *version.Height) {
	height, offset := version.NewHeightFromBytes(encodedValue)
	return encodedValue[offset:], height
}

manish's avatar
manish committed
182
183
// constructCompositeKey builds the db key for (ns, key) as
// ns + compositeKeySep + key.
// The result is always a freshly allocated slice, so callers may modify it
// in place. The buffer is sized up front so that the key is assembled with a
// single allocation instead of growing through successive appends.
func constructCompositeKey(ns string, key string) []byte {
	compositeKey := make([]byte, 0, len(ns)+len(compositeKeySep)+len(key))
	compositeKey = append(compositeKey, ns...)
	compositeKey = append(compositeKey, compositeKeySep...)
	return append(compositeKey, key...)
}

manish's avatar
manish committed
186
187
188
// splitCompositeKey splits a composite db key at the first occurrence of
// compositeKeySep and returns (namespace, key). Any further separator bytes
// stay part of the key.
// If compositeKey contains no separator at all, the whole input is returned
// as the namespace with an empty key instead of panicking on a missing
// second element.
func splitCompositeKey(compositeKey []byte) (string, string) {
	sepIdx := bytes.Index(compositeKey, compositeKeySep)
	if sepIdx < 0 {
		return string(compositeKey), ""
	}
	return string(compositeKey[:sepIdx]), string(compositeKey[sepIdx+len(compositeKeySep):])
}

// kvScanner walks a leveldb iterator and yields its entries as query results
// for a single namespace.
type kvScanner struct {
	// namespace is reported on every result produced by Next.
	namespace string
	// dbItr is the underlying leveldb iterator; it is released by Close.
	dbItr     iterator.Iterator
}

// newKVScanner wraps dbItr in a kvScanner that labels every result with
// namespace.
func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner {
	return &kvScanner{
		namespace: namespace,
		dbItr:     dbItr,
	}
}

200
// Next advances the underlying iterator and returns the entry it lands on as
// a *statedb.VersionedKV.
// It returns (nil, nil) once the iterator is exhausted. If the iterator
// stopped because of a failure, the error it accumulated is returned instead,
// so an iteration error is not mistaken for a clean end of results.
func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
	if !scanner.dbItr.Next() {
		// A false Next does not by itself say why iteration stopped; ask the
		// iterator whether it recorded an error.
		if err := scanner.dbItr.Error(); err != nil {
			return nil, err
		}
		return nil, nil
	}
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	// Copy the value so the returned result does not alias the iterator's
	// buffer. The key needs no copy: splitCompositeKey converts it to strings.
	dbValCopy := make([]byte, len(dbVal))
	copy(dbValCopy, dbVal)
	_, key := splitCompositeKey(dbKey)
	value, height := decodeValue(dbValCopy)
	return &statedb.VersionedKV{
		CompositeKey:   statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
		VersionedValue: statedb.VersionedValue{Value: value, Version: height}}, nil
}

// Close releases the underlying leveldb iterator.
func (scanner *kvScanner) Close() {
	scanner.dbItr.Release()
}