import { Entity, run_on_worker } from "@infinity-beyond/classes/entity.ts";
import State from "@infinity-beyond/modules/state.ts";
import { LedgerMeta } from "@infinity-beyond/classes/data_types/ledger/ledger.types.ts"
import type { LedgerMetaType, LedgerUser, LedgerEntry, LedgerEvents, LedgerOptions, LedgerExpiration, LedgerExpirationOverride, UnenteredLedgerEntry } from "@infinity-beyond/classes/data_types/ledger/ledger.types.ts"
import { StripObject } from "@infinity-beyond/modules/strip_object.ts";
import type { EntityMetaConfig } from "@infinity-beyond/classes/entity_meta.ts";
import * as LedgerQuery from "@infinity-beyond/classes/data_types/ledger/ledger.queries.ts";
import dayjs from "npm:dayjs";
/**
 * `T` intersected with every property of `K`.
 *
 * Used in this file to cast a value to a type that additionally carries
 * the listed fields (e.g. `{ remaining: number }`).
 */
type With<T, K extends Record<string, any>> = T & {
  [Field in keyof K]: K[Field];
};
/**
 * An entity that stores ledger entries and a running per-key balance in two
 * Postgres tables (`<slug>_entries` and `<slug>_users`).
 *
 * NOTE(review): the balance check, the entry insert and the balance upsert in
 * `AddEntry` are issued as separate queries; no transaction is visible in this
 * file, so concurrent calls for the same key may interleave — confirm whether
 * `State.PostgresClient` or `run_on_worker` serialises them.
 */
// deno-lint-ignore ban-types
export class Ledger<PEMC extends EntityMetaConfig = {}> extends Entity<LedgerEvents, LedgerMetaType, PEMC> {
  /** When `false` (the default), `AddEntry` refuses a negative amount that would take the balance below zero. */
  readonly allow_negative_balances: boolean
  /** Length of the `varchar` key columns created by `Setup`. Defaults to 11. */
  readonly key_max_length: number;
  /** Optional expiration settings applied to new entries by `AddEntry`. */
  readonly expiration?: LedgerExpiration

  /**
   * @param name Name passed through to the `Entity` base class.
   * @param options Ledger options; every field is optional.
   */
  constructor(name: string, { allow_negative_balances, key_max_length, meta_fields, expiration }: LedgerOptions<PEMC> = {}) {
    super(name, LedgerMeta as LedgerMetaType, meta_fields as PEMC);
    this.allow_negative_balances = allow_negative_balances ?? false;
    this.key_max_length = key_max_length ?? 11;
    this.expiration = expiration;
  }

