diff --git a/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java index 50e85ff230899..bd2b05ee5f151 100644 --- a/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java +++ b/modules/binary/impl/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java @@ -193,6 +193,9 @@ public BinaryObjectImpl(BinaryContext ctx, byte[] arr, int start) { /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectValueContext ctx) throws IgniteCheckedException { + if (valBytes == null) + valBytes = valueBytesFromArray(ctx); + return valBytes; } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java index fe45e12a1c8b1..21c199161017d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java @@ -220,11 +220,6 @@ public long timeout() { params = U.unmarshal(ctx, paramsBytes, ldr); fragmentDesc.prepareUnmarshal(ctx); - - if (qryTxEntries != null) { - for (QueryTxEntry e : qryTxEntries) - e.prepareUnmarshal(ctx, ldr); - } } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java index 7961f589d83ef..c065b6317f32e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java @@ -20,12 +20,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.function.Function; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; @@ -38,21 +36,17 @@ * @see ExecutionContext#transactionChanges(int, int[], Function, Comparator) * @see QueryStartRequest#queryTransactionEntries() */ -public class QueryTxEntry implements CalciteMessage { - /** Cache id. */ - @Order(0) - int cacheId; - +public class QueryTxEntry extends GridCacheIdMessage implements CalciteMessage { /** Entry key. */ - @Order(1) + @Order(0) KeyCacheObject key; /** Entry value. */ - @Order(2) + @Order(1) CacheObject val; /** Entry version. */ - @Order(3) + @Order(2) GridCacheVersion ver; /** @@ -75,11 +69,6 @@ public QueryTxEntry(int cacheId, KeyCacheObject key, CacheObject val, GridCacheV this.ver = ver; } - /** @return Cache id. */ - public int cacheId() { - return cacheId; - } - /** @return Entry key. */ public KeyCacheObject key() { return key; @@ -95,24 +84,9 @@ public GridCacheVersion version() { return ver; } - /** */ - public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - CacheObjectContext coctx = ctx.cacheContext(cacheId).cacheObjectContext(); - - key.prepareMarshal(coctx); - - if (val != null) - val.prepareMarshal(coctx); - } - - /** */ - public void prepareUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - CacheObjectContext coctx = ctx.cacheContext(cacheId).cacheObjectContext(); - - key.finishUnmarshal(coctx, ldr); - - if (val != null) - val.finishUnmarshal(coctx, ldr); + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; } /** {@inheritDoc} */ diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 893a1ccaa4878..d147a23cd608e 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -417,7 +417,8 @@ private void returnFalseIfWriteFailed(VariableElement field) throws Exception { return; } - returnFalseIfWriteFailed(write, field, "writer.writeObjectArray", getExpr, messageCollectionItemTypes(field, type)); + returnFalseIfWriteFailed( + write, field, "writer.writeObjectArray", getExpr, messageCollectionItemTypes(field, type), "msg"); return; } @@ -443,6 +444,7 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { args.add(getExpr); args.add(messageCollectionItemTypes(field, type)); + args.add("msg"); if (compress) args.add("true"); // the value of the compress argument in the MessageWriter#writeMap method @@ -451,10 +453,10 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) - returnFalseIfWriteFailed(write, field, "writer.writeKeyCacheObject", getExpr); + returnFalseIfWriteFailed(write, field, "writer.writeKeyCacheObject", getExpr, "msg"); else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.CacheObject"))) - returnFalseIfWriteFailed(write, field, "writer.writeCacheObject", getExpr); + returnFalseIfWriteFailed(write, field, "writer.writeCacheObject", getExpr, "msg"); else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfWriteFailed(write, field, "writer.writeGridLongList", getExpr); @@ -470,7 +472,8 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) - returnFalseIfWriteFailed(write, field, "writer.writeCollection", getExpr, messageCollectionItemTypes(field, type)); + returnFalseIfWriteFailed( + write, field, "writer.writeCollection", getExpr, messageCollectionItemTypes(field, type), "msg"); else if (enumType(env, type)) { Element element = env.getTypeUtils().asElement(type); @@ -625,7 +628,7 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { } if (componentType.getKind() == TypeKind.ARRAY) { - returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type)); + returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type), "msg"); return; } @@ -633,7 +636,7 @@ private void returnFalseIfReadFailed(VariableElement field) throws Exception { if (componentType.getKind() == TypeKind.DECLARED) { Element componentElement = ((DeclaredType)componentType).asElement(); - returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type)); + returnFalseIfReadFailed(field, "reader.readObjectArray", messageCollectionItemTypes(field, type), "msg"); if (!"java.lang".equals(env.getElementUtils().getPackageOf(componentElement).getQualifiedName().toString())) { String importCls = ((QualifiedNameable)componentElement).getQualifiedName().toString(); @@ -669,6 +672,7 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { List args = new ArrayList<>(); args.add(messageCollectionItemTypes(field, type)); + args.add("msg"); if (compress) args.add("true"); // the value of the compress argument in the MessageReader#readMap method @@ -677,10 +681,10 @@ else if (assignableFrom(erasedType(type), type(Map.class.getName()))) { } else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.KeyCacheObject"))) - returnFalseIfReadFailed(field, "reader.readKeyCacheObject"); + returnFalseIfReadFailed(field, "reader.readKeyCacheObject", "msg"); else if (assignableFrom(type, type("org.apache.ignite.internal.processors.cache.CacheObject"))) - returnFalseIfReadFailed(field, "reader.readCacheObject"); + returnFalseIfReadFailed(field, "reader.readCacheObject", "msg"); else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList"))) returnFalseIfReadFailed(field, "reader.readGridLongList"); @@ -690,13 +694,13 @@ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) { throw new IllegalArgumentException(COMPRESSED_MSG_ERROR); if (compress) - returnFalseIfReadFailed(field, "reader.readMessage", "true"); + returnFalseIfReadFailed(field, "reader.readMessage", "msg", "true"); else - returnFalseIfReadFailed(field, "reader.readMessage"); + returnFalseIfReadFailed(field, "reader.readMessage", "msg"); } else if (assignableFrom(erasedType(type), type(Collection.class.getName()))) { - returnFalseIfReadFailed(field, "reader.readCollection", messageCollectionItemTypes(field, type)); + returnFalseIfReadFailed(field, "reader.readCollection", messageCollectionItemTypes(field, type), "msg"); } else if (enumType(env, type)) { String fieldPrefix = typeNameToFieldName(env.getTypeUtils().asElement(type).getSimpleName().toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ExchangeInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/ExchangeInfo.java index 62c0a0fce7a36..1cea6641ff8b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/ExchangeInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/ExchangeInfo.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** */ -public final class ExchangeInfo extends IgniteDiagnosticRequest.DiagnosticBaseInfo { +public final class ExchangeInfo implements IgniteDiagnosticRequest.DiagnosticBaseInfo { /** */ @Order(0) AffinityTopologyVersion topVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java index 18276f939b49a..40ff44b57993f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java @@ -143,11 +143,11 @@ public long futureId() { } /** */ - public abstract static class DiagnosticBaseInfo implements Message { + public interface DiagnosticBaseInfo extends Message { /** * @param other Another info of the same type. */ - public void merge(DiagnosticBaseInfo other) { + default void merge(DiagnosticBaseInfo other) { // No-op. } @@ -155,6 +155,6 @@ public void merge(DiagnosticBaseInfo other) { * @param sb String builder. * @param ctx Grid context. */ - public abstract void appendInfo(StringBuilder sb, GridKernalContext ctx); + void appendInfo(StringBuilder sb, GridKernalContext ctx); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TxEntriesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/TxEntriesInfo.java index 150c8719d306a..806106358366a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/TxEntriesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/TxEntriesInfo.java @@ -20,21 +20,17 @@ import java.util.Collection; import java.util.HashSet; import java.util.Objects; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.typedef.internal.U; /** */ -public final class TxEntriesInfo extends IgniteDiagnosticRequest.DiagnosticBaseInfo { +public final class TxEntriesInfo extends GridCacheIdMessage implements IgniteDiagnosticRequest.DiagnosticBaseInfo { /** */ @Order(0) - int cacheId; - - /** */ - @Order(1) Collection keys; /** @@ -66,16 +62,6 @@ public TxEntriesInfo() { return; } - try { - for (KeyCacheObject key : keys) - key.finishUnmarshal(cctx.cacheObjectContext(), null); - } - catch (IgniteCheckedException e) { - ctx.cluster().diagnosticLog().error("Failed to unmarshal key: " + e, e); - - sb.append("Failed to unmarshal key: ").append(e).append(U.nl()); - } - sb.append("Cache entries [cacheId=").append(cacheId) .append(", cacheName=").append(cctx.name()).append("]: "); @@ -112,4 +98,9 @@ public TxEntriesInfo() { @Override public int hashCode() { return Objects.hash(getClass(), cacheId); } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/TxInfo.java index 9158ead811c45..bf988350f30a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/TxInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/TxInfo.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; /** */ -public final class TxInfo extends IgniteDiagnosticRequest.DiagnosticBaseInfo { +public final class TxInfo implements IgniteDiagnosticRequest.DiagnosticBaseInfo { /** */ @Order(0) GridCacheVersion dhtVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index d32d179ae55eb..d05dfc7067787 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import java.util.function.Function; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -57,6 +58,9 @@ public class DirectMessageReader implements MessageReader { /** Cache object processor. */ private final IgniteCacheObjectProcessor cacheObjProc; + + /** */ + private final GridKernalContext ctx; /** Buffer for reading. */ private ByteBuffer buf; @@ -68,13 +72,14 @@ public class DirectMessageReader implements MessageReader { * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. */ - public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) { + public DirectMessageReader(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc, GridKernalContext ctx) { this.msgFactory = msgFactory; this.cacheObjProc = cacheObjProc; + this.ctx = ctx; state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { - return new StateItem(msgFactory, cacheObjProc); + return new StateItem(msgFactory, cacheObjProc, ctx); } }); } @@ -327,7 +332,7 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Nullable @Override public T readMessage(boolean compress) { + @Nullable @Override public T readMessage(Message encMsg, boolean compress) { DirectByteBufferStream stream = state.item().stream; T msg; @@ -335,10 +340,11 @@ public ByteBuffer getBuffer() { if (compress) msg = readCompressedMessageAndDeserialize( stream, - tmpReader -> tmpReader.state.item().stream.readMessage(tmpReader) + tmpReader -> tmpReader.state.item().stream.readMessage(encMsg, tmpReader), + encMsg ); else { - msg = stream.readMessage(this); + msg = stream.readMessage(encMsg, this); lastRead = stream.lastFinished(); } @@ -347,10 +353,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public CacheObject readCacheObject() { + @Override public CacheObject readCacheObject(Message msg) { DirectByteBufferStream stream = state.item().stream; - CacheObject val = stream.readCacheObject(); + CacheObject val = stream.readCacheObject(msg); lastRead = stream.lastFinished(); @@ -358,10 +364,10 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public KeyCacheObject readKeyCacheObject() { + @Override public KeyCacheObject readKeyCacheObject(Message msg) { DirectByteBufferStream stream = state.item().stream; - KeyCacheObject key = stream.readKeyCacheObject(); + KeyCacheObject key = stream.readKeyCacheObject(msg); lastRead = stream.lastFinished(); @@ -380,21 +386,21 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public T[] readObjectArray(MessageArrayType type) { + @Override public T[] readObjectArray(MessageArrayType type, Message msg) { DirectByteBufferStream stream = state.item().stream; - T[] msg = stream.readObjectArray(type, this); + T[] arr = stream.readObjectArray(type, msg, this); lastRead = stream.lastFinished(); - return msg; + return arr; } /** {@inheritDoc} */ - @Override public > C readCollection(MessageCollectionType type) { + @Override public > C readCollection(MessageCollectionType type, Message msg) { DirectByteBufferStream stream = state.item().stream; - C col = stream.readCollection(type, this); + C col = stream.readCollection(type, msg, this); lastRead = stream.lastFinished(); @@ -402,7 +408,7 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public > M readMap(MessageMapType type, boolean compress) { + @Override public > M readMap(MessageMapType type, Message msg, boolean compress) { DirectByteBufferStream stream = state.item().stream; M map; @@ -410,10 +416,11 @@ public ByteBuffer getBuffer() { if (compress) map = readCompressedMessageAndDeserialize( stream, - tmpReader -> tmpReader.state.item().stream.readMap(type, tmpReader) + tmpReader -> tmpReader.state.item().stream.readMap(type, msg, tmpReader), + msg ); else { - map = stream.readMap(type, this); + map = stream.readMap(type, msg, this); lastRead = stream.lastFinished(); } @@ -464,8 +471,8 @@ public ByteBuffer getBuffer() { } /** @return Deserialized object. */ - private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Function fun) { - Message msg = stream.readMessage(this); + private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, Function fun, Message encMsg) { + Message msg = stream.readMessage(encMsg, this); lastRead = stream.lastFinished(); @@ -486,7 +493,7 @@ private T readCompressedMessageAndDeserialize(DirectByteBufferStream stream, tmpBuf.put(uncompressed); tmpBuf.flip(); - DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, cacheObjProc); + DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, cacheObjProc, ctx); tmpReader.setBuffer(tmpBuf); @@ -510,8 +517,8 @@ private static class StateItem implements DirectMessageStateItem { * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. */ - public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) { - stream = new DirectByteBufferStream(msgFactory, cacheObjProc); + public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc, GridKernalContext ctx) { + stream = new DirectByteBufferStream(msgFactory, cacheObjProc, ctx); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index c88e2f5cb2616..3686a2dfba8e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import java.util.function.Consumer; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.direct.state.DirectMessageState; import org.apache.ignite.internal.direct.state.DirectMessageStateItem; import org.apache.ignite.internal.direct.stream.DirectByteBufferStream; @@ -30,6 +31,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -59,6 +61,12 @@ public class DirectMessageWriter implements MessageWriter { /** Message factory. */ private final MessageFactory msgFactory; + /** Cache object processor. */ + private final IgniteCacheObjectProcessor cacheObjProc; + + /** */ + private final GridKernalContext ctx; + /** Compression level. Used only for {@link CompressedMessage}. */ private final int compressionLvl; @@ -66,21 +74,28 @@ public class DirectMessageWriter implements MessageWriter { private ByteBuffer buf; /** @param msgFactory Message factory. */ - public DirectMessageWriter(final MessageFactory msgFactory) { - this(msgFactory, DFLT_NETWORK_COMPRESSION); + public DirectMessageWriter(final MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc, GridKernalContext ctx) { + this(msgFactory, cacheObjProc, ctx, DFLT_NETWORK_COMPRESSION); } /** * @param msgFactory Message factory. * @param compressionLvl Compression level. */ - public DirectMessageWriter(final MessageFactory msgFactory, final int compressionLvl) { + public DirectMessageWriter( + final MessageFactory msgFactory, + IgniteCacheObjectProcessor cacheObjProc, + GridKernalContext ctx, + final int compressionLvl) { this.msgFactory = msgFactory; this.compressionLvl = compressionLvl; + this.cacheObjProc = cacheObjProc; + this.ctx = ctx; + state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure() { @Override public StateItem apply() { - return new StateItem(msgFactory); + return new StateItem(msgFactory, cacheObjProc, ctx); } }); } @@ -334,19 +349,19 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeCacheObject(@Nullable CacheObject obj) { + @Override public boolean writeCacheObject(@Nullable CacheObject obj, Message msg) { DirectByteBufferStream stream = state.item().stream; - stream.writeCacheObject(obj); + stream.writeCacheObject(msg, obj); return stream.lastFinished(); } /** {@inheritDoc} */ - @Override public boolean writeKeyCacheObject(KeyCacheObject obj) { + @Override public boolean writeKeyCacheObject(KeyCacheObject obj, Message msg) { DirectByteBufferStream stream = state.item().stream; - stream.writeKeyCacheObject(obj); + stream.writeKeyCacheObject(msg, obj); return stream.lastFinished(); } @@ -361,35 +376,35 @@ public ByteBuffer getBuffer() { } /** {@inheritDoc} */ - @Override public boolean writeObjectArray(T[] arr, MessageArrayType type) { + @Override public boolean writeObjectArray(T[] arr, MessageArrayType type, Message msg) { DirectByteBufferStream stream = state.item().stream; - stream.writeObjectArray(arr, type, this); + stream.writeObjectArray(arr, type, msg, this); return stream.lastFinished(); } /** {@inheritDoc} */ - @Override public boolean writeCollection(Collection col, MessageCollectionType type) { + @Override public boolean writeCollection(Collection col, MessageCollectionType type, Message msg) { DirectByteBufferStream stream = state.item().stream; - stream.writeCollection(col, type, this); + stream.writeCollection(col, type, msg, this); return stream.lastFinished(); } /** {@inheritDoc} */ - @Override public boolean writeMap(Map map, MessageMapType type, boolean compress) { + @Override public boolean writeMap(Map map, MessageMapType type, Message msg, boolean compress) { DirectByteBufferStream stream = state.item().stream; if (compress) writeCompressedMessage( - tmpWriter -> tmpWriter.state.item().stream.writeMap(map, type, tmpWriter), + tmpWriter -> tmpWriter.state.item().stream.writeMap(map, type, msg, tmpWriter), map == null, stream ); else - stream.writeMap(map, type, this); + stream.writeMap(map, type, msg, this); return stream.lastFinished(); } @@ -456,7 +471,7 @@ private void writeCompressedMessage(Consumer consumer, bool if (!stream.serializeFinished()) { ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY); - DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory, compressionLvl); + DirectMessageWriter tmpWriter = new DirectMessageWriter(msgFactory, cacheObjProc, ctx, compressionLvl); tmpWriter.setBuffer(tmpBuf); @@ -509,8 +524,8 @@ private static class StateItem implements DirectMessageStateItem { private boolean hdrWritten; /** */ - public StateItem(MessageFactory msgFactory) { - stream = new DirectByteBufferStream(msgFactory); + public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc, GridKernalContext ctx) { + stream = new DirectByteBufferStream(msgFactory, cacheObjProc, ctx); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index 053732880b6ec..16d2f41855feb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -32,14 +32,20 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.managers.communication.CompressedMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -51,6 +57,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageType; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.plugin.extensions.communication.SkipCacheObjectsMarshallingMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN; @@ -90,6 +97,13 @@ public class DirectByteBufferStream { /** */ private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0]; + /** */ + private static final ThreadLocal> coCtxs = new ThreadLocal<>(); + + /** */ + private static final CacheObjectContext NULL_CACHE_CTX = + new CacheObjectContext(null, null, null, false, false, false, false); + /** */ private static final ArrayCreator BYTE_ARR_CREATOR = new ArrayCreator() { @Override public byte[] create(int len) { @@ -350,17 +364,11 @@ public class DirectByteBufferStream { /** */ private boolean serializeFinished; - /** - * Constructror for stream used for writing messages. - * - * @param msgFactory Message factory. - */ - public DirectByteBufferStream(MessageFactory msgFactory) { - this.msgFactory = msgFactory; + /** */ + final GridKernalContext ctx; - // Is not used while writing messages. - cacheObjProc = null; - } + /** */ + private final CacheObjectContext fakeCacheObjectCtx; /** * Constructror for stream used for reading messages. @@ -368,9 +376,12 @@ public DirectByteBufferStream(MessageFactory msgFactory) { * @param msgFactory Message factory. * @param cacheObjProc Cache object processor. */ - public DirectByteBufferStream(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc) { + public DirectByteBufferStream(MessageFactory msgFactory, IgniteCacheObjectProcessor cacheObjProc, GridKernalContext ctx) { this.msgFactory = msgFactory; this.cacheObjProc = cacheObjProc; + this.ctx = ctx; + + fakeCacheObjectCtx = new CacheObjectContext(ctx, null, null, false, false, false, false); } /** @@ -838,7 +849,7 @@ public void writeAffinityTopologyVersion(AffinityTopologyVersion val) { /** * @param obj Cache object. */ - public void writeCacheObject(CacheObject obj) { + public void writeCacheObject(Message msg, CacheObject obj) { try { if (obj != null) { switch (cacheObjState) { @@ -851,7 +862,11 @@ public void writeCacheObject(CacheObject obj) { cacheObjState++; case 1: - writeByteArray(obj.valueBytes(null)); + CacheObjectContext ctx = setContext(msg); + + writeByteArray(obj.valueBytes(context())); + + removeContext(ctx); if (!lastFinished) return; @@ -870,7 +885,7 @@ public void writeCacheObject(CacheObject obj) { /** * @param keyObj Key cache object. */ - public void writeKeyCacheObject(KeyCacheObject keyObj) { + public void writeKeyCacheObject(Message msg, KeyCacheObject keyObj) { try { if (keyObj != null) { switch (cacheObjState) { @@ -883,7 +898,11 @@ public void writeKeyCacheObject(KeyCacheObject keyObj) { cacheObjState++; case 1: - writeByteArray(keyObj.valueBytes(null)); + CacheObjectContext ctx = setContext(msg); + + writeByteArray(keyObj.valueBytes(context())); + + removeContext(ctx); if (!lastFinished) return; @@ -923,8 +942,13 @@ public void writeGridLongList(@Nullable GridLongList val) { */ public void writeMessage(Message msg, MessageWriter writer) { if (msg != null) { - if (buf.hasRemaining()) + if (buf.hasRemaining()) { + CacheObjectContext ctx = setContext(msg); + nestedWrite(writer, () -> msgFactory.serializer(msg.directType()).writeTo(msg, writer)); + + removeContext(ctx); + } else lastFinished = false; } @@ -937,7 +961,7 @@ public void writeMessage(Message msg, MessageWriter writer) { * @param type Type. * @param writer Writer. */ - public void writeObjectArray(T[] arr, MessageArrayType type, MessageWriter writer) { + public void writeObjectArray(T[] arr, MessageArrayType type, Message msg, MessageWriter writer) { if (arr != null) { int len = arr.length; @@ -954,7 +978,7 @@ public void writeObjectArray(T[] arr, MessageArrayType type, MessageWriter w if (arrCur == NULL) arrCur = arr[arrPos++]; - write(type.valueType(), arrCur, writer); + write(type.valueType(), msg, arrCur, writer); if (!lastFinished) return; @@ -971,12 +995,13 @@ public void writeObjectArray(T[] arr, MessageArrayType type, MessageWriter w /** * @param col Collection. * @param type Type. + * @param msg Message. * @param writer Writer. */ - public void writeCollection(Collection col, MessageCollectionType type, MessageWriter writer) { + public void writeCollection(Collection col, MessageCollectionType type, Message msg, MessageWriter writer) { if (col != null) { if (col instanceof List && col instanceof RandomAccess) - writeRandomAccessList((List)col, type, writer); + writeRandomAccessList((List)col, type, msg, writer); else { if (it == null) { writeInt(col.size()); @@ -991,7 +1016,7 @@ public void writeCollection(Collection col, MessageCollectionType type, M if (cur == NULL) cur = it.next(); - write(type.valueType(), cur, writer); + write(type.valueType(), msg, cur, writer); if (!lastFinished) return; @@ -1011,7 +1036,7 @@ public void writeCollection(Collection col, MessageCollectionType type, M * @param type Type. * @param writer Writer. */ - private void writeRandomAccessList(List list, MessageCollectionType type, MessageWriter writer) { + private void writeRandomAccessList(List list, MessageCollectionType type, Message msg, MessageWriter writer) { assert list instanceof RandomAccess; int size = list.size(); @@ -1029,7 +1054,7 @@ private void writeRandomAccessList(List list, MessageCollectionType type, if (arrCur == NULL) arrCur = list.get(arrPos++); - write(type.valueType(), arrCur, writer); + write(type.valueType(), msg, arrCur, writer); if (!lastFinished) return; @@ -1045,7 +1070,7 @@ private void writeRandomAccessList(List list, MessageCollectionType type, * @param type Type. * @param writer Writer. */ - public void writeMap(Map map, MessageMapType type, MessageWriter writer) { + public void writeMap(Map map, MessageMapType type, Message msg, MessageWriter writer) { if (map != null) { if (mapIt == null) { writeInt(map.size()); @@ -1065,7 +1090,7 @@ public void writeMap(Map map, MessageMapType type, MessageWriter wr e = (Map.Entry)mapCur; if (!keyDone) { - write(type.keyType(), e.getKey(), writer); + write(type.keyType(), msg, e.getKey(), writer); if (!lastFinished) return; @@ -1073,7 +1098,7 @@ public void writeMap(Map map, MessageMapType type, MessageWriter wr keyDone = true; } - write(type.valueType(), e.getValue(), writer); + write(type.valueType(), msg, e.getValue(), writer); if (!lastFinished) return; @@ -1473,7 +1498,7 @@ public AffinityTopologyVersion readAffinityTopologyVersion() { /** * @return Value. */ - public KeyCacheObject readKeyCacheObject() { + public KeyCacheObject readKeyCacheObject(Message msg) { switch (cacheObjState) { case 0: cacheObjType = readByte(); @@ -1501,7 +1526,11 @@ public KeyCacheObject readKeyCacheObject() { } try { - KeyCacheObject key = cacheObjProc.toKeyCacheObject(null, cacheObjType, cacheObjArr); + CacheObjectContext ctx = setContext(msg); + + KeyCacheObject key = cacheObjProc.toKeyCacheObject(context(), cacheObjType, cacheObjArr); + + removeContext(ctx); if (keyCacheObjPart != -1) key.partition(keyCacheObjPart); @@ -1516,7 +1545,7 @@ public KeyCacheObject readKeyCacheObject() { /** * @return Value. */ - public CacheObject readCacheObject() { + public CacheObject readCacheObject(Message msg) { switch (cacheObjState) { case 0: cacheObjType = readByte(); @@ -1535,7 +1564,13 @@ public CacheObject readCacheObject() { cacheObjState = 0; } - return cacheObjProc.toCacheObject(null, cacheObjType, cacheObjArr); + CacheObjectContext ctx = setContext(msg); + + CacheObject res = cacheObjProc.toCacheObject(context(), cacheObjType, cacheObjArr); + + removeContext(ctx); + + return res; } /** @@ -1549,9 +1584,10 @@ public GridLongList readGridLongList() { /** * @param reader Reader. + * @param encMsg Message. * @return Message. */ - public T readMessage(MessageReader reader) { + public T readMessage(Message encMsg, MessageReader reader) { if (!msgTypeDone) { if (buf.remaining() < Message.DIRECT_TYPE_SIZE) { lastFinished = false; @@ -1570,7 +1606,11 @@ public T readMessage(MessageReader reader) { try { reader.beforeNestedRead(); + CacheObjectContext ctx = setContext(encMsg); + lastFinished = msgFactory.serializer(msg.directType()).readFrom(msg, reader); + + removeContext(ctx); } finally { reader.afterNestedRead(lastFinished); @@ -1596,7 +1636,7 @@ public T readMessage(MessageReader reader) { * @param reader Reader. * @return Array. */ - public T[] readObjectArray(MessageArrayType type, MessageReader reader) { + public T[] readObjectArray(MessageArrayType type, Message msg, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1611,7 +1651,7 @@ public T[] readObjectArray(MessageArrayType type, MessageReader reader) { objArr = type.clazz() != null ? (Object[])Array.newInstance(type.clazz(), readSize) : new Object[readSize]; for (int i = readItems; i < readSize; i++) { - Object item = read(type.valueType(), reader); + Object item = read(type.valueType(), msg, reader); if (!lastFinished) return null; @@ -1640,7 +1680,7 @@ public T[] readObjectArray(MessageArrayType type, MessageReader reader) { * @param reader Reader. * @return {@link ArrayList} or a {@link HashSet}. */ - public > C readCollection(MessageCollectionType type, MessageReader reader) { + public > C readCollection(MessageCollectionType type, Message msg, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1655,7 +1695,7 @@ public > C readCollection(MessageCollectionType type, Me col = type.set() ? U.newHashSet(readSize) : new ArrayList<>(readSize); for (int i = readItems; i < readSize; i++) { - Object item = read(type.valueType(), reader); + Object item = read(type.valueType(), msg, reader); if (!lastFinished) return null; @@ -1682,7 +1722,7 @@ public > C readCollection(MessageCollectionType type, Me * @param reader Reader. * @return Map. */ - public > M readMap(MessageMapType type, MessageReader reader) { + public > M readMap(MessageMapType type, Message msg, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1698,7 +1738,7 @@ public > C readCollection(MessageCollectionType type, Me for (int i = readItems; i < readSize; i++) { if (!keyDone) { - Object key = read(type.keyType(), reader); + Object key = read(type.keyType(), msg, reader); if (!lastFinished) return null; @@ -1707,7 +1747,7 @@ public > C readCollection(MessageCollectionType type, Me keyDone = true; } - Object val = read(type.valueType(), reader); + Object val = read(type.valueType(), msg, reader); if (!lastFinished) return null; @@ -1988,7 +2028,9 @@ T readArrayLE(ArrayCreator creator, int typeSize, int lenShift, long off) * @param val Value. * @param writer Writer. */ - protected void write(MessageType type, Object val, MessageWriter writer) { + protected void write(MessageType type, Message msg, Object val, MessageWriter writer) { + CacheObjectContext ctx = setContext(msg); + switch (type.type()) { case BYTE: writeByte((Byte)val); @@ -2096,12 +2138,12 @@ protected void write(MessageType type, Object val, MessageWriter writer) break; case KEY_CACHE_OBJECT: - writeKeyCacheObject((KeyCacheObject)val); + writeKeyCacheObject(msg, (KeyCacheObject)val); break; case CACHE_OBJECT: - writeCacheObject((CacheObject)val); + writeCacheObject(msg, (CacheObject)val); break; @@ -2111,17 +2153,17 @@ protected void write(MessageType type, Object val, MessageWriter writer) break; case MAP: - nestedWrite(writer, () -> writer.writeMap((Map)val, (MessageMapType)type)); + nestedWrite(writer, () -> writer.writeMap((Map)val, (MessageMapType)type, msg)); break; case COLLECTION: - nestedWrite(writer, () -> writer.writeCollection((Collection)val, (MessageCollectionType)type)); + nestedWrite(writer, () -> writer.writeCollection((Collection)val, (MessageCollectionType)type, msg)); break; case ARRAY: - nestedWrite(writer, () -> writer.writeObjectArray((V[])val, (MessageArrayType)type)); + nestedWrite(writer, () -> writer.writeObjectArray((V[])val, (MessageArrayType)type, msg)); break; @@ -2133,6 +2175,8 @@ protected void write(MessageType type, Object val, MessageWriter writer) default: throw new IllegalArgumentException("Unknown type: " + type); } + + removeContext(ctx); } /** Performs a nested write with proper writer state enter/exit handling. */ @@ -2152,94 +2196,101 @@ private void nestedWrite(MessageWriter writer, BooleanSupplier s) { * @param reader Reader. * @return Value. */ - protected Object read(MessageType type, MessageReader reader) { - switch (type.type()) { - case BYTE: - return readByte(); + protected Object read(MessageType type, Message msg, MessageReader reader) { + CacheObjectContext ctx = setContext(msg); + + try { + switch (type.type()) { + case BYTE: + return readByte(); - case SHORT: - return readShort(); + case SHORT: + return readShort(); - case INT: - return readInt(); + case INT: + return readInt(); - case LONG: - return readLong(); + case LONG: + return readLong(); - case FLOAT: - return readFloat(); + case FLOAT: + return readFloat(); - case DOUBLE: - return readDouble(); + case DOUBLE: + return readDouble(); - case CHAR: - return readChar(); + case CHAR: + return readChar(); - case BOOLEAN: - return readBoolean(); + case BOOLEAN: + return readBoolean(); - case BYTE_ARR: - return readByteArray(); + case BYTE_ARR: + return readByteArray(); - case SHORT_ARR: - return readShortArray(); + case SHORT_ARR: + return readShortArray(); - case INT_ARR: - return readIntArray(); + case INT_ARR: + return readIntArray(); - case LONG_ARR: - return readLongArray(); + case LONG_ARR: + return readLongArray(); - case FLOAT_ARR: - return readFloatArray(); + case FLOAT_ARR: + return readFloatArray(); - case DOUBLE_ARR: - return readDoubleArray(); + case DOUBLE_ARR: + return readDoubleArray(); - case CHAR_ARR: - return readCharArray(); + case CHAR_ARR: + return readCharArray(); - case BOOLEAN_ARR: - return readBooleanArray(); + case BOOLEAN_ARR: + return readBooleanArray(); - case STRING: - return readString(); + case STRING: + return readString(); - case BIT_SET: - return readBitSet(); + case BIT_SET: + return readBitSet(); - case UUID: - return readUuid(); + case UUID: + return readUuid(); - case IGNITE_UUID: - return readIgniteUuid(); + case IGNITE_UUID: + return readIgniteUuid(); - case AFFINITY_TOPOLOGY_VERSION: - return readAffinityTopologyVersion(); + case AFFINITY_TOPOLOGY_VERSION: + return readAffinityTopologyVersion(); - case KEY_CACHE_OBJECT: - return readKeyCacheObject(); + case KEY_CACHE_OBJECT: + return readKeyCacheObject(msg); - case CACHE_OBJECT: - return readCacheObject(); + case CACHE_OBJECT: + return readCacheObject(msg); - case GRID_LONG_LIST: - return readGridLongList(); + case GRID_LONG_LIST: + return readGridLongList(); - case MAP: - return nestedRead(reader, () -> reader.readMap((MessageMapType)type)); + case MAP: + return nestedRead(reader, () -> reader.readMap((MessageMapType)type, msg)); - case COLLECTION: - return nestedRead(reader, () -> reader.readCollection((MessageCollectionType)type)); + case COLLECTION: + return nestedRead(reader, () -> reader.readCollection((MessageCollectionType)type, msg)); - case ARRAY: - return nestedRead(reader, () -> reader.readObjectArray((MessageArrayType)type)); + case ARRAY: + return nestedRead(reader, () -> reader.readObjectArray((MessageArrayType)type, msg)); - case MSG: - return readMessage(reader); + case MSG: + return readMessage(msg, reader); - default: - throw new IllegalArgumentException("Unknown type: " + type); + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + finally { + removeContext(ctx); } } @@ -2303,6 +2354,62 @@ private void readUuidRaw() { } } + /** */ + private CacheObjectContext setContext(Message msg) { + CacheObjectContext coCtx = null; + + if (msg instanceof GridCacheIdMessage) + coCtx = resolveContext(((GridCacheIdMessage)msg).cacheId()); + + if (msg instanceof GridCacheGroupIdMessage) + coCtx = resolveContext(((GridCacheGroupIdMessage)msg).groupId()); + + boolean skipMarsh = msg instanceof SkipCacheObjectsMarshallingMessage; + + if (coCtx != null || skipMarsh) { + if (coCtxs.get() == null) + coCtxs.set(new ArrayList<>()); + + coCtxs.get().add(skipMarsh ? NULL_CACHE_CTX : coCtx); + + return coCtx; + } + + return null; + } + + /** */ + private CacheObjectContext resolveContext(int cacheId) { + if (cacheId != CU.UNDEFINED_CACHE_ID) { + GridCacheContext gcCtx = ctx.cache().context().cacheContext(cacheId); + + if (gcCtx == null) + return fakeCacheObjectCtx; // Cache was removed. Avoiding failure. Should be handled by message handler. + else + return gcCtx.cacheObjectContext(); + } + else + return null; + } + + /** */ + private void removeContext(CacheObjectContext coCtx) { + if (coCtx != null) + coCtxs.get().remove(coCtx); + } + + /** */ + private CacheObjectContext context() { + List list = coCtxs.get(); + + CacheObjectContext cand = list.get(list.size() - 1); + + if (cand == NULL_CACHE_CTX) + return null; + + return cand; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DirectByteBufferStream.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 7a164d1b6d5e3..18f1596b39529 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -433,11 +433,11 @@ public void resetMetrics() { else { formatter = new MessageFormatter() { @Override public MessageWriter writer(MessageFactory msgFactory) { - return new DirectMessageWriter(msgFactory, ctx.config().getNetworkCompressionLevel()); + return new DirectMessageWriter(msgFactory, ctx.cacheObjects(), ctx, ctx.config().getNetworkCompressionLevel()); } @Override public MessageReader reader(MessageFactory msgFactory) { - return new DirectMessageReader(msgFactory, ctx.cacheObjects()); + return new DirectMessageReader(msgFactory, ctx.cacheObjects(), ctx); } }; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java index 36312a1591135..d8c8e7083ee62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicate.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; @@ -25,19 +24,6 @@ * */ public interface CacheEntryPredicate extends IgnitePredicate, Message { - /** - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException; - /** * @param locked Entry locked */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java index 0c30dc615aa66..d41a3da341e63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import java.util.Objects; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -118,17 +117,4 @@ public CacheEntryPredicateType type() { throw new IllegalStateException("Unknown cache entry predicate type: " + type); } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (type == CacheEntryPredicateType.VALUE) - val.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { - if (type == CacheEntryPredicateType.VALUE) - val.prepareMarshal(ctx.cacheObjectContext()); - } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index 1276c6b63c82c..4490be4c1dfc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -19,7 +19,6 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -111,19 +110,6 @@ public CacheObject result() { return ErrorMessage.error(errMsg); } - /** - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { - key.prepareMarshal(ctx.cacheObjectContext()); - - assert unprepareRes == null : "marshalResult() was not called for the result: " + this; - - if (res != null) - res.prepareMarshal(ctx.cacheObjectContext()); - } - /** * Converts the entry processor unprepared result to a cache object instance. * @@ -139,19 +125,6 @@ public void marshalResult(GridCacheContext ctx) { } } - /** - * @param ctx Cache context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - key.finishUnmarshal(ctx.cacheObjectContext(), ldr); - - if (res != null) - res.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheInvokeDirectResult.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index d8d710f62368f..96c57d4cf9462 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -23,12 +23,11 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Entry information that gets passed over wire. */ -public class GridCacheEntryInfo implements Message { +public class GridCacheEntryInfo extends GridCacheIdMessage { /** */ private static final int SIZE_OVERHEAD = 3 * 8 /* reference */ + 4 /* int */ + 2 * 8 /* long */ + 32 /* version */; @@ -37,24 +36,20 @@ public class GridCacheEntryInfo implements Message { @GridToStringInclude KeyCacheObject key; - /** Cache ID. */ - @Order(1) - int cacheId; - /** Cache value. */ - @Order(2) + @Order(1) CacheObject val; /** Time to live. */ - @Order(3) + @Order(2) long ttl; /** Expiration time. */ - @Order(4) + @Order(3) long expireTime; /** Entry version. */ - @Order(5) + @Order(4) GridCacheVersion ver; /** New flag. */ @@ -63,13 +58,6 @@ public class GridCacheEntryInfo implements Message { /** Deleted flag. */ private boolean deleted; - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - /** * @param cacheId Cache ID. */ @@ -175,16 +163,6 @@ public void setDeleted(boolean deleted) { this.deleted = deleted; } - /** - * @param ctx Context. - * @param ldr Loader. - * @throws IgniteCheckedException If failed. - */ - public void unmarshalValue(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (val != null) - val.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - /** * @param ctx Cache object context. * @return Marshalled size. @@ -216,11 +194,6 @@ public void marshal(GridCacheContext ctx) throws IgniteCheckedException { public void marshal(CacheObjectContext ctx) throws IgniteCheckedException { assert key != null; - key.prepareMarshal(ctx); - - if (val != null) - val.prepareMarshal(ctx); - if (expireTime == 0) expireTime = -1; else { @@ -250,11 +223,6 @@ public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteChe * @throws IgniteCheckedException If unmarshalling failed. */ public void unmarshal(CacheObjectContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { - key.finishUnmarshal(ctx, clsLdr); - - if (val != null) - val.finishUnmarshal(ctx, clsLdr); - long remaining = expireTime; expireTime = remaining < 0 ? 0 : U.currentTimeMillis() + remaining; @@ -264,6 +232,11 @@ public void unmarshal(CacheObjectContext ctx, ClassLoader clsLdr) throws IgniteC expireTime = 0; } + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheEntryInfo.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index ec670da2d1cbf..fb79d662571f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -269,6 +269,7 @@ public GridDeploymentInfoBean deployInfo() { * @param ctx Cache context. * @throws IgniteCheckedException If failed. */ + @Deprecated() // TODO, to be removed public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { // No-op. } @@ -281,6 +282,7 @@ public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteChecke * @param ldr Class loader. * @throws IgniteCheckedException If failed. */ + @Deprecated() // TODO, to be removed public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { // No-op. } @@ -306,11 +308,8 @@ protected final void marshalInfo(GridCacheEntryInfo info, CacheObject val = info.value(); - if (val != null) { - val.finishUnmarshal(cacheObjCtx, ctx.deploy().globalLoader()); - + if (val != null) prepareObject(val.value(cacheObjCtx, false), ctx); - } } } } @@ -535,8 +534,6 @@ public final void prepareMarshalCacheObjects(@Nullable List col, - GridCacheContext ctx, - ClassLoader ldr - ) throws IgniteCheckedException { - if (col == null) - return; - - int size = col.size(); - - for (int i = 0; i < size; i++) { - CacheObject obj = col.get(i); - - if (obj != null) - obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - } - - /** - * @param col Collection. - * @param ctx Context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - protected final void finishUnmarshalCacheObjects(@Nullable Collection col, - GridCacheContext ctx, - ClassLoader ldr - ) throws IgniteCheckedException { - if (col == null) - return; - - for (CacheObject obj : col) { - if (obj != null) - obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - } - + /** * @param byteCol Collection to unmarshal. * @param ctx Context. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index 7c78a10a5cf48..86a64fe6ea370 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -31,13 +31,12 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Return value for cases where both, value and success flag need to be returned. */ -public class GridCacheReturn implements Message { +public class GridCacheReturn extends GridCacheIdMessage { /** Value. */ @GridToStringInclude(sensitive = true) private volatile Object v; @@ -61,10 +60,6 @@ public class GridCacheReturn implements Message { /** Local result flag, if non local then do not need unwrap cache objects. */ private boolean loc; - /** Cache Id. */ - @Order(4) - int cacheId; - /** * Empty constructor. */ @@ -285,13 +280,6 @@ else if (err instanceof UnregisteredBinaryTypeException) } } - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - /** * @param other Other result to merge with. */ @@ -334,14 +322,6 @@ public void marshalResult(GridCacheContext ctx) { */ public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { assert !loc; - - if (cacheObj != null) - cacheObj.prepareMarshal(ctx.cacheObjectContext()); - - if (invokeRes && invokeResCol != null) { - for (CacheInvokeDirectResult res : invokeResCol) - res.prepareMarshal(ctx); - } } /** @@ -352,16 +332,10 @@ public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { loc = true; - if (cacheObj != null) { - cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr); - + if (cacheObj != null) v = ctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheObj, true, false, ldr); - } if (invokeRes && invokeResCol != null) { - for (CacheInvokeDirectResult res : invokeResCol) - res.finishUnmarshal(ctx, ldr); - Map map0 = U.newHashMap(invokeResCol.size()); for (CacheInvokeDirectResult res : invokeResCol) { @@ -376,6 +350,10 @@ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws Ignite } } + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } /** {@inheritDoc} */ @Override public String toString() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java index c83e732f6181a..2aaf3e1bd12b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java @@ -170,12 +170,6 @@ public List nearVersions() { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); - - finishUnmarshalCacheObjects(nearKeys, cctx, ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index b02d75024fec5..04fec5ae42937 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -359,10 +359,6 @@ public long timeout() { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index cbf10745a7a19..877186405d499 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -170,8 +170,6 @@ protected int valuesSize() { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - finishUnmarshalCacheObjects(vals, ctx.cacheContext(cacheId), ldr); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index bd427bd47b56f..10ff9af976149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -26,7 +26,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -382,12 +381,6 @@ public void applicationAttributes(Map appAttrs) { marshalTx(reads, ctx); if (dhtVers != null && dhtVerKeys == null) { - for (IgniteTxKey key : dhtVers.keySet()) { - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - - key.prepareMarshal(cctx); - } - dhtVerKeys = dhtVers.keySet(); dhtVerVals = dhtVers.values(); } @@ -415,8 +408,6 @@ public void applicationAttributes(Map appAttrs) { while (keyIt.hasNext()) { IgniteTxKey key = keyIt.next(); - key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); - dhtVers.put(key, verIt.next()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java index 2c55bfd642547..f5565895878f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridNearUnlockRequest.java @@ -86,8 +86,6 @@ public void addKey(KeyCacheObject key) { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - finishUnmarshalCacheObjects(keys, ctx.cacheContext(cacheId), ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 7277db3ccd698..082977d06c096 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -1214,9 +1214,9 @@ protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSing res0 = info; } else if (req.needVersion()) - res0 = new CacheVersionedValue(info.value(), info.version()); + res0 = new CacheVersionedValue(info.value(), info.version(), req.cacheId); else - res0 = new CacheVersionedValue(info.value(), null); + res0 = new CacheVersionedValue(info.value(), null, req.cacheId); } res = new GridNearSingleGetResponse( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 315997cecd8a6..d60475ac074ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -20,15 +20,12 @@ import java.util.BitSet; import java.util.Map; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -50,36 +47,27 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { /** Owner mapped version, if any. */ @GridToStringInclude - private Map owned; - - /** Array of keys from {@link #owned}. Used during marshalling and unmarshalling. */ @Order(2) - @GridToStringExclude - KeyCacheObject[] ownedKeys; - - /** Array of values from {@link #owned}. Used during marshalling and unmarshalling. */ - @Order(3) - @GridToStringExclude - GridCacheVersion[] ownedValues; + Map owned; /** Topology version. */ - @Order(4) + @Order(3) AffinityTopologyVersion topVer; /** Task name hash. */ - @Order(5) + @Order(4) int taskNameHash; /** Indexes of keys needed to be preloaded. */ - @Order(6) + @Order(5) BitSet preloadKeys; /** TTL for read operation. */ - @Order(7) + @Order(6) long accessTtl; /** Transaction label. */ - @Order(8) + @Order(7) String txLbl; /** @@ -257,41 +245,10 @@ public long accessTtl() { } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (owned != null && ownedKeys == null) { - ownedKeys = new KeyCacheObject[owned.size()]; - ownedValues = new GridCacheVersion[ownedKeys.length]; - - int i = 0; - - for (Map.Entry entry : owned.entrySet()) { - ownedKeys[i] = entry.getKey(); - ownedValues[i] = entry.getValue(); - i++; - } - } + @Override public short directType() { + return 30; } - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (ownedKeys != null) { - owned = new GridLeanMap<>(ownedKeys.length); - - for (int i = 0; i < ownedKeys.length; i++) { - ownedKeys[i].finishUnmarshal(ctx.cacheContext(cacheId).cacheObjectContext(), ldr); - owned.put(ownedKeys[i], ownedValues[i]); - } - - ownedKeys = null; - ownedValues = null; - } - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtLockRequest.class, this, "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index f8364dca6adb1..d07ee91af3fcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -331,8 +331,6 @@ public boolean skipCompletedVersion() { for (IgniteTxKey key: ownedKeys) { GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - key.prepareMarshal(cctx); - if (addDepInfo) prepareObject(key, cctx); } @@ -360,11 +358,8 @@ public boolean skipCompletedVersion() { GridCacheContext cacheCtx = ctx.cacheContext(key.cacheId()); - if (cacheCtx != null) { - key.finishUnmarshal(cacheCtx, ldr); - + if (cacheCtx != null) owned.put(key, valIter.next()); - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 921f52790252b..ca95fab5b3bcf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -173,16 +173,6 @@ public void addPreloadEntry(GridCacheEntryInfo info) { @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - if (nearEvicted != null) { - for (IgniteTxKey key : nearEvicted) { - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - - // Can be null if client near cache was removed, in this case assume do not need prepareMarshal. - if (cctx != null) - key.prepareMarshal(cctx); - } - } - if (preloadEntries != null) { for (GridCacheEntryInfo info : preloadEntries) { GridCacheContext cctx = ctx.cacheContext(info.cacheId()); @@ -196,14 +186,6 @@ public void addPreloadEntry(GridCacheEntryInfo info) { @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (nearEvicted != null) { - for (IgniteTxKey key : nearEvicted) { - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - - key.finishUnmarshal(cctx, ldr); - } - } - if (preloadEntries != null) { for (GridCacheEntryInfo info : preloadEntries) { GridCacheContext cctx = ctx.cacheContext(info.cacheId()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java index 8c5fc051a69e7..528fc1076da2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java @@ -80,8 +80,6 @@ public void addNearKey(KeyCacheObject key) /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - finishUnmarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId), ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java index f71e035bdac94..af44a5fcb9dd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -182,9 +182,6 @@ public long futureId() { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - if (errs != null) - errs.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index 20aad2f7dec8b..e589692361f8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -314,49 +313,11 @@ private void near(boolean near) { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalObject(key, cctx); - - prepareMarshalObject(val, cctx); - - prepareMarshalObject(prevVal, cctx); } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalObject(key, cctx, ldr); - - finishUnmarshalObject(val, cctx, ldr); - - finishUnmarshalObject(prevVal, cctx, ldr); - } - - /** - * @param obj CacheObject to marshal - * @param ctx context - * @throws IgniteCheckedException if error - */ - private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx) throws IgniteCheckedException { - if (obj != null) - obj.prepareMarshal(ctx.cacheObjectContext()); - } - - /** - * @param obj CacheObject un to marshal - * @param ctx context - * @param ldr class loader - * @throws IgniteCheckedException if error - */ - private void finishUnmarshalObject(@Nullable CacheObject obj, GridCacheContext ctx, - ClassLoader ldr) throws IgniteCheckedException { - if (obj != null) - obj.finishUnmarshal(ctx.cacheObjectContext(), ldr); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 4d06d1705daaf..d49c7a193cf16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -497,18 +497,6 @@ else if (conflictVers != null) @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); - - finishUnmarshalCacheObjects(vals, cctx, ldr); - - finishUnmarshalCacheObjects(nearKeys, cctx, ldr); - - finishUnmarshalCacheObjects(nearVals, cctx, ldr); - - finishUnmarshalCacheObjects(prevVals, cctx, ldr); - if (forceTransformBackups) { if (entryProcessors == null) entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index a63354bc634fa..0cca67faca24b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -136,13 +136,6 @@ public void nearEvicted(List nearEvicted) { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(nearEvicted, cctx, ldr); - - if (errs != null) - errs.finishUnmarshal(this, cctx, ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java index 62fcf3966d916..59c6427968028 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java @@ -338,21 +338,6 @@ else if (conflictVers != null) prepareMarshalCacheObjects(keys, cctx); - if (filter != null) { - boolean hasFilter = false; - - for (CacheEntryPredicate p : filter) { - if (p != null) { - hasFilter = true; - - p.prepareMarshal(cctx); - } - } - - if (!hasFilter) - filter = null; - } - if (operation() == TRANSFORM) { // force addition of deployment info for entry processors if P2P is enabled globally. if (!addDepInfo && ctx.deploymentEnabled()) @@ -377,15 +362,6 @@ else if (conflictVers != null) if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - finishUnmarshalCacheObjects(keys, cctx, ldr); - - if (filter != null) { - for (CacheEntryPredicate p : filter) { - if (p != null) - p.finishUnmarshal(cctx, ldr); - } - } - if (operation() == TRANSFORM) { if (entryProcessors == null) entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); @@ -393,8 +369,6 @@ else if (conflictVers != null) if (invokeArgsBytes != null && invokeArgs == null) invokeArgs = unmarshalInvokeArguments(invokeArgsBytes.toArray(new byte[invokeArgsBytes.size()][]), ctx, ldr); } - else - finishUnmarshalCacheObjects(vals, cctx, ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java index 7b072491d4f89..c5941eee9a9d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.util.typedef.internal.S; @@ -94,37 +93,11 @@ public GridNearAtomicSingleUpdateFilterRequest() { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - if (filter != null) { - boolean hasFilter = false; - - for (CacheEntryPredicate p : filter) { - if (p != null) { - hasFilter = true; - - p.prepareMarshal(cctx); - } - } - - if (!hasFilter) - filter = null; - } } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - if (filter != null) { - GridCacheContext cctx = ctx.cacheContext(cacheId); - - for (CacheEntryPredicate p : filter) { - if (p != null) - p.finishUnmarshal(cctx, ldr); - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java index 25b31b062c7c3..34edb238e0027 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java @@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheUtils; @@ -219,25 +218,11 @@ public GridNearAtomicSingleUpdateRequest() { /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObject(key, cctx); - - if (val != null) - prepareMarshalCacheObject(val, cctx); } /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); - - if (val != null) - val.finishUnmarshal(cctx.cacheObjectContext(), ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index a1103fb308d55..d82f7f9b8e51c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -360,12 +360,6 @@ synchronized void addFailedKeys(Collection keys, Throwable e) { GridCacheContext cctx = ctx.cacheContext(cacheId); - if (errs != null) - errs.finishUnmarshal(this, cctx, ldr); - - if (nearUpdates != null) - finishUnmarshalCacheObjects(nearUpdates.nearValues(), cctx, ldr); - if (ret != null) ret.finishUnmarshal(cctx, ldr); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java index 0e42f82082251..0a1848fd4c279 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java @@ -120,12 +120,6 @@ void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws Ig msg.prepareMarshalCacheObjects(failedKeys, cctx); } - /** */ - void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException { - msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr); - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(UpdateErrors.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java index 898ad4478f086..2b4c1d986d74e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java @@ -125,10 +125,6 @@ public Collection keys() { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java index cafcb5fa3e364..c5b5900fe847e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java @@ -152,9 +152,6 @@ public void addInfo(GridCacheEntryInfo info) { GridCacheContext cctx = ctx.cacheContext(cacheId); - if (missedKeys != null) - finishUnmarshalCacheObjects(missedKeys, cctx, ldr); - if (infos != null) { for (GridCacheEntryInfo info : infos) info.unmarshal(cctx.cacheObjectContext(), ldr); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java index af5d90f84d2a2..e71b310aefcb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -30,7 +28,7 @@ /** * Cache object and version. */ -public class CacheVersionedValue implements Message { +public class CacheVersionedValue extends GridCacheIdMessage implements Message { /** Value. */ @Order(0) @GridToStringInclude @@ -50,9 +48,10 @@ public CacheVersionedValue() { * @param val Cache value. * @param ver Cache version. */ - public CacheVersionedValue(CacheObject val, GridCacheVersion ver) { + public CacheVersionedValue(CacheObject val, GridCacheVersion ver, int cacheId) { this.val = val; this.ver = ver; + this.cacheId = cacheId; } /** @@ -69,32 +68,11 @@ public CacheObject value() { return val; } - /** - * This method is called before the whole message is sent - * and is responsible for pre-marshalling state. - * - * @param ctx Cache object context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { - if (val != null) - val.prepareMarshal(ctx); - } - - /** - * This method is called after the whole message is received - * and is responsible for unmarshalling state. - * - * @param ctx Context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (val != null) - val.finishUnmarshal(ctx.cacheObjectContext(), ldr); + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheVersionedValue.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index a8df9c74253c6..75ef1509bc4af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -628,8 +628,6 @@ private Map loadEntries( for (GridCacheEntryInfo info : infos) { try { - info.unmarshalValue(cctx, cctx.deploy().globalLoader()); - // Entries available locally in DHT should not be loaded into near cache for reading. if (!cctx.affinity().keyLocalNode(info.key(), cctx.affinity().affinityTopologyVersion())) { GridNearCacheEntry entry = savedEntries.get(info.key()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 74ad48394405d..46f432c2b1b34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -17,25 +17,16 @@ package org.apache.ignite.internal.processors.cache.distributed.near; -import java.util.ArrayList; -import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -69,40 +60,33 @@ public class GridNearGetRequest extends GridCacheIdMessage implements GridCacheD @Order(2) GridCacheVersion ver; - /** */ - @GridToStringInclude - private LinkedHashMap keyMap; - /** */ @Order(3) - List keys; + @GridToStringInclude + LinkedHashMap keys; /** */ @Order(4) - List readersFlags; - - /** */ - @Order(5) byte flags; /** Topology version. */ - @Order(6) + @Order(5) AffinityTopologyVersion topVer; /** Task name hash. */ - @Order(7) + @Order(6) int taskNameHash; /** TTL for read operation. */ - @Order(8) + @Order(7) long createTtl; /** TTL for read operation. */ - @Order(9) + @Order(8) long accessTtl; /** Transaction label. */ - @Order(10) + @Order(9) @Nullable String txLbl; /** @@ -132,7 +116,7 @@ public GridNearGetRequest( IgniteUuid futId, IgniteUuid miniId, GridCacheVersion ver, - Map keys, + LinkedHashMap keys, boolean readThrough, @NotNull AffinityTopologyVersion topVer, int taskNameHash, @@ -151,19 +135,7 @@ public GridNearGetRequest( this.futId = futId; this.miniId = miniId; this.ver = ver; - - this.keys = new ArrayList<>(keys.size()); - - if (addReader) - readersFlags = new ArrayList<>(keys.size()); - - for (Map.Entry entry : keys.entrySet()) { - this.keys.add(entry.getKey()); - - if (addReader) - readersFlags.add(entry.getValue()); - } - + this.keys = keys; this.topVer = topVer; this.taskNameHash = taskNameHash; this.createTtl = createTtl; @@ -215,7 +187,7 @@ public int taskNameHash() { * @return Keys. */ public LinkedHashMap keyMap() { - return keyMap; + return keys; } /** @@ -268,7 +240,7 @@ public long accessTtl() { /** {@inheritDoc} */ @Override public int partition() { - return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1; + return keys != null && !keys.isEmpty() ? keys.keySet().iterator().next().partition() : -1; } /** @@ -280,50 +252,6 @@ public long accessTtl() { return txLbl; } - /** - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - assert ctx != null; - assert !F.isEmpty(keys); - assert readersFlags == null || keys.size() == readersFlags.size(); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObjects(keys, cctx); - } - - /** - * @param ctx Context. - * @param ldr Loader. - * @throws IgniteCheckedException If failed. - */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); - - assert !F.isEmpty(keys); - assert readersFlags == null || keys.size() == readersFlags.size(); - - if (keyMap == null) { - keyMap = U.newLinkedHashMap(keys.size()); - - Iterator keysIt = keys.iterator(); - - for (int i = 0; i < keys.size(); i++) { - Boolean addRdr = readersFlags != null ? readersFlags.get(i) : Boolean.FALSE; - - keyMap.put(keysIt.next(), addRdr); - } - } - } - /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return addDepInfo; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java index 07ca1355fc0f0..e5c822040790b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -263,12 +263,6 @@ public boolean recovery() { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - - assert key != null; - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java index 73f72ec4c0247..8aee353a775f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java @@ -153,8 +153,6 @@ public long futureId() { if (res instanceof CacheObject) prepareMarshalCacheObject((CacheObject)res, cctx); - else if (res instanceof CacheVersionedValue) - ((CacheVersionedValue)res).prepareMarshal(cctx.cacheObjectContext()); else if (res instanceof GridCacheEntryInfo) ((GridCacheEntryInfo)res).marshal(cctx); } @@ -167,11 +165,7 @@ else if (res instanceof GridCacheEntryInfo) if (res != null) { GridCacheContext cctx = ctx.cacheContext(cacheId()); - if (res instanceof CacheObject) - ((CacheObject)res).finishUnmarshal(cctx.cacheObjectContext(), ldr); - else if (res instanceof CacheVersionedValue) - ((CacheVersionedValue)res).finishUnmarshal(cctx, ldr); - else if (res instanceof GridCacheEntryInfo) + if (res instanceof GridCacheEntryInfo) ((GridCacheEntryInfo)res).unmarshal(cctx, ldr); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index 646335209b86e..067275a21124b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -196,7 +196,7 @@ public void addOwnedValue(IgniteTxKey key, GridCacheVersion ver, CacheObject val if (ownedVals == null) ownedVals = new HashMap<>(); - CacheVersionedValue oVal = new CacheVersionedValue(val, ver); + CacheVersionedValue oVal = new CacheVersionedValue(val, ver, key.cacheId); ownedVals.put(key, oVal); } @@ -248,14 +248,6 @@ public boolean hasOwnedValue(IgniteTxKey key) { ownedValKeys = ownedVals.keySet(); ownedValVals = ownedVals.values(); - - for (Map.Entry entry : ownedVals.entrySet()) { - GridCacheContext cacheCtx = ctx.cacheContext(entry.getKey().cacheId()); - - entry.getKey().prepareMarshal(cacheCtx); - - entry.getValue().prepareMarshal(cacheCtx.cacheObjectContext()); - } } if (retVal != null && retVal.cacheId() != 0) { @@ -265,14 +257,6 @@ public boolean hasOwnedValue(IgniteTxKey key) { retVal.prepareMarshal(cctx); } - - if (filterFailedKeys != null) { - for (IgniteTxKey key : filterFailedKeys) { - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - - key.prepareMarshal(cctx); - } - } } /** {@inheritDoc} */ @@ -296,14 +280,8 @@ public boolean hasOwnedValue(IgniteTxKey key) { while (keyIter.hasNext()) { IgniteTxKey key = keyIter.next(); - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - CacheVersionedValue val = valIter.next(); - key.finishUnmarshal(cctx, ldr); - - val.finishUnmarshal(cctx, ldr); - ownedVals.put(key, val); } } @@ -315,14 +293,6 @@ public boolean hasOwnedValue(IgniteTxKey key) { retVal.finishUnmarshal(cctx, ldr); } - - if (filterFailedKeys != null) { - for (IgniteTxKey key : filterFailedKeys) { - GridCacheContext cctx = ctx.cacheContext(key.cacheId()); - - key.finishUnmarshal(cctx, ldr); - } - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 84813c40420c9..ddec87b120750 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -21,7 +21,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; @@ -448,11 +447,6 @@ private static byte setDataPageScanEnabled(int flags, Boolean enabled) { idxQryDescBytes = CU.marshal(cctx, idxQryDesc); } - - if (!F.isEmpty(skipKeys)) { - for (KeyCacheObject k : skipKeys) - k.prepareMarshal(cctx.cacheObjectContext()); - } } /** {@inheritDoc} */ @@ -477,13 +471,6 @@ private static byte setDataPageScanEnabled(int flags, Boolean enabled) { if (idxQryDescBytes != null && idxQryDesc == null) idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes, clsLdr); - - if (!F.isEmpty(skipKeys)) { - CacheObjectContext objCtx = ctx.cacheObjectContext(cacheId); - - for (KeyCacheObject k : skipKeys) - k.finishUnmarshal(objCtx, ldr); - } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java index 6b295f2561182..7302b050e5407 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java @@ -25,12 +25,10 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.cache.query.index.IndexQueryResultMeta; import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -154,22 +152,9 @@ public GridCacheQueryResponse(int cacheId, long reqId, Throwable err, boolean ad ClassLoader ldr0 = U.resolveClassLoader(ldr, ctx.gridConfig()); - CacheObjectContext cacheObjCtx = null; - for (byte[] bytes : byteCol) { Object obj = bytes == null ? null : marsh.unmarshal(bytes, ldr0); - if (obj instanceof Map.Entry) { - Object key = ((Map.Entry)obj).getKey(); - - if (key instanceof KeyCacheObject) { - if (cacheObjCtx == null) - cacheObjCtx = ctx.cacheContext(cacheId).cacheObjectContext(); - - ((KeyCacheObject)key).finishUnmarshal(cacheObjCtx, ldr0); - } - } - col.add((T)obj); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index ef1bbce28a9f9..136148ae0283c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -47,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -59,7 +59,7 @@ * {@link #equals(Object)} method, as transaction entries should use referential * equality. */ -public class IgniteTxEntry implements GridPeerDeployAware, Message { +public class IgniteTxEntry extends GridCacheIdMessage implements GridPeerDeployAware { /** */ private static final long serialVersionUID = 0L; @@ -106,17 +106,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { @Order(0) KeyCacheObject key; - /** Cache ID. */ - @GridToStringExclude - @Order(1) - int cacheId; - /** Transient tx key. */ private IgniteTxKey txKey; /** Cache value. */ @GridToStringInclude - @Order(2) + @Order(1) TxEntryValueHolder val = new TxEntryValueHolder(); /** Visible value for peek. */ @@ -125,7 +120,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Old value before update. */ @GridToStringInclude - @Order(3) + @Order(2) TxEntryValueHolder oldVal = new TxEntryValueHolder(); /** Transform. */ @@ -137,24 +132,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Transform closure bytes. */ @GridToStringExclude - @Order(4) + @Order(3) byte[] transformClosBytes; /** Time to live. */ - @Order(5) + @Order(4) long ttl; /** DR expire time (explicit) */ - @Order(6) + @Order(5) long conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; /** Conflict version. */ - @Order(7) + @Order(6) GridCacheVersion conflictVer; /** Explicit lock version if there is one. */ @GridToStringInclude - @Order(8) + @Order(7) GridCacheVersion explicitVer; /** DHT version. */ @@ -162,7 +157,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Put filters. */ @GridToStringInclude - @Order(9) + @Order(8) CacheEntryPredicate[] filters; /** Flag indicating whether filters passed. Used for fast-commit transactions. */ @@ -196,18 +191,18 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private boolean transferExpiryPlc; /** Expiry policy bytes. */ - @Order(10) + @Order(9) byte[] expiryPlcBytes; /** Additional flags. */ - @Order(11) + @Order(10) byte flags; /** Partition update counter. */ private long partUpdateCntr; /** */ - @Order(12) + @Order(11) GridCacheVersion serReadVer; /** @@ -487,13 +482,6 @@ public KeyCacheObject key() { return key; } - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - /** * Sets skip store flag value. * @@ -929,13 +917,6 @@ public void filtersSet(boolean filtersSet) { * @throws IgniteCheckedException If failed. */ public void marshal(GridCacheSharedContext ctx, boolean transferExpiry) throws IgniteCheckedException { - if (filters != null) { - for (CacheEntryPredicate p : filters) { - if (p != null) - p.prepareMarshal(this.ctx); - } - } - // Do not serialize filters if they are null. if (transformClosBytes == null && entryProcessorsCol != null) transformClosBytes = CU.marshal(this.ctx, entryProcessorsCol); @@ -943,19 +924,12 @@ public void marshal(GridCacheSharedContext ctx, boolean transferExpiry) th if (transferExpiry) transferExpiryPlc = expiryPlc != null && expiryPlc != this.ctx.expiry(); - key.prepareMarshal(context().cacheObjectContext()); - - val.marshal(context()); - if (transferExpiryPlc) { if (expiryPlcBytes == null) expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); } else expiryPlcBytes = null; - - if (oldVal != null) - oldVal.marshal(context()); } /** @@ -1017,22 +991,9 @@ public void unmarshal( if (filters == null) filters = CU.empty0(); - else { - for (CacheEntryPredicate p : filters) { - if (p != null) - p.finishUnmarshal(this.ctx, clsLdr); - } - } - - key.finishUnmarshal(coctx, clsLdr); - - val.unmarshal(coctx, clsLdr); if (expiryPlcBytes != null && expiryPlc == null) expiryPlc = U.unmarshal(ctx, expiryPlcBytes, U.resolveClassLoader(clsLdr, ctx.gridConfig())); - - if (hasOldValue()) - oldVal.unmarshal(coctx, clsLdr); } /** @@ -1109,6 +1070,11 @@ public void clearEntryReadVersion() { return deployClass().getClassLoader(); } + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java index b50bc340c3ba1..1645e9ce3d40c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxKey.java @@ -17,28 +17,22 @@ package org.apache.ignite.internal.processors.cache.transactions; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; /** * Cache transaction key. This wrapper is needed because same keys may be enlisted in the same transaction * for multiple caches. */ -public class IgniteTxKey implements Message { +public class IgniteTxKey extends GridCacheIdMessage { /** Key. */ @Order(0) @GridToStringInclude(sensitive = true) KeyCacheObject key; - /** Cache ID. */ - @Order(1) - int cacheId; - /** * Empty constructor. */ @@ -62,32 +56,6 @@ public KeyCacheObject key() { return key; } - /** - * @return Cache ID. - */ - public int cacheId() { - return cacheId; - } - - /** - * @param ctx Context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { - key.prepareMarshal(ctx.cacheObjectContext()); - } - - /** - * @param ctx Context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - assert key != null; - - key.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) @@ -110,6 +78,10 @@ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws Ignite return res; } + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } /** {@inheritDoc} */ @Override public String toString() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java index 98e5830cb3b46..425e4d88cdfeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java @@ -17,11 +17,8 @@ package org.apache.ignite.internal.processors.cache.transactions; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -119,25 +116,6 @@ public boolean hasReadValue() { return hasReadVal; } - /** - * @param ctx Cache context. - * @throws IgniteCheckedException If marshaling failed. - */ - public void marshal(GridCacheContext ctx) throws IgniteCheckedException { - if (hasWriteVal && val != null) - val.prepareMarshal(ctx.cacheObjectContext()); - } - - /** - * @param ctx Cache context. - * @param ldr Class loader. - * @throws IgniteCheckedException If unmarshalling failed. - */ - public void unmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { - if (hasWriteVal && val != null) - val.finishUnmarshal(ctx, ldr); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(TxEntryValueHolder.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java index 4db908333d72b..b47b61bde2e81 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java @@ -19,15 +19,11 @@ import java.util.Collection; import java.util.Set; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; /** * Transactions lock list request. @@ -38,13 +34,9 @@ public class TxLocksRequest extends GridCacheMessage { long futId; /** Tx keys. */ - @GridToStringInclude - private Set txKeys; - - /** Array of txKeys from {@link #txKeys}. Used during marshalling and unmarshalling. */ - @GridToStringExclude @Order(1) - IgniteTxKey[] txKeysArr; + @GridToStringInclude + Set txKeys; /** * Default constructor. @@ -89,33 +81,7 @@ public Collection txKeys() { } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - txKeysArr = new IgniteTxKey[txKeys.size()]; - - int i = 0; - - for (IgniteTxKey key : txKeys) { - key.prepareMarshal(ctx.cacheContext(key.cacheId())); - - txKeysArr[i++] = key; - } + @Override public short directType() { + return -24; } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - txKeys = U.newHashSet(txKeysArr.length); - - for (IgniteTxKey key : txKeysArr) { - key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); - - txKeys.add(key); - } - - txKeysArr = null; - } - } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java index 8ebbcbddbd031..46cb9217a671c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java @@ -151,8 +151,6 @@ public void addKey(IgniteTxKey key) { for (Map.Entry> entry : nearTxKeyLocks.entrySet()) { IgniteTxKey key = entry.getKey(); - key.prepareMarshal(ctx.cacheContext(key.cacheId())); - nearTxKeysArr[i] = key; locksArr[i] = entry.getValue(); @@ -165,11 +163,8 @@ public void addKey(IgniteTxKey key) { int i = 0; - for (IgniteTxKey key : txKeys) { - key.prepareMarshal(ctx.cacheContext(key.cacheId())); - + for (IgniteTxKey key : txKeys) txKeysArr[i++] = key; - } } } @@ -182,8 +177,6 @@ public void addKey(IgniteTxKey key) { for (int i = 0; i < nearTxKeysArr.length; i++) { IgniteTxKey txKey = nearTxKeysArr[i]; - txKey.key().finishUnmarshal(ctx.cacheObjectContext(txKey.cacheId()), ldr); - txLocks().put(txKey, locksArr[i]); } @@ -194,11 +187,8 @@ public void addKey(IgniteTxKey key) { if (txKeysArr != null) { txKeys = U.newHashSet(txKeysArr.length); - for (IgniteTxKey txKey : txKeysArr) { - txKey.key().finishUnmarshal(ctx.cacheObjectContext(txKey.cacheId()), ldr); - + for (IgniteTxKey txKey : txKeysArr) txKeys.add(txKey); - } txKeysArr = null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java index 750c748b2ac60..a56fb8ccae11c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java @@ -23,6 +23,8 @@ import org.apache.ignite.configuration.DeploymentMode; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -33,7 +35,7 @@ /** * */ -public class DataStreamerRequest implements Message { +public class DataStreamerRequest extends GridCacheIdMessage implements Message { /** */ @Order(0) long reqId; @@ -162,6 +164,8 @@ public DataStreamerRequest( this.forceLocDep = forceLocDep; this.topVer = topVer; this.partId = partId; + + cacheId = GridCacheUtils.cacheId(cacheName); } /** @@ -269,6 +273,11 @@ public AffinityTopologyVersion topologyVersion() { return topVer; } + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + /** * @return Partition ID. */ diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index f2190a1e86a41..a47cbc6e69e49 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -193,36 +193,40 @@ public default void setBuffer(ByteBuffer buf) { /** * Reads nested message. + * @param encMsg Message. * * @param Type of the message. * @return Message. */ - public default T readMessage() { - return readMessage(false); + public default T readMessage(Message encMsg) { + return readMessage(encMsg, false); } /** * Reads nested message. * + * @param encMsg Message. * @param compress Whether message should be decompressed. * @param Type of the message. * @return Message. */ - public T readMessage(boolean compress); + public T readMessage(Message encMsg, boolean compress); /** * Reads {@link CacheObject}. + * @param msg Message. * * @return Cache object. */ - public CacheObject readCacheObject(); + public CacheObject readCacheObject(Message msg); /** * Reads {@link KeyCacheObject}. + * @param msg Message. * * @return Key cache object. */ - public KeyCacheObject readKeyCacheObject(); + public KeyCacheObject readKeyCacheObject(Message msg); /** * Reads {@link GridLongList}. @@ -236,39 +240,43 @@ public default T readMessage() { * * @param type Array component type. * @param Type of the read object. + * @param msg Message. * @return Array of objects. */ - public T[] readObjectArray(MessageArrayType type); + public T[] readObjectArray(MessageArrayType type, Message msg); /** * Reads any collection. * * @param type Collection item type. * @param Type of the read collection. + * @param msg Message. * @return Collection. */ - public > C readCollection(MessageCollectionType type); + public > C readCollection(MessageCollectionType type, Message msg); /** * Reads map. * * @param type Map type. * @param Type of the read map. + * @param msg Message. * @return Map. */ - public default > M readMap(MessageMapType type) { - return readMap(type, false); + public default > M readMap(MessageMapType type, Message msg) { + return readMap(type, msg, false); } /** * Reads map. * * @param type Map type. + * @param msg Message. * @param compress Whether map should be compressed. * @param Type of the read map. * @return Map. */ - public > M readMap(MessageMapType type, boolean compress); + public > M readMap(MessageMapType type, Message msg, boolean compress); /** * Tells whether last invocation of any of {@code readXXX(...)} diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java index c29e3ac093db9..33020e8276185 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java @@ -263,17 +263,19 @@ public default boolean writeMessage(Message val) { * Writes {@link CacheObject}. * * @param obj Cache object. + * @param msg Message. * @return Whether value was fully written. */ - public boolean writeCacheObject(CacheObject obj); + public boolean writeCacheObject(CacheObject obj, Message msg); /** * Writes {@link KeyCacheObject}. * * @param obj Key cache object. + * @param msg Message. * @return Whether value was fully written. */ - public boolean writeKeyCacheObject(KeyCacheObject obj); + public boolean writeKeyCacheObject(KeyCacheObject obj, Message msg); /** * Writes {@link GridLongList}. @@ -289,9 +291,10 @@ public default boolean writeMessage(Message val) { * @param arr Array of objects. * @param type Array component type. * @param Type of the objects that array contains. + * @param msg Message. * @return Whether array was fully written. */ - public boolean writeObjectArray(T[] arr, MessageArrayType type); + public boolean writeObjectArray(T[] arr, MessageArrayType type, Message msg); /** * Writes collection with its elements order. @@ -299,9 +302,10 @@ public default boolean writeMessage(Message val) { * @param col Collection. * @param type Collection item type. * @param Type of the objects that collection contains. + * @param msg Message. * @return Whether value was fully written. */ - public boolean writeCollection(Collection col, MessageCollectionType type); + public boolean writeCollection(Collection col, MessageCollectionType type, Message msg); /** * Writes map. @@ -310,10 +314,11 @@ public default boolean writeMessage(Message val) { * @param type Map type. * @param Initial key types of the map to write. * @param Initial value types of the map to write. + * @param msg Message. * @return Whether value was fully written. */ - public default boolean writeMap(Map map, MessageMapType type) { - return writeMap(map, type, false); + public default boolean writeMap(Map map, MessageMapType type, Message msg) { + return writeMap(map, type, msg, false); } /** @@ -321,12 +326,13 @@ public default boolean writeMap(Map map, MessageMapType type) { * * @param map Map. * @param type Map type. + * @param msg Message. * @param compress Whether map should be compressed. * @param Initial key types of the map to write. * @param Initial value types of the map to write. * @return Whether value was fully written. */ - public boolean writeMap(Map map, MessageMapType type, boolean compress); + public boolean writeMap(Map map, MessageMapType type, Message msg, boolean compress); /** * @return Whether header of current message is already written. diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/SkipCacheObjectsMarshallingMessage.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/SkipCacheObjectsMarshallingMessage.java new file mode 100644 index 0000000000000..cb8c2c55b2d93 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/SkipCacheObjectsMarshallingMessage.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.plugin.extensions.communication; + +/** */ +public interface SkipCacheObjectsMarshallingMessage { +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java index 97a048a508ddd..6000c6f1a5ab0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java @@ -878,7 +878,7 @@ private MessageFactory get() { @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException { // Enable sending wait message for a communication peer while context isn't initialized. if (!stateProvider.spiContextAvailable()) - return new DirectMessageWriter(msgFactory, igniteCfg.getNetworkCompressionLevel()); + return new DirectMessageWriter(msgFactory, null, null, igniteCfg.getNetworkCompressionLevel()); final IgniteSpiContext ctx = stateProvider.getSpiContextWithoutInitialLatch(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 30e9b1b73f810..d9f2aa4925c97 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -108,8 +108,8 @@ public class TcpDiscoveryIoSession { msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); - msgWriter = new DirectMessageWriter(spi.messageFactory()); - msgReader = new DirectMessageReader(spi.messageFactory(), null); + msgWriter = new DirectMessageWriter(spi.messageFactory(), null, null); + msgReader = new DirectMessageReader(spi.messageFactory(), null, null); try { int sendBufSize = sock.getSendBufferSize() > 0 ? sock.getSendBufferSize() : DFLT_SOCK_BUFFER_SIZE; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java index cd7fccd16fd43..0ec81af6cde37 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java @@ -90,7 +90,7 @@ public void testNestedContainers() { private T doMarshalUnmarshalChunked(T srcMsg) { ByteBuffer buf = ByteBuffer.allocate(256); - DirectMessageWriter writer = new DirectMessageWriter(msgFactory); + DirectMessageWriter writer = new DirectMessageWriter(msgFactory, null, null); boolean fullyWritten = false; @@ -111,7 +111,7 @@ private T doMarshalUnmarshalChunked(T srcMsg) { buf.flip(); buf.get(bytes); - DirectMessageReader reader = new DirectMessageReader(msgFactory, null); + DirectMessageReader reader = new DirectMessageReader(msgFactory, null, null); Message resMsg = null; @@ -128,7 +128,7 @@ private T doMarshalUnmarshalChunked(T srcMsg) { reader.setBuffer(chunk); - resMsg = reader.readMessage(false); + resMsg = reader.readMessage(srcMsg, false); pos += chunk.position(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java index 44f44be77f700..0efe0c3aa9692 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java @@ -101,7 +101,7 @@ private static DirectByteBufferStream createStream(ByteBuffer buff) { @Override public MessageSerializer serializer(short type) { return null; } - }); + }, null, null); stream.setBuffer(buff); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java index e003670f18f10..ac9df8b7aa0b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java @@ -275,12 +275,12 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeCacheObject(CacheObject obj) { + @Override public boolean writeCacheObject(CacheObject obj, Message msg) { return writeField(CacheObject.class); } /** {@inheritDoc} */ - @Override public boolean writeKeyCacheObject(KeyCacheObject obj) { + @Override public boolean writeKeyCacheObject(KeyCacheObject obj, Message msg) { return writeField(KeyCacheObject.class); } @@ -295,17 +295,17 @@ private boolean writeField(Class type) { } /** {@inheritDoc} */ - @Override public boolean writeObjectArray(T[] arr, MessageArrayType type) { + @Override public boolean writeObjectArray(T[] arr, MessageArrayType type, Message msg) { return writeField(Object[].class); } /** {@inheritDoc} */ - @Override public boolean writeCollection(Collection col, MessageCollectionType type) { + @Override public boolean writeCollection(Collection col, MessageCollectionType type, Message msg) { return writeField(type.set() ? Set.class : Collection.class); } /** {@inheritDoc} */ - @Override public boolean writeMap(Map map, MessageMapType type, boolean compress) { + @Override public boolean writeMap(Map map, MessageMapType type, Message msg, boolean compress) { return writeField(type.linked() ? LinkedHashMap.class : HashMap.class); } @@ -522,21 +522,21 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T readMessage(boolean compress) { + @Override public T readMessage(Message encMsg, boolean compress) { readField(Message.class); return null; } /** {@inheritDoc} */ - @Override public CacheObject readCacheObject() { + @Override public CacheObject readCacheObject(Message msg) { readField(CacheObject.class); return null; } /** {@inheritDoc} */ - @Override public KeyCacheObject readKeyCacheObject() { + @Override public KeyCacheObject readKeyCacheObject(Message msg) { readField(KeyCacheObject.class); return null; @@ -550,21 +550,21 @@ private void readField(Class type) { } /** {@inheritDoc} */ - @Override public T[] readObjectArray(MessageArrayType type) { + @Override public T[] readObjectArray(MessageArrayType type, Message msg) { readField(Object[].class); return null; } /** {@inheritDoc} */ - @Override public > C readCollection(MessageCollectionType type) { + @Override public > C readCollection(MessageCollectionType type, Message msg) { readField(type.set() ? Set.class : Collection.class); return null; } /** {@inheritDoc} */ - @Override public > M readMap(MessageMapType type, boolean compress) { + @Override public > M readMap(MessageMapType type, Message msg, boolean compress) { readField(type.linked() ? LinkedHashMap.class : HashMap.class); return null; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java index fb96d7da32ed4..7716517772962 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java @@ -48,7 +48,7 @@ public void testWriteReadHugeMessage() { MessageFactory msgFactory = new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{ new GridIoMessageFactory(jdk(), U.gridClassLoader())}); - DirectMessageWriter writer = new DirectMessageWriter(msgFactory); + DirectMessageWriter writer = new DirectMessageWriter(msgFactory, null, null); ByteBuffer tmpBuf = ByteBuffer.allocate(4096); @@ -94,11 +94,11 @@ public void testWriteReadHugeMessage() { msgBuf.flip(); - DirectMessageReader reader = new DirectMessageReader(msgFactory, null); + DirectMessageReader reader = new DirectMessageReader(msgFactory, null, null); reader.setBuffer(msgBuf); - Message readMsg = reader.readMessage(true); + Message readMsg = reader.readMessage(fullMsg, true); assertTrue(readMsg instanceof GridDhtPartitionsFullMessage); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java index 77616ae9e8d2b..46dfb98d413c4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsCacheSizeTest.java @@ -112,7 +112,7 @@ public void testCacheSize() throws Exception { // We have to skip this header at the further message reading. AtomicInteger initHdrSize = new AtomicInteger(); - DirectMessageWriter msgWritter = new DirectMessageWriter(msgFactory) { + DirectMessageWriter msgWritter = new DirectMessageWriter(msgFactory, null, null) { @Override public void onHeaderWritten() { super.onHeaderWritten(); @@ -127,7 +127,7 @@ public void testCacheSize() throws Exception { assertTrue(msgWritter.getBuffer().hasRemaining()); - DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null); + DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null, null); msgReader.setBuffer(msgWritter.getBuffer()); msgWritter.getBuffer().rewind(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java index e75ed12f40019..1d803bb82424f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java @@ -153,7 +153,7 @@ public void testCacheContinuousQueryEntrySerialization() { new IgniteMessageFactoryImpl(new MessageFactoryProvider[]{new GridIoMessageFactory(jdk(), U.gridClassLoader())}); ByteBuffer buf = ByteBuffer.allocate(4096); - DirectMessageWriter writer = new DirectMessageWriter(msgFactory); + DirectMessageWriter writer = new DirectMessageWriter(msgFactory, null, null); var serializer = msgFactory.serializer(e0.directType()); assertNotNull("Serializer not found for message type " + e0.directType(), serializer); @@ -166,7 +166,7 @@ public void testCacheContinuousQueryEntrySerialization() { CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry(); - final DirectMessageReader reader = new DirectMessageReader(msgFactory, null); + final DirectMessageReader reader = new DirectMessageReader(msgFactory, null, null); reader.setBuffer(ByteBuffer.wrap(buf.array())); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 8809b1c204b41..3439b17e80eee 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -539,11 +539,11 @@ public void triggerEvent(Event evt) { if (formatter == null) { formatter = new MessageFormatter() { @Override public MessageWriter writer(MessageFactory msgFactory) { - return new DirectMessageWriter(msgFactory); + return new DirectMessageWriter(msgFactory, null, null); } @Override public MessageReader reader(MessageFactory msgFactory) { - return new DirectMessageReader(msgFactory, null); + return new DirectMessageReader(msgFactory, null, null); } }; } diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridLongListSelfTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridLongListSelfTest.java index 089ba2bdfa8c0..e5e28f836d463 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridLongListSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridLongListSelfTest.java @@ -155,7 +155,7 @@ public void testSerializationCopyConstructor() { * @param initSz Initial size of list. */ private static void testSerialization(GridLongList ll, int initSz) { - DirectMessageWriter writer = new DirectMessageWriter(null); + DirectMessageWriter writer = new DirectMessageWriter(null, null, null); ByteBuffer buf = ByteBuffer.allocate(4096); @@ -236,7 +236,7 @@ private static void testSerialization(GridLongList ll, int initSz) { /** */ @Test public void testSerializationInsufficientBuffer() { - DirectMessageWriter writer = new DirectMessageWriter(null); + DirectMessageWriter writer = new DirectMessageWriter(null, null, null); ByteBuffer buf = ByteBuffer.allocate(10); @@ -252,7 +252,7 @@ public void testSerializationInsufficientBuffer() { /** */ @Test public void testSerializationOfNullValue() { - DirectMessageWriter writer = new DirectMessageWriter(null); + DirectMessageWriter writer = new DirectMessageWriter(null, null, null); ByteBuffer buf = ByteBuffer.allocate(4096); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java index c24857ef13a17..53f37f846b165 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java @@ -20,12 +20,13 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.SkipCacheObjectsMarshallingMessage; import org.h2.value.Value; /** * Abstract message wrapper for H2 values. */ -public abstract class GridH2ValueMessage implements Message { +public abstract class GridH2ValueMessage implements Message, SkipCacheObjectsMarshallingMessage { /** * Gets H2 value. * diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java index ea83271c9f54d..1b3c96f5e13b4 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -115,7 +115,7 @@ public DiscoverySpiCustomMessage unmarshalZip(byte[] bytes) { /** */ private void serializeMessage(Message m, OutputStream out) throws IOException { - DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory); + DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory, null, null); ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgWriter.setBuffer(msgBuf); @@ -136,7 +136,7 @@ private void serializeMessage(Message m, OutputStream out) throws IOException { /** */ private Message deserializeMessage(InputStream in) throws IOException { - DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null); + DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null, null); ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgReader.setBuffer(msgBuf);