Add scheduled analytics jobs instead of on-the-fly analytics.
This commit is contained in:
parent
2b85abcfae
commit
ddfd32c777
|
|
@ -138,14 +138,6 @@ void handleGetTotalBalances(ref ServerHttpRequest request, ref ServerHttpRespons
|
|||
writeJsonBody(response, balances);
|
||||
}
|
||||
|
||||
void handleGetAccountBalanceTimeSeries(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
auto ds = getProfileDataSource(request);
|
||||
ulong accountId = request.getPathParamOrThrow!ulong("accountId");
|
||||
int timeZoneOffset = request.getParamAs!int("time-zone-offset", 0);
|
||||
auto series = getBalanceTimeSeries(ds, accountId, timeZoneOffset);
|
||||
writeJsonBody(response, series);
|
||||
}
|
||||
|
||||
// Value records:
|
||||
|
||||
const PageRequest VALUE_RECORD_DEFAULT_PAGE_REQUEST = PageRequest(1, 10, [Sort("timestamp", SortDir.DESC)]);
|
||||
|
|
|
|||
|
|
@ -79,33 +79,6 @@ CurrencyBalance[] getTotalBalanceForAllAccounts(ProfileDataSource ds, SysTime ti
|
|||
return balances;
|
||||
}
|
||||
|
||||
struct BalanceTimeSeriesPoint {
|
||||
long balance;
|
||||
string timestamp;
|
||||
}
|
||||
|
||||
BalanceTimeSeriesPoint[] getBalanceTimeSeries(ProfileDataSource ds, ulong accountId, int timeZoneOffsetMinutes) {
|
||||
BalanceTimeSeriesPoint[] points;
|
||||
immutable TimeZone tz = new immutable SimpleTimeZone(minutes(timeZoneOffsetMinutes));
|
||||
SysTime now = Clock.currTime(tz);
|
||||
SysTime endOfToday = SysTime(
|
||||
DateTime(now.year, now.month, now.day, 23, 59, 59),
|
||||
tz
|
||||
);
|
||||
SysTime timestamp = endOfToday.toOtherTZ(UTC());
|
||||
for (int i = 0; i < 30; i++) {
|
||||
auto balance = getBalance(ds, accountId, timestamp);
|
||||
if (!balance.isNull) {
|
||||
points ~= BalanceTimeSeriesPoint(
|
||||
balance.value,
|
||||
timestamp.toISOExtString()
|
||||
);
|
||||
}
|
||||
timestamp = timestamp - days(1);
|
||||
}
|
||||
return points;
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method that derives a balance for an account, by using the nearest
|
||||
* value record, and all journal entries between that record and the desired
|
||||
|
|
|
|||
|
|
@ -0,0 +1,30 @@
|
|||
module analytics.api;
|
||||
|
||||
import handy_http_primitives;
|
||||
import profile.data;
|
||||
import profile.service;
|
||||
|
||||
void handleGetBalanceTimeSeries(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
auto ds = getProfileDataSource(request);
|
||||
serveJsonFromProperty(response, ds, "analytics.balanceTimeSeries");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to serve JSON analytics data to a client by fetching it
|
||||
* directly from the user's profile properties table and writing it to the
|
||||
* response.
|
||||
* Params:
|
||||
* response = The response to write to.
|
||||
* ds = The datasource to serve data from.
|
||||
* key = The name of the analytics property to read data from.
|
||||
*/
|
||||
private void serveJsonFromProperty(ref ServerHttpResponse response, ref ProfileDataSource ds, string key) {
|
||||
PropertiesRepository propsRepo = ds.getPropertiesRepository();
|
||||
string jsonStr = propsRepo.findProperty(key).orElse(null);
|
||||
if (jsonStr is null || jsonStr.length == 0) {
|
||||
response.status = HttpStatus.NOT_FOUND;
|
||||
response.writeBodyString("No data found for " ~ key);
|
||||
} else {
|
||||
response.writeBodyString(jsonStr, ContentTypes.APPLICATION_JSON);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,100 @@
|
|||
module analytics.balances;
|
||||
|
||||
import handy_http_primitives;
|
||||
import std.datetime;
|
||||
import std.stdio;
|
||||
import std.path;
|
||||
import std.file;
|
||||
import slf4d;
|
||||
import asdf;
|
||||
|
||||
import profile.data;
|
||||
import profile.model;
|
||||
import account.data;
|
||||
import account.model;
|
||||
import account.service;
|
||||
import analytics.util;
|
||||
|
||||
struct BalanceSnapshot {
|
||||
long balance;
|
||||
string timestamp;
|
||||
}
|
||||
|
||||
struct AccountBalanceTimeSeries {
|
||||
ulong accountId;
|
||||
string currencyCode;
|
||||
BalanceSnapshot[] balanceTimeSeries;
|
||||
}
|
||||
|
||||
struct TotalBalanceTimeSeries {
|
||||
string currencyCode;
|
||||
BalanceSnapshot[] balanceTimeSeries;
|
||||
}
|
||||
|
||||
struct BalanceTimeSeriesAnalytics {
|
||||
AccountBalanceTimeSeries[] accounts;
|
||||
TotalBalanceTimeSeries[] totals;
|
||||
}
|
||||
|
||||
void computeAccountBalanceTimeSeries(Profile profile, ProfileRepository profileRepo) {
|
||||
ProfileDataSource ds = profileRepo.getDataSource(profile);
|
||||
Account[] accounts = ds.getAccountRepository().findAll();
|
||||
|
||||
// Initialize the data structure that'll store the analytics info.
|
||||
BalanceTimeSeriesAnalytics data;
|
||||
foreach (account; accounts) {
|
||||
data.accounts ~= AccountBalanceTimeSeries(
|
||||
account.id,
|
||||
account.currency.code.idup,
|
||||
[]
|
||||
);
|
||||
}
|
||||
|
||||
foreach (timestamp; generateTimeSeriesTimestamps(days(1), 365)) {
|
||||
// Compute the balance of each account at this timestamp.
|
||||
foreach (idx, account; accounts) {
|
||||
auto balance = getBalance(ds, account.id, timestamp);
|
||||
if (!balance.isNull) {
|
||||
data.accounts[idx].balanceTimeSeries ~= BalanceSnapshot(
|
||||
balance.value,
|
||||
timestamp.toISOExtString()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Compute total balances for this timestamp.
|
||||
auto totalBalances = getTotalBalanceForAllAccounts(ds, timestamp);
|
||||
foreach (bal; totalBalances) {
|
||||
// Assign the balance to one of our running totals.
|
||||
bool currencyFound = false;
|
||||
foreach (ref currencyTotal; data.totals) {
|
||||
if (currencyTotal.currencyCode == bal.currency.code) {
|
||||
currencyTotal.balanceTimeSeries ~= BalanceSnapshot(
|
||||
bal.balance,
|
||||
timestamp.toISOExtString()
|
||||
);
|
||||
currencyFound = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!currencyFound) {
|
||||
data.totals ~= TotalBalanceTimeSeries(
|
||||
bal.currency.code.idup,
|
||||
[BalanceSnapshot(bal.balance, timestamp.toISOExtString())]
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ds.doTransaction(() {
|
||||
ds.getPropertiesRepository().deleteAllByPrefix("analytics");
|
||||
ds.getPropertiesRepository().setProperty(
|
||||
"analytics.balanceTimeSeries",
|
||||
serializeToJsonPretty(data)
|
||||
);
|
||||
});
|
||||
infoF!"Computed account balance analytics for user %s, profile %s."(
|
||||
profile.username,
|
||||
profile.name
|
||||
);
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
module analytics;
|
||||
|
||||
public import analytics.balances;
|
||||
|
||||
import profile.data;
|
||||
import profile.model;
|
||||
|
||||
/**
|
||||
* Helper function to run a function on each available user profile.
|
||||
* Params:
|
||||
* fn = The function to run.
|
||||
*/
|
||||
void doForAllUserProfiles(
|
||||
void function(Profile, ProfileRepository) fn
|
||||
) {
|
||||
import auth.data;
|
||||
import auth.data_impl_fs;
|
||||
import profile.data;
|
||||
import profile.data_impl_sqlite;
|
||||
|
||||
UserRepository userRepo = new FileSystemUserRepository();
|
||||
foreach (user; userRepo.findAll()) {
|
||||
ProfileRepository profileRepo = new FileSystemProfileRepository(user.username);
|
||||
foreach (prof; profileRepo.findAll()) {
|
||||
fn(prof, profileRepo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
module analytics.util;
|
||||
|
||||
import std.datetime;
|
||||
|
||||
SysTime[] generateTimeSeriesTimestamps(Duration intervalSize, int intervalCount) {
|
||||
const SysTime now = Clock.currTime(UTC());
|
||||
const SysTime endOfToday = SysTime(
|
||||
DateTime(now.year, now.month, now.day, 23, 59, 59),
|
||||
now.timezone
|
||||
);
|
||||
SysTime timestamp = endOfToday;
|
||||
SysTime[] timestamps = new SysTime[intervalCount + 1];
|
||||
timestamps[0] = timestamp;
|
||||
for (int i = 0; i < intervalCount; i++) {
|
||||
timestamp = timestamp - intervalSize;
|
||||
timestamps[i + 1] = timestamp;
|
||||
}
|
||||
return timestamps;
|
||||
}
|
||||
|
|
@ -22,7 +22,7 @@ HttpRequestHandler mapApiHandlers(string webOrigin) {
|
|||
h.map(HttpMethod.OPTIONS, "/**", &getOptions);
|
||||
|
||||
// Dev endpoint for sample data: REMOVE BEFORE DEPLOYING!!!
|
||||
h.map(HttpMethod.POST, "/sample-data", &sampleDataEndpoint);
|
||||
// h.map(HttpMethod.POST, "/sample-data", &sampleDataEndpoint);
|
||||
|
||||
// Auth endpoints:
|
||||
import auth.api;
|
||||
|
|
@ -66,7 +66,6 @@ HttpRequestHandler mapApiHandlers(string webOrigin) {
|
|||
a.map(HttpMethod.GET, ACCOUNT_PATH ~ "/value-records/:valueRecordId:ulong", &handleGetValueRecord);
|
||||
a.map(HttpMethod.POST, ACCOUNT_PATH ~ "/value-records", &handleCreateValueRecord);
|
||||
a.map(HttpMethod.DELETE, ACCOUNT_PATH ~ "/value-records/:valueRecordId:ulong", &handleDeleteValueRecord);
|
||||
a.map(HttpMethod.GET, ACCOUNT_PATH ~ "/balance-time-series", &handleGetAccountBalanceTimeSeries);
|
||||
|
||||
import transaction.api;
|
||||
// Transaction vendor endpoints:
|
||||
|
|
@ -91,6 +90,10 @@ HttpRequestHandler mapApiHandlers(string webOrigin) {
|
|||
|
||||
a.map(HttpMethod.GET, PROFILE_PATH ~ "/transaction-tags", &handleGetAllTags);
|
||||
|
||||
// Analytics endpoints:
|
||||
import analytics.api;
|
||||
a.map(HttpMethod.GET, PROFILE_PATH ~ "/analytics/balance-time-series", &handleGetBalanceTimeSeries);
|
||||
|
||||
import data_api;
|
||||
// Various other data endpoints:
|
||||
a.map(HttpMethod.GET, "/currencies", &handleGetCurrencies);
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import std.datetime;
|
|||
|
||||
import api_mapping;
|
||||
import util.config;
|
||||
import analytics;
|
||||
|
||||
void main() {
|
||||
const config = readConfig();
|
||||
|
|
@ -15,10 +16,15 @@ void main() {
|
|||
infoF!"Loaded app config: port = %d, webOrigin = %s"(config.port, config.webOrigin);
|
||||
|
||||
// Start scheduled tasks in a separate thread:
|
||||
// JobSchedule analyticsSchedule = new FixedIntervalSchedule(minutes(10));
|
||||
JobSchedule analyticsSchedule = new DailySchedule(TimeOfDay.min());
|
||||
|
||||
JobScheduler jobScheduler = new TaskPoolScheduler();
|
||||
jobScheduler.addJob(() {
|
||||
info("Executing scheduled job with fixed interval.");
|
||||
}, new FixedIntervalSchedule(minutes(1)));
|
||||
info("Computing account balance time series analytics for all users...");
|
||||
doForAllUserProfiles(&computeAccountBalanceTimeSeries);
|
||||
info("Done computing analytics!");
|
||||
}, analyticsSchedule);
|
||||
jobScheduler.start();
|
||||
|
||||
Http1TransportConfig transportConfig = defaultConfig();
|
||||
|
|
|
|||
|
|
@ -37,6 +37,8 @@ class FileSystemUserRepository : UserRepository {
|
|||
string username = baseName(entry.name);
|
||||
users ~= readUser(username);
|
||||
}
|
||||
import std.algorithm : sort;
|
||||
sort!((u1, u2) => u1.username < u2.username)(users);
|
||||
return users;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@ module auth.model;
|
|||
* or more profiles.
|
||||
*/
|
||||
struct User {
|
||||
immutable string username;
|
||||
immutable string passwordHash;
|
||||
string username;
|
||||
string passwordHash;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ interface ProfileRepository {
|
|||
Profile[] findAll();
|
||||
void deleteByName(string name);
|
||||
ProfileDataSource getDataSource(in Profile profile);
|
||||
string getFilesPath(in Profile profile);
|
||||
}
|
||||
|
||||
/// Repository for accessing the properties of a profile.
|
||||
|
|
@ -19,6 +20,7 @@ interface PropertiesRepository {
|
|||
void setProperty(string name, string value);
|
||||
void deleteProperty(string name);
|
||||
ProfileProperty[] findAll();
|
||||
void deleteAllByPrefix(string prefix);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class FileSystemProfileRepository : ProfileRepository {
|
|||
Optional!Profile findByName(string name) {
|
||||
string path = getProfilePath(name);
|
||||
if (!exists(path)) return Optional!Profile.empty;
|
||||
return Optional!Profile.of(new Profile(name));
|
||||
return Optional!Profile.of(new Profile(name, username));
|
||||
}
|
||||
|
||||
Profile createProfile(string name) {
|
||||
|
|
@ -43,7 +43,7 @@ class FileSystemProfileRepository : ProfileRepository {
|
|||
propsRepo.setProperty("name", name);
|
||||
propsRepo.setProperty("createdAt", Clock.currTime(UTC()).toISOExtString());
|
||||
propsRepo.setProperty("user", username);
|
||||
return new Profile(name);
|
||||
return new Profile(name, username);
|
||||
}
|
||||
|
||||
Profile[] findAll() {
|
||||
|
|
@ -55,7 +55,7 @@ class FileSystemProfileRepository : ProfileRepository {
|
|||
const suffix = ".sqlite";
|
||||
if (endsWith(entry.name, suffix)) {
|
||||
string profileName = baseName(entry.name, suffix);
|
||||
profiles ~= new Profile(profileName);
|
||||
profiles ~= new Profile(profileName, username);
|
||||
}
|
||||
}
|
||||
import std.algorithm.sorting : sort;
|
||||
|
|
@ -74,6 +74,10 @@ class FileSystemProfileRepository : ProfileRepository {
|
|||
return new SqliteProfileDataSource(getProfilePath(profile.name));
|
||||
}
|
||||
|
||||
string getFilesPath(in Profile profile) {
|
||||
return buildPath(getProfilesDir(), profile.name ~ "_files");
|
||||
}
|
||||
|
||||
private string getProfilesDir() {
|
||||
return buildPath(this.usersDir, username, "profiles");
|
||||
}
|
||||
|
|
@ -131,6 +135,14 @@ class SqlitePropertiesRepository : PropertiesRepository {
|
|||
}
|
||||
return props;
|
||||
}
|
||||
|
||||
void deleteAllByPrefix(string prefix) {
|
||||
util.sqlite.update(
|
||||
db,
|
||||
"DELETE FROM profile_property WHERE property LIKE ?",
|
||||
prefix ~ "%"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private const SCHEMA = import("sql/schema.sql");
|
||||
|
|
|
|||
|
|
@ -9,9 +9,11 @@ import profile.data;
|
|||
*/
|
||||
class Profile {
|
||||
string name;
|
||||
string username;
|
||||
|
||||
this(string name) {
|
||||
this(string name, string username) {
|
||||
this.name = name;
|
||||
this.username = username;
|
||||
}
|
||||
|
||||
override int opCmp(Object other) const {
|
||||
|
|
|
|||
|
|
@ -143,11 +143,6 @@ export interface CurrencyBalance {
|
|||
balance: number
|
||||
}
|
||||
|
||||
export interface BalanceTimeSeriesPoint {
|
||||
balance: number
|
||||
timestamp: string
|
||||
}
|
||||
|
||||
export class AccountApiClient extends ApiClient {
|
||||
readonly path: string
|
||||
readonly profileName: string
|
||||
|
|
@ -194,15 +189,6 @@ export class AccountApiClient extends ApiClient {
|
|||
return super.getJson(`/profiles/${this.profileName}/account-balances`)
|
||||
}
|
||||
|
||||
getBalanceTimeSeries(
|
||||
accountId: number,
|
||||
timeZoneOffsetMinutes: number,
|
||||
): Promise<BalanceTimeSeriesPoint[]> {
|
||||
return super.getJson(
|
||||
`/profiles/${this.profileName}/accounts/${accountId}/balance-time-series?time-zone-offset=${timeZoneOffsetMinutes}`,
|
||||
)
|
||||
}
|
||||
|
||||
getValueRecords(accountId: number, pageRequest: PageRequest): Promise<Page<AccountValueRecord>> {
|
||||
return super.getJsonPage(this.path + '/' + accountId + '/value-records', pageRequest)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
import type { RouteLocation } from 'vue-router'
|
||||
import { ApiClient } from './base'
|
||||
import { getSelectedProfile } from './profile'
|
||||
|
||||
export interface BalanceTimeSeriesAnalytics {
|
||||
accounts: AccountBalanceTimeSeries[]
|
||||
totals: TotalBalanceTimeSeries[]
|
||||
}
|
||||
|
||||
export interface AccountBalanceTimeSeries {
|
||||
accountId: number
|
||||
currencyCode: string
|
||||
balanceTimeSeries: BalanceSnapshot[]
|
||||
}
|
||||
|
||||
export interface TotalBalanceTimeSeries {
|
||||
currencyCode: string
|
||||
balanceTimeSeries: BalanceSnapshot[]
|
||||
}
|
||||
|
||||
export interface BalanceSnapshot {
|
||||
balance: number
|
||||
timestamp: string
|
||||
}
|
||||
|
||||
export class AnalyticsApiClient extends ApiClient {
|
||||
readonly profileName: string
|
||||
readonly path: string
|
||||
|
||||
constructor(route: RouteLocation) {
|
||||
super()
|
||||
this.profileName = getSelectedProfile(route)
|
||||
this.path = `/profiles/${this.profileName}/analytics`
|
||||
}
|
||||
|
||||
getBalanceTimeSeries(): Promise<BalanceTimeSeriesAnalytics> {
|
||||
return super.getJson(this.path + '/balance-time-series')
|
||||
}
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import { Line } from 'vue-chartjs';
|
|||
import { useRoute } from 'vue-router';
|
||||
import 'chartjs-adapter-date-fns';
|
||||
import { integerMoneyToFloat } from '@/api/data';
|
||||
import { AnalyticsApiClient } from '@/api/analytics';
|
||||
|
||||
const route = useRoute()
|
||||
|
||||
|
|
@ -31,22 +32,38 @@ const COLORS = [
|
|||
|
||||
onMounted(async () => {
|
||||
const api = new AccountApiClient(route)
|
||||
const analyticsApi = new AnalyticsApiClient(route)
|
||||
const accounts = await api.getAccounts()
|
||||
const timeSeriesData = await analyticsApi.getBalanceTimeSeries()
|
||||
const datasets: ChartDataset<"line">[] = []
|
||||
const timeZoneOffset = -(new Date().getTimezoneOffset())
|
||||
// const timeZoneOffset = -(new Date().getTimezoneOffset())
|
||||
let colorIdx = 0
|
||||
for (const account of accounts) {
|
||||
if (account.currency.code !== 'USD') continue
|
||||
const points = await api.getBalanceTimeSeries(account.id, timeZoneOffset)
|
||||
|
||||
for (const accountData of timeSeriesData.accounts) {
|
||||
if (accountData.currencyCode !== 'USD') continue
|
||||
const account = accounts.find(a => a.id === accountData.accountId)
|
||||
if (!account) {
|
||||
console.warn("Couldn't find account id " + accountData.accountId)
|
||||
continue
|
||||
}
|
||||
|
||||
const color = COLORS[colorIdx++]
|
||||
const points = accountData.balanceTimeSeries.map(p => {
|
||||
return {
|
||||
x: getTime(p.timestamp),
|
||||
y: integerMoneyToFloat(p.balance, account.currency)
|
||||
}
|
||||
})
|
||||
datasets.push({
|
||||
label: "Account #" + account.numberSuffix,
|
||||
data: points.map(p => {
|
||||
return { x: getTime(p.timestamp), y: integerMoneyToFloat(p.balance, account.currency) }
|
||||
}),
|
||||
data: points,
|
||||
cubicInterpolationMode: "monotone",
|
||||
borderColor: `rgb(${color[0]}, ${color[1]}, ${color[2]})`,
|
||||
backgroundColor: `rgba(${color[0]}, ${color[1]}, ${color[2]}, 0.25)`
|
||||
backgroundColor: `rgba(${color[0]}, ${color[1]}, ${color[2]}, 0.25)`,
|
||||
pointRadius: 0,
|
||||
borderWidth: 2,
|
||||
pointHoverRadius: 5,
|
||||
pointHitRadius: 5,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue