/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stateleveldb

import (
	"bytes"
	"fmt"

	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
	"github.com/hyperledger/fabric/core/ledger/util/leveldbhelper"
	logging "github.com/op/go-logging"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

31
var logger = logging.MustGetLogger("stateleveldb")
32
33
34
35
36
37
38

var (
	// compositeKeySep is placed between the namespace and the key when a
	// composite leveldb key is built (see constructCompositeKey).
	compositeKeySep = []byte{0x00}

	// lastKeyIndicator overwrites the trailing separator of the composite end
	// key when a range scan is requested with an empty endKey.
	lastKeyIndicator = byte(0x01)

	// savePointKey is the leveldb key under which ApplyUpdates records the
	// height of the latest committed batch.
	// NOTE(review): this is byte-identical to the composite key of an empty
	// namespace plus an empty key - presumably such a pair never occurs; confirm.
	savePointKey = []byte{0x00}
)

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
manish's avatar
manish committed
39
	dbProvider *leveldbhelper.Provider
40
41
42
}

// NewVersionedDBProvider instantiates VersionedDBProvider
43
func NewVersionedDBProvider() *VersionedDBProvider {
manish's avatar
manish committed
44
	dbPath := ledgerconfig.GetStateLevelDBPath()
45
	logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath)
manish's avatar
manish committed
46
47
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
	return &VersionedDBProvider{dbProvider}
48
49
50
}

// GetDBHandle gets the handle to a named database
51
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
manish's avatar
manish committed
52
	return newVersionedDB(provider.dbProvider.GetDBHandle(dbName), dbName), nil
53
54
55
56
}

// Close closes the underlying db
func (provider *VersionedDBProvider) Close() {
manish's avatar
manish committed
57
	provider.dbProvider.Close()
58
59
60
}

// VersionedDB implements VersionedDB interface
manish's avatar
manish committed
61
62
type versionedDB struct {
	db     *leveldbhelper.DBHandle
63
	dbName string
64
65
66
}

// newVersionedDB constructs an instance of VersionedDB
manish's avatar
manish committed
67
68
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
	return &versionedDB{db, dbName}
69
70
71
}

// Open implements method in VersionedDB interface
manish's avatar
manish committed
72
func (vdb *versionedDB) Open() error {
73
	// do nothing because shared db is used
74
75
76
77
	return nil
}

// Close implements method in VersionedDB interface
manish's avatar
manish committed
78
func (vdb *versionedDB) Close() {
79
	// do nothing because shared db is used
80
81
82
}

// GetState implements method in VersionedDB interface
manish's avatar
manish committed
83
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
84
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
manish's avatar
manish committed
85
	compositeKey := constructCompositeKey(namespace, key)
86
87
88
89
90
91
92
93
94
95
96
97
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	}
	if dbVal == nil {
		return nil, nil
	}
	val, ver := decodeValue(dbVal)
	return &statedb.VersionedValue{Value: val, Version: ver}, nil
}

// GetStateMultipleKeys implements method in VersionedDB interface
manish's avatar
manish committed
98
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
99
100
101
102
103
104
105
106
107
108
109
110
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		}
		vals[i] = val
	}
	return vals, nil
}

// GetStateRangeScanIterator implements method in VersionedDB interface
111
112
// startKey is inclusive
// endKey is exclusive
manish's avatar
manish committed
113
114
115
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
116
117
118
119
120
121
122
123
	if endKey == "" {
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	}
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil
}

// ExecuteQuery implements method in VersionedDB interface
manish's avatar
manish committed
124
func (vdb *versionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, error) {
125
126
127
128
	panic("Method not supported for leveldb")
}

// ApplyUpdates implements method in VersionedDB interface
manish's avatar
manish committed
129
130
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
131
	for ck, vv := range batch.KVs {
manish's avatar
manish committed
132
		compositeKey := constructCompositeKey(ck.Namespace, ck.Key)
133
134
135
136
137
138
139
140
		// trace the first 200 characters of versioned value only, in case it is huge
		if logger.IsEnabledFor(logging.DEBUG) {
			versionedValueDump := fmt.Sprintf("%#v", vv)
			if len(versionedValueDump) > 200 {
				versionedValueDump = versionedValueDump[0:200] + "..."
			}
			logger.Debugf("Applying key=%#v, versionedValue=%s", ck, versionedValueDump)
		}
141
		if vv.Value == nil {
manish's avatar
manish committed
142
			dbBatch.Delete(compositeKey)
143
		} else {
manish's avatar
manish committed
144
			dbBatch.Put(compositeKey, encodeValue(vv.Value, vv.Version))
145
146
		}
	}
manish's avatar
manish committed
147
148
	dbBatch.Put(savePointKey, height.ToBytes())
	if err := vdb.db.WriteBatch(dbBatch, false); err != nil {
149
150
151
152
153
154
		return err
	}
	return nil
}

// GetLatestSavePoint implements method in VersionedDB interface
manish's avatar
manish committed
155
156
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	versionBytes, err := vdb.db.Get(savePointKey)
157
158
159
	if err != nil {
		return nil, err
	}
160
161
162
	if versionBytes == nil {
		return nil, nil
	}
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
	version, _ := version.NewHeightFromBytes(versionBytes)
	return version, nil
}

// encodeValue produces the stored form of a value: the bytes of version
// followed directly by the bytes of value.
func encodeValue(value []byte, version *version.Height) []byte {
	// appending a nil (or empty) value leaves the version bytes untouched
	return append(version.ToBytes(), value...)
}

// decodeValue is the inverse of encodeValue: it reads the height from the
// front of encodedValue and returns the remaining bytes as the value. The
// returned value is a sub-slice of encodedValue, not a copy.
func decodeValue(encodedValue []byte) ([]byte, *version.Height) {
	height, offset := version.NewHeightFromBytes(encodedValue)
	return encodedValue[offset:], height
}

manish's avatar
manish committed
181
182
func constructCompositeKey(ns string, key string) []byte {
	return append(append([]byte(ns), compositeKeySep...), []byte(key)...)
183
184
}

manish's avatar
manish committed
185
186
187
func splitCompositeKey(compositeKey []byte) (string, string) {
	split := bytes.SplitN(compositeKey, compositeKeySep, 2)
	return string(split[0]), string(split[1])
188
189
190
191
192
193
194
195
196
197
198
}

// kvScanner walks a leveldb iterator and turns each entry into a
// statedb.VersionedKV (see Next).
type kvScanner struct {
	// namespace is reported as the namespace of every result produced by Next.
	namespace string
	// dbItr is the underlying leveldb iterator; Close releases it.
	dbItr     iterator.Iterator
}

// newKVScanner wraps dbItr in a kvScanner that reports namespace for every
// result it yields.
func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner {
	scanner := &kvScanner{
		namespace: namespace,
		dbItr:     dbItr,
	}
	return scanner
}

199
func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
200
201
202
	if !scanner.dbItr.Next() {
		return nil, nil
	}
manish's avatar
manish committed
203
	_, key := splitCompositeKey(scanner.dbItr.Key())
204
205
206
207
208
209
210
211
212
	value, version := decodeValue(scanner.dbItr.Value())
	return &statedb.VersionedKV{
		CompositeKey:   statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
		VersionedValue: statedb.VersionedValue{Value: value, Version: version}}, nil
}

// Close releases the underlying leveldb iterator.
func (scanner *kvScanner) Close() {
	scanner.dbItr.Release()
}