Unverified Commit 631ab5b1 authored by Artem Barger's avatar Artem Barger
Browse files

[FAB-9787] populate PrivatePayload with sim. info



Populate PrivatePayload gossip message with additional information about
simulation and collection configuration available at the endorsement
time to support distribution of private data in light of collections
membership changes.

Change-Id: I0542e85afbeb9b1a273e53ae34161977dc1bb4d4
Signed-off-by: default avatarArtem Barger <bartem@il.ibm.com>
parent 6fd506e5
......@@ -31,7 +31,7 @@ var endorserLogger = flogging.MustGetLogger("endorser")
// The Jira issue that documents Endorser flow along with its relationship to
// the lifecycle chaincode - https://jira.hyperledger.org/browse/FAB-181
type privateDataDistributor func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error
type privateDataDistributor func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error
// Support contains functions that the endorser requires to execute its tasks
type Support interface {
......@@ -266,7 +266,7 @@ func (e *Endorser) simulateProposal(ctx context.Context, chainID string, txid st
// TODO: remove once we can store collection configuration outside of LSCC
return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
}
if err := e.distributePrivateData(chainID, txid, simResult.PvtSimulationResults); err != nil {
if err := e.distributePrivateData(chainID, txid, simResult.PvtSimulationResults, simResult.SimulationBlkHt); err != nil {
return nil, nil, nil, nil, err
}
}
......
......@@ -56,7 +56,7 @@ func getSignedPropWithCHIdAndArgs(chid, ccid, ccver string, ccargs [][]byte, t *
}
func TestEndorserNilProp(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -74,7 +74,7 @@ func TestEndorserNilProp(t *testing.T) {
}
func TestEndorserUninvokableSysCC(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -92,7 +92,7 @@ func TestEndorserUninvokableSysCC(t *testing.T) {
}
func TestEndorserCCInvocationFailed(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -112,7 +112,7 @@ func TestEndorserCCInvocationFailed(t *testing.T) {
}
func TestEndorserNoCCDef(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -132,7 +132,7 @@ func TestEndorserNoCCDef(t *testing.T) {
}
func TestEndorserBadInstPolicy(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -152,7 +152,7 @@ func TestEndorserBadInstPolicy(t *testing.T) {
}
func TestEndorserSysCC(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -172,7 +172,7 @@ func TestEndorserSysCC(t *testing.T) {
}
func TestEndorserCCInvocationError(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -191,7 +191,7 @@ func TestEndorserCCInvocationError(t *testing.T) {
}
func TestEndorserLSCCBadType(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -219,7 +219,7 @@ func TestEndorserLSCCBadType(t *testing.T) {
}
func TestEndorserDupTXId(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -238,7 +238,7 @@ func TestEndorserDupTXId(t *testing.T) {
}
func TestEndorserBadACL(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -258,7 +258,7 @@ func TestEndorserBadACL(t *testing.T) {
}
func TestEndorserGoodPathEmptyChannel(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -277,7 +277,7 @@ func TestEndorserGoodPathEmptyChannel(t *testing.T) {
}
func TestEndorserLSCCInitFails(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -308,7 +308,7 @@ func TestEndorserLSCCDeploySysCC(t *testing.T) {
SysCCMap := make(map[string]struct{})
deployedCCName := "barf"
SysCCMap[deployedCCName] = struct{}{}
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -341,7 +341,7 @@ func TestEndorserLSCCJava1(t *testing.T) {
t.Skip("Java chaincode is supported")
}
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -374,7 +374,7 @@ func TestEndorserLSCCJava2(t *testing.T) {
t.Skip("Java chaincode is supported")
}
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -402,7 +402,7 @@ func TestEndorserLSCCJava2(t *testing.T) {
}
func TestEndorserGoodPathWEvents(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -422,7 +422,7 @@ func TestEndorserGoodPathWEvents(t *testing.T) {
}
func TestEndorserBadChannel(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -442,7 +442,7 @@ func TestEndorserBadChannel(t *testing.T) {
}
func TestEndorserGoodPath(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -461,7 +461,7 @@ func TestEndorserGoodPath(t *testing.T) {
}
func TestEndorserLSCC(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -488,7 +488,7 @@ func TestEndorserLSCC(t *testing.T) {
}
func TestSimulateProposal(t *testing.T) {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......@@ -508,7 +508,7 @@ func TestEndorserJavaChecks(t *testing.T) {
t.Skip("Java chaincode is supported")
}
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
es := NewEndorserServer(func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return nil
}, &em.MockSupport{
GetApplicationConfigBoolRv: true,
......
......@@ -327,8 +327,20 @@ func (cs *collectionStore) RetrieveCollection(common.CollectionCriteria) (privda
panic("implement me")
}
func (cs *collectionStore) RetrieveCollectionConfigPackage(common.CollectionCriteria) (*common.CollectionConfigPackage, error) {
panic("implement me")
func (cs *collectionStore) RetrieveCollectionConfigPackage(cc common.CollectionCriteria) (*common.CollectionConfigPackage, error) {
return &common.CollectionConfigPackage{
Config: []*common.CollectionConfig{
{
Payload: &common.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &common.StaticCollectionConfig{
Name: cc.Collection,
MaximumPeerCount: 1,
RequiredPeerCount: 1,
},
},
},
},
}, nil
}
type collectionAccessPolicy struct {
......
......@@ -38,7 +38,7 @@ type gossipAdapter interface {
// PvtDataDistributor interface to defines API of distributing private data
type PvtDataDistributor interface {
// Distribute broadcast reliably private data read write set based on policies
Distribute(txID string, privData *rwset.TxPvtReadWriteSet, cs privdata.CollectionStore) error
Distribute(txID string, privData *rwset.TxPvtReadWriteSet, cs privdata.CollectionStore, blkHt uint64) error
}
// distributorImpl the implementation of the private data distributor interface
......@@ -57,8 +57,8 @@ func NewDistributor(chainID string, gossip gossipAdapter) PvtDataDistributor {
}
// Distribute broadcast reliably private data read write set based on policies
func (d *distributorImpl) Distribute(txID string, privData *rwset.TxPvtReadWriteSet, cs privdata.CollectionStore) error {
disseminationPlan, err := d.computeDisseminationPlan(txID, privData, cs)
func (d *distributorImpl) Distribute(txID string, privData *rwset.TxPvtReadWriteSet, cs privdata.CollectionStore, blkHt uint64) error {
disseminationPlan, err := d.computeDisseminationPlan(txID, privData, cs, blkHt)
if err != nil {
return errors.WithStack(err)
}
......@@ -70,7 +70,10 @@ type dissemination struct {
criteria gossip2.SendCriteria
}
func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.TxPvtReadWriteSet, cs privdata.CollectionStore) ([]*dissemination, error) {
func (d *distributorImpl) computeDisseminationPlan(txID string,
privData *rwset.TxPvtReadWriteSet,
cs privdata.CollectionStore,
blkHt uint64) ([]*dissemination, error) {
var disseminationPlan []*dissemination
for _, pvtRwset := range privData.NsPvtRwset {
namespace := pvtRwset.Namespace
......@@ -94,7 +97,13 @@ func (d *distributorImpl) computeDisseminationPlan(txID string, privData *rwset.
return nil, errors.Errorf("No collection access policy filter computed for %v", cc)
}
pvtDataMsg, err := d.createPrivateDataMessage(txID, namespace, collection.CollectionName, collection.Rwset)
colCP, err := cs.RetrieveCollectionConfigPackage(cc)
if err != nil {
logger.Error("Failed to load collection config package, collection criteria", cc, "error", err)
return nil, errors.WithMessage(err, fmt.Sprintf("collection config package, for %v not found", cc))
}
pvtDataMsg, err := d.createPrivateDataMessage(txID, namespace, collection, colCP, blkHt)
if err != nil {
return nil, errors.WithStack(err)
}
......@@ -163,7 +172,10 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error
return nil
}
func (d *distributorImpl) createPrivateDataMessage(txID, namespace, collectionName string, rwset []byte) (*proto.SignedGossipMessage, error) {
func (d *distributorImpl) createPrivateDataMessage(txID, namespace string,
collection *rwset.CollectionPvtReadWriteSet,
ccp *common.CollectionConfigPackage,
blkHt uint64) (*proto.SignedGossipMessage, error) {
msg := &proto.GossipMessage{
Channel: []byte(d.chainID),
Nonce: util.RandomUInt64(),
......@@ -172,9 +184,11 @@ func (d *distributorImpl) createPrivateDataMessage(txID, namespace, collectionNa
PrivateData: &proto.PrivateDataMessage{
Payload: &proto.PrivatePayload{
Namespace: namespace,
CollectionName: collectionName,
CollectionName: collection.CollectionName,
TxId: txID,
PrivateRwset: rwset,
PrivateRwset: collection.Rwset,
PrivateSimHeight: blkHt,
CollectionConfigs: ccp,
},
},
},
......
......@@ -87,9 +87,9 @@ func TestDistributor(t *testing.T) {
Collection: "c2",
}).andIsLenient()
pvtData := pdFactory.addRWSet().addNSRWSet("ns1", "c1", "c2").addRWSet().addNSRWSet("ns2", "c1", "c2").create()
err := d.Distribute("tx1", pvtData[0].WriteSet, cs)
err := d.Distribute("tx1", pvtData[0].WriteSet, cs, 0)
assert.NoError(t, err)
err = d.Distribute("tx2", pvtData[1].WriteSet, cs)
err = d.Distribute("tx2", pvtData[1].WriteSet, cs, 0)
assert.NoError(t, err)
assertACL := func(pp *proto.PrivatePayload, sc gossip2.SendCriteria) {
......@@ -116,14 +116,14 @@ func TestDistributor(t *testing.T) {
// Bad path: dependencies (gossip and others) don't work properly
g.err = errors.New("failed obtaining filter")
err = d.Distribute("tx1", pvtData[0].WriteSet, cs)
err = d.Distribute("tx1", pvtData[0].WriteSet, cs, 0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed obtaining filter")
g.Mock = mock.Mock{}
g.On("SendByCriteria", mock.Anything, mock.Anything).Return(errors.New("failed sending"))
g.err = nil
err = d.Distribute("tx1", pvtData[0].WriteSet, cs)
err = d.Distribute("tx1", pvtData[0].WriteSet, cs, 0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private RWSets")
}
......@@ -43,7 +43,7 @@ type GossipService interface {
// DistributePrivateData distributes private data to the peers in the collections
// according to policies induced by the PolicyStore and PolicyParser
DistributePrivateData(chainID string, txID string, privateData *rwset.TxPvtReadWriteSet) error
DistributePrivateData(chainID string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error
// NewConfigEventer creates a ConfigProcessor which the channelconfig.BundleSource can ultimately route config updates to
NewConfigEventer() ConfigProcessor
// InitializeChannel allocates the state provider and should be invoked once per channel per execution
......@@ -171,7 +171,7 @@ func GetGossipService() GossipService {
}
// DistributePrivateData distribute private read write set inside the channel based on the collections policies
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *rwset.TxPvtReadWriteSet) error {
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
g.lock.RLock()
handler, exists := g.privateHandlers[chainID]
g.lock.RUnlock()
......@@ -179,7 +179,7 @@ func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, p
return errors.Errorf("No private data handler for %s", chainID)
}
if err := handler.distributor.Distribute(txID, privData, handler.support.Cs); err != nil {
if err := handler.distributor.Distribute(txID, privData, handler.support.Cs, blkHt); err != nil {
logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
return err
}
......
......@@ -235,8 +235,8 @@ func serve(args []string) error {
// Start the Admin server
startAdminServer(listenAddr, peerServer.Server())
privDataDist := func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet) error {
return service.GetGossipService().DistributePrivateData(channel, txID, privateData)
privDataDist := func(channel string, txID string, privateData *rwset.TxPvtReadWriteSet, blkHt uint64) error {
return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
}
serverEndorser := endorser.NewEndorserServer(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment