1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public abstract class HystrixConcurrencyStrategy { public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get();
if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } } public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get(); final int dynamicCoreSize = threadPoolProperties.coreSize().get(); final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get(); final int maxQueueSize = threadPoolProperties.maxQueueSize().get(); final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
if (allowMaximumSizeToDivergeFromCoreSize) { final int dynamicMaximumSize = threadPoolProperties.maximumSize().get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value"); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory); } } private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) { if (!PlatformSpecific.isAppEngineStandardEnvironment()) { return new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(0);
@Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet()); thread.setDaemon(true); return thread; }
}; } else { return PlatformSpecific.getAppEngineThreadFactory(); } }
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { if (maxQueueSize <= 0) { return new SynchronousQueue<Runnable>(); } else { return new LinkedBlockingQueue<Runnable>(maxQueueSize); } }
public <T> Callable<T> wrapCallable(Callable<T> callable) { return callable; }
public <T> HystrixRequestVariable<T> getRequestVariable(final HystrixRequestVariableLifecycle<T> rv) { return new HystrixLifecycleForwardingRequestVariable<T>(rv); } }
|