Commit 945138e9 authored by manish's avatar manish
Browse files

Fix perf issue by key-endorsement to regular chaincode



Key-level endorsement needs to access the metadata for each
of the keys in the write-set. Even the chaincodes that do not
use this feature will end up paying this additional penality.
Especially, in an environment that uses  couchdb, the performance
regression will be visible for the existing chaincodes.

The major reason is that the metadata is accessed one at a time
during validation and commit path whereas, couchdb is better
used if the data can be loaded in bulk.

Ideally, the data required by key-level endorsement for the entire
block should be loaded in a single bulk load call. Which would
benefit even the chaincodes that leverage this feature. However,
this would need some refactoring that can be taken up in a future
release.

This CR introduces a fix which would save the regular chaincodes
(that do not use key-level endorsement feature) from paying
the avobe mentioned penality. In this fix, we track the chaincodes
that do not use metadata and for such chaincodes, the GetStateMetadata
call simply returns nil without going to the underlying db.

Even when we start using the bulk-load for key-level endorsement,
this fix will still be relevant in the sense that the bulkload call
will be avoided for regular chaincodes

FAB-11700 #done

Change-Id: Icfc654b7d27f727f1811a5a300400e92b6e8be9d
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 861254d8
......@@ -81,6 +81,9 @@ func (h *DBHandle) Delete(key []byte, sync bool) error {
// WriteBatch writes a batch in an atomic way
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
if len(batch.KVs) == 0 {
return nil
}
levelBatch := &leveldb.Batch{}
for k, v := range batch.KVs {
key := constructLevelKey(h.dbName, []byte(k))
......
......@@ -19,6 +19,8 @@ type Category int
const (
// PvtdataExpiry repersents the bookkeeping related to expiry of pvtdata because of BTL policy
PvtdataExpiry Category = iota
// MetadataPresenceIndicator maintains the bookkeeping about whether metadata is ever set for a namespace
MetadataPresenceIndicator
)
// Provider provides handle to different bookkeepers for the given ledger
......
......@@ -28,7 +28,7 @@ type TestEnv struct {
}
// NewTestEnv construct a TestEnv for testing
func NewTestEnv(t *testing.T) *TestEnv {
func NewTestEnv(t testing.TB) *TestEnv {
removePath(t)
provider := NewProvider()
return &TestEnv{t, provider}
......
......@@ -57,14 +57,14 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
// Initialize the ID store (inventory of chainIds/ledgerIds)
idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())
ledgerStoreProvider := ledgerstorage.NewProvider()
bookkeepingProvider := bookkeeping.NewProvider()
// Initialize the versioned database (state database)
vdbProvider, err := privacyenabledstate.NewCommonStorageDBProvider()
vdbProvider, err := privacyenabledstate.NewCommonStorageDBProvider(bookkeepingProvider)
if err != nil {
return nil, err
}
// Initialize the history database (index for history of values by key)
historydbProvider := historyleveldb.NewHistoryDBProvider()
bookkeepingProvider := bookkeeping.NewProvider()
logger.Info("ledger provider Initialized")
provider := &Provider{idStore, ledgerStoreProvider,
vdbProvider, historydbProvider, nil, nil, bookkeepingProvider, nil}
......
......@@ -15,13 +15,13 @@ import (
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)
......@@ -36,10 +36,11 @@ const (
// CommonStorageDBProvider implements interface DBProvider
type CommonStorageDBProvider struct {
statedb.VersionedDBProvider
bookkeepingProvider bookkeeping.Provider
}
// NewCommonStorageDBProvider constructs an instance of DBProvider
func NewCommonStorageDBProvider() (DBProvider, error) {
func NewCommonStorageDBProvider(bookkeeperProvider bookkeeping.Provider) (DBProvider, error) {
var vdbProvider statedb.VersionedDBProvider
var err error
if ledgerconfig.IsCouchDBEnabled() {
......@@ -49,7 +50,7 @@ func NewCommonStorageDBProvider() (DBProvider, error) {
} else {
vdbProvider = stateleveldb.NewVersionedDBProvider()
}
return &CommonStorageDBProvider{vdbProvider}, nil
return &CommonStorageDBProvider{vdbProvider, bookkeeperProvider}, nil
}
// GetDBHandle implements function from interface DBProvider
......@@ -58,7 +59,9 @@ func (p *CommonStorageDBProvider) GetDBHandle(id string) (DB, error) {
if err != nil {
return nil, err
}
return NewCommonStorageDB(vdb, id)
bookkeeper := p.bookkeepingProvider.GetDBHandle(id, bookkeeping.MetadataPresenceIndicator)
metadataHint := newMetadataHint(bookkeeper)
return NewCommonStorageDB(vdb, id, metadataHint)
}
// Close implements function from interface DBProvider
......@@ -70,12 +73,13 @@ func (p *CommonStorageDBProvider) Close() {
// both the public and private data
type CommonStorageDB struct {
statedb.VersionedDB
metadataHint *metadataHint
}
// NewCommonStorageDB wraps a VersionedDB instance. The public data is managed directly by the wrapped versionedDB.
// For managing the hashed data and private data, this implementation creates separate namespaces in the wrapped db
func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string) (DB, error) {
return &CommonStorageDB{VersionedDB: vdb}, nil
func NewCommonStorageDB(vdb statedb.VersionedDB, ledgerid string, metadataHint *metadataHint) (DB, error) {
return &CommonStorageDB{vdb, metadataHint}, nil
}
// IsBulkOptimizable implements corresponding function in interface DB
......@@ -197,9 +201,39 @@ func (s *CommonStorageDB) ApplyPrivacyAwareUpdates(updates *UpdateBatch, height
combinedUpdates := updates.PubUpdates
addPvtUpdates(combinedUpdates, updates.PvtUpdates)
addHashedUpdates(combinedUpdates, updates.HashUpdates, !s.BytesKeySuppoted())
s.metadataHint.setMetadataUsedFlag(updates)
return s.VersionedDB.ApplyUpdates(combinedUpdates.UpdateBatch, height)
}
// GetStateMetadata implements corresponding function in interface DB. This implementation provides
// an optimization such that it keeps track if a namespaces has never stored metadata for any of
// its items, the value 'nil' is returned without going to the db. This is intented to be invoked
// in the validation and commit path. This saves the chaincodes from paying unnecessary performance
// penality if they do not use features that leverage metadata (such as key-level endorsement),
func (s *CommonStorageDB) GetStateMetadata(namespace, key string) ([]byte, error) {
if !s.metadataHint.metadataEverUsedFor(namespace) {
return nil, nil
}
vv, err := s.GetState(namespace, key)
if err != nil || vv == nil {
return nil, err
}
return vv.Metadata, nil
}
// GetPrivateDataMetadataByHash implements corresponding function in interface DB. For additional details, see
// decription of the similar function 'GetStateMetadata'
func (s *CommonStorageDB) GetPrivateDataMetadataByHash(namespace, collection string, keyHash []byte) ([]byte, error) {
if !s.metadataHint.metadataEverUsedFor(namespace) {
return nil, nil
}
vv, err := s.GetValueHash(namespace, collection, keyHash)
if err != nil || vv == nil {
return nil, err
}
return vv.Metadata, nil
}
func (s *CommonStorageDB) getCollectionConfigMap(chaincodeDefinition *cceventmgmt.ChaincodeDefinition) (map[string]bool, error) {
var collectionConfigsBytes []byte
collectionConfigsMap := make(map[string]bool)
......@@ -254,17 +288,14 @@ func (s *CommonStorageDB) getCollectionConfigMap(chaincodeDefinition *cceventmgm
// is acceptable since peer can continue in the committing role without the indexes. However, executing chaincode queries
// may be affected, until a new chaincode with fixed indexes is installed and instantiated
func (s *CommonStorageDB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.ChaincodeDefinition, dbArtifactsTar []byte) error {
//Check to see if the interface for IndexCapable is implemented
indexCapable, ok := s.VersionedDB.(statedb.IndexCapable)
if !ok {
return nil
}
if chaincodeDefinition == nil {
return errors.New("chaincode definition not found while creating couchdb index")
}
dbArtifacts, err := ccprovider.ExtractFileEntries(dbArtifactsTar, indexCapable.GetDBType())
if err != nil {
logger.Errorf("Index creation: error extracting db artifacts from tar for chaincode [%s]: %s", chaincodeDefinition.Name, err)
......@@ -296,7 +327,6 @@ func (s *CommonStorageDB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt
if !ok {
logger.Errorf("Error processing index for chaincode [%s]: cannot create an index for an undefined collection=[%s]", chaincodeDefinition.Name, collectionName)
} else {
err := indexCapable.ProcessIndexesForChaincodeDeploy(derivePvtDataNs(chaincodeDefinition.Name, collectionName),
archiveDirectoryEntries)
if err != nil {
......
......@@ -35,6 +35,8 @@ type DB interface {
GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error)
GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error)
GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (statedb.ResultsIterator, error)
GetStateMetadata(namespace, key string) ([]byte, error)
GetPrivateDataMetadataByHash(namespace, collection string, keyHash []byte) ([]byte, error)
ExecuteQueryOnPrivateData(namespace, collection, query string) (statedb.ResultsIterator, error)
ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error
}
......
......@@ -542,11 +542,54 @@ func testHandleChainCodeDeploy(t *testing.T, env TestEnv) {
}
func TestMetadataRetrieval(t *testing.T) {
for _, env := range testEnvs {
t.Run(env.GetName(), func(t *testing.T) {
testMetadataRetrieval(t, env)
})
}
}
func testMetadataRetrieval(t *testing.T, env TestEnv) {
env.Init(t)
defer env.Cleanup()
db := env.GetDBHandle("test-ledger-id")
updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key1", []byte("value1"), []byte("metadata1"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns1", "key2", []byte("value2"), nil, version.NewHeight(1, 2))
updates.PubUpdates.PutValAndMetadata("ns2", "key3", []byte("value3"), nil, version.NewHeight(1, 3))
putPvtUpdatesWithMetadata(t, updates, "ns1", "coll1", "key1", []byte("pvt_value1"), []byte("metadata1"), version.NewHeight(1, 4))
putPvtUpdatesWithMetadata(t, updates, "ns1", "coll1", "key2", []byte("pvt_value2"), nil, version.NewHeight(1, 5))
putPvtUpdatesWithMetadata(t, updates, "ns2", "coll1", "key3", []byte("pvt_value3"), nil, version.NewHeight(1, 6))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(2, 6))
vm, _ := db.GetStateMetadata("ns1", "key1")
assert.Equal(t, vm, []byte("metadata1"))
vm, _ = db.GetStateMetadata("ns1", "key2")
assert.Nil(t, vm)
vm, _ = db.GetStateMetadata("ns2", "key3")
assert.Nil(t, vm)
vm, _ = db.GetPrivateDataMetadataByHash("ns1", "coll1", util.ComputeStringHash("key1"))
assert.Equal(t, vm, []byte("metadata1"))
vm, _ = db.GetPrivateDataMetadataByHash("ns1", "coll1", util.ComputeStringHash("key2"))
assert.Nil(t, vm)
vm, _ = db.GetPrivateDataMetadataByHash("ns2", "coll1", util.ComputeStringHash("key3"))
assert.Nil(t, vm)
}
func putPvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.Put(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), ver)
}
func putPvtUpdatesWithMetadata(t *testing.T, updates *UpdateBatch, ns, coll, key string, value []byte, metadata []byte, ver *version.Height) {
updates.PvtUpdates.Put(ns, coll, key, value, ver)
updates.HashUpdates.PutValHashAndMetadata(ns, coll, util.ComputeStringHash(key), util.ComputeHash(value), metadata, ver)
}
func deletePvtUpdates(t *testing.T, updates *UpdateBatch, ns, coll, key string, ver *version.Height) {
updates.PvtUpdates.Delete(ns, coll, key, ver)
updates.HashUpdates.Delete(ns, coll, util.ComputeStringHash(key), ver)
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privacyenabledstate
import (
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
)
type metadataHint struct {
cache map[string]bool
bookkeeper *leveldbhelper.DBHandle
}
func newMetadataHint(bookkeeper *leveldbhelper.DBHandle) *metadataHint {
cache := map[string]bool{}
itr := bookkeeper.GetIterator(nil, nil)
defer itr.Release()
for itr.Next() {
namespace := string(itr.Key())
cache[namespace] = true
}
return &metadataHint{cache, bookkeeper}
}
func (h *metadataHint) metadataEverUsedFor(namespace string) bool {
return h.cache[namespace]
}
func (h *metadataHint) setMetadataUsedFlag(updates *UpdateBatch) {
batch := leveldbhelper.NewUpdateBatch()
for ns := range filterNamespacesThatHasMetadata(updates) {
if h.cache[ns] {
continue
}
h.cache[ns] = true
batch.Put([]byte(ns), []byte{})
}
h.bookkeeper.WriteBatch(batch, true)
}
func filterNamespacesThatHasMetadata(updates *UpdateBatch) map[string]bool {
namespaces := map[string]bool{}
pubUpdates, hashUpdates := updates.PubUpdates, updates.HashUpdates
// add ns for public data
for _, ns := range pubUpdates.GetUpdatedNamespaces() {
for _, vv := range updates.PubUpdates.GetUpdates(ns) {
if vv.Metadata == nil {
continue
}
namespaces[ns] = true
}
}
// add ns for private hashes
for ns, nsBatch := range hashUpdates.UpdateMap {
for _, coll := range nsBatch.GetCollectionNames() {
for _, vv := range nsBatch.GetUpdates(coll) {
if vv.Metadata == nil {
continue
}
namespaces[ns] = true
}
}
}
return namespaces
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privacyenabledstate
import (
"testing"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/mock"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/stretchr/testify/assert"
)
func TestMetadataHintCorrectness(t *testing.T) {
bookkeepingTestEnv := bookkeeping.NewTestEnv(t)
defer bookkeepingTestEnv.Cleanup()
bookkeeper := bookkeepingTestEnv.TestProvider.GetDBHandle("ledger1", bookkeeping.MetadataPresenceIndicator)
metadataHint := newMetadataHint(bookkeeper)
assert.False(t, metadataHint.metadataEverUsedFor("ns1"))
updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns2", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 2))
updates.PubUpdates.PutValAndMetadata("ns3", "key", []byte("value"), nil, version.NewHeight(1, 3))
updates.HashUpdates.PutValAndMetadata("ns1_pvt", "key", "coll", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.HashUpdates.PutValAndMetadata("ns2_pvt", "key", "coll", []byte("value"), []byte("metadata"), version.NewHeight(1, 3))
updates.HashUpdates.PutValAndMetadata("ns3_pvt", "key", "coll", []byte("value"), nil, version.NewHeight(1, 3))
metadataHint.setMetadataUsedFlag(updates)
t.Run("MetadataAddedInCurrentSession", func(t *testing.T) {
assert.True(t, metadataHint.metadataEverUsedFor("ns1"))
assert.True(t, metadataHint.metadataEverUsedFor("ns2"))
assert.True(t, metadataHint.metadataEverUsedFor("ns1_pvt"))
assert.True(t, metadataHint.metadataEverUsedFor("ns2_pvt"))
assert.False(t, metadataHint.metadataEverUsedFor("ns3"))
assert.False(t, metadataHint.metadataEverUsedFor("ns4"))
})
t.Run("MetadataFromPersistence", func(t *testing.T) {
metadataHintFromPersistence := newMetadataHint(bookkeeper)
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns1"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns2"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns1_pvt"))
assert.True(t, metadataHintFromPersistence.metadataEverUsedFor("ns2_pvt"))
assert.False(t, metadataHintFromPersistence.metadataEverUsedFor("ns3"))
assert.False(t, metadataHintFromPersistence.metadataEverUsedFor("ns4"))
})
}
func TestMetadataHintOptimizationSkippingGoingToDB(t *testing.T) {
bookkeepingTestEnv := bookkeeping.NewTestEnv(t)
defer bookkeepingTestEnv.Cleanup()
bookkeeper := bookkeepingTestEnv.TestProvider.GetDBHandle("ledger1", bookkeeping.MetadataPresenceIndicator)
mockVersionedDB := &mock.VersionedDB{}
db, err := NewCommonStorageDB(mockVersionedDB, "testledger", newMetadataHint(bookkeeper))
assert.NoError(t, err)
updates := NewUpdateBatch()
updates.PubUpdates.PutValAndMetadata("ns1", "key", []byte("value"), []byte("metadata"), version.NewHeight(1, 1))
updates.PubUpdates.PutValAndMetadata("ns2", "key", []byte("value"), nil, version.NewHeight(1, 2))
db.ApplyPrivacyAwareUpdates(updates, version.NewHeight(1, 3))
db.GetStateMetadata("ns1", "randomkey")
assert.Equal(t, 1, mockVersionedDB.GetStateCallCount())
db.GetPrivateDataMetadataByHash("ns1", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())
db.GetStateMetadata("ns2", "randomkey")
db.GetPrivateDataMetadataByHash("ns2", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())
db.GetStateMetadata("randomeNs", "randomkey")
db.GetPrivateDataMetadataByHash("randomeNs", "randomColl", []byte("randomKeyhash"))
assert.Equal(t, 2, mockVersionedDB.GetStateCallCount())
}
......@@ -12,6 +12,8 @@ import (
"testing"
"time"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/integration/runner"
......@@ -36,15 +38,17 @@ var testEnvs = []TestEnv{&LevelDBCommonStorageTestEnv{}, &CouchDBCommonStorageTe
// LevelDBCommonStorageTestEnv implements TestEnv interface for leveldb based storage
type LevelDBCommonStorageTestEnv struct {
t testing.TB
provider DBProvider
t testing.TB
provider DBProvider
bookkeeperTestEnv *bookkeeping.TestEnv
}
// Init implements corresponding function from interface TestEnv
func (env *LevelDBCommonStorageTestEnv) Init(t testing.TB) {
viper.Set("ledger.state.stateDatabase", "")
removeDBPath(t)
dbProvider, err := NewCommonStorageDBProvider()
env.bookkeeperTestEnv = bookkeeping.NewTestEnv(t)
dbProvider, err := NewCommonStorageDBProvider(env.bookkeeperTestEnv.TestProvider)
assert.NoError(t, err)
env.t = t
env.provider = dbProvider
......@@ -65,6 +69,7 @@ func (env *LevelDBCommonStorageTestEnv) GetName() string {
// Cleanup implements corresponding function from interface TestEnv
func (env *LevelDBCommonStorageTestEnv) Cleanup() {
env.provider.Close()
env.bookkeeperTestEnv.Cleanup()
removeDBPath(env.t)
}
......@@ -72,10 +77,11 @@ func (env *LevelDBCommonStorageTestEnv) Cleanup() {
// CouchDBCommonStorageTestEnv implements TestEnv interface for couchdb based storage
type CouchDBCommonStorageTestEnv struct {
t testing.TB
provider DBProvider
openDbIds map[string]bool
couchCleanup func()
t testing.TB
provider DBProvider
bookkeeperTestEnv *bookkeeping.TestEnv
openDbIds map[string]bool
couchCleanup func()
}
func (env *CouchDBCommonStorageTestEnv) setupCouch() string {
......@@ -106,7 +112,9 @@ func (env *CouchDBCommonStorageTestEnv) Init(t testing.TB) {
viper.Set("ledger.state.couchDBConfig.maxRetries", 3)
viper.Set("ledger.state.couchDBConfig.maxRetriesOnStartup", 10)
viper.Set("ledger.state.couchDBConfig.requestTimeout", time.Second*35)
dbProvider, err := NewCommonStorageDBProvider()
env.bookkeeperTestEnv = bookkeeping.NewTestEnv(t)
dbProvider, err := NewCommonStorageDBProvider(env.bookkeeperTestEnv.TestProvider)
assert.NoError(t, err)
env.t = t
env.provider = dbProvider
......@@ -131,6 +139,7 @@ func (env *CouchDBCommonStorageTestEnv) Cleanup() {
for id := range env.openDbIds {
statecouchdb.CleanupDB(id)
}
env.bookkeeperTestEnv.Cleanup()
env.provider.Close()
env.couchCleanup()
}
......
This diff is collapsed.
......@@ -16,6 +16,7 @@ import (
)
//go:generate counterfeiter -o mock/results_iterator.go -fake-name ResultsIterator . ResultsIterator
//go:generate counterfeiter -o mock/versioned_db.go -fake-name VersionedDB . VersionedDB
// VersionedDBProvider provides an instance of an versioned DB
type VersionedDBProvider interface {
......
......@@ -8,6 +8,7 @@ package lockbasedtxmgr
import (
"fmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/storageutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
commonledger "github.com/hyperledger/fabric/common/ledger"
......@@ -224,6 +225,61 @@ func (h *queryHelper) executeQueryOnPrivateData(namespace, collection, query str
return &pvtdataResultsItr{namespace, collection, dbItr}, nil
}
func (h *queryHelper) getStateMetadata(ns string, key string) (map[string][]byte, error) {
if err := h.checkDone(); err != nil {
return nil, err
}
var metadataBytes []byte
var err error
if h.rwsetBuilder == nil {
// reads versions are not getting recorded, retrieve metadata value via optimized path
if metadataBytes, err = h.txmgr.db.GetStateMetadata(ns, key); err != nil {
return nil, err
}
} else {
if _, metadataBytes, err = h.getState(ns, key); err != nil {
return nil, err
}
}
return storageutil.DeserializeMetadata(metadataBytes)
}
func (h *queryHelper) getPrivateDataMetadata(ns, coll, key string) (map[string][]byte, error) {
if h.rwsetBuilder == nil {
// reads versions are not getting recorded, retrieve metadata value via optimized path
return h.getPrivateDataMetadataByHash(ns, coll, util.ComputeStringHash(key))
}
if err := h.validateCollName(ns, coll); err != nil {
return nil, err
}
if err := h.checkDone(); err != nil {
return nil, err
}
_, metadataBytes, err := h.getPrivateDataValueHash(ns, coll, key)
if err != nil {
return nil, err
}
return storageutil.DeserializeMetadata(metadataBytes)
}
func (h *queryHelper) getPrivateDataMetadataByHash(ns, coll string, keyhash []byte) (map[string][]byte, error) {
if err := h.validateCollName(ns, coll); err != nil {
return nil, err
}
if err := h.checkDone(); err != nil {
return nil, err
}
if h.rwsetBuilder != nil {
// this requires to improve rwset builder to accept a keyhash
return nil, errors.New("retrieving private data metadata by keyhash is not supported in simulation. This function is only available for query as yet")
}
metadataBytes, err := h.txmgr.db.GetPrivateDataMetadataByHash(ns, coll, keyhash)
if err != nil {
return nil, err
}
return storageutil.DeserializeMetadata(metadataBytes)
}
func (h *queryHelper) done() {
if h.doneInvoked {
return
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package lockbasedtxmgr
......@@ -19,15 +8,16 @@ package lockbasedtxmgr
import (
"testing"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"