block_stream.go 7.15 KB
Newer Older
manish's avatar
manish committed
1
/*
2
Copyright IBM Corp. All Rights Reserved.
manish's avatar
manish committed
3

4
SPDX-License-Identifier: Apache-2.0
manish's avatar
manish committed
5
6
7
8
9
10
11
12
13
14
15
*/

package fsblkstorage

import (
	"bufio"
	"fmt"
	"io"
	"os"

	"github.com/golang/protobuf/proto"
16
	"github.com/pkg/errors"
manish's avatar
manish committed
17
18
19
20
21
22
23
24
25
26
)

// ErrUnexpectedEndOfBlockfile error used to indicate an unexpected end of a file segment
// this can happen mainly if a crash occurs during appening a block and partial block contents
// get written towards the end of the file
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")

// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file
type blockfileStream struct {
manish's avatar
manish committed
27
	fileNum       int
manish's avatar
manish committed
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
	file          *os.File
	reader        *bufio.Reader
	currentOffset int64
}

// blockStream reads blocks sequentially from multiple files.
// it starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`)
type blockStream struct {
	rootDir           string
	currentFileNum    int
	endFileNum        int
	currentFileStream *blockfileStream
}

manish's avatar
manish committed
43
44
45
46
47
48
49
50
// blockPlacementInfo captures the information related
// to block's placement in the file.
type blockPlacementInfo struct {
	fileNum          int
	blockStartOffset int64
	blockBytesOffset int64
}

manish's avatar
manish committed
51
52
53
///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
manish's avatar
manish committed
54
55
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
	filePath := deriveBlockfilePath(rootDir, fileNum)
manish's avatar
manish committed
56
57
58
59
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
	var file *os.File
	var err error
	if file, err = os.OpenFile(filePath, os.O_RDONLY, 0600); err != nil {
60
		return nil, errors.Wrapf(err, "error opening block file %s", filePath)
manish's avatar
manish committed
61
62
63
	}
	var newPosition int64
	if newPosition, err = file.Seek(startOffset, 0); err != nil {
64
		return nil, errors.Wrapf(err, "error seeking block file [%s] to startOffset [%d]", filePath, startOffset)
manish's avatar
manish committed
65
66
	}
	if newPosition != startOffset {
67
		panic(fmt.Sprintf("Could not seek block file [%s] to startOffset [%d]. New position = [%d]",
manish's avatar
manish committed
68
69
			filePath, startOffset, newPosition))
	}
manish's avatar
manish committed
70
	s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset}
manish's avatar
manish committed
71
72
73
74
	return s, nil
}

func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
manish's avatar
manish committed
75
76
77
78
	blockBytes, _, err := s.nextBlockBytesAndPlacementInfo()
	return blockBytes, err
}

79
80
81
82
// nextBlockBytesAndPlacementInfo returns bytes for the next block
// along with the offset information in the block file.
// An error `ErrUnexpectedEndOfBlockfile` is returned if a partial written data is detected
// which is possible towards the tail of the file if a crash had taken place during appending of a block
manish's avatar
manish committed
83
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
manish's avatar
manish committed
84
85
	var lenBytes []byte
	var err error
86
87
88
89
	var fileInfo os.FileInfo
	moreContentAvailable := true

	if fileInfo, err = s.file.Stat(); err != nil {
90
		return nil, nil, errors.Wrapf(err, "error getting block file stat")
manish's avatar
manish committed
91
	}
92
93
94
	if s.currentOffset == fileInfo.Size() {
		logger.Debugf("Finished reading file number [%d]", s.fileNum)
		return nil, nil, nil
manish's avatar
manish committed
95
	}
96
97
98
99
100
101
102
103
104
105
	remainingBytes := fileInfo.Size() - s.currentOffset
	// Peek 8 or smaller number of bytes (if remaining bytes are less than 8)
	// Assumption is that a block size would be small enough to be represented in 8 bytes varint
	peekBytes := 8
	if remainingBytes < int64(peekBytes) {
		peekBytes = int(remainingBytes)
		moreContentAvailable = false
	}
	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
	if lenBytes, err = s.reader.Peek(peekBytes); err != nil {
106
		return nil, nil, errors.Wrapf(err, "error peeking [%d] bytes from block file", peekBytes)
manish's avatar
manish committed
107
	}
108
109
110
111
112
	length, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		// proto.DecodeVarint did not consume any byte at all which means that the bytes
		// representing the size of the block are partial bytes
		if !moreContentAvailable {
manish's avatar
manish committed
113
			return nil, nil, ErrUnexpectedEndOfBlockfile
manish's avatar
manish committed
114
		}
115
		panic(errors.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
116
117
118
119
120
121
122
123
124
	}
	bytesExpected := int64(n) + int64(length)
	if bytesExpected > remainingBytes {
		logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
			bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}
	// skip the bytes representing the block size
	if _, err = s.reader.Discard(n); err != nil {
125
		return nil, nil, errors.Wrapf(err, "error discarding [%d] bytes", n)
126
127
128
	}
	blockBytes := make([]byte, length)
	if _, err = io.ReadAtLeast(s.reader, blockBytes, int(length)); err != nil {
129
130
		logger.Errorf("Error reading [%d] bytes from file number [%d], error: %s", length, s.fileNum, err)
		return nil, nil, errors.Wrapf(err, "error reading [%d] bytes from file number [%d]", length, s.fileNum)
manish's avatar
manish committed
131
	}
manish's avatar
manish committed
132
133
134
135
	blockPlacementInfo := &blockPlacementInfo{
		fileNum:          s.fileNum,
		blockStartOffset: s.currentOffset,
		blockBytesOffset: s.currentOffset + int64(n)}
136
137
	s.currentOffset += int64(n) + int64(length)
	logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), blockPlacementInfo)
manish's avatar
manish committed
138
	return blockBytes, blockPlacementInfo, nil
manish's avatar
manish committed
139
140
141
}

func (s *blockfileStream) close() error {
142
	return errors.WithStack(s.file.Close())
manish's avatar
manish committed
143
144
145
146
147
148
}

///////////////////////////////////
// blockStream functions
////////////////////////////////////
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
manish's avatar
manish committed
149
	startFileStream, err := newBlockfileStream(rootDir, startFileNum, startOffset)
manish's avatar
manish committed
150
151
152
153
154
155
156
157
158
159
160
161
	if err != nil {
		return nil, err
	}
	return &blockStream{rootDir, startFileNum, endFileNum, startFileStream}, nil
}

func (s *blockStream) moveToNextBlockfileStream() error {
	var err error
	if err = s.currentFileStream.close(); err != nil {
		return err
	}
	s.currentFileNum++
manish's avatar
manish committed
162
	if s.currentFileStream, err = newBlockfileStream(s.rootDir, s.currentFileNum, 0); err != nil {
manish's avatar
manish committed
163
164
165
166
167
168
		return err
	}
	return nil
}

func (s *blockStream) nextBlockBytes() ([]byte, error) {
manish's avatar
manish committed
169
170
171
172
173
	blockBytes, _, err := s.nextBlockBytesAndPlacementInfo()
	return blockBytes, err
}

func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
manish's avatar
manish committed
174
	var blockBytes []byte
manish's avatar
manish committed
175
	var blockPlacementInfo *blockPlacementInfo
manish's avatar
manish committed
176
	var err error
manish's avatar
manish committed
177
	if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil {
178
		logger.Errorf("Error reading next block bytes from file number [%d]: %s", s.currentFileNum, err)
manish's avatar
manish committed
179
		return nil, nil, err
manish's avatar
manish committed
180
181
	}
	logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
manish's avatar
manish committed
182
	if blockBytes == nil && (s.currentFileNum < s.endFileNum || s.endFileNum < 0) {
manish's avatar
manish committed
183
184
		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
manish's avatar
manish committed
185
			return nil, nil, err
manish's avatar
manish committed
186
		}
manish's avatar
manish committed
187
		return s.nextBlockBytesAndPlacementInfo()
manish's avatar
manish committed
188
	}
manish's avatar
manish committed
189
	return blockBytes, blockPlacementInfo, nil
manish's avatar
manish committed
190
191
192
193
194
}

func (s *blockStream) close() error {
	return s.currentFileStream.close()
}
195
196
197
198
199

func (i *blockPlacementInfo) String() string {
	return fmt.Sprintf("fileNum=[%d], startOffset=[%d], bytesOffset=[%d]",
		i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}