Commit 843d9265 authored by manish's avatar manish Committed by Gari Singh
Browse files

[FAB-10036] Ledger: Metadata - Commit-path enhancements



This CR enhances the ledger validation and commit code for
processing metadata writes in the read-write set.
For each key that is present in the writes in the read-write set,
the final value and metadata that should be committed to the
state-db is derived as follows
 - Take the value from the highest valid transaction (Tv)
   that updates the value
 - Take the metadata from the highest valid transaction (Tm)
   that updates the metadata
 - Construct the the final tuple <value, metadata, version> for the key
   as <value in Tv, metadata in Tm, max(version of Tv, version of Tm)>
 - Finally, if Tv indicates delete of the key,
   the  final tuple also indicates the delete of the key

For more details on the semantics, see the associated Jira.

Change-Id: Icae615b0972fe985f8e86147cbcfb0bf0a06266c
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
Signed-off-by: default avatarDavid Enyeart <enyeart@us.ibm.com>
parent 256c4fed
......@@ -111,7 +111,12 @@ func (b UpdateMap) IsEmpty() bool {
// Put sets the value in the batch for a given combination of namespace and collection name
func (b UpdateMap) Put(ns, coll, key string, value []byte, version *version.Height) {
b.getOrCreateNsBatch(ns).Put(coll, key, value, version)
b.PutValAndMetadata(ns, coll, key, value, nil, version)
}
// PutValAndMetadata adds a key with value and metadata
func (b UpdateMap) PutValAndMetadata(ns, coll, key string, value []byte, metadata []byte, version *version.Height) {
b.getOrCreateNsBatch(ns).PutValAndMetadata(coll, key, value, metadata, version)
}
// Delete adds a delete marker in the batch for a given combination of namespace and collection name
......@@ -157,7 +162,13 @@ func (h HashedUpdateBatch) Contains(ns, coll string, keyHash []byte) bool {
// Put overrides the function in UpdateMap for allowing the key to be a []byte instead of a string
func (h HashedUpdateBatch) Put(ns, coll string, key []byte, value []byte, version *version.Height) {
h.UpdateMap.Put(ns, coll, string(key), value, version)
h.PutValHashAndMetadata(ns, coll, key, value, nil, version)
}
// PutValHashAndMetadata adds a key with value and metadata
// TODO introducing a new function to limit the refactoring. Later in a separate CR, the 'Put' function above should be removed
func (h HashedUpdateBatch) PutValHashAndMetadata(ns, coll string, key []byte, value []byte, metadata []byte, version *version.Height) {
h.UpdateMap.PutValAndMetadata(ns, coll, string(key), value, metadata, version)
}
// Delete overrides the function in UpdateMap for allowing the key to be a []byte instead of a string
......
......@@ -142,10 +142,16 @@ func (batch *UpdateBatch) Get(ns string, key string) *VersionedValue {
// Put adds a key with value only. The metadata is assumed to be nil
func (batch *UpdateBatch) Put(ns string, key string, value []byte, version *version.Height) {
batch.PutValAndMetadata(ns, key, value, nil, version)
}
// PutValAndMetadata adds a key with value and metadata
// TODO introducing a new function to limit the refactoring. Later in a separate CR, the 'Put' function above should be removed
func (batch *UpdateBatch) PutValAndMetadata(ns string, key string, value []byte, metadata []byte, version *version.Height) {
if value == nil {
panic("Nil value not allowed. Instead call 'Delete' function")
}
batch.Update(ns, key, &VersionedValue{value, nil, version})
batch.Update(ns, key, &VersionedValue{value, metadata, version})
}
// Delete deletes a Key and associated value
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package storageutil
import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
)
// SerializeMetadata serializes metadata entries for stroing in statedb
func SerializeMetadata(metadataEntries []*kvrwset.KVMetadataEntry) ([]byte, error) {
metadata := &kvrwset.KVMetadataWrite{Entries: metadataEntries}
return proto.Marshal(metadata)
}
// DeserializeMetadata deserializes metadata bytes from statedb
func DeserializeMetadata(metadataBytes []byte) ([]*kvrwset.KVMetadataEntry, error) {
metadata := &kvrwset.KVMetadataWrite{}
if err := proto.Unmarshal(metadataBytes, metadata); err != nil {
return nil, err
}
return metadata.Entries, nil
}
......@@ -38,6 +38,7 @@ func TestMain(m *testing.M) {
flogging.SetModuleLevel("statecouchdb", "debug")
flogging.SetModuleLevel("valimpl", "debug")
flogging.SetModuleLevel("pvtstatepurgemgmt", "debug")
flogging.SetModuleLevel("valinternal", "debug")
viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests/kvledger/txmgmt/txmgr/lockbasedtxmgr")
os.Exit(m.Run())
......
......@@ -109,7 +109,7 @@ func (v *Validator) ValidateAndPrepareBatch(block *valinternal.Block, doMVCCVali
if validationCode == peer.TxValidationCode_VALID {
logger.Debugf("Block [%d] Transaction index [%d] TxId [%s] marked as valid by state validator", block.Num, tx.IndexInBlock, tx.ID)
committingTxHeight := version.NewHeight(block.Num, uint64(tx.IndexInBlock))
updates.ApplyWriteSet(tx.RWSet, committingTxHeight)
updates.ApplyWriteSet(tx.RWSet, committingTxHeight, v.db)
} else {
logger.Warningf("Block [%d] Transaction index [%d] TxId [%s] marked as invalid by state validator. Reason code [%s]",
block.Num, tx.IndexInBlock, tx.ID, validationCode.String())
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package valinternal
type keyOpsFlag uint8
const (
upsertVal keyOpsFlag = 1 // 1 << 0
metadataUpdate = 2 // 1 << 1
metadataDelete = 4 // 1 << 2
keyDelete = 8 // 1 << 3
)
type compositeKey struct {
ns, coll, key string
}
type txOps map[compositeKey]*keyOps
type keyOps struct {
flag keyOpsFlag
value []byte
metadata []byte
}
////////////////// txOps functions
func (txops txOps) upsert(k compositeKey, val []byte) {
keyops := txops.getOrCreateKeyEntry(k)
keyops.flag += upsertVal
keyops.value = val
}
func (txops txOps) delete(k compositeKey) {
keyops := txops.getOrCreateKeyEntry(k)
keyops.flag += keyDelete
}
func (txops txOps) metadataUpdate(k compositeKey, metadata []byte) {
keyops := txops.getOrCreateKeyEntry(k)
keyops.flag += metadataUpdate
keyops.metadata = metadata
}
func (txops txOps) metadataDelete(k compositeKey) {
keyops := txops.getOrCreateKeyEntry(k)
keyops.flag += metadataDelete
}
func (txops txOps) getOrCreateKeyEntry(k compositeKey) *keyOps {
keyops, ok := txops[k]
if !ok {
keyops = &keyOps{}
txops[k] = keyops
}
return keyops
}
////////////////// keyOps functions
func (keyops keyOps) isDelete() bool {
return keyops.flag&(keyDelete) == keyDelete
}
func (keyops keyOps) isUpsertAndMetadataUpdate() bool {
if keyops.flag&upsertVal == upsertVal {
return keyops.flag&metadataUpdate == metadataUpdate ||
keyops.flag&metadataDelete == metadataDelete
}
return false
}
func (keyops keyOps) isOnlyUpsert() bool {
return keyops.flag|upsertVal == upsertVal
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package valinternal
import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/storageutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
)
func prepareTxOps(rwset *rwsetutil.TxRwSet, txht *version.Height,
precedingUpdates *PubAndHashUpdates, db privacyenabledstate.DB) (txOps, error) {
txops := txOps{}
txops.applyTxRwset(rwset)
//logger.Debugf("prepareTxOps() txops after applying raw rwset=%#v", spew.Sdump(txops))
for ck, keyop := range txops {
// check if the final state of the key, value and metadata, is already present in the transaction, then skip
// otherwise we need to retrieve latest state and merge in the current value or metadata update
if keyop.isDelete() || keyop.isUpsertAndMetadataUpdate() {
continue
}
latestVal, err := retrieveLatestState(ck.ns, ck.coll, ck.key, precedingUpdates, db)
if err != nil {
return nil, err
}
// check if only value is updated in the current transaction then merge the metadata from last committed state
if keyop.isOnlyUpsert() {
if latestVal != nil {
keyop.metadata = latestVal.Metadata
}
continue
}
// only metadata is updated in the current transaction. Merge the value from the last committed state
// If the key does not exist in the last state, make this key as noop in current transaction
if latestVal != nil {
keyop.value = latestVal.Value
} else {
delete(txops, ck)
}
}
//logger.Debugf("prepareTxOps() txops after final processing=%#v", spew.Sdump(txops))
return txops, nil
}
// applyTxRwset records the upsertion/deletion of a kv and updatation/deletion
// of asociated metadata present in a txrwset
func (txops txOps) applyTxRwset(rwset *rwsetutil.TxRwSet) error {
for _, nsRWSet := range rwset.NsRwSets {
ns := nsRWSet.NameSpace
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
txops.applyKVWrite(ns, "", kvWrite)
}
for _, kvMetadataWrite := range nsRWSet.KvRwSet.MetadataWrites {
txops.applyMetadata(ns, "", kvMetadataWrite)
}
// apply collection level kvwrite and kvMetadataWrite
for _, collHashRWset := range nsRWSet.CollHashedRwSets {
coll := collHashRWset.CollectionName
for _, hashedWrite := range collHashRWset.HashedRwSet.HashedWrites {
txops.applyKVWrite(ns, coll,
&kvrwset.KVWrite{
Key: string(hashedWrite.KeyHash),
Value: hashedWrite.ValueHash,
IsDelete: hashedWrite.IsDelete,
},
)
}
for _, metadataWrite := range collHashRWset.HashedRwSet.MetadataWrites {
txops.applyMetadata(ns, coll,
&kvrwset.KVMetadataWrite{
Key: string(metadataWrite.KeyHash),
Entries: metadataWrite.Entries,
},
)
}
}
}
return nil
}
// applyKVWrite records upsertion/deletion of a kvwrite
func (txops txOps) applyKVWrite(ns, coll string, kvWrite *kvrwset.KVWrite) {
if kvWrite.IsDelete {
txops.delete(compositeKey{ns, coll, kvWrite.Key})
} else {
txops.upsert(compositeKey{ns, coll, kvWrite.Key}, kvWrite.Value)
}
}
// applyMetadata records updatation/deletion of a metadataWrite
func (txops txOps) applyMetadata(ns, coll string, metadataWrite *kvrwset.KVMetadataWrite) error {
if metadataWrite.Entries == nil {
txops.metadataDelete(compositeKey{ns, coll, metadataWrite.Key})
} else {
metadataBytes, err := storageutil.SerializeMetadata(metadataWrite.Entries)
if err != nil {
return err
}
txops.metadataUpdate(compositeKey{ns, coll, metadataWrite.Key}, metadataBytes)
}
return nil
}
// retrieveLatestState returns the value of the key from the precedingUpdates (if the key was operated upon by a previous tran in the block).
// If the key not present in the precedingUpdates, then this function, pulls the latest value from statedb
// TODO FAB-11328, pulling from state for (especially for couchdb) will pay significant performance penalty so a bulkload would be helpful.
// Further, all the keys that gets written will be required to pull from statedb by vscc for endorsement policy check (in the case of key level
// endorsement) and hence, the bulkload should be combined
func retrieveLatestState(ns, coll, key string,
precedingUpdates *PubAndHashUpdates, db privacyenabledstate.DB) (*statedb.VersionedValue, error) {
var vv *statedb.VersionedValue
var err error
if coll == "" {
vv := precedingUpdates.PubUpdates.Get(ns, key)
if vv == nil {
vv, err = db.GetState(ns, key)
}
return vv, err
}
vv = precedingUpdates.HashUpdates.Get(ns, coll, key)
if vv == nil {
vv, err = db.GetValueHash(ns, coll, []byte(key))
}
return vv, err
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package valinternal
import (
"os"
"testing"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/storageutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)
func TestMain(m *testing.M) {
flogging.SetModuleLevel("valinternal", "debug")
viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests/kvledger/txmgmt/validator/valinternal")
os.Exit(m.Run())
}
func TestTxOpsPreparationValueUpdate(t *testing.T) {
testDBEnv := privacyenabledstate.LevelDBCommonStorageTestEnv{}
testDBEnv.Init(t)
defer testDBEnv.Cleanup()
db := testDBEnv.GetDBHandle("TestDB")
ck1, ck2, ck3 :=
compositeKey{ns: "ns1", key: "key1"},
compositeKey{ns: "ns1", key: "key2"},
compositeKey{ns: "ns1", key: "key3"}
updateBatch := privacyenabledstate.NewUpdateBatch()
updateBatch.PubUpdates.Put(ck1.ns, ck1.key, []byte("value1"), version.NewHeight(1, 1)) // write key1 with only value
updateBatch.PubUpdates.PutValAndMetadata( // write key2 with value and metadata
ck2.ns, ck2.key,
[]byte("value2"),
testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2")}),
version.NewHeight(1, 2))
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2)) //write the above initial state to db
precedingUpdates := NewPubAndHashUpdates()
rwset := testutilBuildRwset( // A sample rwset {upsert key1, key2, key3}
t,
map[compositeKey][]byte{
ck1: []byte("value1_new"),
ck2: []byte("value2_new"),
ck3: []byte("value3_new"),
},
nil,
)
txOps, err := prepareTxOps(rwset, version.NewHeight(1, 2), precedingUpdates, db)
assert.NoError(t, err)
assert.Len(t, txOps, 3)
ck1ExpectedKeyOps := &keyOps{ // finally, key1 should have only new value
flag: upsertVal,
value: []byte("value1_new"),
}
ck2ExpectedKeyOps := &keyOps{ // key2 should have new value and existing metadata
flag: upsertVal,
value: []byte("value2_new"),
metadata: testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2")}),
}
ck3ExpectedKeyOps := &keyOps{ // key3 should have new value
flag: upsertVal,
value: []byte("value3_new"),
}
assert.Equal(t, ck1ExpectedKeyOps, txOps[ck1])
assert.Equal(t, ck2ExpectedKeyOps, txOps[ck2])
assert.Equal(t, ck3ExpectedKeyOps, txOps[ck3])
}
func TestTxOpsPreparationMetadataUpdates(t *testing.T) {
testDBEnv := privacyenabledstate.LevelDBCommonStorageTestEnv{}
testDBEnv.Init(t)
defer testDBEnv.Cleanup()
db := testDBEnv.GetDBHandle("TestDB")
ck1, ck2, ck3 :=
compositeKey{ns: "ns1", key: "key1"},
compositeKey{ns: "ns1", key: "key2"},
compositeKey{ns: "ns1", key: "key3"}
updateBatch := privacyenabledstate.NewUpdateBatch()
updateBatch.PubUpdates.Put(ck1.ns, ck1.key, []byte("value1"), version.NewHeight(1, 1)) // write key1 with only value
updateBatch.PubUpdates.PutValAndMetadata( // write key2 with value and metadata
ck2.ns, ck2.key,
[]byte("value2"),
testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2")}),
version.NewHeight(1, 2))
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2)) //write the above initial state to db
precedingUpdates := NewPubAndHashUpdates()
rwset := testutilBuildRwset( // A sample rwset {update metadta for the three keys}
t,
nil,
map[compositeKey]map[string][]byte{
ck1: {"metadata1": []byte("metadata1_new")},
ck2: {"metadata2": []byte("metadata2_new")},
ck3: {"metadata3": []byte("metadata3_new")},
},
)
txOps, err := prepareTxOps(rwset, version.NewHeight(1, 2), precedingUpdates, db)
assert.NoError(t, err)
assert.Len(t, txOps, 2) // key3 should have been removed from the txOps because, the key3 does not exist and only metadata is being updated
ck1ExpectedKeyOps := &keyOps{ // finally, key1 should have only existing value and new metadata
flag: metadataUpdate,
value: []byte("value1"),
metadata: testutilSerializedMetadata(t, map[string][]byte{"metadata1": []byte("metadata1_new")}),
}
ck2ExpectedKeyOps := &keyOps{ // key2 should have existing value and new metadata
flag: metadataUpdate,
value: []byte("value2"),
metadata: testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2_new")}),
}
assert.Equal(t, ck1ExpectedKeyOps, txOps[ck1])
assert.Equal(t, ck2ExpectedKeyOps, txOps[ck2])
}
func TestTxOpsPreparationMetadataDelete(t *testing.T) {
testDBEnv := privacyenabledstate.LevelDBCommonStorageTestEnv{}
testDBEnv.Init(t)
defer testDBEnv.Cleanup()
db := testDBEnv.GetDBHandle("TestDB")
ck1, ck2, ck3 :=
compositeKey{ns: "ns1", key: "key1"},
compositeKey{ns: "ns1", key: "key2"},
compositeKey{ns: "ns1", key: "key3"}
updateBatch := privacyenabledstate.NewUpdateBatch()
updateBatch.PubUpdates.Put(ck1.ns, ck1.key, []byte("value1"), version.NewHeight(1, 1)) // write key1 with only value
updateBatch.PubUpdates.PutValAndMetadata( // write key2 with value and metadata
ck2.ns, ck2.key,
[]byte("value2"),
testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2")}),
version.NewHeight(1, 2))
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2)) //write the above initial state to db
precedingUpdates := NewPubAndHashUpdates()
rwset := testutilBuildRwset( // A sample rwset {delete metadata for the three keys}
t,
nil,
map[compositeKey]map[string][]byte{
ck1: {},
ck2: {},
ck3: {},
},
)
txOps, err := prepareTxOps(rwset, version.NewHeight(1, 2), precedingUpdates, db)
assert.NoError(t, err)
assert.Len(t, txOps, 2) // key3 should have been removed from the txOps because, the key3 does not exist and only metadata is being updated
ck1ExpectedKeyOps := &keyOps{ // finally, key1 should have only existing value and no metadata
flag: metadataDelete,
value: []byte("value1"),
}
ck2ExpectedKeyOps := &keyOps{ // key2 should have existing value and no metadata
flag: metadataDelete,
value: []byte("value2"),
}
assert.Equal(t, ck1ExpectedKeyOps, txOps[ck1])
assert.Equal(t, ck2ExpectedKeyOps, txOps[ck2])
}
func TestTxOpsPreparationMixedUpdates(t *testing.T) {
testDBEnv := privacyenabledstate.LevelDBCommonStorageTestEnv{}
testDBEnv.Init(t)
defer testDBEnv.Cleanup()
db := testDBEnv.GetDBHandle("TestDB")
ck1, ck2, ck3, ck4 :=
compositeKey{ns: "ns1", key: "key1"},
compositeKey{ns: "ns1", key: "key2"},
compositeKey{ns: "ns1", key: "key3"},
compositeKey{ns: "ns1", key: "key4"}
updateBatch := privacyenabledstate.NewUpdateBatch()
updateBatch.PubUpdates.Put(ck1.ns, ck1.key, []byte("value1"), version.NewHeight(1, 1)) // write key1 with only value
updateBatch.PubUpdates.Put(ck2.ns, ck2.key, []byte("value2"), version.NewHeight(1, 2)) // write key2 with only value
updateBatch.PubUpdates.PutValAndMetadata( // write key3 with value and metadata
ck3.ns, ck3.key,
[]byte("value3"),
testutilSerializedMetadata(t, map[string][]byte{"metadata3": []byte("metadata3")}),
version.NewHeight(1, 3))
updateBatch.PubUpdates.PutValAndMetadata( // write key4 with value and metadata
ck4.ns, ck4.key,
[]byte("value4"),
testutilSerializedMetadata(t, map[string][]byte{"metadata4": []byte("metadata4")}),
version.NewHeight(1, 4))
db.ApplyPrivacyAwareUpdates(updateBatch, version.NewHeight(1, 2)) //write the above initial state to db
precedingUpdates := NewPubAndHashUpdates()
rwset := testutilBuildRwset( // A sample rwset {key1:only value update, key2: value and metadata update, key3: only metadata update, key4: only value update}
t,
map[compositeKey][]byte{
ck1: []byte("value1_new"),
ck2: []byte("value2_new"),
ck4: []byte("value4_new"),
},
map[compositeKey]map[string][]byte{
ck2: {"metadata2": []byte("metadata2_new")},
ck3: {"metadata3": []byte("metadata3_new")},
},
)
txOps, err := prepareTxOps(rwset, version.NewHeight(1, 2), precedingUpdates, db)
assert.NoError(t, err)
assert.Len(t, txOps, 4)
ck1ExpectedKeyOps := &keyOps{ // finally, key1 should have only new value
flag: upsertVal,
value: []byte("value1_new"),
}
ck2ExpectedKeyOps := &keyOps{ // key2 should have new value and new metadata
flag: upsertVal + metadataUpdate,
value: []byte("value2_new"),
metadata: testutilSerializedMetadata(t, map[string][]byte{"metadata2": []byte("metadata2_new")}),
}
ck3ExpectedKeyOps := &keyOps{ // key3 should have existing value and new metadata
flag: metadataUpdate,
value: []byte("value3"),
metadata: testutilSerializedMetadata(t, map[string][]byte{"metadata3": []byte("metadata3_new")}),
}
ck4ExpectedKeyOps := &keyOps{ // key4 should have new value and existing metadata
fl