import EventEmitter from "node:events";
import State from "./state.ts";
import { randomUUID } from "node:crypto";
import { EntityWorker } from "./entity_worker.ts";
import { Sluggify } from "../utils/slug.ts";
/**
 * A queued unit of work waiting to be dispatched to a worker: the processor
 * request itself plus the bookkeeping needed to settle the caller's promise.
 */
interface I_Task extends I_ProcessorRequest {
/** `true` when the task was queued for a static method, `false` for an instance method. */
is_static: boolean
/** Copy of the instance's own enumerable properties; only supplied for instance-method tasks. */
instance_data?: any
/** Resolver of the promise handed back to the caller; invoked with the worker's response. */
resolve: VagueFunction
}
/**
 * Identity alias: evaluates to `T` itself and exists only to constrain `T` to a
 * record whose values are array types (used in this file as the argument list of
 * each event).
 */
export type EventMap<T extends Record<string, any[]>> = T;
/** Events every `Entity` emits, in addition to the subclass-specific event map. */
type EntityEvents = EventMap<{
/**
* Emitted from the `Entity` constructor once the instance's name has been set.
*
* @param entity The instance that was created
*/
create: [Entity<any>]
}>
/**
 * Base class for entities whose methods can be executed on remote workers.
 *
 * All bookkeeping (connected workers, the pending task queue and the
 * round-robin cursor) is stored on `Entity` itself, so it is shared by every
 * subclass. Instances are event emitters and emit `create` from the constructor.
 */
export class Entity<T extends Record<string, any[]>> extends EventEmitter<T | EntityEvents> {
	/**
	 * Consulted when generating task ids so an id already in use is never handed out.
	 * NOTE(review): nothing in this class adds entries to it — confirm who populates it.
	 */
	private static __tasks_in_progress: Record<string, I_Task> = {}
	/**
	 * Tasks grouped by class name, exposed through `__tasks`.
	 * NOTE(review): nothing in this class adds tasks to it — confirm who populates it.
	 */
	private static __tasks_by_entity: Record<string, Record<string, I_Task>> = {}
	/** Constant marker flag; presumably used for duck-typed "is this an Entity class" checks elsewhere. */
	static readonly __is_typeof_entity = true;
	/** Random identifier generated once, when this class is evaluated. */
	static readonly uuid = randomUUID();

	/** Name passed to the constructor; sent as `entity_name` for instance-method tasks. */
	readonly name: string

	/**
	 * @param name Name stored on the instance.
	 */
	constructor(name: string) {
		super();
		this.name = name;
		// NOTE(review): no listener can have been attached to a brand-new instance yet,
		// so this emit appears to have no observers — confirm the intent.
		this.emit('create', this);
	}

	/** Tasks recorded for the class this getter is read on (keyed by the class name). */
	static get __tasks() {
		return Object.values(Entity.__tasks_by_entity[this.name] ||= {});
	}

	/** Result of `Sluggify` applied to the name of the instance's concrete class. */
	protected get slug() {
		return Sluggify((this.constructor as typeof Entity).name);
	}

	// #region Workers (state)
	/** Workers currently available for dispatch. */
	private static __workers: EntityWorker[] = []
	/** Method names registered per class name, see `__worker_methods`. */
	private static ___worker_methods: Record<string, string[]> = {}
	/** Index of the worker that received the most recent task. */
	private static __delegation_index = 0

	/** Number of workers currently registered. */
	static get __worker_count() {
		return Entity.__workers.length;
	}

	/** Mutable list of offloaded method names for the class this getter is read on; created on first access. */
	protected static get __worker_methods() { return Entity.___worker_methods[this.name] ||= []; }

	/**
	 * Advances the round-robin cursor (wrapping to 0 past the end) and returns the
	 * worker it now points at. Yields `undefined` when no worker is registered.
	 */
	protected static get __next_worker() {
		if(++Entity.__delegation_index >= Entity.__workers.length) Entity.__delegation_index = 0;
		return Entity.__workers[Entity.__delegation_index];
	}

