Commit 84629f03 authored by manish's avatar manish Committed by Yacov Manevich
Browse files

[FAB-9657] Ledger: Metadata - E2E support



This CR ties all the changes made in the ledger code
for enabling the support of metadata. Specifically,
this includes
 - Changes in query-executor and tx-simulator for allowing the requests
   from chaincode/vscc for metadata
 - Tests for the e2e functionality at the ledger level

Change-Id: Ia156437002cddf9ae5ed902b37dcadfade9d2400
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent bc0d4cf1
......@@ -46,7 +46,7 @@ func (v *collNameValidator) validateCollName(ns, coll string) error {
func (v *collNameValidator) retrieveCollConfigFromStateDB(ns string) (*common.CollectionConfigPackage, error) {
logger.Debugf("retrieveCollConfigFromStateDB() begin - ns=[%s]", ns)
configPkgBytes, err := v.queryHelper.getState(lsccNamespace, constructCollectionConfigKey(ns))
configPkgBytes, _, err := v.queryHelper.getState(lsccNamespace, constructCollectionConfigKey(ns))
if err != nil {
return nil, err
}
......
......@@ -37,19 +37,19 @@ func newQueryHelper(txmgr *LockBasedTxMgr, rwsetBuilder *rwsetutil.RWSetBuilder)
return helper
}
func (h *queryHelper) getState(ns string, key string) ([]byte, error) {
func (h *queryHelper) getState(ns string, key string) ([]byte, []byte, error) {
if err := h.checkDone(); err != nil {
return nil, err
return nil, nil, err
}
versionedValue, err := h.txmgr.db.GetState(ns, key)
if err != nil {
return nil, err
return nil, nil, err
}
val, ver := decomposeVersionedValue(versionedValue)
val, metadata, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToReadSet(ns, key, ver)
}
return val, nil
return val, metadata, nil
}
func (h *queryHelper) getStateMultipleKeys(namespace string, keys []string) ([][]byte, error) {
......@@ -62,7 +62,7 @@ func (h *queryHelper) getStateMultipleKeys(namespace string, keys []string) ([][
}
values := make([][]byte, len(versionedValues))
for i, versionedValue := range versionedValues {
val, ver := decomposeVersionedValue(versionedValue)
val, _, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToReadSet(namespace, keys[i], ver)
}
......@@ -111,7 +111,8 @@ func (h *queryHelper) getPrivateData(ns, coll, key string) ([]byte, error) {
return nil, err
}
val, ver := decomposeVersionedValue(versionedValue)
// metadata is always nil for private data - because, the metadata is part of the hashed key (instead of raw key)
val, _, ver := decomposeVersionedValue(versionedValue)
keyHash := util.ComputeStringHash(key)
if hashVersion, err = h.txmgr.db.GetKeyHashVersion(ns, coll, keyHash); err != nil {
......@@ -128,6 +129,23 @@ func (h *queryHelper) getPrivateData(ns, coll, key string) ([]byte, error) {
return val, nil
}
func (h *queryHelper) getPrivateDataValueHash(ns, coll, key string) (valueHash, metadataBytes []byte, err error) {
if err := h.checkDone(); err != nil {
return nil, nil, err
}
var versionedValue *statedb.VersionedValue
keyHash := util.ComputeStringHash(key)
if versionedValue, err = h.txmgr.db.GetValueHash(ns, coll, keyHash); err != nil {
return nil, nil, err
}
valHash, metadata, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToHashedReadSet(ns, coll, key, ver)
}
return valHash, metadata, nil
}
func (h *queryHelper) getPrivateDataMultipleKeys(ns, coll string, keys []string) ([][]byte, error) {
if err := h.validateCollName(ns, coll); err != nil {
return nil, err
......@@ -141,7 +159,7 @@ func (h *queryHelper) getPrivateDataMultipleKeys(ns, coll string, keys []string)
}
values := make([][]byte, len(versionedValues))
for i, versionedValue := range versionedValues {
val, ver := decomposeVersionedValue(versionedValue)
val, _, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToHashedReadSet(ns, coll, keys[i], ver)
}
......@@ -335,14 +353,16 @@ func (itr *queryResultsItr) Close() {
itr.DBItr.Close()
}
func decomposeVersionedValue(versionedValue *statedb.VersionedValue) ([]byte, *version.Height) {
func decomposeVersionedValue(versionedValue *statedb.VersionedValue) ([]byte, []byte, *version.Height) {
var value []byte
var metadata []byte
var ver *version.Height
if versionedValue != nil {
value = versionedValue.Value
ver = versionedValue.Version
metadata = versionedValue.Metadata
}
return value, ver
return value, metadata, ver
}
// pvtdataResultsItr iterates over results of a query on pvt data
......
......@@ -7,7 +7,7 @@ package lockbasedtxmgr
import (
"github.com/hyperledger/fabric/common/ledger"
"github.com/pkg/errors"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/storageutil"
)
// LockBasedQueryExecutor is a query executor used in `LockBasedTxMgr`
......@@ -23,13 +23,22 @@ func newQueryExecutor(txmgr *LockBasedTxMgr, txid string) *lockBasedQueryExecuto
}
// GetState implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetState(ns string, key string) ([]byte, error) {
return q.helper.getState(ns, key)
func (q *lockBasedQueryExecutor) GetState(ns string, key string) (val []byte, err error) {
val, _, err = q.helper.getState(ns, key)
return
}
// GetStateMetadata implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetStateMetadata(namespace, key string) (map[string][]byte, error) {
return nil, errors.New("not implemented")
_, metadataBytes, err := q.helper.getState(namespace, key)
if err != nil {
return nil, err
}
metadata, err := storageutil.DeserializeMetadata(metadataBytes)
if err != nil {
return nil, err
}
return metadata, err
}
// GetStateMultipleKeys implements method in interface `ledger.QueryExecutor`
......@@ -57,7 +66,15 @@ func (q *lockBasedQueryExecutor) GetPrivateData(namespace, collection, key strin
// GetPrivateDataMetadata implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateDataMetadata(namespace, collection, key string) (map[string][]byte, error) {
return nil, errors.New("not implemented")
_, metadataBytes, err := q.helper.getPrivateDataValueHash(namespace, collection, key)
if err != nil {
return nil, err
}
metadata, err := storageutil.DeserializeMetadata(metadataBytes)
if err != nil {
return nil, err
}
return metadata, err
}
// GetPrivateDataMultipleKeys implements method in interface `ledger.QueryExecutor`
......
......@@ -33,13 +33,7 @@ func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSi
// SetState implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetState(ns string, key string, value []byte) error {
if err := s.helper.checkDone(); err != nil {
return err
}
if err := s.checkBeforeWrite(); err != nil {
return err
}
if err := s.helper.txmgr.db.ValidateKeyValue(key, value); err != nil {
if err := s.checkWritePrecondition(key, value); err != nil {
return err
}
s.rwsetBuilder.AddToWriteSet(ns, key, value)
......@@ -63,12 +57,16 @@ func (s *lockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[st
// SetStateMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetStateMetadata(namespace, key string, metadata map[string][]byte) error {
return errors.New("not implemented")
if err := s.checkWritePrecondition(key, nil); err != nil {
return err
}
s.rwsetBuilder.AddToMetadataWriteSet(namespace, key, metadata)
return nil
}
// DeleteStateMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeleteStateMetadata(namespace, key string) error {
return errors.New("not implemented")
return s.SetStateMetadata(namespace, key, nil)
}
// SetPrivateData implements method in interface `ledger.TxSimulator`
......@@ -76,13 +74,7 @@ func (s *lockBasedTxSimulator) SetPrivateData(ns, coll, key string, value []byte
if err := s.helper.validateCollName(ns, coll); err != nil {
return err
}
if err := s.helper.checkDone(); err != nil {
return err
}
if err := s.checkBeforeWrite(); err != nil {
return err
}
if err := s.helper.txmgr.db.ValidateKeyValue(key, value); err != nil {
if err := s.checkWritePrecondition(key, value); err != nil {
return err
}
s.writePerformed = true
......@@ -115,12 +107,16 @@ func (s *lockBasedTxSimulator) GetPrivateDataRangeScanIterator(namespace, collec
// SetPrivateDataMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetPrivateDataMetadata(namespace, collection, key string, metadata map[string][]byte) error {
return errors.New("not implemented")
if err := s.checkWritePrecondition(key, nil); err != nil {
return err
}
s.rwsetBuilder.AddToHashedMetadataWriteSet(namespace, collection, key, metadata)
return nil
}
// DeletePrivateMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeletePrivateDataMetadata(namespace, collection, key string) error {
return errors.New("not implemented")
return s.SetPrivateDataMetadata(namespace, collection, key, nil)
}
// ExecuteQueryOnPrivateData implements method in interface `ledger.TxSimulator`
......@@ -150,13 +146,16 @@ func (s *lockBasedTxSimulator) ExecuteUpdate(query string) error {
return errors.New("not supported")
}
func (s *lockBasedTxSimulator) checkBeforeWrite() error {
if s.pvtdataQueriesPerformed {
return &txmgr.ErrUnsupportedTransaction{
Msg: fmt.Sprintf("txid [%s]: Transaction has already performed queries on pvt data. Writes are not allowed", s.txid),
}
func (s *lockBasedTxSimulator) checkWritePrecondition(key string, value []byte) error {
if err := s.helper.checkDone(); err != nil {
return err
}
if err := s.checkPvtdataQueryPerformed(); err != nil {
return err
}
if err := s.helper.txmgr.db.ValidateKeyValue(key, value); err != nil {
return err
}
s.writePerformed = true
return nil
}
......@@ -169,3 +168,13 @@ func (s *lockBasedTxSimulator) checkBeforePvtdataQueries() error {
s.pvtdataQueriesPerformed = true
return nil
}
func (s *lockBasedTxSimulator) checkPvtdataQueryPerformed() error {
if s.pvtdataQueriesPerformed {
return &txmgr.ErrUnsupportedTransaction{
Msg: fmt.Sprintf("txid [%s]: Transaction has already performed queries on pvt data. Writes are not allowed", s.txid),
}
}
s.writePerformed = true
return nil
}
......@@ -864,6 +864,120 @@ func TestTxSimulatorMissingPvtdataExpiry(t *testing.T) {
simulator.Done()
}
func TestTxWithPubMetadata(t *testing.T) {
for _, testEnv := range testEnvs {
t.Logf("Running test for TestEnv = %s", testEnv.getName())
testLedgerID := "testtxwithpubmetadata"
testEnv.init(t, testLedgerID, nil)
testTxWithPubMetadata(t, testEnv)
testEnv.cleanup()
}
}
func testTxWithPubMetadata(t *testing.T, env testEnv) {
namespace := "testns"
txMgr := env.getTxMgr()
txMgrHelper := newTxMgrTestHelper(t, txMgr)
// Simulate and commit tx1 - set val and metadata for key1 and key2. Set only metadata for key3
s1, _ := txMgr.NewTxSimulator("test_tx1")
key1, value1, metadata1 := "key1", []byte("value1"), map[string][]byte{"entry1": []byte("meatadata1-entry1")}
key2, value2, metadata2 := "key2", []byte("value2"), map[string][]byte{"entry1": []byte("meatadata2-entry1")}
key3, metadata3 := "key3", map[string][]byte{"entry1": []byte("meatadata3-entry")}
s1.SetState(namespace, key1, value1)
s1.SetStateMetadata(namespace, key1, metadata1)
s1.SetState(namespace, key2, value2)
s1.SetStateMetadata(namespace, key2, metadata2)
s1.SetStateMetadata(namespace, key3, metadata3)
s1.Done()
txRWSet1, _ := s1.GetTxSimulationResults()
txMgrHelper.validateAndCommitRWSet(txRWSet1.PubSimulationResults)
// Run query - key1 and key2 should return both value and metadata. Key3 should still be non-exsting in db
qe, _ := txMgr.NewQueryExecutor("test_tx2")
checkTestQueryResults(t, qe, namespace, key1, value1, metadata1)
checkTestQueryResults(t, qe, namespace, key2, value2, metadata2)
checkTestQueryResults(t, qe, namespace, key3, nil, nil)
qe.Done()
// Simulate and commit tx3 - update metadata for key1 and delete metadata for key2
updatedMetadata1 := map[string][]byte{"entry1": []byte("meatadata1-entry1"), "entry2": []byte("meatadata1-entry2")}
s2, _ := txMgr.NewTxSimulator("test_tx3")
s2.SetStateMetadata(namespace, key1, updatedMetadata1)
s2.DeleteStateMetadata(namespace, key2)
s2.Done()
txRWSet2, _ := s2.GetTxSimulationResults()
txMgrHelper.validateAndCommitRWSet(txRWSet2.PubSimulationResults)
// Run query - key1 should return updated metadata. Key2 should return 'nil' metadata
qe, _ = txMgr.NewQueryExecutor("test_tx4")
checkTestQueryResults(t, qe, namespace, key1, value1, updatedMetadata1)
checkTestQueryResults(t, qe, namespace, key2, value2, nil)
qe.Done()
}
func TestTxWithPvtdataMetadata(t *testing.T) {
ledgerid, ns, coll := "testtxwithpvtdatametadata", "ns", "coll"
cs := btltestutil.NewMockCollectionStore()
cs.SetBTL("ns", "coll", 1000)
btlPolicy := pvtdatapolicy.ConstructBTLPolicy(cs)
for _, testEnv := range testEnvs {
t.Logf("Running test for TestEnv = %s", testEnv.getName())
testEnv.init(t, ledgerid, btlPolicy)
testTxWithPvtdataMetadata(t, testEnv, ns, coll)
testEnv.cleanup()
}
}
func testTxWithPvtdataMetadata(t *testing.T, env testEnv, ns, coll string) {
ledgerid := "testtxwithpvtdatametadata"
txMgr := env.getTxMgr()
bg, _ := testutil.NewBlockGenerator(t, ledgerid, false)
populateCollConfigForTest(t, txMgr.(*LockBasedTxMgr), []collConfigkey{{"ns", "coll"}}, version.NewHeight(1, 1))
// Simulate and commit tx1 - set val and metadata for key1 and key2. Set only metadata for key3
s1, _ := txMgr.NewTxSimulator("test_tx1")
key1, value1, metadata1 := "key1", []byte("value1"), map[string][]byte{"entry1": []byte("meatadata1-entry1")}
key2, value2, metadata2 := "key2", []byte("value2"), map[string][]byte{"entry1": []byte("meatadata2-entry1")}
key3, metadata3 := "key3", map[string][]byte{"entry1": []byte("meatadata3-entry")}
s1.SetPrivateData(ns, coll, key1, value1)
s1.SetPrivateDataMetadata(ns, coll, key1, metadata1)
s1.SetPrivateData(ns, coll, key2, value2)
s1.SetPrivateDataMetadata(ns, coll, key2, metadata2)
s1.SetPrivateDataMetadata(ns, coll, key3, metadata3)
s1.Done()
blkAndPvtdata1 := prepareNextBlockForTestFromSimulator(t, bg, s1)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata1, true))
assert.NoError(t, txMgr.Commit())
// Run query - key1 and key2 should return both value and metadata. Key3 should still be non-exsting in db
qe, _ := txMgr.NewQueryExecutor("test_tx2")
checkPvtdataTestQueryResults(t, qe, ns, coll, key1, value1, metadata1)
checkPvtdataTestQueryResults(t, qe, ns, coll, key2, value2, metadata2)
checkPvtdataTestQueryResults(t, qe, ns, coll, key3, nil, nil)
qe.Done()
// Simulate and commit tx3 - update metadata for key1 and delete metadata for key2
updatedMetadata1 := map[string][]byte{"entry1": []byte("meatadata1-entry1"), "entry2": []byte("meatadata1-entry2")}
s2, _ := txMgr.NewTxSimulator("test_tx3")
s2.SetPrivateDataMetadata(ns, coll, key1, updatedMetadata1)
s2.DeletePrivateDataMetadata(ns, coll, key2)
s2.Done()
blkAndPvtdata2 := prepareNextBlockForTestFromSimulator(t, bg, s2)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata2, true))
assert.NoError(t, txMgr.Commit())
// Run query - key1 should return updated metadata. Key2 should return 'nil' metadata
qe, _ = txMgr.NewQueryExecutor("test_tx4")
checkPvtdataTestQueryResults(t, qe, ns, coll, key1, value1, updatedMetadata1)
checkPvtdataTestQueryResults(t, qe, ns, coll, key2, value2, nil)
qe.Done()
}
func prepareNextBlockForTest(t *testing.T, txMgr txmgr.TxMgr, bg *testutil.BlockGenerator,
txid string, pubKVs map[string]string, pvtKVs map[string]string) *ledger.BlockAndPvtData {
simulator, _ := txMgr.NewTxSimulator(txid)
......@@ -875,6 +989,10 @@ func prepareNextBlockForTest(t *testing.T, txMgr txmgr.TxMgr, bg *testutil.Block
simulator.SetPrivateData("ns", "coll", k, []byte(v))
}
simulator.Done()
return prepareNextBlockForTestFromSimulator(t, bg, simulator)
}
func prepareNextBlockForTestFromSimulator(t *testing.T, bg *testutil.BlockGenerator, simulator ledger.TxSimulator) *ledger.BlockAndPvtData {
simRes, _ := simulator.GetTxSimulationResults()
pubSimBytes, _ := simRes.GetPubSimulationBytes()
block := bg.NextBlock([][]byte{pubSimBytes})
......@@ -882,3 +1000,27 @@ func prepareNextBlockForTest(t *testing.T, txMgr txmgr.TxMgr, bg *testutil.Block
BlockPvtData: map[uint64]*ledger.TxPvtData{0: {SeqInBlock: 0, WriteSet: simRes.PvtSimulationResults}},
}
}
func checkTestQueryResults(t *testing.T, qe ledger.QueryExecutor, ns, key string,
expectedVal []byte, expectedMetadata map[string][]byte) {
committedVal, err := qe.GetState(ns, key)
assert.NoError(t, err)
assert.Equal(t, expectedVal, committedVal)
committedMetadata, err := qe.GetStateMetadata(ns, key)
assert.NoError(t, err)
assert.Equal(t, expectedMetadata, committedMetadata)
t.Logf("key=%s, value=%s, metadata=%s", key, committedVal, committedMetadata)
}
func checkPvtdataTestQueryResults(t *testing.T, qe ledger.QueryExecutor, ns, coll, key string,
expectedVal []byte, expectedMetadata map[string][]byte) {
committedVal, err := qe.GetPrivateData(ns, coll, key)
assert.NoError(t, err)
assert.Equal(t, expectedVal, committedVal)
committedMetadata, err := qe.GetPrivateDataMetadata(ns, coll, key)
assert.NoError(t, err)
assert.Equal(t, expectedMetadata, committedMetadata)
t.Logf("key=%s, value=%s, metadata=%s", key, committedVal, committedMetadata)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment