Commit f46677dd authored by Jonathan Levi (HACERA)'s avatar Jonathan Levi (HACERA) Committed by Gerrit Code Review
Browse files

Merge "Ledger-lscc: Introduce queriabilty on to-be state"

parents 2d3d042f bd1fa1b8
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package queryutil_test
import (
"errors"
"os"
"testing"
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil/mock"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
statedbmock "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/mock"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
"github.com/stretchr/testify/assert"
)
func TestMain(m *testing.M) {
flogging.SetModuleLevel("util", "debug")
flogging.SetModuleLevel("statedb", "debug")
os.Exit(m.Run())
}
func TestCombinerGetState(t *testing.T) {
batch1 := statedb.NewUpdateBatch()
batch1.Put("ns1", "key1", []byte("b1_value1"), nil)
batch1.Delete("ns1", "key2", nil)
batch1.Put("ns1", "key3", []byte("b1_value3"), nil)
batch2 := statedb.NewUpdateBatch()
batch2.Put("ns1", "key1", []byte("b2_value1"), nil)
batch2.Put("ns1", "key2", []byte("b2_value2"), nil)
batch2.Put("ns1", "key3", []byte("b2_value3"), nil)
batch3 := statedb.NewUpdateBatch()
batch3.Put("ns1", "key1", []byte("b3_value1"), nil)
batch3.Put("ns1", "key2", []byte("b3_value2"), nil)
batch3.Delete("ns1", "key3", nil)
combiner := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
}}
val, err := combiner.GetState("ns1", "key1")
assert.NoError(t, err)
assert.Equal(t, []byte("b1_value1"), val)
val, err = combiner.GetState("ns1", "key2")
assert.NoError(t, err)
assert.Nil(t, val)
val, err = combiner.GetState("ns1", "key3")
assert.NoError(t, err)
assert.Equal(t, []byte("b1_value3"), val)
combiner = &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
}}
val, err = combiner.GetState("ns1", "key1")
assert.NoError(t, err)
assert.Equal(t, []byte("b3_value1"), val)
val, err = combiner.GetState("ns1", "key2")
assert.NoError(t, err)
assert.Equal(t, []byte("b3_value2"), val)
val, err = combiner.GetState("ns1", "key3")
assert.NoError(t, err)
assert.Nil(t, val)
}
func TestCombinerRangeScan(t *testing.T) {
batch1 := statedb.NewUpdateBatch()
batch1.Put("ns1", "key1", []byte("batch1_value1"), nil)
batch1.Delete("ns1", "key2", nil)
batch1.Put("ns1", "key3", []byte("batch1_value3"), nil)
batch2 := statedb.NewUpdateBatch()
batch2.Put("ns1", "key1", []byte("batch2_value1"), nil)
batch2.Put("ns1", "key2", []byte("batch2_value2"), nil)
batch2.Delete("ns1", "key3", nil)
batch2.Put("ns1", "key4", []byte("batch2_value4"), nil)
batch3 := statedb.NewUpdateBatch()
batch3.Put("ns1", "key0", []byte("batch3_value0"), nil)
batch3.Put("ns1", "key1", []byte("batch3_value1"), nil)
batch3.Put("ns1", "key2", []byte("batch3_value2"), nil)
batch3.Put("ns1", "key3", []byte("batch3_value3"), nil)
batch3.Put("ns1", "key4", []byte("batch3_value4"), nil)
batch3.Put("ns1", "key5", []byte("batch3_value5"), nil)
combiner := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
},
}
itr, err := combiner.GetStateRangeScanIterator("ns1", "key1", "key4")
assert.NoError(t, err)
expectedResults := []*queryresult.KV{
{Namespace: "ns1", Key: "key1", Value: []byte("batch1_value1")},
{Namespace: "ns1", Key: "key3", Value: []byte("batch1_value3")},
}
testutilCheckIteratorResults(t, itr, expectedResults)
itr, err = combiner.GetStateRangeScanIterator("ns1", "key0", "key6")
assert.NoError(t, err)
expectedResults = []*queryresult.KV{
{Namespace: "ns1", Key: "key0", Value: []byte("batch3_value0")},
{Namespace: "ns1", Key: "key1", Value: []byte("batch1_value1")},
{Namespace: "ns1", Key: "key3", Value: []byte("batch1_value3")},
{Namespace: "ns1", Key: "key4", Value: []byte("batch2_value4")},
{Namespace: "ns1", Key: "key5", Value: []byte("batch3_value5")},
}
testutilCheckIteratorResults(t, itr, expectedResults)
combiner = &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch3},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch2},
&queryutil.UpdateBatchBackedQueryExecuter{UpdateBatch: batch1},
},
}
itr, err = combiner.GetStateRangeScanIterator("ns1", "key0", "key6")
assert.NoError(t, err)
expectedResults = []*queryresult.KV{
{Namespace: "ns1", Key: "key0", Value: []byte("batch3_value0")},
{Namespace: "ns1", Key: "key1", Value: []byte("batch3_value1")},
{Namespace: "ns1", Key: "key2", Value: []byte("batch3_value2")},
{Namespace: "ns1", Key: "key3", Value: []byte("batch3_value3")},
{Namespace: "ns1", Key: "key4", Value: []byte("batch3_value4")},
{Namespace: "ns1", Key: "key5", Value: []byte("batch3_value5")},
}
testutilCheckIteratorResults(t, itr, expectedResults)
}
func TestGetStateError(t *testing.T) {
qe1 := &mock.QueryExecuter{}
qe1.GetStateReturns(&statedb.VersionedValue{Value: []byte("testValue")}, nil)
qe2 := &mock.QueryExecuter{}
qe2.GetStateReturns(nil, errors.New("Error for testing"))
combiner1 := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
qe1, qe2,
},
}
_, err := combiner1.GetState("ns", "key1")
assert.NoError(t, err)
combiner2 := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
qe2, qe1,
},
}
_, err = combiner2.GetState("ns", "key1")
assert.Error(t, err)
}
func TestGetRangeScanError(t *testing.T) {
itr1 := &statedbmock.ResultsIterator{}
itr1.NextReturns(
&statedb.VersionedKV{
CompositeKey: statedb.CompositeKey{Namespace: "ns", Key: "dummyKey"},
VersionedValue: statedb.VersionedValue{Value: []byte("dummyVal")},
},
nil,
)
qe1 := &mock.QueryExecuter{}
qe1.GetStateRangeScanIteratorReturns(itr1, nil)
qe2 := &mock.QueryExecuter{}
qe2.GetStateRangeScanIteratorReturns(nil, errors.New("dummy error on getting the iterator"))
combiner := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
qe1, qe2,
},
}
_, err := combiner.GetStateRangeScanIterator("ns", "startKey", "endKey")
assert.Error(t, err)
}
func TestGetRangeScanUnderlyingIteratorReturnsError(t *testing.T) {
itr1 := &statedbmock.ResultsIterator{}
itr1.NextReturns(
&statedb.VersionedKV{
CompositeKey: statedb.CompositeKey{Namespace: "ns", Key: "dummyKey"},
VersionedValue: statedb.VersionedValue{Value: []byte("dummyVal")},
},
nil,
)
itr2 := &statedbmock.ResultsIterator{}
itr2.NextReturns(
nil,
errors.New("dummyErrorOnIteratorNext"),
)
qe1 := &mock.QueryExecuter{}
qe1.GetStateRangeScanIteratorReturns(itr1, nil)
qe2 := &mock.QueryExecuter{}
qe2.GetStateRangeScanIteratorReturns(itr2, nil)
combiner := &queryutil.QECombiner{
QueryExecuters: []queryutil.QueryExecuter{
qe1, qe2,
},
}
_, err := combiner.GetStateRangeScanIterator("ns", "startKey", "endKey")
assert.Error(t, err)
}
func testutilCheckIteratorResults(t *testing.T, itr commonledger.ResultsIterator, expectedResults []*queryresult.KV) {
results := []*queryresult.KV{}
for {
result, err := itr.Next()
assert.NoError(t, err)
if result == nil {
break
}
results = append(results, result.(*queryresult.KV))
}
assert.Equal(t, expectedResults, results)
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package queryutil
import (
"fmt"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
)
type itrCombiner struct {
namespace string
holders []*itrHolder
}
func newItrCombiner(namespace string, baseIterators []statedb.ResultsIterator) (*itrCombiner, error) {
var holders []*itrHolder
for _, itr := range baseIterators {
res, err := itr.Next()
if err != nil {
for _, holder := range holders {
holder.itr.Close()
}
return nil, err
}
if res != nil {
holders = append(holders, &itrHolder{itr, res.(*statedb.VersionedKV)})
}
}
return &itrCombiner{namespace, holders}, nil
}
// Next returns the next eligible item from the underlying iterators.
// This function evaluates the underlying iterators, and picks the one which is
// gives the lexicographically smallest key. Then, it saves that value, and advances the chosen iterator.
// If the chosen iterator is out of elements, then that iterator is closed, and removed from the list of iterators.
func (combiner *itrCombiner) Next() (commonledger.QueryResult, error) {
logger.Debugf("Iterators position at beginning: %s", combiner.holders)
if len(combiner.holders) == 0 {
return nil, nil
}
smallestHolderIndex := 0
for i := 1; i < len(combiner.holders); i++ {
smallestKey, holderKey := combiner.keyAt(smallestHolderIndex), combiner.keyAt(i)
switch {
case holderKey == smallestKey: // we found the same key in the lower order iterator (stale value of the key);
// we already have the latest value for this key (in smallestHolder). Ignore this value and move the iterator
// to next item (to a greater key) so that for next round of key selection, we do not consider this key again
removed, err := combiner.moveItrAndRemoveIfExhausted(i)
if err != nil {
return nil, err
}
if removed { // if the current iterator is exhaused and hence removed, decrement the index
// because indexes of the remaining iterators are decremented by one
i--
}
case holderKey < smallestKey:
smallestHolderIndex = i
default:
// the current key under evaluation is greater than the smallestKey - do nothing
}
}
kv := combiner.kvAt(smallestHolderIndex)
combiner.moveItrAndRemoveIfExhausted(smallestHolderIndex)
if kv.IsDelete() {
return combiner.Next()
}
logger.Debugf("Key [%s] selected from iterator at index [%d]", kv.Key, smallestHolderIndex)
logger.Debugf("Iterators position at end: %s", combiner.holders)
return &queryresult.KV{Namespace: combiner.namespace, Key: kv.Key, Value: kv.Value}, nil
}
// moveItrAndRemoveIfExhausted moves the iterator at index i to the next item. If the iterator gets exhausted
// then the iterator is removed from the underlying slice
func (combiner *itrCombiner) moveItrAndRemoveIfExhausted(i int) (removed bool, err error) {
holder := combiner.holders[i]
exhausted, err := holder.moveToNext()
if err != nil {
return false, err
}
if exhausted {
combiner.holders[i].itr.Close()
combiner.holders = append(combiner.holders[:i], combiner.holders[i+1:]...)
}
return exhausted, nil
}
// kvAt returns the kv available from iterator at index i
func (combiner *itrCombiner) kvAt(i int) *statedb.VersionedKV {
return combiner.holders[i].kv
}
// keyAt returns the key available from iterator at index i
func (combiner *itrCombiner) keyAt(i int) string {
return combiner.kvAt(i).Key
}
// Close closes all the underlying iterators
func (combiner *itrCombiner) Close() {
for _, holder := range combiner.holders {
holder.itr.Close()
}
}
// itrHolder encloses an iterator and keeps the next item available from the iterator in the buffer
type itrHolder struct {
itr statedb.ResultsIterator
kv *statedb.VersionedKV
}
// moveToNext fetches the next item to keep in buffer and returns true if the iterator is exhausted
func (holder *itrHolder) moveToNext() (exhausted bool, err error) {
var res statedb.QueryResult
if res, err = holder.itr.Next(); err != nil {
return false, err
}
if res != nil {
holder.kv = res.(*statedb.VersionedKV)
}
return res == nil, nil
}
// String returns the key that the holder has in the buffer for serving as a next key
func (holder *itrHolder) String() string {
return fmt.Sprintf("{%s}", holder.kv.Key)
}
// Code generated by counterfeiter. DO NOT EDIT.
package mock
import (
"sync"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
)
type QueryExecuter struct {
GetStateStub func(namespace, key string) (*statedb.VersionedValue, error)
getStateMutex sync.RWMutex
getStateArgsForCall []struct {
namespace string
key string
}
getStateReturns struct {
result1 *statedb.VersionedValue
result2 error
}
getStateReturnsOnCall map[int]struct {
result1 *statedb.VersionedValue
result2 error
}
GetStateRangeScanIteratorStub func(namespace, startKey, endKey string) (statedb.ResultsIterator, error)
getStateRangeScanIteratorMutex sync.RWMutex
getStateRangeScanIteratorArgsForCall []struct {
namespace string
startKey string
endKey string
}
getStateRangeScanIteratorReturns struct {
result1 statedb.ResultsIterator
result2 error
}
getStateRangeScanIteratorReturnsOnCall map[int]struct {
result1 statedb.ResultsIterator
result2 error
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *QueryExecuter) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
fake.getStateMutex.Lock()
ret, specificReturn := fake.getStateReturnsOnCall[len(fake.getStateArgsForCall)]
fake.getStateArgsForCall = append(fake.getStateArgsForCall, struct {
namespace string
key string
}{namespace, key})
fake.recordInvocation("GetState", []interface{}{namespace, key})
fake.getStateMutex.Unlock()
if fake.GetStateStub != nil {
return fake.GetStateStub(namespace, key)
}
if specificReturn {
return ret.result1, ret.result2
}
return fake.getStateReturns.result1, fake.getStateReturns.result2
}
func (fake *QueryExecuter) GetStateCallCount() int {
fake.getStateMutex.RLock()
defer fake.getStateMutex.RUnlock()
return len(fake.getStateArgsForCall)
}
func (fake *QueryExecuter) GetStateArgsForCall(i int) (string, string) {
fake.getStateMutex.RLock()
defer fake.getStateMutex.RUnlock()
return fake.getStateArgsForCall[i].namespace, fake.getStateArgsForCall[i].key
}
func (fake *QueryExecuter) GetStateReturns(result1 *statedb.VersionedValue, result2 error) {
fake.GetStateStub = nil
fake.getStateReturns = struct {
result1 *statedb.VersionedValue
result2 error
}{result1, result2}
}
func (fake *QueryExecuter) GetStateReturnsOnCall(i int, result1 *statedb.VersionedValue, result2 error) {
fake.GetStateStub = nil
if fake.getStateReturnsOnCall == nil {
fake.getStateReturnsOnCall = make(map[int]struct {
result1 *statedb.VersionedValue
result2 error
})
}
fake.getStateReturnsOnCall[i] = struct {
result1 *statedb.VersionedValue
result2 error
}{result1, result2}
}
func (fake *QueryExecuter) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
fake.getStateRangeScanIteratorMutex.Lock()
ret, specificReturn := fake.getStateRangeScanIteratorReturnsOnCall[len(fake.getStateRangeScanIteratorArgsForCall)]
fake.getStateRangeScanIteratorArgsForCall = append(fake.getStateRangeScanIteratorArgsForCall, struct {
namespace string
startKey string
endKey string
}{namespace, startKey, endKey})
fake.recordInvocation("GetStateRangeScanIterator", []interface{}{namespace, startKey, endKey})
fake.getStateRangeScanIteratorMutex.Unlock()
if fake.GetStateRangeScanIteratorStub != nil {
return fake.GetStateRangeScanIteratorStub(namespace, startKey, endKey)
}
if specificReturn {
return ret.result1, ret.result2
}
return fake.getStateRangeScanIteratorReturns.result1, fake.getStateRangeScanIteratorReturns.result2
}
func (fake *QueryExecuter) GetStateRangeScanIteratorCallCount() int {
fake.getStateRangeScanIteratorMutex.RLock()
defer fake.getStateRangeScanIteratorMutex.RUnlock()
return len(fake.getStateRangeScanIteratorArgsForCall)
}
func (fake *QueryExecuter) GetStateRangeScanIteratorArgsForCall(i int) (string, string, string) {
fake.getStateRangeScanIteratorMutex.RLock()
defer fake.getStateRangeScanIteratorMutex.RUnlock()
return fake.getStateRangeScanIteratorArgsForCall[i].namespace, fake.getStateRangeScanIteratorArgsForCall[i].startKey, fake.getStateRangeScanIteratorArgsForCall[i].endKey
}
func (fake *QueryExecuter) GetStateRangeScanIteratorReturns(result1 statedb.ResultsIterator, result2 error) {
fake.GetStateRangeScanIteratorStub = nil
fake.getStateRangeScanIteratorReturns = struct {
result1 statedb.ResultsIterator
result2 error
}{result1, result2}
}
func (fake *QueryExecuter) GetStateRangeScanIteratorReturnsOnCall(i int, result1 statedb.ResultsIterator, result2 error) {
fake.GetStateRangeScanIteratorStub = nil
if fake.getStateRangeScanIteratorReturnsOnCall == nil {
fake.getStateRangeScanIteratorReturnsOnCall = make(map[int]struct {
result1 statedb.ResultsIterator
result2 error
})
}
fake.getStateRangeScanIteratorReturnsOnCall[i] = struct {
result1 statedb.ResultsIterator
result2 error
}{result1, result2}
}
func (fake *QueryExecuter) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.getStateMutex.RLock()
defer fake.getStateMutex.RUnlock()
fake.getStateRangeScanIteratorMutex.RLock()
defer fake.getStateRangeScanIteratorMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *QueryExecuter) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ queryutil.QueryExecuter = new(QueryExecuter)
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package queryutil
import (
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
)
var logger = flogging.MustGetLogger("util")
//go:generate counterfeiter -o mock/query_executer.go -fake-name QueryExecuter . QueryExecuter
// QueryExecuter encapsulates query functions
type QueryExecuter interface {
GetState(namespace, key string) (*statedb.VersionedValue, error)
GetStateRangeScanIterator(namespace, startKey, endKey string) (statedb.ResultsIterator, error)
}
// QECombiner combines the query results from one or more underlying 'queryExecuters'
// In case, the same key is returned by multiple 'queryExecuters', the first 'queryExecuter'
// in the input is considered having the latest state of the key
type QECombiner struct {
QueryExecuters []QueryExecuter // actual executers in decending order of priority
}