couchdb.go 61.2 KB
Newer Older
1
/*
2
3
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
4
5
6
7
8
9
*/

package couchdb

import (
	"bytes"
10
	"compress/gzip"
11
	"context"
12
	"encoding/base64"
13
14
15
16
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
17
18
19
	"log"
	"mime"
	"mime/multipart"
20
	"net/http"
21
	"net/http/httputil"
22
	"net/textproto"
23
24
	"net/url"
	"regexp"
25
	"strconv"
26
	"strings"
27
	"time"
28
	"unicode/utf8"
29

30
	"github.com/hyperledger/fabric/common/flogging"
31
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
32
	"github.com/pkg/errors"
33
	"go.uber.org/zap/zapcore"
34
35
)

36
var logger = flogging.MustGetLogger("couchdb")
37

38
//time between retry attempts in milliseconds
39
const retryWaitTime = 125
40

41
42
43
44
45
46
47
48
49
// DBOperationResponse is body for successful database calls.
type DBOperationResponse struct {
	Ok  bool
	id  string
	rev string
}

// DBInfo is body for database information.
type DBInfo struct {
50
51
	DbName string `json:"db_name"`
	Sizes  struct {
52
53
54
55
		File     int `json:"file"`
		External int `json:"external"`
		Active   int `json:"active"`
	} `json:"sizes"`
56
	Other struct {
57
58
59
60
61
62
63
64
65
66
67
		DataSize int `json:"data_size"`
	} `json:"other"`
	DocDelCount       int    `json:"doc_del_count"`
	DocCount          int    `json:"doc_count"`
	DiskSize          int    `json:"disk_size"`
	DiskFormatVersion int    `json:"disk_format_version"`
	DataSize          int    `json:"data_size"`
	CompactRunning    bool   `json:"compact_running"`
	InstanceStartTime string `json:"instance_start_time"`
}

68
69
70
71
72
73
74
75
76
//ConnectionInfo is a structure for capturing the database info and version
type ConnectionInfo struct {
	Couchdb string `json:"couchdb"`
	Version string `json:"version"`
	Vendor  struct {
		Name string `json:"name"`
	} `json:"vendor"`
}

77
78
//RangeQueryResponse is used for processing REST range query responses from CouchDB
type RangeQueryResponse struct {
79
80
	TotalRows int32 `json:"total_rows"`
	Offset    int32 `json:"offset"`
81
82
83
84
85
86
87
88
89
90
	Rows      []struct {
		ID    string `json:"id"`
		Key   string `json:"key"`
		Value struct {
			Rev string `json:"rev"`
		} `json:"value"`
		Doc json.RawMessage `json:"doc"`
	} `json:"rows"`
}

91
92
//QueryResponse is used for processing REST query responses from CouchDB
type QueryResponse struct {
93
94
95
	Warning  string            `json:"warning"`
	Docs     []json.RawMessage `json:"docs"`
	Bookmark string            `json:"bookmark"`
96
97
}

98
// DocMetadata is used for capturing CouchDB document header info,
99
// used to capture id, version, rev and attachments returned in the query from CouchDB
100
type DocMetadata struct {
101
102
103
104
	ID              string                     `json:"_id"`
	Rev             string                     `json:"_rev"`
	Version         string                     `json:"~version"`
	AttachmentsInfo map[string]*AttachmentInfo `json:"_attachments"`
105
106
}

107
108
109
110
111
//DocID is a minimal structure for capturing the ID from a query result
type DocID struct {
	ID string `json:"_id"`
}

112
113
//QueryResult is used for returning query results from CouchDB
type QueryResult struct {
114
115
	ID          string
	Value       []byte
116
	Attachments []*AttachmentInfo
117
118
}

119
120
//CouchConnectionDef contains parameters
type CouchConnectionDef struct {
121
122
123
124
125
126
127
	URL                   string
	Username              string
	Password              string
	MaxRetries            int
	MaxRetriesOnStartup   int
	RequestTimeout        time.Duration
	CreateGlobalChangesDB bool
128
129
130
131
}

//CouchInstance represents a CouchDB instance
type CouchInstance struct {
132
133
	conf   CouchConnectionDef //connection configuration
	client *http.Client       // a client to connect to this instance
134
	stats  *stats
135
136
137
138
}

//CouchDatabase represents a database within a CouchDB instance
type CouchDatabase struct {
139
	CouchInstance    *CouchInstance //connection configuration
140
141
	DBName           string
	IndexWarmCounter int
142
143
}

144
145
//DBReturn contains an error reported by CouchDB
type DBReturn struct {
146
147
148
149
150
	StatusCode int    `json:"status_code"`
	Error      string `json:"error"`
	Reason     string `json:"reason"`
}

151
152
153
154
155
156
157
//CreateIndexResponse contains an the index creation response from CouchDB
type CreateIndexResponse struct {
	Result string `json:"result"`
	ID     string `json:"id"`
	Name   string `json:"name"`
}

158
159
//AttachmentInfo contains the definition for an attached file for couchdb
type AttachmentInfo struct {
160
	Name            string
161
	ContentType     string `json:"content_type"`
162
	Length          uint64
163
	AttachmentBytes []byte `json:"data"`
164
165
166
167
168
169
170
171
172
}

//FileDetails defines the structure needed to send an attachment to couchdb
type FileDetails struct {
	Follows     bool   `json:"follows"`
	ContentType string `json:"content_type"`
	Length      int    `json:"length"`
}

173
174
175
//CouchDoc defines the structure for a JSON document value
type CouchDoc struct {
	JSONValue   []byte
176
	Attachments []*AttachmentInfo
177
178
}

179
180
//BatchRetrieveDocMetadataResponse is used for processing REST batch responses from CouchDB
type BatchRetrieveDocMetadataResponse struct {
181
	Rows []struct {
182
183
		ID          string `json:"id"`
		DocMetadata struct {
184
185
			ID      string `json:"_id"`
			Rev     string `json:"_rev"`
186
			Version string `json:"~version"`
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
		} `json:"doc"`
	} `json:"rows"`
}

//BatchUpdateResponse defines a structure for batch update response
type BatchUpdateResponse struct {
	ID     string `json:"id"`
	Error  string `json:"error"`
	Reason string `json:"reason"`
	Ok     bool   `json:"ok"`
	Rev    string `json:"rev"`
}

//Base64Attachment contains the definition for an attached file for couchdb
type Base64Attachment struct {
	ContentType    string `json:"content_type"`
	AttachmentData string `json:"data"`
}

206
207
208
209
210
211
212
//IndexResult contains the definition for a couchdb index
type IndexResult struct {
	DesignDocument string `json:"designdoc"`
	Name           string `json:"name"`
	Definition     string `json:"definition"`
}

213
214
215
216
217
218
219
220
221
222
223
224
//DatabaseSecurity contains the definition for CouchDB database security
type DatabaseSecurity struct {
	Admins struct {
		Names []string `json:"names"`
		Roles []string `json:"roles"`
	} `json:"admins"`
	Members struct {
		Names []string `json:"names"`
		Roles []string `json:"roles"`
	} `json:"members"`
}

225
226
227
// closeResponseBody discards the body and then closes it to enable returning it to
// connection pool
func closeResponseBody(resp *http.Response) {
228
229
230
231
	if resp != nil {
		io.Copy(ioutil.Discard, resp.Body) // discard whatever is remaining of body
		resp.Body.Close()
	}
232
233
}

234
//CreateConnectionDefinition for a new client connection
235
func CreateConnectionDefinition(couchDBAddress, username, password string, maxRetries,
236
	maxRetriesOnStartup int, requestTimeout time.Duration, createGlobalChangesDB bool) (*CouchConnectionDef, error) {
237

238
	logger.Debugf("Entering CreateConnectionDefinition()")
239

240
241
242
243
	connectURL := &url.URL{
		Host:   couchDBAddress,
		Scheme: "http",
	}
244
245

	//parse the constructed URL to verify no errors
246
	finalURL, err := url.Parse(connectURL.String())
247
	if err != nil {
248
249
		logger.Errorf("URL parse error: %s", err)
		return nil, errors.Wrapf(err, "error parsing connect URL: %s", connectURL)
250
251
	}

252
253
	logger.Debugf("Created database configuration  URL=[%s]", finalURL.String())
	logger.Debugf("Exiting CreateConnectionDefinition()")
254
255

	//return an object containing the connection information
256
	return &CouchConnectionDef{finalURL.String(), username, password, maxRetries,
257
		maxRetriesOnStartup, requestTimeout, createGlobalChangesDB}, nil
258

259
260
261
}

