首页

关于jeromq源码包中定义二进制大字段对象类Blob源码说明

标签:BLOB,二进制大字段,jeromq,源码     发布时间:2018-03-31   

一、前言

关于JeroMQ的(jeromq-master源码包)通过定义zmq.util.Blob类,实现二进制大字段blob存储对象的源码。

二、源码说明

1.Blob类

package zmq.util;@b@@b@import java.util.Arrays; @b@import zmq.Msg;@b@@b@public class Blob@b@{@b@    private final byte[] buf;@b@@b@    private Blob(byte[] data)@b@    {@b@        buf = data;@b@    }@b@@b@    private static Blob createBlob(byte[] data, boolean copy)@b@    {@b@        if (copy) {@b@            byte[] b = new byte[data.length];@b@            System.arraycopy(data, 0, b, 0, data.length);@b@            return new Blob(b);@b@        }@b@        else {@b@            return new Blob(data);@b@        }@b@    }@b@@b@    public static Blob createBlob(Msg msg)@b@    {@b@        return createBlob(msg.data(), true);@b@    }@b@@b@    public static Blob createBlob(byte[] data)@b@    {@b@        return createBlob(data, false);@b@    }@b@@b@    public int size()@b@    {@b@        return buf.length;@b@    }@b@@b@    public byte[] data()@b@    {@b@        return buf;@b@    }@b@@b@    @Override@b@    public boolean equals(Object t)@b@    {@b@        if (t instanceof Blob) {@b@            return Arrays.equals(buf, ((Blob) t).buf);@b@        }@b@        return false;@b@    }@b@@b@    @Override@b@    public int hashCode()@b@    {@b@        return Arrays.hashCode(buf);@b@    }@b@}

2.Msg类

package zmq;@b@@b@import java.io.ByteArrayOutputStream;@b@import java.nio.ByteBuffer;@b@import java.nio.ByteOrder;@b@import java.nio.channels.SocketChannel;@b@@b@import zmq.io.Metadata;@b@import zmq.util.Wire;@b@@b@public class Msg@b@{@b@    // dynamic message building used when the size is not known in advance@b@    public static final class Builder extends Msg@b@    {@b@        private final ByteArrayOutputStream out = new ByteArrayOutputStream();@b@@b@        public Builder()@b@        {@b@            super();@b@        }@b@@b@        @Override@b@        public int size()@b@        {@b@            return out.size();@b@        }@b@@b@        @Override@b@        protected Msg put(int index, byte b)@b@        {@b@            out.write(b);@b@            return this;@b@        }@b@@b@        @Override@b@        public Msg put(byte[] src, int off, int len)@b@        {@b@            if (src == null) {@b@                return this;@b@            }@b@            out.write(src, off, len);@b@            setWriteIndex(getWriteIndex() + len);@b@            return this;@b@        }@b@@b@        @Override@b@        public Msg put(ByteBuffer src, int off, int len)@b@        {@b@            if (src == null) {@b@                return this;@b@            }@b@            for (int idx = off; idx < off + len; ++idx) {@b@                out.write(src.get(idx));@b@            }@b@            setWriteIndex(getWriteIndex() + len);@b@            return this;@b@        }@b@@b@        @Override@b@        public void setFlags(int flags)@b@        {@b@            super.setFlags(flags);@b@        }@b@@b@        public Msg build()@b@        {@b@            return new Msg(this, out);@b@        }@b@    }@b@@b@    enum Type@b@    {@b@        DATA,@b@        DELIMITER@b@    }@b@@b@    public static final int MORE       = 1;  //  Followed by more parts@b@    public static final int COMMAND    = 2;  //  Command frame (see ZMTP spec)@b@    public static final int CREDENTIAL = 32;@b@    public static final int IDENTITY   = 64;@b@    public static final int SHARED     = 128;@b@@b@    private Metadata metadata;@b@    private int      flags;@b@    private Type     type;@b@@b@    // the file descriptor where this message originated, needs to be 64bit due to alignment@b@    private SocketChannel fileDesc;@b@@b@    private int              size;@b@    private byte[]           data;@b@    private final ByteBuffer buf;@b@    // keep track of relative write position@b@    private int writeIndex = 0;@b@    // keep track of relative read position@b@    private int readIndex = 0;@b@@b@    public Msg()@b@    {@b@        this(0);@b@    }@b@@b@    public Msg(int capacity)@b@    {@b@        this.type = Type.DATA;@b@        this.flags = 0;@b@        this.size = capacity;@b@        this.buf = ByteBuffer.wrap(new byte[capacity]).order(ByteOrder.BIG_ENDIAN);@b@        this.data = buf.array();@b@    }@b@@b@    public Msg(byte[] src)@b@    {@b@        if (src == null) {@b@            src = new byte[0];@b@        }@b@        this.type = Type.DATA;@b@        this.flags = 0;@b@        this.size = src.length;@b@        this.data = src;@b@        this.buf = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);@b@    }@b@@b@    public Msg(final ByteBuffer src)@b@    {@b@        if (src == null) {@b@            throw new IllegalArgumentException("ByteBuffer cannot be null");@b@        }@b@        this.type = Type.DATA;@b@        this.flags = 0;@b@        this.buf = src.duplicate();@b@        if (buf.hasArray() && buf.position() == 0 && buf.limit() == buf.capacity()) {@b@            this.data = buf.array();@b@        }@b@        else {@b@            this.data = null;@b@        }@b@        this.size = buf.remaining();@b@    }@b@@b@    public Msg(final Msg m)@b@    {@b@        if (m == null) {@b@            throw new IllegalArgumentException("Msg cannot be null");@b@        }@b@        this.type = m.type;@b@        this.flags = m.flags;@b@        this.size = m.size;@b@        this.buf = m.buf != null ? m.buf.duplicate() : null;@b@        if (m.data != null) {@b@            this.data = new byte[this.size];@b@            System.arraycopy(m.data, 0, this.data, 0, m.size);@b@        }@b@    }@b@@b@    private Msg(Msg src, ByteArrayOutputStream out)@b@    {@b@        this(ByteBuffer.wrap(out.toByteArray()));@b@        this.type = src.type;@b@        this.flags = src.flags;@b@    }@b@@b@    public boolean isIdentity()@b@    {@b@        return (flags & IDENTITY) == IDENTITY;@b@    }@b@@b@    public boolean isDelimiter()@b@    {@b@        return type == Type.DELIMITER;@b@    }@b@@b@    public boolean check()@b@    {@b@        return true; // type >= TYPE_MIN && type <= TYPE_MAX;@b@    }@b@@b@    public int flags()@b@    {@b@        return flags;@b@    }@b@@b@    public boolean hasMore()@b@    {@b@        return (flags & MORE) > 0;@b@    }@b@@b@    public boolean isCommand()@b@    {@b@        return (flags & COMMAND) == COMMAND;@b@    }@b@@b@    public boolean isCredential()@b@    {@b@        return (flags & CREDENTIAL) == CREDENTIAL;@b@    }@b@@b@    public void setFlags(int flags)@b@    {@b@        this.flags |= flags;@b@    }@b@@b@    public void initDelimiter()@b@    {@b@        type = Type.DELIMITER;@b@        metadata = null;@b@        flags = 0;@b@    }@b@@b@    public byte[] data()@b@    {@b@        if (data == null) {@b@            data = new byte[buf.remaining()];@b@            buf.duplicate().get(data);@b@        }@b@        return data;@b@    }@b@@b@    public ByteBuffer buf()@b@    {@b@        return buf.duplicate();@b@    }@b@@b@    public int size()@b@    {@b@        return size;@b@    }@b@@b@    public void resetFlags(int f)@b@    {@b@        flags = flags & ~f;@b@    }@b@@b@    public void setFd(SocketChannel fileDesc)@b@    {@b@        this.fileDesc = fileDesc;@b@    }@b@@b@    // TODO V4 use the source channel@b@    public SocketChannel fd()@b@    {@b@        return fileDesc;@b@    }@b@@b@    public Metadata getMetadata()@b@    {@b@        return metadata;@b@    }@b@@b@    public Msg setMetadata(Metadata metadata)@b@    {@b@        this.metadata = metadata;@b@        return this;@b@    }@b@@b@    public void resetMetadata()@b@    {@b@        setMetadata(null);@b@    }@b@@b@    public byte get()@b@    {@b@        return get(readIndex++);@b@    }@b@@b@    public byte get(int index)@b@    {@b@        return buf.get(index);@b@    }@b@@b@    public Msg put(byte b)@b@    {@b@        return put(writeIndex++, b);@b@    }@b@@b@    public Msg put(int b)@b@    {@b@        return put(writeIndex++, (byte) b);@b@    }@b@@b@    protected Msg put(int index, byte b)@b@    {@b@        buf.put(index, b);@b@        return this;@b@    }@b@@b@    public Msg put(byte[] src)@b@    {@b@        return put(src, 0, src.length);@b@    }@b@@b@    public Msg put(byte[] src, int off, int len)@b@    {@b@        if (src == null) {@b@            return this;@b@        }@b@        ByteBuffer dup = buf.duplicate();@b@        dup.position(writeIndex);@b@        writeIndex += len;@b@        dup.put(src, off, len);@b@        return this;@b@    }@b@@b@    public Msg put(ByteBuffer src, int off, int len)@b@    {@b@        if (src == null) {@b@            return this;@b@        }@b@        int position = src.position();@b@        int limit = src.limit();@b@        src.limit(off + len).position(off);@b@        put(src);@b@        src.limit(limit).position(position);@b@        return this;@b@    }@b@@b@    public Msg put(ByteBuffer src)@b@    {@b@        ByteBuffer dup = buf.duplicate();@b@        dup.position(writeIndex);@b@        writeIndex += Math.min(dup.remaining(), src.remaining());@b@        dup.put(src);@b@        return this;@b@    }@b@@b@    public int getBytes(int index, byte[] dst, int off, int len)@b@    {@b@        int count = Math.min(len, size - index);@b@        if (data == null) {@b@            ByteBuffer dup = buf.duplicate();@b@            dup.position(index);@b@            dup.put(dst, off, count);@b@        }@b@        else {@b@            System.arraycopy(data, index, dst, off, count);@b@        }@b@@b@        return count;@b@    }@b@@b@    public int getBytes(int index, ByteBuffer bb, int len)@b@    {@b@        ByteBuffer dup = buf.duplicate();@b@        dup.position(index);@b@        int count = Math.min(bb.remaining(), dup.remaining());@b@        count = Math.min(count, len);@b@        bb.put(dup);@b@        return count;@b@    }@b@@b@    @Override@b@    public String toString()@b@    {@b@        return String.format("#zmq.Msg{type=%s, size=%s, flags=%s}", type, size, flags);@b@    }@b@@b@    protected final int getWriteIndex()@b@    {@b@        return writeIndex;@b@    }@b@@b@    protected final void setWriteIndex(int writeIndex)@b@    {@b@        this.writeIndex = writeIndex;@b@    }@b@@b@    public long getLong(int offset)@b@    {@b@        return Wire.getUInt64(buf, offset);@b@    }@b@@b@    public int getInt(int offset)@b@    {@b@        return Wire.getUInt32(buf, offset);@b@    }@b@@b@    public void transfer(ByteBuffer destination, int srcOffset, int srcLength)@b@    {@b@        int position = buf.position();@b@        int limit = buf.limit();@b@@b@        buf.limit(srcOffset + srcLength).position(srcOffset);@b@        destination.put(buf);@b@        buf.limit(limit).position(position);@b@    }@b@}


<<热门下载>>