  /**
   * Add a new entry to the ledger, and update the balance of its key.
   *
   * `params` is mutated: `remaining`, `expiration_date`, `expired`, `id` and
   * `timestamp` are written to it and `override_expiration_date` is removed.
   *
   * Emits `entry_created` on success and `entry_creation_failed` on failure.
   *
   * @param params The entry to add. `params.key` is the unique value referring to a user
   * (usually their `msisdn`); `params.amount` is the amount to add, where negative numbers
   * subtract from the balance.
   * @returns `{ added: true, balance }` with the new balance, or `{ added: false, reason }`.
   */
  @run_on_worker
  async AddEntry(params: UnenteredLedgerEntry & LedgerExpirationOverride) {
    if(!this.allow_negative_balances && params.amount < 0) {
      const balance = await this.BalanceFor(params.key);
      // Only report a failure when the balance really is insufficient.
      if(balance + params.amount < 0) {
        const reason = 'insufficient balance';
        params.remaining = 0;
        this.emit('entry_creation_failed', reason, params as With<typeof params, { remaining: number }>);
        return {
          added: false,
          reason,
        };
      }
    }

    if(this.expiration?.entries_expire) {
      const { custom_calculator, period, round_up_to } = this.expiration;
      let expiration_date: Date | undefined;
      // A custom calculator takes precedence over the configured period.
      if(typeof custom_calculator === 'function') {
        expiration_date = custom_calculator();
      } else if(period) {
        let expiration = dayjs().add(period[0], period[1]);
        if(round_up_to && round_up_to !== 'none') expiration = expiration.endOf(round_up_to);
        expiration_date = expiration.toDate();
      }
      // Keep the caller-supplied date only when an override was explicitly requested.
      if(!params.override_expiration_date) {
        params.expiration_date = expiration_date;
      }
      params.expired = params.expiration_date && dayjs().isAfter(params.expiration_date);
    } else {
      params.expiration_date = undefined;
    }
    // Not a column of the entries table; must not reach the insert.
    delete params.override_expiration_date;

    params.remaining = params.amount;
    if(params.reference_entry_id && params.amount < 0) {
      // PostgreSQL's max()/min() are single-argument aggregates; the scalar
      // forms are GREATEST()/LEAST().
      const update_reference_entry_query = `UPDATE ${this.entries_table} SET remaining = GREATEST(0, LEAST(remaining, remaining - $2)) WHERE id=$1`;
      const { rowCount = 0 } = await State.PostgresClient.query(update_reference_entry_query, [params.reference_entry_id, Math.abs(params.amount)]);
      if(rowCount > 0) params.remaining = 0;
    }

    const { fields, placeholders, values } = StripObject<UnenteredLedgerEntry & LedgerExpirationOverride>(params, {
      key: '',
      amount: '',
      remaining: '',
      reason: '',
      description: '',
      reference: '',
      meta: '',
      source: '',
      request_id: '',
      expiration_date: '',
      expired: '',
      reference_entry_id: ''
    });
    const insert_entries_query = `INSERT INTO ${this.entries_table} (${fields}) values (${placeholders}) returning id, timestamp`;
    // Bind the first row without destructuring into it, so an empty result
    // reaches the failure branch below instead of throwing a TypeError.
    const {
      rowCount: entry_insert_count,
      warnings: entry_insert_warnings,
      rows: [inserted_entry]
    } = await State.PostgresClient.query<{ id: number, timestamp: Date }>(insert_entries_query, values);
    if(entry_insert_count == 0 || !inserted_entry) {
      const reason = `Could not insert into ${this.entries_table}!\n${entry_insert_warnings.map(w => w.message).join(', ')}`;
      this.emit('entry_creation_failed', reason, params as With<typeof params, { remaining: number }>);
      return {
        added: false,
        reason,
      };
    }

    const insert_balance_query = `INSERT INTO ${this.user_table} (key, balance, entry_count) values ($1, $2, 1) ON CONFLICT (key) DO UPDATE SET balance = ${this.user_table}.balance + excluded.balance, entry_count = ${this.user_table}.entry_count + 1, last_entry = NOW() RETURNING ${this.user_table}.balance`;
    const {
      rowCount: balance_insert_count,
      warnings: balance_insert_warnings,
      rows: [updated_user]
    } = await State.PostgresClient.query<LedgerUser>(
      insert_balance_query, [params.key, params.amount]
    );
    if(balance_insert_count == 0 || !updated_user) {
      const reason = `Could not update balance!\n[${balance_insert_warnings.map(w => w.message).join(', ')}]`;
      this.emit('entry_creation_failed', reason, params as With<typeof params, { remaining: number }>);
      return {
        added: false,
        reason,
      };
    }
    const { balance } = updated_user;

    await this.meta.increment('total_entries', 1);
    await this.meta.increment('aggregate_total', params.amount);

    const entry = params as LedgerEntry;
    entry.id = inserted_entry.id;
    entry.timestamp = inserted_entry.timestamp;
    this.emit('entry_created', entry);
    return {
      added: true,
      balance,
    }
  }

