// 0.1.1 — Updated 7 months ago
import { randomUUID } from "node:crypto";
import State from "@infinity-beyond/modules/state.ts";
import { Task } from "@infinity-beyond/modules/delegation/task.ts";

/**
 * Static, non-instantiable worker-side processor.
 *
 * `Start()` registers this class on `State`, then opens a WebSocket to the
 * host (`Handshake()` / `Connect()`). Every message received on that socket
 * is JSON-parsed and passed to `ServeHandler`, which queues it as a `Task`.
 */
export class TaskProcessor {
  /**
   * Base URL of the host, read from the `HOST_URL` environment variable when
   * the class is initialised. The non-null assertion only silences the type
   * checker: the runtime value is still `undefined` when the variable is unset.
   */
  static readonly HOST_URL = Deno.env.get('HOST_URL')!;

  /** Identifier generated once per class load; sent as the `uuid` query parameter on handshake. */
  static uuid = randomUUID()

  /** Socket created by the most recent `Handshake()` call. */
  private static socket: WebSocket;

  /** Set by `Start()` so that repeated calls are no-ops. */
  private static STARTED: boolean = false;

  static #CONNECTED: boolean = false;
  /** `true` between the socket's `open` event and its `close` event. */
  static get CONNECTED() { return this.#CONNECTED }
  private static set CONNECTED(newValue) { this.#CONNECTED = newValue }

  /**
   * Produces a loggable message for any thrown value.
   *
   * @param e - Value caught by a `catch` clause.
   * @returns `e.message` when `e` is an `Error`, otherwise `String(e)`.
   */
  private static ErrorMessage(e: unknown): string {
    return e instanceof Error ? e.message : String(e);
  }

  // #region Analytics

  /**
   * Hook invoked by `ServeHandler` before a request is queued. Currently a
   * placeholder; anything thrown inside is logged as a warning and swallowed
   * so the request can continue.
   *
   * @param _request - The request about to be queued (unused for now).
   */
  static async RequestAnalytics(_request: I_ProcessorRequest) {
    try {

      /*
       *  Do worker-side analytics and task monitoring here
       */

    } catch(e: unknown) {
      console.warn("Processor::RequestAnalytics() error! Continueing with request...", this.ErrorMessage(e))
    }
  }

  /**
   * Hook invoked by `ServeHandler` when a task emits `finished`. Currently a
   * placeholder; anything thrown inside is logged as a warning and swallowed.
   *
   * @param _time_taken - Milliseconds between queueing and the `finished` event (unused for now).
   * @param _request - The request the task was created from (unused for now).
   */
  static async ResponseAnalytics(_time_taken: number, _request: I_ProcessorRequest) {
    try {

      /*
       *  Do worker-side analytics and task monitoring here
       */

    } catch(e: unknown) {
      console.warn("Processor::ResponseAnalytics() error! Continueing with request...", this.ErrorMessage(e))
    }
  }

  // #region Handlers

  /**
   * Handles one request from the host: runs the request analytics hook,
   * queues the request as a task on the current socket, and subscribes to the
   * task's `finished` event to run the response analytics hook.
   *
   * Never rejects: any failure while queueing is logged and swallowed.
   *
   * @param request - Parsed request received from the host.
   */
  static ServeHandler = async (request: I_ProcessorRequest) => {
    try {
      await this.RequestAnalytics(request);

      const task_start_time = Date.now();

      // Await the queue call so that a rejection lands in the catch below
      // instead of surfacing as an unhandled promise rejection.
      const task = await Task.Queue(this.socket, request);
      task.on('finished', async () => {
        await this.ResponseAnalytics(Date.now() - task_start_time, request);
      });
    } catch(e: unknown) {
      console.error(`Could not start the requested task!`, this.ErrorMessage(e));
    }
  }

  /**
   * Opens a new socket via `Connect()` and wires its lifecycle events:
   * - `open` marks the processor as connected;
   * - `close` marks it as disconnected (ignored if it never connected);
   * - `message` JSON-parses the payload and hands it to `ServeHandler`;
   * - `error` schedules another `Handshake()` in 3 seconds.
   */
  private static Handshake() {
    console.log(`Attempting handshake...`);
    this.socket = this.Connect();

    this.socket.addEventListener('open', () => {
      this.CONNECTED = true;
      console.log(`♾️  Processor connected to Host`);
    })

    this.socket.addEventListener('close', () => {
      // A close without a prior open is a failed handshake; nothing to report.
      if(!this.CONNECTED) return;

      this.CONNECTED = false;
      console.log(`♾️X Processor disconnected from Host`);
    })

    this.socket.addEventListener('message', (ev: MessageEvent) => {
      try {
        // ServeHandler handles its own failures, so the promise is
        // intentionally not awaited; this try/catch covers JSON.parse.
        void this.ServeHandler(JSON.parse(ev.data));
      } catch(e: unknown) {
        console.error(`Could not parse message from host!`, this.ErrorMessage(e));
      }
    })

    this.socket.addEventListener('error', () => {
      console.log(`Could not handshake with host. Retrying in 3 seconds...`);
      setTimeout(() => this.Handshake(), 3000);
    })
  }

  /**
   * Creates the WebSocket used for the handshake.
   *
   * @returns A socket targeting `/worker/handshake?uuid=<uuid>` resolved
   *   against `HOST_URL`, requesting the sub-protocols `SIGNATURE` and
   *   `State.SIGNATURE`.
   */
  protected static Connect() {
    const endpoint = new URL(`/worker/handshake?uuid=${this.uuid}`, this.HOST_URL);
    return new WebSocket(endpoint.href, [`SIGNATURE`, State.SIGNATURE]);
  }

  /**
   * Starts the processor once: registers this class as `State.PROCESSOR`,
   * sets `State.Environment` to `'PROCESSOR'`, and begins the handshake.
   * Subsequent calls return immediately.
   */
  static Start() {
    if(this.STARTED) return;

    State.PROCESSOR = this;

    this.STARTED = true;
    State.Environment = 'PROCESSOR';

    console.log(`♾️  Infinity processor has started [${TaskProcessor.uuid}]`);

    TaskProcessor.Handshake();
  }

  /** Not instantiable; all members are static. */
  private constructor() {}
}

// Fail fast when this module is loaded without a usable HOST_URL value.
if (!TaskProcessor.HOST_URL) {
  throw new Error(`HOST_URL not found!`);
}