Added real-time balance time series analytics endpoint.

This commit is contained in:
Andrew Lalis 2026-03-01 10:35:50 -05:00
parent d7630f3c15
commit fb7850c181
10 changed files with 332 additions and 49 deletions

View File

@ -1,17 +1,16 @@
module analytics.api; module analytics.api;
import handy_http_primitives; import handy_http_primitives;
import handy_http_handlers.path_handler : PathMapping; import handy_http_handlers.path_handler : PathMapping, GetMapping;
import handy_http_data : writeJsonBody;
import std.datetime;
import profile.data; import profile.data;
import profile.service; import profile.service;
import profile.api : PROFILE_PATH; import profile.api : PROFILE_PATH;
import analytics.balances;
@PathMapping(HttpMethod.GET, PROFILE_PATH ~ "/analytics/balance-time-series") import util.money;
void handleGetBalanceTimeSeries(ref ServerHttpRequest request, ref ServerHttpResponse response) { import util.data;
auto ds = getProfileDataSource(request);
serveJsonFromProperty(response, ds, "analytics.balanceTimeSeries");
}
@PathMapping(HttpMethod.GET, PROFILE_PATH ~ "/analytics/category-spend-time-series") @PathMapping(HttpMethod.GET, PROFILE_PATH ~ "/analytics/category-spend-time-series")
void handleGetCategorySpendTimeSeries(ref ServerHttpRequest request, ref ServerHttpResponse response) { void handleGetCategorySpendTimeSeries(ref ServerHttpRequest request, ref ServerHttpResponse response) {
@ -19,6 +18,15 @@ void handleGetCategorySpendTimeSeries(ref ServerHttpRequest request, ref ServerH
serveJsonFromProperty(response, ds, "analytics.categorySpendTimeSeries"); serveJsonFromProperty(response, ds, "analytics.categorySpendTimeSeries");
} }
@GetMapping(PROFILE_PATH ~ "/analytics/balance-time-series")
void handleGetBalanceTimeSeriesV2(ref ServerHttpRequest request, ref ServerHttpResponse response) {
auto ds = getProfileDataSource(request);
Currency currency = Currency.ofCode(request.getParamAs!string("currency", Currencies.USD.code));
TimeRange timeRange = TimeRange(Optional!(SysTime).empty(), Optional!(SysTime).empty());
auto data = computeBalanceTimeSeriesV2(ds, currency, timeRange);
writeJsonBody(response, data);
}
/** /**
* Helper method to serve JSON analytics data to a client by fetching it * Helper method to serve JSON analytics data to a client by fetching it
* directly from the user's profile properties table and writing it to the * directly from the user's profile properties table and writing it to the

View File

@ -5,6 +5,9 @@ import std.datetime;
import std.stdio; import std.stdio;
import std.path; import std.path;
import std.file; import std.file;
import std.algorithm;
import std.array;
import std.conv;
import slf4d; import slf4d;
import asdf; import asdf;
@ -17,6 +20,9 @@ import analytics.util;
import transaction.model; import transaction.model;
import transaction.dto; import transaction.dto;
import util.pagination; import util.pagination;
import util.money;
import util.data;
import analytics.data;
struct TimeSeriesPoint { struct TimeSeriesPoint {
/// The millisecond UTC timestamp. /// The millisecond UTC timestamp.
@ -25,64 +31,130 @@ struct TimeSeriesPoint {
long y; long y;
} }
alias CurrencyGroupedTimeSeries = TimeSeriesPoint[][string];
struct AccountBalanceData { struct AccountBalanceData {
ulong accountId; ulong accountId;
string currencyCode;
TimeSeriesPoint[] data; TimeSeriesPoint[] data;
} }
struct BalanceTimeSeriesAnalytics { struct BalanceTimeSeriesAnalytics {
AccountBalanceData[] accounts; AccountBalanceData[] accounts;
CurrencyGroupedTimeSeries totals; TimeSeriesPoint[] totals;
} }
void computeAccountBalanceTimeSeries(Profile profile, ProfileRepository profileRepo) { /**
ProfileDataSource ds = profileRepo.getDataSource(profile); * Computes a time series tracking the balance of each account (and total of
Account[] accounts = ds.getAccountRepository().findAll(); * all accounts) over the given time range.
* Params:
* ds = The profile data source.
* currency = The currency to report data for.
* timeRange = The time range to generate the time series for.
* Returns: An analytics response containing a "totals" time series, as well
* as a time series for each known account in the given time range.
*/
BalanceTimeSeriesAnalytics computeBalanceTimeSeriesV2(
ProfileDataSource ds,
in Currency currency,
in TimeRange timeRange
) {
SysTime[] timestamps = generateTimeSeriesTimestamps(days(1), timeRange);
AnalyticsRepository repo = ds.getAnalyticsRepository();
JournalEntryStub[] journalEntries = repo.getJournalEntries(currency, timeRange);
BalanceRecordStub[] balanceRecords = repo.getBalanceRecords(currency, timeRange);
auto accountIds = balanceRecords.map!(br => br.accountId).uniq.array.sort;
// Initialize the data structure that'll store the analytics info. BalanceTimeSeriesAnalytics result;
BalanceTimeSeriesAnalytics data;
foreach (account; accounts) {
AccountBalanceData accountData;
accountData.accountId = account.id;
accountData.currencyCode = account.currency.code.idup;
data.accounts ~= accountData;
}
foreach (timestamp; generateTimeSeriesTimestamps(days(1), 365)) { foreach (timestamp; timestamps) {
// Compute the balance of each account at this timestamp. long totalBalance = 0;
foreach (idx, account; accounts) { foreach (accountId; accountIds) {
auto balance = getBalance(ds, account.id, timestamp); Optional!long optionalBalance = deriveBalance(accountId, journalEntries, balanceRecords, timestamp);
if (!balance.isNull) { if (!optionalBalance.isNull) {
data.accounts[idx].data ~= TimeSeriesPoint( TimeSeriesPoint p = TimeSeriesPoint(timestamp.toUnixMillis(), optionalBalance.value);
timestamp.toUnixMillis(), bool isAccountDataPresent = false;
balance.value foreach (ref accountData; result.accounts) {
); if (accountData.accountId == accountId) {
accountData.data ~= p;
isAccountDataPresent = true;
break;
}
}
if (!isAccountDataPresent) {
result.accounts ~= AccountBalanceData(accountId, [p]);
}
totalBalance += optionalBalance.value;
} }
} }
result.totals ~= TimeSeriesPoint(timestamp.toUnixMillis(), totalBalance);
// Compute total balances for this timestamp.
auto totalBalances = getTotalBalanceForAllAccounts(ds, timestamp);
foreach (CurrencyBalance bal; totalBalances) {
data.totals[bal.currency.code.idup] ~= TimeSeriesPoint(timestamp.toUnixMillis(), bal.balance);
}
} }
ds.doTransaction(() { return result;
ds.getPropertiesRepository().deleteProperty("analytics.balanceTimeSeries");
ds.getPropertiesRepository().setProperty(
"analytics.balanceTimeSeries",
serializeToJsonPretty(data)
);
});
infoF!"Computed account balance analytics for user %s, profile %s."(
profile.username,
profile.name
);
} }
private Optional!long deriveBalance(
ulong accountId,
in JournalEntryStub[] journalEntries,
in BalanceRecordStub[] balanceRecords,
in SysTime timestamp
) {
import core.time : abs;
Optional!BalanceRecordStub nearestBalanceRecord;
foreach (br; balanceRecords) {
if (br.accountId == accountId) {
if (nearestBalanceRecord.isNull) {
nearestBalanceRecord = Optional!(BalanceRecordStub).of(br);
} else {
Duration currentDiff = abs(nearestBalanceRecord.value.timestamp - timestamp);
Duration newDiff = abs(br.timestamp - timestamp);
if (newDiff < currentDiff) {
nearestBalanceRecord = Optional!(BalanceRecordStub).of(br);
}
}
}
}
if (nearestBalanceRecord.isNull) {
return Optional!(long).empty();
}
if (timestamp == nearestBalanceRecord.value.timestamp) {
return Optional!(long).of(nearestBalanceRecord.value.value);
}
// Now that we have a balance record, work our way towards the desired
// timestamp, applying journal entry changes.
SysTime startTimestamp;
SysTime endTimestamp;
long balance = nearestBalanceRecord.value.value;
if (timestamp > nearestBalanceRecord.value.timestamp) {
startTimestamp = nearestBalanceRecord.value.timestamp;
endTimestamp = timestamp;
} else {
startTimestamp = timestamp;
endTimestamp = nearestBalanceRecord.value.timestamp;
}
auto relevantJournalEntries = journalEntries
.filter!(je => (
je.accountId == accountId &&
je.timestamp >= startTimestamp &&
je.timestamp <= endTimestamp
));
foreach (je; relevantJournalEntries) {
long entryValue = je.amount;
if (je.type == AccountJournalEntryType.CREDIT) {
entryValue *= -1;
}
if (!je.accountType.debitsPositive) {
entryValue *= -1;
}
if (je.timestamp < nearestBalanceRecord.value.timestamp) {
entryValue *= -1;
}
balance += entryValue;
}
return Optional!(long).of(balance);
}
alias CurrencyGroupedTimeSeries = TimeSeriesPoint[][string];
struct CategorySpendData { struct CategorySpendData {
ulong categoryId; ulong categoryId;
string categoryName; string categoryName;

View File

@ -0,0 +1,37 @@
module analytics.data;
import std.datetime;
import util.money;
import util.data;
import analytics.balances;
import account.model;
struct JournalEntryStub {
SysTime timestamp;
ulong accountId;
AccountType accountType;
ulong amount;
AccountJournalEntryType type;
}
struct BalanceRecordStub {
SysTime timestamp;
ulong accountId;
long value;
}
/**
* Repository that provides various functions for fetching data that's used in
* the calculation of analytics, separate from usual app functionality.
*/
interface AnalyticsRepository {
JournalEntryStub[] getJournalEntries(
in Currency currency,
in TimeRange timeRange
);
BalanceRecordStub[] getBalanceRecords(
in Currency currency,
in TimeRange timeRange
);
}

View File

@ -0,0 +1,104 @@
module analytics.data_impl_sqlite;
import d2sqlite3;
import std.array;
import std.datetime;
import util.money;
import util.data;
import util.sqlite;
import account.model : AccountJournalEntryType, AccountType;
import analytics.balances;
import analytics.data;
class SqliteAnalyticsRepository : AnalyticsRepository {
private Database db;
this(Database db) {
this.db = db;
}
JournalEntryStub[] getJournalEntries(
in Currency currency,
in TimeRange timeRange
) {
QueryBuilder qb = QueryBuilder("account_journal_entry je")
.select("je.timestamp,je.account_id,je.amount,je.type,account.type")
.join("LEFT JOIN account ON account.id = je.account_id")
.where("UPPER(je.currency) = ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, currency.codeString);
});
if (timeRange.fromTime) {
qb.where("je.timestamp >= ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, timeRange.fromTime.value);
});
}
if (timeRange.toTime) {
qb.where("je.timestamp <= ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, timeRange.fromTime.value);
});
}
string query = qb.build() ~ " ORDER BY je.timestamp";
Statement stmt = db.prepare(query);
qb.applyArgBindings(stmt);
ResultRange result = stmt.execute();
Appender!(JournalEntryStub[]) app;
foreach (row; result) {
auto journalEntryTypeStr = row.peek!(string, PeekMode.slice)(3);
AccountJournalEntryType type;
if (journalEntryTypeStr == AccountJournalEntryType.CREDIT) {
type = AccountJournalEntryType.CREDIT;
} else {
type = AccountJournalEntryType.DEBIT;
}
app ~= JournalEntryStub(
SysTime.fromISOExtString(row.peek!(string, PeekMode.slice)(0)),
row.peek!ulong(1),
AccountType.fromId(row.peek!(string, PeekMode.slice)(4)),
row.peek!ulong(2),
type
);
}
return app[];
}
BalanceRecordStub[] getBalanceRecords(
in Currency currency,
in TimeRange timeRange
) {
QueryBuilder qb = QueryBuilder("account_value_record")
.select("timestamp,account_id,value,type")
.where("UPPER(currency) = ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, currency.codeString);
})
.where("UPPER(type) = 'BALANCE'");
if (timeRange.fromTime) {
qb.where("timestamp >= ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, timeRange.fromTime.value);
});
}
if (timeRange.toTime) {
qb.where("timestamp <= ?")
.withArgBinding((ref stmt, ref idx) {
stmt.bind(idx++, timeRange.fromTime.value);
});
}
string query = qb.build() ~ " ORDER BY timestamp";
Statement stmt = db.prepare(query);
qb.applyArgBindings(stmt);
ResultRange result = stmt.execute();
Appender!(BalanceRecordStub[]) app;
foreach (row; result) {
app ~= BalanceRecordStub(
SysTime.fromISOExtString(row.peek!(string, PeekMode.slice)(0)),
row.peek!ulong(1),
row.peek!long(2)
);
}
return app[];
}
}

View File

@ -1,6 +1,8 @@
module analytics.util; module analytics.util;
import std.datetime; import std.datetime;
import std.array;
import util.data;
SysTime[] generateTimeSeriesTimestamps(Duration intervalSize, int intervalCount) { SysTime[] generateTimeSeriesTimestamps(Duration intervalSize, int intervalCount) {
const SysTime now = Clock.currTime(UTC()); const SysTime now = Clock.currTime(UTC());
@ -18,6 +20,34 @@ SysTime[] generateTimeSeriesTimestamps(Duration intervalSize, int intervalCount)
return timestamps; return timestamps;
} }
SysTime[] generateTimeSeriesTimestamps(Duration intervalSize, in TimeRange timeRange) {
SysTime endOfRange;
SysTime startOfRange;
if (timeRange.toTime.isNull) {
endOfRange = Clock.currTime(UTC());
} else {
endOfRange = timeRange.toTime.value;
}
if (timeRange.fromTime.isNull) {
startOfRange = endOfRange;
startOfRange.add!"years"(-5);
} else {
startOfRange = timeRange.fromTime.value;
}
import std.stdio;
writefln!"start = %s, end = %s"(startOfRange, endOfRange);
Appender!(SysTime[]) app;
app ~= startOfRange;
SysTime timestamp = startOfRange;
while (timestamp + intervalSize <= endOfRange) {
timestamp += intervalSize;
app ~= timestamp;
}
return app[];
}
ulong toUnixMillis(in SysTime ts) { ulong toUnixMillis(in SysTime ts) {
return (ts - SysTime(unixTimeToStdTime(0))).total!"msecs"; return (ts - SysTime(unixTimeToStdTime(0))).total!"msecs";
} }