  // TODO: REMOVE
  /**
   * Find a single entry linked to a specific user.
   *
   * @param key The unique value referring to the user. Usually their `msisdn`
   * @param options Search options: `oldest` (default `true`) picks the earliest entry by
   * timestamp instead of the latest; `include_expired` also matches expired entries;
   * `needs_remaining` (default `true`) only matches entries with `remaining > 0`.
   * @returns The matching entry, or `null` when there is none.
   *
   * @deprecated
   */
  @run_on_worker
  async FindEntryFor(key: string, { oldest = true , include_expired, needs_remaining = true }: { oldest?: boolean, include_expired?: boolean, needs_remaining?: boolean } = {}) {
    let query = `SELECT * FROM ${this.entries_table} WHERE key=$1`;
    if(!include_expired) query += ` AND expired = false`;
    if(needs_remaining) query += ` AND remaining > 0`;
    query += ` ORDER BY timestamp ${oldest ? 'ASC' : 'DESC'} LIMIT 1`;
    const { rows: [ entry ] } = await State.PostgresClient.query<LedgerEntry>(query, [key]);
    return entry ?? null;
  }

  /**
   * Look up the past entries of a user that a consumption of `amount` would draw from.
   *
   * NOTE(review): this currently only runs `LedgerQuery.FindConsumptionEntries` and
   * returns its rows; nothing in this method updates the entries — confirm whether
   * the query itself performs the consumption.
   *
   * @param key The unique value referring to the user. Usually their `msisdn`
   * @param amount How much value should be consumed
   *
   * @returns The rows returned by the consumption query
   */
  @run_on_worker
  async ConsumeEntries(key: string, amount: number) {
    const { rows } = await LedgerQuery.FindConsumptionEntries
      .populate({ entries_table: this.entries_table })
      .execute(State.PostgresClient, key, amount);
    return rows;
  }

  /**
   * Get recent entries of the ledger history, newest first.
   *
   * @param count How many entries should be retrieved? Values that are not safe
   * integers fall back to the default, and values below 1 are raised to 1.
   *
   * Default: 50
   */
  @run_on_worker
  async History(count = 50) {
    // Fall back to the documented default rather than an unrelated value.
    if(!Number.isSafeInteger(count)) count = 50;
    count = Math.max(1, count);
    const { rows: entries = [] } = await State.PostgresClient.query<LedgerEntry>(`select * from ${this.entries_table} order by timestamp desc limit $1`, [count]);
    return entries;
  }

  /**
   * Get the ledger history for a user, newest first.
   *
   * @param key The unique value referring to a user. Usually their `msisdn`
   * @param count How many entries should be retrieved? Values that are not safe
   * integers fall back to the default, and values below 1 are raised to 1.
   *
   * Default: 10
   */
  @run_on_worker
  async HistoryFor(key: string, count = 10) {
    if(!Number.isSafeInteger(count)) count = 10;
    count = Math.max(1, count);
    const { rows: entries = [] } = await State.PostgresClient.query<LedgerEntry>(`select * from ${this.entries_table} where key=$1 order by timestamp desc limit $2`, [key, count]);
    return entries;
  }

  /**
   * Get the total balance for a user.
   *
   * @param key The unique value referring to a user. Usually their `msisdn`
   * @returns The stored balance, or `0` when the key has no row in the user table.
   */
  @run_on_worker
  async BalanceFor(key: string) {
    const { rows: [{ balance = 0 } = {}] } = await State.PostgresClient.query<LedgerUser>(`select balance from ${this.user_table} where key=$1`, [key]);
    return balance;
  }

  /**
   * Get the total balance for multiple users.
   *
   * @param keys The unique values used to refer to users. Usually their `msisdn`
   * @returns One `{ key, balance }` object per key that has a row in the user table.
   */
  @run_on_worker
  async Balances(...keys: string[]): Promise<Pick<LedgerUser, 'key' | 'balance'>[]> {
    // Nothing to look up; skip the round trip.
    if(keys.length === 0) return [];
    // `key` must be selected too, otherwise every returned key is undefined.
    const { rows } = await State.PostgresClient.query<LedgerUser>(`select key, balance from ${this.user_table} where key = ANY($1)`, [keys]);
    return rows.map(({ key, balance }) => ({ key, balance }));
  }