//CreateDatabaseIfNotExist method provides function to create database
262
func (dbclient *CouchDatabase) CreateDatabaseIfNotExist() error {
263

264
	logger.Debugf("[%s] Entering CreateDatabaseIfNotExist()", dbclient.DBName)
265
266
267
268

	dbInfo, couchDBReturn, err := dbclient.GetDatabaseInfo()
	if err != nil {
		if couchDBReturn == nil || couchDBReturn.StatusCode != 404 {
269
			return err
270
271
272
		}
	}

273
274
	//If the dbInfo returns populated and status code is 200, then the database exists
	if dbInfo != nil && couchDBReturn.StatusCode == 200 {
275

276
277
278
279
280
281
		//Apply database security if needed
		errSecurity := dbclient.applyDatabasePermissions()
		if errSecurity != nil {
			return errSecurity
		}

282
		logger.Debugf("[%s] Database already exists", dbclient.DBName)
283

284
		logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
285

286
287
		return nil
	}
288

289
	logger.Debugf("[%s] Database does not exist.", dbclient.DBName)
290

291
292
	connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
	if err != nil {
293
294
		logger.Errorf("URL parse error: %s", err)
		return errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
295
	}
296

297
298
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries
299

300
	//process the URL with a PUT, creates the database
301
	resp, _, err := dbclient.handleRequest(http.MethodPut, "CreateDatabaseIfNotExist", connectURL, nil, "", "", maxRetries, true, nil)
302

303
	if err != nil {
304

305
306
307
308
309
310
311
312
		// Check to see if the database exists
		// Even though handleRequest() returned an error, the
		// database may have been created and a false error
		// returned due to a timeout or race condition.
		// Do a final check to see if the database really got created.
		dbInfo, couchDBReturn, errDbInfo := dbclient.GetDatabaseInfo()
		//If there is no error, then the database exists,  return without an error
		if errDbInfo == nil && dbInfo != nil && couchDBReturn.StatusCode == 200 {
313
314
315
316
317
318

			errSecurity := dbclient.applyDatabasePermissions()
			if errSecurity != nil {
				return errSecurity
			}

319
320
			logger.Infof("[%s] Created state database", dbclient.DBName)
			logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
321
322
323
324
			return nil
		}

		return err
325

326
	}
327
328
	defer closeResponseBody(resp)

329
330
331
332
333
	errSecurity := dbclient.applyDatabasePermissions()
	if errSecurity != nil {
		return errSecurity
	}

334
	logger.Infof("Created state database %s", dbclient.DBName)
335

336
	logger.Debugf("[%s] Exiting CreateDatabaseIfNotExist()", dbclient.DBName)
337

338
	return nil
339
340
341

}

342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
//applyDatabaseSecurity
func (dbclient *CouchDatabase) applyDatabasePermissions() error {

	//If the username and password are not set, then skip applying permissions
	if dbclient.CouchInstance.conf.Username == "" && dbclient.CouchInstance.conf.Password == "" {
		return nil
	}

	securityPermissions := &DatabaseSecurity{}

	securityPermissions.Admins.Names = append(securityPermissions.Admins.Names, dbclient.CouchInstance.conf.Username)
	securityPermissions.Members.Names = append(securityPermissions.Members.Names, dbclient.CouchInstance.conf.Username)

	err := dbclient.ApplyDatabaseSecurity(securityPermissions)
	if err != nil {
		return err
	}

	return nil
}

363
//GetDatabaseInfo method provides function to retrieve database information
364
func (dbclient *CouchDatabase) GetDatabaseInfo() (*DBInfo, *DBReturn, error) {
365

366
	connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
367
	if err != nil {
368
369
		logger.Errorf("URL parse error: %s", err)
		return nil, nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
370
	}
371

372
373
374
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

375
	resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, "GetDatabaseInfo", connectURL, nil, "", "", maxRetries, true, nil)
376
377
378
	if err != nil {
		return nil, couchDBReturn, err
	}
379
	defer closeResponseBody(resp)
