Commit ab1b9eed authored by wenjian3

FAB-5766 Error handling improvement for ledger (part 3)



Part 3 of ledger serviceability improvement:
Update common/ledger to use error handling framework

Change-Id: I86383b404941c5f922fa2f26ca71af4d91351d4c
Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
parent 7893ab37
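The diff below replaces the standard library "errors" package and ad-hoc fmt.Errorf calls with github.com/pkg/errors, so that low-level failures carry a contextual message and a recorded stack trace. As a minimal sketch of that pattern (not part of this commit; openBlockFile and the sample path are hypothetical, for illustration only):

package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

// openBlockFile is a hypothetical helper showing the wrapping style adopted
// in this change: annotate the cause with context instead of returning it bare.
func openBlockFile(path string) (*os.File, error) {
	f, err := os.OpenFile(path, os.O_RDONLY, 0600)
	if err != nil {
		return nil, errors.Wrapf(err, "error opening block file %s", path)
	}
	return f, nil
}

func main() {
	if _, err := openBlockFile("/nonexistent/blockfile_000000"); err != nil {
		// %+v prints the contextual message, the underlying cause, and the
		// stack trace recorded by errors.Wrapf.
		fmt.Printf("%+v\n", err)
	}
}

errors.Wrapf preserves the original cause (retrievable via errors.Cause) while adding context, which is why the diff below favors it, along with errors.WithMessage and errors.WithStack, over bare returns and fmt.Errorf.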
 /*
-Copyright IBM Corp. 2016 All Rights Reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
 */
 package blkstorage
 import (
-	"errors"
 	"github.com/hyperledger/fabric/common/ledger"
 	l "github.com/hyperledger/fabric/core/ledger"
 	"github.com/hyperledger/fabric/protos/common"
 	"github.com/hyperledger/fabric/protos/peer"
+	"github.com/pkg/errors"
 )
 // IndexableAttr represents an indexable attribute
@@ -48,7 +37,7 @@ var (
 	ErrNotFoundInIndex = l.NotFoundInIndexErr("")
 	// ErrAttrNotIndexed is used to indicate that an attribute is not indexed
-	ErrAttrNotIndexed = errors.New("Attribute not indexed")
+	ErrAttrNotIndexed = errors.New("attribute not indexed")
 )
 // BlockStoreProvider provides an handle to a BlockStore
...
 /*
-Copyright IBM Corp. 2016 All Rights Reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
 */
 package fsblkstorage
 import (
 	"bufio"
-	"errors"
 	"fmt"
 	"io"
 	"os"
 	"github.com/golang/protobuf/proto"
+	"github.com/pkg/errors"
 )
 // ErrUnexpectedEndOfBlockfile error used to indicate an unexpected end of a file segment
@@ -67,14 +57,14 @@ func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockf
 	var file *os.File
 	var err error
 	if file, err = os.OpenFile(filePath, os.O_RDONLY, 0600); err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "error opening block file %s", filePath)
 	}
 	var newPosition int64
 	if newPosition, err = file.Seek(startOffset, 0); err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "error seeking block file [%s] to startOffset [%d]", filePath, startOffset)
 	}
 	if newPosition != startOffset {
-		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
+		panic(fmt.Sprintf("Could not seek block file [%s] to startOffset [%d]. New position = [%d]",
 			filePath, startOffset, newPosition))
 	}
 	s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset}
@@ -97,7 +87,7 @@ func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacem
 	moreContentAvailable := true
 	if fileInfo, err = s.file.Stat(); err != nil {
-		return nil, nil, err
+		return nil, nil, errors.Wrapf(err, "error getting block file stat")
 	}
 	if s.currentOffset == fileInfo.Size() {
 		logger.Debugf("Finished reading file number [%d]", s.fileNum)
@@ -113,7 +103,7 @@ func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacem
 	}
 	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
 	if lenBytes, err = s.reader.Peek(peekBytes); err != nil {
-		return nil, nil, err
+		return nil, nil, errors.Wrapf(err, "error peeking [%d] bytes from block file", peekBytes)
 	}
 	length, n := proto.DecodeVarint(lenBytes)
 	if n == 0 {
@@ -122,7 +112,7 @@ func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacem
 		if !moreContentAvailable {
 			return nil, nil, ErrUnexpectedEndOfBlockfile
 		}
-		panic(fmt.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
+		panic(errors.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
 	}
 	bytesExpected := int64(n) + int64(length)
 	if bytesExpected > remainingBytes {
@@ -132,12 +122,12 @@ func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacem
 	}
 	// skip the bytes representing the block size
 	if _, err = s.reader.Discard(n); err != nil {
-		return nil, nil, err
+		return nil, nil, errors.Wrapf(err, "error discarding [%d] bytes", n)
 	}
 	blockBytes := make([]byte, length)
 	if _, err = io.ReadAtLeast(s.reader, blockBytes, int(length)); err != nil {
-		logger.Debugf("Error while trying to read [%d] bytes from fileNum [%d]: %s", length, s.fileNum, err)
-		return nil, nil, err
+		logger.Errorf("Error reading [%d] bytes from file number [%d], error: %s", length, s.fileNum, err)
+		return nil, nil, errors.Wrapf(err, "error reading [%d] bytes from file number [%d]", length, s.fileNum)
 	}
 	blockPlacementInfo := &blockPlacementInfo{
 		fileNum: s.fileNum,
@@ -149,7 +139,7 @@ func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacem
 }
 func (s *blockfileStream) close() error {
-	return s.file.Close()
+	return errors.WithStack(s.file.Close())
 }
 ///////////////////////////////////
@@ -185,7 +175,7 @@ func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementI
 	var blockPlacementInfo *blockPlacementInfo
 	var err error
 	if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil {
-		logger.Debugf("current file [%d] length of blockbytes [%d]. Err:%s", s.currentFileNum, len(blockBytes), err)
+		logger.Errorf("Error reading next block bytes from file number [%d]: %s", s.currentFileNum, err)
 		return nil, nil, err
 	}
 	logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
...
@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
 package fsblkstorage
 import (
-	"fmt"
 	"io/ioutil"
 	"os"
 	"strconv"
@@ -15,6 +14,7 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	"github.com/hyperledger/fabric/protos/common"
+	"github.com/pkg/errors"
 )
 // constructCheckpointInfoFromBlockFiles scans the last blockfile (if any) and construct the checkpoint info
@@ -45,7 +45,7 @@ func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, err
 		fileInfo := getFileInfoOrPanic(rootDir, lastFileNum)
 		logger.Debugf("Last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
 		if lastBlockBytes, endOffsetLastBlock, numBlocksInFile, err = scanForLastCompleteBlock(rootDir, lastFileNum, 0); err != nil {
-			logger.Errorf("Error while scanning last file [file num=%d]: %s", lastFileNum, err)
+			logger.Errorf("Error scanning last file [num=%d]: %s", lastFileNum, err)
 			return nil, err
 		}
@@ -54,14 +54,14 @@ func constructCheckpointInfoFromBlockFiles(rootDir string) (*checkpointInfo, err
 		fileInfo := getFileInfoOrPanic(rootDir, secondLastFileNum)
 		logger.Debugf("Second last Block file info: FileName=[%s], FileSize=[%d]", fileInfo.Name(), fileInfo.Size())
 		if lastBlockBytes, _, _, err = scanForLastCompleteBlock(rootDir, secondLastFileNum, 0); err != nil {
-			logger.Errorf("Error while scanning second last file [file num=%d]: %s", secondLastFileNum, err)
+			logger.Errorf("Error scanning second last file [num=%d]: %s", secondLastFileNum, err)
 			return nil, err
 		}
 	}
 	if lastBlockBytes != nil {
 		if lastBlock, err = deserializeBlock(lastBlockBytes); err != nil {
-			logger.Errorf("Error deserializing last block: %s. Block bytes length = %d", err, len(lastBlockBytes))
+			logger.Errorf("Error deserializing last block: %s. Block bytes length: %d", err, len(lastBlockBytes))
 			return nil, err
 		}
 		lastBlockNumber = lastBlock.Header.Number
@@ -82,7 +82,7 @@ func retrieveLastFileSuffix(rootDir string) (int, error) {
 	biggestFileNum := -1
 	filesInfo, err := ioutil.ReadDir(rootDir)
 	if err != nil {
-		return -1, err
+		return -1, errors.Wrapf(err, "error reading dir %s", rootDir)
 	}
 	for _, fileInfo := range filesInfo {
 		name := fileInfo.Name()
@@ -111,7 +111,7 @@ func getFileInfoOrPanic(rootDir string, fileNum int) os.FileInfo {
 	filePath := deriveBlockfilePath(rootDir, fileNum)
 	fileInfo, err := os.Lstat(filePath)
 	if err != nil {
-		panic(fmt.Errorf("Error in retrieving file info for file num = %d", fileNum))
+		panic(errors.Wrapf(err, "error retrieving file info for file number %d", fileNum))
 	}
 	return fileInfo
 }
 /*
-Copyright IBM Corp. 2016 All Rights Reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
 */
 package fsblkstorage
@@ -32,6 +22,7 @@ import (
 	"github.com/hyperledger/fabric/protos/common"
 	"github.com/hyperledger/fabric/protos/peer"
 	putil "github.com/hyperledger/fabric/protos/utils"
+	"github.com/pkg/errors"
 )
 var logger = flogging.MustGetLogger("fsblkstorage")
@@ -102,7 +93,7 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
 	rootDir := conf.getLedgerBlockDir(id)
 	_, err := util.CreateDirIfMissing(rootDir)
 	if err != nil {
-		panic(fmt.Sprintf("Error: %s", err))
+		panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
 	}
 	// Instantiate the manager, i.e. blockFileMgr structure
 	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
@@ -248,19 +239,17 @@ func (mgr *blockfileMgr) moveToNextFile() {
 func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 	if block.Header.Number != mgr.getBlockchainInfo().Height {
-		return fmt.Errorf("Block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
+		return errors.Errorf("block number should have been %d but was %d", mgr.getBlockchainInfo().Height, block.Header.Number)
 	}
 	blockBytes, info, err := serializeBlock(block)
 	if err != nil {
-		return fmt.Errorf("Error while serializing block: %s", err)
+		return errors.WithMessage(err, "error serializing block")
 	}
 	blockHash := block.Header.Hash()
 	//Get the location / offset where each transaction starts in the block and where the block ends
 	txOffsets := info.txOffsets
 	currentOffset := mgr.cpInfo.latestFileChunksize
-	if err != nil {
-		return fmt.Errorf("Error while serializing block: %s", err)
-	}
 	blockBytesLen := len(blockBytes)
 	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
 	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
@@ -282,7 +271,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 		if truncateErr != nil {
 			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
 		}
-		return fmt.Errorf("Error while appending block to file: %s", err)
+		return errors.WithMessage(err, "error appending block to file")
 	}
 	//Update the checkpoint info with the results of adding the new block
@@ -298,7 +287,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 		if truncateErr != nil {
 			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
 		}
-		return fmt.Errorf("Error while saving current file info to db: %s", err)
+		return errors.WithMessage(err, "error saving current file info to db")
 	}
 	//Index block file location pointer updated with file suffex and offset for the new block
@@ -375,7 +364,7 @@ func (mgr *blockfileMgr) syncIndex() error {
 			return err
 		}
 		if blockBytes == nil {
-			return fmt.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
+			return errors.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
 				lastBlockIndexed)
 		}
 	}
...
 /*
-Copyright IBM Corp. 2016 All Rights Reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
 */
 package fsblkstorage
 import (
 	"os"
+	"github.com/pkg/errors"
 )
 //// WRITER ////
@@ -56,14 +48,14 @@ func (w *blockfileWriter) append(b []byte, sync bool) error {
 func (w *blockfileWriter) open() error {
 	file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
 	if err != nil {
-		return err
+		return errors.Wrapf(err, "error opening block file writer for file %s", w.filePath)
 	}
 	w.file = file
 	return nil
 }
 func (w *blockfileWriter) close() error {
-	return w.file.Close()
+	return errors.WithStack(w.file.Close())
 }
 //// READER ////
@@ -74,7 +66,7 @@ type blockfileReader struct {
 func newBlockfileReader(filePath string) (*blockfileReader, error) {
 	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "error opening block file reader for file %s", filePath)
 	}
 	reader := &blockfileReader{file}
 	return reader, nil
@@ -84,11 +76,11 @@ func (r *blockfileReader) read(offset int, length int) ([]byte, error) {
 	b := make([]byte, length)
 	_, err := r.file.ReadAt(b, int64(offset))
 	if err != nil {
-		return nil, err
+		return nil, errors.Wrapf(err, "error reading block file for offset %d and length %d", offset, length)
 	}
 	return b, nil
 }
 func (r *blockfileReader) close() error {
-	return r.file.Close()
+	return errors.WithStack(r.file.Close())
 }
 /*
-Copyright IBM Corp. 2016 All Rights Reserved.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+Copyright IBM Corp. All Rights Reserved.
+SPDX-License-Identifier: Apache-2.0
 */
 package fsblkstorage
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"github.com/golang/protobuf/proto"
@@ -28,6 +17,7 @@ import (
 	ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
 	"github.com/hyperledger/fabric/protos/common"
 	"github.com/hyperledger/fabric/protos/peer"
+	"github.com/pkg/errors"
 )
 const (
@@ -79,7 +69,7 @@ func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHand
 	// for efficiency purpose - [FAB-10587]
 	if (indexItemsMap[blkstorage.IndexableAttrTxValidationCode] || indexItemsMap[blkstorage.IndexableAttrBlockTxID]) &&
 		!indexItemsMap[blkstorage.IndexableAttrTxID] {
-		return nil, fmt.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
+		return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
 			blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
 	}
 	return &blockIndex{indexItemsMap, db}, nil
@@ -126,8 +116,8 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 	//Index3 Used to find a transaction by it's transaction id
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {