blockfile_rw.go 4.31 KB
Newer Older
manish's avatar
manish committed
1
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
	"os"
11
	"sync"
12
	"time"
13
14

	"github.com/pkg/errors"
manish's avatar
manish committed
15
16
)

17
18
// lock is a package-level read/write mutex.
// NOTE(review): no live use of it is visible in this part of the file;
// presumably it is referenced elsewhere in the package — confirm before removing.
var lock sync.RWMutex

19
type batchedBlockfileWriter struct {
20
21
22
23
24
25
	batch         int
	bfw           *blockfileWriter
	buffer        []writeInfo
	currentLen    int
	currentBuffer []byte
	updated       chan struct{}
26
27
28
29
30
31
32
33
}

type writeInfo struct {
	file *os.File
	data []byte
}

func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfileWriter {
34
35
36
37
38
	b := &batchedBlockfileWriter{bfw: bfw, batch: batch, buffer: make([]writeInfo, 0, batch), updated: make(chan struct{})}

	go b.finalWrite()

	return b
39
40
41
42
43
44
45
46
47
48
49
50
51
52
}

func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
	w.bfw.close()
	w.currentLen = 0
	w.bfw = bfw
}

func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {

	if w.batch == 0 {
		return w.bfw.append(b, sync)
	}

53
54
55
56
57
58
	if w.currentBuffer == nil {
		w.currentBuffer = make([]byte, 0, len(b))
	}

	w.currentBuffer = append(w.currentBuffer, b...)

59
	if sync {
60
61
		w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: append([]byte(nil), w.currentBuffer...)})
		w.currentBuffer = w.currentBuffer[:0]
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
	}

	if len(w.buffer) == w.batch {
		if err := w.writeOut(true); err != nil {
			return err
		}
	}

	w.currentLen += len(b)

	return nil
}

func (w *batchedBlockfileWriter) finalWrite() {

	for {
		select {
		case <-time.After(time.Second * 10):
			if err := w.writeOut(false); err != nil {
81
				logger.Errorf("Error in batched write: %v", err)
82
83
84
85
86
87
			}
		case <-w.updated:
			return
		}
	}
}
88

89
90
91
92
93
94
func (w *batchedBlockfileWriter) close() {
	w.bfw.close()
}

func (w *batchedBlockfileWriter) writeOut(wait bool) error {

95
96
97
98
99
100
101
	//lock.Lock()

	//start := time.Now()

	if wait {
		go w.finalWrite()
	}
102

103
	w.updated <- struct{}{}
104
105
106
107
108
109
110

	var err error

	var lastFile *os.File

	for _, v := range w.buffer {

111
		if lastFile != nil && lastFile.Name() != v.file.Name() {
112
113
114
115
116
117
118
119
120
121
122
123
124
125
			if err = lastFile.Sync(); err != nil {
				return err
			}
		}

		_, err = v.file.Write(v.data)

		if err != nil {
			return err
		}

		lastFile = v.file
	}

126
127
128
129
	if lastFile != nil {
		if err = lastFile.Sync(); err != nil {
			return err
		}
130
131
	}

132
	//logger.Errorf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)
133
134
135

	w.buffer = w.buffer[:0]

136
137
	//lock.Unlock()

138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
	return nil
}

func (w *batchedBlockfileWriter) truncateFile(targetSize int) error {

	if w.batch == 0 {
		return w.bfw.truncateFile(targetSize)
	}

	if w.currentLen > targetSize {
		lastBuf := w.buffer[len(w.buffer)-1].data
		left := w.currentLen - targetSize
		lastBuf = lastBuf[:(len(lastBuf) - left)]
		w.currentLen = targetSize
	}

	return nil
}

manish's avatar
manish committed
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
////  WRITER ////

// blockfileWriter owns the handle used to append to and truncate one block file.
type blockfileWriter struct {
	// filePath is the path that open passes to os.OpenFile.
	filePath string
	// file is the handle set by open and used by truncateFile, append and close.
	file     *os.File
}

// newBlockfileWriter builds a writer for filePath and opens the file.
// The writer is returned together with the result of open, so it is non-nil
// even when opening failed.
func newBlockfileWriter(filePath string) (*blockfileWriter, error) {
	w := &blockfileWriter{filePath: filePath}
	err := w.open()
	return w, err
}

// truncateFile shrinks the block file to targetSize bytes when it is currently
// larger; a file that is already that size or smaller is left untouched.
// It returns any error from Stat or from the truncation itself.
func (w *blockfileWriter) truncateFile(targetSize int) error {
	fileStat, err := w.file.Stat()
	if err != nil {
		return err
	}
	if fileStat.Size() <= int64(targetSize) {
		return nil
	}
	// A failed truncation leaves the file larger than the caller asked for, so
	// the error must be reported rather than dropped.
	return w.file.Truncate(int64(targetSize))
}

manish's avatar
manish committed
179
func (w *blockfileWriter) append(b []byte, sync bool) error {
manish's avatar
manish committed
180
181
182
183
	_, err := w.file.Write(b)
	if err != nil {
		return err
	}
manish's avatar
manish committed
184
185
186
	if sync {
		return w.file.Sync()
	}
manish's avatar
manish committed
187
188
189
190
191
192
	return nil
}

func (w *blockfileWriter) open() error {
	file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
	if err != nil {
193
		return errors.Wrapf(err, "error opening block file writer for file %s", w.filePath)
manish's avatar
manish committed
194
195
196
197
198
199
	}
	w.file = file
	return nil
}

func (w *blockfileWriter) close() error {
200
	return errors.WithStack(w.file.Close())
manish's avatar
manish committed
201
202
203
204
205
206
207
208
209
210
}

////  READER ////

// blockfileReader reads byte ranges from one block file.
type blockfileReader struct {
	// file is the handle opened read-only by newBlockfileReader.
	file *os.File
}

func newBlockfileReader(filePath string) (*blockfileReader, error) {
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
211
		return nil, errors.Wrapf(err, "error opening block file reader for file %s", filePath)
manish's avatar
manish committed
212
213
214
215
216
217
218
219
220
	}
	reader := &blockfileReader{file}
	return reader, nil
}

func (r *blockfileReader) read(offset int, length int) ([]byte, error) {
	b := make([]byte, length)
	_, err := r.file.ReadAt(b, int64(offset))
	if err != nil {
221
		return nil, errors.Wrapf(err, "error reading block file for offset %d and length %d", offset, length)
manish's avatar
manish committed
222
223
224
225
226
	}
	return b, nil
}

func (r *blockfileReader) close() error {
227
	return errors.WithStack(r.file.Close())
manish's avatar
manish committed
228
}