380
381

	dbResponse := &DBInfo{}
382
383
	decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
	if decodeErr != nil {
384
		return nil, nil, errors.Wrap(decodeErr, "error decoding response body")
385
	}
386

387
	// trace the database info response
388
	logger.Debugw("GetDatabaseInfo()", "dbResponseJSON", dbResponse)
389

390
391
392
393
	return dbResponse, couchDBReturn, nil

}

394
395
396
397
398
//VerifyCouchConfig method provides function to verify the connection information
func (couchInstance *CouchInstance) VerifyCouchConfig() (*ConnectionInfo, *DBReturn, error) {

	logger.Debugf("Entering VerifyCouchConfig()")
	defer logger.Debugf("Exiting VerifyCouchConfig()")
399
400
401

	connectURL, err := url.Parse(couchInstance.conf.URL)
	if err != nil {
402
403
		logger.Errorf("URL parse error: %s", err)
		return nil, nil, errors.Wrapf(err, "error parsing couch instance URL: %s", couchInstance.conf.URL)
404
405
406
	}
	connectURL.Path = "/"

407
408
409
	//get the number of retries for startup
	maxRetriesOnStartup := couchInstance.conf.MaxRetriesOnStartup

410
	resp, couchDBReturn, err := couchInstance.handleRequest(context.Background(), http.MethodGet, "", "VerifyCouchConfig", connectURL, nil,
411
		couchInstance.conf.Username, couchInstance.conf.Password, maxRetriesOnStartup, true, nil)
412

413
	if err != nil {
414
		return nil, couchDBReturn, errors.WithMessage(err, "unable to connect to CouchDB, check the hostname and port")
415
	}
416
	defer closeResponseBody(resp)
417
418

	dbResponse := &ConnectionInfo{}
419
420
	decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
	if decodeErr != nil {
421
		return nil, nil, errors.Wrap(decodeErr, "error decoding response body")
422
423
424
	}

	// trace the database info response
425
426
	logger.Debugw("VerifyConnection() dbResponseJSON: %s", dbResponse)

427
428
429
430
	//check to see if the system databases exist
	//Verifying the existence of the system database accomplishes two steps
	//1.  Ensures the system databases are created
	//2.  Verifies the username password provided in the CouchDB config are valid for system admin
431
	err = CreateSystemDatabasesIfNotExist(couchInstance)
432
	if err != nil {
433
434
		logger.Errorf("Unable to connect to CouchDB, error: %s. Check the admin username and password.", err)
		return nil, nil, errors.WithMessage(err, "unable to connect to CouchDB. Check the admin username and password")
435
436
	}

437
438
439
	return dbResponse, couchDBReturn, nil
}

440
441
442
443
444
445
446
447
448
449
450
451
452
453
// HealthCheck checks if the peer is able to communicate with CouchDB
func (couchInstance *CouchInstance) HealthCheck(ctx context.Context) error {
	connectURL, err := url.Parse(couchInstance.conf.URL)
	if err != nil {
		logger.Errorf("URL parse error: %s", err)
		return errors.Wrapf(err, "error parsing CouchDB URL: %s", couchInstance.conf.URL)
	}
	_, _, err = couchInstance.handleRequest(ctx, http.MethodHead, "", "HealthCheck", connectURL, nil, "", "", 0, true, nil)
	if err != nil {
		return fmt.Errorf("failed to connect to couch db [%s]", err)
	}
	return nil
}

454
//DropDatabase provides method to drop an existing database
455
func (dbclient *CouchDatabase) DropDatabase() (*DBOperationResponse, error) {
456
	dbName := dbclient.DBName
457

458
	logger.Debugf("[%s] Entering DropDatabase()", dbName)
459

460
	connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
461
	if err != nil {
462
463
		logger.Errorf("URL parse error: %s", err)
		return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
464
	}
465

466
467
468
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

469
	resp, _, err := dbclient.handleRequest(http.MethodDelete, "DropDatabase", connectURL, nil, "", "", maxRetries, true, nil)
470
471
472
	if err != nil {
		return nil, err
	}
473
	defer closeResponseBody(resp)
474
475

	dbResponse := &DBOperationResponse{}
476
477
	decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
	if decodeErr != nil {
478
		return nil, errors.Wrap(decodeErr, "error decoding response body")
479
	}
480
481

	if dbResponse.Ok == true {
482
		logger.Debugf("[%s] Dropped database", dbclient.DBName)
483
484
	}

485
	logger.Debugf("[%s] Exiting DropDatabase()", dbclient.DBName)
486
487
488
489
490
491
492

	if dbResponse.Ok == true {

		return dbResponse, nil

	}

493
	return dbResponse, errors.New("error dropping database")
494

495
496
}

497
// EnsureFullCommit calls _ensure_full_commit for explicit fsync
498
func (dbclient *CouchDatabase) EnsureFullCommit() (*DBOperationResponse, error) {
499
	dbName := dbclient.DBName
500

501
	logger.Debugf("[%s] Entering EnsureFullCommit()", dbName)
502

503
	connectURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
504
	if err != nil {
505
506
		logger.Errorf("URL parse error: %s", err)
		return nil, errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
507
	}
508

509
510
511
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

512
	resp, _, err := dbclient.handleRequest(http.MethodPost, "EnsureFullCommit", connectURL, nil, "", "", maxRetries, true, nil, "_ensure_full_commit")
513
	if err != nil {
514
		logger.Errorf("Failed to invoke couchdb _ensure_full_commit. Error: %+v", err)
515
516
		return nil, err
	}
517
	defer closeResponseBody(resp)
518
519

	dbResponse := &DBOperationResponse{}
520
521
	decodeErr := json.NewDecoder(resp.Body).Decode(&dbResponse)
	if decodeErr != nil {
522
		return nil, errors.Wrap(decodeErr, "error decoding response body")
523
	}
524

525
	//Check to see if autoWarmIndexes is enabled
526
527
528
529
	//If autoWarmIndexes is enabled, indexes will be refreshed after the number of blocks
	//in GetWarmIndexesAfterNBlocks() have been committed to the state database
	//Check to see if the number of blocks committed exceeds the threshold for index warming
	//Use a go routine to launch WarmIndexAllIndexes(), this will execute as a background process
530
	if ledgerconfig.IsAutoWarmIndexesEnabled() {
531
532
533
534
535
536
537

		if dbclient.IndexWarmCounter >= ledgerconfig.GetWarmIndexesAfterNBlocks() {
			go dbclient.runWarmIndexAllIndexes()
			dbclient.IndexWarmCounter = 0
		}
		dbclient.IndexWarmCounter++

538
539
	}

540
	logger.Debugf("[%s] Exiting EnsureFullCommit()", dbclient.DBName)
541
542
543
544
545
546
547

	if dbResponse.Ok == true {

		return dbResponse, nil

	}

548
	return dbResponse, errors.New("error syncing database")
549
550
}

551
//SaveDoc method provides a function to save a document, id and byte array
552
func (dbclient *CouchDatabase) SaveDoc(id string, rev string, couchDoc *CouchDoc) (string, error) {
553
	dbName := dbclient.DBName
554

555
	logger.Debugf("[%s] Entering SaveDoc() id=[%s]", dbName, id)
556

557
	if !utf8.ValidString(id) {
558
		return "", errors.Errorf("doc id [%x] not a valid utf8 string", id)
559
	}
560

561
	saveURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
562
	if err != nil {
563
564
		logger.Errorf("URL parse error: %s", err)
		return "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
565
	}
566

567
	//Set up a buffer for the data to be pushed to couchdb
568
	data := []byte{}
569
570
571
572

	//Set up a default boundary for use by multipart if sending attachments
	defaultBoundary := ""

573
574
575
	//Create a flag for shared connections.  This is set to false for zero length attachments
	keepConnectionOpen := true

576
	//check to see if attachments is nil, if so, then this is a JSON only
577
	if couchDoc.Attachments == nil {
578
579

		//Test to see if this is a valid JSON
580
		if IsJSON(string(couchDoc.JSONValue)) != true {
581
			return "", errors.New("JSON format is not valid")
582
583
584
		}

		// if there are no attachments, then use the bytes passed in as the JSON
585
		data = couchDoc.JSONValue
586

587
	} else { // there are attachments
588
589

		//attachments are included, create the multipart definition
590
		multipartData, multipartBoundary, err3 := createAttachmentPart(couchDoc, defaultBoundary)
591
592
		if err3 != nil {
			return "", err3
593
594
		}

595
596
597
598
599
600
601
		//If there is a zero length attachment, do not keep the connection open
		for _, attach := range couchDoc.Attachments {
			if attach.Length < 1 {
				keepConnectionOpen = false
			}
		}

602
		//Set the data buffer to the data from the create multi-part data
603
		data = multipartData.Bytes()
604
605
606

		//Set the default boundary to the value generated in the multipart creation
		defaultBoundary = multipartBoundary
607

608
609
	}

610
611
612
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

613
	//handle the request for saving document with a retry if there is a revision conflict
614
	resp, _, err := dbclient.handleRequestWithRevisionRetry(id, http.MethodPut, dbName, "SaveDoc", saveURL, data, rev, defaultBoundary, maxRetries, keepConnectionOpen, nil)
615

616
617
618
	if err != nil {
		return "", err
	}
619
	defer closeResponseBody(resp)
620

621
	//get the revision and return
622
623
624
625
626
	revision, err := getRevisionHeader(resp)
	if err != nil {
		return "", err
	}

627
	logger.Debugf("[%s] Exiting SaveDoc()", dbclient.DBName)
628
629
630
631
632

	return revision, nil

}

