Commit 4abc690a authored by manish's avatar manish
Browse files

Ledger-lscc: Enhance StateListener with QueryExecutors



This CR enhances state listener interface and allows
passing two query executors - one that operates on the
committed state and the other that operates on to-be
committed state

FAB-11561 #done

Change-Id: I672de199bf3fa9ba18513a1917cb8a379ddaec0b
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent bd1fa1b8
......@@ -26,7 +26,8 @@ type KVLedgerLSCCStateListener struct {
// HandleStateUpdates iterates over key-values being written in the 'lscc' namespace (which indicates deployment of a chaincode)
// and invokes `HandleChaincodeDeploy` function on chaincode event manager (which in turn is responsible for creation of statedb
// artifacts for the chaincode statedata)
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error {
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
channelName, stateUpdates := trigger.LedgerID, trigger.StateUpdates
kvWrites := stateUpdates[lsccNamespace].([]*kvrwset.KVWrite)
logger.Debugf("Channel [%s]: Handling state updates in LSCC namespace - stateUpdates=%#v", channelName, kvWrites)
chaincodeDefs := []*ChaincodeDefinition{}
......
......@@ -109,11 +109,13 @@ func TestLSCCListener(t *testing.T) {
sampleChaincodeData1 := &ccprovider.ChaincodeData{Name: cc1Def.Name, Version: cc1Def.Version, Id: cc1Def.Hash}
sampleChaincodeDataBytes1, err := proto.Marshal(sampleChaincodeData1)
assert.NoError(t, err, "")
lsccStateListener.HandleStateUpdates(channelName,
ledger.StateUpdates{
lsccStateListener.HandleStateUpdates(&ledger.StateUpdateTrigger{
LedgerID: channelName,
StateUpdates: ledger.StateUpdates{
lsccNamespace: []*kvrwset.KVWrite{{Key: cc1Def.Name, Value: sampleChaincodeDataBytes1}},
},
50)
CommittingBlockNum: 50},
)
assert.Contains(t, handler1.eventsRecieved, &mockEvent{cc1Def, ccDBArtifactsTar})
})
......@@ -122,11 +124,13 @@ func TestLSCCListener(t *testing.T) {
sampleChaincodeData2 := &ccprovider.ChaincodeData{Name: cc2Def.Name, Version: cc2Def.Version, Id: cc2Def.Hash}
sampleChaincodeDataBytes2, err := proto.Marshal(sampleChaincodeData2)
assert.NoError(t, err, "")
lsccStateListener.HandleStateUpdates(channelName,
ledger.StateUpdates{
lsccStateListener.HandleStateUpdates(&ledger.StateUpdateTrigger{
LedgerID: channelName,
StateUpdates: ledger.StateUpdates{
lsccNamespace: []*kvrwset.KVWrite{{Key: cc2Def.Name, Value: sampleChaincodeDataBytes2, IsDelete: true}},
},
50)
CommittingBlockNum: 50},
)
assert.NotContains(t, handler1.eventsRecieved, &mockEvent{cc2Def, ccDBArtifactsTar})
})
......@@ -135,11 +139,13 @@ func TestLSCCListener(t *testing.T) {
sampleChaincodeData3 := &ccprovider.ChaincodeData{Name: cc3Def.Name, Version: cc3Def.Version, Id: cc3Def.Hash}
sampleChaincodeDataBytes3, err := proto.Marshal(sampleChaincodeData3)
assert.NoError(t, err, "")
lsccStateListener.HandleStateUpdates(channelName,
ledger.StateUpdates{
lsccStateListener.HandleStateUpdates(&ledger.StateUpdateTrigger{
LedgerID: channelName,
StateUpdates: ledger.StateUpdates{
lsccNamespace: []*kvrwset.KVWrite{{Key: cc3Def.Name, Value: sampleChaincodeDataBytes3}},
},
50)
CommittingBlockNum: 50},
)
assert.NotContains(t, handler1.eventsRecieved, &mockEvent{cc3Def, ccDBArtifactsTar})
})
}
......
......@@ -59,9 +59,9 @@ func (m *mgr) StateCommitDone(ledgerID string) {
// In this implementation, each collection configurations updates (in lscc namespace)
// are persisted as a separate entry in a separate db. The composite key for the entry
// is a tuple of <blockNum, namespace, key>
func (m *mgr) HandleStateUpdates(ledgerID string, stateUpdates ledger.StateUpdates, commitHeight uint64) error {
batch := prepareDBBatch(stateUpdates, commitHeight)
dbHandle := m.dbProvider.getDB(ledgerID)
func (m *mgr) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
batch := prepareDBBatch(trigger.StateUpdates, trigger.CommittingBlockNum)
dbHandle := m.dbProvider.getDB(trigger.LedgerID)
return dbHandle.writeBatch(batch, true)
}
......
......@@ -43,7 +43,11 @@ func TestMgr(t *testing.T) {
// for each ledgerid and commitHeight combination, construct a unique collConfigPackage and induce a stateUpdate
collConfigPackage := sampleCollectionConfigPackage(ledgerid, committingBlockNum)
stateUpdate := sampleStateUpdate(t, chaincodeName, collConfigPackage)
mgr.HandleStateUpdates(ledgerid, stateUpdate, committingBlockNum)
mgr.HandleStateUpdates(&ledger.StateUpdateTrigger{
LedgerID: ledgerid,
StateUpdates: stateUpdate,
CommittingBlockNum: committingBlockNum},
)
}
}
......
......@@ -97,7 +97,8 @@ func (l *mockStateListener) InterestedInNamespaces() []string {
return []string{l.namespace}
}
func (l *mockStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error {
func (l *mockStateListener) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
channelName, stateUpdates := trigger.LedgerID, trigger.StateUpdates
l.channelName = channelName
l.kvWrites = stateUpdates[l.namespace].([]*kvrwset.KVWrite)
return nil
......
......@@ -8,16 +8,16 @@ package lockbasedtxmgr
import (
"sync"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/bookkeeping"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
)
......@@ -114,7 +114,25 @@ func (txmgr *LockBasedTxMgr) invokeNamespaceListeners() error {
continue
}
txmgr.current.listeners = append(txmgr.current.listeners, listener)
if err := listener.HandleStateUpdates(txmgr.ledgerid, stateUpdatesForListener, txmgr.current.blockNum()); err != nil {
committedStateQueryExecuter := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{txmgr.db}}
postCommitQueryExecuter := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: txmgr.current.batch.PubUpdates.UpdateBatch},
txmgr.db,
},
}
trigger := &ledger.StateUpdateTrigger{
LedgerID: txmgr.ledgerid,
StateUpdates: stateUpdatesForListener,
CommittingBlockNum: txmgr.current.blockNum(),
CommittedStateQueryExecutor: committedStateQueryExecuter,
PostCommitQueryExecutor: postCommitQueryExecuter,
}
if err := listener.HandleStateUpdates(trigger); err != nil {
return err
}
logger.Debugf("Invoking listener for state changes:%s", listener)
......
......@@ -29,6 +29,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)
......@@ -171,3 +172,11 @@ func populateCollConfigForTest(t *testing.T, txMgr *LockBasedTxMgr, nsColls []co
}
txMgr.db.ApplyPrivacyAwareUpdates(updates, ht)
}
func testutilPopulateDB(t *testing.T, txMgr *LockBasedTxMgr, ns string, data []*queryresult.KV, version *version.Height) {
updates := privacyenabledstate.NewUpdateBatch()
for _, kv := range data {
updates.PubUpdates.Put(ns, kv.Key, kv.Value, version)
}
txMgr.db.ApplyPrivacyAwareUpdates(updates, version)
}
......@@ -8,11 +8,13 @@ package lockbasedtxmgr
import (
"testing"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/stretchr/testify/assert"
)
......@@ -94,14 +96,63 @@ func TestStateListener(t *testing.T) {
assert.Equal(t, 1, ml3.StateCommitDoneCallCount())
}
func TestStateListenerQueryExecutor(t *testing.T) {
testEnv := testEnvsMap[levelDBtestEnvName]
testEnv.init(t, "testLedger", nil)
defer testEnv.cleanup()
txMgr := testEnv.getTxMgr().(*LockBasedTxMgr)
namespace := "ns"
initialData := []*queryresult.KV{
{Namespace: namespace, Key: "key1", Value: []byte("value1")},
{Namespace: namespace, Key: "key2", Value: []byte("value2")},
{Namespace: namespace, Key: "key3", Value: []byte("value3")},
}
// populate initial data in db
testutilPopulateDB(t, txMgr, namespace, initialData, version.NewHeight(1, 1))
sl := new(mock.StateListener)
sl.InterestedInNamespacesStub = func() []string { return []string{"ns"} }
txMgr.stateListeners = []ledger.StateListener{sl}
// Create next block
sim, err := txMgr.NewTxSimulator("tx1")
assert.NoError(t, err)
sim.SetState(namespace, "key1", []byte("value1_new"))
sim.DeleteState(namespace, "key2")
sim.SetState(namespace, "key4", []byte("value4_new"))
simRes, err := sim.GetTxSimulationResults()
simResBytes, err := simRes.GetPubSimulationBytes()
assert.NoError(t, err)
block := testutil.ConstructBlock(t, 1, nil, [][]byte{simResBytes}, false)
// invoke ValidateAndPrepare function
err = txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block}, false)
assert.NoError(t, err)
// validate that the query executors passed to the state listener
trigger := sl.HandleStateUpdatesArgsForCall(0)
assert.NotNil(t, trigger)
expectedCommittedData := initialData
checkQueryExecutor(t, trigger.CommittedStateQueryExecutor, namespace, expectedCommittedData)
expectedPostCommitData := []*queryresult.KV{
{Namespace: namespace, Key: "key1", Value: []byte("value1_new")},
{Namespace: namespace, Key: "key3", Value: []byte("value3")},
{Namespace: namespace, Key: "key4", Value: []byte("value4_new")},
}
checkQueryExecutor(t, trigger.PostCommitQueryExecutor, namespace, expectedPostCommitData)
}
func checkHandleStateUpdatesCallback(t *testing.T, ml *mock.StateListener, callNumber int,
expectedLedgerid string,
expectedUpdates ledger.StateUpdates,
expectedCommitHt uint64) {
actualNs, actualStateUpdate, actualHt := ml.HandleStateUpdatesArgsForCall(callNumber)
assert.Equal(t, expectedLedgerid, actualNs)
checkEqualUpdates(t, expectedUpdates, actualStateUpdate)
assert.Equal(t, expectedCommitHt, actualHt)
actualTrigger := ml.HandleStateUpdatesArgsForCall(callNumber)
assert.Equal(t, expectedLedgerid, actualTrigger.LedgerID)
checkEqualUpdates(t, expectedUpdates, actualTrigger.StateUpdates)
assert.Equal(t, expectedCommitHt, actualTrigger.CommittingBlockNum)
}
func checkEqualUpdates(t *testing.T, expected, actual ledger.StateUpdates) {
......@@ -110,3 +161,28 @@ func checkEqualUpdates(t *testing.T, expected, actual ledger.StateUpdates) {
assert.ElementsMatch(t, expectedUpdates, actual[ns])
}
}
func checkQueryExecutor(t *testing.T, qe ledger.SimpleQueryExecutor, namespace string, expectedResults []*queryresult.KV) {
for _, kv := range expectedResults {
val, err := qe.GetState(namespace, kv.Key)
assert.NoError(t, err)
assert.Equal(t, kv.Value, val)
}
itr, err := qe.GetStateRangeScanIterator(namespace, "", "")
assert.NoError(t, err)
defer itr.Close()
actualRes := []*queryresult.KV{}
for {
res, err := itr.Next()
if err != nil {
assert.NoError(t, err)
}
if res == nil {
break
}
actualRes = append(actualRes, res.(*queryresult.KV))
}
assert.Equal(t, expectedResults, actualRes)
}
......@@ -90,24 +90,29 @@ type ValidatedLedger interface {
commonledger.Ledger
}
// SimpleQueryExecutor encapsulates basic functions
type SimpleQueryExecutor interface {
// GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
GetState(namespace string, key string) ([]byte, error)
// GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
}
// QueryExecutor executes the queries
// Get* methods are for supporting KV-based data model. ExecuteQuery method is for supporting a rich datamodel and query support
//
// ExecuteQuery method in the case of a rich data model is expected to support queries on
// latest state, historical state and on the intersection of state and transactions
type QueryExecutor interface {
// GetState gets the value for given namespace and key. For a chaincode, the namespace corresponds to the chaincodeId
GetState(namespace string, key string) ([]byte, error)
SimpleQueryExecutor
// GetStateMetadata returns the metadata for given namespace and key
GetStateMetadata(namespace, key string) (map[string][]byte, error)
// GetStateMultipleKeys gets the values for multiple keys in a single call
GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error)
// GetStateRangeScanIterator returns an iterator that contains all the key-values between given key ranges.
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
// The returned ResultsIterator contains results of type *KV which is defined in protos/ledger/queryresult.
GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error)
// ExecuteQuery executes the given query and returns an iterator that contains results of type specific to the underlying data store.
// Only used for state databases that support query
// For a chaincode, the namespace corresponds to the chaincodeId
......@@ -296,10 +301,19 @@ func (txSim *TxSimulationResults) ContainsPvtWrites() bool {
// and result in a panic
type StateListener interface {
InterestedInNamespaces() []string
HandleStateUpdates(ledgerID string, stateUpdates StateUpdates, committingBlockNum uint64) error
HandleStateUpdates(trigger *StateUpdateTrigger) error
StateCommitDone(channelID string)
}
// StateUpdateTrigger encapsulates the information and helper tools that may be used by a StateListener
type StateUpdateTrigger struct {
LedgerID string
StateUpdates StateUpdates
CommittingBlockNum uint64
CommittedStateQueryExecutor SimpleQueryExecutor
PostCommitQueryExecutor SimpleQueryExecutor
}
// StateUpdates is the generic type to represent the state updates
type StateUpdates map[string]interface{}
......
......@@ -17,12 +17,10 @@ type StateListener struct {
interestedInNamespacesReturnsOnCall map[int]struct {
result1 []string
}
HandleStateUpdatesStub func(ledgerID string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error
HandleStateUpdatesStub func(trigger *ledger.StateUpdateTrigger) error
handleStateUpdatesMutex sync.RWMutex
handleStateUpdatesArgsForCall []struct {
ledgerID string
stateUpdates ledger.StateUpdates
committingBlockNum uint64
trigger *ledger.StateUpdateTrigger
}
handleStateUpdatesReturns struct {
result1 error
......@@ -79,18 +77,16 @@ func (fake *StateListener) InterestedInNamespacesReturnsOnCall(i int, result1 []
}{result1}
}
func (fake *StateListener) HandleStateUpdates(ledgerID string, stateUpdates ledger.StateUpdates, committingBlockNum uint64) error {
func (fake *StateListener) HandleStateUpdates(trigger *ledger.StateUpdateTrigger) error {
fake.handleStateUpdatesMutex.Lock()
ret, specificReturn := fake.handleStateUpdatesReturnsOnCall[len(fake.handleStateUpdatesArgsForCall)]
fake.handleStateUpdatesArgsForCall = append(fake.handleStateUpdatesArgsForCall, struct {
ledgerID string
stateUpdates ledger.StateUpdates
committingBlockNum uint64
}{ledgerID, stateUpdates, committingBlockNum})
fake.recordInvocation("HandleStateUpdates", []interface{}{ledgerID, stateUpdates, committingBlockNum})
trigger *ledger.StateUpdateTrigger
}{trigger})
fake.recordInvocation("HandleStateUpdates", []interface{}{trigger})
fake.handleStateUpdatesMutex.Unlock()
if fake.HandleStateUpdatesStub != nil {
return fake.HandleStateUpdatesStub(ledgerID, stateUpdates, committingBlockNum)
return fake.HandleStateUpdatesStub(trigger)
}
if specificReturn {
return ret.result1
......@@ -104,10 +100,10 @@ func (fake *StateListener) HandleStateUpdatesCallCount() int {
return len(fake.handleStateUpdatesArgsForCall)
}
func (fake *StateListener) HandleStateUpdatesArgsForCall(i int) (string, ledger.StateUpdates, uint64) {
func (fake *StateListener) HandleStateUpdatesArgsForCall(i int) *ledger.StateUpdateTrigger {
fake.handleStateUpdatesMutex.RLock()
defer fake.handleStateUpdatesMutex.RUnlock()
return fake.handleStateUpdatesArgsForCall[i].ledgerID, fake.handleStateUpdatesArgsForCall[i].stateUpdates, fake.handleStateUpdatesArgsForCall[i].committingBlockNum
return fake.handleStateUpdatesArgsForCall[i].trigger
}
func (fake *StateListener) HandleStateUpdatesReturns(result1 error) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment