Unverified Commit e0e3ddbb authored by yacovm's avatar yacovm Committed by Artem Barger
Browse files

[FAB-13716] Block verifier book-keeping for onboarding



This change set adds the following supporting structs for adding
support for verifying blocks pulled by onboarding in future CRs:

- Ledger interceptor: intercepts a commit of a block, and invokes
  a callback.

- VerificationRegistry: tracks commit of config blocks, and builds
  channelconfig bundles from them, in order to support verification
  of blocks pulled.

- BlockVerifierAssembler and BlockValidationPolicyVerifier: together
  they build block verifiers out of config blocks.

- verifierLoader: Loads a mapping of chainID->cluster.BlockVerifier,
  which is to be used at OSN startup to preload the existing verifiers.
  It is needed in cases we recover from a crash, or if we do
  dynamic onboarding and the previous config blocks have been committed
  to the ledger before the OSN was started.

In the next CR, I will wire all these into the onboarding infrastructure
itself, and they will be used to hold the latest bundle per channel
in order to verify block signatures.

Change-Id: Ic9fc99243baa5c2cef97103d001180207414d98a
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
parent 9b78a9d8
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import common "github.com/hyperledger/fabric/protos/common"
import mock "github.com/stretchr/testify/mock"
// VerifierFactory is an autogenerated mock type for the VerifierFactory type
type VerifierFactory struct {
mock.Mock
}
// VerifierFromConfig provides a mock function with given fields: configuration, channel
func (_m *VerifierFactory) VerifierFromConfig(configuration *common.ConfigEnvelope, channel string) (cluster.BlockVerifier, error) {
ret := _m.Called(configuration, channel)
var r0 cluster.BlockVerifier
if rf, ok := ret.Get(0).(func(*common.ConfigEnvelope, string) cluster.BlockVerifier); ok {
r0 = rf(configuration, channel)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cluster.BlockVerifier)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*common.ConfigEnvelope, string) error); ok {
r1 = rf(configuration, channel)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
import mock "github.com/stretchr/testify/mock"
// VerifierRetriever is an autogenerated mock type for the VerifierRetriever type
type VerifierRetriever struct {
mock.Mock
}
// RetrieveVerifier provides a mock function with given fields: channel
func (_m *VerifierRetriever) RetrieveVerifier(channel string) cluster.BlockVerifier {
ret := _m.Called(channel)
var r0 cluster.BlockVerifier
if rf, ok := ret.Get(0).(func(string) cluster.BlockVerifier); ok {
r0 = rf(channel)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(cluster.BlockVerifier)
}
}
return r0
}
......@@ -15,6 +15,9 @@ import (
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/tools/protolator"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/common"
......@@ -202,7 +205,7 @@ type BlockVerifier interface {
// BlockSequenceVerifier verifies that the given consecutive sequence
// of blocks is valid.
type BlockSequenceVerifier func([]*common.Block) error
type BlockSequenceVerifier func(blocks []*common.Block) error
// Dialer creates a gRPC connection to a remote address
type Dialer interface {
......@@ -403,6 +406,131 @@ func EndpointconfigFromConfigBlock(block *common.Block) (*EndpointConfig, error)
}, nil
}
//go:generate mockery -dir . -name VerifierFactory -case underscore -output ./mocks/
// VerifierFactory creates BlockVerifiers.
type VerifierFactory interface {
// VerifierFromConfig creates a BlockVerifier from the given configuration.
VerifierFromConfig(configuration *common.ConfigEnvelope, channel string) (BlockVerifier, error)
}
// VerificationRegistry registers verifiers and retrieves them.
type VerificationRegistry struct {
Logger *flogging.FabricLogger
VerifierFactory VerifierFactory
VerifiersByChannel map[string]BlockVerifier
}
// RetrieveVerifier returns a BlockVerifier for the given channel, or nil if not found.
func (vr *VerificationRegistry) RetrieveVerifier(channel string) BlockVerifier {
verifier, exists := vr.VerifiersByChannel[channel]
if exists {
return verifier
}
vr.Logger.Errorf("No verifier for channel %s exists", channel)
return nil
}
// BlockCommitted notifies the VerificationRegistry upon a block commit, which may
// trigger a registration of a verifier out of the block in case the block is a config block.
func (vr *VerificationRegistry) BlockCommitted(block *common.Block, channel string) {
conf, err := ConfigFromBlock(block)
// The block doesn't contain a config block, but is a valid block
if err == errNotAConfig {
vr.Logger.Debugf("Committed block %d for channel %s that is not a config block",
block.Header.Number, channel)
return
}
// The block isn't a valid block
if err != nil {
vr.Logger.Errorf("Failed parsing block of channel %s: %v, content: %s",
channel, err, BlockToString(block))
return
}
// The block contains a config block
verifier, err := vr.VerifierFactory.VerifierFromConfig(conf, channel)
if err != nil {
vr.Logger.Errorf("Failed creating a verifier from a config block for channel %s: %v, content: %s",
channel, err, BlockToString(block))
return
}
vr.VerifiersByChannel[channel] = verifier
vr.Logger.Debugf("Committed config block %d for channel %s", block.Header.Number, channel)
}
// BlockToString returns a string representation of this block.
func BlockToString(block *common.Block) string {
buff := &bytes.Buffer{}
protolator.DeepMarshalJSON(buff, block)
return buff.String()
}
// BlockCommitFunc signals a block commit.
type BlockCommitFunc func(block *common.Block, channel string)
// LedgerInterceptor intercepts block commits.
type LedgerInterceptor struct {
Channel string
InterceptBlockCommit BlockCommitFunc
LedgerWriter
}
// Append commits a block into the ledger, and also fires the configured callback.
func (interceptor *LedgerInterceptor) Append(block *common.Block) error {
defer interceptor.InterceptBlockCommit(block, interceptor.Channel)
return interceptor.LedgerWriter.Append(block)
}
// BlockVerifierAssembler creates a BlockVerifier out of a config envelope
type BlockVerifierAssembler struct {
Logger *flogging.FabricLogger
}
// VerifierFromConfig creates a BlockVerifier from the given configuration.
func (bva *BlockVerifierAssembler) VerifierFromConfig(configuration *common.ConfigEnvelope, channel string) (BlockVerifier, error) {
bundle, err := channelconfig.NewBundle(channel, configuration.Config)
if err != nil {
return nil, errors.Wrap(err, "failed extracting bundle from envelope")
}
policyMgr := bundle.PolicyManager()
return &BlockValidationPolicyVerifier{
Logger: bva.Logger,
PolicyMgr: policyMgr,
Channel: channel,
}, nil
}
// BlockValidationPolicyVerifier verifies signatures based on the block validation policy.
type BlockValidationPolicyVerifier struct {
Logger *flogging.FabricLogger
Channel string
PolicyMgr policies.Manager
}
// VerifyBlockSignature verifies the signed data associated to a block, optionally with the given config envelope.
func (bv *BlockValidationPolicyVerifier) VerifyBlockSignature(sd []*common.SignedData, envelope *common.ConfigEnvelope) error {
policyMgr := bv.PolicyMgr
// If the envelope passed isn't nil, we should use a different policy manager.
if envelope != nil {
bundle, err := channelconfig.NewBundle(bv.Channel, envelope.Config)
if err != nil {
buff := &bytes.Buffer{}
protolator.DeepMarshalJSON(buff, envelope.Config)
bv.Logger.Errorf("Failed creating a new bundle for channel %s, Config content is: %s", bv.Channel, buff.String())
return err
}
bv.Logger.Infof("Initializing new PolicyManager for channel %s", bv.Channel)
policyMgr = bundle.PolicyManager()
}
policy, exists := policyMgr.GetPolicy(policies.BlockValidation)
if !exists {
return errors.Errorf("policy %s wasn't found", policies.BlockValidation)
}
return policy.Evaluate(sd)
}
//go:generate mockery -dir . -name BlockRetriever -case underscore -output ./mocks/
// BlockRetriever retrieves blocks
......
......@@ -17,6 +17,12 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/tools/configtxgen/configtxgentest"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
"github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
......@@ -24,6 +30,8 @@ import (
"github.com/hyperledger/fabric/protos/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func TestParallelStubActivation(t *testing.T) {
......@@ -609,6 +617,103 @@ func TestConfigFromBlockBadInput(t *testing.T) {
}
}
func TestBlockValidationPolicyVerifier(t *testing.T) {
t.Parallel()
config := configtxgentest.Load(localconfig.SampleInsecureSoloProfile)
group, err := encoder.NewChannelGroup(config)
assert.NoError(t, err)
assert.NotNil(t, group)
validConfigEnvelope := &common.ConfigEnvelope{
Config: &common.Config{
ChannelGroup: group,
},
}
for _, testCase := range []struct {
description string
expectedError string
envelope *common.ConfigEnvelope
policyMap map[string]policies.Policy
}{
{
description: "policy not found",
expectedError: "policy /Channel/Orderer/BlockValidation wasn't found",
},
{
description: "policy evaluation fails",
expectedError: "invalid signature",
policyMap: map[string]policies.Policy{
"/Channel/Orderer/BlockValidation": &mockpolicies.Policy{
Err: errors.New("invalid signature"),
},
},
},
{
description: "bad config envelope",
expectedError: "config must contain a channel group",
policyMap: map[string]policies.Policy{
"/Channel/Orderer/BlockValidation": &mockpolicies.Policy{
Err: errors.New("invalid signature"),
},
},
envelope: &common.ConfigEnvelope{Config: &common.Config{}},
},
{
description: "good config envelope overrides custom policy manager",
policyMap: map[string]policies.Policy{
"/Channel/Orderer/BlockValidation": &mockpolicies.Policy{
Err: errors.New("invalid signature"),
},
},
envelope: validConfigEnvelope,
},
} {
t.Run(testCase.description, func(t *testing.T) {
verifier := &cluster.BlockValidationPolicyVerifier{
Logger: flogging.MustGetLogger("test"),
Channel: "mychannel",
PolicyMgr: &mockpolicies.Manager{
PolicyMap: testCase.policyMap,
},
}
err := verifier.VerifyBlockSignature(nil, testCase.envelope)
if testCase.expectedError != "" {
assert.EqualError(t, err, testCase.expectedError)
} else {
assert.NoError(t, err)
}
})
}
}
func TestBlockVerifierAssembler(t *testing.T) {
t.Parallel()
config := configtxgentest.Load(localconfig.SampleInsecureSoloProfile)
group, err := encoder.NewChannelGroup(config)
assert.NoError(t, err)
assert.NotNil(t, group)
t.Run("Good config envelope", func(t *testing.T) {
bva := &cluster.BlockVerifierAssembler{}
verifier, err := bva.VerifierFromConfig(&common.ConfigEnvelope{
Config: &common.Config{
ChannelGroup: group,
},
}, "mychannel")
assert.NoError(t, err)
assert.NoError(t, verifier.VerifyBlockSignature(nil, nil))
})
t.Run("Bad config envelope", func(t *testing.T) {
bva := &cluster.BlockVerifierAssembler{}
_, err := bva.VerifierFromConfig(&common.ConfigEnvelope{}, "mychannel")
assert.EqualError(t, err, "failed extracting bundle from envelope: channelconfig Config cannot be nil")
})
}
func TestLastConfigBlock(t *testing.T) {
blockRetriever := &mocks.BlockRetriever{}
blockRetriever.On("Block", uint64(42)).Return(&common.Block{})
......@@ -694,3 +799,141 @@ func TestLastConfigBlock(t *testing.T) {
})
}
}
func TestVerificationRegistry(t *testing.T) {
t.Parallel()
blockBytes, err := ioutil.ReadFile("testdata/mychannel.block")
assert.NoError(t, err)
block := &common.Block{}
assert.NoError(t, proto.Unmarshal(blockBytes, block))
flogging.ActivateSpec("test=DEBUG")
defer flogging.Reset()
verifier := &mocks.BlockVerifier{}
for _, testCase := range []struct {
description string
verifiersByChannel map[string]cluster.BlockVerifier
blockCommitted *common.Block
channelCommitted string
channelRetrieved string
expectedVerifier cluster.BlockVerifier
verifierFromConfig cluster.BlockVerifier
verifierFromConfigErr error
loggedMessages map[string]struct{}
}{
{
description: "bad block",
blockCommitted: &common.Block{},
channelRetrieved: "foo",
channelCommitted: "foo",
loggedMessages: map[string]struct{}{
"Failed parsing block of channel foo: empty block, content: " +
"{\n\t\"data\": null,\n\t\"header\": null,\n\t\"metadata\": null\n}\n": {},
"No verifier for channel foo exists": {},
},
expectedVerifier: nil,
},
{
description: "not a config block",
blockCommitted: createBlockChain(5, 5)[0],
channelRetrieved: "foo",
channelCommitted: "foo",
loggedMessages: map[string]struct{}{
"No verifier for channel foo exists": {},
"Committed block 5 for channel foo that is not a config block": {},
},
expectedVerifier: nil,
},
{
description: "valid block but verifier from config fails",
blockCommitted: block,
verifierFromConfigErr: errors.New("invalid MSP config"),
channelRetrieved: "bar",
channelCommitted: "bar",
loggedMessages: map[string]struct{}{
"Failed creating a verifier from a " +
"config block for channel bar: invalid MSP config, " +
"content: " + cluster.BlockToString(block): {},
"No verifier for channel bar exists": {},
},
expectedVerifier: nil,
},
{
description: "valid block and verifier from config succeeds but wrong channel retrieved",
blockCommitted: block,
verifierFromConfig: verifier,
channelRetrieved: "foo",
channelCommitted: "bar",
loggedMessages: map[string]struct{}{
"No verifier for channel foo exists": {},
"Committed config block 0 for channel bar": {},
},
expectedVerifier: nil,
verifiersByChannel: make(map[string]cluster.BlockVerifier),
},
{
description: "valid block and verifier from config succeeds",
blockCommitted: block,
verifierFromConfig: verifier,
channelRetrieved: "bar",
channelCommitted: "bar",
loggedMessages: map[string]struct{}{
"Committed config block 0 for channel bar": {},
},
expectedVerifier: verifier,
verifiersByChannel: make(map[string]cluster.BlockVerifier),
},
} {
t.Run(testCase.description, func(t *testing.T) {
verifierFactory := &mocks.VerifierFactory{}
verifierFactory.On("VerifierFromConfig",
mock.Anything, testCase.channelCommitted).Return(testCase.verifierFromConfig, testCase.verifierFromConfigErr)
registry := &cluster.VerificationRegistry{
Logger: flogging.MustGetLogger("test"),
VerifiersByChannel: testCase.verifiersByChannel,
VerifierFactory: verifierFactory,
}
loggedEntriesByMethods := make(map[string]struct{})
// Configure the logger to collect the message logged
registry.Logger = registry.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
loggedEntriesByMethods[entry.Message] = struct{}{}
return nil
}))
registry.BlockCommitted(testCase.blockCommitted, testCase.channelCommitted)
verifier := registry.RetrieveVerifier(testCase.channelRetrieved)
assert.Equal(t, testCase.loggedMessages, loggedEntriesByMethods)
assert.Equal(t, testCase.expectedVerifier, verifier)
})
}
}
func TestLedgerInterceptor(t *testing.T) {
block := &common.Block{}
ledger := &mocks.LedgerWriter{}
ledger.On("Append", block).Return(nil).Once()
var intercepted bool
var interceptedLedger cluster.LedgerWriter = &cluster.LedgerInterceptor{
Channel: "mychannel",
LedgerWriter: ledger,
InterceptBlockCommit: func(b *common.Block, channel string) {
assert.Equal(t, block, b)
assert.Equal(t, "mychannel", channel)
intercepted = true
},
}
err := interceptedLedger.Append(block)
assert.NoError(t, err)
assert.True(t, intercepted)
ledger.AssertCalled(t, "Append", block)
}
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import blockledger "github.com/hyperledger/fabric/common/ledger/blockledger"
import mock "github.com/stretchr/testify/mock"
// Factory is an autogenerated mock type for the Factory type
type Factory struct {
mock.Mock
}
// ChainIDs provides a mock function with given fields:
func (_m *Factory) ChainIDs() []string {
ret := _m.Called()
var r0 []string
if rf, ok := ret.Get(0).(func() []string); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]string)
}
}
return r0
}
// Close provides a mock function with given fields:
func (_m *Factory) Close() {
_m.Called()
}
// GetOrCreate provides a mock function with given fields: chainID
func (_m *Factory) GetOrCreate(chainID string) (blockledger.ReadWriter, error) {
ret := _m.Called(chainID)
var r0 blockledger.ReadWriter
if rf, ok := ret.Get(0).(func(string) blockledger.ReadWriter); ok {
r0 = rf(chainID)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(blockledger.ReadWriter)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(chainID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
......@@ -222,3 +222,80 @@ func (dc *inactiveChainReplicator) listInactiveChains() []string {
}
return chains
}
//go:generate mockery -dir . -name Factory -case underscore -output mocks/
// Factory retrieves or creates new ledgers by chainID
type Factory interface {
// GetOrCreate gets an existing ledger (if it exists)
// or creates it if it does not
GetOrCreate(chainID string) (blockledger.ReadWriter, error)
// ChainIDs returns the chain IDs the Factory is aware of
ChainIDs() []string
// Close releases all resources acquired by the factory
Close()
}
type blockGetter struct {
ledger blockledger.Reader
}
func (bg *blockGetter) Block(number uint64) *common.Block {
return blockledger.GetBlock(bg.ledger, number)
}
type verifierLoader struct {
ledgerFactory blockledger.Factory
verifierFactory cluster.VerifierFactory
logger *flogging.FabricLogger
onFailure func(block *common.Block)
}
type verifiersByChannel map[string]cluster.BlockVerifier
func (vl *verifierLoader) loadVerifiers() verifiersByChannel {
res := make(verifiersByChannel)
for _, chain := range vl.ledgerFactory.ChainIDs() {
ledger, err := vl.ledgerFactory.GetOrCreate(chain)
if err != nil {
vl.logger.Panicf("Failed obtaining ledger for channel %s", chain)
return nil
}
blockRetriever := &blockGetter{ledger: ledger}
height := ledger.Height()
if height == 0 {
vl.logger.Infof("Channel %s has no blocks, skipping it", chain)
continue
}
lastBlockIndex := height - 1
lastBlock := blockRetriever.Block(lastBlockIndex)
if lastBlock == nil {
vl.logger.Panicf("Failed retrieving block %d for channel %s", lastBlockIndex, chain)
}
lastConfigBlock, err := cluster.LastConfigBlock(lastBlock, blockRetriever)
if err != nil {
vl.logger.Panicf("Failed retrieving config block %d for channel %s", lastBlockIndex, chain)
}