blockfile_rw.go 4.38 KB
Newer Older
manish's avatar
manish committed
1
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
	"os"
11
	"time"
12
13

	"github.com/pkg/errors"
manish's avatar
manish committed
14
15
)

16
type batchedBlockfileWriter struct {
17
18
19
20
21
22
	batch         int
	bfw           *blockfileWriter
	buffer        []writeInfo
	currentLen    int
	currentBuffer []byte
	updated       chan struct{}
23
24
25
26
27
28
29
30
}

// writeInfo is one buffered write: a chunk of bytes together with the file
// that writeOut will write it to.
type writeInfo struct {
	file *os.File // destination file, taken from the active blockfileWriter when the entry is buffered
	data []byte   // bytes to write to file
}

// newBatchedBlockFileWriter builds a batchedBlockfileWriter around bfw.
// The pending-entry buffer is pre-sized to batch, the signalling channel is
// unbuffered, and a finalWrite goroutine is started before the writer is
// returned.
func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfileWriter {
	writer := &batchedBlockfileWriter{
		bfw:     bfw,
		batch:   batch,
		buffer:  make([]writeInfo, 0, batch),
		updated: make(chan struct{}),
	}

	go writer.finalWrite()

	return writer
}

func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
Lucas Kuhring's avatar
Lucas Kuhring committed
39
40
41
42
43

	if w.batch == 0 {
		w.bfw.close()
	}

44
45
46
47
48
49
50
51
52
53
	w.currentLen = 0
	w.bfw = bfw
}

// append adds b to the writer.
//
// With batching disabled (batch == 0) the call is forwarded to the
// underlying blockfileWriter. Otherwise b is accumulated in currentBuffer;
// when sync is true the accumulated bytes are copied into a new buffer entry
// bound to the current file and currentBuffer is emptied. Once the number of
// buffered entries equals batch, everything is written out via
// writeOut(true). currentLen is advanced by len(b) only when no error is
// returned.
func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {
	if w.batch == 0 {
		return w.bfw.append(b, sync)
	}

	if w.currentBuffer == nil {
		w.currentBuffer = make([]byte, 0, len(b))
	}
	w.currentBuffer = append(w.currentBuffer, b...)

	if sync {
		// Copy the bytes so that reusing currentBuffer's backing array for
		// later appends cannot alter the queued entry.
		snapshot := append([]byte(nil), w.currentBuffer...)
		w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: snapshot})
		w.currentBuffer = w.currentBuffer[:0]
	}

	if len(w.buffer) == w.batch {
		if flushErr := w.writeOut(true); flushErr != nil {
			return flushErr
		}
	}

	w.currentLen += len(b)
	return nil
}

// finalWrite is the body of the background flush goroutine. It loops until
// it receives a signal on updated, at which point it returns. Whenever ten
// seconds pass without such a signal it calls writeOut(false) and logs any
// error. A fresh timer is created on every loop iteration, so the ten-second
// wait starts over after each timed flush.
func (w *batchedBlockfileWriter) finalWrite() {
	const flushInterval = 10 * time.Second

	for {
		select {
		case <-w.updated:
			return
		case <-time.After(flushInterval):
			flushErr := w.writeOut(false)
			if flushErr != nil {
				logger.Errorf("Error in batched write: %v", flushErr)
			}
		}
	}
}
89

Lucas Kuhring's avatar
Lucas Kuhring committed
90
func (w *batchedBlockfileWriter) close() error {
91

Lucas Kuhring's avatar
Lucas Kuhring committed
92
93
94
	if err := w.writeOut(true); err != nil {
		return err
	}
95

Lucas Kuhring's avatar
Lucas Kuhring committed
96
97
98
	if err := w.bfw.close(); err != nil {
		return err
	}
99

Lucas Kuhring's avatar
Lucas Kuhring committed
100
101
102
103
	return nil
}

