From 967dcfffe3a41aad9cae8dbdfea4554618dbb6e8 Mon Sep 17 00:00:00 2001 From: Andrew Lalis Date: Fri, 16 Apr 2021 23:19:37 +0200 Subject: [PATCH] Added more implementations of schedules, and a task wrapper object. --- .../simply_scheduled/BasicScheduler.java | 60 +++++++++++++------ .../simply_scheduled/Scheduler.java | 25 +++++++- .../schedule/DailySchedule.java | 2 +- .../schedule/HourlySchedule.java | 2 +- .../schedule/MinutelySchedule.java | 31 ++++++++++ .../schedule/RepeatingSchedule.java | 41 +++++++++++++ .../simply_scheduled/schedule/Schedule.java | 22 ++++++- .../simply_scheduled/schedule/Task.java | 39 ++++++++++++ .../simply_scheduled/SchedulerTest.java | 49 ++++++++++++--- 9 files changed, 240 insertions(+), 31 deletions(-) create mode 100644 src/main/java/nl/andrewlalis/simply_scheduled/schedule/MinutelySchedule.java create mode 100644 src/main/java/nl/andrewlalis/simply_scheduled/schedule/RepeatingSchedule.java create mode 100644 src/main/java/nl/andrewlalis/simply_scheduled/schedule/Task.java diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/BasicScheduler.java b/src/main/java/nl/andrewlalis/simply_scheduled/BasicScheduler.java index 8cae0ab..96ab955 100644 --- a/src/main/java/nl/andrewlalis/simply_scheduled/BasicScheduler.java +++ b/src/main/java/nl/andrewlalis/simply_scheduled/BasicScheduler.java @@ -1,20 +1,27 @@ package nl.andrewlalis.simply_scheduled; -import nl.andrewlalis.simply_scheduled.schedule.Schedule; +import nl.andrewlalis.simply_scheduled.schedule.Task; import java.time.Clock; import java.time.Instant; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; -public class BasicScheduler implements Scheduler { - private ScheduledExecutorService executorService; +/** + * A simple thread-based scheduler that sleeps until the next task, runs it + * using a work-stealing executor thread pool, and continues with the next task. + */ +public class BasicScheduler extends Thread implements Scheduler { private final Clock clock; + private final PriorityBlockingQueue tasks; + private final ExecutorService executorService; + private boolean running = false; public BasicScheduler(Clock clock) { this.clock = clock; - this.executorService = new ScheduledThreadPoolExecutor(1); + this.tasks = new PriorityBlockingQueue<>(); + this.executorService = Executors.newWorkStealingPool(); } public BasicScheduler() { @@ -22,25 +29,40 @@ public class BasicScheduler implements Scheduler { } @Override - public void addTask(Runnable task, Schedule schedule) { - Instant nextExecution = schedule.getNextExecutionTime(this.clock.instant()); - long diff = nextExecution.toEpochMilli() - System.currentTimeMillis(); - if (diff < 1) return; // Exit immediately, if the next scheduled execution is in the past. - this.executorService.schedule(task, diff, TimeUnit.MILLISECONDS); + public void addTask(Task task) { + this.tasks.add(task); } @Override - public void start() { + public void run() { + this.running = true; + while (this.running) { + try { + Task nextTask = this.tasks.take(); + Instant now = this.clock.instant(); + long waitTime = nextTask.getSchedule().computeNextExecutionTime(now).toEpochMilli() - now.toEpochMilli(); + if (waitTime > 0) { + Thread.sleep(waitTime); + } + this.executorService.execute(nextTask.getRunnable()); + this.tasks.put(nextTask); // Put the task back in the queue. + } catch (InterruptedException e) { + this.setRunning(false); + } + } } @Override public void stop(boolean force) { - if (this.executorService != null) { - if (force) { - this.executorService.shutdownNow(); - } else { - this.executorService.shutdown(); - } + this.setRunning(false); + if (force) { + this.executorService.shutdownNow(); + } else { + this.executorService.shutdown(); } } + + private synchronized void setRunning(boolean running) { + this.running = running; + } } diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/Scheduler.java b/src/main/java/nl/andrewlalis/simply_scheduled/Scheduler.java index b7f7835..c948874 100644 --- a/src/main/java/nl/andrewlalis/simply_scheduled/Scheduler.java +++ b/src/main/java/nl/andrewlalis/simply_scheduled/Scheduler.java @@ -1,9 +1,30 @@ package nl.andrewlalis.simply_scheduled; -import nl.andrewlalis.simply_scheduled.schedule.Schedule; +import nl.andrewlalis.simply_scheduled.schedule.Task; +/** + * A scheduler is responsible for storing and executing tasks as defined by each + * task's schedule. + */ public interface Scheduler { - void addTask(Runnable task, Schedule schedule); + /** + * Adds a task to this scheduler, so that when the scheduler starts, the + * task is executed in accordance with its defined schedule. + * @param task The task to add. + */ + void addTask(Task task); + + /** + * Starts the scheduler. A scheduler should only execute tasks once it has + * started, and it is up to the implementation to determine whether new + * tasks may be added while the scheduler is running. + */ void start(); + + /** + * Stops the scheduler. + * @param force Whether to forcibly stop the scheduler. When set to true, + * any currently-executing tasks are immediately shutdown. + */ void stop(boolean force); } diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/DailySchedule.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/DailySchedule.java index aa8748d..7213455 100644 --- a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/DailySchedule.java +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/DailySchedule.java @@ -16,7 +16,7 @@ public class DailySchedule implements Schedule { } @Override - public Instant getNextExecutionTime(Instant referenceInstant) { + public Instant computeNextExecutionTime(Instant referenceInstant) { ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); LocalDate currentDay = LocalDate.from(referenceInstant); ZonedDateTime sameDayExecution = currentDay.atTime(this.time).atZone(this.zoneId); diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/HourlySchedule.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/HourlySchedule.java index 1ccca9d..c1a74d3 100644 --- a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/HourlySchedule.java +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/HourlySchedule.java @@ -22,7 +22,7 @@ public class HourlySchedule implements Schedule { } @Override - public Instant getNextExecutionTime(Instant referenceInstant) { + public Instant computeNextExecutionTime(Instant referenceInstant) { ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); int currentMinute = currentTime.getMinute(); if (currentMinute < this.minute) { diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/MinutelySchedule.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/MinutelySchedule.java new file mode 100644 index 0000000..c029b5a --- /dev/null +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/MinutelySchedule.java @@ -0,0 +1,31 @@ +package nl.andrewlalis.simply_scheduled.schedule; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; + +public class MinutelySchedule implements Schedule { + + private final int second; + private final ZoneId zoneId; + + public MinutelySchedule(int second, ZoneId zoneId) { + this.second = second; + this.zoneId = zoneId; + } + + public MinutelySchedule(int second) { + this(second, ZoneId.systemDefault()); + } + + @Override + public Instant computeNextExecutionTime(Instant referenceInstant) { + ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); + int currentSecond = currentTime.getSecond(); + if (currentSecond >= this.second) { + return currentTime.plusMinutes(1).withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant(); + } + return currentTime.withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant(); + } +} diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/RepeatingSchedule.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/RepeatingSchedule.java new file mode 100644 index 0000000..ca721f6 --- /dev/null +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/RepeatingSchedule.java @@ -0,0 +1,41 @@ +package nl.andrewlalis.simply_scheduled.schedule; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +/** + * A schedule which repeatedly executes a task at a regular interval specified + * as some integer multiple of a unit of time, such as 5 seconds, or 2 minutes. + */ +public class RepeatingSchedule implements Schedule { + private final ChronoUnit unit; + private final long multiple; + private Instant lastExecution; + + /** + * Constructs a new repeating schedule. + * @param unit The unit of time that the interval consists of. + * @param multiple The + */ + public RepeatingSchedule(ChronoUnit unit, long multiple) { + this.unit = unit; + this.multiple = multiple; + this.lastExecution = null; + } + + /** + * Computes the next execution time for a task. This keeps track of the last + * execution time, so that tasks repeat at an exact interval. + * @param referenceInstant The instant representing the current time. + * @return The next instant to execute the task at. + */ + @Override + public Instant computeNextExecutionTime(Instant referenceInstant) { + if (this.lastExecution == null) { + this.lastExecution = referenceInstant; + } + Instant nextExecution = this.lastExecution.plus(multiple, unit); + this.lastExecution = nextExecution; + return nextExecution; + } +} diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Schedule.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Schedule.java index 0a97cad..ca50371 100644 --- a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Schedule.java +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Schedule.java @@ -1,7 +1,27 @@ package nl.andrewlalis.simply_scheduled.schedule; +import nl.andrewlalis.simply_scheduled.Scheduler; + import java.time.Instant; +/** + * A schedule is used by a {@link Scheduler} to determine how long to wait for + * the next time that a task should be executed. + */ public interface Schedule { - Instant getNextExecutionTime(Instant referenceInstant); + /** + * Given some instant referring to the current time, this method should + * produce an instant sometime in the future at which the next execution of + * a task should happen. + * + *

+ * Note that certain implementations may introduce side-effects + * when this method is called more than once. + *

+ * + * @param referenceInstant The instant representing the current time. + * @return An instant in the future indicating the next time at which a task + * using this schedule should be executed. + */ + Instant computeNextExecutionTime(Instant referenceInstant); } diff --git a/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Task.java b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Task.java new file mode 100644 index 0000000..79ae598 --- /dev/null +++ b/src/main/java/nl/andrewlalis/simply_scheduled/schedule/Task.java @@ -0,0 +1,39 @@ +package nl.andrewlalis.simply_scheduled.schedule; + +import java.time.Clock; +import java.time.Instant; + +/** + * A task consists of a runnable job, and a schedule that defines precisely when + * the job will be run. It implements comparable so that schedulers may use a + * priority queue to insert new tasks. + */ +public class Task implements Comparable{ + private final Clock clock; + private final Runnable runnable; + private final Schedule schedule; + + public Task(Clock clock, Runnable runnable, Schedule schedule) { + this.clock = clock; + this.runnable = runnable; + this.schedule = schedule; + } + + public Task(Runnable runnable, Schedule schedule) { + this(Clock.systemDefaultZone(), runnable, schedule); + } + + public Runnable getRunnable() { + return runnable; + } + + public Schedule getSchedule() { + return schedule; + } + + @Override + public int compareTo(Task o) { + Instant now = clock.instant(); + return this.schedule.computeNextExecutionTime(now).compareTo(o.getSchedule().computeNextExecutionTime(now)); + } +} diff --git a/src/test/java/nl/andrewlalis/simply_scheduled/SchedulerTest.java b/src/test/java/nl/andrewlalis/simply_scheduled/SchedulerTest.java index 8f4d414..867b54e 100644 --- a/src/test/java/nl/andrewlalis/simply_scheduled/SchedulerTest.java +++ b/src/test/java/nl/andrewlalis/simply_scheduled/SchedulerTest.java @@ -1,33 +1,68 @@ package nl.andrewlalis.simply_scheduled; -import nl.andrewlalis.simply_scheduled.schedule.HourlySchedule; +import nl.andrewlalis.simply_scheduled.schedule.MinutelySchedule; +import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule; +import nl.andrewlalis.simply_scheduled.schedule.Schedule; +import nl.andrewlalis.simply_scheduled.schedule.Task; import org.junit.jupiter.api.Test; import java.time.Clock; import java.time.Instant; -import java.time.LocalTime; import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; public class SchedulerTest { @Test void testSchedule() { - Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC); + Clock clock = Clock.fixed(Instant.now().truncatedTo(ChronoUnit.MINUTES), ZoneOffset.UTC); Scheduler scheduler = new BasicScheduler(clock); - LocalTime time = LocalTime.now(); - int secondsLeft = 60 - time.getSecond() + 1; + int secondsLeft = 4; AtomicBoolean flag = new AtomicBoolean(false); - scheduler.addTask(() -> flag.set(true), new HourlySchedule(time.getMinute() + 1)); + Runnable taskRunnable = () -> { + flag.set(true); + System.out.println("\tExecuted task."); + }; + Task task = new Task(clock, taskRunnable, new MinutelySchedule(3)); + scheduler.addTask(task); scheduler.start(); + System.out.println("Now: " + clock.instant().toString()); + System.out.println("Next task execution: " + task.getSchedule().computeNextExecutionTime(clock.instant())); System.out.printf("Waiting %d seconds for task to run...", secondsLeft); + assertFalse(flag.get()); try { Thread.sleep(secondsLeft * 1000); } catch (InterruptedException e) { e.printStackTrace(); } assertTrue(flag.get()); + scheduler.stop(true); + } + + @Test + void testRepeatingSchedule() { + Scheduler scheduler = new BasicScheduler(); + Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1); + AtomicInteger value = new AtomicInteger(0); + Runnable taskRunnable = () -> { + value.set(value.get() + 1); + System.out.println("\tExecuted task."); + }; + Task task = new Task(taskRunnable, schedule); + scheduler.addTask(task); + assertEquals(0, value.get()); + scheduler.start(); + System.out.println("Waiting 3.5 seconds for 3 iterations."); + try { + Thread.sleep(3500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertEquals(3, value.get()); + scheduler.stop(true); } }