deliverclient.go 4.76 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
Copyright IBM Corp. 2017 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

                 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package channel

import (
	"fmt"
21
	"time"
22

23
	"github.com/hyperledger/fabric/common/localmsp"
24
25
	"github.com/hyperledger/fabric/common/util"
	pcommon "github.com/hyperledger/fabric/peer/common"
26
27
28
	"github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
	"github.com/hyperledger/fabric/protos/utils"
29
	"github.com/pkg/errors"
30
31
32
)

type deliverClientIntf interface {
33
34
35
	getSpecifiedBlock(num uint64) (*common.Block, error)
	getOldestBlock() (*common.Block, error)
	getNewestBlock() (*common.Block, error)
36
	Close() error
37
38
39
}

type deliverClient struct {
40
41
42
	client      ab.AtomicBroadcast_DeliverClient
	chainID     string
	tlsCertHash []byte
43
44
}

45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func newDeliverClient(chainID string) (*deliverClient, error) {
	var tlsCertHash []byte
	oc, err := pcommon.NewOrdererClientFromEnv()
	if err != nil {
		return nil, errors.WithMessage(err, "failed to create deliver client")
	}
	dc, err := oc.Deliver()
	if err != nil {
		return nil, errors.WithMessage(err, "failed to create deliver client")
	}
	// check for client certificate and create hash if present
	if len(oc.Certificate().Certificate) > 0 {
		tlsCertHash = util.ComputeSHA256(oc.Certificate().Certificate[0])
	}
	return &deliverClient{client: dc, chainID: chainID, tlsCertHash: tlsCertHash}, nil
60
61
}

62
63
64
65
66
67
func seekHelper(
	chainID string,
	position *ab.SeekPosition,
	tlsCertHash []byte,
) *common.Envelope {

68
	seekInfo := &ab.SeekInfo{
69
70
		Start:    position,
		Stop:     position,
71
		Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
72
	}
73

74
75
76
	env, err := utils.CreateSignedEnvelopeWithTLSBinding(
		common.HeaderType_CONFIG_UPDATE, chainID, localmsp.NewSigner(),
		seekInfo, int32(0), uint64(0), tlsCertHash)
77
	if err != nil {
78
		logger.Errorf("Error signing envelope:  %s", err)
79
80
81
		return nil
	}
	return env
82
83
}

84
func (r *deliverClient) seekSpecified(blockNumber uint64) error {
85
86
87
88
	return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{
		Type: &ab.SeekPosition_Specified{
			Specified: &ab.SeekSpecified{
				Number: blockNumber}}}, r.tlsCertHash))
89
90
}

91
func (r *deliverClient) seekOldest() error {
92
93
94
	return r.client.Send(seekHelper(r.chainID,
		&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{
			Oldest: &ab.SeekOldest{}}}, r.tlsCertHash))
95
96
97
}

func (r *deliverClient) seekNewest() error {
98
99
100
	return r.client.Send(seekHelper(r.chainID,
		&ab.SeekPosition{Type: &ab.SeekPosition_Newest{
			Newest: &ab.SeekNewest{}}}, r.tlsCertHash))
101
102
}

103
func (r *deliverClient) readBlock() (*common.Block, error) {
104
105
	msg, err := r.client.Recv()
	if err != nil {
106
		return nil, fmt.Errorf("Error receiving: %s", err)
107
	}
108

109
110
	switch t := msg.Type.(type) {
	case *ab.DeliverResponse_Status:
111
		logger.Infof("Got status: %v", t)
112
		return nil, fmt.Errorf("can't read the block: %v", t)
113
	case *ab.DeliverResponse_Block:
114
		logger.Infof("Received block: %v", t.Block.Header.Number)
115
		r.client.Recv() // Flush the success message
116
117
		return t.Block, nil
	default:
118
		return nil, fmt.Errorf("response error: unknown type %T", t)
119
120
121
	}
}

122
123
124
func (r *deliverClient) getSpecifiedBlock(num uint64) (*common.Block, error) {
	err := r.seekSpecified(num)
	if err != nil {
125
		logger.Errorf("Received error: %s", err)
126
127
128
129
130
131
132
133
		return nil, err
	}

	return r.readBlock()
}

func (r *deliverClient) getOldestBlock() (*common.Block, error) {
	err := r.seekOldest()
134
	if err != nil {
135
		return nil, fmt.Errorf("Received error: %s ", err)
136
137
	}

138
139
140
141
142
	return r.readBlock()
}

func (r *deliverClient) getNewestBlock() (*common.Block, error) {
	err := r.seekNewest()
143
	if err != nil {
144
		logger.Errorf("Received error: %s", err)
145
146
147
		return nil, err
	}

148
	return r.readBlock()
149
}
150
151

func (r *deliverClient) Close() error {
152
	return r.client.CloseSend()
153
154
155
156
157
158
159
160
161
162
163
164
}

func getGenesisBlock(cf *ChannelCmdFactory) (*common.Block, error) {
	timer := time.NewTimer(time.Second * time.Duration(timeout))
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			cf.DeliverClient.Close()
			return nil, fmt.Errorf("timeout waiting for channel creation")
		default:
165
			if block, err := cf.DeliverClient.getSpecifiedBlock(0); err != nil {
166
				cf.DeliverClient.Close()
167
				cf, err = InitCmdFactory(EndorserNotRequired, OrdererRequired)
168
169
170
171
172
				if err != nil {
					return nil, fmt.Errorf("failed connecting: %v", err)
				}
				time.Sleep(200 * time.Millisecond)
			} else {
173
				cf.DeliverClient.Close()
174
175
176
177
178
				return block, nil
			}
		}
	}
}