deliver.go 11.4 KB
Newer Older
jyellick's avatar
jyellick committed
1
/*
2
Copyright IBM Corp. All Rights Reserved.
jyellick's avatar
jyellick committed
3

4
SPDX-License-Identifier: Apache-2.0
jyellick's avatar
jyellick committed
5
6
*/

7
package deliver
jyellick's avatar
jyellick committed
8
9

import (
10
	"context"
11
	"io"
12
	"math"
Will Lahti's avatar
Will Lahti committed
13
	"strconv"
14
	"time"
15

16
17
	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/common/crypto"
18
	"github.com/hyperledger/fabric/common/flogging"
19
	"github.com/hyperledger/fabric/common/ledger/blockledger"
20
	"github.com/hyperledger/fabric/common/policies"
21
	"github.com/hyperledger/fabric/common/util"
22
	"github.com/hyperledger/fabric/core/comm"
23
24
	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
25
	"github.com/hyperledger/fabric/protos/utils"
26
	"github.com/pkg/errors"
jyellick's avatar
jyellick committed
27
28
)

29
var logger = flogging.MustGetLogger("common.deliver")
30

31
//go:generate counterfeiter -o mock/chain_manager.go -fake-name ChainManager . ChainManager
32

33
34
// ChainManager provides a way for the Handler to look up the Chain.
type ChainManager interface {
35
	GetChain(chainID string) Chain
36
37
}

38
//go:generate counterfeiter -o mock/chain.go -fake-name Chain . Chain
39

40
41
// Chain encapsulates chain operations and data.
type Chain interface {
42
43
44
	// Sequence returns the current config sequence number, can be used to detect config changes
	Sequence() uint64

45
46
47
48
	// PolicyManager returns the current policy manager as specified by the chain configuration
	PolicyManager() policies.Manager

	// Reader returns the chain Reader for the chain
49
	Reader() blockledger.Reader
50
51
52

	// Errored returns a channel which closes when the backing consenter has errored
	Errored() <-chan struct{}
53
54
}

55
56
//go:generate counterfeiter -o mock/policy_checker.go -fake-name PolicyChecker . PolicyChecker

57
// PolicyChecker checks the envelope against the policy logic supplied by the
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// function.
type PolicyChecker interface {
	CheckPolicy(envelope *cb.Envelope, channelID string) error
}

// The PolicyCheckerFunc is an adapter that allows the use of an ordinary
// function as a PolicyChecker.
type PolicyCheckerFunc func(envelope *cb.Envelope, channelID string) error

// CheckPolicy calls pcf(envelope, channelID)
func (pcf PolicyCheckerFunc) CheckPolicy(envelope *cb.Envelope, channelID string) error {
	return pcf(envelope, channelID)
}

//go:generate counterfeiter -o mock/inspector.go -fake-name Inspector . Inspector

// Inspector verifies an appropriate binding between the message and the context.
type Inspector interface {
	Inspect(context.Context, proto.Message) error
}

// The InspectorFunc is an adapter that allows the use of an ordinary
// function as an Inspector.
type InspectorFunc func(context.Context, proto.Message) error

// Inspect calls inspector(ctx, p)
func (inspector InspectorFunc) Inspect(ctx context.Context, p proto.Message) error {
	return inspector(ctx, p)
}
87

88
89
90
91
92
// Handler handles server requests.
type Handler struct {
	ChainManager     ChainManager
	TimeWindow       time.Duration
	BindingInspector Inspector
Will Lahti's avatar
Will Lahti committed
93
	Metrics          *Metrics
jyellick's avatar
jyellick committed
94
95
}

96
97
98
99
//go:generate counterfeiter -o mock/receiver.go -fake-name Receiver . Receiver

// Receiver is used to receive enveloped seek requests.
type Receiver interface {
100
101
102
	Recv() (*cb.Envelope, error)
}

103
104
105
106
107
108
109
//go:generate counterfeiter -o mock/response_sender.go -fake-name ResponseSender . ResponseSender

// ResponseSender defines the interface a handler must implement to send
// responses.
type ResponseSender interface {
	SendStatusResponse(status cb.Status) error
	SendBlockResponse(block *cb.Block) error
110
111
}

Will Lahti's avatar
Will Lahti committed
112
113
114
115
116
117
// Filtered is a marker interface that indicates a response sender
// is configured to send filtered blocks
type Filtered interface {
	IsFiltered() bool
}

118
119
120
121
122
123
// Server is a polymorphic structure to support generalization of this handler
// to be able to deliver different type of responses.
type Server struct {
	Receiver
	PolicyChecker
	ResponseSender
124
125
}