View File

@ -40,6 +40,7 @@ interface ProfileDataSource {
import account.data; import account.data;
import transaction.data; import transaction.data;
import attachment.data; import attachment.data;
import analytics.data;
PropertiesRepository getPropertiesRepository(); PropertiesRepository getPropertiesRepository();
AttachmentRepository getAttachmentRepository(); AttachmentRepository getAttachmentRepository();
@ -53,6 +54,8 @@ interface ProfileDataSource {
TransactionTagRepository getTransactionTagRepository(); TransactionTagRepository getTransactionTagRepository();
TransactionRepository getTransactionRepository(); TransactionRepository getTransactionRepository();
AnalyticsRepository getAnalyticsRepository();
void doTransaction(void delegate () dg); void doTransaction(void delegate () dg);
} }
@ -61,6 +64,7 @@ version(unittest) {
import account.data; import account.data;
import transaction.data; import transaction.data;
import attachment.data; import attachment.data;
import analytics.data;
PropertiesRepository getPropertiesRepository() { PropertiesRepository getPropertiesRepository() {
throw new Exception("Not implemented"); throw new Exception("Not implemented");
@ -89,6 +93,9 @@ version(unittest) {
TransactionRepository getTransactionRepository() { TransactionRepository getTransactionRepository() {
throw new Exception("Not implemented"); throw new Exception("Not implemented");
} }
AnalyticsRepository getAnalyticsRepository() {
throw new Exception("Not implemented");
}
void doTransaction(void delegate () dg) { void doTransaction(void delegate () dg) {
throw new Exception("Not implemented"); throw new Exception("Not implemented");
} }

View File

@ -181,6 +181,8 @@ class SqliteProfileDataSource : ProfileDataSource {
import transaction.data_impl_sqlite; import transaction.data_impl_sqlite;
import attachment.data; import attachment.data;
import attachment.data_impl_sqlite; import attachment.data_impl_sqlite;
import analytics.data;
import analytics.data_impl_sqlite;
private const string dbPath; private const string dbPath;
Database db; Database db;
@ -195,6 +197,7 @@ class SqliteProfileDataSource : ProfileDataSource {
TransactionCategoryRepository transactionCategoryRepo; TransactionCategoryRepository transactionCategoryRepo;
TransactionTagRepository transactionTagRepo; TransactionTagRepository transactionTagRepo;
TransactionRepository transactionRepo; TransactionRepository transactionRepo;
AnalyticsRepository analyticsRepo;
this(string path) { this(string path) {
this.dbPath = path; this.dbPath = path;
@ -276,6 +279,13 @@ class SqliteProfileDataSource : ProfileDataSource {
return transactionRepo; return transactionRepo;
} }
AnalyticsRepository getAnalyticsRepository() {
if (analyticsRepo is null) {
analyticsRepo = new SqliteAnalyticsRepository(db);
}
return analyticsRepo;
}
void doTransaction(void delegate () dg) { void doTransaction(void delegate () dg) {
util.sqlite.doTransaction(db, dg); util.sqlite.doTransaction(db, dg);
} }

View File

@ -15,7 +15,6 @@ void startScheduledJobs() {
JobScheduler jobScheduler = new TaskPoolScheduler(); JobScheduler jobScheduler = new TaskPoolScheduler();
jobScheduler.addJob(() { jobScheduler.addJob(() {
info("Computing account balance time series analytics for all users..."); info("Computing account balance time series analytics for all users...");
doForAllUserProfiles(&computeAccountBalanceTimeSeries);
doForAllUserProfiles(&computeCategorySpendTimeSeries); doForAllUserProfiles(&computeCategorySpendTimeSeries);
info("Done computing analytics!"); info("Done computing analytics!");
}, analyticsSchedule); }, analyticsSchedule);

View File

@ -3,6 +3,7 @@ module util.data;
import handy_http_primitives; import handy_http_primitives;
import handy_http_data.multipart; import handy_http_data.multipart;
import std.typecons; import std.typecons;
import std.datetime;
Optional!T toOptional(T)(Nullable!T value) { Optional!T toOptional(T)(Nullable!T value) {
if (value.isNull) { if (value.isNull) {
@ -125,3 +126,13 @@ private MultipartFile parseMultipartFile(in MultipartElement e) {
cast(ubyte[]) e.content cast(ubyte[]) e.content
); );
} }
/**
* Struct representing a user-provided time range with optional "from" and "to"
* timestamps. If both are empty, it is assumed that the user is requesting all
* data.
*/
struct TimeRange {
Optional!SysTime fromTime;
Optional!SysTime toTime;
}

View File

@ -27,6 +27,11 @@ struct Currency {
} }
throw new Exception("Unknown currency code: " ~ code); throw new Exception("Unknown currency code: " ~ code);
} }
string codeString() const {
import std.conv : to;
return code.to!string;
}
} }
/** /**