Commit 5ecaa3e5 authored by Kostas Christidis's avatar Kostas Christidis Committed by Gerrit Code Review
Browse files

Merge "FAB-12986: ledger per chain for raft chain_test.go" into release-1.4

parents 7dbf15ac c7db89e0
......@@ -882,7 +882,6 @@ var _ = Describe("Chain", func() {
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
// c.support.HeightReturns(normalBlock.Header.Number + 1)
c.init()
c.Start()
......@@ -939,6 +938,11 @@ var _ = Describe("Chain", func() {
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
c.support.HeightReturns(5)
c.support.BlockReturns(&common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{[]byte("foo")}},
Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
})
c.init()
c.Start()
......@@ -990,7 +994,13 @@ var _ = Describe("Chain", func() {
raftMetadata.RaftIndex = m.RaftIndex
c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
// start chain at block 2 (height = 3)
c.support.HeightReturns(3)
c.support.BlockReturns(&common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{[]byte("foo")}},
Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
})
c.opts.SnapInterval = 2
c.init()
......@@ -1498,20 +1508,16 @@ var _ = Describe("Chain", func() {
configEnv := newConfigEnv(channelID, common.HeaderType_CONFIG, newConfigUpdateEnv(channelID, addConsenterConfigValue()))
c1.cutter.CutNext = true
configBlock := &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}
c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
stub := c1.support.WriteConfigBlockStub
c1.support.WriteConfigBlockStub = func(block *common.Block, metadata []byte) {
stub(block, metadata)
// disconnect leader after block being committed
network.disconnect(1)
// electing new leader
network.elect(2)
}
// mock Block method to return recent configuration block
c2.support.BlockReturns(configBlock)
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
......@@ -1568,18 +1574,14 @@ var _ = Describe("Chain", func() {
newConfigUpdateEnv(channelID, removeConsenterConfigValue(1))) // remove nodeID == 1
c1.cutter.CutNext = true
configBlock := &common.Block{
Header: &common.BlockHeader{},
Data: &common.BlockData{Data: [][]byte{marshalOrPanic(configEnv)}}}
c1.support.WriteConfigBlockStub = func(_ *common.Block, _ []byte) {
stub := c1.support.WriteConfigBlockStub
c1.support.WriteConfigBlockStub = func(block *common.Block, metadata []byte) {
stub(block, metadata)
// disconnect leader after block being committed
network.disconnect(1)
}
// mock Block method to return recent configuration block
c2.support.BlockReturns(configBlock)
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
......@@ -1619,11 +1621,6 @@ var _ = Describe("Chain", func() {
c1.cutter.CutNext = true
c1.support.WriteConfigBlockStub = func(configBlock *common.Block, _ []byte) {
// make sure c2 will get recent configuration block
c2.support.BlockReturns(configBlock)
}
By("sending config transaction")
err := c1.Configure(configEnv, 0)
Expect(err).ToNot(HaveOccurred())
......@@ -2186,6 +2183,10 @@ type chain struct {
opts etcdraft.Options
puller *mocks.FakeBlockPuller
// store written blocks to be returned by mock block puller
ledger map[uint64]*common.Block
ledgerLock sync.RWMutex
observe chan uint64
unstarted chan struct{}
......@@ -2220,10 +2221,6 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
close(cutter.Block)
support.BlockCutterReturns(cutter)
// for block creator initialization
support.HeightReturns(1)
support.BlockReturns(getSeedBlock())
// upon leader change, lead is reset to 0 before set to actual
// new leader, i.e. 1 -> 0 -> 2. Therefore 2 numbers will be
// sent on this chan, so we need size to be 2
......@@ -2236,7 +2233,8 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
ch := make(chan struct{})
close(ch)
return &chain{
c := &chain{
id: id,
support: support,
cutter: cutter,
......@@ -2248,7 +2246,40 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
unstarted: ch,
configurator: configurator,
puller: puller,
ledger: map[uint64]*common.Block{
0: getSeedBlock(), // Very first block
},
}
// receives blocks and metadata and appends it into
// the ledger struct to simulate write behaviour
appendBlockToLedger := func(b *common.Block, meta []byte) {
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
c.ledgerLock.Lock()
defer c.ledgerLock.Unlock()
c.ledger[b.Header.Number] = b
}
c.support.WriteBlockStub = appendBlockToLedger
c.support.WriteConfigBlockStub = appendBlockToLedger
// returns current ledger height
c.support.HeightStub = func() uint64 {
c.ledgerLock.RLock()
defer c.ledgerLock.RUnlock()
return uint64(len(c.ledger))
}
// reads block from the ledger
c.support.BlockStub = func(number uint64) *common.Block {
c.ledgerLock.RLock()
defer c.ledgerLock.RUnlock()
return c.ledger[number]
}
return c
}
func (c *chain) init() {
......@@ -2261,9 +2292,6 @@ type network struct {
leader uint64
chains map[uint64]*chain
// store written blocks to be returned by mock block puller
ledger *sync.Map
// used to determine connectivity of a chain.
// the actual value type is `chan struct` because
// it's used to skip assertion in `elect` if a
......@@ -2316,20 +2344,15 @@ func (n *network) addChain(c *chain) {
return nil
}
c.support.WriteBlockStub = func(b *common.Block, meta []byte) {
bytes, err := proto.Marshal(&common.Metadata{Value: meta})
Expect(err).NotTo(HaveOccurred())
b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes
n.ledger.Store(b.Header.Number, b)
}
c.puller.PullBlockStub = func(i uint64) *common.Block {
b, exist := n.ledger.Load(i)
if !exist {
return nil
}
n.connLock.RLock()
leaderChain := n.chains[n.leader]
n.connLock.RUnlock()
return b.(*common.Block)
leaderChain.ledgerLock.RLock()
defer leaderChain.ledgerLock.RUnlock()
block := leaderChain.ledger[i]
return block
}
n.appendChain(c)
......@@ -2339,7 +2362,6 @@ func createNetwork(timeout time.Duration, channel string, dataDir string, raftMe
n := &network{
chains: make(map[uint64]*chain),
connectivity: make(map[uint64]chan struct{}),
ledger: &sync.Map{},
}
for nodeID := range raftMetadata.Consenters {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment