stateleveldb.go 6.47 KB
Newer Older
1
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package stateleveldb

import (
	"bytes"
10
	"errors"
11

12
	"github.com/hyperledger/fabric/common/flogging"
13
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
14
15
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
manish's avatar
manish committed
16
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
17
18
19
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

20
// logger is the package-level logger, obtained from flogging under the
// name "stateleveldb".
var logger = flogging.MustGetLogger("stateleveldb")
21
22
23
24
25
26
27

var (
	// compositeKeySep is placed between the namespace and the key when a
	// composite key is built, and is the delimiter used to split it again.
	compositeKeySep = []byte{0x00}

	// lastKeyIndicator replaces the trailing separator byte of a composite
	// end key when a range scan is requested with an empty end key.
	lastKeyIndicator byte = 0x01

	// savePointKey is the key under which the height passed to the latest
	// ApplyUpdates call is stored.
	savePointKey = []byte{0x00}
)

// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
manish's avatar
manish committed
28
	dbProvider *leveldbhelper.Provider
29
30
31
}

// NewVersionedDBProvider instantiates VersionedDBProvider
32
func NewVersionedDBProvider() *VersionedDBProvider {
manish's avatar
manish committed
33
	dbPath := ledgerconfig.GetStateLevelDBPath()
34
	logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath)
manish's avatar
manish committed
35
36
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
	return &VersionedDBProvider{dbProvider}
37
38
39
}

// GetDBHandle gets the handle to a named database
40
func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.VersionedDB, error) {
manish's avatar
manish committed
41
	return newVersionedDB(provider.dbProvider.GetDBHandle(dbName), dbName), nil
42
43
44
45
}

// Close closes the underlying db
func (provider *VersionedDBProvider) Close() {
manish's avatar
manish committed
46
	provider.dbProvider.Close()
47
48
49
}

// VersionedDB implements VersionedDB interface
manish's avatar
manish committed
50
51
type versionedDB struct {
	db     *leveldbhelper.DBHandle
52
	dbName string
53
54
55
}

// newVersionedDB constructs an instance of VersionedDB
manish's avatar
manish committed
56
57
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
	return &versionedDB{db, dbName}
58
59
60
}

// Open implements method in VersionedDB interface
manish's avatar
manish committed
61
func (vdb *versionedDB) Open() error {
62
	// do nothing because shared db is used
63
64
65
66
	return nil
}

// Close implements method in VersionedDB interface
manish's avatar
manish committed
67
func (vdb *versionedDB) Close() {
68
	// do nothing because shared db is used
69
70
}

71
72
73
74
75
// ValidateKey implements method in VersionedDB interface.
// This implementation performs no checks: it returns nil for every key.
func (vdb *versionedDB) ValidateKey(key string) error {
	return nil
}

76
77
78
79
80
// BytesKeySuppoted implements method in VersionedDB interface.
// It always returns true.
// NOTE(review): the spelling "Suppoted" presumably mirrors the method name
// declared by the interface; confirm before renaming.
func (vdb *versionedDB) BytesKeySuppoted() bool {
	return true
}

81
// GetState implements method in VersionedDB interface
manish's avatar
manish committed
82
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
83
	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
manish's avatar
manish committed
84
	compositeKey := constructCompositeKey(namespace, key)
85
86
87
88
89
90
91
	dbVal, err := vdb.db.Get(compositeKey)
	if err != nil {
		return nil, err
	}
	if dbVal == nil {
		return nil, nil
	}
92
	val, ver := statedb.DecodeValue(dbVal)
93
94
95
	return &statedb.VersionedValue{Value: val, Version: ver}, nil
}

96
97
98
99
100
101
102
103
104
105
106
107
// GetVersion implements method in VersionedDB interface.
// It delegates to GetState and returns only the version of the stored entry.
// The result is (nil, nil) when the key is absent and (nil, err) when the
// lookup fails.
func (vdb *versionedDB) GetVersion(namespace string, key string) (*version.Height, error) {
	vv, err := vdb.GetState(namespace, key)
	// A failed lookup and a missing key both yield a nil height; err is nil
	// in the missing-key case.
	if err != nil || vv == nil {
		return nil, err
	}
	return vv.Version, nil
}

108
// GetStateMultipleKeys implements method in VersionedDB interface
manish's avatar
manish committed
109
func (vdb *versionedDB) GetStateMultipleKeys(namespace string, keys []string) ([]*statedb.VersionedValue, error) {
110
111
112
113
114
115
116
117
118
119
120
121
	vals := make([]*statedb.VersionedValue, len(keys))
	for i, key := range keys {
		val, err := vdb.GetState(namespace, key)
		if err != nil {
			return nil, err
		}
		vals[i] = val
	}
	return vals, nil
}

