1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
| // 任务优先级定义 private enum TaskPriority { HIGH(0), MEDIUM(5), LOW(10);
private final int value;
TaskPriority(int value) { this.value = value; }
public int getValue() { return value; } }
/** * manage-biz Powerjob 调度类 * 优化版本 - 任务削峰与队列管理 * * @author hht * @since 2024-09-10 */ @Component(value = "manageBizPowerjobDispatcher") @Slf4j @RequiredArgsConstructor public class ManageBizPowerjobDispatcher { private final IXxxScheduleService XxxScheduleService; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
/** 平安通告powerjob任务id */ public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";
public static final String SUCCESS = "success";
// 配置参数,可从配置文件注入 @Value("${powerjob.task.max-concurrent:10}") private int maxConcurrentTasks;
@Value("${powerjob.task.queue-capacity:500}") private int queueCapacity;
@Value("${powerjob.task.max-delay-minutes:5}") private int maxDelayMinutes;
@Value("${powerjob.task.worker-threads:20}") private int workerThreads;
// 延迟任务定义 @Data private static class DelayedTask implements Delayed { private final Runnable task; private final long executeTime; private final String taskId; private final String jobParams;
@Override public long getDelay(TimeUnit unit) { return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
}
// 优先级任务定义 @Data private static class PriorityTask implements Comparable<PriorityTask> { private final Runnable task; private final TaskPriority priority; private final String taskId; private final String jobParams; private final long createTime;
@Override public int compareTo(PriorityTask other) { // 先按优先级排序,再按创建时间排序 int priorityCompare = Integer.compare(priority.getValue(), other.priority.getValue()); if (priorityCompare != 0) { return priorityCompare; } return Long.compare(createTime, other.createTime); } }
/** * 1.单独的线程,负责从队列中获取任务并分发 * 2.协调延迟队列和优先级队列 * 3.控制任务的并发执行数量 */ private class TaskDispatcher implements Runnable { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { // 先检查延迟队列 DelayedTask delayedTask = delayedTaskQueue.poll(); if (delayedTask != null) { // 将任务添加到优先级队列 submitToPriorityQueue(delayedTask.getTask(), TaskPriority.HIGH, delayedTask.getTaskId(), delayedTask.getJobParams()); continue; }
// 从优先级队列取任务执行 PriorityTask priorityTask = priorityTaskQueue.take(); if (priorityTask != null) { try { // 获取信号量,控制并发 taskSemaphore.acquire(); // 记录任务开始执行 activeTaskCount.incrementAndGet(); taskExecutionCount.computeIfAbsent(priorityTask.getTaskId(), k -> new AtomicInteger(0)).incrementAndGet(); // 提交到线程池执行 executorService.submit(() -> { try { log.info("执行任务: {}, 参数: {}", priorityTask.getTaskId(), priorityTask.getJobParams()); priorityTask.getTask().run(); } catch (Exception e) { log.error("任务执行异常: {}", priorityTask.getTaskId(), e); } finally { // 释放信号量 taskSemaphore.release(); // 更新计数器 activeTaskCount.decrementAndGet(); AtomicInteger counter = taskExecutionCount.get(priorityTask.getTaskId()); if (counter != null) { counter.decrementAndGet(); } } }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.error("任务分发器异常", e); } } } }
// 任务队列和执行器 private DelayQueue<DelayedTask> delayedTaskQueue; private PriorityBlockingQueue<PriorityTask> priorityTaskQueue; private ExecutorService executorService; private ExecutorService dispatcherService; private Semaphore taskSemaphore; private Random random;
// 任务执行状态监控 private AtomicLong totalTasksReceived = new AtomicLong(0); private AtomicLong totalTasksExecuted = new AtomicLong(0); private AtomicInteger activeTaskCount = new AtomicInteger(0); private Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();
@PostConstruct public void init() { // 初始化任务队列 delayedTaskQueue = new DelayQueue<>(); priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity); // 初始化线程池 executorService = Executors.newFixedThreadPool(workerThreads, new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "task-worker-" + counter.getAndIncrement()); thread.setDaemon(true); return thread; } }); // 初始化分发器线程, dispatcherService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r, "task-dispatcher"); thread.setDaemon(true); return thread; }); // 初始化信号量 taskSemaphore = new Semaphore(maxConcurrentTasks); // 初始化随机数生成器 random = new Random(); // 启动任务分发线程 dispatcherService.submit(new TaskDispatcher()); log.info("任务调度器初始化完成,最大并发任务数: {}, 队列容量: {}, 最大延迟分钟数: {}, 工作线程数: {}", maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads); }
@PreDestroy public void shutdown() { // 关闭调度器 if (dispatcherService != null) { dispatcherService.shutdownNow(); } // 关闭执行器 if (executorService != null) { executorService.shutdown(); try { if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); } } log.info("任务调度器已关闭,总接收任务数: {}, 总执行任务数: {}", totalTasksReceived.get(), totalTasksExecuted.get()); }
/** * 提交任务到延迟队列 */ private void submitToDelayQueue(Runnable task, String taskId, String jobParams) { // 随机延迟时间,在0到maxDelayMinutes分钟之间 long delayMs = random.nextInt((int) TimeUnit.MINUTES.toMillis(maxDelayMinutes)); DelayedTask delayedTask = new DelayedTask(task, delayMs, taskId, jobParams); delayedTaskQueue.offer(delayedTask); totalTasksReceived.incrementAndGet(); log.info("任务已提交到延迟队列: {}, 延迟: {}ms", taskId, delayMs); }
/** * 提交任务到优先级队列 */ private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) { PriorityTask priorityTask = new PriorityTask(task, priority, taskId, jobParams); priorityTaskQueue.offer(priorityTask); log.info("任务已提交到优先级队列: {}, 优先级: {}", taskId, priority); }
/** * 获取任务类型对应的优先级 */ private TaskPriority getTaskPriority(String taskId) { switch (taskId) { case TASK_SAFETY_NOTICE_ID: case TASK_PUSH_SERVICE_STATUS_ID: return TaskPriority.HIGH; case TASK_TENANT_SERVICE_PHASE_ID: case TASK_WEEKLY_SUMMARY: return TaskPriority.MEDIUM; default: return TaskPriority.LOW; } }
/** * 创建可执行的任务 */ private Runnable createExecutableTask(String taskId, String jobParams, TaskContext taskContext) { switch (taskId) { case TASK_SAFETY_NOTICE_ID: return () -> generateXxxTask(taskContext); case TASK_OTHER: return () -> generateOtherTask(taskContext); ... default: throw new IllegalArgumentException("未知的任务类型: " + taskId); } }
/** * 通用任务提交方法 */ private ProcessResult submitTask(String taskId, TaskContext taskContext) { try { totalTasksReceived.incrementAndGet(); // 检查任务执行情况,如果已有大量相同类型任务,加入延迟队列 int activeCount = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0)).get(); if (activeCount > maxConcurrentTasks / 2) { log.warn("当前任务类型 {} 正在执行的数量较多: {}, 将使用延迟队列分散负载", taskId, activeCount); submitToDelayQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), taskId, taskContext.getJobParams()); } else { // 根据任务类型分配优先级 TaskPriority priority = getTaskPriority(taskId); submitToPriorityQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), priority, taskId, taskContext.getJobParams()); } return new ProcessResult(true, formatResponse(SUCCESS, taskId)); } catch (Exception e) { log.error("提交任务异常: {}", taskId, e); return new ProcessResult(false, formatResponse(e.getMessage(), taskId)); } }
// ====== 以下是原始的PowerJob任务处理方法,改为使用队列系统 ======
/** * 告警任务 */ @PowerJobHandler(name = TASK_OTHER) public ProcessResult generateOtherTask(TaskContext taskContext) { log.info("==================== 调度触发(其它任务) ======================"); return submitTask(TASK_OTHER, taskContext); }
private ProcessResult generateOtherTask(TaskContext taskContext) { try { StrategyJobParams jobParams = new StrategyJobParams(); if (StringUtils.hasLength(taskContext.getJobParams())) { jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class); } strategyScheduleService.generateTask(jobParams); totalTasksExecuted.incrementAndGet(); return new ProcessResult(true, formatResponse(SUCCESS, TASK_OTHER)); } catch (Exception e) { log.error("调度执行【其它任务】异常", e); return new ProcessResult(false, formatResponse(e.getMessage(), TASK_OTHER)); } }
/** * 生成平安通告 */ @PowerJobHandler(name = TASK_SAFETY_NOTICE_ID) public ProcessResult generateXxx(TaskContext taskContext) { log.info("==================== 调度触发(平安通告) ======================"); return submitTask(TASK_SAFETY_NOTICE_ID, taskContext); }
private ProcessResult generateXxxTask(TaskContext taskContext) { try { // 获取调度任务的参数 StrategyJobParams jobParams = null; if (StringUtils.hasLength(taskContext.getJobParams())) { jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class); } // 生成平安通告 XxxScheduleService.autoGenerateBatch(jobParams); totalTasksExecuted.incrementAndGet(); return new ProcessResult(true, formatResponse("success", TASK_SAFETY_NOTICE_ID)); } catch (Exception e) { log.error("调度执行【平安通告】异常", e); return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID)); } } private String formatResponse(String info, String id) { return String.format("{\"taskId\": \"%s\", \"info\": \"%s\"}", id, info); } }
|