impl.go 4.82 KB
Newer Older
1
/*
2
Copyright IBM Corp. All Rights Reserved.
3

4
SPDX-License-Identifier: Apache-2.0
5
6
7
8
9
10
11
12
13
*/

package jsonledger

import (
	"bytes"
	"fmt"
	"os"
	"path/filepath"
14
	"sync"
15

16
	"github.com/golang/protobuf/jsonpb"
17
	"github.com/hyperledger/fabric/common/flogging"
18
	"github.com/hyperledger/fabric/common/ledger/blockledger"
19
20
21
	cb "github.com/hyperledger/fabric/protos/common"
	ab "github.com/hyperledger/fabric/protos/orderer"
	"github.com/op/go-logging"
22
	"github.com/pkg/errors"
23
24
)

25
26
27
const pkgLogID = "orderer/ledger/jsonledger"

var logger *logging.Logger
28
var closedChan chan struct{}
29
var fileLock sync.Mutex
30
31

func init() {
32
	logger = flogging.MustGetLogger(pkgLogID)
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
	closedChan = make(chan struct{})
	close(closedChan)
}

const (
	blockFileFormatString      = "block_%020d.json"
	chainDirectoryFormatString = "chain_%s"
)

type cursor struct {
	jl          *jsonLedger
	blockNumber uint64
}

type jsonLedger struct {
48
49
50
51
	directory string
	height    uint64
	lastHash  []byte
	marshaler *jsonpb.Marshaler
52
53
54

	mutex  sync.Mutex
	signal chan struct{}
55
56
57
58
}

// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem
func (jl *jsonLedger) readBlock(number uint64) (*cb.Block, bool) {
59
60
61
62
63
64
65
66
	name := jl.blockFilename(number)

	// In case of ongoing write, reading the block file may result in `unexpected EOF` error.
	// Therefore, we use file mutex here to prevent this race condition.
	fileLock.Lock()
	defer fileLock.Unlock()

	file, err := os.Open(name)
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
	if err == nil {
		defer file.Close()
		block := &cb.Block{}
		err = jsonpb.Unmarshal(file, block)
		if err != nil {
			return nil, true
		}
		logger.Debugf("Read block %d", block.Header.Number)
		return block, true
	}
	return nil, false
}

// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (cu *cursor) Next() (*cb.Block, cb.Status) {
	// This only loops once, as signal reading
	// indicates the new block has been written
	for {
		block, found := cu.jl.readBlock(cu.blockNumber)
		if found {
			if block == nil {
				return nil, cb.Status_SERVICE_UNAVAILABLE
			}
			cu.blockNumber++
			return block, cb.Status_SUCCESS
		}
94
95
96
97
98
99
100

		// copy the signal channel under lock to avoid race
		// with new signal channel in append
		cu.jl.mutex.Lock()
		signal := cu.jl.signal
		cu.jl.mutex.Unlock()
		<-signal
101
102
103
104
105
	}
}

// ReadyChan supplies a channel which will block until Next will not block
func (cu *cursor) ReadyChan() <-chan struct{} {
106
	cu.jl.mutex.Lock()
107
	signal := cu.jl.signal
108
	cu.jl.mutex.Unlock()
109
110
111
112
113
114
	if _, err := os.Stat(cu.jl.blockFilename(cu.blockNumber)); os.IsNotExist(err) {
		return signal
	}
	return closedChan
}

115
116
func (cu *cursor) Close() {}

117
// Iterator returns an Iterator, as specified by a ab.SeekInfo message, and its
118
// starting block number
119
func (jl *jsonLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
120
121
122
123
124
125
126
127
	switch start := startPosition.Type.(type) {
	case *ab.SeekPosition_Oldest:
		return &cursor{jl: jl, blockNumber: 0}, 0
	case *ab.SeekPosition_Newest:
		high := jl.height - 1
		return &cursor{jl: jl, blockNumber: high}, high
	case *ab.SeekPosition_Specified:
		if start.Specified.Number > jl.height {
128
			return &blockledger.NotFoundErrorIterator{}, 0
129
130
		}
		return &cursor{jl: jl, blockNumber: start.Specified.Number}, start.Specified.Number
131
	default:
132
		return &blockledger.NotFoundErrorIterator{}, 0
133
134
135
136
137
138
139
140
141
142
143
	}
}

// Height returns the number of blocks on the ledger
func (jl *jsonLedger) Height() uint64 {
	return jl.height
}

// Append appends a new block to the ledger
func (jl *jsonLedger) Append(block *cb.Block) error {
	if block.Header.Number != jl.height {
144
		return errors.Errorf("block number should have been %d but was %d", jl.height, block.Header.Number)
145
146
147
	}

	if !bytes.Equal(block.Header.PreviousHash, jl.lastHash) {
148
		return errors.Errorf("block should have had previous hash of %x but was %x", jl.lastHash, block.Header.PreviousHash)
149
150
151
152
153
	}

	jl.writeBlock(block)
	jl.lastHash = block.Header.Hash()
	jl.height++
154
155
156

	// Manage the signal channel under lock to avoid race with read in Next
	jl.mutex.Lock()
157
158
	close(jl.signal)
	jl.signal = make(chan struct{})
159
	jl.mutex.Unlock()
160
161
162
163
164
	return nil
}

// writeBlock commits a block to disk
func (jl *jsonLedger) writeBlock(block *cb.Block) {
165
166
167
168
169
170
	name := jl.blockFilename(block.Header.Number)

	fileLock.Lock()
	defer fileLock.Unlock()

	file, err := os.Create(name)
171
172
173
174
	if err != nil {
		panic(err)
	}
	defer file.Close()
175

176
177
178
	err = jl.marshaler.Marshal(file, block)
	logger.Debugf("Wrote block %d", block.Header.Number)
	if err != nil {
179
		logger.Panicf("Error marshalling with block number [%d]: %s", block.Header.Number, err)
180
181
182
183
184
185
186
187
	}
}

// blockFilename returns the fully qualified path to where a block
// of a given number should be stored on disk
func (jl *jsonLedger) blockFilename(number uint64) string {
	return filepath.Join(jl.directory, fmt.Sprintf(blockFileFormatString, number))
}