blockfile_rw.go 4.29 KB
Newer Older
manish's avatar
manish committed
1
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package fsblkstorage

import (
10
	"fmt"
manish's avatar
manish committed
11
	"os"
12
	"time"
13
14

	"github.com/pkg/errors"
manish's avatar
manish committed
15
16
)

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// batchedBlockfileWriter sits in front of a blockfileWriter and collects
// appended data in memory as writeInfo entries instead of writing it right
// away. When batch is 0, append and truncateFile are forwarded directly to bfw.
type batchedBlockfileWriter struct {
	// batch is the number of buffered entries at which append triggers
	// writeOut; 0 disables buffering.
	batch      int
	// bfw is the writer for the block file currently being appended to.
	bfw        *blockfileWriter
	// buffer holds the pending writes in the order they were appended.
	buffer     []writeInfo
	// currentLen counts the bytes passed to append since construction or the
	// last setBlockfileWriter call; truncateFile compares it to its target.
	currentLen int
	//updated    chan struct{}
}

// writeInfo is one pending write held by batchedBlockfileWriter: the bytes to
// write and the file they are destined for.
type writeInfo struct {
	// file is the destination file, captured from the active blockfileWriter
	// when the entry was created.
	file *os.File
	// data is the payload that writeOut passes to file.Write.
	data []byte
}

// newBatchedBlockFileWriter builds a batchedBlockfileWriter around bfw.
// The pending-write buffer starts empty with capacity for batch entries;
// currentLen starts at zero.
func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfileWriter {
	pending := make([]writeInfo, 0, batch)
	writer := &batchedBlockfileWriter{
		batch:  batch,
		bfw:    bfw,
		buffer: pending,
	}
	return writer
}

// setBlockfileWriter closes the writer currently in use, installs bfw in its
// place and resets the running byte count to zero.
//
// NOTE(review): entries still sitting in w.buffer keep pointing at the file of
// the writer closed here; it looks like callers must make sure the buffer has
// been written out before switching files - confirm.
func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
	previous := w.bfw
	// The method has no error result, so the close error is discarded.
	_ = previous.close()
	w.bfw, w.currentLen = bfw, 0
}

// append queues b for a later batched write.
//
// When batching is disabled (batch == 0) the call is forwarded unchanged to
// the underlying blockfileWriter. Otherwise:
//   - sync == true starts a new buffered entry for the current file;
//   - sync == false extends the most recent buffered entry, or starts a new
//     entry when the buffer is empty or the most recent entry targets a
//     different file than the current writer.
//
// Once the number of buffered entries reaches batch, the buffer is written out
// synchronously and any write error is returned. currentLen is advanced by
// len(b) on success.
func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {
	if w.batch == 0 {
		return w.bfw.append(b, sync)
	}

	merged := false
	if n := len(w.buffer); !sync && n > 0 {
		// Take the element's address: a plain copy of the struct would be
		// modified and then thrown away, losing b.
		last := &w.buffer[n-1]
		// Only merge into an entry destined for the current file; otherwise
		// the bytes would end up in the previously active file.
		if last.file == w.bfw.file {
			// The full slice expression caps the capacity so that append
			// allocates instead of writing into spare capacity of a slice
			// that is still owned by an earlier caller.
			last.data = append(last.data[:len(last.data):len(last.data)], b...)
			merged = true
		}
	}
	if !merged {
		w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: b})
	}

	// ">=" rather than "==" so that a flush which failed earlier (leaving the
	// buffer full) is retried on the next call instead of never again.
	if len(w.buffer) >= w.batch {
		if err := w.writeOut(true); err != nil {
			return err
		}
	}

	w.currentLen += len(b)
	return nil
}

/*
func (w *batchedBlockfileWriter) finalWrite() {

	for {
		select {
		case <-time.After(time.Second * 10):
			if err := w.writeOut(false); err != nil {
				logger.Errorf("Error in batched write")
			}
		case <-w.updated:
			return
		}
	}
}
*/
// close closes the underlying blockfileWriter. It does not write out entries
// that are still buffered.
//
// NOTE(review): pending entries in w.buffer appear to be dropped here; confirm
// callers write them out before closing.
func (w *batchedBlockfileWriter) close() {
	// The method has no error result, so the close error is discarded.
	_ = w.bfw.close()
}

// writeOut writes every buffered entry to its file in order, syncing a file
// whenever the next entry targets a different one and syncing the final file
// at the end. On success it prints one timing line to stdout and empties the
// buffer. On the first write or sync error it returns that error and leaves
// the buffer as it was.
//
// An empty buffer is a no-op that returns nil. The wait parameter is currently
// unused.
func (w *batchedBlockfileWriter) writeOut(wait bool) error {
	// Nothing buffered: without this guard the final Sync below would be
	// called on a nil *os.File and report os.ErrInvalid.
	if len(w.buffer) == 0 {
		return nil
	}

	start := time.Now()

	var lastFile *os.File
	for _, v := range w.buffer {
		// Moving on to another file: make the previous one durable first.
		if lastFile != nil && lastFile != v.file {
			if err := lastFile.Sync(); err != nil {
				return err
			}
		}
		if _, err := v.file.Write(v.data); err != nil {
			return err
		}
		lastFile = v.file
	}
	if err := lastFile.Sync(); err != nil {
		return err
	}

	fmt.Printf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)

	// Drop the references held by the backing array so the written payloads
	// and file handles can be garbage collected while the capacity is reused.
	for i := range w.buffer {
		w.buffer[i] = writeInfo{}
	}
	w.buffer = w.buffer[:0]

	return nil
}

// truncateFile shrinks the data tracked by this writer to targetSize bytes.
//
// With batching disabled (batch == 0) the call is forwarded to the underlying
// blockfileWriter. Otherwise nothing happens unless currentLen exceeds
// targetSize; in that case the excess is removed from the tail of the buffered
// entries that belong to the current file, newest first. If the buffered data
// cannot cover the whole excess, the remainder is removed by truncating the
// underlying file to targetSize. currentLen is set to targetSize on success.
//
// NOTE(review): currentLen starts at zero for a fresh writer, so content that
// was already in the file before this writer was created is not accounted for
// by the currentLen check - confirm this matches the callers' expectations.
func (w *batchedBlockfileWriter) truncateFile(targetSize int) error {
	if w.batch == 0 {
		return w.bfw.truncateFile(targetSize)
	}
	if w.currentLen <= targetSize {
		return nil
	}

	excess := w.currentLen - targetSize
	for excess > 0 && len(w.buffer) > 0 {
		// Work on the element itself; reslicing a copy would leave the
		// buffered data untouched.
		last := &w.buffer[len(w.buffer)-1]
		if last.file != w.bfw.file {
			// Entries for another file are not part of currentLen.
			break
		}
		if len(last.data) > excess {
			last.data = last.data[:len(last.data)-excess]
			excess = 0
			break
		}
		// The whole entry is beyond the target: drop it and keep going.
		excess -= len(last.data)
		*last = writeInfo{}
		w.buffer = w.buffer[:len(w.buffer)-1]
	}

	if excess > 0 {
		// The rest of the excess is not in the buffer any more, so cut it
		// from the file itself.
		if err := w.bfw.truncateFile(targetSize); err != nil {
			return err
		}
	}

	w.currentLen = targetSize
	return nil
}

manish's avatar
manish committed
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
////  WRITER ////

// blockfileWriter appends data to a single block file on disk.
type blockfileWriter struct {
	// filePath is the path handed to os.OpenFile by open.
	filePath string
	// file is the open handle; it is set by open.
	file     *os.File
}

// newBlockfileWriter creates a writer for filePath and opens the file.
// The writer is returned together with the result of open, so the returned
// pointer is non-nil even when opening failed.
func newBlockfileWriter(filePath string) (*blockfileWriter, error) {
	w := &blockfileWriter{filePath: filePath}
	err := w.open()
	return w, err
}

// truncateFile cuts the file down to targetSize bytes when it is currently
// larger; a file that is already at or below targetSize is left alone.
// It returns any error from Stat or Truncate.
func (w *blockfileWriter) truncateFile(targetSize int) error {
	fileStat, err := w.file.Stat()
	if err != nil {
		return err
	}
	if fileStat.Size() > int64(targetSize) {
		// Report a failed truncation instead of pretending it succeeded.
		return w.file.Truncate(int64(targetSize))
	}
	return nil
}

manish's avatar
manish committed
173
func (w *blockfileWriter) append(b []byte, sync bool) error {
manish's avatar
manish committed
174
175
176
177
	_, err := w.file.Write(b)
	if err != nil {
		return err
	}
manish's avatar
manish committed
178
179
180
	if sync {
		return w.file.Sync()
	}
manish's avatar
manish committed
181
182
183
184
185
186
	return nil
}

func (w *blockfileWriter) open() error {
	file, err := os.OpenFile(w.filePath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660)
	if err != nil {
187
		return errors.Wrapf(err, "error opening block file writer for file %s", w.filePath)
manish's avatar
manish committed
188
189
190
191
192
193
	}
	w.file = file
	return nil
}

func (w *blockfileWriter) close() error {
194
	return errors.WithStack(w.file.Close())
manish's avatar
manish committed
195
196
197
198
199
200
201
202
203
204
}

////  READER ////

// blockfileReader reads byte ranges from a single block file.
type blockfileReader struct {
	// file is the handle opened read-only by newBlockfileReader.
	file *os.File
}

func newBlockfileReader(filePath string) (*blockfileReader, error) {
	file, err := os.OpenFile(filePath, os.O_RDONLY, 0600)
	if err != nil {
205
		return nil, errors.Wrapf(err, "error opening block file reader for file %s", filePath)
manish's avatar
manish committed
206
207
208
209
210
211
212
213
214
	}
	reader := &blockfileReader{file}
	return reader, nil
}

func (r *blockfileReader) read(offset int, length int) ([]byte, error) {
	b := make([]byte, length)
	_, err := r.file.ReadAt(b, int64(offset))
	if err != nil {
215
		return nil, errors.Wrapf(err, "error reading block file for offset %d and length %d", offset, length)
manish's avatar
manish committed
216
217
218
219
220
	}
	return b, nil
}

func (r *blockfileReader) close() error {
221
	return errors.WithStack(r.file.Close())
manish's avatar
manish committed
222
}