126
127
128
129
130
// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header.
func ExtractChannelHeaderCertHash(msg proto.Message) []byte {
	chdr, isChannelHeader := msg.(*cb.ChannelHeader)
	if !isChannelHeader || chdr == nil {
		return nil
131
	}
132
133
	return chdr.TlsCertHash
}
134

135
// NewHandler creates an implementation of the Handler interface.
Will Lahti's avatar
Will Lahti committed
136
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics) *Handler {
137
138
139
140
	return &Handler{
		ChainManager:     cm,
		TimeWindow:       timeWindow,
		BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
Will Lahti's avatar
Will Lahti committed
141
		Metrics:          metrics,
jyellick's avatar
jyellick committed
142
143
144
	}
}

145
146
147
// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
	addr := util.ExtractRemoteAddress(ctx)
148
	logger.Debugf("Starting new deliver loop for %s", addr)
Will Lahti's avatar
Will Lahti committed
149
150
	h.Metrics.StreamsOpened.Add(1)
	defer h.Metrics.StreamsClosed.Add(1)
151
	for {
152
		logger.Debugf("Attempting to read seek info message from %s", addr)
153
		envelope, err := srv.Recv()
154
		if err == io.EOF {
155
			logger.Debugf("Received EOF from %s, hangup", addr)
156
157
			return nil
		}
158
		if err != nil {
159
			logger.Warningf("Error reading from %s: %s", addr, err)
160
161
			return err
		}
162

Will Lahti's avatar
Will Lahti committed
163
164
165
166
167
168
169
170
171
172
173
		status, err := h.deliverBlocks(ctx, srv, envelope)
		if err != nil {
			return err
		}

		err = srv.SendStatusResponse(status)
		if status != cb.Status_SUCCESS {
			return err
		}
		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
174
			return err
175
176
		}

177
		logger.Debugf("Waiting for new SeekInfo from %s", addr)
178
179
	}
}
180

Will Lahti's avatar
Will Lahti committed
181
182
183
184
185
186
187
188
func isFiltered(srv *Server) bool {
	if filtered, ok := srv.ResponseSender.(Filtered); ok {
		return filtered.IsFiltered()
	}
	return false
}

func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
189
	addr := util.ExtractRemoteAddress(ctx)
190
191
	payload, err := utils.UnmarshalPayload(envelope.Payload)
	if err != nil {
192
		logger.Warningf("Received an envelope from %s with no payload: %s", addr, err)
Will Lahti's avatar
Will Lahti committed
193
		return cb.Status_BAD_REQUEST, nil
194
	}
195

196
	if payload.Header == nil {
197
		logger.Warningf("Malformed envelope received from %s with bad header", addr)
Will Lahti's avatar
Will Lahti committed
198
		return cb.Status_BAD_REQUEST, nil
199
	}
200

201
202
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	if err != nil {
203
		logger.Warningf("Failed to unmarshal channel header from %s: %s", addr, err)
Will Lahti's avatar
Will Lahti committed
204
		return cb.Status_BAD_REQUEST, nil
205
	}
206

207
	err = h.validateChannelHeader(ctx, chdr)
208
209
	if err != nil {
		logger.Warningf("Rejecting deliver for %s due to envelope validation error: %s", addr, err)
Will Lahti's avatar
Will Lahti committed
210
		return cb.Status_BAD_REQUEST, nil
211
212
	}

213
214
	chain := h.ChainManager.GetChain(chdr.ChannelId)
	if chain == nil {
215
216
		// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
		// So we would expect our log to be somewhat flooded with these
217
		logger.Debugf("Rejecting deliver for %s because channel %s not found", addr, chdr.ChannelId)
Will Lahti's avatar
Will Lahti committed
218
219
220
221
222
223
		return cb.Status_NOT_FOUND, nil
	}

	labels := []string{
		"channel", chdr.ChannelId,
		"filtered", strconv.FormatBool(isFiltered(srv)),
224
	}
Will Lahti's avatar
Will Lahti committed
225
226
227
228
229
	h.Metrics.RequestsReceived.With(labels...).Add(1)
	defer func() {
		labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
		h.Metrics.RequestsCompleted.With(labels...).Add(1)
	}()
230

231
232
233
	erroredChan := chain.Errored()
	select {
	case <-erroredChan:
234
		logger.Warningf("[channel: %s] Rejecting deliver request for %s because of consenter error", chdr.ChannelId, addr)
Will Lahti's avatar
Will Lahti committed
235
		return cb.Status_SERVICE_UNAVAILABLE, nil
236
237
	default:
	}
238

239
	accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
240
241
	if err != nil {
		logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
Will Lahti's avatar
Will Lahti committed
242
		return cb.Status_BAD_REQUEST, nil
243
	}
jyellick's avatar
jyellick committed
244

