stateleveldb.go 6.51 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package stateleveldb

import (
	"bytes"
21
	"errors"
22
	"fmt"
23

24
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
25
26
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
manish's avatar
manish committed
27
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
28
29
30
31
	logging "github.com/op/go-logging"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

32
var logger = logging.MustGetLogger("stateleveldb")
33
34
35
36
37
38
39

var compositeKeySep = []byte{0x00}
var lastKeyIndicator = byte(0x01)
var savePointKey = []byte{0x00}

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
manish's avatar
manish committed
40
	dbProvider *leveldbhelper.Provider
41
42
43
}

// NewVersionedDBProvider instantiates VersionedDBProvider
44
func NewVersionedDBProvider() *VersionedDBProvider {
manish's avatar
manish committed
45
	dbPath := ledgerconfig.GetStateLevelDBPath()
46
	logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath)
manish's avatar
manish committed
47
48
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
	return &VersionedDBProvider{dbProvider}
49
50
51
}

// GetDBHandle gets the handle to a named database
52
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
manish's avatar
manish committed
53
	return newVersionedDB(provider.dbProvider.GetDBHandle(dbName), dbName), nil
54
55
56
57
}

// Close closes the underlying db
func (provider *VersionedDBProvider) Close() {
manish's avatar
manish committed
58
	provider.dbProvider.Close()
59
60
61
}

// VersionedDB implements VersionedDB interface
manish's avatar
manish committed
62
63
type versionedDB struct {
	db     *leveldbhelper.DBHandle
64
	dbName string
65
66
67
}

// newVersionedDB constructs an instance of VersionedDB
manish's avatar
manish committed
68
69
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
	return &versionedDB{db, dbName}
70
71
72
}

// Open implements method in VersionedDB interface
manish's avatar
manish committed
73
func (vdb *versionedDB) Open() error {
74
	// do nothing because shared db is used
75
76
77
78
	return nil
}

// Close implements method in VersionedDB interface
manish's avatar
manish committed
79
func (vdb *versionedDB) Close() {
80
	// do nothing because shared db is used
81
82
83
}

// GetState implements method in VersionedDB interface
manish's avatar
manish committed
84
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
85
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
manish's avatar
manish committed
86
	compositeKey := constructCompositeKey(namespace, key)
87
88
89
90
91
92
93
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	}
	if dbVal == nil {
		return nil, nil
	}
94
	val, ver := statedb.DecodeValue(dbVal)
95
96
97
98
	return &statedb.VersionedValue{Value: val, Version: ver}, nil
}

// GetStateMultipleKeys implements method in VersionedDB interface
manish's avatar
manish committed
99
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
100
101
102
103
104
105
106
107
108
109
110
111
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		}
		vals[i] = val
	}
	return vals, nil
}

// GetStateRangeScanIterator implements method in VersionedDB interface
112
113
// startKey is inclusive
// endKey is exclusive
manish's avatar
manish committed
114
115
116
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
117
118
119
120
121
122
123
124
	if endKey == "" {
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	}
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil
}

// ExecuteQuery implements method in VersionedDB interface
manish's avatar
manish committed
125
func (vdb *versionedDB) ExecuteQuery(query string) (statedb.ResultsIterator, error) {
126
	return nil, errors.New("ExecuteQuery not supported for leveldb")
127
128
129
}

// ApplyUpdates implements method in VersionedDB interface
manish's avatar
manish committed
130
131
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
	namespaces := batch.GetUpdatedNamespaces()
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
			// trace the first 200 characters of versioned value only, in case it is huge
			if logger.IsEnabledFor(logging.DEBUG) {
				versionedValueDump := fmt.Sprintf("%#v", vv)
				if len(versionedValueDump) > 200 {
					versionedValueDump = versionedValueDump[0:200] + "..."
				}
				logger.Debugf("Applying key=%#v, versionedValue=%s", compositeKey, versionedValueDump)
			}
			if vv.Value == nil {
				dbBatch.Delete(compositeKey)
			} else {
148
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
149
			}
150
151
		}
	}
manish's avatar
manish committed
152
153
	dbBatch.Put(savePointKey, height.ToBytes())
	if err := vdb.db.WriteBatch(dbBatch, false); err != nil {
154
155
156
157
158
159
		return err
	}
	return nil
}

// GetLatestSavePoint implements method in VersionedDB interface
manish's avatar
manish committed
160
161
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	versionBytes, err := vdb.db.Get(savePointKey)
162
163
164
	if err != nil {
		return nil, err
	}
165
166
167
	if versionBytes == nil {
		return nil, nil
	}
168
169
170
171
	version, _ := version.NewHeightFromBytes(versionBytes)
	return version, nil
}

manish's avatar
manish committed
172
173
func constructCompositeKey(ns string, key string) []byte {
	return append(append([]byte(ns), compositeKeySep...), []byte(key)...)
174
175
}

manish's avatar
manish committed
176
177
178
func splitCompositeKey(compositeKey []byte) (string, string) {
	split := bytes.SplitN(compositeKey, compositeKeySep, 2)
	return string(split[0]), string(split[1])
179
180
181
182
183
184
185
186
187
188
189
}

type kvScanner struct {
	namespace string
	dbItr     iterator.Iterator
}

func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner {
	return &kvScanner{namespace, dbItr}
}

190
func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
191
192
193
	if !scanner.dbItr.Next() {
		return nil, nil
	}
194
195
196
197
198
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	dbValCopy := make([]byte, len(dbVal))
	copy(dbValCopy, dbVal)
	_, key := splitCompositeKey(dbKey)
199
	value, version := statedb.DecodeValue(dbValCopy)
200
201
202
203
204
205
206
207
	return &statedb.VersionedKV{
		CompositeKey:   statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
		VersionedValue: statedb.VersionedValue{Value: value, Version: version}}, nil
}

func (scanner *kvScanner) Close() {
	scanner.dbItr.Release()
}