Added demo and updated some javadoc and some mechanics.

This commit is contained in:
Andrew Lalis 2021-04-17 12:51:23 +02:00
parent 865bdaf35c
commit 020d8e2258
10 changed files with 119 additions and 25 deletions

View File

@ -4,6 +4,7 @@ import nl.andrewlalis.simply_scheduled.schedule.Task;
import java.time.Clock; import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
@ -18,18 +19,26 @@ public class BasicScheduler extends Thread implements Scheduler {
private final ExecutorService executorService; private final ExecutorService executorService;
private boolean running = false; private boolean running = false;
public BasicScheduler(Clock clock) { public BasicScheduler(Clock clock, ExecutorService executorService) {
this.clock = clock; this.clock = clock;
this.tasks = new PriorityBlockingQueue<>(); this.tasks = new PriorityBlockingQueue<>();
this.executorService = Executors.newWorkStealingPool(); this.executorService = executorService;
} }
public BasicScheduler() { public BasicScheduler() {
this(Clock.systemDefaultZone()); this(Clock.systemDefaultZone(), Executors.newWorkStealingPool());
} }
/**
* Adds a task to this scheduler's queue.
* @param task The task to add.
* @throws RuntimeException If a task is added while the scheduler is running.
*/
@Override @Override
public void addTask(Task task) { public void addTask(Task task) {
if (this.running) {
throw new RuntimeException("Cannot add tasks to the basic scheduler while it is running.");
}
this.tasks.add(task); this.tasks.add(task);
} }
@ -40,13 +49,19 @@ public class BasicScheduler extends Thread implements Scheduler {
try { try {
Task nextTask = this.tasks.take(); Task nextTask = this.tasks.take();
Instant now = this.clock.instant(); Instant now = this.clock.instant();
long waitTime = nextTask.getSchedule().getNextExecutionTime(now).toEpochMilli() - now.toEpochMilli(); Optional<Instant> optionalNextExecution = nextTask.getSchedule().getNextExecutionTime(now);
if (optionalNextExecution.isEmpty()) {
continue; // Skip if the schedule doesn't have a next execution planned.
}
long waitTime = optionalNextExecution.get().toEpochMilli() - now.toEpochMilli();
if (waitTime > 0) { if (waitTime > 0) {
Thread.sleep(waitTime); Thread.sleep(waitTime);
} }
this.executorService.execute(nextTask.getRunnable()); this.executorService.execute(nextTask.getRunnable());
nextTask.getSchedule().markExecuted(this.clock.instant()); nextTask.getSchedule().markExecuted(this.clock.instant());
this.tasks.put(nextTask); // Put the task back in the queue. if (nextTask.getSchedule().isRepeating()) {
this.tasks.put(nextTask); // Put the task back in the queue.
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
this.setRunning(false); this.setRunning(false);
} }

View File

@ -0,0 +1,18 @@
package nl.andrewlalis.simply_scheduled;
import nl.andrewlalis.simply_scheduled.schedule.RepeatingSchedule;
import nl.andrewlalis.simply_scheduled.schedule.Schedule;
import nl.andrewlalis.simply_scheduled.schedule.Task;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
public class Demo {
public static void main(String[] args) {
Scheduler scheduler = new BasicScheduler();
Schedule schedule = new RepeatingSchedule(ChronoUnit.MILLIS, 250);
Runnable job = () -> System.out.println("Doing task: " + Instant.now().toString());
scheduler.addTask(Task.of(job, schedule));
scheduler.start();
}
}

View File

@ -1,6 +1,7 @@
package nl.andrewlalis.simply_scheduled.schedule; package nl.andrewlalis.simply_scheduled.schedule;
import java.time.*; import java.time.*;
import java.util.Optional;
/** /**
* A daily schedule plans for the execution of a task once per day, at a * A daily schedule plans for the execution of a task once per day, at a
@ -20,13 +21,13 @@ public class DailySchedule implements Schedule {
} }
@Override @Override
public Instant getNextExecutionTime(Instant referenceInstant) { public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
LocalDate currentDay = LocalDate.from(referenceInstant); LocalDate currentDay = LocalDate.from(referenceInstant);
ZonedDateTime sameDayExecution = currentDay.atTime(this.time).atZone(this.zoneId); ZonedDateTime sameDayExecution = currentDay.atTime(this.time).atZone(this.zoneId);
if (sameDayExecution.isBefore(currentTime)) { if (sameDayExecution.isBefore(currentTime)) {
return sameDayExecution.toInstant(); return Optional.of(sameDayExecution.toInstant());
} }
return sameDayExecution.plusDays(1).toInstant(); return Optional.of(sameDayExecution.plusDays(1).toInstant());
} }
} }

View File

@ -4,6 +4,7 @@ import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Optional;
/** /**
* An hourly schedule is used to execute a task once per hour, at a specific * An hourly schedule is used to execute a task once per hour, at a specific
@ -26,12 +27,12 @@ public class HourlySchedule implements Schedule {
} }
@Override @Override
public Instant getNextExecutionTime(Instant referenceInstant) { public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
int currentMinute = currentTime.getMinute(); int currentMinute = currentTime.getMinute();
if (currentMinute < this.minute) { if (currentMinute < this.minute) {
return currentTime.truncatedTo(ChronoUnit.MINUTES).plusMinutes(this.minute - currentMinute).toInstant(); return Optional.of(currentTime.truncatedTo(ChronoUnit.MINUTES).plusMinutes(this.minute - currentMinute).toInstant());
} }
return currentTime.plusHours(1).plusMinutes(this.minute).truncatedTo(ChronoUnit.MINUTES).toInstant(); return Optional.of(currentTime.plusHours(1).plusMinutes(this.minute).truncatedTo(ChronoUnit.MINUTES).toInstant());
} }
} }

View File

@ -4,6 +4,7 @@ import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Optional;
public class MinutelySchedule implements Schedule { public class MinutelySchedule implements Schedule {
@ -20,12 +21,12 @@ public class MinutelySchedule implements Schedule {
} }
@Override @Override
public Instant getNextExecutionTime(Instant referenceInstant) { public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId); ZonedDateTime currentTime = referenceInstant.atZone(this.zoneId);
int currentSecond = currentTime.getSecond(); int currentSecond = currentTime.getSecond();
if (currentSecond >= this.second) { if (currentSecond >= this.second) {
return currentTime.plusMinutes(1).withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant(); return Optional.of(currentTime.plusMinutes(1).withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant());
} }
return currentTime.withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant(); return Optional.of(currentTime.withSecond(this.second).truncatedTo(ChronoUnit.SECONDS).toInstant());
} }
} }

View File

@ -2,6 +2,7 @@ package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Optional;
/** /**
* A schedule which repeatedly executes a task at a regular interval specified * A schedule which repeatedly executes a task at a regular interval specified
@ -30,11 +31,11 @@ public class RepeatingSchedule implements Schedule {
* @return The next instant to execute the task at. * @return The next instant to execute the task at.
*/ */
@Override @Override
public Instant getNextExecutionTime(Instant referenceInstant) { public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
if (this.lastExecution == null) { if (this.lastExecution == null) {
this.lastExecution = referenceInstant; this.lastExecution = referenceInstant;
} }
return this.lastExecution.plus(multiple, unit); return Optional.of(this.lastExecution.plus(multiple, unit));
} }
@Override @Override

View File

@ -3,6 +3,7 @@ package nl.andrewlalis.simply_scheduled.schedule;
import nl.andrewlalis.simply_scheduled.Scheduler; import nl.andrewlalis.simply_scheduled.Scheduler;
import java.time.Instant; import java.time.Instant;
import java.util.Optional;
/** /**
* A schedule is used by a {@link Scheduler} to determine how long to wait for * A schedule is used by a {@link Scheduler} to determine how long to wait for
@ -16,9 +17,10 @@ public interface Schedule {
* *
* @param referenceInstant The instant representing the current time. * @param referenceInstant The instant representing the current time.
* @return An instant in the future indicating the next time at which a task * @return An instant in the future indicating the next time at which a task
* using this schedule should be executed. * using this schedule should be executed. If the optional is empty, it
* indicates that there are no planned execution times.
*/ */
Instant getNextExecutionTime(Instant referenceInstant); Optional<Instant> getNextExecutionTime(Instant referenceInstant);
/** /**
* This method is called on the schedule as an indication that the scheduler * This method is called on the schedule as an indication that the scheduler
@ -28,4 +30,13 @@ public interface Schedule {
default void markExecuted(Instant instant) { default void markExecuted(Instant instant) {
// Default no-op. // Default no-op.
} }
/**
* Tells whether tasks executed in accordance with this schedule should be
* re-queued again for another execution. Defaults to <code>true</code>.
* @return True if this schedule is repeating, or false otherwise.
*/
default boolean isRepeating() {
return true;
}
} }

View File

@ -0,0 +1,38 @@
package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
/**
* A schedule in which a discrete number of execution times are defined at
* initialization.
*/
public class SpecificInstantSchedule implements Schedule {
private final List<Instant> executionTimes;
public SpecificInstantSchedule(Instant... executionTimes) {
this(new ArrayList<>(Arrays.asList(executionTimes)));
}
public SpecificInstantSchedule(ZonedDateTime... zonedDateTimes) {
this(Arrays.stream(zonedDateTimes).map(ZonedDateTime::toInstant).collect(Collectors.toList()));
}
private SpecificInstantSchedule(List<Instant> executionTimes) {
this.executionTimes = executionTimes;
Collections.sort(this.executionTimes);
}
@Override
public Optional<Instant> getNextExecutionTime(Instant referenceInstant) {
while (!this.executionTimes.isEmpty()) {
Instant nextExecutionTime = this.executionTimes.remove(0);
if (nextExecutionTime.isBefore(referenceInstant)) {
return Optional.of(nextExecutionTime);
}
}
return Optional.empty();
}
}

View File

@ -2,6 +2,7 @@ package nl.andrewlalis.simply_scheduled.schedule;
import java.time.Clock; import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.util.Optional;
/** /**
* A task consists of a runnable job, and a schedule that defines precisely when * A task consists of a runnable job, and a schedule that defines precisely when
@ -19,8 +20,8 @@ public class Task implements Comparable<Task>{
this.schedule = schedule; this.schedule = schedule;
} }
public Task(Runnable runnable, Schedule schedule) { public static Task of(Runnable runnable, Schedule schedule) {
this(Clock.systemDefaultZone(), runnable, schedule); return new Task(Clock.systemDefaultZone(), runnable, schedule);
} }
public Runnable getRunnable() { public Runnable getRunnable() {
@ -34,6 +35,13 @@ public class Task implements Comparable<Task>{
@Override @Override
public int compareTo(Task o) { public int compareTo(Task o) {
Instant now = clock.instant(); Instant now = clock.instant();
return this.schedule.getNextExecutionTime(now).compareTo(o.getSchedule().getNextExecutionTime(now)); Optional<Instant> t1 = this.schedule.getNextExecutionTime(now);
Optional<Instant> t2 = o.getSchedule().getNextExecutionTime(now);
if (t1.isEmpty() && t2.isEmpty()) return 0;
if (t1.isPresent() && t2.isEmpty()) return 1;
if (t1.isEmpty()) return -1;
return t1.get().compareTo(t2.get());
} }
} }

View File

@ -10,6 +10,7 @@ import java.time.Clock;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneOffset; import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -20,7 +21,7 @@ public class SchedulerTest {
@Test @Test
void testSchedule() { void testSchedule() {
Clock clock = Clock.fixed(Instant.now().truncatedTo(ChronoUnit.MINUTES), ZoneOffset.UTC); Clock clock = Clock.fixed(Instant.now().truncatedTo(ChronoUnit.MINUTES), ZoneOffset.UTC);
Scheduler scheduler = new BasicScheduler(clock); Scheduler scheduler = new BasicScheduler(clock, Executors.newSingleThreadExecutor());
int secondsLeft = 4; int secondsLeft = 4;
AtomicBoolean flag = new AtomicBoolean(false); AtomicBoolean flag = new AtomicBoolean(false);
Runnable taskRunnable = () -> { Runnable taskRunnable = () -> {
@ -45,15 +46,14 @@ public class SchedulerTest {
@Test @Test
void testRepeatingSchedule() { void testRepeatingSchedule() {
Scheduler scheduler = new BasicScheduler(); Scheduler scheduler = new BasicScheduler(Clock.systemUTC(), Executors.newWorkStealingPool());
Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1); Schedule schedule = new RepeatingSchedule(ChronoUnit.SECONDS, 1);
AtomicInteger value = new AtomicInteger(0); AtomicInteger value = new AtomicInteger(0);
Runnable taskRunnable = () -> { Runnable taskRunnable = () -> {
value.set(value.get() + 1); value.set(value.get() + 1);
System.out.println("\tExecuted task."); System.out.println("\tExecuted task.");
}; };
Task task = new Task(taskRunnable, schedule); scheduler.addTask(Task.of(taskRunnable, schedule));
scheduler.addTask(task);
assertEquals(0, value.get()); assertEquals(0, value.get());
scheduler.start(); scheduler.start();
System.out.println("Waiting 3.5 seconds for 3 iterations."); System.out.println("Waiting 3.5 seconds for 3 iterations.");