Hystrix超時實現機制

HystrixCommand在執行的本篇假設你們都具有了如何使用hystrix的能力,若是還沒接觸過的朋友可參閱
Hystrix介紹
常規使用姿式java

HystrixCommand在執行的過程當中如何探測超時,本篇主要對此進行介紹說明。segmentfault

1.主入口:executeCommandAndObserve

#com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ···省略部分代碼···
        Observable<R> execution;

        //判斷是否開啓超時監測
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));

能夠簡單的認爲lift 裏面的對前面的Observable包含,相似裝飾者,後面的parent就是指上層的Observable。其中 HystrixObservableTimeoutOperator 就是關鍵的部分。ide

2.關鍵點: HystrixObservableTimeoutOperator

先看下HystrixObservableTimeoutOperator.call(),TimerListener的實現ui

TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                   
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // 標記事件,能夠認爲是開的hook,這裏暫忽略
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        //取消原Obserable的訂閱
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });
                        timeoutRunnable.run();
                    }
                }

                //獲取配置的超時時間配置
                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

這段代碼的意思就是,給當前command的超時狀態置爲超時,若是設置成功就拋出HystrixTimeoutException異常,緊接着被command的 doOnErron接收走 fallback邏輯this

fallback
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        .................................

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    //此處catch到超時異常
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        .................................

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

同時s.unsubscribe()通知正在執行的線程,終止任務。如何終止呢?spa

executeCommandWithSpecifiedIsolation.subscribeOn()

subscribeOne的參數就是HystrixContextScheduler, Rxjava裏 scheduler具體幹活的是 worker,咱們先看下Hystrix自定義scheduler的結構示意圖

那麼咱們直奔主題,直接看 ThreadPoolWorker.net

//ThreadPoolWorker.schedule
@Override
public Subscription schedule(final Action0 action) {
    if (subscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    ScheduledAction sa = new ScheduledAction(action);

    subscription.add(sa);
    sa.addParent(subscription);

    ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
    FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
    sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

    return sa;
}

1.開始的時候判斷observable是否被訂閱
2.被訂閱後,將任務 submit到線程池
3.FutureCompleterWithConfigurableInterrupt scheduler在執行的時候,增長了observable的中斷探測線程

private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
    private final FutureTask<?> f;
    private final Func0<Boolean> shouldInterruptThread;
    private final ThreadPoolExecutor executor;

    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {
        this.f = f;
        this.shouldInterruptThread = shouldInterruptThread;
        this.executor = executor;
    }

    @Override
    public void unsubscribe() {
        executor.remove(f);
        if (shouldInterruptThread.call()) {
            f.cancel(true);
        } else {
            f.cancel(false);
        }
    }

    .....省略代碼.......
}

當observable 取消訂閱時,就會把當前任務移除,並中斷任務設計

到這裏只是講說了超時後的處理,如何認定執行超時呢?3d

3.匠心之巧

這裏有個很巧妙的設計,再探HystrixObservableTimeoutOperator

final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

#com.netflix.hystrix.util.HystrixTimer#addTimerListener
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }

利用了ScheduledThreadPoolExecutor,延遲執行,延遲時間就是咱們設定的超時時間,咱們再看下

#HystrixObservableTimeoutOperator
Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                .....  .....  .....  .....  .....  .....  .....  .....  .....

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

這裏parent就是指上層的obserable,這裏能夠抽象的認爲是咱們的HystrixCommand執行線程, 當command執行線程執行完成的時候或異常的時候,會執行 tl.clear(), 也就是Future.cancel()會中斷 TimerListener 的ScheduledFuture 線程,迫使超時機制失效。

// tl.clear()
private static class TimerReference extends SoftReference<TimerListener> {
        private final ScheduledFuture<?> f;
        ....        ....        ....        ....        ....
        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }
    }

4.迴歸文字

HystrixCommand裏有個 TimedOutStatus 超時狀態
TimedOutStatus
如今能夠認爲有兩個線程,一個是hystrixCommand任務執行線程,一個是等着給hystrixCommand斷定超時的線程,如今兩個線程看誰能先把hystrixCommand的狀態置換,只要任何一個線程對hystrixCommand打上標就意味着超時斷定結束。