Unverified Commit 566562e7 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13656] Size-based snapshotting



Instead of taking snapshot every N blocks, this CR
changes it to taking snapshot every N bytes.

This also sets default SnapshotInterval to 100MB, if
it's unset. Otherwise data in memory is never compacted
till OOM.

Meanwhile, DefaultSnapshotCatchUpEntries is shrunk so
it does not take too much space to preserve extra entries
every time a snapshot is taken. Slow nodes are catching up
using blockpuller, which is also efficient.

Change-Id: I79cfeb8652fcbafdeb5793bf4f06267b95a858d6
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 89e67f35
......@@ -194,11 +194,12 @@ var genesisDefaults = TopLevel{
},
EtcdRaft: &etcdraft.Metadata{
Options: &etcdraft.Options{
TickInterval: 500,
ElectionTick: 10,
HeartbeatTick: 1,
MaxInflightMsgs: 256,
MaxSizePerMsg: 1048576,
TickInterval: 500,
ElectionTick: 10,
HeartbeatTick: 1,
MaxInflightMsgs: 256,
MaxSizePerMsg: 1048576,
SnapshotInterval: 100 * 1024 * 1024, // 100MB
},
},
},
......@@ -423,6 +424,10 @@ loop:
logger.Infof("Orderer.EtcdRaft.Options.MaxSizePerMsg unset, setting to %v", genesisDefaults.Orderer.EtcdRaft.Options.MaxSizePerMsg)
ord.EtcdRaft.Options.MaxSizePerMsg = genesisDefaults.Orderer.EtcdRaft.Options.MaxSizePerMsg
case ord.EtcdRaft.Options.SnapshotInterval == 0:
logger.Infof("Orderer.EtcdRaft.Options.SnapshotInterval unset, setting to %v", genesisDefaults.Orderer.EtcdRaft.Options.SnapshotInterval)
ord.EtcdRaft.Options.SnapshotInterval = genesisDefaults.Orderer.EtcdRaft.Options.SnapshotInterval
case len(ord.EtcdRaft.Consenters) == 0:
logger.Panicf("%s configuration did not specify any consenter", etcdraft.TypeKey)
......
......@@ -186,7 +186,7 @@ var _ = Describe("EndToEnd Crash Fault Tolerance", func() {
files, err := ioutil.ReadDir(path.Join(o2SnapDir, "testchannel"))
Expect(err).NotTo(HaveOccurred())
return len(files)
}).Should(Equal(1))
}).Should(Equal(5)) // snapshot interval is 1 KB, every block triggers snapshot
ordererProc.Signal(syscall.SIGKILL)
Eventually(ordererProc.Wait(), network.EventuallyTimeout).Should(Receive())
......
......@@ -90,7 +90,7 @@ Profiles:{{ range .Profiles }}
{{- if eq $w.Consensus.Type "etcdraft" }}
EtcdRaft:
Options:
SnapshotInterval: 5
SnapshotInterval: 1 KB
Consenters:{{ range .Orderers }}{{ with $w.Orderer . }}
- Host: 127.0.0.1
Port: {{ $w.OrdererPort . "Listen" }}
......
......@@ -30,10 +30,25 @@ import (
"github.com/pkg/errors"
)
// DefaultSnapshotCatchUpEntries is the default number of entries
// to preserve in memory when a snapshot is taken. This is for
// slow followers to catch up.
const DefaultSnapshotCatchUpEntries = uint64(500)
const (
BYTE = 1 << (10 * iota)
KILOBYTE
MEGABYTE
GIGABYTE
TERABYTE
)
const (
// DefaultSnapshotCatchUpEntries is the default number of entries
// to preserve in memory when a snapshot is taken. This is for
// slow followers to catch up.
DefaultSnapshotCatchUpEntries = uint64(20)
// DefaultSnapshotInterval is the default snapshot interval. It is
// used if SnapshotInterval is not provided in channel config options.
// It is needed to enforce snapshot being set.
DefaultSnapshotInterval = 100 * MEGABYTE // 100MB
)
//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
......@@ -71,7 +86,7 @@ type Options struct {
WALDir string
SnapDir string
SnapInterval uint64
SnapInterval uint32
// This is configurable mainly for testing purpose. Users are not
// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used.
......@@ -134,6 +149,8 @@ type Chain struct {
appliedIndex uint64
// needed by snapshotting
sizeLimit uint32 // SnapshotInterval in bytes
accDataSize uint32 // accumulative data size since last snapshot
lastSnapBlockNum uint64
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
......@@ -170,6 +187,11 @@ func NewChain(
storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
}
sizeLimit := opts.SnapInterval
if sizeLimit == 0 {
sizeLimit = DefaultSnapshotInterval
}
// get block number in last snapshot, if exists
var snapBlkNum uint64
if s := storage.Snapshot(); !raft.IsEmptySnap(s) {
......@@ -194,6 +216,7 @@ func NewChain(
support: support,
fresh: fresh,
appliedIndex: opts.RaftMetadata.RaftIndex,
sizeLimit: sizeLimit,
lastSnapBlockNum: snapBlkNum,
createPuller: f,
clock: opts.Clock,
......@@ -742,6 +765,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
appliedb = block.Header.Number
position = i
c.accDataSize += uint32(len(ents[i].Data))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
......@@ -779,19 +803,21 @@ func (c *Chain) apply(ents []raftpb.Entry) {
}
}
if c.opts.SnapInterval == 0 || appliedb == 0 {
// snapshot is not enabled (SnapInterval == 0) or
if appliedb == 0 {
// no block has been written (appliedb == 0) in this round
return
}
if appliedb-c.lastSnapBlockNum >= c.opts.SnapInterval {
if c.accDataSize >= c.sizeLimit {
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
"taking snapshot at block %d, last snapshotted block number is %d",
c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum)
c.accDataSize = 0
c.lastSnapBlockNum = appliedb
default:
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapInterval is too small")
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotInterval is too small")
}
}
......
This diff is collapsed.
......@@ -32,7 +32,7 @@ func (m *Metadata) Reset() { *m = Metadata{} }
func (m *Metadata) String() string { return proto.CompactTextString(m) }
func (*Metadata) ProtoMessage() {}
func (*Metadata) Descriptor() ([]byte, []int) {
return fileDescriptor_configuration_7287b6fe5795d8b9, []int{0}
return fileDescriptor_configuration_030e547fa9cbed9e, []int{0}
}
func (m *Metadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Metadata.Unmarshal(m, b)
......@@ -81,7 +81,7 @@ func (m *Consenter) Reset() { *m = Consenter{} }
func (m *Consenter) String() string { return proto.CompactTextString(m) }
func (*Consenter) ProtoMessage() {}
func (*Consenter) Descriptor() ([]byte, []int) {
return fileDescriptor_configuration_7287b6fe5795d8b9, []int{1}
return fileDescriptor_configuration_030e547fa9cbed9e, []int{1}
}
func (m *Consenter) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Consenter.Unmarshal(m, b)
......@@ -132,12 +132,13 @@ func (m *Consenter) GetServerTlsCert() []byte {
// Options to be specified for all the etcd/raft nodes. These can be modified on a
// per-channel basis.
type Options struct {
TickInterval uint64 `protobuf:"varint,1,opt,name=tick_interval,json=tickInterval,proto3" json:"tick_interval,omitempty"`
ElectionTick uint32 `protobuf:"varint,2,opt,name=election_tick,json=electionTick,proto3" json:"election_tick,omitempty"`
HeartbeatTick uint32 `protobuf:"varint,3,opt,name=heartbeat_tick,json=heartbeatTick,proto3" json:"heartbeat_tick,omitempty"`
MaxInflightMsgs uint32 `protobuf:"varint,4,opt,name=max_inflight_msgs,json=maxInflightMsgs,proto3" json:"max_inflight_msgs,omitempty"`
MaxSizePerMsg uint64 `protobuf:"varint,5,opt,name=max_size_per_msg,json=maxSizePerMsg,proto3" json:"max_size_per_msg,omitempty"`
SnapshotInterval uint64 `protobuf:"varint,6,opt,name=snapshot_interval,json=snapshotInterval,proto3" json:"snapshot_interval,omitempty"`
TickInterval uint64 `protobuf:"varint,1,opt,name=tick_interval,json=tickInterval,proto3" json:"tick_interval,omitempty"`
ElectionTick uint32 `protobuf:"varint,2,opt,name=election_tick,json=electionTick,proto3" json:"election_tick,omitempty"`
HeartbeatTick uint32 `protobuf:"varint,3,opt,name=heartbeat_tick,json=heartbeatTick,proto3" json:"heartbeat_tick,omitempty"`
MaxInflightMsgs uint32 `protobuf:"varint,4,opt,name=max_inflight_msgs,json=maxInflightMsgs,proto3" json:"max_inflight_msgs,omitempty"`
MaxSizePerMsg uint64 `protobuf:"varint,5,opt,name=max_size_per_msg,json=maxSizePerMsg,proto3" json:"max_size_per_msg,omitempty"`
// Take snapshot when cumulative data exceeds certain size in bytes.
SnapshotInterval uint32 `protobuf:"varint,6,opt,name=snapshot_interval,json=snapshotInterval,proto3" json:"snapshot_interval,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -147,7 +148,7 @@ func (m *Options) Reset() { *m = Options{} }
func (m *Options) String() string { return proto.CompactTextString(m) }
func (*Options) ProtoMessage() {}
func (*Options) Descriptor() ([]byte, []int) {
return fileDescriptor_configuration_7287b6fe5795d8b9, []int{2}
return fileDescriptor_configuration_030e547fa9cbed9e, []int{2}
}
func (m *Options) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Options.Unmarshal(m, b)
......@@ -202,7 +203,7 @@ func (m *Options) GetMaxSizePerMsg() uint64 {
return 0
}
func (m *Options) GetSnapshotInterval() uint64 {
func (m *Options) GetSnapshotInterval() uint32 {
if m != nil {
return m.SnapshotInterval
}
......@@ -232,7 +233,7 @@ func (m *RaftMetadata) Reset() { *m = RaftMetadata{} }
func (m *RaftMetadata) String() string { return proto.CompactTextString(m) }
func (*RaftMetadata) ProtoMessage() {}
func (*RaftMetadata) Descriptor() ([]byte, []int) {
return fileDescriptor_configuration_7287b6fe5795d8b9, []int{3}
return fileDescriptor_configuration_030e547fa9cbed9e, []int{3}
}
func (m *RaftMetadata) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RaftMetadata.Unmarshal(m, b)
......@@ -289,10 +290,10 @@ func init() {
}
func init() {
proto.RegisterFile("orderer/etcdraft/configuration.proto", fileDescriptor_configuration_7287b6fe5795d8b9)
proto.RegisterFile("orderer/etcdraft/configuration.proto", fileDescriptor_configuration_030e547fa9cbed9e)
}
var fileDescriptor_configuration_7287b6fe5795d8b9 = []byte{
var fileDescriptor_configuration_030e547fa9cbed9e = []byte{
// 535 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x93, 0x4d, 0x6b, 0x1b, 0x3d,
0x14, 0x85, 0x99, 0x78, 0xf2, 0xa5, 0x78, 0x5e, 0xdb, 0x7a, 0x37, 0xa6, 0x50, 0x30, 0x6e, 0x9b,
......@@ -315,17 +316,17 @@ var fileDescriptor_configuration_7287b6fe5795d8b9 = []byte{
0x0b, 0x91, 0xe5, 0x96, 0xae, 0x4d, 0x66, 0x7c, 0xcc, 0x88, 0xf4, 0xd6, 0x6c, 0x37, 0xab, 0xf5,
0xb9, 0xc9, 0x0c, 0x7e, 0x8b, 0xfa, 0x8e, 0x35, 0xe2, 0x11, 0x68, 0x09, 0xda, 0xb1, 0xc3, 0x53,
0x9f, 0x2f, 0x5a, 0xb3, 0xdd, 0x0f, 0xf1, 0x08, 0xdf, 0x41, 0xcf, 0x4d, 0x86, 0x6f, 0xd1, 0xc0,
0x48, 0x56, 0x9a, 0x5c, 0xd9, 0xc3, 0x4d, 0xce, 0x3c, 0xd9, 0x6f, 0x8c, 0xe6, 0x36, 0xe3, 0xdf,
0x27, 0xa8, 0x4b, 0xd8, 0xd2, 0xb6, 0x73, 0xff, 0xfa, 0xcc, 0xdc, 0xaf, 0x0f, 0x53, 0x3c, 0x66,
0x0f, 0x4b, 0x60, 0xbe, 0x48, 0xab, 0xf7, 0xff, 0xac, 0xc2, 0x0d, 0x1a, 0x48, 0xd8, 0x59, 0xda,
0x4a, 0x54, 0xa4, 0xfe, 0xa9, 0x42, 0xd2, 0x73, 0x46, 0x5b, 0x3b, 0x4b, 0xf1, 0x7b, 0x84, 0xdd,
0x62, 0x52, 0x9e, 0x33, 0x99, 0x01, 0xe5, 0x6a, 0x23, 0xad, 0xf1, 0x2f, 0x16, 0x92, 0xbe, 0x73,
0xa6, 0xde, 0x98, 0x7a, 0x1d, 0xbf, 0x44, 0xc8, 0x45, 0xa1, 0x42, 0xa6, 0xb0, 0xf3, 0xaf, 0x15,
0x92, 0x4b, 0xa7, 0xcc, 0x9c, 0xf0, 0x82, 0xa0, 0xde, 0x93, 0x5c, 0xb8, 0x8f, 0x3a, 0x2b, 0xd8,
0xd7, 0xd3, 0x74, 0x47, 0xfc, 0x0e, 0x9d, 0x6e, 0x59, 0xb1, 0x81, 0x7a, 0x4d, 0x9f, 0x5d, 0xec,
0x8a, 0xf8, 0x74, 0xf2, 0x31, 0xf8, 0x9c, 0xa1, 0x58, 0xe9, 0x2c, 0xce, 0xf7, 0x25, 0xe8, 0x02,
0xd2, 0x0c, 0x74, 0xbc, 0x64, 0x0b, 0x2d, 0x78, 0xf5, 0x85, 0x4c, 0x5c, 0x7f, 0xb4, 0xb6, 0xcd,
0xcf, 0x0f, 0x99, 0xb0, 0xf9, 0x66, 0x11, 0x73, 0xb5, 0x4e, 0x8e, 0xca, 0x92, 0xaa, 0x2c, 0xa9,
0xca, 0x92, 0xa7, 0xff, 0x73, 0x71, 0xe6, 0x8d, 0xfb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x56,
0xd3, 0xd9, 0x86, 0xba, 0x03, 0x00, 0x00,
0x48, 0x56, 0x9a, 0x5c, 0xd9, 0xc3, 0x4d, 0xce, 0x7c, 0xd3, 0x7e, 0x63, 0x34, 0xb7, 0x19, 0xff,
0x3e, 0x41, 0x5d, 0xc2, 0x96, 0xb6, 0x9d, 0xfb, 0xd7, 0x67, 0xe6, 0x7e, 0x7d, 0x98, 0xe2, 0x31,
0x7b, 0x58, 0x02, 0xf3, 0x45, 0x5a, 0xbd, 0xff, 0x67, 0x15, 0x6e, 0xd0, 0x40, 0xc2, 0xce, 0xd2,
0x56, 0xa2, 0x22, 0xf5, 0x4f, 0x15, 0x92, 0x9e, 0x33, 0xda, 0xda, 0x59, 0x8a, 0xdf, 0x23, 0xec,
0x16, 0x93, 0xf2, 0x9c, 0xc9, 0x0c, 0x28, 0x57, 0x1b, 0x69, 0x8d, 0x7f, 0xb1, 0x90, 0xf4, 0x9d,
0x33, 0xf5, 0xc6, 0xd4, 0xeb, 0xf8, 0x25, 0x42, 0x2e, 0x0a, 0x15, 0x32, 0x85, 0x9d, 0x7f, 0xad,
0x90, 0x5c, 0x3a, 0x65, 0xe6, 0x84, 0x17, 0x04, 0xf5, 0x9e, 0xe4, 0xc2, 0x7d, 0xd4, 0x59, 0xc1,
0xbe, 0x9e, 0xa6, 0x3b, 0xe2, 0x77, 0xe8, 0x74, 0xcb, 0x8a, 0x0d, 0xd4, 0x6b, 0xfa, 0xec, 0x62,
0x57, 0xc4, 0xa7, 0x93, 0x8f, 0xc1, 0xe7, 0x0c, 0xc5, 0x4a, 0x67, 0x71, 0xbe, 0x2f, 0x41, 0x17,
0x90, 0x66, 0xa0, 0xe3, 0x25, 0x5b, 0x68, 0xc1, 0xab, 0x2f, 0x64, 0xe2, 0xfa, 0xa3, 0xb5, 0x6d,
0x7e, 0x7e, 0xc8, 0x84, 0xcd, 0x37, 0x8b, 0x98, 0xab, 0x75, 0x72, 0x54, 0x96, 0x54, 0x65, 0x49,
0x55, 0x96, 0x3c, 0xfd, 0x9f, 0x8b, 0x33, 0x6f, 0xdc, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xfb,
0x39, 0x7a, 0xda, 0xba, 0x03, 0x00, 0x00,
}
......@@ -34,7 +34,8 @@ message Options {
uint32 heartbeat_tick = 3;
uint32 max_inflight_msgs = 4;
uint64 max_size_per_msg = 5;
uint64 snapshot_interval = 6; // take snapshot every n blocks
// Take snapshot when cumulative data exceeds certain size in bytes.
uint32 snapshot_interval = 6;
}
// RaftMetadata stores data used by the Raft OSNs when
......
......@@ -339,8 +339,8 @@ Orderer: &OrdererDefaults
# the throughput during normal replication.
MaxSizePerMsg: 1048576
# SnapshotInterval defines number of blocks per which a snapshot is taken
SnapshotInterval: 500
# SnapshotInterval defines number of bytes per which a snapshot is taken
SnapshotInterval: 100 MB
# Organizations lists the orgs participating on the orderer side of the
# network.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment