Commit e5745e6b authored by Artem Barger's avatar Artem Barger Committed by Gerrit Code Review
Browse files

Merge "FAB-11755 Implement rollback in pvtdatastore"

parents 892e1455 95eebc1c
......@@ -197,3 +197,15 @@ func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
return encodeCollElgKey(math.MaxUint64),
encodeCollElgKey(0)
}
func datakeyRange(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return
}
func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum-1)...)
return
}
......@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package pvtdatastorage
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
......@@ -17,3 +18,63 @@ func TestDataKeyEncoding(t *testing.T) {
datakey2 := decodeDatakey(encodeDataKey(dataKey1))
assert.Equal(t, dataKey1, datakey2)
}
func TestDatakeyRange(t *testing.T) {
blockNum := uint64(20)
startKey, endKey := datakeyRange(blockNum)
var txNum uint64
for txNum = 0; txNum < 100; txNum++ {
keyOfBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum},
txNum: txNum,
},
)
keyOfPreviousBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1},
txNum: txNum,
},
)
keyOfNextBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1},
txNum: txNum,
},
)
assert.Equal(t, bytes.Compare(keyOfPreviousBlock, startKey), -1)
assert.Equal(t, bytes.Compare(keyOfBlock, startKey), 1)
assert.Equal(t, bytes.Compare(keyOfBlock, endKey), -1)
assert.Equal(t, bytes.Compare(keyOfNextBlock, endKey), 1)
}
}
func TestEligibleMissingdataRange(t *testing.T) {
blockNum := uint64(20)
startKey, endKey := eligibleMissingdatakeyRange(blockNum)
var txNum uint64
for txNum = 0; txNum < 100; txNum++ {
keyOfBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum},
isEligible: true,
},
)
keyOfPreviousBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1},
isEligible: true,
},
)
keyOfNextBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1},
isEligible: true,
},
)
assert.Equal(t, bytes.Compare(keyOfNextBlock, startKey), -1)
assert.Equal(t, bytes.Compare(keyOfBlock, startKey), 1)
assert.Equal(t, bytes.Compare(keyOfBlock, endKey), -1)
assert.Equal(t, bytes.Compare(keyOfPreviousBlock, endKey), 1)
}
}
......@@ -236,12 +236,33 @@ func (s *store) Commit() error {
}
// Rollback implements the function in the interface `Store`
// Not deleting the existing data entries and expiry entries for now
// Because the next try would have exact same entries and will overwrite those
// This deletes the existing data entries and eligible missing data entries.
// However, this does not delete ineligible missing data entires as the next try
// would have exact same entries and will overwrite those. This also leaves the
// existing expiry entires as is because, most likely they will also get overwritten
// per new data entries. Even if some of the expiry entries does not get overwritten,
// (beacuse of some data may be missing next time), the additional expiry entries are just
// a Noop
func (s *store) Rollback() error {
if !s.batchPending {
return &ErrIllegalCall{"No pending batch to rollback"}
}
blkNum := s.nextBlockNum()
batch := leveldbhelper.NewUpdateBatch()
itr := s.db.GetIterator(datakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
itr = s.db.GetIterator(eligibleMissingdatakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
batch.Delete(pendingCommitKey)
if err := s.db.WriteBatch(batch, true); err != nil {
return err
}
s.batchPending = false
return nil
}
......
......@@ -773,7 +773,83 @@ func testCollElgEnabled(t *testing.T) {
assert.Equal(expectedMissingPvtDataInfo, missingPvtDataInfo)
}
// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback`
func TestRollBack(t *testing.T) {
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns-1", "coll-1"}: 0,
{"ns-1", "coll-2"}: 0,
},
)
env := NewTestStoreEnv(t, "TestRollBack", btlPolicy)
defer env.Cleanup()
assert := assert.New(t)
store := env.TestStore
assert.NoError(store.Prepare(0, nil, nil))
assert.NoError(store.Commit())
pvtdata := []*ledger.TxPvtData{
produceSamplePvtdata(t, 0, []string{"ns-1:coll-1", "ns-1:coll-2"}),
produceSamplePvtdata(t, 5, []string{"ns-1:coll-1", "ns-1:coll-2"}),
}
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(1, "ns-1", "coll-1", true)
missingData.Add(5, "ns-1", "coll-1", true)
missingData.Add(5, "ns-2", "coll-2", false)
for i := 1; i <= 9; i++ {
assert.NoError(store.Prepare(uint64(i), pvtdata, missingData))
assert.NoError(store.Commit())
}
datakeyTx0 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 0,
}
datakeyTx5 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 5,
}
eligibleMissingdatakey := &missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
isEligible: true,
}
// test store state before preparing for block 10
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)
// prepare for block 10 and test store for presence of datakeys and eligibile missingdatakeys
assert.NoError(store.Prepare(10, pvtdata, missingData))
testPendingBatch(true, assert, store)
testLastCommittedBlockHeight(10, assert, store)
datakeyTx0.blkNum = 10
datakeyTx5.blkNum = 10
eligibleMissingdatakey.blkNum = 10
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
// rollback last prepared block and test store for absence of datakeys and eligibile missingdatakeys
store.Rollback()
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)
assert.False(testDataKeyExists(t, store, datakeyTx0))
assert.False(testDataKeyExists(t, store, datakeyTx5))
assert.False(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
// For previously committed blocks the datakeys and eligibile missingdatakeys should still be present
for i := 1; i <= 9; i++ {
datakeyTx0.blkNum = uint64(i)
datakeyTx5.blkNum = uint64(i)
eligibleMissingdatakey.blkNum = uint64(i)
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
}
}
// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback` - [FAB-13099]
func testEmpty(expectedEmpty bool, assert *assert.Assertions, store Store) {
isEmpty, err := store.IsEmpty()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment