/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package statecouchdb

import (
	"fmt"
	"os"
	"strings"
	"testing"
	"time"

	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/common/ledger/testutil"
	"github.com/hyperledger/fabric/core/common/ccprovider"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/commontests"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
	ledgertestutil "github.com/hyperledger/fabric/core/ledger/testutil"
	"github.com/hyperledger/fabric/integration/runner"
	"github.com/spf13/viper"
	"github.com/stretchr/testify/assert"
)

func TestMain(m *testing.M) {
	os.Exit(testMain(m))
}

func testMain(m *testing.M) int {
	// Read the core.yaml file for default config.
	ledgertestutil.SetupCoreYAMLConfig()
	viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgertests/kvledger/txmgmt/statedb/statecouchdb")

	// Switch to CouchDB
	couchAddress, cleanup := couchDBSetup()
	defer cleanup()
	viper.Set("ledger.state.stateDatabase", "CouchDB")
	defer viper.Set("ledger.state.stateDatabase", "goleveldb")

	viper.Set("ledger.state.couchDBConfig.couchDBAddress", couchAddress)
	// Replace with correct username/password such as
	// admin/admin if user security is enabled on couchdb.
	viper.Set("ledger.state.couchDBConfig.username", "")
	viper.Set("ledger.state.couchDBConfig.password", "")
	viper.Set("ledger.state.couchDBConfig.maxRetries", 3)
	viper.Set("ledger.state.couchDBConfig.maxRetriesOnStartup", 20)
	viper.Set("ledger.state.couchDBConfig.requestTimeout", time.Second*35)

	// Disable auto warm to avoid error logs when the couchdb database has been dropped
	viper.Set("ledger.state.couchDBConfig.autoWarmIndexes", false)

	flogging.ActivateSpec("statecouchdb,couchdb=debug")

	// run the actual tests
	return m.Run()
}

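// couchDBSetup returns the address of the CouchDB instance to test against.
// If the COUCHDB_ADDR environment variable is set, that external instance is
// used; otherwise a CouchDB container is started via the integration runner
// and the returned cleanup function stops it.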
func couchDBSetup() (addr string, cleanup func()) {
	externalCouch, set := os.LookupEnv("COUCHDB_ADDR")
	if set {
		return externalCouch, func() {}
	}

	couchDB := &runner.CouchDB{}
	if err := couchDB.Start(); err != nil {
		err := fmt.Errorf("failed to start couchDB: %s", err)
		panic(err)
	}
	return couchDB.Address(), func() { couchDB.Stop() }
}

func TestBasicRW(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestBasicRW(t, env.DBProvider)

}

func TestMultiDBBasicRW(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestMultiDBBasicRW(t, env.DBProvider)

}

func TestDeletes(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestDeletes(t, env.DBProvider)
}

func TestIterator(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestIterator(t, env.DBProvider)
}

// The following query tests are unique to couchdb; they are not run against leveldb.
func TestQuery(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestQuery(t, env.DBProvider)
}

func TestGetStateMultipleKeys(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestGetStateMultipleKeys(t, env.DBProvider)
}

func TestGetVersion(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestGetVersion(t, env.DBProvider)
}

func TestSmallBatchSize(t *testing.T) {
	viper.Set("ledger.state.couchDBConfig.maxBatchUpdateSize", 2)
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	defer viper.Set("ledger.state.couchDBConfig.maxBatchUpdateSize", 1000)
	commontests.TestSmallBatchSize(t, env.DBProvider)
}

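// TestBatchRetry runs the shared batch update test with individual retries
// (commontests.TestBatchWithIndividualRetry) against the CouchDB state database.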
func TestBatchRetry(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestBatchWithIndividualRetry(t, env.DBProvider)
}

func TestValueAndMetadataWrites(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestValueAndMetadataWrites(t, env.DBProvider)
}

func TestPaginatedRangeQuery(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestPaginatedRangeQuery(t, env.DBProvider)
}

// TestUtilityFunctions tests utility functions
func TestUtilityFunctions(t *testing.T) {

	env := NewTestVDBEnv(t)
	defer env.Cleanup()

	db, err := env.DBProvider.GetDBHandle("testutilityfunctions")
	assert.NoError(t, err)

	// BytesKeySupported should be false for CouchDB
	byteKeySupported := db.BytesKeySupported()
	assert.False(t, byteKeySupported)

	// ValidateKeyValue should return nil for a valid key and value
	err = db.ValidateKeyValue("testKey", []byte("Some random bytes"))
	assert.Nil(t, err)

	// ValidateKeyValue should return an error for a key that is not a valid utf-8 string
	err = db.ValidateKeyValue(string([]byte{0xff, 0xfe, 0xfd}), []byte("Some random bytes"))
	assert.Error(t, err, "ValidateKey should have thrown an error for an invalid utf-8 string")

	// ValidateKeyValue should return an error for a key that is an empty string
	assert.EqualError(t, db.ValidateKeyValue("", []byte("validValue")),
		"invalid key. Empty string is not supported as a key by couchdb")

	reservedFields := []string{"~version", "_id", "_test"}

	// ValidateKeyValue should return an error for a json value that contains one of the reserved fields
	// at the top level
	for _, reservedField := range reservedFields {
		testVal := fmt.Sprintf(`{"%s":"dummyVal"}`, reservedField)
		err = db.ValidateKeyValue("testKey", []byte(testVal))
		assert.Error(t, err, fmt.Sprintf(
			"ValidateKey should have thrown an error for the json value %s, as it contains one of the reserved fields at the top level", testVal))
	}

	// ValidateKeyValue should not return an error for a json value that contains one of the reserved fields
	// if not at the top level
	for _, reservedField := range reservedFields {
		testVal := fmt.Sprintf(`{"data.%s":"dummyVal"}`, reservedField)
		err = db.ValidateKeyValue("testKey", []byte(testVal))
		assert.NoError(t, err, fmt.Sprintf(
			"ValidateKey should not have thrown an error for the json value %s since the reserved field was not at the top level", testVal))
	}

	// ValidateKeyValue should return an error for a key that begins with an underscore
	err = db.ValidateKeyValue("_testKey", []byte("testValue"))
	assert.Error(t, err, "ValidateKey should have thrown an error for a key that begins with an underscore")
}

// TestInvalidJSONFields tests for invalid JSON fields
func TestInvalidJSONFields(t *testing.T) {

	env := NewTestVDBEnv(t)
	defer env.Cleanup()

	db, err := env.DBProvider.GetDBHandle("testinvalidfields")
	assert.NoError(t, err)

	db.Open()
	defer db.Close()

	batch := statedb.NewUpdateBatch()
	jsonValue1 := `{"_id":"key1","asset_name":"marble1","color":"blue","size":1,"owner":"tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))

	savePoint := version.NewHeight(1, 2)
	err = db.ApplyUpdates(batch, savePoint)
	assert.Error(t, err, "Invalid field _id should have thrown an error")

	batch = statedb.NewUpdateBatch()
	jsonValue1 = `{"_rev":"rev1","asset_name":"marble1","color":"blue","size":1,"owner":"tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))

	savePoint = version.NewHeight(1, 2)
	err = db.ApplyUpdates(batch, savePoint)
	assert.Error(t, err, "Invalid field _rev should have thrown an error")

	batch = statedb.NewUpdateBatch()
	jsonValue1 = `{"_deleted":"true","asset_name":"marble1","color":"blue","size":1,"owner":"tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))

	savePoint = version.NewHeight(1, 2)
	err = db.ApplyUpdates(batch, savePoint)
	assert.Error(t, err, "Invalid field _deleted should have thrown an error")

	batch = statedb.NewUpdateBatch()
	jsonValue1 = `{"~version":"v1","asset_name":"marble1","color":"blue","size":1,"owner":"tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))

	savePoint = version.NewHeight(1, 2)
	err = db.ApplyUpdates(batch, savePoint)
	assert.Error(t, err, "Invalid field ~version should have thrown an error")
}

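// TestDebugFunctions tests debug utility functions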
func TestDebugFunctions(t *testing.T) {

	//Test printCompositeKeys
	// initialize a key list
	loadKeys := []*statedb.CompositeKey{}
	//create a composite key and add to the key list
	compositeKey3 := statedb.CompositeKey{Namespace: "ns", Key: "key3"}
	loadKeys = append(loadKeys, &compositeKey3)
	compositeKey4 := statedb.CompositeKey{Namespace: "ns", Key: "key4"}
	loadKeys = append(loadKeys, &compositeKey4)
	assert.Equal(t, "[ns,key3],[ns,key4]", printCompositeKeys(loadKeys))
}

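// TestHandleChaincodeDeploy tests creation of couchdb indexes from the index
// definitions packaged with a chaincode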
func TestHandleChaincodeDeploy(t *testing.T) {

	env := NewTestVDBEnv(t)
	defer env.Cleanup()

	db, err := env.DBProvider.GetDBHandle("testinit")
	assert.NoError(t, err)
	db.Open()
	defer db.Close()
	batch := statedb.NewUpdateBatch()

	jsonValue1 := `{"asset_name": "marble1","color": "blue","size": 1,"owner": "tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))
	jsonValue2 := `{"asset_name": "marble2","color": "blue","size": 2,"owner": "jerry"}`
	batch.Put("ns1", "key2", []byte(jsonValue2), version.NewHeight(1, 2))
	jsonValue3 := `{"asset_name": "marble3","color": "blue","size": 3,"owner": "fred"}`
	batch.Put("ns1", "key3", []byte(jsonValue3), version.NewHeight(1, 3))
	jsonValue4 := `{"asset_name": "marble4","color": "blue","size": 4,"owner": "martha"}`
	batch.Put("ns1", "key4", []byte(jsonValue4), version.NewHeight(1, 4))
	jsonValue5 := `{"asset_name": "marble5","color": "blue","size": 5,"owner": "fred"}`
	batch.Put("ns1", "key5", []byte(jsonValue5), version.NewHeight(1, 5))
	jsonValue6 := `{"asset_name": "marble6","color": "blue","size": 6,"owner": "elaine"}`
	batch.Put("ns1", "key6", []byte(jsonValue6), version.NewHeight(1, 6))
	jsonValue7 := `{"asset_name": "marble7","color": "blue","size": 7,"owner": "fred"}`
	batch.Put("ns1", "key7", []byte(jsonValue7), version.NewHeight(1, 7))
	jsonValue8 := `{"asset_name": "marble8","color": "blue","size": 8,"owner": "elaine"}`
	batch.Put("ns1", "key8", []byte(jsonValue8), version.NewHeight(1, 8))
	jsonValue9 := `{"asset_name": "marble9","color": "green","size": 9,"owner": "fred"}`
	batch.Put("ns1", "key9", []byte(jsonValue9), version.NewHeight(1, 9))
	jsonValue10 := `{"asset_name": "marble10","color": "green","size": 10,"owner": "mary"}`
	batch.Put("ns1", "key10", []byte(jsonValue10), version.NewHeight(1, 10))
	jsonValue11 := `{"asset_name": "marble11","color": "cyan","size": 1000007,"owner": "joe"}`
	batch.Put("ns1", "key11", []byte(jsonValue11), version.NewHeight(1, 11))

	//add keys for a separate namespace
	batch.Put("ns2", "key1", []byte(jsonValue1), version.NewHeight(1, 12))
	batch.Put("ns2", "key2", []byte(jsonValue2), version.NewHeight(1, 13))
	batch.Put("ns2", "key3", []byte(jsonValue3), version.NewHeight(1, 14))
	batch.Put("ns2", "key4", []byte(jsonValue4), version.NewHeight(1, 15))
	batch.Put("ns2", "key5", []byte(jsonValue5), version.NewHeight(1, 16))
	batch.Put("ns2", "key6", []byte(jsonValue6), version.NewHeight(1, 17))
	batch.Put("ns2", "key7", []byte(jsonValue7), version.NewHeight(1, 18))
	batch.Put("ns2", "key8", []byte(jsonValue8), version.NewHeight(1, 19))
	batch.Put("ns2", "key9", []byte(jsonValue9), version.NewHeight(1, 20))
	batch.Put("ns2", "key10", []byte(jsonValue10), version.NewHeight(1, 21))

	savePoint := version.NewHeight(2, 22)
	db.ApplyUpdates(batch, savePoint)

	//Create a tar file for test with 4 index definitions and 2 side dbs
	dbArtifactsTarBytes := testutil.CreateTarBytesForTest(
		[]*testutil.TarFileEntry{
			{Name: "META-INF/statedb/couchdb/indexes/indexColorSortName.json", Body: `{"index":{"fields":[{"color":"desc"}]},"ddoc":"indexColorSortName","name":"indexColorSortName","type":"json"}`},
			{Name: "META-INF/statedb/couchdb/indexes/indexSizeSortName.json", Body: `{"index":{"fields":[{"size":"desc"}]},"ddoc":"indexSizeSortName","name":"indexSizeSortName","type":"json"}`},
			{Name: "META-INF/statedb/couchdb/collections/collectionMarbles/indexes/indexCollMarbles.json", Body: `{"index":{"fields":["docType","owner"]},"ddoc":"indexCollectionMarbles", "name":"indexCollectionMarbles","type":"json"}`},
			{Name: "META-INF/statedb/couchdb/collections/collectionMarblesPrivateDetails/indexes/indexCollPrivDetails.json", Body: `{"index":{"fields":["docType","price"]},"ddoc":"indexPrivateDetails", "name":"indexPrivateDetails","type":"json"}`},
		},
	)

	//Create a query
	queryString := `{"selector":{"owner":"fred"}}`

	_, err = db.ExecuteQuery("ns1", queryString)
	assert.NoError(t, err)

	//Create a query with a sort
	queryString = `{"selector":{"owner":"fred"}, "sort": [{"size": "desc"}]}`

	_, err = db.ExecuteQuery("ns1", queryString)
	assert.Error(t, err, "Error should have been thrown for a missing index")

	indexCapable, ok := db.(statedb.IndexCapable)

	if !ok {
		t.Fatalf("Couchdb state impl is expected to implement interface `statedb.IndexCapable`")
	}

	fileEntries, errExtract := ccprovider.ExtractFileEntries(dbArtifactsTarBytes, "couchdb")
	assert.NoError(t, errExtract)

	indexCapable.ProcessIndexesForChaincodeDeploy("ns1", fileEntries["META-INF/statedb/couchdb/indexes"])
	//Sleep to allow time for index creation
	time.Sleep(100 * time.Millisecond)
	//Create a query with a sort
	queryString = `{"selector":{"owner":"fred"}, "sort": [{"size": "desc"}]}`

	//Query should complete without error
	_, err = db.ExecuteQuery("ns1", queryString)
	assert.NoError(t, err)

	//Query namespace "ns2", index is only created in "ns1".  This should return an error.
	_, err = db.ExecuteQuery("ns2", queryString)
	assert.Error(t, err, "Error should have been thrown for a missing index")
}

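// TestTryCastingToJSON tests detection of JSON versus non-JSON values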
func TestTryCastingToJSON(t *testing.T) {
	sampleJSON := []byte(`{"a":"A", "b":"B"}`)
	isJSON, jsonVal := tryCastingToJSON(sampleJSON)
	assert.True(t, isJSON)
	assert.Equal(t, "A", jsonVal["a"])
	assert.Equal(t, "B", jsonVal["b"])

	sampleNonJSON := []byte(`This is not a json`)
	isJSON, jsonVal = tryCastingToJSON(sampleNonJSON)
	assert.False(t, isJSON)
}

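// TestHandleChaincodeDeployErroneousIndexFile tests that an index definition
// with bad JSON syntax does not prevent the valid index definitions in the
// same chaincode package from being created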
func TestHandleChaincodeDeployErroneousIndexFile(t *testing.T) {
	channelName := "ch1"
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	db, err := env.DBProvider.GetDBHandle(channelName)
	assert.NoError(t, err)
	db.Open()
	defer db.Close()

	batch := statedb.NewUpdateBatch()
	batch.Put("ns1", "key1", []byte(`{"asset_name": "marble1","color": "blue","size": 1,"owner": "tom"}`), version.NewHeight(1, 1))
	batch.Put("ns1", "key2", []byte(`{"asset_name": "marble2","color": "blue","size": 2,"owner": "jerry"}`), version.NewHeight(1, 2))

	// Create a tar file for test with 2 index definitions - one of them being erroneous
	badSyntaxFileContent := `{"index":{"fields": This is a bad json}`
	dbArtifactsTarBytes := testutil.CreateTarBytesForTest(
		[]*testutil.TarFileEntry{
			{Name: "META-INF/statedb/couchdb/indexes/indexSizeSortName.json", Body: `{"index":{"fields":[{"size":"desc"}]},"ddoc":"indexSizeSortName","name":"indexSizeSortName","type":"json"}`},
			{Name: "META-INF/statedb/couchdb/indexes/badSyntax.json", Body: badSyntaxFileContent},
		},
	)

	indexCapable, ok := db.(statedb.IndexCapable)
	if !ok {
		t.Fatalf("Couchdb state impl is expected to implement interface `statedb.IndexCapable`")
	}

	fileEntries, errExtract := ccprovider.ExtractFileEntries(dbArtifactsTarBytes, "couchdb")
	assert.NoError(t, errExtract)

	indexCapable.ProcessIndexesForChaincodeDeploy("ns1", fileEntries["META-INF/statedb/couchdb/indexes"])

	//Sleep to allow time for index creation
	time.Sleep(100 * time.Millisecond)
	//Query should complete without error
	_, err = db.ExecuteQuery("ns1", `{"selector":{"owner":"fred"}, "sort": [{"size": "desc"}]}`)
	assert.NoError(t, err)
}

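// TestIsBulkOptimizable tests that the couchdb VersionedDB implements the
// statedb.BulkOptimizable interface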
func TestIsBulkOptimizable(t *testing.T) {
	var db statedb.VersionedDB = &VersionedDB{}
	_, ok := db.(statedb.BulkOptimizable)
	if !ok {
		t.Fatal("state couch db is expected to implement interface statedb.BulkOptimizable")
	}
}

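// printCompositeKeys formats a list of composite keys as "[namespace,key]"
// pairs joined by commas, for use in debug output and test assertions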
func printCompositeKeys(keys []*statedb.CompositeKey) string {

	compositeKeyString := []string{}
	for _, key := range keys {
		compositeKeyString = append(compositeKeyString, "["+key.Namespace+","+key.Key+"]")
	}
	return strings.Join(compositeKeyString, ",")
}

// TestPaginatedQuery tests queries with pagination
func TestPaginatedQuery(t *testing.T) {

	env := NewTestVDBEnv(t)
	defer env.Cleanup()

	db, err := env.DBProvider.GetDBHandle("testpaginatedquery")
	assert.NoError(t, err)
	db.Open()
	defer db.Close()

	batch := statedb.NewUpdateBatch()
	jsonValue1 := `{"asset_name": "marble1","color": "blue","size": 1,"owner": "tom"}`
	batch.Put("ns1", "key1", []byte(jsonValue1), version.NewHeight(1, 1))
	jsonValue2 := `{"asset_name": "marble2","color": "red","size": 2,"owner": "jerry"}`
	batch.Put("ns1", "key2", []byte(jsonValue2), version.NewHeight(1, 2))
	jsonValue3 := `{"asset_name": "marble3","color": "red","size": 3,"owner": "fred"}`
	batch.Put("ns1", "key3", []byte(jsonValue3), version.NewHeight(1, 3))
	jsonValue4 := `{"asset_name": "marble4","color": "red","size": 4,"owner": "martha"}`
	batch.Put("ns1", "key4", []byte(jsonValue4), version.NewHeight(1, 4))
	jsonValue5 := `{"asset_name": "marble5","color": "blue","size": 5,"owner": "fred"}`
	batch.Put("ns1", "key5", []byte(jsonValue5), version.NewHeight(1, 5))
	jsonValue6 := `{"asset_name": "marble6","color": "red","size": 6,"owner": "elaine"}`
	batch.Put("ns1", "key6", []byte(jsonValue6), version.NewHeight(1, 6))
	jsonValue7 := `{"asset_name": "marble7","color": "blue","size": 7,"owner": "fred"}`
	batch.Put("ns1", "key7", []byte(jsonValue7), version.NewHeight(1, 7))
	jsonValue8 := `{"asset_name": "marble8","color": "red","size": 8,"owner": "elaine"}`
	batch.Put("ns1", "key8", []byte(jsonValue8), version.NewHeight(1, 8))
	jsonValue9 := `{"asset_name": "marble9","color": "green","size": 9,"owner": "fred"}`
	batch.Put("ns1", "key9", []byte(jsonValue9), version.NewHeight(1, 9))
	jsonValue10 := `{"asset_name": "marble10","color": "green","size": 10,"owner": "mary"}`
	batch.Put("ns1", "key10", []byte(jsonValue10), version.NewHeight(1, 10))

	jsonValue11 := `{"asset_name": "marble11","color": "cyan","size": 11,"owner": "joe"}`
	batch.Put("ns1", "key11", []byte(jsonValue11), version.NewHeight(1, 11))
	jsonValue12 := `{"asset_name": "marble12","color": "red","size": 12,"owner": "martha"}`
	batch.Put("ns1", "key12", []byte(jsonValue12), version.NewHeight(1, 4))
	jsonValue13 := `{"asset_name": "marble13","color": "red","size": 13,"owner": "james"}`
	batch.Put("ns1", "key13", []byte(jsonValue13), version.NewHeight(1, 4))
	jsonValue14 := `{"asset_name": "marble14","color": "red","size": 14,"owner": "fred"}`
	batch.Put("ns1", "key14", []byte(jsonValue14), version.NewHeight(1, 4))
	jsonValue15 := `{"asset_name": "marble15","color": "red","size": 15,"owner": "mary"}`
	batch.Put("ns1", "key15", []byte(jsonValue15), version.NewHeight(1, 4))
	jsonValue16 := `{"asset_name": "marble16","color": "red","size": 16,"owner": "robert"}`
	batch.Put("ns1", "key16", []byte(jsonValue16), version.NewHeight(1, 4))
	jsonValue17 := `{"asset_name": "marble17","color": "red","size": 17,"owner": "alan"}`
	batch.Put("ns1", "key17", []byte(jsonValue17), version.NewHeight(1, 4))
	jsonValue18 := `{"asset_name": "marble18","color": "red","size": 18,"owner": "elaine"}`
	batch.Put("ns1", "key18", []byte(jsonValue18), version.NewHeight(1, 4))
	jsonValue19 := `{"asset_name": "marble19","color": "red","size": 19,"owner": "alan"}`
	batch.Put("ns1", "key19", []byte(jsonValue19), version.NewHeight(1, 4))
	jsonValue20 := `{"asset_name": "marble20","color": "red","size": 20,"owner": "elaine"}`
	batch.Put("ns1", "key20", []byte(jsonValue20), version.NewHeight(1, 4))

	jsonValue21 := `{"asset_name": "marble21","color": "cyan","size": 21,"owner": "joe"}`
	batch.Put("ns1", "key21", []byte(jsonValue21), version.NewHeight(1, 11))
	jsonValue22 := `{"asset_name": "marble22","color": "red","size": 22,"owner": "martha"}`
	batch.Put("ns1", "key22", []byte(jsonValue22), version.NewHeight(1, 4))
	jsonValue23 := `{"asset_name": "marble23","color": "blue","size": 23,"owner": "james"}`
	batch.Put("ns1", "key23", []byte(jsonValue23), version.NewHeight(1, 4))
	jsonValue24 := `{"asset_name": "marble24","color": "red","size": 24,"owner": "fred"}`
	batch.Put("ns1", "key24", []byte(jsonValue24), version.NewHeight(1, 4))
	jsonValue25 := `{"asset_name": "marble25","color": "red","size": 25,"owner": "mary"}`
	batch.Put("ns1", "key25", []byte(jsonValue25), version.NewHeight(1, 4))
	jsonValue26 := `{"asset_name": "marble26","color": "red","size": 26,"owner": "robert"}`
	batch.Put("ns1", "key26", []byte(jsonValue26), version.NewHeight(1, 4))
	jsonValue27 := `{"asset_name": "marble27","color": "green","size": 27,"owner": "alan"}`
	batch.Put("ns1", "key27", []byte(jsonValue27), version.NewHeight(1, 4))
	jsonValue28 := `{"asset_name": "marble28","color": "red","size": 28,"owner": "elaine"}`
	batch.Put("ns1", "key28", []byte(jsonValue28), version.NewHeight(1, 4))
	jsonValue29 := `{"asset_name": "marble29","color": "red","size": 29,"owner": "alan"}`
	batch.Put("ns1", "key29", []byte(jsonValue29), version.NewHeight(1, 4))
	jsonValue30 := `{"asset_name": "marble30","color": "red","size": 30,"owner": "elaine"}`
	batch.Put("ns1", "key30", []byte(jsonValue30), version.NewHeight(1, 4))

	jsonValue31 := `{"asset_name": "marble31","color": "cyan","size": 31,"owner": "joe"}`
	batch.Put("ns1", "key31", []byte(jsonValue31), version.NewHeight(1, 11))
	jsonValue32 := `{"asset_name": "marble32","color": "red","size": 32,"owner": "martha"}`
	batch.Put("ns1", "key32", []byte(jsonValue32), version.NewHeight(1, 4))
	jsonValue33 := `{"asset_name": "marble33","color": "red","size": 33,"owner": "james"}`
	batch.Put("ns1", "key33", []byte(jsonValue33), version.NewHeight(1, 4))
	jsonValue34 := `{"asset_name": "marble34","color": "red","size": 34,"owner": "fred"}`
	batch.Put("ns1", "key34", []byte(jsonValue34), version.NewHeight(1, 4))
	jsonValue35 := `{"asset_name": "marble35","color": "red","size": 35,"owner": "mary"}`
	batch.Put("ns1", "key35", []byte(jsonValue35), version.NewHeight(1, 4))
	jsonValue36 := `{"asset_name": "marble36","color": "orange","size": 36,"owner": "robert"}`
	batch.Put("ns1", "key36", []byte(jsonValue36), version.NewHeight(1, 4))
	jsonValue37 := `{"asset_name": "marble37","color": "red","size": 37,"owner": "alan"}`
	batch.Put("ns1", "key37", []byte(jsonValue37), version.NewHeight(1, 4))
	jsonValue38 := `{"asset_name": "marble38","color": "yellow","size": 38,"owner": "elaine"}`
	batch.Put("ns1", "key38", []byte(jsonValue38), version.NewHeight(1, 4))
	jsonValue39 := `{"asset_name": "marble39","color": "red","size": 39,"owner": "alan"}`
	batch.Put("ns1", "key39", []byte(jsonValue39), version.NewHeight(1, 4))
	jsonValue40 := `{"asset_name": "marble40","color": "red","size": 40,"owner": "elaine"}`
	batch.Put("ns1", "key40", []byte(jsonValue40), version.NewHeight(1, 4))

	savePoint := version.NewHeight(2, 22)
	db.ApplyUpdates(batch, savePoint)

	// Create a tar file for test with an index for size
	dbArtifactsTarBytes := testutil.CreateTarBytesForTest(
		[]*testutil.TarFileEntry{
			{Name: "META-INF/statedb/couchdb/indexes/indexSizeSortName.json", Body: `{"index":{"fields":[{"size":"desc"}]},"ddoc":"indexSizeSortName","name":"indexSizeSortName","type":"json"}`},
		},
	)

	// Create a query
	queryString := `{"selector":{"color":"red"}}`

	_, err = db.ExecuteQuery("ns1", queryString)
	assert.NoError(t, err)

	// Create a query with a sort
	queryString = `{"selector":{"color":"red"}, "sort": [{"size": "asc"}]}`

	indexCapable, ok := db.(statedb.IndexCapable)

	if !ok {
		t.Fatalf("Couchdb state impl is expected to implement interface `statedb.IndexCapable`")
	}

	fileEntries, errExtract := ccprovider.ExtractFileEntries(dbArtifactsTarBytes, "couchdb")
	assert.NoError(t, errExtract)

	indexCapable.ProcessIndexesForChaincodeDeploy("ns1", fileEntries["META-INF/statedb/couchdb/indexes"])
	// Sleep to allow time for index creation
	time.Sleep(100 * time.Millisecond)
	// Create a query with a sort
	queryString = `{"selector":{"color":"red"}, "sort": [{"size": "asc"}]}`

	// Query should complete without error
	_, err = db.ExecuteQuery("ns1", queryString)
	assert.NoError(t, err)

	// Test explicit paging
	// Execute 3 page queries, there are 28 records with color red, use page size 10
	returnKeys := []string{"key2", "key3", "key4", "key6", "key8", "key12", "key13", "key14", "key15", "key16"}
	bookmark, err := executeQuery(t, db, "ns1", queryString, "", int32(10), returnKeys)
	assert.NoError(t, err)
	returnKeys = []string{"key17", "key18", "key19", "key20", "key22", "key24", "key25", "key26", "key28", "key29"}
	bookmark, err = executeQuery(t, db, "ns1", queryString, bookmark, int32(10), returnKeys)
	assert.NoError(t, err)

	returnKeys = []string{"key30", "key32", "key33", "key34", "key35", "key37", "key39", "key40"}
	_, err = executeQuery(t, db, "ns1", queryString, bookmark, int32(10), returnKeys)
	assert.NoError(t, err)

	// Test explicit paging
	// Increase pagesize to 50,  should return all values
	returnKeys = []string{"key2", "key3", "key4", "key6", "key8", "key12", "key13", "key14", "key15",
		"key16", "key17", "key18", "key19", "key20", "key22", "key24", "key25", "key26", "key28", "key29",
		"key30", "key32", "key33", "key34", "key35", "key37", "key39", "key40"}
	_, err = executeQuery(t, db, "ns1", queryString, "", int32(50), returnKeys)
	assert.NoError(t, err)

	//Set queryLimit to 50
	viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 50)

	// Test explicit paging
	// Pagesize is 10, so all 28 records should be returned in 3 "pages"
	returnKeys = []string{"key2", "key3", "key4", "key6", "key8", "key12", "key13", "key14", "key15", "key16"}
	bookmark, err = executeQuery(t, db, "ns1", queryString, "", int32(10), returnKeys)
	assert.NoError(t, err)
	returnKeys = []string{"key17", "key18", "key19", "key20", "key22", "key24", "key25", "key26", "key28", "key29"}
	bookmark, err = executeQuery(t, db, "ns1", queryString, bookmark, int32(10), returnKeys)
	assert.NoError(t, err)
	returnKeys = []string{"key30", "key32", "key33", "key34", "key35", "key37", "key39", "key40"}
	_, err = executeQuery(t, db, "ns1", queryString, bookmark, int32(10), returnKeys)
	assert.NoError(t, err)

	// Set queryLimit to 10
	viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 10)

	// Test implicit paging
	returnKeys = []string{"key2", "key3", "key4", "key6", "key8", "key12", "key13", "key14", "key15",
		"key16", "key17", "key18", "key19", "key20", "key22", "key24", "key25", "key26", "key28", "key29",
		"key30", "key32", "key33", "key34", "key35", "key37", "key39", "key40"}
	_, err = executeQuery(t, db, "ns1", queryString, "", int32(0), returnKeys)
	assert.NoError(t, err)

	//Set queryLimit to 5
	viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 5)

	// pagesize greater than querysize will execute with implicit paging
	returnKeys = []string{"key2", "key3", "key4", "key6", "key8", "key12", "key13", "key14", "key15", "key16"}
	_, err = executeQuery(t, db, "ns1", queryString, "", int32(10), returnKeys)
	assert.NoError(t, err)

	// Set queryLimit to 1000
	viper.Set("ledger.state.couchDBConfig.internalQueryLimit", 1000)
}

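// executeQuery runs the given query, optionally with pagination metadata
// (bookmark and limit), verifies that the returned keys match returnKeys, and
// returns the bookmark for the next page (empty when no pagination metadata
// is used)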
func executeQuery(t *testing.T, db statedb.VersionedDB, namespace, query, bookmark string, limit int32, returnKeys []string) (string, error) {

	var itr statedb.ResultsIterator
	var err error

	if limit == int32(0) && bookmark == "" {
		itr, err = db.ExecuteQuery(namespace, query)
		if err != nil {
			return "", err
		}
	} else {
		queryOptions := make(map[string]interface{})
		if bookmark != "" {
			queryOptions["bookmark"] = bookmark
		}
		if limit != 0 {
			queryOptions["limit"] = limit
		}

		itr, err = db.ExecuteQueryWithMetadata(namespace, query, queryOptions)
		if err != nil {
			return "", err
		}
	}

	// Verify the keys returned
	commontests.TestItrWithoutClose(t, itr, returnKeys)

	returnBookmark := ""
	if queryResultItr, ok := itr.(statedb.QueryResultsIterator); ok {
		returnBookmark = queryResultItr.GetBookmarkAndClose()
	}

	return returnBookmark, nil
}

// TestPaginatedQueryValidation tests queries with pagination
func TestPaginatedQueryValidation(t *testing.T) {

	queryOptions := make(map[string]interface{})
	queryOptions["bookmark"] = "Test1"
	queryOptions["limit"] = int32(10)

	err := validateQueryMetadata(queryOptions)
	assert.NoError(t, err, "An error was thrown for valid query options")
	queryOptions = make(map[string]interface{})
	queryOptions["bookmark"] = "Test1"
	queryOptions["limit"] = float64(10.2)

	err = validateQueryMetadata(queryOptions)
	assert.Error(t, err, "An error should have been thrown for invalid query options")

	queryOptions = make(map[string]interface{})
	queryOptions["bookmark"] = "Test1"
	queryOptions["limit"] = "10"

	err = validateQueryMetadata(queryOptions)
	assert.Error(t, err, "An error should have been thrown for invalid query options")

	queryOptions = make(map[string]interface{})
	queryOptions["bookmark"] = int32(10)
	queryOptions["limit"] = "10"

	err = validateQueryMetadata(queryOptions)
	assert.Error(t, err, "An error should have been thrown for invalid query options")

	queryOptions = make(map[string]interface{})
	queryOptions["bookmark"] = "Test1"
	queryOptions["limit1"] = int32(10)

	err = validateQueryMetadata(queryOptions)
	assert.Error(t, err, "An error should have been thrown for invalid query options")

	queryOptions = make(map[string]interface{})
	queryOptions["bookmark1"] = "Test1"
	queryOptions["limit1"] = int32(10)

	err = validateQueryMetadata(queryOptions)
	assert.Error(t, err, "An error should have been thrown for invalid query options")

}

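// TestApplyUpdatesWithNilHeight tests applying updates with a nil savepoint height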
func TestApplyUpdatesWithNilHeight(t *testing.T) {
	env := NewTestVDBEnv(t)
	defer env.Cleanup()
	commontests.TestApplyUpdatesWithNilHeight(t, env.DBProvider)
}