633
634
635
636
637
638
639
640
641
642
643
644
645
646
//getDocumentRevision will return the revision if the document exists, otherwise it will return ""
func (dbclient *CouchDatabase) getDocumentRevision(id string) string {

	var rev = ""

	//See if the document already exists, we need the rev for saves and deletes
	_, revdoc, err := dbclient.ReadDoc(id)
	if err == nil {
		//set the revision to the rev returned from the document read
		rev = revdoc
	}
	return rev
}

647
func createAttachmentPart(couchDoc *CouchDoc, defaultBoundary string) (bytes.Buffer, string, error) {
648
649
650

	//Create a buffer for writing the result
	writeBuffer := new(bytes.Buffer)
651
652

	// read the attachment and save as an attachment
653
	writer := multipart.NewWriter(writeBuffer)
654
655
656
657
658
659

	//retrieve the boundary for the multipart
	defaultBoundary = writer.Boundary()

	fileAttachments := map[string]FileDetails{}

660
	for _, attachment := range couchDoc.Attachments {
661
662
663
664
665
666
		fileAttachments[attachment.Name] = FileDetails{true, attachment.ContentType, len(attachment.AttachmentBytes)}
	}

	attachmentJSONMap := map[string]interface{}{
		"_attachments": fileAttachments}

667
	//Add any data uploaded with the files
668
	if couchDoc.JSONValue != nil {
669
670
671

		//create a generic map
		genericMap := make(map[string]interface{})
672

673
		//unmarshal the data into the generic map
674
675
		decoder := json.NewDecoder(bytes.NewBuffer(couchDoc.JSONValue))
		decoder.UseNumber()
676
677
		decodeErr := decoder.Decode(&genericMap)
		if decodeErr != nil {
678
			return *writeBuffer, "", errors.Wrap(decodeErr, "error decoding json data")
679
		}
680
681
682
683
684
685
686
687

		//add all key/values to the attachmentJSONMap
		for jsonKey, jsonValue := range genericMap {
			attachmentJSONMap[jsonKey] = jsonValue
		}

	}

688
689
	filesForUpload, err := json.Marshal(attachmentJSONMap)
	if err != nil {
690
		return *writeBuffer, "", errors.Wrap(err, "error marshalling json data")
691
692
	}

693
694
695
696
697
698
699
700
	logger.Debugf(string(filesForUpload))

	//create the header for the JSON
	header := make(textproto.MIMEHeader)
	header.Set("Content-Type", "application/json")

	part, err := writer.CreatePart(header)
	if err != nil {
701
		return *writeBuffer, defaultBoundary, errors.Wrap(err, "error creating multipart")
702
703
704
705
	}

	part.Write(filesForUpload)

706
	for _, attachment := range couchDoc.Attachments {
707
708
709
710

		header := make(textproto.MIMEHeader)
		part, err2 := writer.CreatePart(header)
		if err2 != nil {
711
			return *writeBuffer, defaultBoundary, errors.Wrap(err2, "error creating multipart")
712
713
714
715
716
717
718
		}
		part.Write(attachment.AttachmentBytes)

	}

	err = writer.Close()
	if err != nil {
719
		return *writeBuffer, defaultBoundary, errors.Wrap(err, "error closing multipart writer")
720
721
	}

722
	return *writeBuffer, defaultBoundary, nil
723
724
725

}

