Commit a9e8cd71 authored by Will Lahti's avatar Will Lahti Committed by Gari Singh
Browse files

FAB-7382 Remove ReadyChan() from Deliver ledgers



This CR reworks the ledger interfaces and implementations
used by the Deliver service to remove the ReadyChan() function,
which has not been used since FAB-7273 was merged. The
chainSupport struct for the peer is also updated to only
include the ledger. The file ledger is now returned when
Reader() is called by the deliver service.

FAB-7382 #done

Change-Id: I6d08f30b9093ec3120cdeee749aa51afe54e2cdc
Signed-off-by: default avatarWill Lahti <wtlahti@us.ibm.com>
parent 91eee76c
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package blockledger_test
......@@ -144,12 +134,7 @@ func testRetrieval(lf ledgerTestFactory, t *testing.T) {
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the genesis block")
......@@ -157,12 +142,7 @@ func testRetrieval(lf ledgerTestFactory, t *testing.T) {
if block.Header.Number != 0 {
t.Fatalf("Expected to successfully retrieve the genesis block")
}
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
......@@ -183,18 +163,9 @@ func testBlockedRetrieval(lf ledgerTestFactory, t *testing.T) {
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
li.Append(blockledger.CreateNextBlock(li, []*cb.Envelope{{Payload: []byte("My Data")}}))
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package fileledger
......@@ -78,15 +68,6 @@ func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
return result.(*cb.Block), cb.Status_SUCCESS
}
// ReadyChan supplies a channel which will block until Next will not block
func (i *fileLedgerIterator) ReadyChan() <-chan struct{} {
signal := i.ledger.signal
if i.blockNumber > i.ledger.Height()-1 {
return signal
}
return closedChan
}
// Close releases resources acquired by the Iterator
func (i *fileLedgerIterator) Close() {
i.commonIterator.Close()
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package fileledger
......
......@@ -101,17 +101,6 @@ func (cu *cursor) Next() (*cb.Block, cb.Status) {
}
}
// ReadyChan supplies a channel which will block until Next will not block
func (cu *cursor) ReadyChan() <-chan struct{} {
cu.jl.mutex.Lock()
signal := cu.jl.signal
cu.jl.mutex.Unlock()
if _, err := os.Stat(cu.jl.blockFilename(cu.blockNumber)); os.IsNotExist(err) {
return signal
}
return closedChan
}
func (cu *cursor) Close() {}
// Iterator returns an Iterator, as specified by a ab.SeekInfo message, and its
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package jsonledger
......@@ -132,24 +122,10 @@ func TestRetrieval(t *testing.T) {
defer it.Close()
assert.Equal(t, uint64(0), num, "Expected genesis block iterator, but got %d", num)
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the genesis block")
assert.Equal(t, uint64(0), block.Header.Number, "Expected to successfully retrieve the genesis block")
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the second block")
assert.Equal(t, uint64(1), block.Header.Number, "Expected to successfully retrieve the second block but got block number %d", block.Header.Number)
......@@ -186,19 +162,7 @@ func TestBlockedRetrieval(t *testing.T) {
defer it.Close()
assert.Equal(t, uint64(1), num, "Expected block iterator at 1, but got %d", num)
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
fl.Append(blockledger.CreateNextBlock(fl, []*cb.Envelope{{Payload: []byte("My Data")}}))
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the second block")
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockledger
......@@ -36,8 +29,6 @@ type Iterator interface {
// Next blocks until there is a new block available, or returns an error if
// the next block is no longer retrievable
Next() (*cb.Block, cb.Status)
// ReadyChan supplies a channel which will block until Next will not block
ReadyChan() <-chan struct{}
// Close releases resources acquired by the Iterator
Close()
}
......
......@@ -57,11 +57,6 @@ func (cu *cursor) Next() (*cb.Block, cb.Status) {
}
}
// ReadyChan supplies a channel which will block until Next will not block
func (cu *cursor) ReadyChan() <-chan struct{} {
return cu.list.signal
}
// Close does nothing
func (cu *cursor) Close() {}
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package ramledger
......@@ -116,12 +106,7 @@ func TestRetrieval(t *testing.T) {
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the genesis block")
......@@ -129,12 +114,7 @@ func TestRetrieval(t *testing.T) {
if block.Header.Number != 0 {
t.Fatalf("Expected to successfully retrieve the genesis block")
}
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
......@@ -151,18 +131,9 @@ func TestBlockedRetrieval(t *testing.T) {
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
rl.Append(blockledger.CreateNextBlock(rl, []*cb.Envelope{{Payload: []byte("My Data")}}))
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockledger
......@@ -57,7 +50,6 @@ func CreateNextBlock(rl Reader, messages []*cb.Envelope) *cb.Block {
Newest: &ab.SeekNewest{},
},
})
<-it.ReadyChan() // Should never block, but just in case
block, status := it.Next()
if status != cb.Status_SUCCESS {
panic("Error seeking to newest block for chain with non-zero height")
......@@ -92,14 +84,9 @@ func GetBlock(rl Reader, index uint64) *cb.Block {
Specified: &ab.SeekSpecified{Number: index},
},
})
select {
case <-i.ReadyChan():
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
default:
block, status := i.Next()
if status != cb.Status_SUCCESS {
return nil
}
return block
}
......@@ -66,8 +66,7 @@ type chainSupport struct {
bundleSource *channelconfig.BundleSource
channelconfig.Resources
channelconfig.Application
ledger ledger.PeerLedger
fileLedger *fileledger.FileLedger
ledger ledger.PeerLedger
}
var TransientStoreFactory = &storeProvider{stores: make(map[string]transientstore.Store)}
......@@ -152,9 +151,16 @@ func (cs *chainSupport) Sequence() uint64 {
sb := cs.bundleSource.StableBundle()
return sb.ConfigtxValidator().Sequence()
}
// Reader returns an iterator to read from the ledger
func (cs *chainSupport) Reader() blockledger.Reader {
return cs.fileLedger
return fileledger.NewFileLedger(fileLedgerBlockStore{cs.ledger})
}
// Errored returns a channel that can be used to determine
// if a backing resource has errored. At this point in time,
// the peer does not have any error conditions that lead to
// this function signaling that an error has occurred.
func (cs *chainSupport) Errored() <-chan struct{} {
return nil
}
......@@ -338,7 +344,6 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccp
cs := &chainSupport{
Application: ac, // TODO, refactor as this is accessible through Manager
ledger: ledger,
fileLedger: fileledger.NewFileLedger(fileLedgerBlockStore{ledger}),
}
peerSingletonCallback := func(bundle *channelconfig.Bundle) {
......
......@@ -8,7 +8,6 @@ package multichannel
import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/crypto"
......@@ -157,15 +156,10 @@ func TestManagerImpl(t *testing.T) {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Could not retrieve block")
for i := 0; i < int(conf.Orderer.BatchSize.MaxMessageCount); i++ {
assert.True(t, proto.Equal(messages[i], utils.ExtractEnvelopeOrPanic(block, i)), "Block contents wrong at index %d", i)
}
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout")
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Could not retrieve block")
for i := 0; i < int(conf.Orderer.BatchSize.MaxMessageCount); i++ {
assert.True(t, proto.Equal(messages[i], utils.ExtractEnvelopeOrPanic(block, i)), "Block contents wrong at index %d", i)
}
}
......@@ -205,20 +199,15 @@ func TestNewChain(t *testing.T) {
func() {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block")
}
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the orderer transaction block")
}
assert.True(t, proto.Equal(wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0])), "Orderer config block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block")
}
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the orderer transaction block")
}
assert.True(t, proto.Equal(wrapped, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0])), "Orderer config block contains wrong transaction")
}()
chainSupport, ok = manager.GetChain(newChainID)
......@@ -238,36 +227,26 @@ func TestNewChain(t *testing.T) {
it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})
defer it.Close()
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve new chain genesis block")
}
testLastConfigBlockNumber(t, block, expectedLastConfigBlockNumber)
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the new genesis block")
}
assert.True(t, proto.Equal(ingressTx, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0])), "Genesis block contains wrong transaction")
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout in system chain")
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve new chain genesis block")
}
testLastConfigBlockNumber(t, block, expectedLastConfigBlockNumber)
if len(block.Data.Data) != 1 {
t.Fatalf("Should have had only one message in the new genesis block")
}
select {
case <-it.ReadyChan():
block, status := it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block on new chain")
}
testLastConfigBlockNumber(t, block, expectedLastConfigBlockNumber)
for i := 0; i < int(conf.Orderer.BatchSize.MaxMessageCount); i++ {
if !proto.Equal(utils.ExtractEnvelopeOrPanic(block, i), messages[i]) {
t.Errorf("Block contents wrong at index %d in new chain", i)
}
assert.True(t, proto.Equal(ingressTx, utils.UnmarshalEnvelopeOrPanic(block.Data.Data[0])), "Genesis block contains wrong transaction")
block, status = it.Next()
if status != cb.Status_SUCCESS {
t.Fatalf("Could not retrieve block on new chain")
}
testLastConfigBlockNumber(t, block, expectedLastConfigBlockNumber)
for i := 0; i < int(conf.Orderer.BatchSize.MaxMessageCount); i++ {
if !proto.Equal(utils.ExtractEnvelopeOrPanic(block, i), messages[i]) {
t.Errorf("Block contents wrong at index %d in new chain", i)
}
case <-time.After(time.Second):
t.Fatalf("Block 1 not produced after timeout on new chain")
}
rcs := newChainSupport(manager, chainSupport.ledgerResources, consenters, mockCrypto())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment