Commit e87c8157 authored by Luis Sanchez's avatar Luis Sanchez
Browse files

[FAB-2669] use fs ledger's blockstore iterator



- Updated the orderer's file ledger to use it's underlying
  blockstore ledger impl's iterator APIs.
- Blockstore iterators must be explicitly closed to avoid
  leaking resources, so now orderer/ledger.Iterator must
  also be explicitly closed. Added Close() to the
  orderer/ledger.Iterator interface.

Change-Id: Id838a661a11bf5b64a0cbb57d75a27d69d251269
Signed-off-by: default avatarLuis Sanchez <sanchezl@us.ibm.com>
parent f56a82e3
......@@ -84,129 +84,142 @@ func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
return err
}
payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
if err := ds.deliverBlocks(srv, envelope); err != nil {
return err
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
logger.Debugf("Waiting for new SeekInfo")
}
}
chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, envelope *cb.Envelope) error {
erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:
payload, err := utils.UnmarshalPayload(envelope.Payload)
if err != nil {
logger.Warningf("Received an envelope with no payload: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
if payload.Header == nil {
logger.Warningf("Malformed envelope received with bad header")
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
lastConfigSequence := chain.Sequence()
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Failed to unmarshal channel header: %s", err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
chain, ok := ds.sm.GetChain(chdr.ChannelId)
if !ok {
// Note, we log this at DEBUG because SDKs will poll waiting for channels to be created
// So we would expect our log to be somewhat flooded with these
logger.Debugf("Rejecting deliver because channel %s not found", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
erroredChan := chain.Errored()
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Rejecting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:
if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)
cursor, number := chain.Reader().Iterator(seekInfo.Start)
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
lastConfigSequence := chain.Sequence()
for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Received unauthorized deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}
seekInfo := &ab.SeekInfo{}
if err = proto.Unmarshal(payload.Data, seekInfo); err != nil {
logger.Warningf("[channel: %s] Received a signed deliver request with malformed seekInfo payload: %s", chdr.ChannelId, err)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
if seekInfo.Start == nil || seekInfo.Stop == nil {
logger.Warningf("[channel: %s] Received seekInfo message with missing start or stop %v, %v", chdr.ChannelId, seekInfo.Start, seekInfo.Stop)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)
logger.Debugf("[channel: %s] Received seekInfo (%p) %v", chdr.ChannelId, seekInfo, seekInfo)
cursor, number := chain.Reader().Iterator(seekInfo.Start)
defer cursor.Close()
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message: start number %d greater than stop number %d", chdr.ChannelId, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
if err := sendBlockReply(srv, block); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver request because of consenter error", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
if stopNum == block.Header.Number {
break
currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
if result != filter.Forward {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request", chdr.ChannelId)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
}
if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
logger.Debugf("[channel: %s] Delivering block for (%p)", chdr.ChannelId, seekInfo)
if err := sendBlockReply(srv, block); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
}
logger.Debugf("[channel: %s] Done delivering for (%p), waiting for new SeekInfo", chdr.ChannelId, seekInfo)
if stopNum == block.Header.Number {
break
}
}
if err := sendStatusReply(srv, cb.Status_SUCCESS); err != nil {
logger.Warningf("[channel: %s] Error sending to stream: %s", chdr.ChannelId, err)
return err
}
logger.Debugf("[channel: %s] Done delivering for (%p)", chdr.ChannelId, seekInfo)
return nil
}
func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error {
......
......@@ -139,6 +139,7 @@ func testRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
li.Append(CreateNextBlock(li, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
......@@ -177,6 +178,7 @@ func TestBlockedRetrieval(t *testing.T) {
func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
_, li := lf.New()
it, num := li.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
......
......@@ -17,6 +17,7 @@ limitations under the License.
package fileledger
import (
cl "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
ledger "github.com/hyperledger/fabric/orderer/ledger"
cb "github.com/hyperledger/fabric/protos/common"
......@@ -38,8 +39,9 @@ type fileLedger struct {
}
type fileLedgerIterator struct {
ledger *fileLedger
blockNumber uint64
ledger *fileLedger
blockNumber uint64
commonIterator cl.ResultsIterator
}
// Next blocks until there is a new block available, or returns an error if the
......@@ -47,12 +49,12 @@ type fileLedgerIterator struct {
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
for {
if i.blockNumber < i.ledger.Height() {
block, err := i.ledger.blockStore.RetrieveBlockByNumber(i.blockNumber)
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
i.blockNumber++
return block, cb.Status_SUCCESS
return result.(*cb.Block), cb.Status_SUCCESS
}
<-i.ledger.signal
}
......@@ -67,28 +69,41 @@ func (i *fileLedgerIterator) ReadyChan() <-chan struct{} {
return closedChan
}
// Close releases resources acquired by the Iterator
func (i *fileLedgerIterator) Close() {
i.commonIterator.Close()
}
// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its
// starting block number
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) {
case *ab.SeekPosition_Oldest:
return &fileLedgerIterator{ledger: fl, blockNumber: 0}, 0
startingBlockNumber = 0
case *ab.SeekPosition_Newest:
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
logger.Panic(err)
}
newestBlockNumber := info.Height - 1
return &fileLedgerIterator{ledger: fl, blockNumber: newestBlockNumber}, newestBlockNumber
startingBlockNumber = newestBlockNumber
case *ab.SeekPosition_Specified:
startingBlockNumber = start.Specified.Number
height := fl.Height()
if start.Specified.Number > height {
if startingBlockNumber > height {
return &ledger.NotFoundErrorIterator{}, 0
}
return &fileLedgerIterator{ledger: fl, blockNumber: start.Specified.Number}, start.Specified.Number
default:
return &ledger.NotFoundErrorIterator{}, 0
}
iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
if err != nil {
return &ledger.NotFoundErrorIterator{}, 0
}
return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}
// Height returns the number of blocks on the ledger
......
......@@ -17,6 +17,7 @@ limitations under the License.
package fileledger
import (
"errors"
"fmt"
"io/ioutil"
"os"
......@@ -30,6 +31,7 @@ import (
"github.com/hyperledger/fabric/protos/peer"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
var genesisBlock = cb.NewBlock(0, nil)
......@@ -118,6 +120,19 @@ func (mbs *mockBlockStore) RetrieveTxValidationCodeByTxID(txID string) (peer.TxV
func (*mockBlockStore) Shutdown() {
}
type mockBlockStoreIterator struct {
mock.Mock
}
func (m *mockBlockStoreIterator) Next() (cl.QueryResult, error) {
args := m.Called()
return args.Get(0), args.Error(1)
}
func (m *mockBlockStoreIterator) Close() {
m.Called()
}
func TestInitialization(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
......@@ -188,6 +203,7 @@ func TestRetrieval(t *testing.T) {
defer tev.tearDown()
fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
assert.Zero(t, num, "Expected genesis block iterator, but got %d", num)
signal := it.ReadyChan()
......@@ -221,6 +237,7 @@ func TestBlockedRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
......@@ -296,13 +313,29 @@ func TestBlockstoreError(t *testing.T) {
signal: make(chan struct{}),
}
it, _ := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 42}}})
defer it.Close()
assert.IsType(
t,
&ledger.NotFoundErrorIterator{},
it,
"Expected Not Found Error if seek number is greater than ledger height")
}
it, _ = fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
{
resultsIterator := &mockBlockStoreIterator{}
resultsIterator.On("Next").Return(nil, errors.New("a mocked error"))
resultsIterator.On("Close").Return()
fl := &fileLedger{
blockStore: &mockBlockStore{
blockchainInfo: &cb.BlockchainInfo{Height: uint64(1)},
getBlockchainInfoError: nil,
retrieveBlockByNumberError: fmt.Errorf("Error retrieving block by number"),
resultsIterator: resultsIterator,
},
signal: make(chan struct{}),
}
it, _ := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
_, status := it.Next()
assert.Equal(t, cb.Status_SERVICE_UNAVAILABLE, status, "Expected service unavailable error")
}
......
......@@ -108,6 +108,8 @@ func (cu *cursor) ReadyChan() <-chan struct{} {
return closedChan
}
func (cu *cursor) Close() {}
// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its
// starting block number
func (jl *jsonLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) {
......
......@@ -129,6 +129,7 @@ func TestRetrieval(t *testing.T) {
defer tev.tearDown()
fl.Append(ledger.CreateNextBlock(fl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
assert.Equal(t, uint64(0), num, "Expected genesis block iterator, but got %d", num)
signal := it.ReadyChan()
......@@ -162,6 +163,7 @@ func TestRaceCondition(t *testing.T) {
defer tev.tearDown()
it, _ := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
var block *cb.Block
var status cb.Status
......@@ -182,6 +184,7 @@ func TestBlockedRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
assert.Equal(t, uint64(1), num, "Expected block iterator at 1, but got %d", num)
signal := it.ReadyChan()
......@@ -222,6 +225,7 @@ func TestInvalidRetrieval(t *testing.T) {
defer tev.tearDown()
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 2}}})
defer it.Close()
assert.Equal(t, uint64(0), num, "Expected block number to be zero for invalid iterator")
_, status := it.Next()
......@@ -242,6 +246,7 @@ func TestBrokenBlockFile(t *testing.T) {
assert.NoError(t, file.Close(), "Expected to successfully close block file")
it, num := fl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
assert.Equal(t, uint64(0), num, "Expected genesis block iterator, but got %d", num)
_, status := it.Next()
......
......@@ -41,6 +41,8 @@ type Iterator interface {
Next() (*cb.Block, cb.Status)
// ReadyChan supplies a channel which will block until Next will not block
ReadyChan() <-chan struct{}
// Close releases resources acquired by the Iterator
Close()
}
// Reader allows the caller to inspect the ledger
......
......@@ -63,6 +63,9 @@ func (cu *cursor) ReadyChan() <-chan struct{} {
return cu.list.signal
}
// Close does nothing
func (cu *cursor) Close() {}
// Iterator returns an Iterator, as specified by a cb.SeekInfo message, and its
// starting block number
func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (ledger.Iterator, uint64) {
......
......@@ -113,6 +113,7 @@ func TestRetrieval(t *testing.T) {
rl := newTestChain(3)
rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
it, num := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{}})
defer it.Close()
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
......@@ -147,6 +148,7 @@ func TestRetrieval(t *testing.T) {
func TestBlockedRetrieval(t *testing.T) {
rl := newTestChain(3)
it, num := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
......@@ -174,6 +176,7 @@ func TestBlockedRetrieval(t *testing.T) {
func TestIteratorPastEnd(t *testing.T) {
rl := newTestChain(3)
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 2}}})
defer it.Close()
if _, status := it.Next(); status != cb.Status_NOT_FOUND {
t.Fatalf("Expected block with status NOT_FOUND, but got %d", status)
}
......@@ -185,7 +188,8 @@ func TestIteratorOldest(t *testing.T) {
rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
rl.Append(ledger.CreateNextBlock(rl, []*cb.Envelope{&cb.Envelope{Payload: []byte("My Data")}}))
_, num := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
it, num := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
......
......@@ -43,6 +43,9 @@ func (nfei *NotFoundErrorIterator) ReadyChan() <-chan struct{} {
return closedChan
}
// Close does nothing
func (nfei *NotFoundErrorIterator) Close() {}
// CreateNextBlock provides a utility way to construct the next block from
// contents and metadata for a given ledger
// XXX This will need to be modified to accept marshaled envelopes
......
......@@ -223,6 +223,7 @@ func TestFilterCreation(t *testing.T) {
}
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
......@@ -260,6 +261,7 @@ func TestManagerImpl(t *testing.T) {
}
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
......@@ -567,22 +569,24 @@ func TestNewChain(t *testing.T) {
assert.True(t, ok, "Could not find system channel")
chainSupport.Enqueue(wrapped)
func() {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block")
}
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the orderer transaction block")
}
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block")
assert.Equal(t, wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Orderer config block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
}
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the orderer transaction block")
}
assert.Equal(t, wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0]), "Orderer config block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
}
}()
chainSupport, ok = manager.GetChain(newChainID)
......@@ -599,7 +603,8 @@ func TestNewChain(t *testing.T) {
chainSupport.Enqueue(message)
}
it, _ = chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})
it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})