Finalize cross-service upload workflow.

This commit is contained in:
Andrew Lalis 2023-04-05 14:43:40 +02:00
parent ab3cf591c6
commit ffe1d9bd40
12 changed files with 200 additions and 48 deletions

View File

@ -8,5 +8,5 @@ public record SubmissionPayload(
float weight,
String weightUnit,
int reps,
String videoFileId
long taskId
) {}

View File

@ -32,13 +32,24 @@ public class Submission {
@Column(nullable = false)
private LocalDateTime performedAt;
/**
* The id of the video processing task that a user gives to us when they
* create the submission, so that when the task finishes processing, we can
* route its data to the right submission.
*/
@Column(nullable = false, updatable = false)
private long videoProcessingTaskId;
/**
* The id of the video file that was submitted for this submission. It lives
* on the <em>gymboard-cdn</em> service as a stored file, which can be
* accessed via <code>GET https://CDN-HOST/files/{videoFileId}</code>.
*/
@Column(nullable = false, updatable = false, length = 26)
private String videoFileId;
@Column(length = 26)
private String videoFileId = null;
@Column(length = 26)
private String thumbnailFileId = null;
@Column(nullable = false, precision = 7, scale = 2)
private BigDecimal rawWeight;
@ -64,7 +75,7 @@ public class Submission {
Exercise exercise,
User user,
LocalDateTime performedAt,
String videoFileId,
long videoProcessingTaskId,
BigDecimal rawWeight,
WeightUnit unit,
BigDecimal metricWeight,
@ -73,9 +84,9 @@ public class Submission {
this.id = id;
this.gym = gym;
this.exercise = exercise;
this.videoFileId = videoFileId;
this.user = user;
this.performedAt = performedAt;
this.videoProcessingTaskId = videoProcessingTaskId;
this.rawWeight = rawWeight;
this.weightUnit = unit;
this.metricWeight = metricWeight;
@ -99,10 +110,26 @@ public class Submission {
return exercise;
}
public long getVideoProcessingTaskId() {
return videoProcessingTaskId;
}
public String getVideoFileId() {
return videoFileId;
}
public String getThumbnailFileId() {
return thumbnailFileId;
}
public void setVideoFileId(String videoFileId) {
this.videoFileId = videoFileId;
}
public void setThumbnailFileId(String thumbnailFileId) {
this.thumbnailFileId = thumbnailFileId;
}
public User getUser() {
return user;
}

View File

@ -50,4 +50,14 @@ public class CdnClient {
HttpResponse<String> response = httpClient.send(req, HttpResponse.BodyHandlers.ofString());
return objectMapper.readValue(response.body(), responseType);
}
public void post(String urlPath) throws IOException, InterruptedException {
HttpRequest req = HttpRequest.newBuilder(URI.create(baseUrl + urlPath))
.POST(HttpRequest.BodyPublishers.noBody())
.build();
HttpResponse<Void> response = httpClient.send(req, HttpResponse.BodyHandlers.discarding());
if (response.statusCode() != 200) {
throw new IOException("Request failed with code " + response.statusCode());
}
}
}

View File

@ -3,8 +3,12 @@ package nl.andrewlalis.gymboard_api.domains.api.service.cdn_client;
import java.nio.file.Path;
public record UploadsClient(CdnClient client) {
public record FileUploadResponse(String id) {}
public record VideoProcessingTaskStatusResponse(String status) {}
public record FileUploadResponse(long taskId) {}
public record VideoProcessingTaskStatusResponse(
String status,
String videoFileId,
String thumbnailFileId
) {}
public record FileMetadataResponse(
String filename,
@ -14,15 +18,19 @@ public record UploadsClient(CdnClient client) {
boolean availableForDownload
) {}
public FileUploadResponse uploadVideo(Path filePath, String contentType) throws Exception {
return client.postFile("/uploads/video", filePath, contentType, FileUploadResponse.class);
public long uploadVideo(Path filePath, String contentType) throws Exception {
return client.postFile("/uploads/video", filePath, contentType, FileUploadResponse.class).taskId();
}
public VideoProcessingTaskStatusResponse getVideoProcessingStatus(String id) throws Exception {
public VideoProcessingTaskStatusResponse getVideoProcessingTaskStatus(long id) throws Exception {
return client.get("/uploads/video/" + id + "/status", VideoProcessingTaskStatusResponse.class);
}
public FileMetadataResponse getFileMetadata(String id) throws Exception {
return client.get("/files/" + id + "/metadata", FileMetadataResponse.class);
}
public void startTask(long taskId) throws Exception {
client.post("/uploads/video/" + taskId + "/start");
}
}

View File

@ -91,9 +91,14 @@ public class ExerciseSubmissionService {
Submission submission = submissionRepository.saveAndFlush(new Submission(
ulid.nextULID(), gym, exercise, user,
performedAt,
payload.videoFileId(),
payload.taskId(),
rawWeight, weightUnit, metricWeight, payload.reps()
));
try {
cdnClient.uploads.startTask(submission.getVideoProcessingTaskId());
} catch (Exception e) {
log.error("Failed to start video processing task for submission " + submission.getId(), e);
}
return new SubmissionResponse(submission);
}
@ -118,17 +123,13 @@ public class ExerciseSubmissionService {
}
try {
UploadsClient.FileMetadataResponse metadata = cdnClient.uploads.getFileMetadata(data.videoFileId());
if (metadata == null) {
response.addMessage("Missing video file.");
} else if (!metadata.availableForDownload()) {
response.addMessage("File not yet available for download.");
} else if (!"video/mp4".equals(metadata.mimeType())) {
response.addMessage("Invalid video file format.");
var status = cdnClient.uploads.getVideoProcessingTaskStatus(data.taskId());
if (!status.status().equalsIgnoreCase("NOT_STARTED")) {
response.addMessage("Invalid video processing task.");
}
} catch (Exception e) {
log.error("Error fetching file metadata.", e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error fetching uploaded video file metadata.");
log.error("Error fetching task status.", e);
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "Error fetching uploaded video task status.");
}
return response;
}

View File

@ -56,7 +56,7 @@ public class SampleDataLoader implements ApplicationListener<ContextRefreshedEve
gen.generate();
completed.add(gen);
} catch (Exception e) {
throw new RuntimeException("Generator failed: " + gen.getClass().getSimpleName());
throw new RuntimeException("Generator failed: " + gen.getClass().getSimpleName(), e);
}
}
}

View File

@ -1,21 +1,25 @@
package nl.andrewlalis.gymboard_api.util.sample_data;
import nl.andrewlalis.gymboard_api.domains.api.dao.GymRepository;
import nl.andrewlalis.gymboard_api.domains.api.dao.ExerciseRepository;
import nl.andrewlalis.gymboard_api.domains.api.dao.GymRepository;
import nl.andrewlalis.gymboard_api.domains.api.dao.submission.SubmissionRepository;
import nl.andrewlalis.gymboard_api.domains.api.model.Exercise;
import nl.andrewlalis.gymboard_api.domains.api.model.Gym;
import nl.andrewlalis.gymboard_api.domains.api.model.WeightUnit;
import nl.andrewlalis.gymboard_api.domains.api.model.Exercise;
import nl.andrewlalis.gymboard_api.domains.api.model.submission.Submission;
import nl.andrewlalis.gymboard_api.domains.api.service.cdn_client.CdnClient;
import nl.andrewlalis.gymboard_api.domains.api.service.submission.ExerciseSubmissionService;
import nl.andrewlalis.gymboard_api.domains.api.service.cdn_client.UploadsClient;
import nl.andrewlalis.gymboard_api.domains.auth.dao.UserRepository;
import nl.andrewlalis.gymboard_api.domains.auth.model.User;
import nl.andrewlalis.gymboard_api.util.ULID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.time.Duration;
@ -25,35 +29,30 @@ import java.util.*;
@Component
@Profile("development")
public class SampleSubmissionGenerator implements SampleDataGenerator {
private static final Logger log = LoggerFactory.getLogger(SampleSubmissionGenerator.class);
private final GymRepository gymRepository;
private final UserRepository userRepository;
private final ExerciseRepository exerciseRepository;
private final ExerciseSubmissionService submissionService;
private final SubmissionRepository submissionRepository;
private final ULID ulid;
@Value("${app.cdn-origin}")
private String cdnOrigin;
public SampleSubmissionGenerator(GymRepository gymRepository, UserRepository userRepository, ExerciseRepository exerciseRepository, ExerciseSubmissionService submissionService, SubmissionRepository submissionRepository, ULID ulid) {
public SampleSubmissionGenerator(GymRepository gymRepository, UserRepository userRepository, ExerciseRepository exerciseRepository, SubmissionRepository submissionRepository, ULID ulid) {
this.gymRepository = gymRepository;
this.userRepository = userRepository;
this.exerciseRepository = exerciseRepository;
this.submissionService = submissionService;
this.submissionRepository = submissionRepository;
this.ulid = ulid;
}
@Override
public void generate() throws Exception {
final CdnClient cdnClient = new CdnClient(cdnOrigin);
List<String> videoIds = new ArrayList<>();
var video1 = cdnClient.uploads.uploadVideo(Path.of("sample_data", "sample_video_curl.mp4"), "video/mp4");
var video2 = cdnClient.uploads.uploadVideo(Path.of("sample_data", "sample_video_ohp.mp4"), "video/mp4");
videoIds.add(video1.id());
videoIds.add(video2.id());
var uploads = generateUploads();
// Now that uploads are complete, we can proceed with generating the submissions.
List<Gym> gyms = gymRepository.findAll();
List<User> users = userRepository.findAll();
List<Exercise> exercises = exerciseRepository.findAll();
@ -65,24 +64,27 @@ public class SampleSubmissionGenerator implements SampleDataGenerator {
Random random = new Random(1);
List<Submission> submissions = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
submissions.add(generateRandomSubmission(
Submission submission = generateRandomSubmission(
gyms,
users,
exercises,
videoIds,
uploads,
earliestSubmission,
latestSubmission,
random
));
);
submissions.add(submission);
}
submissionRepository.saveAll(submissions);
// After adding all the submissions, we'll signal to CDN that it can start processing.
}
private Submission generateRandomSubmission(
List<Gym> gyms,
List<User> users,
List<Exercise> exercises,
List<String> videoIds,
Map<Long, Pair<String, String>> uploads,
LocalDateTime earliestSubmission,
LocalDateTime latestSubmission,
Random random
@ -102,13 +104,16 @@ public class SampleSubmissionGenerator implements SampleDataGenerator {
randomChoice(exercises, random),
randomChoice(users, random),
time,
randomChoice(videoIds, random),
randomChoice(new ArrayList<>(uploads.keySet()), random),
rawWeight,
weightUnit,
metricWeight,
random.nextInt(13) + 1
);
submission.setVerified(true);
var uploadData = uploads.get(submission.getVideoProcessingTaskId());
submission.setVideoFileId(uploadData.getFirst());
submission.setThumbnailFileId(uploadData.getSecond());
return submission;
}
@ -125,4 +130,48 @@ public class SampleSubmissionGenerator implements SampleDataGenerator {
Duration dur = Duration.between(start, end);
return start.plusSeconds(rand.nextLong(dur.toSeconds() + 1));
}
/**
* Generates a set of sample video uploads to use for all the sample
* submissions.
* @return A map containing keys representing video processing task ids, and
* values being a pair of video and thumbnail file ids.
* @throws Exception If an error occurs.
*/
private Map<Long, Pair<String, String>> generateUploads() throws Exception {
final CdnClient cdnClient = new CdnClient(cdnOrigin);
List<Long> taskIds = new ArrayList<>();
taskIds.add(cdnClient.uploads.uploadVideo(Path.of("sample_data", "sample_video_curl.mp4"), "video/mp4"));
taskIds.add(cdnClient.uploads.uploadVideo(Path.of("sample_data", "sample_video_ohp.mp4"), "video/mp4"));
Map<Long, UploadsClient.VideoProcessingTaskStatusResponse> taskStatus = new HashMap<>();
for (long taskId : taskIds) {
cdnClient.uploads.startTask(taskId);
taskStatus.put(taskId, cdnClient.uploads.getVideoProcessingTaskStatus(taskId));
}
// Wait for all video uploads to complete.
while (
taskStatus.values().stream()
.map(UploadsClient.VideoProcessingTaskStatusResponse::status)
.anyMatch(status -> !List.of("COMPLETED", "FAILED").contains(status.toUpperCase()))
) {
log.info("Waiting for sample video upload tasks to finish...");
Thread.sleep(1000);
for (long taskId : taskIds) taskStatus.put(taskId, cdnClient.uploads.getVideoProcessingTaskStatus(taskId));
}
// If any upload failed, throw an exception and cancel this generator.
if (taskStatus.values().stream().anyMatch(r -> r.status().equalsIgnoreCase("FAILED"))) {
throw new IOException("Video upload task processing failed.");
}
// Prepare the final data structure.
Map<Long, Pair<String, String>> finalResults = new HashMap<>();
for (var entry : taskStatus.entrySet()) {
finalResults.put(entry.getKey(), Pair.of(entry.getValue().videoFileId(), entry.getValue().thumbnailFileId()));
}
return finalResults;
}
}

View File

@ -180,6 +180,20 @@ public class FileStorageService {
Files.deleteIfExists(filePath);
}
public void copyTo(String fileId, Path filePath) throws IOException {
Path inputFilePath = getStoragePathForFile(fileId);
if (Files.notExists(inputFilePath)) {
throw new IOException("File " + fileId + " not found.");
}
try (
var in = Files.newInputStream(inputFilePath);
var out = Files.newOutputStream(filePath)
) {
readMetadata(in);
in.transferTo(out);
}
}
private static LocalDateTime dateFromULID(ULID.Value value) {
return Instant.ofEpochMilli(value.timestamp())
.atOffset(ZoneOffset.UTC)

View File

@ -1,5 +1,7 @@
package nl.andrewlalis.gymboardcdn.uploads.api;
public record VideoProcessingTaskStatusResponse(
String status
String status,
String videoFileId,
String thumbnailFileId
) {}

View File

@ -45,6 +45,12 @@ public class VideoProcessingTask {
@Column(nullable = false, updatable = false, length = 26)
private String uploadFileId;
@Column(length = 26)
private String videoFileId;
@Column(length = 26)
private String thumbnailFileId;
public VideoProcessingTask() {}
public VideoProcessingTask(Status status, String uploadFileId) {
@ -71,4 +77,20 @@ public class VideoProcessingTask {
public String getUploadFileId() {
return uploadFileId;
}
public String getVideoFileId() {
return videoFileId;
}
public void setVideoFileId(String videoFileId) {
this.videoFileId = videoFileId;
}
public String getThumbnailFileId() {
return thumbnailFileId;
}
public void setThumbnailFileId(String thumbnailFileId) {
this.thumbnailFileId = thumbnailFileId;
}
}

View File

@ -78,7 +78,11 @@ public class UploadService {
public VideoProcessingTaskStatusResponse getVideoProcessingStatus(long id) {
VideoProcessingTask task = videoTaskRepository.findById(id)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
return new VideoProcessingTaskStatusResponse(task.getStatus().name());
return new VideoProcessingTaskStatusResponse(
task.getStatus().name(),
task.getVideoFileId(),
task.getThumbnailFileId()
);
}
/**
@ -91,7 +95,9 @@ public class UploadService {
public void startVideoProcessing(long taskId) {
VideoProcessingTask task = videoTaskRepository.findById(taskId)
.orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
if (task.getStatus() == VideoProcessingTask.Status.NOT_STARTED) {
task.setStatus(VideoProcessingTask.Status.WAITING);
videoTaskRepository.save(task);
}
}
}

View File

@ -89,20 +89,29 @@ public class VideoProcessingService {
log.info("Started processing task {}.", task.getId());
Path uploadFile = fileStorageService.getStoragePathForFile(task.getUploadFileId());
Path rawUploadFile = uploadFile.resolveSibling(task.getUploadFileId() + "-vid-in");
if (Files.notExists(uploadFile) || !Files.isReadable(uploadFile)) {
log.error("Uploaded video file {} doesn't exist or isn't readable.", uploadFile);
updateTask(task, VideoProcessingTask.Status.FAILED);
return;
}
try {
fileStorageService.copyTo(task.getUploadFileId(), rawUploadFile);
} catch (IOException e) {
log.error("Failed to copy raw video file {} to {}.", uploadFile, rawUploadFile);
e.printStackTrace();
updateTask(task, VideoProcessingTask.Status.FAILED);
return;
}
// Run the actual processing here.
Path videoFile = uploadFile.resolveSibling(task.getUploadFileId() + "-vid-out");
Path thumbnailFile = uploadFile.resolveSibling(task.getUploadFileId() + "-thm-out");
Path videoFile = uploadFile.resolveSibling(task.getUploadFileId() + "-vid-out.mp4");
Path thumbnailFile = uploadFile.resolveSibling(task.getUploadFileId() + "-thm-out.jpeg");
try {
log.info("Processing video for uploaded video file {}.", uploadFile.getFileName());
videoProcessor.processVideo(uploadFile, videoFile);
videoProcessor.processVideo(rawUploadFile, videoFile);
log.info("Generating thumbnail for uploaded video file {}.", uploadFile.getFileName());
thumbnailGenerator.generateThumbnailImage(uploadFile, thumbnailFile);
thumbnailGenerator.generateThumbnailImage(videoFile, thumbnailFile);
} catch (Exception e) {
e.printStackTrace();
log.error("""
@ -111,7 +120,7 @@ public class VideoProcessingService {
Output file: {}
Exception message: {}""",
task.getId(),
uploadFile,
rawUploadFile,
videoFile,
e.getMessage()
);
@ -129,6 +138,9 @@ public class VideoProcessingService {
// Save the thumbnail too.
FileMetadata thumbnailMetadata = new FileMetadata("thumbnail.jpeg", "image/jpeg", true);
String thumbnailFileId = fileStorageService.save(thumbnailFile, thumbnailMetadata);
task.setVideoFileId(videoFileId);
task.setThumbnailFileId(thumbnailFileId);
updateTask(task, VideoProcessingTask.Status.COMPLETED);
log.info("Finished processing task {}.", task.getId());
@ -140,6 +152,7 @@ public class VideoProcessingService {
} finally {
try {
fileStorageService.delete(task.getUploadFileId());
Files.deleteIfExists(rawUploadFile);
Files.deleteIfExists(videoFile);
Files.deleteIfExists(thumbnailFile);
} catch (IOException e) {