Commit 130ad7c7 authored by manish
Browse files

Block stream across files



This commit allows a block stream consumer to read across multiple files.

Change-Id: I1bd053e8d022bf1daf01f61f9e3e1ae1629e334b
Signed-off-by: manish <manish.sethi@gmail.com>
parent 9c2ecfc2
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"github.com/golang/protobuf/proto"
)
// ErrUnexpectedEndOfBlockfile error used to indicate an unexpected end of a file segment
// this can happen mainly if a crash occurs while appending a block and partial block contents
// get written towards the end of the file
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")
// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file.
type blockfileStream struct {
// file is the underlying block file; newBlockfileStream opens it read-only.
file *os.File
// reader buffers reads from file; nextBlockBytes peeks and reads through it.
reader *bufio.Reader
// currentOffset starts at the offset the stream was opened with and is
// advanced by nextBlockBytes past every block it successfully returns.
currentOffset int64
}
// blockStream reads blocks sequentially from multiple files.
// It starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`).
type blockStream struct {
// rootDir is handed to deriveBlockfilePath to locate each block file.
rootDir string
// currentFileNum is the number of the file currently being read.
currentFileNum int
// endFileNum is the last file number the stream will advance to.
endFileNum int
// currentFileStream is the single-file stream over file currentFileNum.
currentFileStream *blockfileStream
}
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
// newBlockfileStream opens the block file at filePath read-only, positions it
// at startOffset and returns a stream that reads blocks from there onwards.
//
// It returns an error if the file cannot be opened or the seek fails; in the
// latter case the file is closed again so no descriptor is leaked. It panics
// if the seek succeeds but reports a position other than startOffset.
// The caller owns the returned stream and must call close on it.
func newBlockfileStream(filePath string, startOffset int64) (*blockfileStream, error) {
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
		return nil, err
	}
	// Whence 0: startOffset is relative to the beginning of the file.
	newPosition, err := file.Seek(startOffset, 0)
	if err != nil {
		// The seek error is the one worth reporting; the close is best-effort
		// and only there so that the opened file is not leaked.
		_ = file.Close()
		return nil, err
	}
	if newPosition != startOffset {
		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}
	return &blockfileStream{
		file:          file,
		reader:        bufio.NewReader(file),
		currentOffset: startOffset,
	}, nil
}
// nextBlockBytes returns the bytes of the next block in the file. Each block
// is stored as a varint-encoded length followed by that many bytes.
//
// Return values:
//   - (block, nil) when a complete block was read; currentOffset is advanced
//     past it.
//   - (nil, nil) when the stream is exactly at the end of the file.
//   - (nil, ErrUnexpectedEndOfBlockfile) when the file ends in the middle of
//     a block (partial length prefix or partial payload). currentOffset is
//     left at the end of the last complete block; the reader position is
//     unspecified afterwards.
//   - (nil, err) for any other read error.
//
// It panics if a length prefix cannot be decoded even though at least 8 bytes
// were available.
func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
	// moreContentAvailable is cleared when fewer than 8 bytes are left in the
	// file. That alone is not an error: the last block may legitimately be
	// shorter than 8 bytes including its length prefix.
	moreContentAvailable := true
	lenBytes, err := s.reader.Peek(8)
	if err != nil {
		if err != io.EOF {
			return nil, err
		}
		if len(lenBytes) == 0 {
			// Clean end of file: nothing more to read.
			return nil, nil
		}
		moreContentAvailable = false
	}
	blockLen, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		// n == 0 means no complete varint could be decoded. With a short tail
		// that is a truncated length prefix; otherwise the data is corrupt.
		if !moreContentAvailable {
			return nil, ErrUnexpectedEndOfBlockfile
		}
		panic(fmt.Errorf("Error in decoding varint bytes"))
	}
	if _, err = s.reader.Discard(n); err != nil {
		return nil, err
	}
	blockBytes := make([]byte, blockLen)
	if _, err = io.ReadFull(s.reader, blockBytes); err != nil {
		// io.ReadFull reports io.EOF when no payload byte could be read and
		// io.ErrUnexpectedEOF when only part of it could; both mean the block
		// was cut short.
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return nil, ErrUnexpectedEndOfBlockfile
		}
		return nil, err
	}
	s.currentOffset += int64(n) + int64(blockLen)
	return blockBytes, nil
}
// close closes the file backing this stream and reports any error from doing so.
func (s *blockfileStream) close() error {
	underlying := s.file
	return underlying.Close()
}
///////////////////////////////////
// blockStream functions
////////////////////////////////////
// newBlockStream builds a stream that begins reading at startOffset inside
// file number startFileNum under rootDir and may advance up to file number
// endFileNum. Only the first file is opened here; an error from opening it is
// returned as is.
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
	firstStream, err := newBlockfileStream(deriveBlockfilePath(rootDir, startFileNum), startOffset)
	if err != nil {
		return nil, err
	}
	stream := &blockStream{
		rootDir:           rootDir,
		currentFileNum:    startFileNum,
		endFileNum:        endFileNum,
		currentFileStream: firstStream,
	}
	return stream, nil
}
// moveToNextBlockfileStream switches the stream to the beginning of the file
// that follows currentFileNum.
//
// The next file is opened before any state is touched, so that on failure the
// blockStream still refers to its previous (valid) file stream rather than to
// a nil one; currentFileNum and currentFileStream are only updated once both
// the open of the next file and the close of the current one have succeeded.
func (s *blockStream) moveToNextBlockfileStream() error {
	nextFileNum := s.currentFileNum + 1
	nextFileStream, err := newBlockfileStream(deriveBlockfilePath(s.rootDir, nextFileNum), 0)
	if err != nil {
		return err
	}
	if err = s.currentFileStream.close(); err != nil {
		// The close error is the one reported; closing the freshly opened
		// stream is best-effort so that it is not leaked.
		_ = nextFileStream.close()
		return err
	}
	s.currentFileNum = nextFileNum
	s.currentFileStream = nextFileStream
	return nil
}
// nextBlockBytes returns the next block from the current file. When the
// current file yields no block and it is not yet the last file (endFileNum),
// the stream moves on to the following file and tries again. A nil block with
// a nil error therefore means the last file has been exhausted. Any error from
// reading or from switching files is returned with a nil block.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
	for {
		data, readErr := s.currentFileStream.nextBlockBytes()
		if readErr != nil {
			logger.Debugf("current file [%d]", s.currentFileNum)
			logger.Debugf("blockbytes [%d]. Err:%s", len(data), readErr)
			return nil, readErr
		}
		logger.Debugf("blockbytes [%d] read from file [%d]", len(data), s.currentFileNum)
		// Either a block was read, or there is no further file to try.
		if data != nil || s.currentFileNum >= s.endFileNum {
			return data, nil
		}
		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if moveErr := s.moveToNextBlockfileStream(); moveErr != nil {
			return nil, moveErr
		}
	}
}
// close closes the file stream that is currently being read.
func (s *blockStream) close() error {
	current := s.currentFileStream
	return current.close()
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledgernext/testutil"
)
// TestBlockfileStream runs the single-file stream check for an empty file,
// a file with one block and a file with ten blocks.
func TestBlockfileStream(t *testing.T) {
	for _, numBlocks := range []int{0, 1, 10} {
		testBlockfileStream(t, numBlocks)
	}
}
// testBlockfileStream writes numBlocks blocks through the test wrapper, then
// reads file 0 back with a blockfileStream and checks that exactly numBlocks
// blocks come out and that the exhausted stream keeps returning (nil, nil).
func testBlockfileStream(t *testing.T, numBlocks int) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	blockfileMgr := w.blockfileMgr
	blocks := testutil.ConstructTestBlocks(t, numBlocks)
	w.addBlocks(blocks)
	w.close()
	s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
	testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
	// Register the close only after the error check: s is nil when the
	// constructor fails, and closing a nil stream would panic and hide the
	// real failure. (Presumably AssertNoError aborts the test on error.)
	defer s.close()
	blockCount := 0
	for {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNoError(t, err, "Error in getting next block")
		if blockBytes == nil {
			break
		}
		blockCount++
	}
	// After the stream has been exhausted, both blockBytes and err should be nil
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertNoError(t, err, "Error in getting next block after exhausting the file")
	testutil.AssertEquals(t, blockCount, numBlocks)
}
// TestBlockFileStreamUnexpectedEOF builds one length-prefixed record from 100
// random bytes and appends only its first 1, 2, 5 and 20 bytes after ten
// complete blocks, expecting the stream to report the truncated tail each time.
func TestBlockFileStreamUnexpectedEOF(t *testing.T) {
	record := []byte{}
	payload := testutil.ConstructRandomBytes(t, 100)
	record = append(record, proto.EncodeVarint(uint64(len(payload)))...)
	record = append(record, payload...)
	for _, cut := range []int{1, 2, 5, 20} {
		testBlockFileStreamUnexpectedEOF(t, 10, record[:cut])
	}
}
// testBlockFileStreamUnexpectedEOF writes numBlocks complete blocks followed
// by partialBlockBytes, then checks that a blockfileStream over file 0 returns
// numBlocks blocks and afterwards fails with ErrUnexpectedEndOfBlockfile.
func testBlockFileStreamUnexpectedEOF(t *testing.T, numBlocks int, partialBlockBytes []byte) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	blockfileMgr := w.blockfileMgr
	blocks := testutil.ConstructTestBlocks(t, numBlocks)
	w.addBlocks(blocks)
	blockfileMgr.currentFileWriter.append(partialBlockBytes, true)
	w.close()
	s, err := newBlockfileStream(deriveBlockfilePath(blockfileMgr.rootDir, 0), 0)
	testutil.AssertNoError(t, err, "Error in constructing blockfile stream")
	// Register the close only after the error check: s is nil when the
	// constructor fails, and closing a nil stream would panic and hide the
	// real failure. (Presumably AssertNoError aborts the test on error.)
	defer s.close()
	for i := 0; i < numBlocks; i++ {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNotNil(t, blockBytes)
		testutil.AssertNoError(t, err, "Error in getting next block")
	}
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertSame(t, err, ErrUnexpectedEndOfBlockfile)
}
// TestBlockStream runs the multi-file stream check with one, two and ten files.
func TestBlockStream(t *testing.T) {
	for _, numFiles := range []int{1, 2, 10} {
		testBlockStream(t, numFiles)
	}
}
// testBlockStream adds ten blocks and then calls moveToNextFile, numFiles
// times over, then reads everything back through one blockStream covering
// files 0..numFiles-1 and checks the total block count and that the exhausted
// stream keeps returning (nil, nil).
func testBlockStream(t *testing.T, numFiles int) {
	env := newTestEnv(t)
	defer env.Cleanup()
	w := newTestBlockfileWrapper(t, env)
	defer w.close()
	blockfileMgr := w.blockfileMgr
	numBlocksInEachFile := 10
	for i := 0; i < numFiles; i++ {
		blocks := testutil.ConstructTestBlocks(t, numBlocksInEachFile)
		w.addBlocks(blocks)
		blockfileMgr.moveToNextFile()
	}
	s, err := newBlockStream(blockfileMgr.rootDir, 0, 0, numFiles-1)
	testutil.AssertNoError(t, err, "Error in constructing new block stream")
	// Register the close only after the error check: s is nil when the
	// constructor fails, and closing a nil stream would panic and hide the
	// real failure. (Presumably AssertNoError aborts the test on error.)
	defer s.close()
	blockCount := 0
	for {
		blockBytes, err := s.nextBlockBytes()
		testutil.AssertNoError(t, err, "Error in getting next block")
		if blockBytes == nil {
			break
		}
		blockCount++
	}
	// After the stream has been exhausted, both blockBytes and err should be nil
	blockBytes, err := s.nextBlockBytes()
	testutil.AssertNil(t, blockBytes)
	testutil.AssertNoError(t, err, "Error in getting next block after exhausting the file")
	testutil.AssertEquals(t, blockCount, numFiles*numBlocksInEachFile)
}
......@@ -269,10 +269,9 @@ func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*Blocks
if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil {
return nil, err
}
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
var stream *blockStream
if stream, err = newBlockStream(filePath, int64(lp.offset)); err != nil {
if stream, err = newBlockStream(mgr.rootDir, lp.fileSuffixNum,
int64(lp.offset), mgr.cpInfo.latestFileChunkSuffixNum); err != nil {
return nil, err
}
return newBlockItr(stream, int(endNum-startNum)+1), nil
......@@ -322,7 +321,7 @@ func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transacti
func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
stream, err := newBlockStream(filePath, int64(lp.offset))
stream, err := newBlockfileStream(filePath, int64(lp.offset))
if err != nil {
return nil, err
}
......@@ -382,23 +381,23 @@ func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo, flush bool) error {
func scanForLastCompleteBlock(filePath string, startingOffset int64) (int64, int, error) {
logger.Debugf("scanForLastCompleteBlock(): filePath=[%s], startingOffset=[%d]", filePath, startingOffset)
numBlocks := 0
blockStream, err := newBlockStream(filePath, startingOffset)
blockStream, err := newBlockfileStream(filePath, startingOffset)
if err != nil {
return 0, 0, err
}
defer blockStream.close()
for {
blockBytes, err := blockStream.nextBlockBytes()
if blockBytes == nil || err != nil {
if blockBytes == nil || err == ErrUnexpectedEndOfBlockfile {
logger.Debugf(`scanForLastCompleteBlock(): error=[%s].
This may happen if a crash has happened during block appending.
The error may happen if a crash has happened during block appending.
Returning current offset as a last complete block's end offset`, err)
break
}
numBlocks++
}
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentFileOffset)
return blockStream.currentFileOffset, numBlocks, err
logger.Debugf("scanForLastCompleteBlock(): last complete block ends at offset=[%d]", blockStream.currentOffset)
return blockStream.currentOffset, numBlocks, err
}
// checkpointInfo
......
......@@ -17,12 +17,7 @@ limitations under the License.
package fsblkstorage
import (
"bufio"
"fmt"
"io"
"os"
"github.com/golang/protobuf/proto"
)
//// WRITER ////
......@@ -97,49 +92,3 @@ func (r *blockfileReader) read(offset int, length int) ([]byte, error) {
// close closes the file held by this reader and reports any error from doing so.
func (r *blockfileReader) close() error {
	underlying := r.file
	return underlying.Close()
}
// blockStream reads length-prefixed blocks sequentially from one file.
type blockStream struct {
// file is the block file; newBlockStream opens it read-only.
file *os.File
// reader buffers reads from file.
reader *bufio.Reader
// currentFileOffset starts at the offset given to newBlockStream and is
// advanced by nextBlockBytes past each block it returns.
currentFileOffset int64
}
// newBlockStream opens filePath read-only, seeks to offset and returns a
// stream positioned there. Open and seek errors are returned; it panics if
// the seek reports a position different from offset.
func newBlockStream(filePath string, offset int64) (*blockStream, error) {
var file *os.File
var err error
if file, err = os.OpenFile(filePath, os.O_RDONLY, 0600); err != nil {
return nil, err
}
var newPosition int64
// Whence 0: offset is relative to the beginning of the file.
// NOTE(review): file is not closed when the seek fails.
if newPosition, err = file.Seek(offset, 0); err != nil {
return nil, err
}
if newPosition != offset {
panic(fmt.Sprintf("Could not seek file [%s] to given offset [%d]. New position = [%d]", filePath, offset, newPosition))
}
s := &blockStream{file, bufio.NewReader(file), offset}
return s, nil
}
// nextBlockBytes reads the next block: a varint length followed by that many
// bytes. It returns (nil, nil) when Peek reports io.EOF, and advances
// currentFileOffset past the block on success.
func (s *blockStream) nextBlockBytes() ([]byte, error) {
// Peek up to 8 bytes without consuming them; the varint is decoded from these.
lenBytes, err := s.reader.Peek(8)
// NOTE(review): io.EOF is treated as end of stream even if Peek returned
// some bytes, and any other Peek error is not checked here.
if err == io.EOF {
logger.Debugf("block stream reached end of file. Returning next block as nil")
return nil, nil
}
len, n := proto.DecodeVarint(lenBytes)
// Consume only the n bytes that made up the length prefix.
if _, err = s.reader.Discard(n); err != nil {
return nil, err
}
blockBytes := make([]byte, len)
if _, err = io.ReadAtLeast(s.reader, blockBytes, int(len)); err != nil {
return nil, err
}
s.currentFileOffset += int64(n) + int64(len)
return blockBytes, nil
}
// close closes the file backing this stream and returns any error from doing so.
func (s *blockStream) close() error {
return s.file.Close()
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment