/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fsblkstorage

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/golang/protobuf/proto"
)

// ErrUnexpectedEndOfBlockfile is the error used to indicate an unexpected end of a file segment.
// This can happen mainly if a crash occurs while appending a block and partial block contents
// get written towards the end of the file.
var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile")

// blockfileStream reads blocks sequentially from a single file.
// It starts from the given offset and can traverse till the end of the file.
type blockfileStream struct {
	// fileNum is the number of the block file being read; it is reported in
	// log messages and in the placement info returned for each block.
	fileNum int
	// file is the underlying file handle.
	file *os.File
	// reader is the buffered reader through which block data is read.
	reader *bufio.Reader
	// currentOffset is the file offset of the next unread block; it is
	// advanced past each block that is successfully read.
	currentOffset int64
}

// blockStream reads blocks sequentially from multiple files.
// It starts from a given file offset and continues with the next
// file segment until the end of the last segment (`endFileNum`).
type blockStream struct {
	// directory handed to newBlockfileStream whenever a file segment is opened
	rootDir           string
	// number of the file segment currently being read
	currentFileNum    int
	// number of the last file segment to read; a negative value removes this upper bound
	endFileNum        int
	// stream over the file segment currently being read
	currentFileStream *blockfileStream
}

// blockPlacementInfo captures the information related
// to a block's placement in the file.
type blockPlacementInfo struct {
	// number of the file from which the block was read
	fileNum          int
	// file offset at which the block's varint length prefix begins
	blockStartOffset int64
	// file offset at which the block's bytes begin (just past the length prefix)
	blockBytesOffset int64
}

///////////////////////////////////
// blockfileStream functions
////////////////////////////////////
func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) {
	filePath := deriveBlockfilePath(rootDir, fileNum)
manish's avatar
manish committed
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
	logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset)
	var file *os.File
	var err error
	if file, err = os.OpenFile(filePath, os.O_RDONLY, 0600); err != nil {
		return nil, err
	}
	var newPosition int64
	if newPosition, err = file.Seek(startOffset, 0); err != nil {
		// file.Seek does not raise an error - simply seeks to the new position
		return nil, err
	}
	if newPosition != startOffset {
		panic(fmt.Sprintf("Could not seek file [%s] to given startOffset [%d]. New position = [%d]",
			filePath, startOffset, newPosition))
	}
manish's avatar
manish committed
81
	s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset}
manish's avatar
manish committed
82
83
84
85
	return s, nil
}

func (s *blockfileStream) nextBlockBytes() ([]byte, error) {
manish's avatar
manish committed
86
87
88
89
	blockBytes, _, err := s.nextBlockBytesAndPlacementInfo()
	return blockBytes, err
}

90
91
92
93
// nextBlockBytesAndPlacementInfo returns bytes for the next block
// along with the offset information in the block file.
// An error `ErrUnexpectedEndOfBlockfile` is returned if a partial written data is detected
// which is possible towards the tail of the file if a crash had taken place during appending of a block
manish's avatar
manish committed
94
func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
manish's avatar
manish committed
95
96
	var lenBytes []byte
	var err error
97
98
99
100
	var fileInfo os.FileInfo
	moreContentAvailable := true

	if fileInfo, err = s.file.Stat(); err != nil {
manish's avatar
manish committed
101
		return nil, nil, err
manish's avatar
manish committed
102
	}
103
104
105
	if s.currentOffset == fileInfo.Size() {
		logger.Debugf("Finished reading file number [%d]", s.fileNum)
		return nil, nil, nil
manish's avatar
manish committed
106
	}
107
108
109
110
111
112
113
114
115
116
	remainingBytes := fileInfo.Size() - s.currentOffset
	// Peek 8 or smaller number of bytes (if remaining bytes are less than 8)
	// Assumption is that a block size would be small enough to be represented in 8 bytes varint
	peekBytes := 8
	if remainingBytes < int64(peekBytes) {
		peekBytes = int(remainingBytes)
		moreContentAvailable = false
	}
	logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes)
	if lenBytes, err = s.reader.Peek(peekBytes); err != nil {
manish's avatar
manish committed
117
		return nil, nil, err
manish's avatar
manish committed
118
	}
