一、前言
本文对 Apache avro-ipc 源码包(1.7.5)中的异步回调接口 org.apache.avro.ipc.Callback 及其实现类 org.apache.avro.ipc.CallFuture、org.apache.avro.ipc.TransceiverCallback 的源码进行示例分析说明。从中我们可以了解一个回调处理的设计与实现案例,便于更好地了解 callback 的应用场景和实现设计等。
二、源码说明
1.Callback接口
package org.apache.avro.ipc;@b@@b@public abstract interface Callback<T>@b@{@b@ public abstract void handleResult(T paramT);@b@@b@ public abstract void handleError(Throwable paramThrowable);@b@}
2.原始实现类org.apache.avro.ipc.TransceiverCallback
protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {@b@ private final Request request;@b@ private final Callback<T> callback; @b@ @b@ public TransceiverCallback(Request request, Callback<T> callback) {@b@ this.request = request;@b@ this.callback = callback;@b@ }@b@ @b@ @Override@b@ @SuppressWarnings("unchecked")@b@ public void handleResult(List<ByteBuffer> responseBytes) {@b@ ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);@b@ BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);@b@ try {@b@ if (!readHandshake(in)) {@b@ // Resend the handshake and return@b@ Request handshake = new Request(request);@b@ getTransceiver().transceive@b@ (handshake.getBytes(),@b@ new TransceiverCallback<T>(handshake, callback));@b@ return;@b@ }@b@ } catch (Exception e) {@b@ LOG.error("Error handling transceiver callback: " + e, e);@b@ }@b@ @b@ // Read response; invoke callback@b@ Response response = new Response(request, in);@b@ Object responseObject;@b@ try {@b@ try {@b@ responseObject = response.getResponse();@b@ } catch (Exception e) {@b@ if (callback != null) {@b@ callback.handleError(e);@b@ }@b@ return;@b@ }@b@ if (callback != null) {@b@ callback.handleResult((T)responseObject);@b@ }@b@ } catch (Throwable t) {@b@ LOG.error("Error in callback handler: " + t, t);@b@ }@b@ }@b@ @b@ @Override@b@ public void handleError(Throwable error) {@b@ callback.handleError(error);@b@ }@b@ }
3.CallFuture代理引用实现类
package org.apache.avro.ipc;@b@@b@import java.util.concurrent.CountDownLatch;@b@import java.util.concurrent.ExecutionException;@b@import java.util.concurrent.Future;@b@import java.util.concurrent.TimeUnit;@b@import java.util.concurrent.TimeoutException;@b@@b@public class CallFuture<T>@b@ implements Future<T>, Callback<T>@b@{@b@ private final CountDownLatch latch;@b@ private final Callback<T> chainedCallback;@b@ private T result;@b@ private Throwable error;@b@@b@ public CallFuture()@b@ {@b@ this(null);@b@ }@b@@b@ public CallFuture(Callback<T> chainedCallback)@b@ {@b@ this.latch = new CountDownLatch(1);@b@@b@ this.result = null;@b@ this.error = null;@b@@b@ this.chainedCallback = chainedCallback;@b@ }@b@@b@ public void handleResult(T result)@b@ {@b@ this.result = result;@b@ this.latch.countDown();@b@ if (this.chainedCallback != null)@b@ this.chainedCallback.handleResult(result);@b@ }@b@@b@ public void handleError(Throwable error)@b@ {@b@ this.error = error;@b@ this.latch.countDown();@b@ if (this.chainedCallback != null)@b@ this.chainedCallback.handleError(error);@b@ }@b@@b@ public T getResult()@b@ {@b@ return this.result;@b@ }@b@@b@ public Throwable getError()@b@ {@b@ return this.error;@b@ }@b@@b@ public boolean cancel(boolean mayInterruptIfRunning)@b@ {@b@ return false;@b@ }@b@@b@ public boolean isCancelled()@b@ {@b@ return false;@b@ }@b@@b@ public T get()@b@ throws InterruptedException, ExecutionException@b@ {@b@ this.latch.await();@b@ if (this.error != null)@b@ throw new ExecutionException(this.error);@b@@b@ return this.result;@b@ }@b@@b@ public T get(long timeout, TimeUnit unit)@b@ throws InterruptedException, ExecutionException, TimeoutException@b@ {@b@ if (this.latch.await(timeout, unit)) {@b@ if (this.error != null)@b@ throw new ExecutionException(this.error);@b@@b@ return this.result;@b@ }@b@ throw new TimeoutException();@b@ }@b@@b@ public void await()@b@ throws InterruptedException@b@ {@b@ this.latch.await();@b@ }@b@@b@ public void await(long timeout, 
TimeUnit unit)@b@ throws InterruptedException, TimeoutException@b@ {@b@ if (!(this.latch.await(timeout, unit)))@b@ throw new TimeoutException();@b@ }@b@@b@ public boolean isDone()@b@ {@b@ return (this.latch.getCount() <= 0L);@b@ }@b@}