首页

多线程运行不同场景处理工具类RunUtil - 并发&异步&延时&定时等分组处理

标签:多线程     发布时间:2024-09-05   
package org.noear.solon.core.util;@b@@b@import org.noear.solon.Solon;@b@import org.noear.solon.Utils;@b@@b@import java.util.concurrent.*;@b@import java.util.function.Supplier;@b@@b@/**@b@ * 运行工具@b@ *@b@ * @author noear@b@ * @since 1.12@b@ */@b@public class RunUtil {@b@    /**@b@     * 并行执行器(一般用于执行简单的定时任务)@b@     */@b@    private static ExecutorService parallelExecutor;@b@    /**@b@     * 异步执行器(一般用于执行 @Async 注解任务)@b@     */@b@    private static ExecutorService asyncExecutor;@b@    /**@b@     * 调度执行器(一般用于延时任务)@b@     */@b@    private static ScheduledExecutorService scheduledExecutor;@b@@b@    static {@b@        if (Solon.app() != null && Solon.cfg().isEnabledVirtualThreads()) {@b@            parallelExecutor = ThreadsUtil.newVirtualThreadPerTaskExecutor();@b@            asyncExecutor = ThreadsUtil.newVirtualThreadPerTaskExecutor();@b@        } else {@b@            parallelExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,@b@                    60L, TimeUnit.SECONDS,@b@                    new SynchronousQueue<Runnable>(),@b@                    new NamedThreadFactory("Solon-executor-"));@b@@b@            int asyncPoolSize = Runtime.getRuntime().availableProcessors() * 2;@b@            asyncExecutor = new ThreadPoolExecutor(asyncPoolSize, asyncPoolSize,@b@                    0L, TimeUnit.MILLISECONDS,@b@                    new LinkedBlockingQueue<Runnable>(),@b@                    new NamedThreadFactory("Solon-asyncExecutor-"));@b@        }@b@@b@        int scheduledPoolSize = Runtime.getRuntime().availableProcessors() * 2;@b@        scheduledExecutor = new ScheduledThreadPoolExecutor(scheduledPoolSize,@b@                new NamedThreadFactory("Solon-scheduledExecutor-"));@b@    }@b@@b@    public static void setScheduledExecutor(ScheduledExecutorService scheduledExecutor) {@b@        if (scheduledExecutor != null) {@b@            ScheduledExecutorService old = RunUtil.scheduledExecutor;@b@            RunUtil.scheduledExecutor = scheduledExecutor;@b@            old.shutdown();@b@        }@b@    }@b@@b@    /**@b@     * @deprecated 2.5@b@     */@b@    @Deprecated@b@    public static void setExecutor(ExecutorService executor) {@b@        setParallelExecutor(executor);@b@    }@b@@b@    public static void setParallelExecutor(ExecutorService parallelExecutor) {@b@        if (parallelExecutor != null) {@b@            ExecutorService old = RunUtil.parallelExecutor;@b@            RunUtil.parallelExecutor = parallelExecutor;@b@            old.shutdown();@b@        }@b@    }@b@@b@    public static void setAsyncExecutor(ExecutorService asyncExecutor) {@b@        if (asyncExecutor != null) {@b@            ExecutorService old = RunUtil.asyncExecutor;@b@            RunUtil.asyncExecutor = asyncExecutor;@b@            old.shutdown();@b@        }@b@    }@b@@b@    /**@b@     * 运行或异常@b@     */@b@    public static void runOrThrow(RunnableEx task) {@b@        try {@b@            task.run();@b@        } catch (Throwable e) {@b@            e = Utils.throwableUnwrap(e);@b@            if (e instanceof RuntimeException) {@b@                throw (RuntimeException) e;@b@            } else {@b@                throw new RuntimeException(e);@b@            }@b@        }@b@    }@b@@b@    /**@b@     * 运行并吃掉异常@b@     */@b@    public static void runAndTry(RunnableEx task) {@b@        try {@b@            task.run();@b@        } catch (Throwable e) {@b@            //略过@b@        }@b@    }@b@@b@    /**@b@     * 并行执行@b@     */@b@    public static Future<?> parallel(Runnable task) {@b@        return parallelExecutor.submit(task);@b@    }@b@@b@    /**@b@     * 并行执行@b@     */@b@    public static <T> Future<T> parallel(Callable<T> task) {@b@        return parallelExecutor.submit(task);@b@    }@b@@b@    /**@b@     * 异步执行@b@     */@b@    public static CompletableFuture<Void> async(Runnable task) {@b@        return CompletableFuture.runAsync(task, asyncExecutor);@b@    }@b@@b@    /**@b@     * 异步执行@b@     */@b@    public static <U> CompletableFuture<U> async(Supplier<U> task) {@b@        return CompletableFuture.supplyAsync(task, asyncExecutor);@b@    }@b@@b@    public static CompletableFuture<Void> asyncAndTry(RunnableEx task) {@b@        return CompletableFuture.runAsync(()->{@b@            runAndTry(task);@b@        }, asyncExecutor);@b@    }@b@@b@    /**@b@     * 延迟执行@b@     */@b@    public static ScheduledFuture<?> delay(Runnable task, long millis) {@b@        return scheduledExecutor.schedule(task, millis, TimeUnit.MILLISECONDS);@b@    }@b@@b@    /**@b@     * 延迟执行并重复@b@     */@b@    public static ScheduledFuture<?> delayAndRepeat(Runnable task, long millis) {@b@        return scheduledExecutor.scheduleWithFixedDelay(task, 1000, millis, TimeUnit.MILLISECONDS);@b@    }@b@@b@    /**@b@     * 定时任务@b@     * */@b@    public static ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long millisPeriod) {@b@        return scheduledExecutor.scheduleAtFixedRate(task, initialDelay, millisPeriod, TimeUnit.MILLISECONDS);@b@    }@b@@b@    /**@b@     * 定时任务@b@     * */@b@    public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long millisDelay) {@b@        return scheduledExecutor.scheduleWithFixedDelay(task, initialDelay, millisDelay, TimeUnit.MILLISECONDS);@b@    }@b@}


<<热门下载>>