Commit 53e49bc7 authored by Srinivasan Muralidharan's avatar Srinivasan Muralidharan
Browse files

[FAB-6848] add channel ID to chaincode message

Chaincodes need the channel ID in the transaction context. This
CR follows the work in https://gerrit.hyperledger.org/r/#/c/14919

.

Change-Id: I0cc33e0b8c3279e069771b2925fc1f33ab912cd4
Signed-off-by: default avatarSrinivasan Muralidharan <srinivasan.muralidharan99@gmail.com>
parent 92ca115f
......@@ -77,6 +77,13 @@ public interface ChaincodeStub {
*/
String getTxId();
/**
* Returns the channel id
*
* @return the channel id
*/
String getChannelId();
/**
* Invoke another chaincode using the same transaction context.
*
......
......@@ -39,12 +39,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
class ChaincodeStubImpl implements ChaincodeStub {
private final String channelId;
private final String txId;
private final Handler handler;
private final List<ByteString> args;
private ChaincodeEvent event;
ChaincodeStubImpl(String txId, Handler handler, List<ByteString> args) {
ChaincodeStubImpl(String channelId, String txId, Handler handler, List<ByteString> args) {
this.channelId = channelId;
this.txId = txId;
this.handler = handler;
this.args = Collections.unmodifiableList(args);
......@@ -90,6 +92,11 @@ class ChaincodeStubImpl implements ChaincodeStub {
return event;
}
@Override
public String getChannelId() {
return channelId;
}
@Override
public String getTxId() {
return txId;
......@@ -97,23 +104,23 @@ class ChaincodeStubImpl implements ChaincodeStub {
@Override
public byte[] getState(String key) {
return handler.getState(txId, key).toByteArray();
return handler.getState(channelId, txId, key).toByteArray();
}
@Override
public void putState(String key, byte[] value) {
handler.putState(txId, key, ByteString.copyFrom(value));
handler.putState(channelId, txId, key, ByteString.copyFrom(value));
}
@Override
public void delState(String key) {
handler.deleteState(txId, key);
handler.deleteState(channelId, txId, key);
}
@Override
public QueryResultsIterator<KeyValue> getStateByRange(String startKey, String endKey) {
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getTxId(),
handler.getStateByRange(getTxId(), startKey, endKey),
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getChannelId(), getTxId(),
handler.getStateByRange(getChannelId(), getTxId(), startKey, endKey),
queryResultBytesToKv.andThen(KeyValueImpl::new)
);
}
......@@ -145,16 +152,16 @@ class ChaincodeStubImpl implements ChaincodeStub {
@Override
public QueryResultsIterator<KeyValue> getQueryResult(String query) {
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getTxId(),
handler.getQueryResult(getTxId(), query),
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getChannelId(), getTxId(),
handler.getQueryResult(getChannelId(), getTxId(), query),
queryResultBytesToKv.andThen(KeyValueImpl::new)
);
}
@Override
public QueryResultsIterator<KeyModification> getHistoryForKey(String key) {
return new QueryResultsIteratorImpl<KeyModification>(this.handler, getTxId(),
handler.getHistoryForKey(getTxId(), key),
return new QueryResultsIteratorImpl<KeyModification>(this.handler, getChannelId(), getTxId(),
handler.getHistoryForKey(getChannelId(), getTxId(), key),
queryResultBytesToKeyModification.andThen(KeyModificationImpl::new)
);
}
......@@ -178,7 +185,7 @@ class ChaincodeStubImpl implements ChaincodeStub {
} else {
compositeName = chaincodeName;
}
return handler.invokeChaincode(this.txId, compositeName, args);
return handler.invokeChaincode(this.channelId, this.txId, compositeName, args);
}
}
......@@ -131,9 +131,10 @@ public class Handler {
}
}
private synchronized Channel<ChaincodeMessage> aquireResponseChannelForTx(final String txId) {
private synchronized Channel<ChaincodeMessage> aquireResponseChannelForTx(final String channelId, final String txId) {
final Channel<ChaincodeMessage> channel = new Channel<>();
if (this.responseChannel.putIfAbsent(txId, channel) != null) {
String key = channelId+txId;
if (this.responseChannel.putIfAbsent(key, channel) != null) {
throw new IllegalStateException(format("[%-8s]Response channel already exists. Another request must be pending.", txId));
}
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel created.", txId));
......@@ -141,12 +142,13 @@ public class Handler {
}
private synchronized void sendChannel(ChaincodeMessage message) {
if (!responseChannel.containsKey(message.getTxid())) {
String key = message.getChannelId()+message.getTxid();
if (!responseChannel.containsKey(key)) {
throw new IllegalStateException(format("[%-8s]sendChannel does not exist", message.getTxid()));
}
logger.debug(String.format("[%-8s]Before send", message.getTxid()));
responseChannel.get(message.getTxid()).add(message);
responseChannel.get(key).add(message);
logger.debug(String.format("[%-8s]After send", message.getTxid()));
}
......@@ -162,32 +164,37 @@ public class Handler {
}
}
private synchronized void releaseResponseChannelForTx(String txId) {
final Channel<ChaincodeMessage> channel = responseChannel.remove(txId);
private synchronized void releaseResponseChannelForTx(String channelId, String txId) {
String key = channelId+txId;
final Channel<ChaincodeMessage> channel = responseChannel.remove(key);
if (channel != null) channel.close();
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel closed.",txId));
}
/**
* Marks a UUID as either a transaction or a query
* Marks a CHANNELID+UUID as either a transaction or a query
*
* @param channelId
* channel ID to be marked
* @param uuid
* ID to be marked
* @param isTransaction
* true for transaction, false for query
* @return whether or not the UUID was successfully marked
*/
private synchronized boolean markIsTransaction(String uuid, boolean isTransaction) {
private synchronized boolean markIsTransaction(String channelId, String uuid, boolean isTransaction) {
if (this.isTransaction == null) {
return false;
}
this.isTransaction.put(uuid, isTransaction);
String key = channelId+uuid;
this.isTransaction.put(key, isTransaction);
return true;
}
private synchronized void deleteIsTransaction(String uuid) {
isTransaction.remove(uuid);
private synchronized void deleteIsTransaction(String channelId, String uuid) {
String key = channelId+uuid;
isTransaction.remove(key);
}
private void beforeRegistered(Event event) {
......@@ -209,11 +216,11 @@ public class Handler {
final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload());
// Mark as a transaction (allow put/del state)
markIsTransaction(message.getTxid(), true);
markIsTransaction(message.getChannelId(), message.getTxid(), true);
// Create the ChaincodeStub which the chaincode can use to
// callback
final ChaincodeStub stub = new ChaincodeStubImpl(message.getTxid(), this, input.getArgsList());
final ChaincodeStub stub = new ChaincodeStubImpl(message.getChannelId(), message.getTxid(), this, input.getArgsList());
// Call chaincode's init
final Chaincode.Response result = chaincode.init(stub);
......@@ -221,19 +228,19 @@ public class Handler {
if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) {
// Send ERROR with entire result.Message as payload
logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR));
triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true);
triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), result.getMessage(), stub.getEvent()), true);
} else {
// Send COMPLETED with entire result as payload
logger.debug(String.format(String.format("[%-8s]Init succeeded. Sending %s", message.getTxid(), COMPLETED)));
triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true);
triggerNextState(newCompletedEventMessage(message.getChannelId(), message.getTxid(), result, stub.getEvent()), true);
}
} catch (InvalidProtocolBufferException | RuntimeException e) {
logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR), e);
triggerNextState(newErrorEventMessage(message.getTxid(), e), true);
triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), e), true);
} finally {
// delete isTransaction entry
deleteIsTransaction(message.getTxid());
deleteIsTransaction(message.getChannelId(), message.getTxid());
}
}).start();
}
......@@ -259,11 +266,11 @@ public class Handler {
final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload());
// Mark as a transaction (allow put/del state)
markIsTransaction(message.getTxid(), true);
markIsTransaction(message.getChannelId(), message.getTxid(), true);
// Create the ChaincodeStub which the chaincode can use to
// callback
final ChaincodeStub stub = new ChaincodeStubImpl(message.getTxid(), this, input.getArgsList());
final ChaincodeStub stub = new ChaincodeStubImpl(message.getChannelId(), message.getTxid(), this, input.getArgsList());
// Call chaincode's invoke
final Chaincode.Response result = chaincode.invoke(stub);
......@@ -271,19 +278,19 @@ public class Handler {
if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) {
// Send ERROR with entire result.Message as payload
logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR));
triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true);
triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), result.getMessage(), stub.getEvent()), true);
} else {
// Send COMPLETED with entire result as payload
logger.debug(String.format(String.format("[%-8s]Invoke succeeded. Sending %s", message.getTxid(), COMPLETED)));
triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true);
triggerNextState(newCompletedEventMessage(message.getChannelId(), message.getTxid(), result, stub.getEvent()), true);
}
} catch (InvalidProtocolBufferException | RuntimeException e) {
logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR), e);
triggerNextState(newErrorEventMessage(message.getTxid(), e), true);
triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), e), true);
} finally {
// delete isTransaction entry
deleteIsTransaction(message.getTxid());
deleteIsTransaction(message.getChannelId(), message.getTxid());
}
}).start();
}
......@@ -344,59 +351,60 @@ public class Handler {
}
// handleGetState communicates with the validator to fetch the requested state information from the ledger.
ByteString getState(String txId, String key) {
return invokeChaincodeSupport(newGetStateEventMessage(txId, key));
ByteString getState(String channelId, String txId, String key) {
return invokeChaincodeSupport(newGetStateEventMessage(channelId, txId, key));
}
private boolean isTransaction(String uuid) {
return isTransaction.containsKey(uuid) && isTransaction.get(uuid);
private boolean isTransaction(String channelId, String uuid) {
String key = channelId+uuid;
return isTransaction.containsKey(key) && isTransaction.get(key);
}
void putState(String txId, String key, ByteString value) {
logger.debug(format("[%-8s]Inside putstate (\"%s\":\"%s\"), isTransaction = %s", txId, key, value, isTransaction(txId)));
if (!isTransaction(txId)) throw new IllegalStateException("Cannot put state in query context");
invokeChaincodeSupport(newPutStateEventMessage(txId, key, value));
void putState(String channelId, String txId, String key, ByteString value) {
logger.debug(format("[%-8s]Inside putstate (\"%s\":\"%s\"), isTransaction = %s", txId, key, value, isTransaction(channelId, txId)));
if (!isTransaction(channelId, txId)) throw new IllegalStateException("Cannot put state in query context");
invokeChaincodeSupport(newPutStateEventMessage(channelId, txId, key, value));
}
void deleteState(String txId, String key) {
if (!isTransaction(txId)) throw new RuntimeException("Cannot del state in query context");
invokeChaincodeSupport(newDeleteStateEventMessage(txId, key));
void deleteState(String channelId, String txId, String key) {
if (!isTransaction(channelId, txId)) throw new RuntimeException("Cannot del state in query context");
invokeChaincodeSupport(newDeleteStateEventMessage(channelId, txId, key));
}
QueryResponse getStateByRange(String txId, String startKey, String endKey) {
return invokeQueryResponseMessage(txId, GET_STATE_BY_RANGE, GetStateByRange.newBuilder()
QueryResponse getStateByRange(String channelId, String txId, String startKey, String endKey) {
return invokeQueryResponseMessage(channelId, txId, GET_STATE_BY_RANGE, GetStateByRange.newBuilder()
.setStartKey(startKey)
.setEndKey(endKey)
.build().toByteString());
}
QueryResponse queryStateNext(String txId, String queryId) {
return invokeQueryResponseMessage(txId, QUERY_STATE_NEXT, QueryStateNext.newBuilder()
QueryResponse queryStateNext(String channelId, String txId, String queryId) {
return invokeQueryResponseMessage(channelId, txId, QUERY_STATE_NEXT, QueryStateNext.newBuilder()
.setId(queryId)
.build().toByteString());
}
void queryStateClose(String txId, String queryId) {
invokeQueryResponseMessage(txId, QUERY_STATE_CLOSE, QueryStateClose.newBuilder()
void queryStateClose(String channelId, String txId, String queryId) {
invokeQueryResponseMessage(channelId, txId, QUERY_STATE_CLOSE, QueryStateClose.newBuilder()
.setId(queryId)
.build().toByteString());
}
QueryResponse getQueryResult(String txId, String query) {
return invokeQueryResponseMessage(txId, GET_QUERY_RESULT, GetQueryResult.newBuilder()
QueryResponse getQueryResult(String channelId, String txId, String query) {
return invokeQueryResponseMessage(channelId, txId, GET_QUERY_RESULT, GetQueryResult.newBuilder()
.setQuery(query)
.build().toByteString());
}
QueryResponse getHistoryForKey(String txId, String key) {
return invokeQueryResponseMessage(txId, Type.GET_HISTORY_FOR_KEY, GetQueryResult.newBuilder()
QueryResponse getHistoryForKey(String channelId, String txId, String key) {
return invokeQueryResponseMessage(channelId, txId, Type.GET_HISTORY_FOR_KEY, GetQueryResult.newBuilder()
.setQuery(key)
.build().toByteString());
}
private QueryResponse invokeQueryResponseMessage(String txId, ChaincodeMessage.Type type, ByteString payload) {
private QueryResponse invokeQueryResponseMessage(String channelId, String txId, ChaincodeMessage.Type type, ByteString payload) {
try {
return QueryResponse.parseFrom(invokeChaincodeSupport(newEventMessage(type, txId, payload)));
return QueryResponse.parseFrom(invokeChaincodeSupport(newEventMessage(type, channelId, txId, payload)));
} catch (InvalidProtocolBufferException e) {
logger.error(String.format("[%-8s]unmarshall error", txId));
throw new RuntimeException("Error unmarshalling QueryResponse.", e);
......@@ -404,11 +412,12 @@ public class Handler {
}
private ByteString invokeChaincodeSupport(final ChaincodeMessage message) {
final String channelId = message.getChannelId();
final String txId = message.getTxid();
try {
// create a new response channel
Channel<ChaincodeMessage> responseChannel = aquireResponseChannelForTx(txId);
Channel<ChaincodeMessage> responseChannel = aquireResponseChannelForTx(channelId, txId);
// send the message
serialSend(message);
......@@ -430,11 +439,11 @@ public class Handler {
throw new RuntimeException(format("[%-8s]Unexpected %s response received. Expected %s or %s.", txId, response.getType(), RESPONSE, ERROR));
}
} finally {
releaseResponseChannelForTx(txId);
releaseResponseChannelForTx(channelId, txId);
}
}
Chaincode.Response invokeChaincode(String txId, String chaincodeName, List<byte[]> args) {
Chaincode.Response invokeChaincode(String channelId, String txId, String chaincodeName, List<byte[]> args) {
try {
// create invocation specification of the chaincode to invoke
final ChaincodeSpec invocationSpec = ChaincodeSpec.newBuilder()
......@@ -447,7 +456,7 @@ public class Handler {
.build();
// invoke other chaincode
final ByteString payload = invokeChaincodeSupport(newInvokeChaincodeMessage(txId, invocationSpec.toByteString()));
final ByteString payload = invokeChaincodeSupport(newInvokeChaincodeMessage(channelId, txId, invocationSpec.toByteString()));
// response message payload should be yet another chaincode
// message (the actual response message)
......@@ -481,7 +490,7 @@ public class Handler {
if (fsm.eventCannotOccur(message.getType().toString())) {
String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", message.getTxid(), message.getType(), message.getPayload().size(), fsm.current());
serialSend(newErrorEventMessage(message.getTxid(), errStr));
serialSend(newErrorEventMessage(message.getChannelId(), message.getTxid(), errStr));
throw new RuntimeException(errStr);
}
......@@ -510,61 +519,63 @@ public class Handler {
return new Chaincode.Response(Chaincode.Response.Status.INTERNAL_SERVER_ERROR, message, null);
}
private static ChaincodeMessage newGetStateEventMessage(final String txId, final String key) {
return newEventMessage(GET_STATE, txId, GetState.newBuilder()
private static ChaincodeMessage newGetStateEventMessage(final String channelId, final String txId, final String key) {
return newEventMessage(GET_STATE, channelId, txId, GetState.newBuilder()
.setKey(key)
.build().toByteString());
}
private static ChaincodeMessage newPutStateEventMessage(final String txId, final String key, final ByteString value) {
return newEventMessage(PUT_STATE, txId, PutState.newBuilder()
private static ChaincodeMessage newPutStateEventMessage(final String channelId, final String txId, final String key, final ByteString value) {
return newEventMessage(PUT_STATE, channelId, txId, PutState.newBuilder()
.setKey(key)
.setValue(value)
.build().toByteString());
}
private static ChaincodeMessage newDeleteStateEventMessage(final String txId, final String key) {
return newEventMessage(DEL_STATE, txId, DelState.newBuilder()
private static ChaincodeMessage newDeleteStateEventMessage(final String channelId, final String txId, final String key) {
return newEventMessage(DEL_STATE, channelId, txId, DelState.newBuilder()
.setKey(key)
.build().toByteString());
}
private static ChaincodeMessage newErrorEventMessage(final String txId, final Throwable throwable) {
return newErrorEventMessage(txId, printStackTrace(throwable));
private static ChaincodeMessage newErrorEventMessage(final String channelId, final String txId, final Throwable throwable) {
return newErrorEventMessage(channelId, txId, printStackTrace(throwable));
}
private static ChaincodeMessage newErrorEventMessage(final String txId, final String message) {
return newErrorEventMessage(txId, message, null);
private static ChaincodeMessage newErrorEventMessage(final String channelId, final String txId, final String message) {
return newErrorEventMessage(channelId, txId, message, null);
}
private static ChaincodeMessage newErrorEventMessage(final String txId, final String message, final ChaincodeEvent event) {
return newEventMessage(ERROR, txId, ByteString.copyFromUtf8(message), event);
private static ChaincodeMessage newErrorEventMessage(final String channelId, final String txId, final String message, final ChaincodeEvent event) {
return newEventMessage(ERROR, channelId, txId, ByteString.copyFromUtf8(message), event);
}
private static ChaincodeMessage newCompletedEventMessage(final String txId, final Chaincode.Response response, final ChaincodeEvent event) {
return newEventMessage(COMPLETED, txId, toProtoResponse(response).toByteString(), event);
private static ChaincodeMessage newCompletedEventMessage(final String channelId, final String txId, final Chaincode.Response response, final ChaincodeEvent event) {
return newEventMessage(COMPLETED, channelId, txId, toProtoResponse(response).toByteString(), event);
}
private static ChaincodeMessage newInvokeChaincodeMessage(final String txId, final ByteString payload) {
return newEventMessage(INVOKE_CHAINCODE, txId, payload, null);
private static ChaincodeMessage newInvokeChaincodeMessage(final String channelId, final String txId, final ByteString payload) {
return newEventMessage(INVOKE_CHAINCODE, channelId, txId, payload, null);
}
private static ChaincodeMessage newEventMessage(final Type type, final String txId, final ByteString payload) {
return newEventMessage(type, txId, payload, null);
private static ChaincodeMessage newEventMessage(final Type type, final String channelId, final String txId, final ByteString payload) {
return newEventMessage(type, channelId, txId, payload, null);
}
private static ChaincodeMessage newEventMessage(final Type type, final String txId, final ByteString payload, final ChaincodeEvent event) {
private static ChaincodeMessage newEventMessage(final Type type, final String channelId, final String txId, final ByteString payload, final ChaincodeEvent event) {
if (event == null) {
return ChaincodeMessage.newBuilder()
.setType(type)
.setTxid(txId)
.setPayload(payload)
.setChannelId(channelId)
.build();
} else {
return ChaincodeMessage.newBuilder()
.setType(type)
.setTxid(txId)
.setPayload(payload)
.setChannelId(channelId)
.setChaincodeEvent(event)
.build();
}
......
......@@ -27,13 +27,15 @@ import org.hyperledger.fabric.shim.ledger.QueryResultsIterator;
class QueryResultsIteratorImpl<T> implements QueryResultsIterator<T> {
private final Handler handler;
private final String channelId;
private final String txId;
private Iterator<QueryResultBytes> currentIterator;
private QueryResponse currentQueryResponse;
private Function<QueryResultBytes, T> mapper;
public QueryResultsIteratorImpl(final Handler handler, final String txId, final QueryResponse queryResponse, Function<QueryResultBytes, T> mapper) {
public QueryResultsIteratorImpl(final Handler handler, final String channelId, final String txId, final QueryResponse queryResponse, Function<QueryResultBytes, T> mapper) {
this.handler = handler;
this.channelId = channelId;
this.txId = txId;
this.currentQueryResponse = queryResponse;
this.currentIterator = currentQueryResponse.getResultsList().iterator();
......@@ -59,7 +61,7 @@ class QueryResultsIteratorImpl<T> implements QueryResultsIterator<T> {
if(!currentQueryResponse.getHasMore()) throw new NoSuchElementException();
// get more results from peer
currentQueryResponse = handler.queryStateNext(txId, currentQueryResponse.getId());
currentQueryResponse = handler.queryStateNext(channelId, txId, currentQueryResponse.getId());
currentIterator = currentQueryResponse.getResultsList().iterator();
// return next fetched result
......@@ -72,7 +74,7 @@ class QueryResultsIteratorImpl<T> implements QueryResultsIterator<T> {
@Override
public void close() throws Exception {
this.handler.queryStateClose(txId, currentQueryResponse.getId());
this.handler.queryStateClose(channelId, txId, currentQueryResponse.getId());
this.currentIterator = Collections.emptyIterator();
this.currentQueryResponse = QueryResponse.newBuilder().setHasMore(false).build();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment