Unverified Commit 942762e5 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13059] Purge etcdraft WAL and Snapshot files



When a snapshot is taken, stale etcdraft WAL files should be
purged to free disk space, as well as old snapshot files.

However, we still keep several snapshot files around in case the
latest file is corrupted; etcdraft will automatically fall back to
an older one, until there are none left.

Change-Id: I2b8168dbc0c3e5bd56a081c104dd7dc9defbcd92
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 38c1515c
......@@ -952,6 +952,7 @@
"github.com/Knetic/govaluate",
"github.com/Shopify/sarama",
"github.com/Shopify/sarama/mocks",
"github.com/coreos/etcd/pkg/fileutil",
"github.com/coreos/etcd/raft",
"github.com/coreos/etcd/raft/raftpb",
"github.com/coreos/etcd/snap",
......
......@@ -1038,6 +1038,7 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Expect(c.puller.PullBlockCallCount()).Should(BeZero())
// old snapshot file is retained
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
})
})
......@@ -2206,19 +2207,33 @@ var _ = Describe("Chain", func() {
c1.cutter.CutNext = true
for i := 1; i <= 10; i++ {
var lasti uint64
var indices []uint64
// Only produce 4 blocks here, so that snapshot pruning does not occur.
// Otherwise, a slow garbage collection may prevent snapshotting from
// being triggered on next block, and assertion on number of snapshots
// would fail nondeterministically.
for i := 1; i <= 4; i++ {
err := c1.Order(env, 0)
Expect(err).NotTo(HaveOccurred())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(i))
Eventually(c3.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(i))
Eventually(func() uint64 {
indices = etcdraft.ListSnapshots(logger, c1.opts.SnapDir)
if len(indices) == 0 {
return 0
}
return indices[len(indices)-1]
}, LongEventualTimeout).Should(BeNumerically(">", lasti))
lasti = indices[len(indices)-1]
}
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
network.join(2, false)
Eventually(c2.puller.PullBlockCallCount, LongEventualTimeout).Should(Equal(10))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(10))
Eventually(c2.puller.PullBlockCallCount, LongEventualTimeout).Should(Equal(4))
Eventually(c2.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(4))
files, err := ioutil.ReadDir(c2.opts.SnapDir)
Expect(err).NotTo(HaveOccurred())
......@@ -2230,7 +2245,7 @@ var _ = Describe("Chain", func() {
network.exec(
func(c *chain) {
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(11))
Eventually(func() int { return c.support.WriteBlockCallCount() }, LongEventualTimeout).Should(Equal(5))
})
})
})
......
......@@ -7,8 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package etcdraft
import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
......@@ -18,6 +23,13 @@ import (
"github.com/pkg/errors"
)
// MaxSnapshotFiles defines the max number of etcd/raft snapshot files to
// retain on the filesystem. Snapshot files are read from newest to oldest,
// until the first intact file is found. The more snapshot files we keep
// around, the better we mitigate the impact of a corrupted snapshot. This is
// exported for testing purposes. It MUST be greater than or equal to 1.
var MaxSnapshotFiles = 5
// MemoryStorage is currently backed by etcd/raft.MemoryStorage. This interface is
// defined to expose dependencies of fsm so that it may be swapped in the
// future. TODO(jay) Add other necessary methods to this interface once we need
......@@ -35,11 +47,17 @@ type MemoryStorage interface {
// RaftStorage encapsulates the stores used to persist etcd/raft data: an
// in-memory store, a write-ahead log and a snapshotter, along with the
// on-disk locations of the latter two so that stale files can be purged.
type RaftStorage struct {
	// Number of entries to keep in memory after a snapshot is taken,
	// so slow followers can still catch up without a full snapshot transfer.
	SnapshotCatchUpEntries uint64

	walDir  string // directory holding WAL segment files
	snapDir string // directory holding snapshot files

	lg *flogging.FabricLogger

	ram  MemoryStorage
	wal  *wal.WAL
	snap *snap.Snapshotter

	// a queue that keeps track of indices of snapshots on disk
	snapshotIndex []uint64
}
// CreateStorage attempts to create a storage to persist etcd/raft data.
......@@ -91,7 +109,62 @@ func CreateStorage(
lg.Debugf("Appending %d entries to memory storage", len(ents))
ram.Append(ents) // MemoryStorage.Append always return nil
return &RaftStorage{lg: lg, ram: ram, wal: w, snap: sn}, nil
return &RaftStorage{
lg: lg,
ram: ram,
wal: w,
snap: sn,
walDir: walDir,
snapDir: snapDir,
snapshotIndex: ListSnapshots(lg, snapDir),
}, nil
}
// ListSnapshots returns the raft indices of the snapshots stored on disk,
// in ascending order. A snapshot file that cannot be read is considered
// corrupted: it is renamed with a ".broken" suffix and skipped, so that
// etcd/raft never attempts to load it again.
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64 {
	dir, err := os.Open(snapDir)
	if err != nil {
		logger.Errorf("Failed to open snapshot directory %s: %s", snapDir, err)
		return nil
	}
	defer dir.Close()

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		logger.Errorf("Failed to read snapshot files: %s", err)
		return nil
	}

	// Keep only *.snap files; anything else (e.g. *.broken) is ignored.
	var snapfiles []string
	for i := range filenames {
		if strings.HasSuffix(filenames[i], ".snap") {
			snapfiles = append(snapfiles, filenames[i])
		}
	}
	// Snapshot file names embed term/index as zero-padded hex, so a plain
	// lexicographic sort yields ascending raft indices.
	sort.Strings(snapfiles)

	var snapshots []uint64
	for _, snapfile := range snapfiles {
		fpath := filepath.Join(snapDir, snapfile)
		s, err := snap.Read(fpath)
		if err != nil {
			logger.Errorf("Snapshot file %s is corrupted: %s", fpath, err)

			broken := fpath + ".broken"
			if err = os.Rename(fpath, broken); err != nil {
				logger.Errorf("Failed to rename corrupted snapshot file %s to %s: %s", fpath, broken, err)
			} else {
				logger.Debugf("Renaming corrupted snapshot file %s to %s", fpath, broken)
			}

			continue
		}

		snapshots = append(snapshots, s.Metadata.Index)
	}

	return snapshots
}
func createSnapshotter(snapDir string) (*snap.Snapshotter, error) {
......@@ -100,7 +173,6 @@ func createSnapshotter(snapDir string) (*snap.Snapshotter, error) {
}
return snap.New(snapDir), nil
}
func createWAL(lg *flogging.FabricLogger, walDir string, snapshot *raftpb.Snapshot) (*wal.WAL, error) {
......@@ -211,6 +283,8 @@ func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte)
return err
}
rs.snapshotIndex = append(rs.snapshotIndex, snap.Metadata.Index)
// Keep some entries in memory for slow followers to catchup
if i > rs.SnapshotCatchUpEntries {
compacti := i - rs.SnapshotCatchUpEntries
......@@ -225,9 +299,104 @@ func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte)
}
rs.lg.Infof("Snapshot is taken at index %d", i)
rs.gc()
return nil
}
// gc reclaims disk space used by etcd/raft, purging stale WAL segments and
// snapshot files once at least MaxSnapshotFiles snapshots have accumulated.
func (rs *RaftStorage) gc() {
	count := len(rs.snapshotIndex)
	if count < MaxSnapshotFiles {
		rs.lg.Debugf("Snapshots on disk (%d) < limit (%d), no need to purge wal/snapshot", count, MaxSnapshotFiles)
		return
	}

	// Retain only the indices of the newest MaxSnapshotFiles snapshots;
	// files older than the oldest retained index become garbage.
	rs.snapshotIndex = rs.snapshotIndex[count-MaxSnapshotFiles:]

	rs.purgeWAL()
	rs.purgeSnap()
}
// purgeWAL removes WAL segment files whose entries all precede the oldest
// retained snapshot. One segment with index smaller than the snapshot index
// is always kept; see the comment on wal.ReleaseLockTo for details.
func (rs *RaftStorage) purgeWAL() {
	// Oldest snapshot index we still keep; entries before it are stale.
	retain := rs.snapshotIndex[0]

	walFiles, err := fileutil.ReadDir(rs.walDir)
	if err != nil {
		rs.lg.Errorf("Failed to read WAL directory %s: %s", rs.walDir, err)
		return // nothing to purge if the directory cannot be listed
	}

	var files []string
	for _, f := range walFiles {
		if !strings.HasSuffix(f, ".wal") {
			continue
		}

		var seq, index uint64
		// A malformed file name would leave index at 0 and wrongly mark the
		// file for purging, so skip anything that does not parse.
		if _, err := fmt.Sscanf(f, "%016x-%016x.wal", &seq, &index); err != nil {
			rs.lg.Errorf("Failed to parse WAL file name %s: %s", f, err)
			continue
		}

		// Files are listed in ascending order; stop at the first segment
		// that may still contain entries at or beyond the snapshot index.
		if index >= retain {
			break
		}

		files = append(files, filepath.Join(rs.walDir, f))
	}

	if len(files) <= 1 {
		// we need to keep one wal segment with index smaller than snapshot.
		// see comment on wal.ReleaseLockTo for the more details.
		return
	}

	rs.purge(files[:len(files)-1])
}
// purgeSnap removes snapshot files, retaining only the newest
// MaxSnapshotFiles of them.
func (rs *RaftStorage) purgeSnap() {
	snapFiles, err := fileutil.ReadDir(rs.snapDir)
	if err != nil {
		rs.lg.Errorf("Failed to read Snapshot directory %s: %s", rs.snapDir, err)
		return // nothing to purge if the directory cannot be listed
	}

	var files []string
	for _, f := range snapFiles {
		if !strings.HasSuffix(f, ".snap") {
			// Corrupted snapshots are renamed by appending ".broken" (see
			// ListSnapshots), so match on the suffix — HasPrefix would
			// never fire for such files.
			if strings.HasSuffix(f, ".broken") {
				rs.lg.Warnf("Found broken snapshot file %s, it can be removed manually", f)
			}

			continue
		}

		files = append(files, filepath.Join(rs.snapDir, f))
	}

	l := len(files)
	if l <= MaxSnapshotFiles {
		return
	}

	rs.purge(files[:l-MaxSnapshotFiles]) // retain last MaxSnapshotFiles snapshot files
}
// purge deletes the given files one by one, taking an exclusive file lock on
// each before removing it. Purging aborts at the first file that cannot be
// locked; removal or lock-release failures are logged but do not stop it.
func (rs *RaftStorage) purge(files []string) {
	for _, file := range files {
		// Lock first so we never delete a file another holder still owns.
		lock, err := fileutil.TryLockFile(file, os.O_WRONLY, fileutil.PrivateFileMode)
		if err != nil {
			rs.lg.Debugf("Failed to lock %s, abort purging", file)
			break
		}

		if removeErr := os.Remove(file); removeErr != nil {
			rs.lg.Errorf("Failed to remove %s: %s", file, removeErr)
		} else {
			rs.lg.Debugf("Purged file %s", file)
		}

		if closeErr := lock.Close(); closeErr != nil {
			rs.lg.Errorf("Failed to close file lock %s: %s", lock.Name(), closeErr)
		}
	}
}
// ApplySnapshot applies snapshot to local memory storage
func (rs *RaftStorage) ApplySnapshot(snap raftpb.Snapshot) {
if err := rs.ram.ApplySnapshot(snap); err != nil {
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"io/ioutil"
"os"
"path"
"path/filepath"
"sort"
"strings"
"testing"
"github.com/coreos/etcd/pkg/fileutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/hyperledger/fabric/common/flogging"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
var (
err error
logger *flogging.FabricLogger
dataDir, walDir, snapDir string
store *RaftStorage
)
// setup initializes the package-level fixtures: a no-op logger, a fresh
// temp data directory with wal/snapshot subdirectories, and a RaftStorage
// backed by a new in-memory store.
func setup(t *testing.T) {
	logger = flogging.NewFabricLogger(zap.NewNop())

	dataDir, err = ioutil.TempDir("", "etcdraft-")
	assert.NoError(t, err)

	walDir = path.Join(dataDir, "wal")
	snapDir = path.Join(dataDir, "snapshot")

	store, err = CreateStorage(logger, walDir, snapDir, raft.NewMemoryStorage())
	assert.NoError(t, err)
}
// clean tears down what setup created: it closes the storage under test and
// removes the temp data directory (including WAL and snapshot files).
func clean(t *testing.T) {
	err = store.Close()
	assert.NoError(t, err)
	err = os.RemoveAll(dataDir)
	assert.NoError(t, err)
}
// TestTakeSnapshot exercises WAL and snapshot file purging driven by
// RaftStorage.TakeSnapshot, including restart on existing files and
// recovery when the latest snapshot file is corrupted.
func TestTakeSnapshot(t *testing.T) {
	// To make this test more understandable, here's a list
	// of expected wal files:
	// (wal file name format: seq-index.wal, index is the first index in this file)
	//
	// 0000000000000000-0000000000000000.wal (this is created initially by etcd/wal)
	// 0000000000000001-0000000000000001.wal
	// 0000000000000002-0000000000000002.wal
	// 0000000000000003-0000000000000003.wal
	// 0000000000000004-0000000000000004.wal
	// 0000000000000005-0000000000000005.wal
	// 0000000000000006-0000000000000006.wal
	// 0000000000000007-0000000000000007.wal
	// 0000000000000008-0000000000000008.wal
	// 0000000000000009-0000000000000009.wal
	// 000000000000000a-000000000000000a.wal

	// fileCount counts entries in files whose name ends with suffix.
	fileCount := func(files []string, suffix string) (c int) {
		for _, f := range files {
			if strings.HasSuffix(f, suffix) {
				c++
			}
		}
		return
	}

	// assertFileCount asserts the number of .wal files in walDir and
	// .snap files in snapDir.
	assertFileCount := func(t *testing.T, wal, snap int) {
		files, err := fileutil.ReadDir(walDir)
		assert.NoError(t, err)
		assert.Equal(t, wal, fileCount(files, ".wal"), "WAL file count mismatch")

		files, err = fileutil.ReadDir(snapDir)
		assert.NoError(t, err)
		assert.Equal(t, snap, fileCount(files, ".snap"), "Snap file count mismatch")
	}

	t.Run("Good", func(t *testing.T) {
		t.Run("MaxSnapshotFiles==1", func(t *testing.T) {
			// Restore the package-level limit after the subtest.
			backup := MaxSnapshotFiles
			MaxSnapshotFiles = 1
			defer func() { MaxSnapshotFiles = backup }()

			setup(t)
			defer clean(t)

			// set SegmentSizeBytes to a small value so that
			// every entry persisted to wal would result in
			// a new wal being created.
			oldSegmentSizeBytes := wal.SegmentSizeBytes
			wal.SegmentSizeBytes = 10
			defer func() {
				wal.SegmentSizeBytes = oldSegmentSizeBytes
			}()

			// create 10 new wal files
			for i := 0; i < 10; i++ {
				store.Store(
					[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
					raftpb.HardState{},
					raftpb.Snapshot{},
				)
			}
			assertFileCount(t, 11, 0)

			store.TakeSnapshot(uint64(3), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			// Snapshot is taken at index 3, which releases lock up to 2 (excl.).
			// This results in wal files with index [0, 1] being purged (2 files)
			assertFileCount(t, 9, 1)

			store.TakeSnapshot(uint64(5), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			// Snapshot is taken at index 5, which releases lock up to 4 (excl.).
			// This results in wal files with index [2, 3] being purged (2 files)
			assertFileCount(t, 7, 1)

			t.Logf("Close the storage and create a new one based on existing files")

			err = store.Close()
			assert.NoError(t, err)
			ram := raft.NewMemoryStorage()
			store, err = CreateStorage(logger, walDir, snapDir, ram)
			assert.NoError(t, err)

			store.TakeSnapshot(uint64(7), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			// Snapshot is taken at index 7, which releases lock up to 6 (excl.).
			// This results in wal files with index [4, 5] being purged (2 file)
			assertFileCount(t, 5, 1)

			store.TakeSnapshot(uint64(9), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			// Snapshot is taken at index 9, which releases lock up to 8 (excl.).
			// This results in wal files with index [6, 7] being purged (2 file)
			assertFileCount(t, 3, 1)
		})

		t.Run("MaxSnapshotFiles==2", func(t *testing.T) {
			// Restore the package-level limit after the subtest.
			backup := MaxSnapshotFiles
			MaxSnapshotFiles = 2
			defer func() { MaxSnapshotFiles = backup }()

			setup(t)
			defer clean(t)

			// set SegmentSizeBytes to a small value so that
			// every entry persisted to wal would result in
			// a new wal being created.
			oldSegmentSizeBytes := wal.SegmentSizeBytes
			wal.SegmentSizeBytes = 10
			defer func() {
				wal.SegmentSizeBytes = oldSegmentSizeBytes
			}()

			// create 10 new wal files
			for i := 0; i < 10; i++ {
				store.Store(
					[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
					raftpb.HardState{},
					raftpb.Snapshot{},
				)
			}
			assertFileCount(t, 11, 0)

			// Only one snapshot is taken, no wal pruning happened
			store.TakeSnapshot(uint64(3), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 11, 1)

			// Two snapshots at index 3, 5. And we keep one extra wal file prior to oldest snapshot.
			// So we should have pruned wal file with index [0, 1]
			store.TakeSnapshot(uint64(5), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 9, 2)

			t.Logf("Close the storage and create a new one based on existing files")

			err = store.Close()
			assert.NoError(t, err)
			ram := raft.NewMemoryStorage()
			store, err = CreateStorage(logger, walDir, snapDir, ram)
			assert.NoError(t, err)

			// Two snapshots at index 5, 7. And we keep one extra wal file prior to oldest snapshot.
			// So we should have pruned wal file with index [2, 3]
			store.TakeSnapshot(uint64(7), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 7, 2)

			// Two snapshots at index 7, 9. And we keep one extra wal file prior to oldest snapshot.
			// So we should have pruned wal file with index [4, 5]
			store.TakeSnapshot(uint64(9), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 5, 2)
		})
	})

	t.Run("Bad", func(t *testing.T) {
		t.Run("MaxSnapshotFiles==2", func(t *testing.T) {
			// If latest snapshot file is corrupted, storage should be able
			// to recover from an older one.

			// Restore the package-level limit after the subtest.
			backup := MaxSnapshotFiles
			MaxSnapshotFiles = 2
			defer func() { MaxSnapshotFiles = backup }()

			setup(t)
			defer clean(t)

			// set SegmentSizeBytes to a small value so that
			// every entry persisted to wal would result in
			// a new wal being created.
			oldSegmentSizeBytes := wal.SegmentSizeBytes
			wal.SegmentSizeBytes = 10
			defer func() {
				wal.SegmentSizeBytes = oldSegmentSizeBytes
			}()

			// create 10 new wal files
			for i := 0; i < 10; i++ {
				store.Store(
					[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
					raftpb.HardState{},
					raftpb.Snapshot{},
				)
			}
			assertFileCount(t, 11, 0)

			// Only one snapshot is taken, no wal pruning happened
			store.TakeSnapshot(uint64(3), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 11, 1)

			// Two snapshots at index 3, 5. And we keep one extra wal file prior to oldest snapshot.
			// So we should have pruned wal file with index [0, 1]
			store.TakeSnapshot(uint64(5), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 9, 2)

			d, err := os.Open(snapDir)
			assert.NoError(t, err)
			defer d.Close()
			names, err := d.Readdirnames(-1)
			assert.NoError(t, err)
			// Reverse-sorted, so names[0] is the newest snapshot file.
			sort.Sort(sort.Reverse(sort.StringSlice(names)))

			t.Logf("Corrupt latest snapshot file: %s", filepath.Join(snapDir, names[0]))
			f, err := os.OpenFile(filepath.Join(snapDir, names[0]), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
			assert.NoError(t, err)
			_, err = f.WriteString("Corrupted Snapshot")
			assert.NoError(t, err)
			f.Close()

			t.Logf("Close the storage and create a new one based on existing files")

			err = store.Close()
			assert.NoError(t, err)
			ram := raft.NewMemoryStorage()
			store, err = CreateStorage(logger, walDir, snapDir, ram)
			assert.NoError(t, err)

			// Corrupted snapshot file should've been renamed
			assertFileCount(t, 9, 1)
			files, err := fileutil.ReadDir(snapDir)
			assert.NoError(t, err)
			assert.Equal(t, 1, fileCount(files, ".broken"))

			store.TakeSnapshot(uint64(7), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 9, 2)

			store.TakeSnapshot(uint64(9), raftpb.ConfState{Nodes: []uint64{1}}, make([]byte, 10))
			assertFileCount(t, 5, 2)
		})
	})
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment