Commit 51963595 authored by manish's avatar manish
Browse files

This commit fixes the bug reported at FAB-903

https://jira.hyperledger.org/browse/FAB-903



The bug was caused by an assumption in the code -
that the minimum size of a block would be 8 bytes.
This assumption is broken during testing when security is off
and a block does not include any transaction

Change-Id: Ib48ea3da2a661fbd8e4122573bac938c8b35f3d1
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 18a44d0b
......@@ -87,40 +87,65 @@ func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
return blockBytes, err
}
// nextBlockBytesAndPlacementInfo returns bytes for the next block
// along with the offset information in the block file.
// An error `ErrUnexpectedEndOfBlockfile` is returned if a partial written data is detected
// which is possible towards the tail of the file if a crash had taken place during appending of a block
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
var lenBytes []byte
var err error
if lenBytes, err = s.reader.Peek(8); err != nil {
// reader.Peek raises io.EOF error if enough bytes not available
if err == io.EOF {
if len(lenBytes) > 0 {
return nil, nil, ErrUnexpectedEndOfBlockfile
}
return nil, nil, nil
}
var fileInfo os.FileInfo
moreContentAvailable := true
if fileInfo, err = s.file.Stat(); err != nil {
return nil, nil, err
}
len, n := proto.DecodeVarint(lenBytes)
if n == 0 {
panic(fmt.Errorf("Error in decoding varint bytes"))
if s.currentOffset == fileInfo.Size() {
logger.Debugf("Finished reading file number [%d]", s.fileNum)
return nil, nil, nil
}
if _, err = s.reader.Discard(n); err != nil {
remainingBytes := fileInfo.Size() - s.currentOffset
// Peek 8 or smaller number of bytes (if remaining bytes are less than 8)
// Assumption is that a block size would be small enough to be represented in 8 bytes varint
peekBytes := 8
if remainingBytes < int64(peekBytes) {
peekBytes = int(remainingBytes)
moreContentAvailable = false
}
logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
if lenBytes, err = s.reader.Peek(peekBytes); err != nil {
return nil, nil, err
}
blockBytes := make([]byte, len)
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil {
// io.ReadAtLeast raises io.ErrUnexpectedEOF error if it is able to
// read a fewer (non-zero) bytes and io.EOF is encountered
if err == io.ErrUnexpectedEOF {
length, n := proto.DecodeVarint(lenBytes)
if n == 0 {
// proto.DecodeVarint did not consume any byte at all which means that the bytes
// representing the size of the block are partial bytes
if !moreContentAvailable {
return nil, nil, ErrUnexpectedEndOfBlockfile
}
panic(fmt.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
}
bytesExpected := int64(n) + int64(length)
if bytesExpected > remainingBytes {
logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
return nil, nil, ErrUnexpectedEndOfBlockfile
}
// skip the bytes representing the block size
if _, err = s.reader.Discard(n); err != nil {
return nil, nil, err
}
blockBytes := make([]byte, length)
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(length)); err != nil {
logger.Debugf("Error while trying to read [%d] bytes from fileNum [%d]: %s", length, s.fileNum, err)
return nil, nil, err
}
blockPlacementInfo := &blockPlacementInfo{
fileNum: s.fileNum,
blockStartOffset: s.currentOffset,
blockBytesOffset: s.currentOffset + int64(n)}
s.currentOffset += int64(n) + int64(len)
s.currentOffset += int64(n) + int64(length)
logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), blockPlacementInfo)
return blockBytes, blockPlacementInfo, nil
}
......@@ -179,3 +204,8 @@ func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementI
func (s *blockStream) close() error {
return s.currentFileStream.close()
}
func (i *blockPlacementInfo) String() string {
return fmt.Sprintf("fileNum=[%d], startOffset=[%d], bytesOffset=[%d]",
i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}
......@@ -449,25 +449,32 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
return nil
}
// scanForLastCompleteBlock scan a given block file and detects the last offset in the file
// after which there may lie a block partially written (towards the end of the file in a crash scenario).
func scanForLastCompleteBlock(rootDir string, fileNum int, startingOffset int64) (int64, int, error) {
numBlocks := 0
blockStream, err := newBlockfileStream(rootDir, fileNum, startingOffset)
if err != nil {
return 0, 0, err
blockStream, errOpen := newBlockfileStream(rootDir, fileNum, startingOffset)
if errOpen != nil {
return 0, 0, errOpen
}
defer blockStream.close()
var errRead error
var blockBytes []byte
for {
blockBytes, err := blockStream.nextBlockBytes()
if blockBytes == nil || err == ErrUnexpectedEndOfBlockfile {
logger.Debugf(`scanForLastCompleteBlock(): error=[%s].
The error may happen if a crash has happened during block appending.
Returning current offset as a last complete block's end offset`, err)
blockBytes, errRead = blockStream.nextBlockBytes()
if blockBytes == nil || errRead != nil {
break
}
numBlocks++
}
if errRead == ErrUnexpectedEndOfBlockfile {
logger.Debugf(`Error:%s
The error may happen if a crash has happened during block appending.
Resetting error to nil and returning current offset as a last complete block's end offset`, errRead)
errRead = nil
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return blockStream.currentOffset, numBlocks, err
return blockStream.currentOffset, numBlocks, errRead
}
// checkpointInfo
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"os"
"testing"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos"
)
func TestBlockFileScanSmallTxOnly(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blocks := []*protos.Block2{}
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()
filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")
endOffsetLastBlock, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks))
testutil.AssertEquals(t, endOffsetLastBlock, fileSize)
}
func TestBlockFileScanSmallTxLastTxIncomplete(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
blocks := []*protos.Block2{}
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blocks = append(blocks, testutil.ConstructTestBlock(t, 0, 0, 0))
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.close()
filePath := deriveBlockfilePath(env.conf.blockfilesDir, 0)
_, fileSize, err := util.FileExists(filePath)
testutil.AssertNoError(t, err, "")
file, err := os.OpenFile(filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
defer file.Close()
testutil.AssertNoError(t, err, "")
err = file.Truncate(fileSize - 1)
testutil.AssertNoError(t, err, "")
_, numBlocks, err := scanForLastCompleteBlock(env.conf.blockfilesDir, 0, 0)
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, numBlocks, len(blocks)-1)
}
......@@ -38,16 +38,16 @@ func ConstructBlockForSimulationResults(t *testing.T, simulationResults [][]byte
func ConstructTestBlocks(t *testing.T, numBlocks int) []*protos.Block2 {
blocks := []*protos.Block2{}
for i := 0; i < numBlocks; i++ {
blocks = append(blocks, ConstructTestBlock(t, 10, i*10))
blocks = append(blocks, ConstructTestBlock(t, 10, 100, i*10))
}
return blocks
}
// ConstructTestBlock constructs a block with 'numTx' number of transactions for testing
func ConstructTestBlock(t *testing.T, numTx int, startingTxID int) *protos.Block2 {
func ConstructTestBlock(t *testing.T, numTx int, txSize int, startingTxID int) *protos.Block2 {
txs := []*protos.Transaction2{}
for i := startingTxID; i < numTx+startingTxID; i++ {
tx, _ := putils.CreateTx(protos.Header_CHAINCODE, []byte{}, []byte{}, ConstructRandomBytes(t, 100), []*protos.Endorsement{})
tx, _ := putils.CreateTx(protos.Header_CHAINCODE, []byte{}, []byte{}, ConstructRandomBytes(t, txSize), []*protos.Endorsement{})
txs = append(txs, tx)
}
return newBlock(txs)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment