Commit decb95f8 authored by Jim Zhang's avatar Jim Zhang Committed by Gerrit Code Review
Browse files

Merge "[FAB-6848] add channel ID to chaincode message"

parents 67784391 53e49bc7
...@@ -77,6 +77,13 @@ public interface ChaincodeStub { ...@@ -77,6 +77,13 @@ public interface ChaincodeStub {
*/ */
String getTxId(); String getTxId();
/**
* Returns the channel id
*
* @return the channel id
*/
String getChannelId();
/** /**
* Invoke another chaincode using the same transaction context. * Invoke another chaincode using the same transaction context.
* *
......
...@@ -39,12 +39,14 @@ import com.google.protobuf.InvalidProtocolBufferException; ...@@ -39,12 +39,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
class ChaincodeStubImpl implements ChaincodeStub { class ChaincodeStubImpl implements ChaincodeStub {
private final String channelId;
private final String txId; private final String txId;
private final Handler handler; private final Handler handler;
private final List<ByteString> args; private final List<ByteString> args;
private ChaincodeEvent event; private ChaincodeEvent event;
ChaincodeStubImpl(String txId, Handler handler, List<ByteString> args) { ChaincodeStubImpl(String channelId, String txId, Handler handler, List<ByteString> args) {
this.channelId = channelId;
this.txId = txId; this.txId = txId;
this.handler = handler; this.handler = handler;
this.args = Collections.unmodifiableList(args); this.args = Collections.unmodifiableList(args);
...@@ -90,6 +92,11 @@ class ChaincodeStubImpl implements ChaincodeStub { ...@@ -90,6 +92,11 @@ class ChaincodeStubImpl implements ChaincodeStub {
return event; return event;
} }
@Override
public String getChannelId() {
return channelId;
}
@Override @Override
public String getTxId() { public String getTxId() {
return txId; return txId;
...@@ -97,23 +104,23 @@ class ChaincodeStubImpl implements ChaincodeStub { ...@@ -97,23 +104,23 @@ class ChaincodeStubImpl implements ChaincodeStub {
@Override @Override
public byte[] getState(String key) { public byte[] getState(String key) {
return handler.getState(txId, key).toByteArray(); return handler.getState(channelId, txId, key).toByteArray();
} }
@Override @Override
public void putState(String key, byte[] value) { public void putState(String key, byte[] value) {
handler.putState(txId, key, ByteString.copyFrom(value)); handler.putState(channelId, txId, key, ByteString.copyFrom(value));
} }
@Override @Override
public void delState(String key) { public void delState(String key) {
handler.deleteState(txId, key); handler.deleteState(channelId, txId, key);
} }
@Override @Override
public QueryResultsIterator<KeyValue> getStateByRange(String startKey, String endKey) { public QueryResultsIterator<KeyValue> getStateByRange(String startKey, String endKey) {
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getTxId(), return new QueryResultsIteratorImpl<KeyValue>(this.handler, getChannelId(), getTxId(),
handler.getStateByRange(getTxId(), startKey, endKey), handler.getStateByRange(getChannelId(), getTxId(), startKey, endKey),
queryResultBytesToKv.andThen(KeyValueImpl::new) queryResultBytesToKv.andThen(KeyValueImpl::new)
); );
} }
...@@ -145,16 +152,16 @@ class ChaincodeStubImpl implements ChaincodeStub { ...@@ -145,16 +152,16 @@ class ChaincodeStubImpl implements ChaincodeStub {
@Override @Override
public QueryResultsIterator<KeyValue> getQueryResult(String query) { public QueryResultsIterator<KeyValue> getQueryResult(String query) {
return new QueryResultsIteratorImpl<KeyValue>(this.handler, getTxId(), return new QueryResultsIteratorImpl<KeyValue>(this.handler, getChannelId(), getTxId(),
handler.getQueryResult(getTxId(), query), handler.getQueryResult(getChannelId(), getTxId(), query),
queryResultBytesToKv.andThen(KeyValueImpl::new) queryResultBytesToKv.andThen(KeyValueImpl::new)
); );
} }
@Override @Override
public QueryResultsIterator<KeyModification> getHistoryForKey(String key) { public QueryResultsIterator<KeyModification> getHistoryForKey(String key) {
return new QueryResultsIteratorImpl<KeyModification>(this.handler, getTxId(), return new QueryResultsIteratorImpl<KeyModification>(this.handler, getChannelId(), getTxId(),
handler.getHistoryForKey(getTxId(), key), handler.getHistoryForKey(getChannelId(), getTxId(), key),
queryResultBytesToKeyModification.andThen(KeyModificationImpl::new) queryResultBytesToKeyModification.andThen(KeyModificationImpl::new)
); );
} }
...@@ -178,7 +185,7 @@ class ChaincodeStubImpl implements ChaincodeStub { ...@@ -178,7 +185,7 @@ class ChaincodeStubImpl implements ChaincodeStub {
} else { } else {
compositeName = chaincodeName; compositeName = chaincodeName;
} }
return handler.invokeChaincode(this.txId, compositeName, args); return handler.invokeChaincode(this.channelId, this.txId, compositeName, args);
} }
} }
...@@ -131,9 +131,10 @@ public class Handler { ...@@ -131,9 +131,10 @@ public class Handler {
} }
} }
private synchronized Channel<ChaincodeMessage> aquireResponseChannelForTx(final String txId) { private synchronized Channel<ChaincodeMessage> aquireResponseChannelForTx(final String channelId, final String txId) {
final Channel<ChaincodeMessage> channel = new Channel<>(); final Channel<ChaincodeMessage> channel = new Channel<>();
if (this.responseChannel.putIfAbsent(txId, channel) != null) { String key = channelId+txId;
if (this.responseChannel.putIfAbsent(key, channel) != null) {
throw new IllegalStateException(format("[%-8s]Response channel already exists. Another request must be pending.", txId)); throw new IllegalStateException(format("[%-8s]Response channel already exists. Another request must be pending.", txId));
} }
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel created.", txId)); if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel created.", txId));
...@@ -141,12 +142,13 @@ public class Handler { ...@@ -141,12 +142,13 @@ public class Handler {
} }
private synchronized void sendChannel(ChaincodeMessage message) { private synchronized void sendChannel(ChaincodeMessage message) {
if (!responseChannel.containsKey(message.getTxid())) { String key = message.getChannelId()+message.getTxid();
if (!responseChannel.containsKey(key)) {
throw new IllegalStateException(format("[%-8s]sendChannel does not exist", message.getTxid())); throw new IllegalStateException(format("[%-8s]sendChannel does not exist", message.getTxid()));
} }
logger.debug(String.format("[%-8s]Before send", message.getTxid())); logger.debug(String.format("[%-8s]Before send", message.getTxid()));
responseChannel.get(message.getTxid()).add(message); responseChannel.get(key).add(message);
logger.debug(String.format("[%-8s]After send", message.getTxid())); logger.debug(String.format("[%-8s]After send", message.getTxid()));
} }
...@@ -162,32 +164,37 @@ public class Handler { ...@@ -162,32 +164,37 @@ public class Handler {
} }
} }
private synchronized void releaseResponseChannelForTx(String txId) { private synchronized void releaseResponseChannelForTx(String channelId, String txId) {
final Channel<ChaincodeMessage> channel = responseChannel.remove(txId); String key = channelId+txId;
final Channel<ChaincodeMessage> channel = responseChannel.remove(key);
if (channel != null) channel.close(); if (channel != null) channel.close();
if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel closed.",txId)); if (logger.isTraceEnabled()) logger.trace(format("[%-8s]Response channel closed.",txId));
} }
/** /**
* Marks a UUID as either a transaction or a query * Marks a CHANNELID+UUID as either a transaction or a query
* *
* @param channelId
* channel ID to be marked
* @param uuid * @param uuid
* ID to be marked * ID to be marked
* @param isTransaction * @param isTransaction
* true for transaction, false for query * true for transaction, false for query
* @return whether or not the UUID was successfully marked * @return whether or not the UUID was successfully marked
*/ */
private synchronized boolean markIsTransaction(String uuid, boolean isTransaction) { private synchronized boolean markIsTransaction(String channelId, String uuid, boolean isTransaction) {
if (this.isTransaction == null) { if (this.isTransaction == null) {
return false; return false;
} }
this.isTransaction.put(uuid, isTransaction); String key = channelId+uuid;
this.isTransaction.put(key, isTransaction);
return true; return true;
} }
private synchronized void deleteIsTransaction(String uuid) { private synchronized void deleteIsTransaction(String channelId, String uuid) {
isTransaction.remove(uuid); String key = channelId+uuid;
isTransaction.remove(key);
} }
private void beforeRegistered(Event event) { private void beforeRegistered(Event event) {
...@@ -209,11 +216,11 @@ public class Handler { ...@@ -209,11 +216,11 @@ public class Handler {
final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload()); final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload());
// Mark as a transaction (allow put/del state) // Mark as a transaction (allow put/del state)
markIsTransaction(message.getTxid(), true); markIsTransaction(message.getChannelId(), message.getTxid(), true);
// Create the ChaincodeStub which the chaincode can use to // Create the ChaincodeStub which the chaincode can use to
// callback // callback
final ChaincodeStub stub = new ChaincodeStubImpl(message.getTxid(), this, input.getArgsList()); final ChaincodeStub stub = new ChaincodeStubImpl(message.getChannelId(), message.getTxid(), this, input.getArgsList());
// Call chaincode's init // Call chaincode's init
final Chaincode.Response result = chaincode.init(stub); final Chaincode.Response result = chaincode.init(stub);
...@@ -221,19 +228,19 @@ public class Handler { ...@@ -221,19 +228,19 @@ public class Handler {
if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) { if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) {
// Send ERROR with entire result.Message as payload // Send ERROR with entire result.Message as payload
logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR)); logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR));
triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true); triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), result.getMessage(), stub.getEvent()), true);
} else { } else {
// Send COMPLETED with entire result as payload // Send COMPLETED with entire result as payload
logger.debug(String.format(String.format("[%-8s]Init succeeded. Sending %s", message.getTxid(), COMPLETED))); logger.debug(String.format(String.format("[%-8s]Init succeeded. Sending %s", message.getTxid(), COMPLETED)));
triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true); triggerNextState(newCompletedEventMessage(message.getChannelId(), message.getTxid(), result, stub.getEvent()), true);
} }
} catch (InvalidProtocolBufferException | RuntimeException e) { } catch (InvalidProtocolBufferException | RuntimeException e) {
logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR), e); logger.error(String.format("[%-8s]Init failed. Sending %s", message.getTxid(), ERROR), e);
triggerNextState(newErrorEventMessage(message.getTxid(), e), true); triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), e), true);
} finally { } finally {
// delete isTransaction entry // delete isTransaction entry
deleteIsTransaction(message.getTxid()); deleteIsTransaction(message.getChannelId(), message.getTxid());
} }
}).start(); }).start();
} }
...@@ -259,11 +266,11 @@ public class Handler { ...@@ -259,11 +266,11 @@ public class Handler {
final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload()); final ChaincodeInput input = ChaincodeInput.parseFrom(message.getPayload());
// Mark as a transaction (allow put/del state) // Mark as a transaction (allow put/del state)
markIsTransaction(message.getTxid(), true); markIsTransaction(message.getChannelId(), message.getTxid(), true);
// Create the ChaincodeStub which the chaincode can use to // Create the ChaincodeStub which the chaincode can use to
// callback // callback
final ChaincodeStub stub = new ChaincodeStubImpl(message.getTxid(), this, input.getArgsList()); final ChaincodeStub stub = new ChaincodeStubImpl(message.getChannelId(), message.getTxid(), this, input.getArgsList());
// Call chaincode's invoke // Call chaincode's invoke
final Chaincode.Response result = chaincode.invoke(stub); final Chaincode.Response result = chaincode.invoke(stub);
...@@ -271,19 +278,19 @@ public class Handler { ...@@ -271,19 +278,19 @@ public class Handler {
if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) { if (result.getStatus().getCode() >= Chaincode.Response.Status.INTERNAL_SERVER_ERROR.getCode()) {
// Send ERROR with entire result.Message as payload // Send ERROR with entire result.Message as payload
logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR)); logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR));
triggerNextState(newErrorEventMessage(message.getTxid(), result.getMessage(), stub.getEvent()), true); triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), result.getMessage(), stub.getEvent()), true);
} else { } else {
// Send COMPLETED with entire result as payload // Send COMPLETED with entire result as payload
logger.debug(String.format(String.format("[%-8s]Invoke succeeded. Sending %s", message.getTxid(), COMPLETED))); logger.debug(String.format(String.format("[%-8s]Invoke succeeded. Sending %s", message.getTxid(), COMPLETED)));
triggerNextState(newCompletedEventMessage(message.getTxid(), result, stub.getEvent()), true); triggerNextState(newCompletedEventMessage(message.getChannelId(), message.getTxid(), result, stub.getEvent()), true);
} }
} catch (InvalidProtocolBufferException | RuntimeException e) { } catch (InvalidProtocolBufferException | RuntimeException e) {
logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR), e); logger.error(String.format("[%-8s]Invoke failed. Sending %s", message.getTxid(), ERROR), e);
triggerNextState(newErrorEventMessage(message.getTxid(), e), true); triggerNextState(newErrorEventMessage(message.getChannelId(), message.getTxid(), e), true);
} finally { } finally {
// delete isTransaction entry // delete isTransaction entry
deleteIsTransaction(message.getTxid()); deleteIsTransaction(message.getChannelId(), message.getTxid());
} }
}).start(); }).start();
} }
...@@ -344,59 +351,60 @@ public class Handler { ...@@ -344,59 +351,60 @@ public class Handler {
} }
// handleGetState communicates with the validator to fetch the requested state information from the ledger. // handleGetState communicates with the validator to fetch the requested state information from the ledger.
ByteString getState(String txId, String key) { ByteString getState(String channelId, String txId, String key) {
return invokeChaincodeSupport(newGetStateEventMessage(txId, key)); return invokeChaincodeSupport(newGetStateEventMessage(channelId, txId, key));
} }
private boolean isTransaction(String uuid) { private boolean isTransaction(String channelId, String uuid) {
return isTransaction.containsKey(uuid) && isTransaction.get(uuid); String key = channelId+uuid;
return isTransaction.containsKey(key) && isTransaction.get(key);
} }
void putState(String txId, String key, ByteString value) { void putState(String channelId, String txId, String key, ByteString value) {
logger.debug(format("[%-8s]Inside putstate (\"%s\":\"%s\"), isTransaction = %s", txId, key, value, isTransaction(txId))); logger.debug(format("[%-8s]Inside putstate (\"%s\":\"%s\"), isTransaction = %s", txId, key, value, isTransaction(channelId, txId)));
if (!isTransaction(txId)) throw new IllegalStateException("Cannot put state in query context"); if (!isTransaction(channelId, txId)) throw new IllegalStateException("Cannot put state in query context");
invokeChaincodeSupport(newPutStateEventMessage(txId, key, value)); invokeChaincodeSupport(newPutStateEventMessage(channelId, txId, key, value));
} }
void deleteState(String txId, String key) { void deleteState(String channelId, String txId, String key) {
if (!isTransaction(txId)) throw new RuntimeException("Cannot del state in query context"); if (!isTransaction(channelId, txId)) throw new RuntimeException("Cannot del state in query context");
invokeChaincodeSupport(newDeleteStateEventMessage(txId, key)); invokeChaincodeSupport(newDeleteStateEventMessage(channelId, txId, key));
} }
QueryResponse getStateByRange(String txId, String startKey, String endKey) { QueryResponse getStateByRange(String channelId, String txId, String startKey, String endKey) {
return invokeQueryResponseMessage(txId, GET_STATE_BY_RANGE, GetStateByRange.newBuilder() return invokeQueryResponseMessage(channelId, txId, GET_STATE_BY_RANGE, GetStateByRange.newBuilder()
.setStartKey(startKey) .setStartKey(startKey)
.setEndKey(endKey) .setEndKey(endKey)
.build().toByteString()); .build().toByteString());
} }
QueryResponse queryStateNext(String txId, String queryId) { QueryResponse queryStateNext(String channelId, String txId, String queryId) {
return invokeQueryResponseMessage(txId, QUERY_STATE_NEXT, QueryStateNext.newBuilder() return invokeQueryResponseMessage(channelId, txId, QUERY_STATE_NEXT, QueryStateNext.newBuilder()
.setId(queryId) .setId(queryId)
.build().toByteString()); .build().toByteString());
} }
void queryStateClose(String txId, String queryId) { void queryStateClose(String channelId, String txId, String queryId) {
invokeQueryResponseMessage(txId, QUERY_STATE_CLOSE, QueryStateClose.newBuilder() invokeQueryResponseMessage(channelId, txId, QUERY_STATE_CLOSE, QueryStateClose.newBuilder()
.setId(queryId) .setId(queryId)
.build().toByteString()); .build().toByteString());
} }
QueryResponse getQueryResult(String txId, String query) { QueryResponse getQueryResult(String channelId, String txId, String query) {
return invokeQueryResponseMessage(txId, GET_QUERY_RESULT, GetQueryResult.newBuilder() return invokeQueryResponseMessage(channelId, txId, GET_QUERY_RESULT, GetQueryResult.newBuilder()
.setQuery(query) .setQuery(query)
.build().toByteString()); .build().toByteString());
} }
QueryResponse getHistoryForKey(String txId, String key) { QueryResponse getHistoryForKey(String channelId, String txId, String key) {
return invokeQueryResponseMessage(txId, Type.GET_HISTORY_FOR_KEY, GetQueryResult.newBuilder() return invokeQueryResponseMessage(channelId, txId, Type.GET_HISTORY_FOR_KEY, GetQueryResult.newBuilder()
.setQuery(key) .setQuery(key)
.build().toByteString()); .build().toByteString());
} }
private QueryResponse invokeQueryResponseMessage(String txId, ChaincodeMessage.Type type, ByteString payload) { private QueryResponse invokeQueryResponseMessage(String channelId, String txId, ChaincodeMessage.Type type, ByteString payload) {
try { try {
return QueryResponse.parseFrom(invokeChaincodeSupport(newEventMessage(type, txId, payload))); return QueryResponse.parseFrom(invokeChaincodeSupport(newEventMessage(type, channelId, txId, payload)));
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
logger.error(String.format("[%-8s]unmarshall error", txId)); logger.error(String.format("[%-8s]unmarshall error", txId));
throw new RuntimeException("Error unmarshalling QueryResponse.", e); throw new RuntimeException("Error unmarshalling QueryResponse.", e);
...@@ -404,11 +412,12 @@ public class Handler { ...@@ -404,11 +412,12 @@ public class Handler {
} }
private ByteString invokeChaincodeSupport(final ChaincodeMessage message) { private ByteString invokeChaincodeSupport(final ChaincodeMessage message) {
final String channelId = message.getChannelId();
final String txId = message.getTxid(); final String txId = message.getTxid();
try { try {
// create a new response channel // create a new response channel
Channel<ChaincodeMessage> responseChannel = aquireResponseChannelForTx(txId); Channel<ChaincodeMessage> responseChannel = aquireResponseChannelForTx(channelId, txId);
// send the message // send the message
serialSend(message); serialSend(message);
...@@ -430,11 +439,11 @@ public class Handler { ...@@ -430,11 +439,11 @@ public class Handler {
throw new RuntimeException(format("[%-8s]Unexpected %s response received. Expected %s or %s.", txId, response.getType(), RESPONSE, ERROR)); throw new RuntimeException(format("[%-8s]Unexpected %s response received. Expected %s or %s.", txId, response.getType(), RESPONSE, ERROR));
} }
} finally { } finally {
releaseResponseChannelForTx(txId); releaseResponseChannelForTx(channelId, txId);
} }
} }
Chaincode.Response invokeChaincode(String txId, String chaincodeName, List<byte[]> args) { Chaincode.Response invokeChaincode(String channelId, String txId, String chaincodeName, List<byte[]> args) {
try { try {
// create invocation specification of the chaincode to invoke // create invocation specification of the chaincode to invoke
final ChaincodeSpec invocationSpec = ChaincodeSpec.newBuilder() final ChaincodeSpec invocationSpec = ChaincodeSpec.newBuilder()
...@@ -447,7 +456,7 @@ public class Handler { ...@@ -447,7 +456,7 @@ public class Handler {
.build(); .build();
// invoke other chaincode // invoke other chaincode
final ByteString payload = invokeChaincodeSupport(newInvokeChaincodeMessage(txId, invocationSpec.toByteString())); final ByteString payload = invokeChaincodeSupport(newInvokeChaincodeMessage(channelId, txId, invocationSpec.toByteString()));
// response message payload should be yet another chaincode // response message payload should be yet another chaincode
// message (the actual response message) // message (the actual response message)
...@@ -481,7 +490,7 @@ public class Handler { ...@@ -481,7 +490,7 @@ public class Handler {
if (fsm.eventCannotOccur(message.getType().toString())) { if (fsm.eventCannotOccur(message.getType().toString())) {
String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", message.getTxid(), message.getType(), message.getPayload().size(), fsm.current()); String errStr = String.format("[%s]Chaincode handler org.hyperledger.fabric.shim.fsm cannot handle message (%s) with payload size (%d) while in state: %s", message.getTxid(), message.getType(), message.getPayload().size(), fsm.current());
serialSend(newErrorEventMessage(message.getTxid(), errStr)); serialSend(newErrorEventMessage(message.getChannelId(), message.getTxid(), errStr));
throw new RuntimeException(errStr); throw new RuntimeException(errStr);
} }
...@@ -510,61 +519,63 @@ public class Handler { ...@@ -510,61 +519,63 @@ public class Handler {
return new Chaincode.Response(Chaincode.Response.Status.INTERNAL_SERVER_ERROR, message, null); return new Chaincode.Response(Chaincode.Response.Status.INTERNAL_SERVER_ERROR, message, null);
} }
private static ChaincodeMessage newGetStateEventMessage(final String txId, final String key) { private static ChaincodeMessage newGetStateEventMessage(final String channelId, final String txId, final String key) {
return newEventMessage(GET_STATE, txId, GetState.newBuilder() return newEventMessage(GET_STATE, channelId, txId, GetState.newBuilder()
.setKey(key) .setKey(key)
.build().toByteString