Commit d645c833 authored by Jason Yellick's avatar Jason Yellick
Browse files

FAB-14619 Rename Raft metadata protos



There are presently two etcdraft protos around metadata.  One is the
metadata stored in the config and it is named 'Metadata', the other is
the metadata stored in each block, this is named 'RaftMetadata'.  This
causes confusion when reading the code.  This CR transforms those names
to be:

 Metadata -> ConfigMetadata
 RaftMetadata -> BlockMetadata

Change-Id: Ia0394ebe78f5541996c010c3c67d760f336f75d8
Signed-off-by: default avatarJason Yellick <jyellick@us.ibm.com>
parent 1ac057f5
......@@ -227,7 +227,7 @@ var _ = Describe("Encoder", func() {
Context("when the consensus type is etcd/raft", func() {
BeforeEach(func() {
conf.OrdererType = "etcdraft"
conf.EtcdRaft = &etcdraft.Metadata{
conf.EtcdRaft = &etcdraft.ConfigMetadata{
Options: &etcdraft.Options{
TickInterval: "500ms",
},
......@@ -242,7 +242,7 @@ var _ = Describe("Encoder", func() {
err = proto.Unmarshal(cg.Values["ConsensusType"].Value, consensusType)
Expect(err).NotTo(HaveOccurred())
Expect(consensusType.Type).To(Equal("etcdraft"))
metadata := &etcdraft.Metadata{}
metadata := &etcdraft.ConfigMetadata{}
err = proto.Unmarshal(consensusType.Metadata, metadata)
Expect(err).NotTo(HaveOccurred())
Expect(metadata.Options.TickInterval).To(Equal("500ms"))
......@@ -250,7 +250,7 @@ var _ = Describe("Encoder", func() {
Context("when the raft configuration is bad", func() {
BeforeEach(func() {
conf.EtcdRaft = &etcdraft.Metadata{
conf.EtcdRaft = &etcdraft.ConfigMetadata{
Consenters: []*etcdraft.Consenter{
{},
},
......
......@@ -155,16 +155,16 @@ type AnchorPeer struct {
// Orderer contains configuration which is used for the
// bootstrapping of an orderer by the provisional bootstrapper.
type Orderer struct {
OrdererType string `yaml:"OrdererType"`
Addresses []string `yaml:"Addresses"`
BatchTimeout time.Duration `yaml:"BatchTimeout"`
BatchSize BatchSize `yaml:"BatchSize"`
Kafka Kafka `yaml:"Kafka"`
EtcdRaft *etcdraft.Metadata `yaml:"EtcdRaft"`
Organizations []*Organization `yaml:"Organizations"`
MaxChannels uint64 `yaml:"MaxChannels"`
Capabilities map[string]bool `yaml:"Capabilities"`
Policies map[string]*Policy `yaml:"Policies"`
OrdererType string `yaml:"OrdererType"`
Addresses []string `yaml:"Addresses"`
BatchTimeout time.Duration `yaml:"BatchTimeout"`
BatchSize BatchSize `yaml:"BatchSize"`
Kafka Kafka `yaml:"Kafka"`
EtcdRaft *etcdraft.ConfigMetadata `yaml:"EtcdRaft"`
Organizations []*Organization `yaml:"Organizations"`
MaxChannels uint64 `yaml:"MaxChannels"`
Capabilities map[string]bool `yaml:"Capabilities"`
Policies map[string]*Policy `yaml:"Policies"`
}
// BatchSize contains configuration affecting the size of batches.
......@@ -192,7 +192,7 @@ var genesisDefaults = TopLevel{
Kafka: Kafka{
Brokers: []string{"127.0.0.1:9092"},
},
EtcdRaft: &etcdraft.Metadata{
EtcdRaft: &etcdraft.ConfigMetadata{
Options: &etcdraft.Options{
TickInterval: "500ms",
ElectionTick: 10,
......
......@@ -132,7 +132,7 @@ func TestConsensusSpecificInit(t *testing.T) {
return &Profile{
Orderer: &Orderer{
OrdererType: "etcdraft",
EtcdRaft: &etcdraft.Metadata{
EtcdRaft: &etcdraft.ConfigMetadata{
Consenters: consenters,
Options: options,
},
......
......@@ -289,7 +289,7 @@ var _ = Describe("EndToEnd", func() {
numOfSnaps := len(files)
nwo.UpdateConsensusMetadata(network, peer, orderer, channel, func(originalMetadata []byte) []byte {
metadata := &etcdraft.Metadata{}
metadata := &etcdraft.ConfigMetadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
......
......@@ -201,7 +201,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
certificateRotations := refreshOrdererPEMs(network)
swap := func(o *nwo.Orderer, certificate []byte, c etcdraft.Consenter) {
nwo.UpdateEtcdRaftMetadata(network, peer, o, network.SystemChannel.Name, func(metadata *etcdraft.Metadata) {
nwo.UpdateEtcdRaftMetadata(network, peer, o, network.SystemChannel.Name, func(metadata *etcdraft.ConfigMetadata) {
var newConsenters []*etcdraft.Consenter
for _, consenter := range metadata.Consenters {
if bytes.Equal(consenter.ClientTlsCert, certificate) || bytes.Equal(consenter.ServerTlsCert, certificate) {
......@@ -861,7 +861,7 @@ var _ = Describe("EndToEnd reconfiguration and onboarding", func() {
By("Submitting another tx to increment Raft index on alive orderers")
nwo.UpdateConsensusMetadata(network, peer, orderers[4], network.SystemChannel.Name, func(originalMetadata []byte) []byte {
metadata := &etcdraft.Metadata{}
metadata := &etcdraft.ConfigMetadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
......
......@@ -258,7 +258,7 @@ var _ bool = Describe("Kafka2RaftMigration", func() {
Expect(filepath.Join(network.RootDir, tlsPath)).To(Equal(network.OrdererLocalTLSDir(orderer)))
certBytes, err := ioutil.ReadFile(filepath.Join(network.RootDir, tlsPath, "server.crt"))
Expect(err).NotTo(HaveOccurred())
raft_metadata := &protosraft.Metadata{
raft_metadata := &protosraft.ConfigMetadata{
Consenters: []*protosraft.Consenter{
{
ClientTlsCert: certBytes,
......@@ -277,7 +277,7 @@ var _ bool = Describe("Kafka2RaftMigration", func() {
}}
raft_metadata_bytes := utils.MarshalOrPanic(raft_metadata)
raft_metadata2 := &protosraft.Metadata{}
raft_metadata2 := &protosraft.ConfigMetadata{}
errUnma := proto.Unmarshal(raft_metadata_bytes, raft_metadata2)
Expect(errUnma).NotTo(HaveOccurred())
......
......@@ -245,14 +245,14 @@ func UnmarshalBlockFromFile(blockFile string) *common.Block {
// AddConsenter adds a new consenter to the given channel
func AddConsenter(n *Network, peer *Peer, orderer *Orderer, channel string, consenter ectdraft_protos.Consenter) {
UpdateEtcdRaftMetadata(n, peer, orderer, channel, func(metadata *ectdraft_protos.Metadata) {
UpdateEtcdRaftMetadata(n, peer, orderer, channel, func(metadata *ectdraft_protos.ConfigMetadata) {
metadata.Consenters = append(metadata.Consenters, &consenter)
})
}
// RemoveConsenter removes a consenter with the given certificate in PEM format from the given channel
func RemoveConsenter(n *Network, peer *Peer, orderer *Orderer, channel string, certificate []byte) {
UpdateEtcdRaftMetadata(n, peer, orderer, channel, func(metadata *ectdraft_protos.Metadata) {
UpdateEtcdRaftMetadata(n, peer, orderer, channel, func(metadata *ectdraft_protos.ConfigMetadata) {
var newConsenters []*ectdraft_protos.Consenter
for _, consenter := range metadata.Consenters {
if bytes.Equal(consenter.ClientTlsCert, certificate) || bytes.Equal(consenter.ServerTlsCert, certificate) {
......@@ -289,9 +289,9 @@ func UpdateConsensusMetadata(network *Network, peer *Peer, orderer *Orderer, cha
}
// UpdateEtcdRaftMetadata executes a config update that updates the etcdraft metadata according to the given function f
func UpdateEtcdRaftMetadata(network *Network, peer *Peer, orderer *Orderer, channel string, f func(md *ectdraft_protos.Metadata)) {
func UpdateEtcdRaftMetadata(network *Network, peer *Peer, orderer *Orderer, channel string, f func(md *ectdraft_protos.ConfigMetadata)) {
UpdateConsensusMetadata(network, peer, orderer, channel, func(originalMetadata []byte) []byte {
metadata := &ectdraft_protos.Metadata{}
metadata := &ectdraft_protos.ConfigMetadata{}
err := proto.Unmarshal(originalMetadata, metadata)
Expect(err).NotTo(HaveOccurred())
......
......@@ -880,7 +880,7 @@ func injectConsenterCertificate(t *testing.T, block *common.Block, tlsCert []byt
consensus := confEnv.Config.ChannelGroup.Groups[channelconfig.OrdererGroupKey].Values[channelconfig.ConsensusTypeKey]
consensus.Value = utils.MarshalOrPanic(&orderer.ConsensusType{
Type: "etcdraft",
Metadata: utils.MarshalOrPanic(&etcdraft.Metadata{
Metadata: utils.MarshalOrPanic(&etcdraft.ConfigMetadata{
Consenters: []*etcdraft.Consenter{
{
ServerTlsCert: tlsCert,
......
......@@ -112,9 +112,9 @@ type Options struct {
MaxSizePerMsg uint64
MaxInflightMsgs int
RaftMetadata *etcdraft.RaftMetadata
Metrics *Metrics
Cert []byte
BlockMetadata *etcdraft.BlockMetadata
Metrics *Metrics
Cert []byte
EvictionSuspicion time.Duration
LeaderCheckInterval time.Duration
......@@ -247,7 +247,7 @@ func NewChain(
observeC: observeC,
support: support,
fresh: fresh,
appliedIndex: opts.RaftMetadata.RaftIndex,
appliedIndex: opts.BlockMetadata.RaftIndex,
lastBlock: b,
sizeLimit: sizeLimit,
lastSnapBlockNum: snapBlkNum,
......@@ -297,7 +297,7 @@ func NewChain(
config: config,
tickInterval: c.opts.TickInterval,
clock: c.clock,
metadata: c.opts.RaftMetadata,
metadata: c.opts.BlockMetadata,
}
return c, nil
......@@ -313,7 +313,7 @@ func (c *Chain) MigrationStatus() migration.Status {
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
// all nodes start out as followers
c.Metrics.IsLeader.Set(float64(0))
if err := c.configureComm(); err != nil {
......@@ -719,7 +719,7 @@ func (c *Chain) serveRequest() {
select {
case <-c.errorC:
default:
nodeCount := len(c.opts.RaftMetadata.Consenters)
nodeCount := len(c.opts.BlockMetadata.Consenters)
// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
if nodeCount > 2 {
......@@ -835,8 +835,8 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
}
c.raftMetadataLock.Lock()
c.opts.RaftMetadata.RaftIndex = index
m := utils.MarshalOrPanic(c.opts.RaftMetadata)
c.opts.BlockMetadata.RaftIndex = index
m := utils.MarshalOrPanic(c.opts.BlockMetadata)
c.raftMetadataLock.Unlock()
c.support.WriteBlock(block, m)
......@@ -938,13 +938,13 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
cc, raftMetadata, rotate := c.detectConfChange(block)
confChange, blockMetadata, rotate := c.detectConfChange(block)
if cc != nil || rotate != 0 {
if confChange != nil || rotate != 0 {
c.logger.Infof("Config block %d changes consenter set, communication should be reconfigured", block.Header.Number)
c.raftMetadataLock.Lock()
c.opts.RaftMetadata = raftMetadata
c.opts.BlockMetadata = blockMetadata
c.raftMetadataLock.Unlock()
if err := c.configureComm(); err != nil {
......@@ -963,28 +963,28 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
func (c *Chain) detectConfChange(block *common.Block) (*raftpb.ConfChange, *etcdraft.RaftMetadata, uint64) {
func (c *Chain) detectConfChange(block *common.Block) (*raftpb.ConfChange, *etcdraft.BlockMetadata, uint64) {
// If config is targeting THIS channel, inspect consenter set and
// propose raft ConfChange if it adds/removes node.
metadata, raftMetadata := c.newRaftMetadata(block)
configMetadata, blockMetadata := c.newMetadata(block)
if metadata != nil && metadata.Options != nil && metadata.Options.SnapshotInterval != 0 {
if configMetadata != nil && configMetadata.Options != nil && configMetadata.Options.SnapshotInterval != 0 {
old := c.sizeLimit
c.sizeLimit = metadata.Options.SnapshotInterval
c.sizeLimit = configMetadata.Options.SnapshotInterval
c.logger.Infof("Snapshot interval is updated to %d bytes (was %d)", c.sizeLimit, old)
}
var changes *MembershipChanges
if metadata != nil {
changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
if configMetadata != nil {
changes = ComputeMembershipChanges(blockMetadata.Consenters, configMetadata.Consenters)
}
confChange, rotate := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
confChange, rotate := changes.UpdateRaftMetadataAndConfChange(blockMetadata)
if rotate != 0 {
c.logger.Infof("Config block %d rotates TLS certificate of node %d", block.Header.Number, rotate)
}
return confChange, raftMetadata, rotate
return confChange, blockMetadata, rotate
}
func (c *Chain) apply(ents []raftpb.Entry) {
......@@ -1051,7 +1051,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.confChangeInProgress = nil
c.configInflight = false
// report the new cluster size
c.Metrics.ClusterSize.Set(float64(len(c.opts.RaftMetadata.Consenters)))
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.Consenters)))
}
if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
......@@ -1127,7 +1127,7 @@ func (c *Chain) configureComm() error {
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
var nodes []cluster.RemoteNode
for raftID, consenter := range c.opts.RaftMetadata.Consenters {
for raftID, consenter := range c.opts.BlockMetadata.Consenters {
// No need to know yourself
if raftID == c.raftID {
continue
......@@ -1172,7 +1172,7 @@ func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
}
c.raftMetadataLock.RLock()
changes := ComputeMembershipChanges(c.opts.RaftMetadata.Consenters, updatedMetadata.Consenters)
changes := ComputeMembershipChanges(c.opts.BlockMetadata.Consenters, updatedMetadata.Consenters)
c.raftMetadataLock.RUnlock()
// Adding and removing 1 node is considered as certificate rotation, which is allowed.
......@@ -1200,15 +1200,15 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
switch common.HeaderType(hdr.Type) {
case common.HeaderType_CONFIG:
confChange, raftMetadata, rotate := c.detectConfChange(block)
raftMetadata.RaftIndex = index
confChange, blockMetadata, rotate := c.detectConfChange(block)
blockMetadata.RaftIndex = index
raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
blockMetadataBytes := utils.MarshalOrPanic(blockMetadata)
// write block with metadata
c.support.WriteConfigBlock(block, raftMetadataBytes)
c.support.WriteConfigBlock(block, blockMetadataBytes)
c.raftMetadataLock.Lock()
c.opts.RaftMetadata = raftMetadata
c.opts.BlockMetadata = blockMetadata
c.raftMetadataLock.Unlock()
// update membership
......@@ -1245,8 +1245,8 @@ func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
case common.HeaderType_ORDERER_TRANSACTION:
// If this config is channel creation, no extra inspection is needed
c.raftMetadataLock.Lock()
c.opts.RaftMetadata.RaftIndex = index
m := utils.MarshalOrPanic(c.opts.RaftMetadata)
c.opts.BlockMetadata.RaftIndex = index
m := utils.MarshalOrPanic(c.opts.BlockMetadata)
c.raftMetadataLock.Unlock()
c.support.WriteConfigBlock(block, m)
......@@ -1273,8 +1273,8 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
}
// Detect if it is a restart right after consensus-type migration. If yes, return early in order to avoid using
// the block metadata as etcdraft.RaftMetadata (see below). Right after migration the block metadata will carry
// Kafka metadata. The etcdraft.RaftMetadata should be extracted from the ConsensusType.Metadata, instead.
// the block metadata as etcdraft.BlockMetadata (see below). Right after migration the block metadata will carry
// Kafka metadata. The etcdraft.BlockMetadata should be extracted from the ConsensusType.Metadata, instead.
if c.detectMigration() {
c.logger.Infof("[channel: %s], Restarting after consensus-type migration. Type: %s, just starting the chain.",
c.support.ChainID(), c.support.SharedConfig().ConsensusType())
......@@ -1284,7 +1284,7 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
// extracting current Raft configuration state
confState := c.Node.ApplyConfChange(raftpb.ConfChange{})
if len(confState.Nodes) == len(c.opts.RaftMetadata.Consenters) {
if len(confState.Nodes) == len(c.opts.BlockMetadata.Consenters) {
// since configuration change could only add one node or
// remove one node at a time, if raft nodes state size
// equal to membership stored in block metadata field,
......@@ -1293,16 +1293,16 @@ func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
return nil
}
return ConfChange(c.opts.RaftMetadata, confState)
return ConfChange(c.opts.BlockMetadata, confState)
}
// newRaftMetadata extract raft metadata from the configuration block
func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) {
// newMetadata extract raft metadata from the configuration block
func (c *Chain) newMetadata(block *common.Block) (*etcdraft.ConfigMetadata, *etcdraft.BlockMetadata) {
metadata, err := ConsensusMetadataFromConfigBlock(block)
if err != nil {
c.logger.Panicf("error reading consensus metadata: %s", err)
}
raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
raftMetadata := proto.Clone(c.opts.BlockMetadata).(*etcdraft.BlockMetadata)
// proto.Clone doesn't copy an empty map, hence need to initialize it after
// cloning
if raftMetadata.Consenters == nil {
......
......@@ -91,7 +91,7 @@ var _ = Describe("Chain", func() {
Describe("Single Raft node", func() {
var (
configurator *mocks.Configurator
consenterMetadata *raftprotos.Metadata
consenterMetadata *raftprotos.ConfigMetadata
clock *fakeclock.FakeClock
opts etcdraft.Options
support *consensusmocks.FakeConsenterSupport
......@@ -133,7 +133,7 @@ var _ = Describe("Chain", func() {
support.HeightReturns(1)
support.BlockReturns(getSeedBlock())
meta := &raftprotos.RaftMetadata{
meta := &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{},
NextConsenterId: 1,
}
......@@ -153,7 +153,7 @@ var _ = Describe("Chain", func() {
HeartbeatTick: HEARTBEAT_TICK,
MaxSizePerMsg: 1024 * 1024,
MaxInflightMsgs: 256,
RaftMetadata: meta,
BlockMetadata: meta,
Logger: logger,
MemoryStorage: storage,
WALDir: walDir,
......@@ -660,7 +660,7 @@ var _ = Describe("Chain", func() {
Context("updating consenters set by exactly one node", func() {
It("should be able to process config update adding single node", func() {
metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata)
metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
Host: "localhost",
Port: 7050,
......@@ -686,7 +686,7 @@ var _ = Describe("Chain", func() {
})
It("should be able to process config update removing single node", func() {
metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata)
// Remove one of the consenters
metadata.Consenters = metadata.Consenters[1:]
values := map[string]*common.ConfigValue{
......@@ -711,13 +711,13 @@ var _ = Describe("Chain", func() {
Describe("Crash Fault Tolerance", func() {
var (
raftMetadata *raftprotos.RaftMetadata
raftMetadata *raftprotos.BlockMetadata
)
BeforeEach(func() {
tlsCA, _ := tlsgen.NewCA()
raftMetadata = &raftprotos.RaftMetadata{
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
......@@ -732,8 +732,8 @@ var _ = Describe("Chain", func() {
Describe("when a chain is started with existing WAL", func() {
var (
m1 *raftprotos.RaftMetadata
m2 *raftprotos.RaftMetadata
m1 *raftprotos.BlockMetadata
m2 *raftprotos.BlockMetadata
)
JustBeforeEach(func() {
// to generate WAL data, we start a chain,
......@@ -747,7 +747,7 @@ var _ = Describe("Chain", func() {
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(0)
m1 = &raftprotos.RaftMetadata{}
m1 = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m1)
err = chain.Order(env, uint64(0))
......@@ -755,7 +755,7 @@ var _ = Describe("Chain", func() {
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata = support.WriteBlockArgsForCall(1)
m2 = &raftprotos.RaftMetadata{}
m2 = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m2)
chain.Halt()
......@@ -770,12 +770,12 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := c.support.WriteBlockArgsForCall(0)
m := &raftprotos.RaftMetadata{}
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m1.RaftIndex))
_, metadata = c.support.WriteBlockArgsForCall(1)
m = &raftprotos.RaftMetadata{}
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
......@@ -802,7 +802,7 @@ var _ = Describe("Chain", func() {
Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := c.support.WriteBlockArgsForCall(1)
m := &raftprotos.RaftMetadata{}
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Expect(m.RaftIndex).To(Equal(m2.RaftIndex))
......@@ -997,7 +997,7 @@ var _ = Describe("Chain", func() {
Expect(chain.Order(env, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(0)
m := &raftprotos.RaftMetadata{}
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
......@@ -1039,7 +1039,7 @@ var _ = Describe("Chain", func() {
c.Halt()
_, metadata = c.support.WriteBlockArgsForCall(0)
m = &raftprotos.RaftMetadata{}
m = &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
raftMetadata.RaftIndex = m.RaftIndex
cx := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
......@@ -1083,7 +1083,7 @@ var _ = Describe("Chain", func() {
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
_, metadata := support.WriteBlockArgsForCall(1)
m := &raftprotos.RaftMetadata{}
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
// check snapshot does exit
......@@ -1152,7 +1152,7 @@ var _ = Describe("Chain", func() {
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
_, metadata := support.WriteBlockArgsForCall(1)
m := &raftprotos.RaftMetadata{}
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
By("Cutting block 3")
......@@ -1215,7 +1215,7 @@ var _ = Describe("Chain", func() {
SnapDir: snapDir,
Logger: logger,
MemoryStorage: storage,
RaftMetadata: &raftprotos.RaftMetadata{},
BlockMetadata: &raftprotos.BlockMetadata{},
Metrics: newFakeMetrics(newFakeMetricsFields()),
},
configurator,
......@@ -1247,7 +1247,7 @@ var _ = Describe("Chain", func() {
SnapDir: snapDir,
Logger: logger,
MemoryStorage: storage,
RaftMetadata: &raftprotos.RaftMetadata{},
BlockMetadata: &raftprotos.BlockMetadata{},
Metrics: newFakeMetrics(newFakeMetricsFields()),
},
nil,
......@@ -1273,10 +1273,10 @@ var _ = Describe("Chain", func() {
chain, err := etcdraft.NewChain(
support,
etcdraft.Options{
WALDir: path.Join(d, "wal-dir"),
SnapDir: snapDir,
Logger: logger,
RaftMetadata: &raftprotos.RaftMetadata{},
WALDir: path.Join(d, "wal-dir"),
SnapDir: snapDir,
Logger: logger,
BlockMetadata: &raftprotos.BlockMetadata{},
},
nil,
nil,
......@@ -1298,7 +1298,7 @@ var _ = Describe("Chain", func() {
timeout time.Duration
dataDir string
c1, c2, c3 *chain
raftMetadata *raftprotos.RaftMetadata
raftMetadata *raftprotos.BlockMetadata
)
BeforeEach(func() {
......@@ -1310,7 +1310,7 @@ var _ = Describe("Chain", func() {
dataDir, err = ioutil.TempDir("", "raft-test-")
Expect(err).NotTo(HaveOccurred())
raftMetadata = &raftprotos.RaftMetadata{
raftMetadata = &raftprotos.BlockMetadata{
Consenters: map[uint64]*raftprotos.Consenter{
1: {
Host: "localhost",
......@@ -1428,7 +1428,7 @@ var _ = Describe("Chain", func() {
)
var (
addConsenterConfigValue = func() map[string]*common.ConfigValue {
metadata := &raftprotos.Metadata{}
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range raftMetadata.Consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1451,10 +1451,10 @@ var _ = Describe("Chain", func() {
}
}
removeConsenterConfigValue = func(id uint64) map[string]*common.ConfigValue {
newRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.RaftMetadata)
newRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
delete(newRaftMetadata.Consenters, id)
metadata := &raftprotos.Metadata{}
metadata := &raftprotos.ConfigMetadata{}
for _, consenter := range newRaftMetadata.Consenters {
metadata.Consenters = append(metadata.Consenters, consenter)
}
......@@ -1499,12 +1499,12 @@ var _ = Describe("Chain", func() {
Context("reconfiguration", func() {
It("cannot change consenter set by more than 1 node", func() {
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.RaftMetadata)
updatedRaftMetadata := proto.Clone(raftMetadata).(*raftprotos.BlockMetadata)
// remove second & third consenter
delete(updatedRaftMetadata.Consenters, 2)
delete(updatedRaftMetadata.Consenters, 3)