	// #region Queueing and Delegation
	/** Tasks waiting for a worker, in submission order. */
	protected static __task_queue: I_Task[] = []
	/** Re-entrancy guard: `true` only while `__ProcessQueue` is draining the queue. */
	protected static __queue_processing = false;

	/**
	 * Hands every queued task to a worker, round-robin. Does nothing when a drain
	 * is already running or the queue is empty, and only warns when no worker is
	 * connected (the tasks stay queued until `__AddWorker` triggers another drain).
	 */
	protected static __ProcessQueue() {
		this.log('Processing queue', Entity.__worker_count);
		if(Entity.__queue_processing) return;
		if(!Entity.__task_queue.length) return Entity.__queue_processing = false;
		if(!Entity.__worker_count) {
			this.warn('::no_workers -', `${Entity.__task_queue.length} tasks waiting for a worker!`);
			return;
		}
		Entity.__queue_processing = true;
		while(Entity.__task_queue.length) {
			const task = Entity.__task_queue.shift();
			if(!task) continue;
			Entity.__next_worker.dispatch(task).then(
				response => {
					task.resolve(response);
				},
				(error: unknown) => {
					// A failed dispatch must not surface as an unhandled rejection. The task
					// only carries `resolve`, so the caller cannot be rejected from here;
					// the failure is reported instead.
					Entity.warn('::dispatch_failed -', `task ${task.id} (${task.entity_name}.${task.method})`, error);
				}
			);
		}
		Entity.__queue_processing = false;
	}

	/** Generates a random task id that is not present in `__tasks_in_progress`. */
	private static __new_task_id(): string {
		let id = randomUUID();
		while(Entity.__tasks_in_progress[id]) id = randomUUID();
		return id;
	}

	/**
	 * Queues an instance-method call for execution on a worker.
	 *
	 * @param instance Instance the method is invoked on; its own enumerable
	 *                 properties are copied into the task as `instance_data`.
	 * @param method   Name of the method to run.
	 * @param args     Arguments for the method.
	 * @returns A promise resolved with the worker's response.
	 */
	static __delegate_instance(instance: Entity<any>, method: string, args: any[]) {
		return new Promise(resolve => {
			Entity.__task_queue.push({
				id: Entity.__new_task_id(),
				entity_name: instance.name,
				is_static: false,
				// `Object.entries` only yields own enumerable properties, so this is a
				// shallow snapshot of the instance's state.
				instance_data: Object.fromEntries(Object.entries(instance)),
				method,
				args,
				resolve,
			});
			Entity.__ProcessQueue();
		})
	}

	/**
	 * Queues a static-method call for execution on a worker. The class it is
	 * called on supplies `entity_name`.
	 *
	 * @param method Name of the static method to run.
	 * @param args   Arguments for the method.
	 * @returns A promise resolved with the worker's response.
	 */
	static __delegate(method: string, args: any[]) {
		return new Promise(resolve => {
			Entity.__task_queue.push({
				id: Entity.__new_task_id(),
				entity_name: this.name,
				is_static: true,
				method,
				args,
				resolve,
			});
			Entity.__ProcessQueue();
		})
	}

	// #region Workers
	/**
	 * Registers a new worker and immediately drains any tasks that were waiting
	 * for one.
	 *
	 * @param uuid   Identifier passed to the `EntityWorker` constructor.
	 * @param socket Socket passed to the `EntityWorker` constructor.
	 */
	static async __AddWorker(uuid: string, socket: WebSocket) {
		const worker = new EntityWorker(uuid, socket);
		Entity.log('Pushing worker...', Entity.__workers.length);
		Entity.__workers.push(worker);
		Entity.__ProcessQueue();
	}

	/**
	 * Removes the worker with the same `uuid` from the pool, if present.
	 *
	 * @param worker Worker that was lost.
	 */
	static __RemoveWorker(worker: EntityWorker) {
		console.log(`Worker lost! [${worker.uuid}]`);
		const index = Entity.__workers.findIndex(w => w.uuid === worker.uuid);
		if(index > -1) Entity.__workers.splice(index, 1);
	}

