/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package ramledger

import (
10
	"bytes"
11
	"sync"
12

13
	"github.com/hyperledger/fabric/common/flogging"
14
	"github.com/hyperledger/fabric/common/ledger/blockledger"
15
16
	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
17
	"github.com/op/go-logging"
18
	"github.com/pkg/errors"
19
20
)

21
22
23
24
25
26
27
// pkgLogID is the name under which this package's logger is registered.
const pkgLogID = "orderer/ledger/ramledger"

// logger is the package-wide logger, obtained once during package
// initialization.
var logger *logging.Logger = flogging.MustGetLogger(pkgLogID)
28
29
30
31
32
33
34
35

// cursor walks the ledger's singly linked list of blocks. Each call to Next
// moves list forward to list.next and returns the block stored there.
type cursor struct {
	list *simpleList // node the cursor currently rests on
}

type simpleList struct {
	next   *simpleList
	signal chan struct{}
36
	block  *cb.Block
37
38
39
}

type ramLedger struct {
40
	lock    sync.RWMutex
41
42
43
44
45
46
	maxSize int
	size    int
	oldest  *simpleList
	newest  *simpleList
}

47
48
49
50
51
52
53
54
55
56
// Next blocks until the node the cursor rests on has a successor, advances the
// cursor to that successor, and returns its block together with
// cb.Status_SUCCESS. No other status is ever returned by this implementation.
func (cu *cursor) Next() (*cb.Block, cb.Status) {
	// Wait on the signal BEFORE looking at next. The appender sets next and
	// only then closes the signal, so a receive that completes because the
	// channel is closed guarantees the write to next is visible here. Reading
	// next first, without the ledger lock, would race with that write.
	<-cu.list.signal

	cu.list = cu.list.next
	return cu.list.block, cb.Status_SUCCESS
}

60
61
62
// ReadyChan hands back the signal channel of the node the cursor currently
// rests on; receiving from it blocks until Next will not block.
func (cu *cursor) ReadyChan() <-chan struct{} {
	current := cu.list
	return current.signal
}

65
66
67
// Close is a no-op for the in-memory cursor.
func (cu *cursor) Close() {
}

68
// Iterator returns an Iterator, as specified by a ab.SeekInfo message, and its
69
// starting block number
70
func (rl *ramLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
71
72
73
	rl.lock.RLock()
	defer rl.lock.RUnlock()

74
	var list *simpleList
75
76
	switch start := startPosition.Type.(type) {
	case *ab.SeekPosition_Oldest:
77
78
		oldest := rl.oldest
		list = &simpleList{
79
			block:  &cb.Block{Header: &cb.BlockHeader{Number: oldest.block.Header.Number - 1}},
80
81
82
83
			next:   oldest,
			signal: make(chan struct{}),
		}
		close(list.signal)
84
	case *ab.SeekPosition_Newest:
85
86
		newest := rl.newest
		list = &simpleList{
87
			block:  &cb.Block{Header: &cb.BlockHeader{Number: newest.block.Header.Number - 1}},
88
89
90
91
			next:   newest,
			signal: make(chan struct{}),
		}
		close(list.signal)
92
	case *ab.SeekPosition_Specified:
93
		oldest := rl.oldest
94
95
96
		specified := start.Specified.Number
		logger.Debugf("Attempting to return block %d", specified)

97
		// Note the two +1's here is to accommodate the 'preGenesis' block of ^uint64(0)
98
		if specified+1 < oldest.block.Header.Number+1 || specified > rl.newest.block.Header.Number+1 {
99
100
			logger.Debugf("Returning error iterator because specified seek was %d with oldest %d and newest %d",
				specified, rl.oldest.block.Header.Number, rl.newest.block.Header.Number)
101
			return &blockledger.NotFoundErrorIterator{}, 0
102
103
		}

104
		if specified == oldest.block.Header.Number {
105
			list = &simpleList{
106
				block:  &cb.Block{Header: &cb.BlockHeader{Number: oldest.block.Header.Number - 1}},
107
108
109
110
111
112
113
114
				next:   oldest,
				signal: make(chan struct{}),
			}
			close(list.signal)
			break
		}

		list = oldest
115
		for {
116
			if list.block.Header.Number == specified-1 {
117
118
119
120
121
				break
			}
			list = list.next // No need for nil check, because of range check above
		}
	}
122
123
124
125
126
127
128
129
130
131
	cursor := &cursor{list: list}
	blockNum := list.block.Header.Number + 1

	// If the cursor is for pre-genesis, skip it, the block number wraps
	if blockNum == ^uint64(0) {
		cursor.Next()
		blockNum++
	}

	return cursor, blockNum
132
133
}

134
135
// Height reports how many blocks the ledger contains, computed as the newest
// block's number plus one. The addition is done in uint64, so a newest block
// numbered ^uint64(0) (the 'preGenesis' value mentioned in Iterator) wraps to
// a height of 0.
func (rl *ramLedger) Height() uint64 {
	rl.lock.RLock()
	defer rl.lock.RUnlock()

	tip := rl.newest.block.Header.Number
	return tip + 1
}

141
142
// Append appends a new block to the ledger
func (rl *ramLedger) Append(block *cb.Block) error {
143
144
145
	rl.lock.Lock()
	defer rl.lock.Unlock()

146
	if block.Header.Number != rl.newest.block.Header.Number+1 {
147
		return errors.Errorf("block number should have been %d but was %d",
148
			rl.newest.block.Header.Number+1, block.Header.Number)
149
150
	}

151
152
	if rl.newest.block.Header.Number+1 != 0 { // Skip this check for genesis block insertion
		if !bytes.Equal(block.Header.PreviousHash, rl.newest.block.Header.Hash()) {
153
			return errors.Errorf("block should have had previous hash of %x but was %x",
154
				rl.newest.block.Header.Hash(), block.Header.PreviousHash)
155
		}
156
157
	}

158
	rl.appendBlock(block)
159
	return nil
160
161
}

162
// appendBlock links block in as the new tail without validating it, wakes any
// reader waiting on the previous tail, and drops the oldest node if the ledger
// has grown past maxSize. Append calls this with rl.lock held for writing.
func (rl *ramLedger) appendBlock(block *cb.Block) {
	prev := rl.newest
	node := &simpleList{
		block:  block,
		signal: make(chan struct{}),
	}

	// The successor must be linked before the signal is closed, so a woken
	// reader always finds a non-nil next.
	prev.next = node
	logger.Debugf("Sending signal that block %d has a successor", prev.block.Header.Number)
	rl.newest = node
	close(prev.signal)

	rl.size++
	if rl.size <= rl.maxSize {
		return
	}

	logger.Debugf("RAM ledger max size about to be exceeded, removing oldest item: %d",
		rl.oldest.block.Header.Number)
	rl.oldest = rl.oldest.next
	rl.size--
}