Added more implementations of schedules, and a task wrapper object.

This commit is contained in:
Andrew Lalis 2021-04-16 23:19:37 +02:00
parent 10dbec8226
commit 967dcfffe3
9 changed files with 240 additions and 31 deletions

View File

@ -1,20 +1,27 @@
package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task;
import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
public class BasicScheduler implements Scheduler {
private ScheduledExecutorService executorService;
/**
* A simple thread-based scheduler that sleeps until the next task, runs it
* using a work-stealing executor thread pool, and continues with the next task.
*/
public class BasicScheduler extends Thread implements Scheduler {
private final Clock clock;
private final PriorityBlockingQueue<Task> tasks;
private final ExecutorService executorService;
private boolean running = false;
public BasicScheduler(Clock clock) {
this.clock = clock;
this.executorService = new ScheduledThreadPoolExecutor(1);
this.tasks = new PriorityBlockingQueue<>();
this.executorService = Executors.newWorkStealingPool();
}
public BasicScheduler() {
@ -22,25 +29,40 @@ public class BasicScheduler implements Scheduler {
}
@Override
public void addTask(Runnable task, Schedule schedule) {
Instant nextExecution = schedule.getNextExecutionTime(this.clock.instant());
long diff = nextExecution.toEpochMilli() - System.currentTimeMillis();
if (diff < 1) return; // Exit immediately, if the next scheduled execution is in the past.
this.executorService.schedule(task, diff, TimeUnit.MILLISECONDS);
public void addTask(Task task) {
this.tasks.add(task);
}
@Override
public void start() {
public void run() {
this.running = true;
while (this.running) {
try {
Task nextTask = this.tasks.take();
Instant now = this.clock.instant();
long waitTime = nextTask.getSchedule().computeNextExecutionTime(now).toEpochMilli() - now.toEpochMilli();
if (waitTime > 0) {
Thread.sleep(waitTime);
}
this.executorService.execute(nextTask.getRunnable());
this.tasks.put(nextTask); // Put the task back in the queue.
} catch (InterruptedException e) {
this.setRunning(false);
}
}
}
@Override
public void stop(boolean force) {
if (this.executorService != null) {
this.setRunning(false);
if (force) {
this.executorService.shutdownNow();
} else {
this.executorService.shutdown();
}
}
private synchronized void setRunning(boolean running) {
this.running = running;
}
}

View File

@ -1,9 +1,30 @@
package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task;
/**
* A scheduler is responsible for storing and executing tasks as defined by each
* task's schedule.
*/
public interface Scheduler {
void addTask(Runnable task, Schedule schedule);
/**
* Adds a task to this scheduler, so that when the scheduler starts, the
* task is executed in accordance with its defined schedule.
* @param task The task to add.
*/
void addTask(Task task);
/**
* Starts the scheduler. A scheduler should only execute tasks once it has
* started, and it is up to the implementation to determine whether new
* tasks may be added while the scheduler is running.
*/
void start();
/**
* Stops the scheduler.
* @param force Whether to forcibly stop the scheduler. When set to true,
* any currently-executing tasks are immediately shutdown.
*/
void stop(boolean force);
}

View File

@ -16,7 +16,7 @@ public class DailySchedule implements Schedule {
}
@Override
public Instant getNextExecutionTime(Instant referenceInstant) {
public Instant computeNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
LocalDate currentDay = LocalDate.from(referenceInstant);
ZonedDateTime sameDayExecution = currentDay.atTime(this.time).atZone(this.zoneId);

View File

@ -22,7 +22,7 @@ public class HourlySchedule implements Schedule {
}
@Override
public Instant getNextExecutionTime(Instant referenceInstant) {
public Instant computeNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
int currentMinute = currentTime.getMinute();
if (currentMinute < this.minute) {

View File

@ -0,0 +1,31 @@
package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
public class MinutelySchedule implements Schedule {
private final int second;
private final ZoneId zoneId;
public MinutelySchedule(int second, ZoneId zoneId) {
this.second = second;
this.zoneId = zoneId;
}
public MinutelySchedule(int second) {
this(second, ZoneId.systemDefault());
}
@Override
public Instant computeNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
int currentSecond = currentTime.getSecond();
if (currentSecond >= this.second) {
return currentTime.plusMinutes(1).withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant();
}
return currentTime.withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant();
}
}

View File

@ -0,0 +1,41 @@
package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
/**
* A schedule which repeatedly executes a task at a regular interval specified
* as some integer multiple of a unit of time, such as 5 seconds, or 2 minutes.
*/
public class RepeatingSchedule implements Schedule {
private final ChronoUnit unit;
private final long multiple;
private Instant lastExecution;
/**
* Constructs a new repeating schedule.
* @param unit The unit of time that the interval consists of.
* @param multiple The
*/
public RepeatingSchedule(ChronoUnit unit, long multiple) {
this.unit = unit;
this.multiple = multiple;
this.lastExecution = null;
}
/**
* Computes the next execution time for a task. This keeps track of the last
* execution time, so that tasks repeat at an exact interval.
* @param referenceInstant The instant representing the current time.
* @return The next instant to execute the task at.
*/
@Override
public Instant computeNextExecutionTime(Instant referenceInstant) {
if (this.lastExecution == null) {
this.lastExecution = referenceInstant;
}
Instant nextExecution = this.lastExecution.plus(multiple, unit);
this.lastExecution = nextExecution;
return nextExecution;
}
}

View File

@ -1,7 +1,27 @@
package nl.andrewlalis.simply_scheduled.schedule;
import nl.andrewlalis.simply_scheduled.Scheduler;
import java.time.Instant;
/**
* A schedule is used by a {@link Scheduler} to determine how long to wait for
* the next time that a task should be executed.
*/
public interface Schedule {
Instant getNextExecutionTime(Instant referenceInstant);
/**
* Given some instant referring to the current time, this method should
* produce an instant sometime in the future at which the next execution of
* a task should happen.
*
* <p>
* <strong>Note that certain implementations may introduce side-effects
* when this method is called more than once.</strong>
* </p>
*
* @param referenceInstant The instant representing the current time.
* @return An instant in the future indicating the next time at which a task
* using this schedule should be executed.
*/
Instant computeNextExecutionTime(Instant referenceInstant);
}

View File

@ -0,0 +1,39 @@
package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Clock;
import java.time.Instant;
/**
* A task consists of a runnable job, and a schedule that defines precisely when
* the job will be run. It implements comparable so that schedulers may use a
* priority queue to insert new tasks.
*/
public class Task implements Comparable<Task>{
private final Clock clock;
private final Runnable runnable;
private final Schedule schedule;
public Task(Clock clock, Runnable runnable, Schedule schedule) {
this.clock = clock;
this.runnable = runnable;
this.schedule = schedule;
}
public Task(Runnable runnable, Schedule schedule) {
this(Clock.systemDefaultZone(), runnable, schedule);
}
public Runnable getRunnable() {
return runnable;
}
public Schedule getSchedule() {
return schedule;
}
@Override
public int compareTo(Task o) {
Instant now = clock.instant();
return this.schedule.computeNextExecutionTime(now).compareTo(o.getSchedule().computeNextExecutionTime(now));
}
}

View File

@ -1,33 +1,68 @@
package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.HourlySchedule;
import nl.andrewlalis.simply_scheduled.schedule.MinutelySchedule;
import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule;
import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task;
import org.junit.jupiter.api.Test;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
public class SchedulerTest {
@Test
void testSchedule() {
Clock clock = Clock.fixed(Instant.now(), ZoneOffset.UTC);
Clock clock = Clock.fixed(Instant.now().truncatedTo(ChronoUnit.MINUTES), ZoneOffset.UTC);
Scheduler scheduler = new BasicScheduler(clock);
LocalTime time = LocalTime.now();
int secondsLeft = 60 - time.getSecond() + 1;
int secondsLeft = 4;
AtomicBoolean flag = new AtomicBoolean(false);
scheduler.addTask(() -> flag.set(true), new HourlySchedule(time.getMinute() + 1));
Runnable taskRunnable = () -> {
flag.set(true);
System.out.println("\tExecuted task.");
};
Task task = new Task(clock, taskRunnable, new MinutelySchedule(3));
scheduler.addTask(task);
scheduler.start();
System.out.println("Now: " + clock.instant().toString());
System.out.println("Next task execution: " + task.getSchedule().computeNextExecutionTime(clock.instant()));
System.out.printf("Waiting %d seconds for task to run...", secondsLeft);
assertFalse(flag.get());
try {
Thread.sleep(secondsLeft * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertTrue(flag.get());
scheduler.stop(true);
}
@Test
void testRepeatingSchedule() {
Scheduler scheduler = new BasicScheduler();
Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1);
AtomicInteger value = new AtomicInteger(0);
Runnable taskRunnable = () -> {
value.set(value.get() + 1);
System.out.println("\tExecuted task.");
};
Task task = new Task(taskRunnable, schedule);
scheduler.addTask(task);
assertEquals(0, value.get());
scheduler.start();
System.out.println("Waiting 3.5 seconds for 3 iterations.");
try {
Thread.sleep(3500);
} catch (InterruptedException e) {
e.printStackTrace();
}
assertEquals(3, value.get());
scheduler.stop(true);
}
}