  /**
   * Get the number of ledger entries recorded for a user.
   *
   * @param key The unique value referring to a user. Usually their `msisdn`
   * @returns The stored entry count, or `0` when the key has no row in the user table.
   */
  @run_on_worker
  async CountFor(key: string) {
    const { rows: [{ entry_count = 0 } = {}] } = await State.PostgresClient.query<LedgerUser>(`select entry_count from ${this.user_table} where key=$1`, [key]);
    return entry_count;
  }

  /**
   * Get the total number of users that have been tracked in this ledger.
   *
   * A user is only tracked once an entry has been added for them.
   */
  @run_on_worker
  async UserCount() {
    const { rows: [{ count = 0 } = {}] } = await State.PostgresClient.query<{ count: number }>(`select count(*)::int as count from ${this.user_table}`);
    return count;
  }

  /** Name of the table holding the individual ledger entries. */
  get entries_table() {
    return `${this.slug}_entries`;
  }

  /** Name of the table holding one balance row per key. */
  get user_table() {
    return `${this.slug}_users`;
  }

  /**
   * Run the base `Setup`, then create the entries and user tables through
   * `State.PostgresClient.CreateTable`, with their indexes created in the
   * `onCreate` hooks (presumably invoked only when the table is newly
   * created — confirm against `CreateTable`).
   *
   * @returns This ledger instance.
   */
  override async Setup() {
    await super.Setup();

    const entries_table = this.entries_table;
    const key_max_length = this.key_max_length;
    await State.PostgresClient.CreateTable<LedgerEntry>(entries_table, {
      id: "serial PRIMARY KEY",
      key: `varchar(${key_max_length}) NOT NULL`,
      amount: "int NOT NULL",
      remaining: "int NOT NULL",
      timestamp: "timestamptz NOT NULL default NOW()",
      reason: 'text',
      description: 'text',
      reference: 'text',
      meta: 'text',
      source: 'text NOT NULL',
      request_id: 'text',
      expiration_date: "timestamptz",
      expired: 'boolean NOT NULL DEFAULT false',
      reference_entry_id: `int references ${entries_table}(id)`,
    }, {
      async onCreate() {
        await State.PostgresClient.query(`
          CREATE INDEX ${entries_table}_key_idx ON ${entries_table} (key);
          CREATE INDEX ${entries_table}_key_timestamp_idx ON ${entries_table} (key, timestamp desc);
        `);
      }
    });

    const user_table = this.user_table;
    await State.PostgresClient.CreateTable<LedgerUser>(user_table, {
      id: "serial PRIMARY KEY",
      balance: "int NOT NULL DEFAULT 0",
      entry_count: "int NOT NULL default 0",
      key: `varchar(${key_max_length}) UNIQUE NOT NULL`,
      first_entry: 'timestamptz NOT NULL DEFAULT NOW()',
      last_entry: 'timestamptz NOT NULL DEFAULT NOW()',
    }, {
      async onCreate() {
        // These indexes belong to the user table. The user table has no
        // `timestamp` column, so the time-ordered index uses `last_entry`.
        // NOTE(review): `last_entry` is the assumed intent — confirm.
        await State.PostgresClient.query(`
          CREATE INDEX ${user_table}_key_idx ON ${user_table} (key);
          CREATE INDEX ${user_table}_key_timestamp_idx ON ${user_table} (key, last_entry desc);
        `);
      }
    });

    return this;
  }
}
// Register a class-level listener for the 'ledger created' event that logs
// the name of the instance it receives.
Ledger.on('ledger created', (ledger) => {
  const label = `[${ledger.name}] Ledger initialized`;
  console.log(label);
});