/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package node

import (
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/common/cauthdsl"
	ccdef "github.com/hyperledger/fabric/common/chaincode"
	"github.com/hyperledger/fabric/common/crypto/tlsgen"
	"github.com/hyperledger/fabric/common/deliver"
	"github.com/hyperledger/fabric/common/flogging"
	floggingmetrics "github.com/hyperledger/fabric/common/flogging/metrics"
	"github.com/hyperledger/fabric/common/grpclogging"
	"github.com/hyperledger/fabric/common/grpcmetrics"
	"github.com/hyperledger/fabric/common/localmsp"
	"github.com/hyperledger/fabric/common/metadata"
	"github.com/hyperledger/fabric/common/metrics"
	"github.com/hyperledger/fabric/common/policies"
	"github.com/hyperledger/fabric/common/viperutil"
	"github.com/hyperledger/fabric/core/aclmgmt"
	"github.com/hyperledger/fabric/core/aclmgmt/resources"
	"github.com/hyperledger/fabric/core/admin"
	cc "github.com/hyperledger/fabric/core/cclifecycle"
	"github.com/hyperledger/fabric/core/chaincode"
	"github.com/hyperledger/fabric/core/chaincode/accesscontrol"
	"github.com/hyperledger/fabric/core/chaincode/lifecycle"
	"github.com/hyperledger/fabric/core/chaincode/persistence"
	"github.com/hyperledger/fabric/core/chaincode/platforms"
	"github.com/hyperledger/fabric/core/chaincode/platforms/car"
	"github.com/hyperledger/fabric/core/chaincode/platforms/golang"
	"github.com/hyperledger/fabric/core/chaincode/platforms/java"
	"github.com/hyperledger/fabric/core/chaincode/platforms/node"
	"github.com/hyperledger/fabric/core/comm"
	"github.com/hyperledger/fabric/core/committer/txvalidator"
	"github.com/hyperledger/fabric/core/common/ccprovider"
	"github.com/hyperledger/fabric/core/common/privdata"
	"github.com/hyperledger/fabric/core/container"
	"github.com/hyperledger/fabric/core/container/dockercontroller"
	"github.com/hyperledger/fabric/core/container/inproccontroller"
	"github.com/hyperledger/fabric/core/endorser"
	authHandler "github.com/hyperledger/fabric/core/handlers/auth"
	endorsement2 "github.com/hyperledger/fabric/core/handlers/endorsement/api"
	endorsement3 "github.com/hyperledger/fabric/core/handlers/endorsement/api/identities"
	"github.com/hyperledger/fabric/core/handlers/library"
	validation "github.com/hyperledger/fabric/core/handlers/validation/api"
	"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
	"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
	"github.com/hyperledger/fabric/core/operations"
	"github.com/hyperledger/fabric/core/peer"
	"github.com/hyperledger/fabric/core/scc"
	"github.com/hyperledger/fabric/core/scc/cscc"
	"github.com/hyperledger/fabric/core/scc/lscc"
	"github.com/hyperledger/fabric/core/scc/qscc"
	"github.com/hyperledger/fabric/discovery"
	"github.com/hyperledger/fabric/discovery/endorsement"
	discsupport "github.com/hyperledger/fabric/discovery/support"
	discacl "github.com/hyperledger/fabric/discovery/support/acl"
	ccsupport "github.com/hyperledger/fabric/discovery/support/chaincode"
	"github.com/hyperledger/fabric/discovery/support/config"
	"github.com/hyperledger/fabric/discovery/support/gossip"
	gossipcommon "github.com/hyperledger/fabric/gossip/common"
	"github.com/hyperledger/fabric/gossip/service"
	"github.com/hyperledger/fabric/msp"
	"github.com/hyperledger/fabric/msp/mgmt"
	peergossip "github.com/hyperledger/fabric/peer/gossip"
	"github.com/hyperledger/fabric/peer/version"
	cb "github.com/hyperledger/fabric/protos/common"
	common2 "github.com/hyperledger/fabric/protos/common"
	discprotos "github.com/hyperledger/fabric/protos/discovery"
	pb "github.com/hyperledger/fabric/protos/peer"
	"github.com/hyperledger/fabric/protos/token"
	"github.com/hyperledger/fabric/protos/transientstore"
	"github.com/hyperledger/fabric/protos/utils"
	"github.com/hyperledger/fabric/token/server"
	"github.com/pkg/errors"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
)

const (
	chaincodeAddrKey       = "peer.chaincodeAddress"
	chaincodeListenAddrKey = "peer.chaincodeListenAddress"
	defaultChaincodePort   = 7052
	grpcMaxConcurrency     = 2500
)

var chaincodeDevMode bool

func startCmd() *cobra.Command {
	// Set the flags on the node start command.
	flags := nodeStartCmd.Flags()
	flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false,
		"Whether the peer should run in chaincode development mode")

	return nodeStartCmd
}

var nodeStartCmd = &cobra.Command{
	Use:   "start",
	Short: "Starts the node.",
	Long:  `Starts a node that interacts with the network.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		if len(args) != 0 {
			return fmt.Errorf("trailing args detected")
		}
		// Parsing of the command line is done, so silence cmd usage
		cmd.SilenceUsage = true
		return serve(args)
	},
}
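// Illustrative invocations (a sketch, not output from this file; it assumes the
// standard Fabric CLI layout in which this command is mounted under `peer node`):
//
//   peer node start
//   peer node start --peer-chaincodedev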

func serve(args []string) error {
	// Currently the peer only works with the standard MSP, because in certain
	// scenarios the MSP has to make sure that a single credential maps to a
	// single 'identity'. Idemix does not support this *yet*, but it can be
	// extended to support it. For now, we just make sure that the peer only
	// comes up with the standard MSP.
	mspType := mgmt.GetLocalMSP().GetType()
	if mspType != msp.FABRIC {
		panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
	}

	// Trace RPCs with the golang.org/x/net/trace package. This was moved out of
	// the deliver service connection factory as it has process-wide implications
	// and was racy with respect to initialization of gRPC clients and servers.
	grpc.EnableTracing = true

	logger.Infof("Starting %s", version.GetInfo())

	// Start up aclmgmt with the default ACL providers (resource based and default 1.0 policies based).
	// Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this).
	aclProvider := aclmgmt.NewACLProvider(
		aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
	)

	pr := platforms.NewRegistry(
		&golang.Platform{},
		&node.Platform{},
		&java.Platform{},
		&car.Platform{},
	)

	deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}

	identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
		return mgmt.GetManagerForChain(chainID)
	}

	opsSystem := newOperationsSystem()
	err := opsSystem.Start()
	if err != nil {
		return errors.WithMessage(err, "failed to initialize operations subsystems")
	}
	defer opsSystem.Stop()

	metricsProvider := opsSystem.Provider
	logObserver := floggingmetrics.NewObserver(metricsProvider)
	flogging.Global.SetObserver(logObserver)

	membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)
	// initialize ledger management
	ledgermgmt.Initialize(
		&ledgermgmt.Initializer{
			CustomTxProcessors:            peer.ConfigTxProcessors,
			PlatformRegistry:              pr,
			DeployedChaincodeInfoProvider: deployedCCInfoProvider,
			MembershipInfoProvider:        membershipInfoProvider,
			MetricsProvider:               metricsProvider,
			HealthCheckRegistry:           opsSystem,
		},
	)

	// Parameter overrides must be processed before any parameters are
	// cached. Failures to cache cause the server to terminate immediately.
	if chaincodeDevMode {
		logger.Info("Running in chaincode development mode")
		logger.Info("Disabling loading of the validity system chaincode")

		viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
	}

	if err := peer.CacheConfiguration(); err != nil {
		return err
	}

	peerEndpoint, err := peer.GetPeerEndpoint()
	if err != nil {
		err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
		return err
	}

	peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
	if err != nil {
		return fmt.Errorf("peer address is not in the format of host:port: %v", err)
	}

	listenAddr := viper.GetString("peer.listenAddress")
	serverConfig, err := peer.GetServerConfig()
	if err != nil {
		logger.Fatalf("Error loading secure config for peer (%s)", err)
	}

	throttle := comm.NewThrottle(grpcMaxConcurrency)
	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
	serverConfig.MetricsProvider = metricsProvider
	serverConfig.UnaryInterceptors = append(
		serverConfig.UnaryInterceptors,
		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
		throttle.UnaryServerIntercptor,
	)
	serverConfig.StreamInterceptors = append(
		serverConfig.StreamInterceptors,
		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
		throttle.StreamServerInterceptor,
	)

	peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
	if err != nil {
		logger.Fatalf("Failed to create peer server (%s)", err)
	}

	if serverConfig.SecOpts.UseTLS {
		logger.Info("Starting peer with TLS enabled")
		// set up credential support
		cs := comm.GetCredentialSupport()
		roots, err := peer.GetServerRootCAs()
		if err != nil {
			logger.Fatalf("Failed to set TLS server root CAs: %s", err)
		}
		cs.ServerRootCAs = roots

		// set the cert to use if client auth is requested by remote endpoints
		clientCert, err := peer.GetClientCertificate()
		if err != nil {
			logger.Fatalf("Failed to set TLS client certificate: %s", err)
		}
		comm.GetCredentialSupport().SetClientCertificate(clientCert)
	}

	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
		return func(env *cb.Envelope, channelID string) error {
			return aclProvider.CheckACL(resourceName, channelID, env)
		}
	}

	abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
	pb.RegisterDeliverServer(peerServer.Server(), abServer)

	// Initialize chaincode service
	chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)

	logger.Debugf("Running peer")

	// Start the Admin server
	startAdminServer(listenAddr, peerServer.Server(), metricsProvider)

	privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
		return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
	}

	signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
	serializedIdentity, err := signingIdentity.Serialize()
	if err != nil {
		logger.Panicf("Failed serializing self identity: %v", err)
	}

	libConf := library.Config{}
	if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
		return errors.WithMessage(err, "could not load YAML config")
	}
	reg := library.InitRegistry(libConf)

	authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
	endorserSupport := &endorser.SupportImpl{
		SignerSupport:    signingIdentity,
		Peer:             peer.Default,
		PeerSupport:      peer.DefaultSupport,
		ChaincodeSupport: chaincodeSupport,
		SysCCProvider:    sccp,
		ACLProvider:      aclProvider,
	}
	endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
	validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
	signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
	channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
	pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
	pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
		ChannelStateRetriever:   channelStateRetriever,
		TransientStoreRetriever: peer.TransientStoreFactory,
		PluginMapper:            pluginMapper,
		SigningIdentityFetcher:  signingIdentityFetcher,
	})
	endorserSupport.PluginEndorser = pluginEndorser
	serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
	auth := authHandler.ChainFilters(serverEndorser, authFilters...)
	// Register the Endorser server
	pb.RegisterEndorserServer(peerServer.Server(), auth)

	policyMgr := peer.NewChannelPolicyManagerGetter()

	// Initialize gossip component
	err = initGossipService(policyMgr, peerServer, serializedIdentity, peerEndpoint.Address)
	if err != nil {
		return err
	}
	defer service.GetGossipService().Stop()

	// register prover grpc service
	// FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut
	// err = registerProverService(peerServer, aclProvider, signingIdentity)
	// if err != nil {
	// 	return err
	// }

	// initialize system chaincodes

	// deploy system chaincodes
	sccp.DeploySysCCs("", ccp)
	logger.Infof("Deployed system chaincodes")

	installedCCs := func() ([]ccdef.InstalledChaincode, error) {
		return packageProvider.ListInstalledChaincodes()
	}
	lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
	if err != nil {
		logger.Panicf("Failed creating lifecycle: %+v", err)
	}
	onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
		service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
	})
	lifecycle.AddListener(onUpdate)

	// this brings up all the channels
	peer.Initialize(func(cid string) {
		logger.Debugf("Deploying system CC, for channel <%s>", cid)
		sccp.DeploySysCCs(cid, ccp)
		sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
			return peer.GetLedger(cid).NewQueryExecutor()
		}))
		if err != nil {
			logger.Panicf("Failed subscribing to chaincode lifecycle updates")
		}
		cceventmgmt.GetMgr().Register(cid, sub)
	}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
		pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)

	if viper.GetBool("peer.discovery.enabled") {
		registerDiscoveryService(peerServer, policyMgr, lifecycle)
	}

	networkID := viper.GetString("peer.networkId")

	logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

	// Get configuration before starting go routines to avoid
	// racing in tests
	profileEnabled := viper.GetBool("peer.profile.enabled")
	profileListenAddress := viper.GetString("peer.profile.listenAddress")

	// Start the grpc server. Done in a goroutine so we can deploy the
	// genesis block if needed.
	serve := make(chan error)

	go func() {
		var grpcErr error
		if grpcErr = peerServer.Start(); grpcErr != nil {
			grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
		} else {
			logger.Info("peer server exited")
		}
		serve <- grpcErr
	}()

	// Start profiling http endpoint if enabled
	if profileEnabled {
		go func() {
			logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
			if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
				logger.Errorf("Error starting profiler: %s", profileErr)
			}
		}()
	}

	go handleSignals(addPlatformSignals(map[os.Signal]func(){
		syscall.SIGINT:  func() { serve <- nil },
		syscall.SIGTERM: func() { serve <- nil },
	}))

	logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

	// Block until grpc server exits
	return <-serve
}

func handleSignals(handlers map[os.Signal]func()) {
	var signals []os.Signal
	for sig := range handlers {
		signals = append(signals, sig)
	}

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, signals...)

	for sig := range signalChan {
		logger.Infof("Received signal: %d (%s)", sig, sig)
		handlers[sig]()
	}
}

func localPolicy(policyObject proto.Message) policies.Policy {
	localMSP := mgmt.GetLocalMSP()
	pp := cauthdsl.NewPolicyProvider(localMSP)
	policy, _, err := pp.NewPolicy(utils.MarshalOrPanic(policyObject))
	if err != nil {
		logger.Panicf("Failed creating local policy: %+v", err)
	}
	return policy
}

func createSelfSignedData() common2.SignedData {
	sId := mgmt.GetLocalSigningIdentityOrPanic()
	msg := make([]byte, 32)
	sig, err := sId.Sign(msg)
	if err != nil {
		logger.Panicf("Failed creating self signed data because message signing failed: %v", err)
	}
	peerIdentity, err := sId.Serialize()
	if err != nil {
		logger.Panicf("Failed creating self signed data because peer identity couldn't be serialized: %v", err)
	}
	return common2.SignedData{
		Data:      msg,
		Signature: sig,
		Identity:  peerIdentity,
	}
}

func registerDiscoveryService(peerServer *comm.GRPCServer, polMgr policies.ChannelPolicyManagerGetter, lc *cc.Lifecycle) {
	mspID := viper.GetString("peer.localMspId")
	localAccessPolicy := localPolicy(cauthdsl.SignedByAnyAdmin([]string{mspID}))
	if viper.GetBool("peer.discovery.orgMembersAllowedAccess") {
		localAccessPolicy = localPolicy(cauthdsl.SignedByAnyMember([]string{mspID}))
	}
	channelVerifier := discacl.NewChannelVerifier(policies.ChannelApplicationWriters, polMgr)
	acl := discacl.NewDiscoverySupport(channelVerifier, localAccessPolicy, discacl.ChannelConfigGetterFunc(peer.GetStableChannelConfig))
	gSup := gossip.NewDiscoverySupport(service.GetGossipService())
	ccSup := ccsupport.NewDiscoverySupport(lc)
	ea := endorsement.NewEndorsementAnalyzer(gSup, ccSup, acl, lc)
	confSup := config.NewDiscoverySupport(config.CurrentConfigBlockGetterFunc(peer.GetCurrConfigBlock))
	support := discsupport.NewDiscoverySupport(acl, gSup, ea, confSup, acl)
	svc := discovery.NewService(discovery.Config{
		TLS:                          peerServer.TLSEnabled(),
		AuthCacheEnabled:             viper.GetBool("peer.discovery.authCacheEnabled"),
		AuthCacheMaxSize:             viper.GetInt("peer.discovery.authCacheMaxSize"),
		AuthCachePurgeRetentionRatio: viper.GetFloat64("peer.discovery.authCachePurgeRetentionRatio"),
	}, support)
	logger.Info("Discovery service activated")
	discprotos.RegisterDiscoveryServer(peerServer.Server(), svc)
}

// createChaincodeServer creates a CC listener using peer.chaincodeListenAddress (and if that's not set, uses peer.peerAddress).
func createChaincodeServer(ca tlsgen.CA, peerHostname string) (srv *comm.GRPCServer, ccEndpoint string, err error) {
	// compute the chaincode endpoint first, before potentially setting chaincodeListenAddress
	ccEndpoint, err = computeChaincodeEndpoint(peerHostname)
	if err != nil {
		if chaincode.IsDevMode() {
			// if there is an error in dev mode, fall back to 0.0.0.0:7052
			ccEndpoint = fmt.Sprintf("%s:%d", "0.0.0.0", defaultChaincodePort)
			logger.Warningf("use %s as chaincode endpoint because of error in computeChaincodeEndpoint: %s", ccEndpoint, err)
		} else {
			// in non-dev mode, we have to return the error
			logger.Errorf("Error computing chaincode endpoint: %s", err)
			return nil, "", err
		}
	}

	host, _, err := net.SplitHostPort(ccEndpoint)
	if err != nil {
		logger.Panic("Chaincode service host", ccEndpoint, "isn't a valid hostname:", err)
	}

	cclistenAddress := viper.GetString(chaincodeListenAddrKey)
	if cclistenAddress == "" {
		cclistenAddress = fmt.Sprintf("%s:%d", peerHostname, defaultChaincodePort)
		logger.Warningf("%s is not set, using %s", chaincodeListenAddrKey, cclistenAddress)
		viper.Set(chaincodeListenAddrKey, cclistenAddress)
	}

	config, err := peer.GetServerConfig()
	if err != nil {
		logger.Errorf("Error getting server config: %s", err)
		return nil, "", err
	}

	// set the logger for the server
	config.Logger = flogging.MustGetLogger("core.comm").With("server", "ChaincodeServer")

	// Override TLS configuration if TLS is applicable
	if config.SecOpts.UseTLS {
		// Create a self-signed TLS certificate with a SAN that matches the computed chaincode endpoint
		certKeyPair, err := ca.NewServerCertKeyPair(host)
		if err != nil {
			logger.Panicf("Failed generating TLS certificate for chaincode service: %+v", err)
		}
		config.SecOpts = &comm.SecureOptions{
			UseTLS: true,
			// Require chaincode shim to authenticate itself
			RequireClientCert: true,
			// Trust only client certificates signed by ourselves
			ClientRootCAs: [][]byte{ca.CertBytes()},
			// Use our own self-signed TLS certificate and key
			Certificate: certKeyPair.Cert,
			Key:         certKeyPair.Key,
			// No point in specifying server root CAs since this TLS config is only used for
			// a gRPC server and not a client
			ServerRootCAs: nil,
		}
	}

	// Chaincode keepalive options - static for now
	chaincodeKeepaliveOptions := &comm.KeepaliveOptions{
		ServerInterval:    time.Duration(2) * time.Hour,    // 2 hours - gRPC default
		ServerTimeout:     time.Duration(20) * time.Second, // 20 sec - gRPC default
		ServerMinInterval: time.Duration(1) * time.Minute,  // match ClientInterval
	}
	config.KaOpts = chaincodeKeepaliveOptions

	srv, err = comm.NewGRPCServer(cclistenAddress, config)
	if err != nil {
		logger.Errorf("Error creating GRPC server: %s", err)
		return nil, "", err
	}

	return srv, ccEndpoint, nil
}

// computeChaincodeEndpoint uses the chaincode address, the chaincode listen
// address (both from viper) and the peer address to compute the chaincode endpoint.
// The following cases can occur when computing the chaincode endpoint:
// Case A: if chaincodeAddrKey is set, use it if it is not "0.0.0.0" (or "::")
// Case B: else if chaincodeListenAddrKey is set and not "0.0.0.0" (or "::"), use it
// Case C: else use the peer address if it is not "0.0.0.0" (or "::")
// Case D: else return an error
func computeChaincodeEndpoint(peerHostname string) (ccEndpoint string, err error) {
	logger.Infof("Entering computeChaincodeEndpoint with peerHostname: %s", peerHostname)
	// set this to the host/ip the chaincode will resolve to. It could be
	// the same address as the peer (such as in the sample docker env using
	// the container name as the host name across the board)
	ccEndpoint = viper.GetString(chaincodeAddrKey)
	if ccEndpoint == "" {
		// the chaincodeAddrKey is not set, try to get the address from listener
		// (may finally use the peer address)
		ccEndpoint = viper.GetString(chaincodeListenAddrKey)
		if ccEndpoint == "" {
			// Case C: chaincodeListenAddrKey is not set, use peer address
			peerIp := net.ParseIP(peerHostname)
			if peerIp != nil && peerIp.IsUnspecified() {
				// Case D: all we have is "0.0.0.0" or "::" which chaincode cannot connect to
				logger.Errorf("ChaincodeAddress and chaincodeListenAddress are nil and peerIP is %s", peerIp)
				return "", errors.New("invalid endpoint for chaincode to connect")
			}

			// use peerAddress:defaultChaincodePort
			ccEndpoint = fmt.Sprintf("%s:%d", peerHostname, defaultChaincodePort)

		} else {
			// Case B: chaincodeListenAddrKey is set
			host, port, err := net.SplitHostPort(ccEndpoint)
			if err != nil {
				logger.Errorf("ChaincodeAddress is nil and fail to split chaincodeListenAddress: %s", err)
				return "", err
			}

			ccListenerIp := net.ParseIP(host)
			// ignoring other values such as Multicast address etc ...as the server
			// wouldn't start up with this address anyway
			if ccListenerIp != nil && ccListenerIp.IsUnspecified() {
				// Case C: if "0.0.0.0" or "::", we have to use peer address with the listen port
				peerIp := net.ParseIP(peerHostname)
				if peerIp != nil && peerIp.IsUnspecified() {
					// Case D: all we have is "0.0.0.0" or "::" which chaincode cannot connect to
					logger.Error("ChaincodeAddress is nil while both chaincodeListenAddressIP and peerIP are 0.0.0.0")
					return "", errors.New("invalid endpoint for chaincode to connect")
				}
				ccEndpoint = fmt.Sprintf("%s:%s", peerHostname, port)
			}
		}

	} else {
		// Case A: the chaincodeAddrKey is set
		if host, _, err := net.SplitHostPort(ccEndpoint); err != nil {
			logger.Errorf("Fail to split chaincodeAddress: %s", err)
			return "", err
		} else {
			ccIP := net.ParseIP(host)
			if ccIP != nil && ccIP.IsUnspecified() {
				logger.Errorf("ChaincodeAddress' IP cannot be %s in non-dev mode", ccIP)
				return "", errors.New("invalid endpoint for chaincode to connect")
			}
		}
	}

	logger.Infof("Exit with ccEndpoint: %s", ccEndpoint)
	return ccEndpoint, nil
}
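// Illustrative resolution of the cases above (hypothetical values, not taken
// from this file), where peerHostname is the host portion of the peer address:
//
//   peer.chaincodeAddress = "cc.example.com:9052"                   -> Case A: "cc.example.com:9052"
//   address unset, peer.chaincodeListenAddress = "0.0.0.0:7052",
//     peerHostname = "peer0.example.com"                            -> Case B->C: "peer0.example.com:7052"
//   both unset, peerHostname = "peer0.example.com"                  -> Case C: "peer0.example.com:7052"
//   both unset, peerHostname = "0.0.0.0"                            -> Case D: error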

// NOTE - when we implement JOIN we will no longer pass the chainID as a param.
// The chaincode support will come up without registering system chaincodes,
// which will be registered only during the join phase.
func registerChaincodeSupport(
	grpcServer *comm.GRPCServer,
	ccEndpoint string,
	ca tlsgen.CA,
	packageProvider *persistence.PackageProvider,
	aclProvider aclmgmt.ACLProvider,
	pr *platforms.Registry,
	lifecycleSCC *lifecycle.SCC,
	ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
	// get user mode
	userRunsCC := chaincode.IsDevMode()
	tlsEnabled := viper.GetBool("peer.tls.enabled")

	authenticator := accesscontrol.NewAuthenticator(ca)
	ipRegistry := inproccontroller.NewRegistry()

	sccp := scc.NewProvider(peer.Default, peer.DefaultSupport, ipRegistry)
	lsccInst := lscc.New(sccp, aclProvider, pr)

	dockerProvider := dockercontroller.NewProvider(
		viper.GetString("peer.id"),
		viper.GetString("peer.networkId"),
		ops.Provider,
	)
	dockerVM := dockercontroller.NewDockerVM(
		dockerProvider.PeerID,
		dockerProvider.NetworkID,
		dockerProvider.BuildMetrics,
	)

	err := ops.RegisterChecker("docker", dockerVM)
	if err != nil {
		logger.Panicf("failed to register docker health check: %s", err)
	}

	chaincodeSupport := chaincode.NewChaincodeSupport(
		chaincode.GlobalConfig(),
		ccEndpoint,
		userRunsCC,
		ca.CertBytes(),
		authenticator,
		packageProvider,
		lsccInst,
		aclProvider,
		container.NewVMController(
			map[string]container.VMProvider{
				dockercontroller.ContainerType: dockerProvider,
				inproccontroller.ContainerType: ipRegistry,
			},
		),
		sccp,
		pr,
		peer.DefaultSupport,
		ops.Provider,
	)
	ipRegistry.ChaincodeSupport = chaincodeSupport
	ccp := chaincode.NewProvider(chaincodeSupport)

	ccSrv := pb.ChaincodeSupportServer(chaincodeSupport)
	if tlsEnabled {
		ccSrv = authenticator.Wrap(ccSrv)
	}

	csccInst := cscc.New(ccp, sccp, aclProvider)
	qsccInst := qscc.New(aclProvider)

	// Now that chaincode is initialized, register all system chaincodes.
	sccs := scc.CreatePluginSysCCs(sccp)
	for _, cc := range append([]scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC}, sccs...) {
		sccp.RegisterSysCC(cc)
	}
	pb.RegisterChaincodeSupportServer(grpcServer.Server(), ccSrv)

	return chaincodeSupport, ccp, sccp
}

// startChaincodeServer will finish chaincode related initialization, including:
// 1) setup local chaincode install path
// 2) create chaincode specific tls CA
// 3) start the chaincode specific gRPC listening service
func startChaincodeServer(
	peerHost string,
	aclProvider aclmgmt.ACLProvider,
	pr *platforms.Registry,
	ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider, *persistence.PackageProvider) {
	// Setup chaincode path
	chaincodeInstallPath := ccprovider.GetChaincodeInstallPathFromViper()
	ccprovider.SetChaincodesPath(chaincodeInstallPath)

	ccPackageParser := &persistence.ChaincodePackageParser{}
	ccStore := &persistence.Store{
		Path:       chaincodeInstallPath,
		ReadWriter: &persistence.FilesystemIO{},
	}

	packageProvider := &persistence.PackageProvider{
		LegacyPP: &ccprovider.CCInfoFSImpl{},
		Store:    ccStore,
	}

	lifecycleSCC := &lifecycle.SCC{
		Protobuf: &lifecycle.ProtobufImpl{},
		Functions: &lifecycle.Lifecycle{
			PackageParser:  ccPackageParser,
			ChaincodeStore: ccStore,
		},
	}

	// Create a self-signed CA for chaincode service
	ca, err := tlsgen.NewCA()
	if err != nil {
		logger.Panic("Failed creating authentication layer:", err)
	}
	ccSrv, ccEndpoint, err := createChaincodeServer(ca, peerHost)
	if err != nil {
		logger.Panicf("Failed to create chaincode server: %s", err)
	}
	chaincodeSupport, ccp, sccp := registerChaincodeSupport(
		ccSrv,
		ccEndpoint,
		ca,
		packageProvider,
		aclProvider,
		pr,
		lifecycleSCC,
		ops,
	)
	go ccSrv.Start()
	return chaincodeSupport, ccp, sccp, packageProvider
}

func adminHasSeparateListener(peerListenAddr string, adminListenAddress string) bool {
	// By default, admin listens on the same port as the peer data service
	if adminListenAddress == "" {
		return false
	}
	_, peerPort, err := net.SplitHostPort(peerListenAddr)
	if err != nil {
		logger.Panicf("Failed parsing peer listen address")
	}

	_, adminPort, err := net.SplitHostPort(adminListenAddress)
	if err != nil {
		logger.Panicf("Failed parsing admin listen address")
	}
	// Admin service has a separate listener in case it doesn't match the peer's
	// configured service
	return adminPort != peerPort
}
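// Illustrative examples (hypothetical addresses, not defaults from this file):
//
//   adminHasSeparateListener("0.0.0.0:7051", "")             -> false (admin shares the peer listener)
//   adminHasSeparateListener("0.0.0.0:7051", "0.0.0.0:7051") -> false (same port as the peer)
//   adminHasSeparateListener("0.0.0.0:7051", "0.0.0.0:7053") -> true  (different port)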

func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsProvider metrics.Provider) {
	adminListenAddress := viper.GetString("peer.adminService.listenAddress")
	separateLsnrForAdmin := adminHasSeparateListener(peerListenAddr, adminListenAddress)
	mspID := viper.GetString("peer.localMspId")
	adminPolicy := localPolicy(cauthdsl.SignedByAnyAdmin([]string{mspID}))
	gRPCService := peerServer
	if separateLsnrForAdmin {
		logger.Info("Creating gRPC server for admin service on", adminListenAddress)
		serverConfig, err := peer.GetServerConfig()
		if err != nil {
			logger.Fatalf("Error loading secure config for admin service (%s)", err)
		}
		throttle := comm.NewThrottle(grpcMaxConcurrency)
		serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "AdminServer")
		serverConfig.MetricsProvider = metricsProvider
		serverConfig.UnaryInterceptors = append(
			serverConfig.UnaryInterceptors,
			grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
			grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
			throttle.UnaryServerIntercptor,
		)
		serverConfig.StreamInterceptors = append(
			serverConfig.StreamInterceptors,
			grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
			grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
			throttle.StreamServerInterceptor,
		)
		adminServer, err := peer.NewPeerServer(adminListenAddress, serverConfig)
		if err != nil {
			logger.Fatalf("Failed to create admin server (%s)", err)
		}
		gRPCService = adminServer.Server()
		defer func() {
			go adminServer.Start()
		}()
	}

	pb.RegisterAdminServer(gRPCService, admin.NewAdminServer(adminPolicy))
}

// secureDialOpts is the callback function for secure dial options for gossip service
func secureDialOpts() []grpc.DialOption {
	var dialOpts []grpc.DialOption
	// set max send/recv msg sizes
	dialOpts = append(
		dialOpts,
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize),
			grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize)))
	// set the keepalive options
	kaOpts := comm.DefaultKeepaliveOptions
	if viper.IsSet("peer.keepalive.client.interval") {
		kaOpts.ClientInterval = viper.GetDuration("peer.keepalive.client.interval")
	}
	if viper.IsSet("peer.keepalive.client.timeout") {
		kaOpts.ClientTimeout = viper.GetDuration("peer.keepalive.client.timeout")
	}
	dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)

	if viper.GetBool("peer.tls.enabled") {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCredentialSupport().GetPeerCredentials()))
	} else {
		dialOpts = append(dialOpts, grpc.WithInsecure())
	}
	return dialOpts
}
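// Illustrative core.yaml keys read by secureDialOpts above (a sketch only; the
// values shown are examples, not defaults taken from this file):
//
//   peer:
//     tls:
//       enabled: true
//     keepalive:
//       client:
//         interval: 60s
//         timeout: 20s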

// initGossipService will initialize the gossip service by:
// 1. Enable TLS if configured;
// 2. Init the message crypto service;
// 3. Init the security advisor;
// 4. Init gossip related struct.
func initGossipService(policyMgr policies.ChannelPolicyManagerGetter, peerServer *comm.GRPCServer, serializedIdentity []byte, peerAddr string) error {
	var certs *gossipcommon.TLSCertificates
	if peerServer.TLSEnabled() {
		serverCert := peerServer.ServerCertificate()
		clientCert, err := peer.GetClientCertificate()
		if err != nil {
			return errors.Wrap(err, "failed obtaining client certificates")
		}
		certs = &gossipcommon.TLSCertificates{}
		certs.TLSServerCert.Store(&serverCert)
		certs.TLSClientCert.Store(&clientCert)
	}

	messageCryptoService := peergossip.NewMCS(
		policyMgr,
		localmsp.NewSigner(),
		mgmt.NewDeserializersManager(),
	)
	secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager())
	bootstrap := viper.GetStringSlice("peer.gossip.bootstrap")

	return service.InitGossipService(
		serializedIdentity,
		peerAddr,
		peerServer.Server(),
		certs,
		messageCryptoService,
		secAdv,
		secureDialOpts,
		bootstrap...,
	)
}

func newOperationsSystem() *operations.System {
	return operations.NewSystem(operations.Options{
		Logger:        flogging.MustGetLogger("peer.operations"),
		ListenAddress: viper.GetString("operations.listenAddress"),
		Metrics: operations.MetricsOptions{
			Provider: viper.GetString("metrics.provider"),
			Statsd: &operations.Statsd{
				Network:       viper.GetString("metrics.statsd.network"),
				Address:       viper.GetString("metrics.statsd.address"),
				WriteInterval: viper.GetDuration("metrics.statsd.writeInterval"),
				Prefix:        viper.GetString("metrics.statsd.prefix"),
			},
		},
		TLS: operations.TLS{
			Enabled:            viper.GetBool("operations.tls.enabled"),
			CertFile:           viper.GetString("operations.tls.cert.file"),
			KeyFile:            viper.GetString("operations.tls.key.file"),
			ClientCertRequired: viper.GetBool("operations.tls.clientAuthRequired"),
			ClientCACertFiles:  viper.GetStringSlice("operations.tls.clientRootCAs.files"),
		},
		Version: metadata.Version,
	})
}
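// Illustrative operations/metrics configuration consumed by newOperationsSystem
// above (a sketch of the corresponding core.yaml keys; the values are examples,
// not defaults taken from this file):
//
//   operations:
//     listenAddress: 127.0.0.1:9443
//     tls:
//       enabled: false
//   metrics:
//     provider: prometheus
//     statsd:
//       network: udp
//       address: 127.0.0.1:8125
//       writeInterval: 10s
//       prefix: peer0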

func registerProverService(peerServer *comm.GRPCServer, aclProvider aclmgmt.ACLProvider, signingIdentity msp.SigningIdentity) error {
	policyChecker := &server.PolicyBasedAccessControl{
		ACLProvider: aclProvider,
		ACLResources: &server.ACLResources{
			IssueTokens:    resources.Token_Issue,
			TransferTokens: resources.Token_Transfer,
			ListTokens:     resources.Token_List,
		},
	}

	responseMarshaler, err := server.NewResponseMarshaler(signingIdentity)
	if err != nil {
		logger.Errorf("Failed to create prover service: %s", err)
		return err
	}

	prover := &server.Prover{
		CapabilityChecker: &server.TokenCapabilityChecker{
			PeerOps: peer.Default,
		},
		Marshaler:     responseMarshaler,
		PolicyChecker: policyChecker,
		TMSManager: &server.Manager{
			LedgerManager: &server.PeerLedgerManager{},
		},
	}
	token.RegisterProverServer(peerServer.Server(), prover)
	return nil
}