一、前言
关于JeroMQ的(jeromq-master源码包)通过定义zmq.util.Blob类,实现二进制大字段blob存储对象的源码。
二、源码说明
1.Blob类
package zmq.util;@b@@b@import java.util.Arrays; @b@import zmq.Msg;@b@@b@public class Blob@b@{@b@ private final byte[] buf;@b@@b@ private Blob(byte[] data)@b@ {@b@ buf = data;@b@ }@b@@b@ private static Blob createBlob(byte[] data, boolean copy)@b@ {@b@ if (copy) {@b@ byte[] b = new byte[data.length];@b@ System.arraycopy(data, 0, b, 0, data.length);@b@ return new Blob(b);@b@ }@b@ else {@b@ return new Blob(data);@b@ }@b@ }@b@@b@ public static Blob createBlob(Msg msg)@b@ {@b@ return createBlob(msg.data(), true);@b@ }@b@@b@ public static Blob createBlob(byte[] data)@b@ {@b@ return createBlob(data, false);@b@ }@b@@b@ public int size()@b@ {@b@ return buf.length;@b@ }@b@@b@ public byte[] data()@b@ {@b@ return buf;@b@ }@b@@b@ @Override@b@ public boolean equals(Object t)@b@ {@b@ if (t instanceof Blob) {@b@ return Arrays.equals(buf, ((Blob) t).buf);@b@ }@b@ return false;@b@ }@b@@b@ @Override@b@ public int hashCode()@b@ {@b@ return Arrays.hashCode(buf);@b@ }@b@}
2.Msg类
package zmq;@b@@b@import java.io.ByteArrayOutputStream;@b@import java.nio.ByteBuffer;@b@import java.nio.ByteOrder;@b@import java.nio.channels.SocketChannel;@b@@b@import zmq.io.Metadata;@b@import zmq.util.Wire;@b@@b@public class Msg@b@{@b@ // dynamic message building used when the size is not known in advance@b@ public static final class Builder extends Msg@b@ {@b@ private final ByteArrayOutputStream out = new ByteArrayOutputStream();@b@@b@ public Builder()@b@ {@b@ super();@b@ }@b@@b@ @Override@b@ public int size()@b@ {@b@ return out.size();@b@ }@b@@b@ @Override@b@ protected Msg put(int index, byte b)@b@ {@b@ out.write(b);@b@ return this;@b@ }@b@@b@ @Override@b@ public Msg put(byte[] src, int off, int len)@b@ {@b@ if (src == null) {@b@ return this;@b@ }@b@ out.write(src, off, len);@b@ setWriteIndex(getWriteIndex() + len);@b@ return this;@b@ }@b@@b@ @Override@b@ public Msg put(ByteBuffer src, int off, int len)@b@ {@b@ if (src == null) {@b@ return this;@b@ }@b@ for (int idx = off; idx < off + len; ++idx) {@b@ out.write(src.get(idx));@b@ }@b@ setWriteIndex(getWriteIndex() + len);@b@ return this;@b@ }@b@@b@ @Override@b@ public void setFlags(int flags)@b@ {@b@ super.setFlags(flags);@b@ }@b@@b@ public Msg build()@b@ {@b@ return new Msg(this, out);@b@ }@b@ }@b@@b@ enum Type@b@ {@b@ DATA,@b@ DELIMITER@b@ }@b@@b@ public static final int MORE = 1; // Followed by more parts@b@ public static final int COMMAND = 2; // Command frame (see ZMTP spec)@b@ public static final int CREDENTIAL = 32;@b@ public static final int IDENTITY = 64;@b@ public static final int SHARED = 128;@b@@b@ private Metadata metadata;@b@ private int flags;@b@ private Type type;@b@@b@ // the file descriptor where this message originated, needs to be 64bit due to alignment@b@ private SocketChannel fileDesc;@b@@b@ private int size;@b@ private byte[] data;@b@ private final ByteBuffer buf;@b@ // keep track of relative write position@b@ private int writeIndex = 0;@b@ // keep track of relative read position@b@ private int readIndex = 0;@b@@b@ public Msg()@b@ {@b@ this(0);@b@ }@b@@b@ public Msg(int capacity)@b@ {@b@ this.type = Type.DATA;@b@ this.flags = 0;@b@ this.size = capacity;@b@ this.buf = ByteBuffer.wrap(new byte[capacity]).order(ByteOrder.BIG_ENDIAN);@b@ this.data = buf.array();@b@ }@b@@b@ public Msg(byte[] src)@b@ {@b@ if (src == null) {@b@ src = new byte[0];@b@ }@b@ this.type = Type.DATA;@b@ this.flags = 0;@b@ this.size = src.length;@b@ this.data = src;@b@ this.buf = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);@b@ }@b@@b@ public Msg(final ByteBuffer src)@b@ {@b@ if (src == null) {@b@ throw new IllegalArgumentException("ByteBuffer cannot be null");@b@ }@b@ this.type = Type.DATA;@b@ this.flags = 0;@b@ this.buf = src.duplicate();@b@ if (buf.hasArray() && buf.position() == 0 && buf.limit() == buf.capacity()) {@b@ this.data = buf.array();@b@ }@b@ else {@b@ this.data = null;@b@ }@b@ this.size = buf.remaining();@b@ }@b@@b@ public Msg(final Msg m)@b@ {@b@ if (m == null) {@b@ throw new IllegalArgumentException("Msg cannot be null");@b@ }@b@ this.type = m.type;@b@ this.flags = m.flags;@b@ this.size = m.size;@b@ this.buf = m.buf != null ? m.buf.duplicate() : null;@b@ if (m.data != null) {@b@ this.data = new byte[this.size];@b@ System.arraycopy(m.data, 0, this.data, 0, m.size);@b@ }@b@ }@b@@b@ private Msg(Msg src, ByteArrayOutputStream out)@b@ {@b@ this(ByteBuffer.wrap(out.toByteArray()));@b@ this.type = src.type;@b@ this.flags = src.flags;@b@ }@b@@b@ public boolean isIdentity()@b@ {@b@ return (flags & IDENTITY) == IDENTITY;@b@ }@b@@b@ public boolean isDelimiter()@b@ {@b@ return type == Type.DELIMITER;@b@ }@b@@b@ public boolean check()@b@ {@b@ return true; // type >= TYPE_MIN && type <= TYPE_MAX;@b@ }@b@@b@ public int flags()@b@ {@b@ return flags;@b@ }@b@@b@ public boolean hasMore()@b@ {@b@ return (flags & MORE) > 0;@b@ }@b@@b@ public boolean isCommand()@b@ {@b@ return (flags & COMMAND) == COMMAND;@b@ }@b@@b@ public boolean isCredential()@b@ {@b@ return (flags & CREDENTIAL) == CREDENTIAL;@b@ }@b@@b@ public void setFlags(int flags)@b@ {@b@ this.flags |= flags;@b@ }@b@@b@ public void initDelimiter()@b@ {@b@ type = Type.DELIMITER;@b@ metadata = null;@b@ flags = 0;@b@ }@b@@b@ public byte[] data()@b@ {@b@ if (data == null) {@b@ data = new byte[buf.remaining()];@b@ buf.duplicate().get(data);@b@ }@b@ return data;@b@ }@b@@b@ public ByteBuffer buf()@b@ {@b@ return buf.duplicate();@b@ }@b@@b@ public int size()@b@ {@b@ return size;@b@ }@b@@b@ public void resetFlags(int f)@b@ {@b@ flags = flags & ~f;@b@ }@b@@b@ public void setFd(SocketChannel fileDesc)@b@ {@b@ this.fileDesc = fileDesc;@b@ }@b@@b@ // TODO V4 use the source channel@b@ public SocketChannel fd()@b@ {@b@ return fileDesc;@b@ }@b@@b@ public Metadata getMetadata()@b@ {@b@ return metadata;@b@ }@b@@b@ public Msg setMetadata(Metadata metadata)@b@ {@b@ this.metadata = metadata;@b@ return this;@b@ }@b@@b@ public void resetMetadata()@b@ {@b@ setMetadata(null);@b@ }@b@@b@ public byte get()@b@ {@b@ return get(readIndex++);@b@ }@b@@b@ public byte get(int index)@b@ {@b@ return buf.get(index);@b@ }@b@@b@ public Msg put(byte b)@b@ {@b@ return put(writeIndex++, b);@b@ }@b@@b@ public Msg put(int b)@b@ {@b@ return put(writeIndex++, (byte) b);@b@ }@b@@b@ protected Msg put(int index, byte b)@b@ {@b@ buf.put(index, b);@b@ return this;@b@ }@b@@b@ public Msg put(byte[] src)@b@ {@b@ return put(src, 0, src.length);@b@ }@b@@b@ public Msg put(byte[] src, int off, int len)@b@ {@b@ if (src == null) {@b@ return this;@b@ }@b@ ByteBuffer dup = buf.duplicate();@b@ dup.position(writeIndex);@b@ writeIndex += len;@b@ dup.put(src, off, len);@b@ return this;@b@ }@b@@b@ public Msg put(ByteBuffer src, int off, int len)@b@ {@b@ if (src == null) {@b@ return this;@b@ }@b@ int position = src.position();@b@ int limit = src.limit();@b@ src.limit(off + len).position(off);@b@ put(src);@b@ src.limit(limit).position(position);@b@ return this;@b@ }@b@@b@ public Msg put(ByteBuffer src)@b@ {@b@ ByteBuffer dup = buf.duplicate();@b@ dup.position(writeIndex);@b@ writeIndex += Math.min(dup.remaining(), src.remaining());@b@ dup.put(src);@b@ return this;@b@ }@b@@b@ public int getBytes(int index, byte[] dst, int off, int len)@b@ {@b@ int count = Math.min(len, size - index);@b@ if (data == null) {@b@ ByteBuffer dup = buf.duplicate();@b@ dup.position(index);@b@ dup.put(dst, off, count);@b@ }@b@ else {@b@ System.arraycopy(data, index, dst, off, count);@b@ }@b@@b@ return count;@b@ }@b@@b@ public int getBytes(int index, ByteBuffer bb, int len)@b@ {@b@ ByteBuffer dup = buf.duplicate();@b@ dup.position(index);@b@ int count = Math.min(bb.remaining(), dup.remaining());@b@ count = Math.min(count, len);@b@ bb.put(dup);@b@ return count;@b@ }@b@@b@ @Override@b@ public String toString()@b@ {@b@ return String.format("#zmq.Msg{type=%s, size=%s, flags=%s}", type, size, flags);@b@ }@b@@b@ protected final int getWriteIndex()@b@ {@b@ return writeIndex;@b@ }@b@@b@ protected final void setWriteIndex(int writeIndex)@b@ {@b@ this.writeIndex = writeIndex;@b@ }@b@@b@ public long getLong(int offset)@b@ {@b@ return Wire.getUInt64(buf, offset);@b@ }@b@@b@ public int getInt(int offset)@b@ {@b@ return Wire.getUInt32(buf, offset);@b@ }@b@@b@ public void transfer(ByteBuffer destination, int srcOffset, int srcLength)@b@ {@b@ int position = buf.position();@b@ int limit = buf.limit();@b@@b@ buf.limit(srcOffset + srcLength).position(srcOffset);@b@ destination.put(buf);@b@ buf.limit(limit).position(position);@b@ }@b@}