Commit ca152f26 authored by Chris Elder's avatar Chris Elder
Browse files

[FAB-9840] CouchDB safe pagination - statecouchdb



Currently, CouchDB queries are limited to configurable max number of
results/docs, based on peer queryLimit config option (default 10000).
The shim/peer supports pagination but CouchDB query iterators do not.

This change will allow additional controls to prevent a large accidental or
malicious query from causing performance issues with CouchDB.

Changes to core.yaml:

Changed queryLimit to 1000.  This is part of repurposing this parameter.
QueryLimit is now to be used as a per query limit to CouchDB instead of the
limit for the query from the shim.

Added totalLimit parameter.  This parameter will be used to cap the maximum
number of records returned by the query from the shim.

Changes to couchdb:

Added changes to range query to find the next start key (if exists) for the
specified range.  The next start key is returned with the query results.

Added changes to rich query to find the next bookmark for the query.  The
bookmark is returned with the query results.

Changes to statedb interface:

Add 2 new methods to allow parameters to be passed to the range and rich
query methods.

GetStateRangeScanIteratorWithMetadata(namespace string, startKey string,
endKey string, metadata map[string]interface{})
(ResultsIterator, error)

ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{})
(ResultsIterator, error)

Changes to statecouchdb (range query):

Add concept of  limit as a query parameter.  If limit is set, then queries
are executed and returned using limit.  A nextStartKey is also
returned which will allow the nextStartKey to passed as a parameter to the next
query.  This implements an "explicit" paging.

If no limit is specified, then the query scanner will use the nextStartKey
internally to page through the results, using the queryLimit to control the
query size to CouchDB. This implements an "implicit" paging.

Changes to statecouchdb (rich query):

Add concept of  limit as a query parameter.  If limit is set, then queries
are executed and returned using limit to control size.  A bookmark is also
returned which will allow the bookmark to passed as a parameter to the next
query.  This implements an "explicit" paging.

If no  limit is specified, then the query scanner will use the bookmark
internally to page through the results, using the queryLimit to control the
query size to CouchDB.  This implements an "implicit" paging.