245
	if err := accessControl.Evaluate(); err != nil {
246
		logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
Will Lahti's avatar
Will Lahti committed
247
		return cb.Status_FORBIDDEN, nil
248
	}
249

250
251
	seekInfo := &ab.SeekInfo{}
	if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
252
		logger.Warningf("[channel: %s] Received a signed deliver request from %s with malformed seekInfo payload: %s", chdr.ChannelId, addr, err)
Will Lahti's avatar
Will Lahti committed
253
		return cb.Status_BAD_REQUEST, nil
254
	}
255

256
	if seekInfo.Start == nil || seekInfo.Stop == nil {
257
		logger.Warningf("[channel: %s] Received seekInfo message from %s with missing start or stop %v, %v", chdr.ChannelId, addr, seekInfo.Start, seekInfo.Stop)
Will Lahti's avatar
Will Lahti committed
258
		return cb.Status_BAD_REQUEST, nil
259
	}
jyellick's avatar
jyellick committed
260

261
	logger.Debugf("[channel: %s] Received seekInfo (%p) %v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
262
263
264
265
266
267
268
269
270
271
272
273

	cursor, number := chain.Reader().Iterator(seekInfo.Start)
	defer cursor.Close()
	var stopNum uint64
	switch stop := seekInfo.Stop.Type.(type) {
	case *ab.SeekPosition_Oldest:
		stopNum = number
	case *ab.SeekPosition_Newest:
		stopNum = chain.Reader().Height() - 1
	case *ab.SeekPosition_Specified:
		stopNum = stop.Specified.Number
		if stopNum < number {
274
			logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
Will Lahti's avatar
Will Lahti committed
275
			return cb.Status_BAD_REQUEST, nil
276
277
		}
	}
278

279
	for {
280
281
		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
			if number > chain.Reader().Height()-1 {
Will Lahti's avatar
Will Lahti committed
282
				return cb.Status_NOT_FOUND, nil
283
284
			}
		}
jyellick's avatar
jyellick committed
285

286
287
288
289
290
291
292
293
294
295
296
297
		var block *cb.Block
		var status cb.Status

		iterCh := make(chan struct{})
		go func() {
			block, status = cursor.Next()
			close(iterCh)
		}()

		select {
		case <-ctx.Done():
			logger.Debugf("Context canceled, aborting wait for next block")
Will Lahti's avatar
Will Lahti committed
298
			return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
299
		case <-erroredChan:
300
301
302
			// TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
			// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
			logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
Will Lahti's avatar
Will Lahti committed
303
			return cb.Status_SERVICE_UNAVAILABLE, nil
304
305
306
307
		case <-iterCh:
			// Iterator has set the block and status vars
		}

308
309
		if status != cb.Status_SUCCESS {
			logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
Will Lahti's avatar
Will Lahti committed
310
			return status, nil
311
		}
312

313
314
315
		// increment block number to support FAIL_IF_NOT_READY deliver behavior
		number++

316
		if err := accessControl.Evaluate(); err != nil {
317
			logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
Will Lahti's avatar
Will Lahti committed
318
			return cb.Status_FORBIDDEN, nil
jyellick's avatar
jyellick committed
319
		}
320

321
		logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
322

323
		if err := srv.SendBlockResponse(block); err != nil {
324
			logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
Will Lahti's avatar
Will Lahti committed
325
			return cb.Status_INTERNAL_SERVER_ERROR, err
jyellick's avatar
jyellick committed
326
		}
327

Will Lahti's avatar
Will Lahti committed
328
329
		h.Metrics.BlocksSent.With(labels...).Add(1)

330
331
332
		if stopNum == block.Header.Number {
			break
		}
jyellick's avatar
jyellick committed
333
	}
334

335
	logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
336

Will Lahti's avatar
Will Lahti committed
337
	return cb.Status_SUCCESS, nil
jyellick's avatar
jyellick committed
338
339
}

340
func (h *Handler) validateChannelHeader(ctx context.Context, chdr *cb.ChannelHeader) error {
341
342
343
344
345
346
347
348
	if chdr.GetTimestamp() == nil {
		err := errors.New("channel header in envelope must contain timestamp")
		return err
	}

	envTime := time.Unix(chdr.GetTimestamp().Seconds, int64(chdr.GetTimestamp().Nanos)).UTC()
	serverTime := time.Now()

349
350
	if math.Abs(float64(serverTime.UnixNano()-envTime.UnixNano())) > float64(h.TimeWindow.Nanoseconds()) {
		err := errors.Errorf("envelope timestamp %s is more than %s apart from current server time %s", envTime, h.TimeWindow, serverTime)
351
352
353
		return err
	}

354
	err := h.BindingInspector.Inspect(ctx, chdr)
355
356
357
358
359
360
	if err != nil {
		return err
	}

	return nil
}