func (w *batchedBlockfileWriter) writeOut(wait bool) error {
104
105
106
107

	if wait {
		go w.finalWrite()
	}
108

109
	w.updated <- struct{}{}
110
111
112
113
114
115
116

	var err error

	var lastFile *os.File

	for _, v := range w.buffer {

117
		if lastFile != nil && lastFile.Name() != v.file.Name() {
118
119
120
			if err = lastFile.Sync(); err != nil {
				return err
			}
Lucas Kuhring's avatar
Lucas Kuhring committed
121
122
123
			if err := lastFile.Close(); err != nil {
				return err
			}
124
125
126
127
128
129
130
131
132
133
134
		}

		_, err = v.file.Write(v.data)

		if err != nil {
			return err
		}

		lastFile = v.file
	}

135
136
137
138
	if lastFile != nil {
		if err = lastFile.Sync(); err != nil {
			return err
		}
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
	}

	w.buffer = w.buffer[:0]

	return nil
}

func (w *batchedBlockfileWriter) truncateFile(targetSize int) error {

	if w.batch == 0 {
		return w.bfw.truncateFile(targetSize)
	}

	if w.currentLen > targetSize {
		lastBuf := w.buffer[len(w.buffer)-1].data
		left := w.currentLen - targetSize
		lastBuf = lastBuf[:(len(lastBuf) - left)]
		w.currentLen = targetSize
	}

Lucas Kuhring's avatar
Lucas Kuhring committed
159
160
161
162
	if err := w.writeOut(false); err != nil {
		return err
	}

163
164
165
	return nil
}

manish's avatar
manish committed
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
////  WRITER ////
// blockfileWriter writes to a single block file identified by filePath.
type blockfileWriter struct {
	filePath string   // path passed to os.OpenFile by open
	file     *os.File // handle assigned by open
}

// newBlockfileWriter creates a blockfileWriter for filePath and opens the
// file. The writer is returned together with the result of open, so the
// returned pointer is non-nil even when opening failed.
func newBlockfileWriter(filePath string) (*blockfileWriter, error) {
	w := &blockfileWriter{filePath: filePath}
	openErr := w.open()
	return w, openErr
}

// truncateFile shrinks the file to targetSize bytes if it is currently
// larger; a file that is already at or below targetSize is left untouched.
// It returns any error from querying the file's size or from the truncation
// itself.
func (w *blockfileWriter) truncateFile(targetSize int) error {
	fileStat, err := w.file.Stat()
	if err != nil {
		return err
	}
	if fileStat.Size() <= int64(targetSize) {
		return nil
	}
	// Propagate a failed truncation instead of reporting success.
	return w.file.Truncate(int64(targetSize))
}

manish's avatar
manish committed
188
func (w *blockfileWriter) append(b []byte, sync bool) error {
manish's avatar
manish committed
189
190
191
192
	_, err := w.file.Write(b)
	if err != nil {
		return err
	}
manish's avatar
manish committed
193
194
195
	if sync {
		return w.file.Sync()
	}
manish's avatar
manish committed
196
197
198
199
200
201
	return nil
}

func (w *blockfileWriter) open() error {
	file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
	if err != nil {
202
		return errors.Wrapf(err, "error opening block file writer for file %s", w.filePath)
manish's avatar
manish committed
203
204
205
206
207
208
	}
	w.file = file
	return nil
}

func (w *blockfileWriter) close() error {
Lucas Kuhring's avatar
Lucas Kuhring committed
209

210
	return errors.WithStack(w.file.Close())
manish's avatar
manish committed
211
212
213
214
215
216
217
218
219
220
}

////  READER ////
// blockfileReader reads byte ranges from a single block file.
type blockfileReader struct {
	file *os.File // handle opened read-only by newBlockfileReader
}

func newBlockfileReader(filePath string) (*blockfileReader, error) {
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
221
		return nil, errors.Wrapf(err, "error opening block file reader for file %s", filePath)
manish's avatar
manish committed
222
223
224
225
226
227
228
229
230
	}
	reader := &blockfileReader{file}
	return reader, nil
}

func (r *blockfileReader) read(offset int, length int) ([]byte, error) {
	b := make([]byte, length)
	_, err := r.file.ReadAt(b, int64(offset))
	if err != nil {
231
		return nil, errors.Wrapf(err, "error reading block file for offset %d and length %d", offset, length)
manish's avatar
manish committed
232
233
234
235
236
	}
	return b, nil
}

func (r *blockfileReader) close() error {
237
	return errors.WithStack(r.file.Close())
manish's avatar
manish committed
238
}