




责任链模式(Chain of Responsibility Pattern)是将链中的每一个节点看做成一个对象,每个节点处理的请求均不同,且内部自动维护一个下一个节点对象,当一个请求从链式的首端发出时,会沿着链路的路径一次传递给每一个节点对象,直至有对象处理这个请求为止。




  1. 多个对象可以处理同一请求,但具体由哪个对象处理则在运行时动态决定。
  2. 在不明确指定接收者的情况下,向多个对象中的一个提交一个请求。
  3. 可动态指定一组对象处理请求。










定义出一个处理请求的接口。如果需要,接口可以定义 出一个方法以设定和返回对下家的引用。这个角色通常由一个Java抽象类或者java接口实现。Handler类的聚合关系给出了具体子类对下家的引用,抽象方法handlerequest()规范了子类处理请求的操作。


public abstract class RequestHandler { private final RequestHandler next; /** * Request handler. */ public void handleRequest(Request req) { if (next != null) { next.handleRequest(req); } } protected void printHandling(Request req) { LOGGER.info("{} handling request "{}"", this, req); } @Override public abstract String toString(); }




public class Soldier extends RequestHandler { public Soldier(RequestHandler handler) { super(handler); } @Override public void handleRequest(Request req) { if (RequestType.COLLECT_TAX == req.getRequestType()) { printHandling(req); req.markHandled(); } else { super.handleRequest(req); } } @Override public String toString() { return "soldier"; } }

public class Officer extends RequestHandler { public Officer(RequestHandler handler) { super(handler); } @Override public void handleRequest(Request req) { if (RequestType.TORTURE_PRISONER == req.getRequestType()) { printHandling(req); req.markHandled(); } else { super.handleRequest(req); } } @Override public String toString() { return "officer"; } }

public class Commander extends RequestHandler { public Commander(RequestHandler handler) { super(handler); } @Override public void handleRequest(Request req) { if (RequestType.DEFEND_CASTLE == req.getRequestType()) { printHandling(req); req.markHandled(); } else { super.handleRequest(req); } } @Override public String toString() { return "commander"; } }


public class King { private RequestHandler chain; public King() { buildChain(); } private void buildChain() { chain = new Commander(new Officer(new Soldier(null))); } public void makeRequest(Request req) { chain.handleRequest(req); } }



有些同学可能已经发现。在最终构建chain时,使用new Commander(new Officer(new Soldier(null)))方式实现的看着很不爽,特别是,这里有三种处理者还比较容易编写,但是当有更多的处理者时,这个构建就会相当的复杂。



public abstract class RequestHandler<T> { protected RequestHandler next; /** * Request handler. */ public void handleRequest(Request req) { if (next != null) { next.handleRequest(req); } } protected void printHandling(Request req) { LOGGER.info("{} handling request "{}"", this, req); } @Override public abstract String toString(); public static class Builder<T> { private RequestHandler<T> head; private RequestHandler<T> tail; public RequestHandler<T> build() { return this.head; } public Builder<T> addHandler(RequestHandler<T> handler) { if (this.head == null) { this.head = this.tail = handler; } this.tail.next = handler; this.tail = handler; return this; } } }


private void buildChain() { RequestHandler.Builder builder = new RequestHandler.Builder(); builder.addHandler(new Commander()) .addHandler(new Officer()) .addHandler(new Soldier()); chain = builder.build(); }







public interface Filter { // 省去无关代码 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException; // 省去无关代码 }


package javax.servlet; import java.io.IOException; /** * A FilterChain is an object provided by the servlet container to the developer * giving a view into the invocation chain of a filtered request for a resource. Filters * use the FilterChain to invoke the next filter in the chain, or if the calling filter * is the last filter in the chain, to invoke the resource at the end of the chain. * * @see Filter * @since Servlet 2.3 **/ public interface FilterChain { /** * Causes the next filter in the chain to be invoked, or if the calling filter is the last filter * in the chain, causes the resource at the end of the chain to be invoked. * * @param request the request to pass along the chain. * @param response the response to pass along the chain. */ public void doFilter ( ServletRequest request, ServletResponse response ) throws IOException, ServletException; }


public void doFilter(ServletRequest request, ServletResponse response) throws IOException, ServletException { final Request baseRequest=Request.getBaseRequest(request); // pass to next filter if (_filterHolder!=null) { if (LOG.isDebugEnabled()) LOG.debug("call filter {}", _filterHolder); Filter filter= _filterHolder.getFilter(); //if the request already does not support async, then the setting for the filter //is irrelevant. However if the request supports async but this filter does not //temporarily turn it off for the execution of the filter if (baseRequest.isAsyncSupported() && !_filterHolder.isAsyncSupported()) { try { baseRequest.setAsyncSupported(false,_filterHolder.toString()); filter.doFilter(request, response, _next); } finally { baseRequest.setAsyncSupported(true,null); } } else filter.doFilter(request, response, _next); return; } // Call servlet HttpServletRequest srequest = (HttpServletRequest)request; if (_servletHolder == null) notFound(baseRequest, srequest, (HttpServletResponse)response); else { if (LOG.isDebugEnabled()) LOG.debug("call servlet " _servletHolder); _servletHolder.handle(baseRequest,request, response); } }




private static class VirtualFilterChain implements FilterChain { private final FilterChain originalChain; private final List<? extends Filter> additionalFilters; private int currentPosition = 0; public VirtualFilterChain(FilterChain chain, List<? extends Filter> additionalFilters) { this.originalChain = chain; this.additionalFilters = additionalFilters; } @Override public void doFilter(final ServletRequest request, final ServletResponse response) throws IOException, ServletException { if (this.currentPosition == this.additionalFilters.size()) { this.originalChain.doFilter(request, response); } else { this.currentPosition ; Filter nextFilter = this.additionalFilters.get(this.currentPosition - 1); nextFilter.doFilter(request, response, this); } } }


@Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { new VirtualFilterChain(chain, this.filters).doFilter(request, response); }




* <pre> * I/O Request * via {@link Channel} or * {@link ChannelHandlerContext} * | * --------------------------------------------------- --------------- * | ChannelPipeline | | * | |/ | * | --------------------- ----------- ---------- | * | | Inbound Handler N | | Outbound Handler 1 | | * | ---------- ---------- ----------- ---------- | * | /|\ | | * | | |/ | * | ---------- ---------- ----------- ---------- | * | | Inbound Handler N-1 | | Outbound Handler 2 | | * | ---------- ---------- ----------- ---------- | * | /|\ . | * | . . | * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| * | [ method call] [method call] | * | . . | * | . |/ | * | ---------- ---------- ----------- ---------- | * | | Inbound Handler 2 | | Outbound Handler M-1 | | * | ---------- ---------- ----------- ---------- | * | /|\ | | * | | |/ | * | ---------- ---------- ----------- ---------- | * | | Inbound Handler 1 | | Outbound Handler M | | * | ---------- ---------- ----------- ---------- | * | /|\ | | * --------------- ----------------------------------- --------------- * | |/ * --------------- ----------------------------------- --------------- * | | | | * | [ Socket.read() ] [ Socket.write() ] | * | | * | Netty Internal I/O Threads (Transport Implementation) | * ------------------------------------------------------------------- * </pre>


public interface ChannelHandler { /** * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events. */ void handlerAdded(ChannelHandlerContext ctx) throws Exception; /** * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events * anymore. */ void handlerRemoved(ChannelHandlerContext ctx) throws Exception; /** * Gets called if a {@link Throwable} was thrown. * * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and * implement the method there. */ @Deprecated void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception; /** * Indicates that the same instance of the annotated {@link ChannelHandler} * can be added to one or more {@link ChannelPipeline}s multiple times * without a race condition. * <p> * If this annotation is not specified, you have to create a new handler * instance every time you add it to a pipeline because it has unshared * state such as member variables. * <p> * This annotation is provided for documentation purpose, just like * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>. */ @Inherited @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @interface Sharable { // no value } }


public class DefaultChannelPipeline implements ChannelPipeline { static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); private static final String HEAD_NAME = generateName0(HeadContext.class); private static final String TAIL_NAME = generateName0(TailContext.class); private static final FastThreadLocal<Map<Class<?>, String>> nameCaches = new FastThreadLocal<Map<Class<?>, String>>() { @Override protected Map<Class<?>, String> initialValue() { return new WeakHashMap<Class<?>, String>(); } }; private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; private final Channel channel; private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; private final boolean touch = ResourceLeakDetector.isEnabled(); private Map<EventExecutorGroup, EventExecutor> childExecutors; private volatile MessageSizeEstimator.Handle estimatorHandle; private boolean firstRegistration = true; /** * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}. * * We only keep the head because it is expected that the list is used infrequently and its size is small. * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management * complexity. */ private PendingHandlerCallback pendingHandlerCallbackHead; /** * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never * change. */ private boolean registered; protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }



public class TorrentProcessorFactory implements ProcessorFactory { private Map<Class<?>, Processor<?>> processors() { Map<Class<?>, Processor<?>> processors = new HashMap<>(); processors.put(TorrentContext.class, createTorrentProcessor()); processors.put(MagnetContext.class, createMagnetProcessor()); return processors; } protected ChainProcessor<TorrentContext> createTorrentProcessor() { ProcessingStage<TorrentContext> stage5 = new SeedStage<>(null, torrentRegistry); ProcessingStage<TorrentContext> stage4 = new ProcessTorrentStage<>(stage5, torrentRegistry, trackerService, eventSink); ProcessingStage<TorrentContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config); ProcessingStage<TorrentContext> stage2 = new InitializeTorrentProcessingStage<>(stage3, connectionPool, torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config); ProcessingStage<TorrentContext> stage1 = new CreateSessionStage<>(stage2, torrentRegistry, eventSource, connectionSource, messageDispatcher, messagingAgents, config); ProcessingStage<TorrentContext> stage0 = new FetchTorrentStage(stage1, eventSink); return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink)); } protected ChainProcessor<MagnetContext> createMagnetProcessor() { ProcessingStage<MagnetContext> stage5 = new SeedStage<>(null, torrentRegistry); ProcessingStage<MagnetContext> stage4 = new ProcessMagnetTorrentStage(stage5, torrentRegistry, trackerService, eventSink); ProcessingStage<MagnetContext> stage3 = new ChooseFilesStage<>(stage4, torrentRegistry, assignmentFactory, config); ProcessingStage<MagnetContext> stage2 = new InitializeMagnetTorrentProcessingStage(stage3, connectionPool, torrentRegistry, dataWorker, bufferedPieceRegistry, manualControlService, eventSink, config); ProcessingStage<MagnetContext> stage1 = new FetchMetadataStage(stage2, metadataService, torrentRegistry, peerRegistry, eventSink, eventSource, config); ProcessingStage<MagnetContext> stage0 = new CreateSessionStage<>(stage1, torrentRegistry, eventSource, connectionSource, messageDispatcher, messagingAgents, config); return new ChainProcessor<>(stage0, executor, new TorrentContextFinalizer<>(torrentRegistry, eventSink)); }


package bt.processor; import bt.processor.listener.ProcessingEvent; /** * @param <C> Type of processing context * @since 1.3 */ public interface ProcessingStage<C extends ProcessingContext> { /** * @return Type of event, that should be triggered after this stage has completed. * @since 1.5 */ ProcessingEvent after(); /** * @param context Processing context * @return Next stage * @since 1.3 */ ProcessingStage<C> execute(C context); }


@Override public CompletableFuture<?> process(C context, ListenerSource<C> listenerSource) { Runnable r = () -> executeStage(chainHead, context, listenerSource); return CompletableFuture.runAsync(r, executor); } private void executeStage(ProcessingStage<C> chainHead, C context, ListenerSource<C> listenerSource) { ProcessingEvent stageFinished = chainHead.after(); Collection<BiFunction<C, ProcessingStage<C>, ProcessingStage<C>>> listeners; if (stageFinished != null) { listeners = listenerSource.getListeners(stageFinished); } else { listeners = Collections.emptyList(); } ProcessingStage<C> next = doExecute(chainHead, context, listeners); if (next != null) { executeStage(next, context, listenerSource); } }



  1. 将请求与处理解耦。
  2. 请求处理者(节点对象)只需要关注自己感兴趣的请求进行处理即可,对于不感兴趣的请求,直接转发给下一级的next节点对象即可。
  3. 具备链式传递处理请求功能,请求发送者无需知晓链路结构,只需等待请求在最终链路处理完成后的处理结果。
  4. 链路结构灵活,可以通过改变链路结构动态地新增或删除责任的具体节点。
  5. 易于扩展新的请求处理能力,符合设计模式的开闭原则。
  1. 责任链太长或者处理时间过长,会影响整体性能。
  2. 如果节点对象存在循环引用时,会造成死循环,导致系统崩溃。