Change-Id: I2b2e70b3302120291bd2243d90e6c6402b70e399
Signed-off-by: default avatarChris Elder <chris.elder@us.ibm.com>
parent 3139ec24
......@@ -44,6 +44,12 @@ type ResultsIterator interface {
Close()
}
// QueryResultsIterator - an iterator for query result set
type QueryResultsIterator interface {
ResultsIterator
GetBookmarkAndClose() string
}
// QueryResult - a general interface for supporting different types of query results. Actual types differ for different queries
type QueryResult interface{}
......
......@@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/spf13/viper"
)
// TestGetStateMultipleKeys tests read for given multiple keys
......@@ -189,7 +190,6 @@ func TestIterator(t *testing.T, dbProvider statedb.VersionedDBProvider) {
batch.Put("ns3", "key7", []byte("value7"), version.NewHeight(1, 7))
savePoint := version.NewHeight(2, 5)
db.ApplyUpdates(batch, savePoint)
itr1, _ := db.GetStateRangeScanIterator("ns1", "key1", "")
testItr(t, itr1, []string{"key1", "key2", "key3", "key4"})
......@@ -201,6 +201,7 @@ func TestIterator(t *testing.T, dbProvider statedb.VersionedDBProvider) {
itr4, _ := db.GetStateRangeScanIterator("ns2", "", "")
testItr(t, itr4, []string{"key5", "key6"})
}
func testItr(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string) {
......@@ -211,9 +212,8 @@ func testItr(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string) {
key := vkv.Key
testutil.AssertEquals(t, key, expectedKey)
}
last, err := itr.Next()
_, err := itr.Next()
testutil.AssertNoError(t, err, "")
testutil.AssertNil(t, last)
}
// TestQuery tests queries
......@@ -760,3 +760,192 @@ func TestValueAndMetadataWrites(t *testing.T, dbProvider statedb.VersionedDBProv
vv, _ = db.GetState("ns2", "key4")
testutil.AssertEquals(t, vv, &vv4)
}
// TestPaginatedRangeQuery tests range queries with pagination
func TestPaginatedRangeQuery(t *testing.T, dbProvider statedb.VersionedDBProvider) {
db, err := dbProvider.GetDBHandle("testpaginatedrangequery")
testutil.AssertNoError(t, err, "")
db.Open()
defer db.Close()
batch := statedb.NewUpdateBatch()
jsonValue1 := "{\"asset_name\": \"marble1\",\"color\": \"blue\",\"size\": 1,\"owner\": \"tom\"}"
batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))
jsonValue2 := "{\"asset_name\": \"marble2\",\"color\": \"red\",\"size\": 2,\"owner\": \"jerry\"}"
batch.Put("ns1", "key2", []byte(jsonValue2), version.NewHeight(1, 2))
jsonValue3 := "{\"asset_name\": \"marble3\",\"color\": \"red\",\"size\": 3,\"owner\": \"fred\"}"
batch.Put("ns1", "key3", []byte(jsonValue3), version.NewHeight(1, 3))
jsonValue4 := "{\"asset_name\": \"marble4\",\"color\": \"red\",\"size\": 4,\"owner\": \"martha\"}"
batch.Put("ns1", "key4", []byte(jsonValue4), version.NewHeight(1, 4))
jsonValue5 := "{\"asset_name\": \"marble5\",\"color\": \"blue\",\"size\": 5,\"owner\": \"fred\"}"
batch.Put("ns1", "key5", []byte(jsonValue5), version.NewHeight(1, 5))
jsonValue6 := "{\"asset_name\": \"marble6\",\"color\": \"red\",\"size\": 6,\"owner\": \"elaine\"}"
batch.Put("ns1", "key6", []byte(jsonValue6), version.NewHeight(1, 6))
jsonValue7 := "{\"asset_name\": \"marble7\",\"color\": \"blue\",\"size\": 7,\"owner\": \"fred\"}"
batch.Put("ns1", "key7", []byte(jsonValue7), version.NewHeight(1, 7))
jsonValue8 := "{\"asset_name\": \"marble8\",\"color\": \"red\",\"size\": 8,\"owner\": \"elaine\"}"
batch.Put("ns1", "key8", []byte(jsonValue8), version.NewHeight(1, 8))
jsonValue9 := "{\"asset_name\": \"marble9\",\"color\": \"green\",\"size\": 9,\"owner\": \"fred\"}"
batch.Put("ns1", "key9", []byte(jsonValue9), version.NewHeight(1, 9))
jsonValue10 := "{\"asset_name\": \"marble10\",\"color\": \"green\",\"size\": 10,\"owner\": \"mary\"}"
batch.Put("ns1", "key10", []byte(jsonValue10), version.NewHeight(1, 10))
jsonValue11 := "{\"asset_name\": \"marble11\",\"color\": \"cyan\",\"size\": 8,\"owner\": \"joe\"}"
batch.Put("ns1", "key11", []byte(jsonValue11), version.NewHeight(1, 11))
jsonValue12 := "{\"asset_name\": \"marble12\",\"color\": \"red\",\"size\": 4,\"owner\": \"martha\"}"
batch.Put("ns1", "key12", []byte(jsonValue12), version.NewHeight(1, 4))
jsonValue13 := "{\"asset_name\": \"marble13\",\"color\": \"red\",\"size\": 6,\"owner\": \"james\"}"
batch.Put("ns1", "key13", []byte(jsonValue13), version.NewHeight(1, 4))
jsonValue14 := "{\"asset_name\": \"marble14\",\"color\": \"red\",\"size\": 10,\"owner\": \"fred\"}"
batch.Put("ns1", "key14", []byte(jsonValue14), version.NewHeight(1, 4))
jsonValue15 := "{\"asset_name\": \"marble15\",\"color\": \"red\",\"size\": 8,\"owner\": \"mary\"}"
batch.Put("ns1", "key15", []byte(jsonValue15), version.NewHeight(1, 4))
jsonValue16 := "{\"asset_name\": \"marble16\",\"color\": \"red\",\"size\": 4,\"owner\": \"robert\"}"
batch.Put("ns1", "key16", []byte(jsonValue16), version.NewHeight(1, 4))
jsonValue17 := "{\"asset_name\": \"marble17\",\"color\": \"red\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key17", []byte(jsonValue17), version.NewHeight(1, 4))
jsonValue18 := "{\"asset_name\": \"marble18\",\"color\": \"red\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key18", []byte(jsonValue18), version.NewHeight(1, 4))
jsonValue19 := "{\"asset_name\": \"marble19\",\"color\": \"red\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key19", []byte(jsonValue19), version.NewHeight(1, 4))
jsonValue20 := "{\"asset_name\": \"marble20\",\"color\": \"red\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key20", []byte(jsonValue20), version.NewHeight(1, 4))
jsonValue21 := "{\"asset_name\": \"marble21\",\"color\": \"cyan\",\"size\": 1000007,\"owner\": \"joe\"}"
batch.Put("ns1", "key21", []byte(jsonValue21), version.NewHeight(1, 11))
jsonValue22 := "{\"asset_name\": \"marble22\",\"color\": \"red\",\"size\": 4,\"owner\": \"martha\"}"
batch.Put("ns1", "key22", []byte(jsonValue22), version.NewHeight(1, 4))
jsonValue23 := "{\"asset_name\": \"marble23\",\"color\": \"blue\",\"size\": 6,\"owner\": \"james\"}"
batch.Put("ns1", "key23", []byte(jsonValue23), version.NewHeight(1, 4))
jsonValue24 := "{\"asset_name\": \"marble24\",\"color\": \"red\",\"size\": 10,\"owner\": \"fred\"}"
batch.Put("ns1", "key24", []byte(jsonValue24), version.NewHeight(1, 4))
jsonValue25 := "{\"asset_name\": \"marble25\",\"color\": \"red\",\"size\": 8,\"owner\": \"mary\"}"
batch.Put("ns1", "key25", []byte(jsonValue25), version.NewHeight(1, 4))
jsonValue26 := "{\"asset_name\": \"marble26\",\"color\": \"red\",\"size\": 4,\"owner\": \"robert\"}"
batch.Put("ns1", "key26", []byte(jsonValue26), version.NewHeight(1, 4))
jsonValue27 := "{\"asset_name\": \"marble27\",\"color\": \"green\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key27", []byte(jsonValue27), version.NewHeight(1, 4))
jsonValue28 := "{\"asset_name\": \"marble28\",\"color\": \"red\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key28", []byte(jsonValue28), version.NewHeight(1, 4))
jsonValue29 := "{\"asset_name\": \"marble29\",\"color\": \"red\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key29", []byte(jsonValue29), version.NewHeight(1, 4))
jsonValue30 := "{\"asset_name\": \"marble30\",\"color\": \"red\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key30", []byte(jsonValue30), version.NewHeight(1, 4))
jsonValue31 := "{\"asset_name\": \"marble31\",\"color\": \"cyan\",\"size\": 1000007,\"owner\": \"joe\"}"
batch.Put("ns1", "key31", []byte(jsonValue31), version.NewHeight(1, 11))
jsonValue32 := "{\"asset_name\": \"marble32\",\"color\": \"red\",\"size\": 4,\"owner\": \"martha\"}"
batch.Put("ns1", "key32", []byte(jsonValue32), version.NewHeight(1, 4))
jsonValue33 := "{\"asset_name\": \"marble33\",\"color\": \"red\",\"size\": 6,\"owner\": \"james\"}"
batch.Put("ns1", "key33", []byte(jsonValue33), version.NewHeight(1, 4))
jsonValue34 := "{\"asset_name\": \"marble34\",\"color\": \"red\",\"size\": 10,\"owner\": \"fred\"}"
batch.Put("ns1", "key34", []byte(jsonValue34), version.NewHeight(1, 4))
jsonValue35 := "{\"asset_name\": \"marble35\",\"color\": \"red\",\"size\": 8,\"owner\": \"mary\"}"
batch.Put("ns1", "key35", []byte(jsonValue35), version.NewHeight(1, 4))
jsonValue36 := "{\"asset_name\": \"marble36\",\"color\": \"orange\",\"size\": 4,\"owner\": \"robert\"}"
batch.Put("ns1", "key36", []byte(jsonValue36), version.NewHeight(1, 4))
jsonValue37 := "{\"asset_name\": \"marble37\",\"color\": \"red\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key37", []byte(jsonValue37), version.NewHeight(1, 4))
jsonValue38 := "{\"asset_name\": \"marble38\",\"color\": \"yellow\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key38", []byte(jsonValue38), version.NewHeight(1, 4))
jsonValue39 := "{\"asset_name\": \"marble39\",\"color\": \"red\",\"size\": 2,\"owner\": \"alan\"}"
batch.Put("ns1", "key39", []byte(jsonValue39), version.NewHeight(1, 4))
jsonValue40 := "{\"asset_name\": \"marble40\",\"color\": \"red\",\"size\": 10,\"owner\": \"elaine\"}"
batch.Put("ns1", "key40", []byte(jsonValue40), version.NewHeight(1, 4))
savePoint := version.NewHeight(2, 22)
db.ApplyUpdates(batch, savePoint)
//Test range query with no pagination
returnKeys := []string{}
_, err = executeRangeQuery(t, db, "ns1", "key1", "key15", int32(0), returnKeys)
testutil.AssertNoError(t, err, "")
//Test range query with large page size (single page return)
returnKeys = []string{"key1", "key10", "key11", "key12", "key13", "key14"}
_, err = executeRangeQuery(t, db, "ns1", "key1", "key15", int32(10), returnKeys)
testutil.AssertNoError(t, err, "")
//Test explicit pagination
//Test range query with multiple pages
returnKeys = []string{"key1", "key10"}
nextStartKey, err := executeRangeQuery(t, db, "ns1", "key1", "key22", int32(2), returnKeys)
testutil.AssertNoError(t, err, "")
// NextStartKey is now passed in as startKey, verify the pagesize is working
returnKeys = []string{"key11", "key12"}
_, err = executeRangeQuery(t, db, "ns1", nextStartKey, "key22", int32(2), returnKeys)
testutil.AssertNoError(t, err, "")
//Set queryLimit to 2
viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 2)
//Test implicit pagination
//Test range query with no pagesize and a small queryLimit
returnKeys = []string{}
_, err = executeRangeQuery(t, db, "ns1", "key1", "key15", int32(0), returnKeys)
testutil.AssertNoError(t, err, "")
//Test range query with pagesize greater than the queryLimit
returnKeys = []string{"key1", "key10", "key11", "key12"}
_, err = executeRangeQuery(t, db, "ns1", "key1", "key15", int32(4), returnKeys)
testutil.AssertNoError(t, err, "")
//reset queryLimit to 1000
viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 1000)
}
func executeRangeQuery(t *testing.T, db statedb.VersionedDB, namespace, startKey, endKey string, limit int32, returnKeys []string) (string, error) {
var itr statedb.ResultsIterator
var err error
if limit == 0 {
itr, err = db.GetStateRangeScanIterator(namespace, startKey, endKey)
if err != nil {
return "", err
}
} else {
queryOptions := make(map[string]interface{})
if limit != 0 {
queryOptions["limit"] = limit
}
itr, err = db.GetStateRangeScanIteratorWithMetadata(namespace, startKey, endKey, queryOptions)
if err != nil {
return "", err
}
// Verify the keys returned
if limit > 0 {
TestItrWithoutClose(t, itr, returnKeys)
}
}
returnBookmark := ""
if limit > 0 {
if queryResultItr, ok := itr.(statedb.QueryResultsIterator); ok {
returnBookmark = queryResultItr.GetBookmarkAndClose()
}
}
return returnBookmark, nil
}
// TestItrWithoutClose verifies an iterator contains expected keys
func TestItrWithoutClose(t *testing.T, itr statedb.ResultsIterator, expectedKeys []string) {
for _, expectedKey := range expectedKeys {
queryResult, err := itr.Next()
testutil.AssertNoError(t, err, "An unexpected error was thrown during iterator Next()")
vkv := queryResult.(*statedb.VersionedKV)
key := vkv.Key
testutil.AssertEquals(t, key, expectedKey)
}
queryResult, err := itr.Next()
testutil.AssertNoError(t, err, "An unexpected error was thrown during iterator Next()")
testutil.AssertNil(t, queryResult)
}
......@@ -19,9 +19,18 @@ type ResultsIterator struct {
result1 statedb.QueryResult
result2 error
}
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct{}
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct{}
GetBookmarkAndCloseStub func() string
getBookmarkAndCloseMutex sync.RWMutex
getBookmarkAndCloseArgsForCall []struct{}
getBookmarkAndCloseReturns struct {
result1 string
}
getBookmarkAndCloseReturnsOnCall map[int]struct {
result1 string
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
......@@ -85,6 +94,46 @@ func (fake *ResultsIterator) CloseCallCount() int {
return len(fake.closeArgsForCall)
}
func (fake *ResultsIterator) GetBookmarkAndClose() string {
fake.getBookmarkAndCloseMutex.Lock()
ret, specificReturn := fake.getBookmarkAndCloseReturnsOnCall[len(fake.getBookmarkAndCloseArgsForCall)]
fake.getBookmarkAndCloseArgsForCall = append(fake.getBookmarkAndCloseArgsForCall, struct{}{})
fake.recordInvocation("GetBookmarkAndClose", []interface{}{})
fake.getBookmarkAndCloseMutex.Unlock()
if fake.GetBookmarkAndCloseStub != nil {
return fake.GetBookmarkAndCloseStub()
}
if specificReturn {
return ret.result1
}
return fake.getBookmarkAndCloseReturns.result1
}
func (fake *ResultsIterator) GetBookmarkAndCloseCallCount() int {
fake.getBookmarkAndCloseMutex.RLock()
defer fake.getBookmarkAndCloseMutex.RUnlock()
return len(fake.getBookmarkAndCloseArgsForCall)
}
func (fake *ResultsIterator) GetBookmarkAndCloseReturns(result1 string) {
fake.GetBookmarkAndCloseStub = nil
fake.getBookmarkAndCloseReturns = struct {
result1 string
}{result1}
}
func (fake *ResultsIterator) GetBookmarkAndCloseReturnsOnCall(i int, result1 string) {
fake.GetBookmarkAndCloseStub = nil
if fake.getBookmarkAndCloseReturnsOnCall == nil {
fake.getBookmarkAndCloseReturnsOnCall = make(map[int]struct {
result1 string
})
}
fake.getBookmarkAndCloseReturnsOnCall[i] = struct {
result1 string
}{result1}
}
func (fake *ResultsIterator) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
......@@ -92,6 +141,8 @@ func (fake *ResultsIterator) Invocations() map[string][][]interface{} {
defer fake.nextMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
fake.getBookmarkAndCloseMutex.RLock()
defer fake.getBookmarkAndCloseMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
......
......@@ -254,28 +254,98 @@ func (vdb *VersionedDB) GetStateMultipleKeys(namespace string, keys []string) ([
// startKey is inclusive
// endKey is exclusive
func (vdb *VersionedDB) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (statedb.ResultsIterator, error) {
// Get the querylimit from core.yaml
queryLimit := ledgerconfig.GetQueryLimit()
return vdb.GetStateRangeScanIteratorWithMetadata(namespace, startKey, endKey, nil)
}
const optionBookmark = "bookmark"
const optionLimit = "limit"
const returnCount = "count"
// GetStateRangeScanIteratorWithMetadata implements method in VersionedDB interface
// startKey is inclusive
// endKey is exclusive
// metadata contains a map of additional query options
func (vdb *VersionedDB) GetStateRangeScanIteratorWithMetadata(namespace string, startKey string, endKey string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) {
logger.Debugf("Entering GetStateRangeScanIteratorWithMetadata namespace: %s startKey: %s endKey: %s metadata: %v", namespace, startKey, endKey, metadata)
// Get the internalQueryLimit from core.yaml
internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit())
requestedLimit := int32(0)
// if metadata is provided, validate and apply options
if metadata != nil {
//validate the metadata
err := statedb.ValidateRangeMetadata(metadata)
if err != nil {
return nil, err
}
if limitOption, ok := metadata[optionLimit]; ok {
requestedLimit = limitOption.(int32)
}
}
db, err := vdb.getNamespaceDBHandle(namespace)
if err != nil {
return nil, err
}
queryResult, err := db.ReadDocRange(startKey, endKey, queryLimit, querySkip)
return newQueryScanner(namespace, db, "", internalQueryLimit, requestedLimit, "", startKey, endKey)
}
func (scanner *queryScanner) getNextStateRangeScanResults() error {
queryLimit := scanner.queryDefinition.internalQueryLimit
if scanner.paginationInfo.requestedLimit > 0 {
moreResultsNeeded := scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned
if moreResultsNeeded < scanner.queryDefinition.internalQueryLimit {
queryLimit = moreResultsNeeded
}
}
queryResult, nextStartKey, err := scanner.db.ReadDocRange(scanner.queryDefinition.startKey, scanner.queryDefinition.endKey,
queryLimit)
if err != nil {
logger.Errorf("Error calling ReadDocRange(): %s", err.Error())
return nil, err
logger.Debugf("Error calling ReadDocRange(): %s\n", err.Error())
return err
}
logger.Debugf("Exiting GetStateRangeScanIterator")
return newQueryScanner(namespace, *queryResult), nil
scanner.resultsInfo.results = queryResult
scanner.queryDefinition.startKey = nextStartKey
scanner.paginationInfo.cursor = 0
return nil
}
// ExecuteQuery implements method in VersionedDB interface
func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIterator, error) {
queryResult, err := vdb.ExecuteQueryWithMetadata(namespace, query, nil)
if err != nil {
return nil, err
}
return queryResult, nil
}
// ExecuteQueryWithMetadata implements method in VersionedDB interface
func (vdb *VersionedDB) ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (statedb.QueryResultsIterator, error) {
logger.Debugf("Entering ExecuteQueryWithMetadata namespace: %s, query: %s, metadata: %v", namespace, query, metadata)
// Get the querylimit from core.yaml
queryLimit := ledgerconfig.GetQueryLimit()
// Explicit paging not yet supported.
// Use queryLimit from config and 0 skip.
queryString, err := applyAdditionalQueryOptions(query, queryLimit, 0)
internalQueryLimit := int32(ledgerconfig.GetInternalQueryLimit())
bookmark := ""
requestedLimit := int32(0)
// if metadata is provided, then validate and set provided options
if metadata != nil {
err := validateQueryMetadata(metadata)
if err != nil {
return nil, err
}
if limitOption, ok := metadata[optionLimit]; ok {
requestedLimit = limitOption.(int32)
}
if bookmarkOption, ok := metadata[optionBookmark]; ok {
bookmark = bookmarkOption.(string)
}
}
queryString, err := applyAdditionalQueryOptions(query, internalQueryLimit, bookmark)
if err != nil {
logger.Errorf("Error calling applyAdditionalQueryOptions(): %s", err.Error())
return nil, err
......@@ -284,13 +354,63 @@ func (vdb *VersionedDB) ExecuteQuery(namespace, query string) (statedb.ResultsIt
if err != nil {
return nil, err
}
queryResult, err := db.QueryDocuments(queryString)
return newQueryScanner(namespace, db, queryString, internalQueryLimit, requestedLimit, bookmark, "", "")
}
// executeQueryWithBookmark executes a "paging" query with a bookmark, this method allows a
// paged query without returning a new query iterator
func (scanner *queryScanner) executeQueryWithBookmark() error {
queryLimit := scanner.queryDefinition.internalQueryLimit
if scanner.paginationInfo.requestedLimit > 0 {
if scanner.paginationInfo.requestedLimit-scanner.resultsInfo.totalRecordsReturned < scanner.queryDefinition.internalQueryLimit {
queryLimit = scanner.paginationInfo.requestedLimit - scanner.resultsInfo.totalRecordsReturned
}
}
queryString, err := applyAdditionalQueryOptions(scanner.queryDefinition.query,
queryLimit, scanner.paginationInfo.bookmark)
if err != nil {
logger.Errorf("Error calling QueryDocuments(): %s", err.Error())
return nil, err
logger.Debugf("Error calling applyAdditionalQueryOptions(): %s\n", err.Error())
return err
}
queryResult, bookmark, err := scanner.db.QueryDocuments(queryString)
if err != nil {
logger.Debugf("Error calling QueryDocuments(): %s\n", err.Error())
return err
}
scanner.resultsInfo.results = queryResult
scanner.paginationInfo.bookmark = bookmark
scanner.paginationInfo.cursor = 0
return nil
}
func validateQueryMetadata(metadata map[string]interface{}) error {
for key, keyVal := range metadata {
switch key {
case optionBookmark:
//Verify the bookmark is a string
if _, ok := keyVal.(string); ok {
continue
}
return fmt.Errorf("Invalid entry, \"bookmark\" must be a string")
case optionLimit:
//Verify the limit is an integer
if _, ok := keyVal.(int32); ok {
continue
}
return fmt.Errorf("Invalid entry, \"limit\" must be an int32")
default:
return fmt.Errorf("Invalid entry, option %s not recognized", key)
}
}
logger.Debugf("Exiting ExecuteQuery")
return newQueryScanner(namespace, *queryResult), nil
return nil
}
// ApplyUpdates implements method in VersionedDB interface
......@@ -396,10 +516,10 @@ func (vdb *VersionedDB) GetLatestSavePoint() (*version.Height, error) {
}
// applyAdditionalQueryOptions will add additional fields to the query required for query processing
func applyAdditionalQueryOptions(queryString string, queryLimit, querySkip int) (string, error) {
func applyAdditionalQueryOptions(queryString string, queryLimit int32, queryBookmark string) (string, error) {
const jsonQueryFields = "fields"
const jsonQueryLimit = "limit"
const jsonQuerySkip = "skip"
const jsonQueryBookmark = "bookmark"
//create a generic map for the query json
jsonQueryMap := make(map[string]interface{})
//unmarshal the selector json into the generic map
......@@ -423,10 +543,10 @@ func applyAdditionalQueryOptions(queryString string, queryLimit, querySkip int)
// This will override any limit passed in the query.
// Explicit paging not yet supported.
jsonQueryMap[jsonQueryLimit] = queryLimit
// Add skip of 0.
// This will override any skip passed in the query.
// Explicit paging not yet supported.
jsonQueryMap[jsonQuerySkip] = querySkip
// Add the bookmark if provided
if queryBookmark != "" {
jsonQueryMap[jsonQueryBookmark] = queryBookmark
}
//Marshal the updated json query
editedQuery, err := json.Marshal(jsonQueryMap)
if err != nil {
......@@ -437,21 +557,89 @@ func applyAdditionalQueryOptions(queryString string, queryLimit, querySkip int)
}
type queryScanner struct {
cursor int
namespace string
results []couchdb.QueryResult
namespace string
db *couchdb.CouchDatabase
queryDefinition *queryDefinition
paginationInfo *paginationInfo
resultsInfo *resultsInfo
}
type queryDefinition struct {
startKey string
endKey string
query string
internalQueryLimit int32
}
type paginationInfo struct {
cursor int32
requestedLimit int32
bookmark string
}
type resultsInfo s