// 0.1.2 • Updated 7 months ago
import { randomUUID } from "node:crypto";
import State from "@infinity-beyond/modules/state.ts";
import { Task } from "@infinity-beyond/modules/delegation/task.ts";
/**
 * Worker-side processor: opens a WebSocket to the host, and hands every
 * message received on it to `Task.Queue`.
 *
 * The class is used only through its static members; the constructor is
 * private, so it cannot be instantiated.
 */
export class TaskProcessor {
  /** Host base URL, read from the `HOST_URL` environment variable at class load. */
  static readonly HOST_URL = Deno.env.get('HOST_URL')!;
  /** Identifier generated when the class is loaded; sent as `uuid` in the handshake URL. */
  static uuid = randomUUID()
  /** Socket created by the most recent `Handshake()` call. */
  private static socket: WebSocket;
  /** Set by `Start()` so that repeated calls are no-ops. */
  private static STARTED = false;
  /** Number of retries scheduled since the last successful `open` event. */
  private static RETRY_COUNT = 0;
  static #CONNECTED: boolean = false;
  /** `true` between the socket's `open` event and its next `close`/`error` event. */
  static get CONNECTED() { return this.#CONNECTED }
  private static set CONNECTED(newValue) { this.#CONNECTED = newValue }

  /**
   * Turns any thrown value into a loggable string.
   * Anything may be thrown, so `.message` is only read from real `Error`s.
   */
  private static ErrorMessage(e: unknown): string {
    return e instanceof Error ? e.message : String(e);
  }

  // #region Analytics
  /**
   * Hook invoked before a request is queued. Currently a placeholder;
   * failures are logged and never propagate to the caller.
   */
  static async RequestAnalytics(_request: I_ProcessorRequest) {
    try {
      /*
       * Do worker-side analytics and task monitoring here
       */
    } catch (e: unknown) {
      console.warn("Processor::RequestAnalytics() error! Continueing with request...", this.ErrorMessage(e))
    }
  }

  /**
   * Hook invoked when a task emits `finished`. Currently a placeholder;
   * failures are logged and never propagate to the caller.
   *
   * @param _time_taken Milliseconds between queueing the task and its `finished` event.
   * @param _request The request the task was created from.
   */
  static async ResponseAnalytics(_time_taken: number, _request: I_ProcessorRequest) {
    try {
      /*
       * Do worker-side analytics and task monitoring here
       */
    } catch (e: unknown) {
      console.warn("Processor::ResponseAnalytics() error! Continueing with request...", this.ErrorMessage(e))
    }
  }

  // #region Handlers
  /**
   * Queues a task for `request` on the current socket and reports the elapsed
   * time to `ResponseAnalytics` once the task emits `finished`.
   *
   * Never rejects: any failure while starting the task is logged.
   */
  static ServeHandler = async (request: I_ProcessorRequest) => {
    try {
      await this.RequestAnalytics(request);
      const task_start_time = Date.now();
      // Awaited (rather than chained with .then) so that a rejected
      // Task.Queue() lands in the catch below instead of becoming an
      // unhandled promise rejection.
      const task = await Task.Queue(this.socket, request);
      task.on('finished', async () => {
        await this.ResponseAnalytics(Date.now() - task_start_time, request);
      });
    } catch (e: unknown) {
      console.error(`Could not start the requested task!`, this.ErrorMessage(e));
    }
  }

  /**
   * Opens a new socket via `Connect()` and wires up its event listeners.
   * On an `error` event a new handshake is scheduled: immediately for the
   * first retry, after 3 seconds for every following one.
   */
  private static Handshake() {
    console.log(`Attempting handshake...`);
    this.socket = this.Connect();

    this.socket.addEventListener('open', () => {
      this.CONNECTED = true;
      this.RETRY_COUNT = 0;
      console.log(`♾️ Processor connected to Host`);
    })

    // NOTE(review): this listener only updates state; no reconnect is
    // scheduled here. If a close can arrive without a preceding error event,
    // the processor stays disconnected — confirm whether that is intended.
    this.socket.addEventListener('close', () => {
      if (!this.CONNECTED) return;
      this.CONNECTED = false;
      console.log(`♾️X Lost connection to the Host`);
    })

    this.socket.addEventListener('message', (ev: MessageEvent) => {
      try {
        // ServeHandler handles its own failures, so this catch only has to
        // cover JSON.parse.
        this.ServeHandler(JSON.parse(ev.data));
      } catch (e: unknown) {
        console.error(`Could not parse message from host!`, this.ErrorMessage(e));
      }
    })

    this.socket.addEventListener('error', (ev: unknown) => {
      // The event is not guaranteed to carry a string `message`; reading it
      // unguarded would throw here and skip the retry scheduling below.
      const detail = (ev as { message?: unknown } | null)?.message;
      const message = typeof detail === 'string' ? detail : '';
      if (message.includes('status code: 403')) {
        console.error(`Signature invalid! Exiting.`);
        Deno.exit(9);
      }

      if (this.CONNECTED) {
        console.log(`♾️X Lost connection to the host.`);
      } else {
        console.log(`Could not handshake with host.`);
      }
      this.CONNECTED = false;

      let seconds = 3;
      if (this.RETRY_COUNT === 0) {
        seconds = 0;
        console.log('Retrying immediately')
      } else {
        console.log(`Waiting ${seconds} seconds before retrying.`)
      }
      setTimeout(this.Handshake.bind(this), seconds * 1000);
      this.RETRY_COUNT++;
    })
  }

  /**
   * Creates the WebSocket for `/api/handshake` on `HOST_URL`, passing
   * `SIGNATURE` and `State.SIGNATURE` as the requested sub-protocols.
   */
  protected static Connect() {
    return new WebSocket(new URL(`/api/handshake?uuid=${this.uuid}`, this.HOST_URL).href, [`SIGNATURE`, State.SIGNATURE]);
  }

  /**
   * Registers this class on `State`, marks the environment as `PROCESSOR`
   * and starts the first handshake. Calls after the first one do nothing.
   */
  static Start() {
    if (this.STARTED) return;
    State.PROCESSOR = this;
    this.STARTED = true;
    State.Environment = 'PROCESSOR';
    console.log(`♾️ Infinity processor has started [${TaskProcessor.uuid}]`);
    TaskProcessor.Handshake();
  }

  private constructor() {}
}
// Fail fast at module load when the HOST_URL environment variable is missing or empty.
if (!TaskProcessor.HOST_URL) {
  throw new Error(`HOST_URL not found!`);
}