Improved scheduling and test reliability.

This commit is contained in:
Andrew Lalis 2021-08-06 01:12:29 +02:00
parent 020d8e2258
commit 27dce051f7
9 changed files with 123 additions and 69 deletions

33
pom.xml
View File

@ -7,6 +7,33 @@
<groupId>nl.andrewlalis</groupId> <groupId>nl.andrewlalis</groupId>
<artifactId>simply-scheduled</artifactId> <artifactId>simply-scheduled</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Simply Scheduled</name>
<description>Lightweight task scheduling library.</description>
<url>https://github.com/andrewlalis/SimplyScheduled</url>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<name>Andrew Lalis</name>
<email>andrewlalisofficial@gmail.com</email>
<timezone>CEST</timezone>
</developer>
</developers>
<scm>
<connection>scm:git:git://github.com/andrewlalis/SimplyScheduled.git</connection>
<developerConnection>scm:git:ssh://github.com:andrewlalis/SimplyScheduled.git</developerConnection>
<url>https://github.com/andrewlalis/SimplyScheduled</url>
</scm>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@ -20,7 +47,7 @@
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version> <version>3.0.0-M5</version>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
@ -30,7 +57,7 @@
<dependency> <dependency>
<groupId>org.junit</groupId> <groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId> <artifactId>junit-bom</artifactId>
<version>5.7.1</version> <version>5.7.2</version>
<type>pom</type> <type>pom</type>
<scope>import</scope> <scope>import</scope>
</dependency> </dependency>
@ -42,7 +69,7 @@
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId> <artifactId>junit-jupiter</artifactId>
<version>5.7.1</version> <version>5.7.2</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -1,5 +1,6 @@
module simply_scheduled { module simply_scheduled {
exports nl.andrewlalis.simply_scheduled; exports nl.andrewlalis.simply_scheduled;
exports nl.andrewlalis.simply_scheduled.schedule; exports nl.andrewlalis.simply_scheduled.schedule;
opens nl.andrewlalis.simply_scheduled; opens nl.andrewlalis.simply_scheduled;
} }

View File

@ -8,6 +8,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
/** /**
* A simple thread-based scheduler that sleeps until the next task, runs it * A simple thread-based scheduler that sleeps until the next task, runs it
@ -47,9 +48,9 @@ public class BasicScheduler extends Thread implements Scheduler {
this.running = true; this.running = true;
while (this.running) { while (this.running) {
try { try {
Task nextTask = this.tasks.take(); final Task nextTask = this.tasks.take();
Instant now = this.clock.instant(); final Instant now = this.clock.instant();
Optional<Instant> optionalNextExecution = nextTask.getSchedule().getNextExecutionTime(now); final Optional<Instant> optionalNextExecution = nextTask.getSchedule().getNextExecutionTime(now);
if (optionalNextExecution.isEmpty()) { if (optionalNextExecution.isEmpty()) {
continue; // Skip if the schedule doesn't have a next execution planned. continue; // Skip if the schedule doesn't have a next execution planned.
} }
@ -57,10 +58,18 @@ public class BasicScheduler extends Thread implements Scheduler {
if (waitTime > 0) { if (waitTime > 0) {
Thread.sleep(waitTime); Thread.sleep(waitTime);
} }
this.executorService.execute(nextTask.getRunnable()); try {
this.executorService.execute(nextTask.getRunnable());
} catch (RejectedExecutionException e) {
if (!this.executorService.isShutdown()) {
// Only show the stack trace if the executor service is not being shut down.
// We expect the service to reject executions if it is shutting down.
e.printStackTrace();
}
}
nextTask.getSchedule().markExecuted(this.clock.instant()); nextTask.getSchedule().markExecuted(this.clock.instant());
if (nextTask.getSchedule().isRepeating()) { if (nextTask.getSchedule().isRepeating()) {
this.tasks.put(nextTask); // Put the task back in the queue. this.tasks.put(nextTask);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
this.setRunning(false); this.setRunning(false);

View File

@ -2,7 +2,6 @@ package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule; import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule;
import nl.andrewlalis.simply_scheduled.schedule.Schedule; import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
@ -10,9 +9,9 @@ import java.time.temporal.ChronoUnit;
public class Demo { public class Demo {
public static void main(String[] args) { public static void main(String[] args) {
Scheduler scheduler = new BasicScheduler(); Scheduler scheduler = new BasicScheduler();
Schedule schedule = new RepeatingSchedule(ChronoUnit.MILLIS, 250); Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 5, Instant.now().truncatedTo(ChronoUnit.SECONDS));
Runnable job = () -> System.out.println("Doing task: " + Instant.now().toString()); Runnable job = () -> System.out.println("Doing task: " + Instant.now().toString());
scheduler.addTask(Task.of(job, schedule)); scheduler.addTask(job, schedule);
scheduler.start(); scheduler.start();
} }
} }

View File

@ -1,5 +1,6 @@
package nl.andrewlalis.simply_scheduled; package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task; import nl.andrewlalis.simply_scheduled.schedule.Task;
/** /**
@ -14,6 +15,15 @@ public interface Scheduler {
*/ */
void addTask(Task task); void addTask(Task task);
/**
* Adds a task to this scheduler.
* @param runnable The code to run.
* @param schedule The schedule that dictates when the code should run.
*/
default void addTask(Runnable runnable, Schedule schedule) {
addTask(new Task(runnable, schedule));
}
/** /**
* Starts the scheduler. A scheduler should only execute tasks once it has * Starts the scheduler. A scheduler should only execute tasks once it has
* started, and it is up to the implementation to determine whether new * started, and it is up to the implementation to determine whether new
@ -27,4 +37,11 @@ public interface Scheduler {
* any currently-executing tasks are immediately shutdown. * any currently-executing tasks are immediately shutdown.
*/ */
void stop(boolean force); void stop(boolean force);
/**
* Stops the scheduler, and waits for any currently-executing tasks to finish.
*/
default void stop() {
stop(false);
}
} }

View File

@ -1,32 +0,0 @@
package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
public class MinutelySchedule implements Schedule {
private final int second;
private final ZoneId zoneId;
public MinutelySchedule(int second, ZoneId zoneId) {
this.second = second;
this.zoneId = zoneId;
}
public MinutelySchedule(int second) {
this(second, ZoneId.systemDefault());
}
@Override
public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
int currentSecond = currentTime.getSecond();
if (currentSecond >= this.second) {
return Optional.of(currentTime.plusMinutes(1).withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant());
}
return Optional.of(currentTime.withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant());
}
}

View File

@ -11,17 +11,29 @@ import java.util.Optional;
public class RepeatingSchedule implements Schedule { public class RepeatingSchedule implements Schedule {
private final ChronoUnit unit; private final ChronoUnit unit;
private final long multiple; private final long multiple;
private Instant lastExecution; private long elapsedIntervals = 0;
private final Instant start;
/** /**
* Constructs a new repeating schedule. * Constructs a new repeating schedule.
* @param unit The unit of time that the interval consists of. * @param unit The unit of time that the interval consists of.
* @param multiple The * @param multiple The number of units of time that each interval consists of.
* @param start The starting point for this schedule.
*/ */
public RepeatingSchedule(ChronoUnit unit, long multiple) { public RepeatingSchedule(ChronoUnit unit, long multiple, Instant start) {
this.unit = unit; this.unit = unit;
this.multiple = multiple; this.multiple = multiple;
this.lastExecution = null; this.start = start;
}
/**
* Constructs a new repeating schedule, using {@link Instant#now()} as the
* starting point.
* @param unit The unit of time that the interval consists of.
* @param multiple The number of units of time that each interval consists of.
*/
public RepeatingSchedule(ChronoUnit unit, long multiple) {
this(unit, multiple, Instant.now());
} }
/** /**
@ -32,14 +44,11 @@ public class RepeatingSchedule implements Schedule {
*/ */
@Override @Override
public Optional<Instant> getNextExecutionTime(Instant referenceInstant) { public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
if (this.lastExecution == null) { return Optional.of(this.start.plus(elapsedIntervals * multiple, unit));
this.lastExecution = referenceInstant;
}
return Optional.of(this.lastExecution.plus(multiple, unit));
} }
@Override @Override
public void markExecuted(Instant instant) { public void markExecuted(Instant instant) {
this.lastExecution = instant; this.elapsedIntervals++;
} }
} }

View File

@ -14,20 +14,40 @@ public class Task implements Comparable<Task>{
private final Runnable runnable; private final Runnable runnable;
private final Schedule schedule; private final Schedule schedule;
/**
* Constructs a new task that will run the given runnable according to the
* given schedule. Allows for specifying a {@link Clock}, this is mostly
* useful for testing purposes.
* @param clock The clock to use for time-sensitive operations.
* @param runnable The code to run when the task is executed.
* @param schedule The schedule which determines when the task is executed.
*/
public Task(Clock clock, Runnable runnable, Schedule schedule) { public Task(Clock clock, Runnable runnable, Schedule schedule) {
this.clock = clock; this.clock = clock;
this.runnable = runnable; this.runnable = runnable;
this.schedule = schedule; this.schedule = schedule;
} }
public static Task of(Runnable runnable, Schedule schedule) { /**
return new Task(Clock.systemDefaultZone(), runnable, schedule); * Constructs a new task that will run the given runnable according to the
* given schedule.
* @param runnable The code to run when the task is executed.
* @param schedule The schedule which determines when the task is executed.
*/
public Task(Runnable runnable, Schedule schedule) {
this(Clock.systemDefaultZone(), runnable, schedule);
} }
/**
* @return The runnable which will be executed when this task is scheduled.
*/
public Runnable getRunnable() { public Runnable getRunnable() {
return runnable; return runnable;
} }
/**
* @return The schedule for this task.
*/
public Schedule getSchedule() { public Schedule getSchedule() {
return schedule; return schedule;
} }

View File

@ -1,6 +1,5 @@
package nl.andrewlalis.simply_scheduled; package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.MinutelySchedule;
import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule; import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule;
import nl.andrewlalis.simply_scheduled.schedule.Schedule; import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task; import nl.andrewlalis.simply_scheduled.schedule.Task;
@ -8,6 +7,7 @@ import org.junit.jupiter.api.Test;
import java.time.Clock; import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -16,7 +16,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;
public class SchedulerTest { /**
* Tests the functionality of the {@link BasicScheduler} to reliably execute
* scheduled tasks.
*/
public class BasicSchedulerTest {
@Test @Test
void testSchedule() { void testSchedule() {
@ -28,7 +32,7 @@ public class SchedulerTest {
flag.set(true); flag.set(true);
System.out.println("\tExecuted task."); System.out.println("\tExecuted task.");
}; };
Task task = new Task(clock, taskRunnable, new MinutelySchedule(3)); Task task = new Task(clock, taskRunnable, new RepeatingSchedule(ChronoUnit.SECONDS, 2, clock.instant().plusSeconds(1)));
scheduler.addTask(task); scheduler.addTask(task);
scheduler.start(); scheduler.start();
System.out.println("Now: " + clock.instant().toString()); System.out.println("Now: " + clock.instant().toString());
@ -45,24 +49,24 @@ public class SchedulerTest {
} }
@Test @Test
void testRepeatingSchedule() { void testRepeatingSchedule() throws InterruptedException {
Scheduler scheduler = new BasicScheduler(Clock.systemUTC(), Executors.newWorkStealingPool()); Scheduler scheduler = new BasicScheduler(Clock.systemUTC(), Executors.newFixedThreadPool(3));
Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1); Instant startInstant = Instant.now(Clock.systemUTC());
Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1, startInstant);
AtomicInteger value = new AtomicInteger(0); AtomicInteger value = new AtomicInteger(0);
Runnable taskRunnable = () -> { Runnable taskRunnable = () -> {
value.set(value.get() + 1); value.set(value.get() + 1);
System.out.println("\tExecuted task."); System.out.println("\tExecuted task at " + LocalDateTime.now());
}; };
scheduler.addTask(Task.of(taskRunnable, schedule)); scheduler.addTask(new Task(taskRunnable, schedule));
assertEquals(0, value.get()); assertEquals(0, value.get());
scheduler.start(); scheduler.start();
System.out.println("Waiting 3.5 seconds for 3 iterations.");
try { Thread.sleep(3500);
Thread.sleep(3500); // We expect the task to have run 4 times:
} catch (InterruptedException e) { // at t=0, t=1, t=2, and t=3.
e.printStackTrace(); assertEquals(4, value.get());
}
assertEquals(3, value.get());
scheduler.stop(true); scheduler.stop(true);
} }
} }