// GetStateRangeScanIterator implements method in VersionedDB interface
122
123
// startKey is inclusive
// endKey is exclusive
manish's avatar
manish committed
124
125
126
func (vdb *versionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
	compositeStartKey := constructCompositeKey(namespace, startKey)
	compositeEndKey := constructCompositeKey(namespace, endKey)
127
128
129
130
131
132
133
134
	if endKey == "" {
		compositeEndKey[len(compositeEndKey)-1] = lastKeyIndicator
	}
	dbItr := vdb.db.GetIterator(compositeStartKey, compositeEndKey)
	return newKVScanner(namespace, dbItr), nil
}

// ExecuteQuery implements method in VersionedDB interface
135
func (vdb *versionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
136
	return nil, errors.New("ExecuteQuery not supported for leveldb")
137
138
139
}

// ApplyUpdates implements method in VersionedDB interface
manish's avatar
manish committed
140
141
func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	dbBatch := leveldbhelper.NewUpdateBatch()
142
143
144
145
146
	namespaces := batch.GetUpdatedNamespaces()
	for _, ns := range namespaces {
		updates := batch.GetUpdates(ns)
		for k, vv := range updates {
			compositeKey := constructCompositeKey(ns, k)
147
			logger.Debugf("Channel [%s]: Applying key(string)=[%s] key(bytes)=[%#v]", vdb.dbName, string(compositeKey), compositeKey)
148

149
150
151
			if vv.Value == nil {
				dbBatch.Delete(compositeKey)
			} else {
152
				dbBatch.Put(compositeKey, statedb.EncodeValue(vv.Value, vv.Version))
153
			}
154
155
		}
	}
manish's avatar
manish committed
156
	dbBatch.Put(savePointKey, height.ToBytes())
157
158
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
159
160
161
162
163
164
		return err
	}
	return nil
}

// GetLatestSavePoint implements method in VersionedDB interface
manish's avatar
manish committed
165
166
func (vdb *versionedDB) GetLatestSavePoint() (*version.Height, error) {
	versionBytes, err := vdb.db.Get(savePointKey)
167
168
169
	if err != nil {
		return nil, err
	}
170
171
172
	if versionBytes == nil {
		return nil, nil
	}
173
174
175
176
	version, _ := version.NewHeightFromBytes(versionBytes)
	return version, nil
}

manish's avatar
manish committed
177
178
// constructCompositeKey returns ns, compositeKeySep and key concatenated into
// a newly allocated byte slice. The caller owns the result and may modify it.
func constructCompositeKey(ns string, key string) []byte {
	// Size the buffer once so the appends below never reallocate.
	compositeKey := make([]byte, 0, len(ns)+len(compositeKeySep)+len(key))
	compositeKey = append(compositeKey, ns...)
	compositeKey = append(compositeKey, compositeKeySep...)
	return append(compositeKey, key...)
}

manish's avatar
manish committed
181
182
183
// splitCompositeKey splits a composite key at the first occurrence of
// compositeKeySep and returns the part before it and the part after it as
// strings. Later occurrences of the separator stay in the second part.
// compositeKey must contain the separator; otherwise indexing the second
// part panics.
func splitCompositeKey(compositeKey []byte) (string, string) {
	parts := bytes.SplitN(compositeKey, compositeKeySep, 2)
	return string(parts[0]), string(parts[1])
}

// kvScanner wraps a leveldb iterator and is the value returned by
// GetStateRangeScanIterator: Next yields one key/value per call and Close
// releases the iterator.
type kvScanner struct {
	namespace string            // namespace reported in every result's CompositeKey
	dbItr     iterator.Iterator // iterator that Next advances and Close releases
}

// newKVScanner builds a kvScanner that reports results under namespace and
// reads them from dbItr.
func newKVScanner(namespace string, dbItr iterator.Iterator) *kvScanner {
	return &kvScanner{
		namespace: namespace,
		dbItr:     dbItr,
	}
}

195
// Next advances the underlying iterator and returns the entry it now points
// at as a *statedb.VersionedKV. When the iterator cannot advance, the result
// is nil together with the iterator's Error(): (nil, nil) means the range is
// exhausted, (nil, err) means iteration stopped because of err.
func (scanner *kvScanner) Next() (statedb.QueryResult, error) {
	if !scanner.dbItr.Next() {
		// Next reports false both at the end of the range and on failure;
		// surface the failure instead of treating it as a clean end.
		return nil, scanner.dbItr.Error()
	}
	dbKey := scanner.dbItr.Key()
	dbVal := scanner.dbItr.Value()
	// Decode from a private copy of the value so the result does not alias
	// the slice handed out by the iterator.
	dbValCopy := make([]byte, len(dbVal))
	copy(dbValCopy, dbVal)
	// The namespace half of the stored key is dropped; results carry
	// scanner.namespace instead.
	_, key := splitCompositeKey(dbKey)
	value, ver := statedb.DecodeValue(dbValCopy)
	return &statedb.VersionedKV{
		CompositeKey:   statedb.CompositeKey{Namespace: scanner.namespace, Key: key},
		VersionedValue: statedb.VersionedValue{Value: value, Version: ver}}, nil
}

// Close releases the underlying leveldb iterator.
func (scanner *kvScanner) Close() {
	scanner.dbItr.Release()
}