726
727
func getRevisionHeader(resp *http.Response) (string, error) {

728
	if resp == nil {
729
		return "", errors.New("no response received from CouchDB")
730
731
	}

732
733
734
	revision := resp.Header.Get("Etag")

	if revision == "" {
735
		return "", errors.New("no revision tag detected")
736
737
	}

738
739
740
741
	reg := regexp.MustCompile(`"([^"]*)"`)
	revisionNoQuotes := reg.ReplaceAllString(revision, "${1}")
	return revisionNoQuotes, nil

742
743
}

744
745
//ReadDoc method provides function to retrieve a document and its revision
//from the database by id
746
747
func (dbclient *CouchDatabase) ReadDoc(id string) (*CouchDoc, string, error) {
	var couchDoc CouchDoc
748
	attachments := []*AttachmentInfo{}
749
	dbName := dbclient.DBName
750

751
	logger.Debugf("[%s] Entering ReadDoc()  id=[%s]", dbName, id)
752
	if !utf8.ValidString(id) {
753
		return nil, "", errors.Errorf("doc id [%x] not a valid utf8 string", id)
754
	}
755

756
	readURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
757
	if err != nil {
758
759
		logger.Errorf("URL parse error: %s", err)
		return nil, "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
760
	}
761

762
763
764
	query := readURL.Query()
	query.Add("attachments", "true")

765
766
767
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

768
	resp, couchDBReturn, err := dbclient.handleRequest(http.MethodGet, "ReadDoc", readURL, nil, "", "", maxRetries, true, &query, id)
769
	if err != nil {
770
		if couchDBReturn != nil && couchDBReturn.StatusCode == 404 {
771
			logger.Debugf("[%s] Document not found (404), returning nil value instead of 404 error", dbclient.DBName)
772
773
774
775
			// non-existent document should return nil value instead of a 404 error
			// for details see https://github.com/hyperledger-archives/fabric/issues/936
			return nil, "", nil
		}
776
		logger.Debugf("[%s] couchDBReturn=%v\n", dbclient.DBName, couchDBReturn)
777
778
		return nil, "", err
	}
779
	defer closeResponseBody(resp)
780

781
782
	//Get the media type from the Content-Type header
	mediaType, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
783
	if err != nil {
784
		log.Fatal(err)
785
786
	}

787
	//Get the revision from header
788
789
790
791
792
	revision, err := getRevisionHeader(resp)
	if err != nil {
		return nil, "", err
	}

793
794
795
796
797
798
799
	//check to see if the is multipart,  handle as attachment if multipart is detected
	if strings.HasPrefix(mediaType, "multipart/") {
		//Set up the multipart reader based on the boundary
		multipartReader := multipart.NewReader(resp.Body, params["boundary"])
		for {
			p, err := multipartReader.NextPart()
			if err == io.EOF {
800
				break // processed all parts
801
802
			}
			if err != nil {
803
				return nil, "", errors.Wrap(err, "error reading next multipart")
804
805
			}

806
			defer p.Close()
807

808
			logger.Debugf("[%s] part header=%s", dbclient.DBName, p.Header)
809
810
			switch p.Header.Get("Content-Type") {
			case "application/json":
811
812
				partdata, err := ioutil.ReadAll(p)
				if err != nil {
813
					return nil, "", errors.Wrap(err, "error reading multipart data")
814
				}
815
816
				couchDoc.JSONValue = partdata
			default:
817

818
				//Create an attachment structure and load it
819
				attachment := &AttachmentInfo{}
820
821
822
823
824
825
826
827
828
829
				attachment.ContentType = p.Header.Get("Content-Type")
				contentDispositionParts := strings.Split(p.Header.Get("Content-Disposition"), ";")
				if strings.TrimSpace(contentDispositionParts[0]) == "attachment" {
					switch p.Header.Get("Content-Encoding") {
					case "gzip": //See if the part is gzip encoded

						var respBody []byte

						gr, err := gzip.NewReader(p)
						if err != nil {
830
							return nil, "", errors.Wrap(err, "error creating gzip reader")
831
832
833
						}
						respBody, err = ioutil.ReadAll(gr)
						if err != nil {
834
							return nil, "", errors.Wrap(err, "error reading gzip data")
835
836
						}

837
						logger.Debugf("[%s] Retrieved attachment data", dbclient.DBName)
838
						attachment.AttachmentBytes = respBody
839
						attachment.Length = uint64(len(attachment.AttachmentBytes))
840
						attachment.Name = p.FileName()
841
						attachments = append(attachments, attachment)
842
843
844
845
846
847

					default:

						//retrieve the data,  this is not gzip
						partdata, err := ioutil.ReadAll(p)
						if err != nil {
848
							return nil, "", errors.Wrap(err, "error reading multipart data")
849
						}
850
						logger.Debugf("[%s] Retrieved attachment data", dbclient.DBName)
851
						attachment.AttachmentBytes = partdata
852
						attachment.Length = uint64(len(attachment.AttachmentBytes))
853
						attachment.Name = p.FileName()
854
						attachments = append(attachments, attachment)
855
856
857
858
859
860

					} // end content-encoding switch
				} // end if attachment
			} // end content-type switch
		} // for all multiparts

861
862
		couchDoc.Attachments = attachments

863
864
		return &couchDoc, revision, nil
	}
865

866
867
868
	//handle as JSON document
	couchDoc.JSONValue, err = ioutil.ReadAll(resp.Body)
	if err != nil {
869
		return nil, "", errors.Wrap(err, "error reading response body")
870
	}
871

872
	logger.Debugf("[%s] Exiting ReadDoc()", dbclient.DBName)
873
	return &couchDoc, revision, nil
874
875
}

