import { join } from "node:path";
import { walkSync } from "jsr:@std/fs";
import { Entity } from "@infinity-beyond/modules/entity.ts";
import State from "@infinity-beyond/modules/state.ts";
const entities: Record<string, typeof Entity> = {}
const classes_path = join(Deno.cwd(), 'classes');
if(State.Environment == 'PROCESSOR') {
for(const dirEntry of walkSync(classes_path, { includeDirs: false })) {
const entity_import = await import(`file://${dirEntry.path}`);
const entity = Object.values(entity_import)[0] as typeof Entity;
if(entity?.__is_typeof_entity || entity instanceof Entity) entities[entity.name] = entity;
}
}
type TaskEvent = 'succeed' | 'fail' | 'finished'
export class Task {
static readonly HOST_URL: string = Deno.env.get('HOST_URL')!;
readonly id: string
readonly method: string
readonly entity_name: string
readonly args: any[]
readonly has_run: boolean = false
private readonly fns: Record<TaskEvent, VagueFunction[]> = { succeed: [], fail: [], finished: [] }
private socket: WebSocket;
private constructor(socket: WebSocket, { id, method, entity_name, args }: I_ProcessorRequest) {
this.socket = socket;
this.id = id;
this.method = method;
this.entity_name = entity_name;
this.args = args;
Queue.add(this);
}
on(event: TaskEvent, fn: VagueFunction) {
this.fns[event].push(fn);
}
async run() {
if(this.has_run) return;
(this as any).has_run = true;
const entity = entities[this.entity_name];
if(!entity) return this.fail(`Could not find [${this.entity_name}]!`);
const method = (entity as any)[this.method] as VagueFunction;
if(!method) return this.fail(`Could not find [${this.entity_name}::${this.method}()]!`);
try {
const response = await method.apply(entity, this.args) as Record<string, any>;
return this.succeed(response);
} catch(e: any) {
return this.fail(`An error ocurred when trying to execute [${this.entity_name}::${this.method}()]`, e);
}
}
private succeed(data: Record<string, any>) {
for(const fn of this.fns.succeed) fn();
for(const fn of this.fns.finished) fn();
return this.sendResponse({
id: this.id,
success: true,
data
})
}
private fail(message: string, error?: Error) {
for(const fn of this.fns.fail) fn();
for(const fn of this.fns.finished) fn();
return this.sendResponse({
id: this.id,
success: false,
message,
error: error && { name: error.name, message: error.message, cause: error.cause }
})
}
private sendResponse(body: I_ProcessorResponse) {
this.socket.send(JSON.stringify(body));
}
// #region Queue Management
static async Queue(socket: WebSocket, request: I_ProcessorRequest) {
return new Task(socket, request);
}
}
class Queue {
protected static tasks: Task[] = []
protected static is_running: boolean = false;
static add(task: Task) {
this.tasks.push(task);
this.process_queue();
}
private static async process_queue() {
if(this.is_running) return;
if(this.tasks.length == 0) return this.is_running = false;
const task = this.tasks.shift();
await task!.run();
this.is_running = false;
this.process_queue();
}
private constructor() {}
}