diff --git a/common/deliver/deliver.go b/common/deliver/deliver.go index d2684ff71cbfd3ba13fdfd1d455ff7c12b00d413..2362a816633d4fef37f32967eff7fa718191c68d 100644 --- a/common/deliver/deliver.go +++ b/common/deliver/deliver.go @@ -18,14 +18,18 @@ package deliver import ( "io" + "math" + "time" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/blockledger" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/common/util" + "github.com/hyperledger/fabric/core/comm" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" + "github.com/pkg/errors" "github.com/golang/protobuf/proto" "github.com/op/go-logging" @@ -65,15 +69,29 @@ type Support interface { } type deliverServer struct { - sm SupportManager - policyName string + sm SupportManager + policyName string + timeWindow time.Duration + bindingInspector comm.BindingInspector } // NewHandlerImpl creates an implementation of the Handler interface -func NewHandlerImpl(sm SupportManager, policyName string) Handler { +func NewHandlerImpl(sm SupportManager, policyName string, timeWindow time.Duration, mutualTLS bool) Handler { + // function to extract the TLS cert hash from a channel header + extract := func(msg proto.Message) []byte { + chdr, isChannelHeader := msg.(*cb.ChannelHeader) + if !isChannelHeader || chdr == nil { + return nil + } + return chdr.TlsCertHash + } + bindingInspector := comm.NewBindingInspector(mutualTLS, extract) + return &deliverServer{ - sm: sm, - policyName: policyName, + sm: sm, + policyName: policyName, + timeWindow: timeWindow, + bindingInspector: bindingInspector, } } @@ -120,6 +138,12 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env return sendStatusReply(srv, cb.Status_BAD_REQUEST) } + err = ds.validateChannelHeader(srv, chdr) + if err != nil { + logger.Warningf("Rejecting deliver for %s due to envelope validation error: %s", addr, err) + return sendStatusReply(srv, cb.Status_BAD_REQUEST) + } + chain, ok := ds.sm.GetChain(chdr.ChannelId) if !ok { // Note, we log this at DEBUG because SDKs will poll waiting for channels to be created @@ -222,6 +246,28 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env } +func (ds *deliverServer) validateChannelHeader(srv ab.AtomicBroadcast_DeliverServer, chdr *cb.ChannelHeader) error { + if chdr.GetTimestamp() == nil { + err := errors.New("channel header in envelope must contain timestamp") + return err + } + + envTime := time.Unix(chdr.GetTimestamp().Seconds, int64(chdr.GetTimestamp().Nanos)).UTC() + serverTime := time.Now() + + if math.Abs(float64(serverTime.UnixNano()-envTime.UnixNano())) > float64(ds.timeWindow.Nanoseconds()) { + err := errors.Errorf("timestamp %s is more than the %s time window difference above/below server time %s. either the server and client clocks are out of sync or a relay attack has been attempted", envTime, ds.timeWindow, serverTime) + return err + } + + err := ds.bindingInspector(srv.Context(), chdr) + if err != nil { + return err + } + + return nil +} + func nextBlock(cursor blockledger.Iterator, cancel <-chan struct{}) (block *cb.Block, status cb.Status) { done := make(chan struct{}) go func() { diff --git a/common/deliver/deliver_test.go b/common/deliver/deliver_test.go index ac82425423076da77d18640f70c40e5baf4ad903..7037c3b01ded0c96cd7251c3d980641c93c7da19 100644 --- a/common/deliver/deliver_test.go +++ b/common/deliver/deliver_test.go @@ -17,33 +17,41 @@ limitations under the License. package deliver import ( + "crypto/tls" + "crypto/x509" "fmt" "io" "testing" "time" + "github.com/golang/protobuf/ptypes/timestamp" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/blockledger" ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram" mockpolicies "github.com/hyperledger/fabric/common/mocks/policies" "github.com/hyperledger/fabric/common/policies" genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig" + "github.com/hyperledger/fabric/common/util" cb "github.com/hyperledger/fabric/protos/common" ab "github.com/hyperledger/fabric/protos/orderer" "github.com/hyperledger/fabric/protos/utils" "github.com/stretchr/testify/assert" "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/peer" ) var genesisBlock = cb.NewBlock(0, nil) - var systemChainID = "systemChain" - var policyName = policies.ChannelReaders +var timeWindow = time.Duration(15 * time.Minute) +var testCert = &x509.Certificate{ + Raw: []byte("test"), +} const ledgerSize = 10 +const mutualTLS = true func init() { flogging.SetModuleLevel(pkgLogID, "DEBUG") @@ -54,7 +62,15 @@ type mockStream struct { } func (mockStream) Context() context.Context { - return peer.NewContext(context.Background(), &peer.Peer{}) + p := &peer.Peer{} + p.AuthInfo = credentials.TLSInfo{ + State: tls.ConnectionState{ + PeerCertificates: []*x509.Certificate{ + testCert, + }, + }, + } + return peer.NewContext(context.Background(), p) } type mockD struct { @@ -158,7 +174,7 @@ func initializeDeliverHandler() Handler { l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) } - return NewHandlerImpl(mm, policyName) + return NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) } func newMockMultichainManager() *mockSupportManager { @@ -187,6 +203,23 @@ func makeSeek(chainID string, seekInfo *ab.SeekInfo) *cb.Envelope { Header: &cb.Header{ ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ ChannelId: chainID, + Timestamp: util.CreateUtcTimestamp(), + }), + SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), + }, + Data: utils.MarshalOrPanic(seekInfo), + }), + } +} + +func makeSeekWithTLSCertHash(chainID string, seekInfo *ab.SeekInfo, tlsCert *x509.Certificate) *cb.Envelope { + return &cb.Envelope{ + Payload: utils.MarshalOrPanic(&cb.Payload{ + Header: &cb.Header{ + ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ + ChannelId: chainID, + Timestamp: util.CreateUtcTimestamp(), + TlsCertHash: util.ComputeSHA256(tlsCert.Raw), }), SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), }, @@ -290,7 +323,7 @@ func TestUnauthorizedSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) @@ -315,7 +348,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) @@ -398,7 +431,7 @@ func TestBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) @@ -452,7 +485,7 @@ func TestErroredSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) @@ -476,7 +509,7 @@ func TestErroredBlockingSeek(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) @@ -501,7 +534,7 @@ func TestErroredBlockingSeek(t *testing.T) { func TestSGracefulShutdown(t *testing.T) { m := newMockD() - ds := NewHandlerImpl(nil, policyName) + ds := NewHandlerImpl(nil, policyName, timeWindow, !mutualTLS) close(m.recvChan) assert.NoError(t, ds.Handle(m), "Expected no error for hangup") @@ -529,7 +562,7 @@ func TestReversedSeqSeek(t *testing.T) { } func TestBadStreamRecv(t *testing.T) { - bh := NewHandlerImpl(nil, policyName) + bh := NewHandlerImpl(nil, policyName, timeWindow, !mutualTLS) assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error") } @@ -618,7 +651,7 @@ func TestChainNotFound(t *testing.T) { m := newMockD() defer close(m.recvChan) - ds := NewHandlerImpl(mm, policyName) + ds := NewHandlerImpl(mm, policyName, timeWindow, !mutualTLS) go ds.Handle(m) m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) @@ -643,6 +676,7 @@ func TestBadSeekInfoPayload(t *testing.T) { Header: &cb.Header{ ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ ChannelId: systemChainID, + Timestamp: util.CreateUtcTimestamp(), }), SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), }, @@ -670,6 +704,62 @@ func TestMissingSeekPosition(t *testing.T) { Header: &cb.Header{ ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ ChannelId: systemChainID, + Timestamp: util.CreateUtcTimestamp(), + }), + SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), + }, + Data: nil, + }), + } + + select { + case deliverReply := <-m.sendChan: + assert.Equal(t, cb.Status_BAD_REQUEST, deliverReply.GetStatus(), "Received wrong error on the reply channel") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +} + +func TestNilTimestamp(t *testing.T) { + m := newMockD() + defer close(m.recvChan) + + ds := initializeDeliverHandler() + go ds.Handle(m) + + m.recvChan <- &cb.Envelope{ + Payload: utils.MarshalOrPanic(&cb.Payload{ + Header: &cb.Header{ + ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ + ChannelId: systemChainID, + }), + SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), + }, + Data: nil, + }), + } + + select { + case deliverReply := <-m.sendChan: + assert.Equal(t, cb.Status_BAD_REQUEST, deliverReply.GetStatus(), "Received wrong error on the reply channel") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +} + +func TestTimestampOutOfTimeWindow(t *testing.T) { + m := newMockD() + defer close(m.recvChan) + + ds := initializeDeliverHandler() + go ds.Handle(m) + + m.recvChan <- &cb.Envelope{ + Payload: utils.MarshalOrPanic(&cb.Payload{ + Header: &cb.Header{ + ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ + ChannelId: systemChainID, + Timestamp: ×tamp.Timestamp{Seconds: 0}, }), SignatureHeader: utils.MarshalOrPanic(&cb.SignatureHeader{}), }, @@ -684,3 +774,82 @@ func TestMissingSeekPosition(t *testing.T) { t.Fatalf("Timed out waiting to get all blocks") } } + +func TestSeekWithMutualTLS(t *testing.T) { + mm := newMockMultichainManager() + ms := mm.chains[systemChainID] + l := ms.ledger + for i := 1; i < ledgerSize; i++ { + l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + m := newMockD() + defer close(m.recvChan) + + ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS) + go ds.Handle(m) + + m.recvChan <- makeSeekWithTLSCertHash(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, testCert) + + select { + case deliverReply := <-m.sendChan: + if deliverReply.GetBlock() == nil { + t.Fatalf("Received an error on the reply channel") + } + if deliverReply.GetBlock().Header.Number != uint64(ledgerSize-1) { + t.Fatalf("Expected only the most recent block") + } + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +} + +func TestSeekWithMutualTLS_wrongTLSCert(t *testing.T) { + mm := newMockMultichainManager() + ms := mm.chains[systemChainID] + l := ms.ledger + for i := 1; i < ledgerSize; i++ { + l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + m := newMockD() + defer close(m.recvChan) + + ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS) + go ds.Handle(m) + wrongCert := &x509.Certificate{ + Raw: []byte("wrong"), + } + m.recvChan <- makeSeekWithTLSCertHash(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}, wrongCert) + + select { + case deliverReply := <-m.sendChan: + assert.Equal(t, cb.Status_BAD_REQUEST, deliverReply.GetStatus(), "Received wrong error on the reply channel") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +} + +func TestSeekWithMutualTLS_noTLSCert(t *testing.T) { + mm := newMockMultichainManager() + ms := mm.chains[systemChainID] + l := ms.ledger + for i := 1; i < ledgerSize; i++ { + l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}})) + } + + m := newMockD() + defer close(m.recvChan) + + ds := NewHandlerImpl(mm, policyName, timeWindow, mutualTLS) + go ds.Handle(m) + + m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY}) + + select { + case deliverReply := <-m.sendChan: + assert.Equal(t, cb.Status_BAD_REQUEST, deliverReply.GetStatus(), "Received wrong error on the reply channel") + case <-time.After(time.Second): + t.Fatalf("Timed out waiting to get all blocks") + } +} diff --git a/core/peer/atomicbroadcast.go b/core/peer/atomicbroadcast.go index 0f60b93b537531b46d194f824f23dba5b6e683e3..7357a8cdec8d70d3bc358c34ba570987e566c0ac 100644 --- a/core/peer/atomicbroadcast.go +++ b/core/peer/atomicbroadcast.go @@ -17,6 +17,7 @@ package peer import ( "runtime/debug" + "time" "github.com/hyperledger/fabric/common/deliver" "github.com/hyperledger/fabric/common/flogging" @@ -61,9 +62,9 @@ func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error { // NewAtomicBroadcastServer creates an ab.AtomicBroadcastServer based on the // ledger Reader. Broadcast is not implemented/supported on the peer. -func NewAtomicBroadcastServer() ab.AtomicBroadcastServer { +func NewAtomicBroadcastServer(timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policies.ChannelReaders), + dh: deliver.NewHandlerImpl(DeliverSupportManager{}, policies.ChannelReaders, timeWindow, mutualTLS), bh: broadcast.NewHandlerImpl(nil), } return s diff --git a/orderer/common/localconfig/config.go b/orderer/common/localconfig/config.go index 200c0559f26733eba67002ce20ec4b51ecba1375..6e1fbb53d947525dd5b6e2b165402bc5a4099f40 100644 --- a/orderer/common/localconfig/config.go +++ b/orderer/common/localconfig/config.go @@ -85,6 +85,7 @@ type General struct { LocalMSPDir string LocalMSPID string BCCSP *bccsp.FactoryOpts + Authentication Authentication } // Keepalive contains configuration for gRPC servers @@ -104,6 +105,12 @@ type TLS struct { ClientRootCAs []string } +// Authentication contains configuration parameters related to authenticating +// client messages +type Authentication struct { + TimeWindow time.Duration +} + // Profile contains configuration for Go pprof profiling. type Profile struct { Enabled bool @@ -196,6 +203,9 @@ var defaults = TopLevel{ LocalMSPDir: "msp", LocalMSPID: "DEFAULT", BCCSP: bccsp.GetDefaultOpts(), + Authentication: Authentication{ + TimeWindow: time.Duration(15 * time.Minute), + }, }, RAMLedger: RAMLedger{ HistorySize: 10000, diff --git a/orderer/common/server/main.go b/orderer/common/server/main.go index 37c5f2f66743cb84497af6cbc739a9b81b52653e..4601b4863486a359a9da4c4a23518e2f817dfbae 100644 --- a/orderer/common/server/main.go +++ b/orderer/common/server/main.go @@ -92,8 +92,10 @@ func Start(cmd string, conf *config.TopLevel) { updateTrustedRoots(grpcServer, caSupport, bundle) } } + manager := initializeMultichannelRegistrar(conf, signer, tlsCallback) - server := NewServer(manager, signer, &conf.Debug) + mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert + server := NewServer(manager, signer, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS) switch cmd { case start.FullCommand(): // "start" command diff --git a/orderer/common/server/server.go b/orderer/common/server/server.go index ba758174050352e026ac87075b8b439eb44428b7..c2cbcdb33525d0cd2419d4349032d45579ec0fbe 100644 --- a/orderer/common/server/server.go +++ b/orderer/common/server/server.go @@ -36,8 +36,8 @@ type deliverSupport struct { *multichannel.Registrar } -func (bs deliverSupport) GetChain(chainID string) (deliver.Support, bool) { - return bs.Registrar.GetChain(chainID) +func (ds deliverSupport) GetChain(chainID string) (deliver.Support, bool) { + return ds.Registrar.GetChain(chainID) } type server struct { @@ -46,10 +46,10 @@ type server struct { debug *localconfig.Debug } -// NewServer creates a ab.AtomicBroadcastServer based on the broadcast target and ledger Reader -func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer { +// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader +func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer { s := &server{ - dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, policies.ChannelReaders), + dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, policies.ChannelReaders, timeWindow, mutualTLS), bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}), debug: debug, } diff --git a/peer/node/start.go b/peer/node/start.go index d1a95a69c9217937ed9d6963bda3f852d40f90cd..5007261fc292928db237e01a103e138ced87380e 100644 --- a/peer/node/start.go +++ b/peer/node/start.go @@ -159,7 +159,9 @@ func serve(args []string) error { // create the peer's AtomicBroadcastServer, which supports deliver but not // broadcast - abServer := peer.NewAtomicBroadcastServer() + mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert + timeWindow := viper.GetDuration("peer.authentication.timewindow") + abServer := peer.NewAtomicBroadcastServer(timeWindow, mutualTLS) ab.RegisterAtomicBroadcastServer(peerServer.Server(), abServer) // enable the cache of chaincode info diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index 74b95dd5c606025bc2eabb31dc16f66a73595ebd..38b6083ba4a5a4537cbf4f84e616321ab738d107 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -292,6 +292,13 @@ peer: clientCert: file: + # Authentication contains configuration parameters related to authenticating + # client messages + authentication: + # the acceptable difference between the current server time and the + # client's time as specified in a client request message + timewindow: 15m + # Path on the file system where peer will store data (eg ledger). This # location must be access control protected to prevent unintended # modification that might corrupt the peer operations. diff --git a/sampleconfig/orderer.yaml b/sampleconfig/orderer.yaml index 2c1cf7f42787ba85268f2fdee3d2515b8469863d..9687ec41e0d654a94753b4dc0e049ce7c70689e9 100644 --- a/sampleconfig/orderer.yaml +++ b/sampleconfig/orderer.yaml @@ -115,6 +115,13 @@ General: FileKeyStore: KeyStore: + # Authentication contains configuration parameters related to authenticating + # client messages + Authentication: + # the acceptable difference between the current server time and the + # client's time as specified in a client request message + TimeWindow: 15m + ################################################################################ # # SECTION: File Ledger