import State from "@infinity-beyond/modules/state.ts";
import { Pool, Client, type ConnectionString } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { Sluggify } from "@infinity-beyond/utils/slug.ts";
export class PostgresClient {
private _setup_client!: Client
pool!: Pool
readonly connection_string!: string
readonly database!: string
private SETUP_FINISHED = false;
constructor(connection_string: ConnectionString) {
if(Deno.args.includes('build')) return this;
const [ _, extracted_connection_string, database ] = connection_string.match(/((?:postgresql|postgres):\/\/(?:[^:@\s]*(?::[^@\s]*)?@)?(?:[^\/\?\s]+))\b(?:\/(.+))?/) ?? [];
if(!extracted_connection_string) throw new Error('PostgresClient_Invalid connection_string provided')
this.connection_string = extracted_connection_string;
this.database = Sluggify(database);
this._setup_client = new Client(`${this.connection_string}/postgres`);
}
async CreateTable<T = Record<string, string>>(name: string, fields: { [P in keyof Required<T>]: string | CreateTableEnum }, { onCreate, crons }: CreateTableOptions = {}) {
const TABLE_ENUMS: string[] = [];
const fields_array: string[] = [];
for(const field of Object.entries(fields)) {
const field_name = field[0];
const input_type = field[1] as string | CreateTableEnum;
if(typeof input_type == 'string') {
fields_array.push(`${field_name} ${input_type}`);
} else {
const typname = `${name}_${field_name}_enum`;
TABLE_ENUMS.push(`
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = '${typname}') THEN
CREATE TYPE ${typname} AS ENUM ('${input_type.enum_values.join("', '")}');
END IF;
`);
fields_array.push(`${field_name} ${typname}`);
}
}
if(TABLE_ENUMS.length) {
const enums_query = `
DO $$
BEGIN
${TABLE_ENUMS.join(';\n')}
END$$ LANGUAGE plpgsql;
`;
await this.query(enums_query);
}
const query = `CREATE TABLE IF NOT EXISTS "${name}" (${fields_array.join(',')})`;
const response = await this.query(query);
const newly_created = response.warnings.length === 0;
if(newly_created) {
if(typeof onCreate === 'function') {
await onCreate();
}
}
if(crons?.length && State.Environment === 'HOST') {
let i = 0;
for(const cron of crons) {
Deno.cron(cron.name ?? `${name}_cron_${++i}`, cron.schedule, () => {
try {
this.query(cron.command);
} catch(e: any) {
console.warn(`${cron.name} CRON failed. ${e.message}`);
}
})
if(cron.runOnStart) {
try {
await this.query(cron.command);
} catch(_) { /* Run cron, but fail quietly */ }
}
}
}
return response;
}
async Setup() {
if(this.SETUP_FINISHED) return this;
await this.EnsureDatabaseExists(this.database);
this.SETUP_FINISHED = true;
await this._setup_client.end();
this.pool = new Pool(`${this.connection_string}/${this.database}`, 20, true);
return this;
}
private async EnsureDatabaseExists(name: string) {
if(this.SETUP_FINISHED) return;
const { rowCount = 0 } = await this._setup_client.queryObject(`SELECT datname FROM pg_catalog.pg_database where datname=$1`, [name]);
if(rowCount === 0) {
await this._setup_client.queryObject(`CREATE DATABASE "${name}"`);
}
}
async query<T = any>(query: string, args?: any[]) {
const client = await this.pool.connect();
try {
if(Deno.env.get('LOGGING_QUERIES')) console.log(query);
try {
return await client.queryObject<T>(query, args);
} catch(e: any) {
if(Deno.env.get('LOGGING_QUERIES')) console.warn(`Query error! ${e.message}`);
return {
rows: [],
warnings: [e.message],
}
}
} finally {
client.release()
}
}
get end() { return this.pool.end.bind(this.pool); }
}
interface CreateTableEnum {
enum_values: string[]
}
interface CreateTableOptions {
onCreate?: () => Promise<void>,
crons?: PgCron[]
}
type cronstring = `${'*' | number}`
type cronstring_slash = `*/${number}`
type cronstring_multi = `${cronstring_slash},${cronstring_slash}`
type cronstring_full = `${cronstring | cronstring_slash | cronstring_multi}`
export interface PgCron {
name?: string
schedule: `${cronstring_full} ${cronstring_full} ${cronstring_full} ${cronstring_full} ${cronstring_full}`,
command: string,
runOnStart?: boolean
}