119
120
121
122
123
	length, n := proto.DecodeVarint(lenBytes)
	if n == 0 {
		// proto.DecodeVarint did not consume any byte at all which means that the bytes
		// representing the size of the block are partial bytes
		if !moreContentAvailable {
manish's avatar
manish committed
124
			return nil, nil, ErrUnexpectedEndOfBlockfile
manish's avatar
manish committed
125
		}
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
		panic(fmt.Errorf("Error in decoding varint bytes [%#v]", lenBytes))
	}
	bytesExpected := int64(n) + int64(length)
	if bytesExpected > remainingBytes {
		logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]",
			bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile)
		return nil, nil, ErrUnexpectedEndOfBlockfile
	}
	// skip the bytes representing the block size
	if _, err = s.reader.Discard(n); err != nil {
		return nil, nil, err
	}
	blockBytes := make([]byte, length)
	if _, err = io.ReadAtLeast(s.reader, blockBytes, int(length)); err != nil {
		logger.Debugf("Error while trying to read [%d] bytes from fileNum [%d]: %s", length, s.fileNum, err)
manish's avatar
manish committed
141
		return nil, nil, err
manish's avatar
manish committed
142
	}
manish's avatar
manish committed
143
144
145
146
	blockPlacementInfo := &blockPlacementInfo{
		fileNum:          s.fileNum,
		blockStartOffset: s.currentOffset,
		blockBytesOffset: s.currentOffset + int64(n)}
147
148
	s.currentOffset += int64(n) + int64(length)
	logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), blockPlacementInfo)
manish's avatar
manish committed
149
	return blockBytes, blockPlacementInfo, nil
manish's avatar
manish committed
150
151
152
153
154
155
156
157
158
159
}

// close closes the underlying file and returns the error, if any, from that call.
func (s *blockfileStream) close() error {
	return s.file.Close()
}

///////////////////////////////////
// blockStream functions
////////////////////////////////////
func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) {
manish's avatar
manish committed
160
	startFileStream, err := newBlockfileStream(rootDir, startFileNum, startOffset)
manish's avatar
manish committed
161
162
163
164
165
166
167
168
169
170
171
172
	if err != nil {
		return nil, err
	}
	return &blockStream{rootDir, startFileNum, endFileNum, startFileStream}, nil
}

func (s *blockStream) moveToNextBlockfileStream() error {
	var err error
	if err = s.currentFileStream.close(); err != nil {
		return err
	}
	s.currentFileNum++
manish's avatar
manish committed
173
	if s.currentFileStream, err = newBlockfileStream(s.rootDir, s.currentFileNum, 0); err != nil {
manish's avatar
manish committed
174
175
176
177
178
179
		return err
	}
	return nil
}

func (s *blockStream) nextBlockBytes() ([]byte, error) {
manish's avatar
manish committed
180
181
182
183
184
	blockBytes, _, err := s.nextBlockBytesAndPlacementInfo()
	return blockBytes, err
}

func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) {
manish's avatar
manish committed
185
	var blockBytes []byte
manish's avatar
manish committed
186
	var blockPlacementInfo *blockPlacementInfo
manish's avatar
manish committed
187
	var err error
manish's avatar
manish committed
188
	if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil {
manish's avatar
manish committed
189
190
		logger.Debugf("current file [%d]", s.currentFileNum)
		logger.Debugf("blockbytes [%d]. Err:%s", len(blockBytes), err)
manish's avatar
manish committed
191
		return nil, nil, err
manish's avatar
manish committed
192
193
	}
	logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum)
manish's avatar
manish committed
194
	if blockBytes == nil && (s.currentFileNum < s.endFileNum || s.endFileNum < 0) {
manish's avatar
manish committed
195
196
		logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum)
		if err = s.moveToNextBlockfileStream(); err != nil {
manish's avatar
manish committed
197
			return nil, nil, err
manish's avatar
manish committed
198
		}
manish's avatar
manish committed
199
		return s.nextBlockBytesAndPlacementInfo()
manish's avatar
manish committed
200
	}
manish's avatar
manish committed
201
	return blockBytes, blockPlacementInfo, nil
manish's avatar
manish committed
202
203
204
205
206
}

func (s *blockStream) close() error {
	return s.currentFileStream.close()
}
207
208
209
210
211

// String renders the placement info as a single line listing the file number,
// the block start offset and the block bytes offset.
func (i *blockPlacementInfo) String() string {
	const layout = "fileNum=[%d], startOffset=[%d], bytesOffset=[%d]"
	return fmt.Sprintf(layout, i.fileNum, i.blockStartOffset, i.blockBytesOffset)
}