876
877
//ReadDocRange method provides function to a range of documents based on the start and end keys
//startKey and endKey can also be empty strings.  If startKey and endKey are empty, all documents are returned
878
879
//This function provides a limit option to specify the max number of entries and is supplied by config.
//Skip is reserved for possible future future use.
880
func (dbclient *CouchDatabase) ReadDocRange(startKey, endKey string, limit int32) ([]*QueryResult, string, error) {
881
882
	dbName := dbclient.DBName
	logger.Debugf("[%s] Entering ReadDocRange()  startKey=%s, endKey=%s", dbName, startKey, endKey)
883

884
	var results []*QueryResult
885

886
	rangeURL, err := url.Parse(dbclient.CouchInstance.conf.URL)
887
	if err != nil {
888
		logger.Errorf("URL parse error: %s", err)
889
		return nil, "", errors.Wrapf(err, "error parsing CouchDB URL: %s", dbclient.CouchInstance.conf.URL)
890
891
	}

892
	queryParms := rangeURL.Query()
893
894
	//Increment the limit by 1 to see if there are more qualifying records
	queryParms.Set("limit", strconv.FormatInt(int64(limit+1), 10))
895
	queryParms.Add("include_docs", "true")
896
	queryParms.Add("inclusive_end", "false") // endkey should be exclusive to be consistent with goleveldb
897
	queryParms.Add("attachments", "true")    // get the attachments as well
898
899
900

	//Append the startKey if provided
	if startKey != "" {
901
		if startKey, err = encodeForJSON(startKey); err != nil {
902
			return nil, "", err
903
		}
904
		queryParms.Add("startkey", "\""+startKey+"\"")
905
906
907
908
	}

	//Append the endKey if provided
	if endKey != "" {
909
910
		var err error
		if endKey, err = encodeForJSON(endKey); err != nil {
911
			return nil, "", err
912
		}
913
		queryParms.Add("endkey", "\""+endKey+"\"")
914
915
	}

916
917
918
	//get the number of retries
	maxRetries := dbclient.CouchInstance.conf.MaxRetries

919
	resp, _, err := dbclient.handleRequest(http.MethodGet, "RangeDocRange", rangeURL, nil, "", "", maxRetries, true, &queryParms, "_all_docs")
920
	if err != nil {
921
		return nil, "", err
922
	}
923
	defer closeResponseBody(resp)