	// #region Logging
	/** `console.log` prefixed with the name of the class it is called on. */
	static log(...args: any[]) {
		console.log(`[${this.name}]`, ...args);
	}

	/** `console.warn` prefixed with the name of the class it is called on. */
	static warn(...args: any[]) {
		console.warn(`[${this.name}]`, ...args);
	}
}
// #region Decorators
/** Signature of a class-method decorator: receives the decorated method and its decorator context. */
type OffloadDecoratorMethod = (originalMethod: VagueAsyncFunction, context: ClassMethodDecoratorContext) => void;
/** Factory that takes a constructor (the "wrapper") and returns a method decorator. */
type OffloadDecoratorWithWrapper = <T>(wrapper: new (...args: any[]) => T) => OffloadDecoratorMethod;
/** The decorator itself, callable directly, with the wrapper-taking factory attached as a property. */
type OffloadDecorator = {
with_response_wrapper: OffloadDecoratorWithWrapper;
} & OffloadDecoratorMethod;
/**
 * Method decorator implementation: replaces the decorated method with a stub
 * that queues the call through `Entity.__delegate` (static methods) or
 * `Entity.__delegate_instance` (instance methods) instead of running it locally.
 *
 * The replacement is installed from a decorator initializer, which runs once at
 * class definition for static methods and once per constructed instance for
 * instance methods.
 *
 * NOTE(review): the outer `this` (set via `with_response_wrapper`'s `bind`) is
 * not read anywhere in this function — confirm whether response wrapping is
 * still to be implemented.
 *
 * @param originalMethod The decorated method; its `name` is what gets sent to the worker.
 * @param context        Decorator context (method name, static flag, `addInitializer`).
 */
const run_on_worker_method: OffloadDecoratorMethod = function(this: any, originalMethod: VagueAsyncFunction, { name: methodName, static: isStatic, addInitializer }: ClassMethodDecoratorContext) {
	addInitializer(function() {
		// When State.IS_PROCESSOR is set the method is left untouched
		// (presumably so the processor side runs the real implementation).
		if(State.IS_PROCESSOR) return;

		// Static initializers receive the class as `this`, instance initializers the new instance.
		const cls = (isStatic ? this : (this as Entity<any>).constructor) as typeof Entity;

		// Instance initializers run on every construction; register the method only once.
		const registered: string[] = (cls as any).__worker_methods;
		if(!registered.includes(originalMethod.name)) registered.push(originalMethod.name);

		if(isStatic) {
			Object.assign(cls, {
				[methodName]: (...args: any[]) => {
					return cls.__delegate(originalMethod.name, args);
				}
			})
		} else {
			Object.assign(cls.prototype, {
				[methodName]: function(this: Entity<any>, ...args: any[]) {
					// Use the receiver of this particular call. The stub lives on the shared
					// prototype, so capturing the instance that happened to run the
					// initializer would send the wrong instance's data.
					return cls.__delegate_instance(this, originalMethod.name, args);
				}
			})
		}
	})
};
/**
 * Public decorator. Use it directly as `@run_on_worker`, or call
 * `run_on_worker.with_response_wrapper(Wrapper)` to obtain the same decorator
 * bound to `Wrapper` as its `this` value.
 *
 * NOTE(review): `run_on_worker_method` does not appear to read its `this`, so
 * the wrapper currently looks unused — confirm.
 */
export const run_on_worker: OffloadDecorator = Object.assign(run_on_worker_method, {
	with_response_wrapper: (wrapper => run_on_worker_method.bind(wrapper)) as OffloadDecoratorWithWrapper,
})
/**
 * Returns a promise that resolves (with no value) once a timer of `ms`
 * milliseconds has fired.
 */
const sleep = (ms: number) => {
	return new Promise(wake => setTimeout(wake, ms));
};