import dayjs from "npm:dayjs";
import State from "@infinity-beyond/modules/state.ts";
import { Entity, run_on_worker } from "@infinity-beyond/classes/entity.ts";
import type { EntityMetaConfig } from "@infinity-beyond/classes/entity_meta.ts";
import { LedgerMeta, type LedgerMetaType } from "@infinity-ledger/ledger.types/ledger.meta.ts"
import { LedgerPermissions, type LedgerPermissionsType } from "@infinity-ledger/ledger.types/ledger.permissions.ts";
import type { LedgerEvents } from "@infinity-ledger/ledger.types/ledger.events.ts";
import type { UnenteredLedgerEntry, LedgerEntry } from "@infinity-ledger/ledger.types/ledger.entry.ts";
import type { LedgerOptions, LedgerExpiration, LedgerExpirationOverride, KeyValidator, KeyValidation, KeyFormatter } from "@infinity-ledger/ledger.types/ledger.options.ts";
import type { LedgerUser } from "@infinity-ledger/ledger.types/ledger.user.ts";
import type { LedgerConsumption, LedgerConsumptionResponse, UnenteredLedgerConsumption } from "@infinity-ledger/ledger.types/ledger.consumption.ts";
import { AddLedgerEntry } from "@infinity-ledger/ledger.db.queries/add_entry.ledger.query.ts";
import { ConsumeFromLedger, type ConsumptionResponseMapped } from "@infinity-ledger/ledger.db.queries/consume.ledger.query.ts";
import { type EntryDataResponse, GetLedgerEntryData } from "@infinity-ledger/ledger.db.queries/entry_data.ledger.query.ts";
import { CreateEntryDataFunction } from "@infinity-ledger/ledger.db.functions/entry_data/ledger.entry_data.function.ts";
import { CreateConsumptionFunction } from "@infinity-ledger/ledger.db.functions/consumption/ledger.consumption.function.ts";
import { CreateAddLedgerEntryFunction } from "@infinity-ledger/ledger.db.functions/add_entry/ledger.add_entry.function.ts";
import { LedgerRest } from "@infinity-ledger/ledger.rest.ts";
import { CreateUserDataFunction } from "@infinity-ledger/ledger.db.functions/user_data/ledger.user_data.function.ts";
import { GetLedgerUserData } from "@infinity-ledger/ledger.db.queries/user_data.ledger.query.ts";
/**
 * `T` combined with every property declared on `K`.
 *
 * Used in this file to cast event payloads, e.g. "these params, plus a numeric `remaining`".
 */
type With<T, K extends Record<string, any>> = T & { [Field in keyof K]: K[Field] };
/**
 * An `Entity` that keeps a per-key ledger in Postgres: entries are added with
 * `AddEntry`, consumed with `ConsumeEntries`, and read back through the
 * history / balance / count helpers. `Setup` creates the backing tables and
 * database functions.
 */
// deno-lint-ignore ban-types
export class Ledger<EntityName extends string, PEMC extends EntityMetaConfig = {}> extends Entity<LedgerEvents, EntityName, LedgerPermissionsType<EntityName>, LedgerMetaType, PEMC> {
/** When `false` (the constructor default), `AddEntry` rejects entries with a negative amount. */
readonly allow_negative_balances: boolean
/** Length used for the `varchar` key columns created by `Setup`. Constructor default: 11. */
readonly key_max_length: number;
/** Optional expiration settings; read by `AddEntry` when `entries_expire` is truthy. */
readonly expiration?: LedgerExpiration
/** Optional validator applied by `validate_key`. */
readonly key_validator?: KeyValidator
/** Optional formatter applied by `format_key`. */
readonly key_formatter?: KeyFormatter
/** `LedgerRest` instance constructed with this ledger. */
readonly REST = new LedgerRest(this)
/** Overrides the inherited static permissions list with `LedgerPermissions`. */
protected static override Permissions: readonly string[] = LedgerPermissions
/**
 * Create a ledger called `name`.
 *
 * All options are optional. `allow_negative_balances` falls back to `false`
 * and `key_max_length` to `11`; `expiration`, `key_validator` and
 * `key_formatter` are stored as given. `meta_fields` is forwarded to the
 * `Entity` constructor together with `LedgerMeta`.
 */
constructor(name: EntityName, options: LedgerOptions<PEMC> = {}) {
    super(name, LedgerMeta as LedgerMetaType, options.meta_fields as PEMC);

    // Options with defaults.
    this.allow_negative_balances = options.allow_negative_balances ?? false;
    this.key_max_length = options.key_max_length ?? 11;

    // Options stored verbatim (may be undefined).
    this.expiration = options.expiration;
    this.key_validator = options.key_validator;
    this.key_formatter = options.key_formatter;
}
/**
 * Run `key` through the configured `key_formatter`.
 *
 * @returns The formatter's result, or the key unchanged when no formatter is
 * configured or the formatter returns `null`/`undefined`.
 */
format_key(key: string) {
    if(this.key_formatter == null) {
        return key;
    }
    const formatted = this.key_formatter(key);
    return formatted ?? key;
}
/**
 * Check `key` against the configured `key_validator`.
 *
 * A missing validator, or one that returns `true`, yields a valid result.
 * A bare `false` is turned into an invalid result with a generic reason.
 * Any other return value is passed through as-is.
 */
validate_key(key: string): KeyValidation {
    // No validator configured behaves exactly like a validator that returned `true`.
    const verdict = this.key_validator ? this.key_validator(key) : true;
    if(verdict === true) {
        return { valid: true, reason: null };
    }
    if(verdict !== false) {
        // The validator supplied its own validation object.
        return verdict;
    }
    return {
        valid: false,
        reason: 'Unspecified: Validator did not return a failure reason.'
    };
}
/**
 * Add a new entry to the ledger and update the user's balance.
 *
 * `params` is mutated in place: the key is formatted, `expiration_date` /
 * `expired` / `remaining` are filled in, and `override_expiration_date` is removed.
 * Every rejection also emits `entry_creation_failed`; success emits `entry_created`.
 *
 * @param params The entry to record. `params.key` identifies the user (usually their `msisdn`);
 * `params.amount` is the amount to add. Negative amounts are rejected unless
 * `allow_negative_balances` is set. When `params.override_expiration_date` is truthy, the
 * caller-supplied `params.expiration_date` is kept instead of the computed one.
 * @returns `{ added: true, balance }` with the new balance on success, otherwise `{ added: false, reason }`.
 */
@run_on_worker
async AddEntry(params: UnenteredLedgerEntry & LedgerExpirationOverride) {
    params.key = this.format_key(params.key);
    const key_check = this.validate_key(params.key);
    if(!key_check.valid) {
        this.emit('entry_creation_failed', key_check.reason, params as With<typeof params, { remaining: number }>);
        return {
            added: false,
            reason: key_check.reason,
        };
    }

    if(!this.allow_negative_balances && params.amount < 0) {
        const rejection = `adding negative rows is not allowed for [${this.name}]. Use 'ConsumeEntries' instead.`;
        this.emit('entry_creation_failed', rejection, params as With<typeof params, { remaining: number }>);
        return {
            added: false,
            reason: rejection,
        };
    }

    if(this.expiration?.entries_expire) {
        const { custom_calculator, period, round_up_to } = this.expiration;
        let computed_expiration: Date | undefined;
        if(typeof custom_calculator === 'function') {
            // A custom calculator takes precedence over `period`.
            computed_expiration = custom_calculator();
        } else if(period) {
            const unrounded = dayjs().add(period[0], period[1]);
            const rounded = round_up_to && round_up_to !== 'none'
                ? unrounded.endOf(round_up_to)
                : unrounded;
            computed_expiration = rounded.toDate();
        }
        // The computed date applies unless the caller asked to keep their own.
        if(!params.override_expiration_date) {
            params.expiration_date = computed_expiration;
        }
        params.expired = params.expiration_date && dayjs().isAfter(params.expiration_date);
    } else {
        // Entries never expire for this ledger: drop any caller-supplied date.
        params.expiration_date = undefined;
    }
    delete params.override_expiration_date;

    // A fresh entry has not been consumed from yet.
    params.remaining = params.amount;

    const { rows } = await AddLedgerEntry.populate({
        entries_table: this.entries_table
    }).execute(State.PostgresClient,
        params.key,
        params.amount,
        params.remaining,
        params.reason,
        params.description,
        params.reference,
        params.meta,
        params.source,
        params.request_id,
        params.expiration_date,
        !!params.expired
    );
    const [{
        new_entry_id = null,
        new_entry_timestamp = new Date(),
        new_balance = 0
    } = {}] = rows;

    if(!new_entry_id) {
        const rejection = `Could not insert into ${this.entries_table}!`;
        this.emit('entry_creation_failed', rejection, params as With<typeof params, { remaining: number }>);
        return {
            added: false,
            reason: rejection,
        };
    }

    // Turn the params object into the stored entry by attaching what the query returned.
    const entry = params as LedgerEntry;
    entry.id = new_entry_id;
    entry.timestamp = new_entry_timestamp;
    this.emit('entry_created', entry);

    return {
        added: true,
        balance: new_balance,
    }
}
/**
 * Consume from positive past entries linked to a specific user.
 *
 * The key is formatted and validated first. A `before-consumption` event is then
 * emitted with a callback that listeners can use to cancel the consumption.
 * On success a `consumption` event is emitted.
 *
 * @param params The consumption request:
 * - `key`: the unique value referring to the user (usually their `msisdn`)
 * - `amount`: how much value should be consumed
 * - `source`: where this consumption is coming from (App, USSD, etc)
 * - `reference`: what this consumption is tied to, if anything
 * - `description`: why this consumption is happening (USER-FACING)
 * - `request_id`: forwarded to the consumption query
 *
 * @returns Whether the consumption was successful. `consumption_data` is present on
 * success and `reason` on failure; the (formatted) `key` and `params` are always echoed back.
 */
@run_on_worker
async ConsumeEntries(params: UnenteredLedgerConsumption): Promise<LedgerConsumptionResponse> {
    params.key = this.format_key(params.key);
    const key_validation = this.validate_key(params.key);
    if(!key_validation.valid) {
        // NOTE(review): key failures reuse the `entry_creation_failed` event; confirm this is intended.
        this.emit('entry_creation_failed', key_validation.reason, params as With<typeof params, { remaining: number }>);
        return {
            key: params.key,
            success: false,
            reason: key_validation.reason,
            params,
        };
    }

    let should_run_consumption = true;
    let failure_reason: string = 'Consumption cancelled by `.on("before-consumption")` event';
    // Listeners call the callback with a truthy first argument to cancel, optionally giving a reason.
    await this.emit('before-consumption', params, (cancel, cancel_reason) => {
        if(cancel) {
            should_run_consumption = false;
            if(cancel_reason) failure_reason = cancel_reason;
        }
    })
    if(!should_run_consumption) return {
        key: params.key,
        success: false,
        reason: failure_reason,
        params,
    };

    const {rows: [{
        new_entry_id,
        updated_entries,
        new_user_balance,
        error_message
    } = {}]} = (await ConsumeFromLedger.populate({
        slug: this.slug
    }).execute(State.PostgresClient,
        params.key,
        params.amount,
        params.source,
        params.reference,
        params.description,
        params.request_id,
    ));

    // A balance of exactly 0 is a legitimate outcome (the user spent everything),
    // so only a missing balance counts as failure, not a falsy one.
    if(!new_entry_id || !updated_entries || new_user_balance == null) return {
        key: params.key,
        success: false,
        reason: error_message || 'Unspecified error during consumption query',
        params
    }

    await this.emit('consumption', { new_entry_id, updated_entries, new_user_balance, error_message: error_message || null }, params);
    return {
        key: params.key,
        success: true,
        consumption_data: { new_entry_id, updated_entries, new_user_balance } as ConsumptionResponseMapped,
        params
    };
}
/**
 * Get recent entries from the ledger history, newest first.
 *
 * @param count How many entries should be retrieved per page?
 * Non-integer values fall back to the default; values below 1 are raised to 1.
 *
 * Default: 50
 * @param page 1-based page number. Non-integer values and values below 1 are treated as page 1.
 *
 * Default: 1
 * @returns The entries on the requested page.
 */
@run_on_worker
async History(count = 50, page = 1) {
    // Fall back to the documented default (50), not the 10 used by the per-user helpers.
    if(!Number.isSafeInteger(count)) count = 50;
    count = Math.max(1, count);
    // Keep OFFSET a non-negative integer; Postgres rejects anything else.
    if(!Number.isSafeInteger(page)) page = 1;
    page = Math.max(1, page);
    const { rows: entries = [] } = await State.PostgresClient.query<LedgerEntry>(`select * from ${this.entries_table} order by timestamp desc limit $1 offset $2`, [count, count * (page - 1)]);
    return entries;
}
/**
 * Get a user's data from the user-data query: a page of their history, their entry count and their balance.
 *
 * @param key The unique value referring to a user. Usually their `msisdn`
 * @param count How many history entries should be retrieved per page?
 * Non-integer values fall back to the default; values below 1 are raised to 1.
 *
 * Default: 10
 * @param page 1-based page number. Non-integer values and values below 1 are treated as page 1.
 *
 * Default: 1
 * @returns `null` when the key fails validation, otherwise `{ history, entry_count, balance }`
 * (each field is `undefined` when the query returns no row).
 */
@run_on_worker
async UserData(key: string, count = 10, page = 1) {
    key = this.format_key(key);
    if(!this.validate_key(key).valid) {
        return null;
    }
    if(!Number.isSafeInteger(count)) count = 10;
    count = Math.max(1, count);
    // Keep the offset a non-negative integer; a page of 0, a negative or a fractional page would break the query.
    if(!Number.isSafeInteger(page)) page = 1;
    page = Math.max(1, page);
    const { rows: [{ history, entry_count, balance } = {}] } = (await GetLedgerUserData.populate({
        slug: this.slug,
    }).execute(State.PostgresClient, key, count, count * (page - 1)));
    return { history, entry_count, balance };
}
/**
 * Get the ledger history for a user, newest first.
 *
 * @param key The unique value referring to a user. Usually their `msisdn`
 * @param count How many entries should be retrieved per page?
 * Non-integer values fall back to the default; values below 1 are raised to 1.
 *
 * Default: 10
 * @param page 1-based page number. Non-integer values and values below 1 are treated as page 1.
 *
 * Default: 1
 * @returns The user's entries on the requested page, or an empty array when the key fails validation.
 */
@run_on_worker
async HistoryFor(key: string, count = 10, page = 1) {
    key = this.format_key(key);
    if(!this.validate_key(key).valid) {
        return [];
    }
    if(!Number.isSafeInteger(count)) count = 10;
    count = Math.max(1, count);
    // Keep OFFSET a non-negative integer; Postgres rejects anything else.
    if(!Number.isSafeInteger(page)) page = 1;
    page = Math.max(1, page);
    const { rows: entries = [] } = await State.PostgresClient.query<LedgerEntry>(`select * from ${this.entries_table} where key=$1 order by timestamp desc limit $2 offset $3`, [key, count, count * (page - 1)]);
    return entries;
}
/**
 * Get information regarding a single entry, including its related consumption entries.
 *
 * @param id The id of the ledger entry to look up
 * @returns `null` when the query yields no entry, otherwise the entry together with
 * `deducted_by_entries` and `deducted_from_entries` as returned by the entry-data query.
 */
@run_on_worker
async EntryData(id: number) {
    const lookup = GetLedgerEntryData.populate({
        slug: this.slug,
    });
    const { rows } = await lookup.execute(State.PostgresClient, id);
    const [{ entry, deducted_by_entries, deducted_from_entries } = {}] = rows;
    if(!entry) {
        return null;
    }
    return {
        entry,
        deducted_by_entries,
        deducted_from_entries,
    } as EntryDataResponse;
}
/**
 * Get the total balance for a user.
 *
 * @param key The unique value referring to a user. Usually their `msisdn`
 * @returns `undefined` when the key fails validation, otherwise the stored balance
 * (`0` when the user has no row yet).
 */
@run_on_worker
async BalanceFor(key: string) {
    const user_key = this.format_key(key);
    const { valid } = this.validate_key(user_key);
    if(!valid) {
        return undefined;
    }
    const sql = `select balance from ${this.user_table} where key=$1`;
    const { rows } = await State.PostgresClient.query<LedgerUser>(sql, [user_key]);
    const [{ balance = 0 } = {}] = rows;
    return balance;
}
/**
 * Get the total balance for multiple users.
 *
 * Keys are formatted with `format_key` and keys that fail `validate_key` are skipped,
 * matching how the single-user helpers treat their key.
 *
 * @param keys The unique values used to refer to users. Usually their `msisdn`
 * @returns One `{ key, balance }` pair per user found. `key` is the formatted key as stored;
 * users without a row are absent from the result.
 */
@run_on_worker
async Balances(...keys: string[]): Promise<Pick<LedgerUser, 'key' | 'balance'>[]> {
    const lookup_keys = keys
        .map((key) => this.format_key(key))
        .filter((key) => this.validate_key(key).valid);
    // Nothing valid to look up: skip the round-trip to the database.
    if(lookup_keys.length === 0) {
        return [];
    }
    // `key` must be selected too, otherwise the balances cannot be matched to their users.
    const { rows } = await State.PostgresClient.query<LedgerUser>(`select key, balance from ${this.user_table} where key = ANY($1)`, [lookup_keys]);
    return rows.map(({ key, balance }) => ({ key, balance }));
}
/**
 * Get the number of ledger entries for a user (additions plus consumptions).
 *
 * @param key The unique value referring to a user. Usually their `msisdn`
 * @returns `undefined` when the key fails validation, otherwise the entry count
 * (`0` when the user has no row yet).
 */
@run_on_worker
async CountFor(key: string) {
    const user_key = this.format_key(key);
    const { valid } = this.validate_key(user_key);
    if(!valid) {
        return undefined;
    }
    const sql = `select (addition_count + consumption_count) as entry_count from ${this.user_table} where key=$1`;
    const { rows } = await State.PostgresClient.query<{ entry_count: number }>(sql, [user_key]);
    const [{ entry_count = 0 } = {}] = rows;
    return entry_count;
}
/**
 * Get the total number of users that have been tracked in this ledger.
 *
 * A user is only tracked once an entry has been added for them.
 *
 * @returns The row count of the users table (`0` when the query returns no row).
 */
@run_on_worker
async UserCount() {
    const sql = `select count(*)::int as count from ${this.user_table}`;
    const { rows } = await State.PostgresClient.query<{ count: number }>(sql);
    const [{ count = 0 } = {}] = rows;
    return count;
}
/** Name of the table holding this ledger's entries: the slug followed by `_entries`. */
get entries_table() {
    const { slug } = this;
    return `${slug}_entries`;
}
/** Name of the table holding this ledger's users: the slug followed by `_users`. */
get user_table() {
    const { slug } = this;
    return `${slug}_users`;
}
/** Name of the table linking consumption entries to their source entries: the slug followed by `_consumption`. */
get consumption_table() {
    const { slug } = this;
    return `${slug}_consumption`;
}
/**
 * Prepare the database for this ledger.
 *
 * Runs the parent `Setup`, then creates the entries, users and consumption
 * tables (in that order, since the consumption table references the entries table)
 * and finally the database functions used by the ledger queries.
 *
 * @returns This ledger instance.
 */
override async Setup() {
await super.Setup();
// #region Create DB Tables
const entries_table = this.entries_table;
const key_max_length = this.key_max_length;
// #region - Entries Table
// One row per ledger entry; `remaining` starts out equal to `amount` (see `AddEntry`).
await State.PostgresClient.CreateTable<LedgerEntry>(entries_table, {
id: "serial PRIMARY KEY",
key: `varchar(${key_max_length}) NOT NULL`,
amount: "int NOT NULL",
remaining: "int NOT NULL",
timestamp: "timestamptz NOT NULL default NOW()",
reason: 'text',
description: 'text',
reference: 'text',
meta: 'text',
source: 'text NOT NULL',
request_id: 'text',
expiration_date: "timestamptz",
expired: 'boolean NOT NULL DEFAULT false',
}, {
// presumably invoked only when the table is newly created — TODO confirm against `CreateTable`
async onCreate() {
await State.PostgresClient.query(`
CREATE INDEX ${entries_table}_key_idx ON ${entries_table} using hash(key);
CREATE INDEX ${entries_table}_key_timestamp_idx ON ${entries_table} (key, timestamp desc);
`);
}
});
// #region - Users Table
// One row per key (enforced by the unique index below) holding the running balance and entry counters.
const user_table = this.user_table;
await State.PostgresClient.CreateTable<LedgerUser>(user_table, {
id: "SERIAL PRIMARY KEY",
balance: "INT NOT NULL DEFAULT 0",
addition_count: "INT NOT NULL default 0",
consumption_count: "INT NOT NULL default 0",
key: `VARCHAR(${key_max_length}) NOT NULL`,
first_entry: 'TIMESTAMPTZ NOT NULL DEFAULT NOW()',
last_entry: 'TIMESTAMPTZ NOT NULL DEFAULT NOW()',
}, {
async onCreate() {
await State.PostgresClient.query(`
CREATE UNIQUE INDEX ${user_table}_key_idx ON ${user_table} (key);
CREATE INDEX ${user_table}_key_hash_idx ON ${user_table} using hash(key);
CREATE INDEX ${user_table}_balance_idx ON ${user_table} (balance);
`);
}
});
// #region - Consumption Table
// Links a consumption entry to a source entry, with the amount deducted.
// Both columns reference the entries table with ON DELETE RESTRICT.
const consumption_table = this.consumption_table;
await State.PostgresClient.CreateTable<LedgerConsumption>(consumption_table, {
id: "SERIAL PRIMARY KEY",
consumption_entry_id: `INT4 REFERENCES ${entries_table}(id) ON DELETE RESTRICT`,
source_entry_id: `INT4 REFERENCES ${entries_table}(id) ON DELETE RESTRICT`,
amount_deducted: `INT NOT NULL`,
timestamp: `TIMESTAMPTZ DEFAULT NOW()`
}, {
async onCreate() {
await State.PostgresClient.query(`
CREATE INDEX ${consumption_table}_consumption_id_idx ON ${consumption_table} using hash(consumption_entry_id);
CREATE INDEX ${consumption_table}_source_id_idx ON ${consumption_table} using hash(source_entry_id);
`);
}
});
// #region Create DB Functions
// NOTE: `comsumptions_table` (sic) is the placeholder name the function templates are called with here; do not "fix" the spelling on this side only.
await CreateConsumptionFunction.populate({
comsumptions_table: this.consumption_table,
entries_table: this.entries_table,
meta_table: this.meta_table,
slug: this.slug,
users_table: this.user_table,
}).execute(State.PostgresClient);
await CreateAddLedgerEntryFunction.populate({
entries_table: this.entries_table,
meta_table: this.meta_table,
users_table: this.user_table,
}).execute(State.PostgresClient);
await CreateEntryDataFunction.populate({
comsumptions_table: this.consumption_table,
entries_table: this.entries_table,
slug: this.slug
}).execute(State.PostgresClient);
await CreateUserDataFunction.populate({
entries_table: this.entries_table,
slug: this.slug,
user_table: this.user_table
}).execute(State.PostgresClient);
return this;
}
}
// Log every ledger as soon as the 'ledger created' event fires for it.
Ledger.on('ledger created', (ledger) => {
    const message = `[${ledger.name}] Ledger initialized`;
    console.log(message);
});