Commit 1b0b9e88 authored by Lucas Kuhring

Implement write batch for blocks

parent 7b9fb90d
@@ -10,6 +10,8 @@ import (
	"bytes"
	"fmt"
	"math"
+	"os"
+	"strconv"
	"sync"
	"sync/atomic"
@@ -42,7 +44,7 @@ type blockfileMgr struct {
	index             index
	cpInfo            *checkpointInfo
	cpInfoCond        *sync.Cond
-	currentFileWriter *blockfileWriter
+	currentFileWriter *batchedBlockfileWriter
	bcInfo            atomic.Value
}
@@ -123,6 +125,18 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	//Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc)
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
+
+	// The batch size comes from the STREAMCHAIN_WRITEBATCH environment variable;
+	// unset, "0" or non-numeric values yield a batch size of 0, i.e. batching disabled.
+	strBatchSize := os.Getenv("STREAMCHAIN_WRITEBATCH")
+	var bfw *batchedBlockfileWriter
+
+	if strBatchSize == "" || strBatchSize == "0" {
+		bfw = newBatchedBlockFileWriter(currentFileWriter, 0)
+	} else {
+		bsz, _ := strconv.Atoi(strBatchSize) // a parse error leaves bsz at 0
+		bfw = newBatchedBlockFileWriter(currentFileWriter, bsz)
+	}
+
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
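For reference, batching is switched on purely through the environment: the peer process would be started with STREAMCHAIN_WRITEBATCH set to the desired batch size (the value 8 below is only an example). A minimal, self-contained Go sketch mirroring the lookup above, for illustration only:

package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	// Illustrative only: mirrors the batch-size lookup in newBlockfileMgr.
	os.Setenv("STREAMCHAIN_WRITEBATCH", "8") // would normally be set in the peer's environment

	batch := 0
	if v := os.Getenv("STREAMCHAIN_WRITEBATCH"); v != "" && v != "0" {
		if n, err := strconv.Atoi(v); err == nil {
			batch = n
		}
	}
	fmt.Println("write batch size:", batch) // 0 disables batching
}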
@@ -139,7 +153,7 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	// Update the manager with the checkpoint info and the file writer
	mgr.cpInfo = cpInfo
-	mgr.currentFileWriter = currentFileWriter
+	mgr.currentFileWriter = bfw
	// Create a checkpoint condition (event) variable, for the goroutine waiting for
	// or announcing the occurrence of an event.
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
@@ -228,12 +242,12 @@ func (mgr *blockfileMgr) moveToNextFile() {
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to next file: %s", err))
	}
-	mgr.currentFileWriter.close()
+	//mgr.currentFileWriter.close()
	err = mgr.saveCurrentInfo(cpInfo, true)
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
-	mgr.currentFileWriter = nextFileWriter
+	mgr.currentFileWriter.setBlockfileWriter(nextFileWriter)
	mgr.updateCheckpoint(cpInfo)
}
@@ -326,6 +340,10 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
}

func (mgr *blockfileMgr) syncIndex() error {
+	// Flush any batched block data to disk before reading it back to sync the index.
+	if err := mgr.currentFileWriter.writeOut(true); err != nil {
+		return err
+	}
+
	var lastBlockIndexed uint64
	var indexEmpty bool
	var err error
@@ -552,6 +570,10 @@ func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.E
}

func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
+	// Flush any batched block data before reading from the block file.
+	if err := mgr.currentFileWriter.writeOut(true); err != nil {
+		return nil, err
+	}
+
	stream, err := newBlockfileStream(mgr.rootDir, lp.fileSuffixNum, int64(lp.offset))
	if err != nil {
		return nil, err
@@ -565,6 +587,10 @@ func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
}

func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
+	// Flush any batched block data before reading from the block file.
+	if err := mgr.currentFileWriter.writeOut(true); err != nil {
+		return nil, err
+	}
+
	filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
	reader, err := newBlockfileReader(filePath)
	if err != nil {
......
@@ -7,11 +7,147 @@ SPDX-License-Identifier: Apache-2.0
package fsblkstorage

import (
+	"fmt"
	"os"
+	"time"

	"github.com/pkg/errors"
)
type batchedBlockfileWriter struct {
batch int
bfw *blockfileWriter
buffer []writeInfo
currentLen int
//updated chan struct{}
}
type writeInfo struct {
file *os.File
data []byte
}
func newBatchedBlockFileWriter(bfw *blockfileWriter, batch int) *batchedBlockfileWriter {
//return &batchedBlockfileWriter{bfw: bfw, batch: batch, buffer: make([]writeInfo, 0, batch), updated: make(chan struct{})}
return &batchedBlockfileWriter{bfw: bfw, batch: batch, buffer: make([]writeInfo, 0, batch)}
}
func (w *batchedBlockfileWriter) setBlockfileWriter(bfw *blockfileWriter) {
	// Flush anything still buffered for the old file before closing it, since
	// buffered writeInfo entries hold the old *os.File handle.
	if err := w.writeOut(true); err != nil {
		logger.Errorf("Error flushing batched writes before switching block files: %s", err)
	}
	w.bfw.close()
	w.currentLen = 0
	w.bfw = bfw
}
func (w *batchedBlockfileWriter) append(b []byte, sync bool) error {
	if w.batch == 0 {
		// Batching disabled: fall through to the plain block file writer.
		return w.bfw.append(b, sync)
	}
	if sync || len(w.buffer) == 0 {
		// Start a new buffered entry for this write.
		w.buffer = append(w.buffer, writeInfo{file: w.bfw.file, data: b})
	} else {
		// Coalesce into the last buffered entry; write back through a pointer
		// into the slice, not into a copy of the struct.
		last := &w.buffer[len(w.buffer)-1]
		last.data = append(last.data, b...)
	}
	if len(w.buffer) == w.batch {
		if err := w.writeOut(true); err != nil {
			return err
		}
		//go w.writeOut(true)
	}
	w.currentLen += len(b)
	return nil
}
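To make the buffering rules above concrete: a sync write starts a new buffered entry, and the batch is flushed once it holds `batch` entries, while a non-sync write is merged into the last entry. A stripped-down, self-contained sketch of just that bookkeeping (the miniBatcher type is illustrative and not part of the commit):

package main

import "fmt"

// miniBatcher mimics the entry bookkeeping of batchedBlockfileWriter.append,
// "flushing" (here: printing) once `batch` entries have accumulated.
type miniBatcher struct {
	batch  int
	buffer [][]byte
}

func (m *miniBatcher) append(b []byte, sync bool) {
	if sync || len(m.buffer) == 0 {
		m.buffer = append(m.buffer, append([]byte(nil), b...)) // new entry
	} else {
		last := len(m.buffer) - 1
		m.buffer[last] = append(m.buffer[last], b...) // coalesce into last entry
	}
	if len(m.buffer) == m.batch {
		fmt.Printf("flush of %d entries\n", len(m.buffer))
		m.buffer = m.buffer[:0]
	}
}

func main() {
	m := &miniBatcher{batch: 2}
	m.append([]byte("len-prefix"), false) // buffer empty, so this starts entry #1
	m.append([]byte("block-1"), true)     // entry #2, batch reached, flush
	m.append([]byte("block-2"), true)     // starts a fresh batch (1 entry buffered)
	fmt.Println("entries still buffered:", len(m.buffer))
}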
/*
func (w *batchedBlockfileWriter) finalWrite() {
for {
select {
case <-time.After(time.Second * 10):
if err := w.writeOut(false); err != nil {
logger.Errorf("Error in batched write")
}
case <-w.updated:
return
}
}
}
*/
func (w *batchedBlockfileWriter) close() {
	if err := w.writeOut(true); err != nil { // flush remaining buffered data before closing
		logger.Errorf("Error flushing batched writes on close: %s", err)
	}
	w.bfw.close()
	//close(w.updated)
}
func (w *batchedBlockfileWriter) writeOut(wait bool) error {
	start := time.Now()
	//if wait {
	//	w.updated <- struct{}{}
	//}
	if len(w.buffer) == 0 {
		// Nothing buffered (e.g. batching disabled); avoid a nil Sync below.
		return nil
	}
	var err error
	var lastFile *os.File
	for _, v := range w.buffer {
		// Sync the previous file before starting writes against a different file.
		if lastFile != nil && lastFile != v.file {
			if err = lastFile.Sync(); err != nil {
				return err
			}
		}
		if _, err = v.file.Write(v.data); err != nil {
			return err
		}
		lastFile = v.file
	}
	if err = lastFile.Sync(); err != nil {
		return err
	}
	//if wait {
	//	go w.finalWrite()
	//}
	fmt.Printf("wr,%d,%d,%.2f\n", time.Now().UnixNano(), len(w.buffer), time.Since(start).Seconds()*1000)
	w.buffer = w.buffer[:0]
	return nil
}
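Each flush prints a plain metric line of the form wr,<unix-nanos>,<buffered entries>,<flush duration in ms>. A self-contained sketch of how such lines might be parsed for offline analysis; the parser is illustrative and not part of the commit:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseFlushMetric parses one "wr,<ns>,<entries>,<ms>" line as printed by writeOut.
func parseFlushMetric(line string) (tsNanos int64, entries int, millis float64, err error) {
	parts := strings.Split(strings.TrimSpace(line), ",")
	if len(parts) != 4 || parts[0] != "wr" {
		return 0, 0, 0, fmt.Errorf("unexpected metric line: %q", line)
	}
	if tsNanos, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, 0, err
	}
	if entries, err = strconv.Atoi(parts[2]); err != nil {
		return 0, 0, 0, err
	}
	if millis, err = strconv.ParseFloat(parts[3], 64); err != nil {
		return 0, 0, 0, err
	}
	return tsNanos, entries, millis, nil
}

func main() {
	ts, n, ms, err := parseFlushMetric("wr,1554292800000000000,8,1.73")
	fmt.Println(ts, n, ms, err)
}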
func (w *batchedBlockfileWriter) truncateFile(targetSize int) error {
	if w.batch == 0 {
		// Batching disabled: truncate the underlying file directly.
		return w.bfw.truncateFile(targetSize)
	}
	if w.currentLen > targetSize && len(w.buffer) > 0 {
		// Drop the tail of the last buffered entry; write the shortened slice
		// back into the buffer rather than into a local copy.
		last := &w.buffer[len(w.buffer)-1]
		left := w.currentLen - targetSize
		last.data = last.data[:len(last.data)-left]
		w.currentLen = targetSize
	}
	return nil
}
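Putting the pieces together, a hypothetical in-package helper shows how the batched writer is intended to be driven. Only newBlockfileWriter, newBatchedBlockFileWriter, append, writeOut and close come from this commit; the helper name, path handling and batch size of 4 are made up for illustration:

// Hypothetical sketch, not part of the commit.
func writeBlocksBatched(path string, blocks [][]byte) error {
	w, err := newBlockfileWriter(path)
	if err != nil {
		return err
	}
	bw := newBatchedBlockFileWriter(w, 4) // flush after 4 buffered block entries
	defer bw.close()

	for _, b := range blocks {
		// sync=true starts a new buffered entry; the batch is written and
		// fsync'ed once 4 entries have accumulated.
		if err := bw.append(b, true); err != nil {
			return err
		}
	}
	// Push out whatever is still buffered.
	return bw.writeOut(true)
}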
//// WRITER ////
type blockfileWriter struct {
	filePath string
......