// 0.1.4 · Updated 6 months ago
import { randomUUID } from "node:crypto";
import State from "@infinity-beyond/modules/state.ts";
import { TaskQueue } from "@infinity-beyond/modules/delegation/task.ts";
import { FetchInfinityClasses } from "@infinity-beyond/modules/delegation/entity_ref.ts";

/**
 * Static-only worker ("processor") that connects to a host over a WebSocket
 * and hands every message it receives to `TaskQueue`.
 *
 * The class cannot be instantiated; call {@link TaskProcessor.Start} once.
 */
export class TaskProcessor {
  /** Base URL of the host, read from the `HOST_URL` environment variable. */
  static readonly HOST_URL = Deno.env.get('HOST_URL')!;

  /** Random identifier generated once per process and sent with the handshake. */
  static uuid = randomUUID()

  /** Socket created by the most recent handshake attempt. */
  private static socket: WebSocket;

  /** Guards `Start()` so the processor is only started once. */
  private static STARTED: boolean = false;

  static #CONNECTED: boolean = false;
  /** Whether the socket to the host is currently considered open. */
  static get CONNECTED() { return this.#CONNECTED }
  private static set CONNECTED(newValue) { this.#CONNECTED = newValue }

  /** Number of retries scheduled since the last successful connection. */
  private static RETRY_COUNT = 0;

  /**
   * Extracts a printable message from a thrown value or an event.
   *
   * Returns the `message` property when it is a string (as on `Error`
   * instances), otherwise the value's string form, so callers never
   * dereference `undefined`.
   */
  private static ErrorMessage(value: unknown): string {
    if (typeof value === 'object' && value !== null && 'message' in value) {
      const message = (value as { message: unknown }).message;
      if (typeof message === 'string') return message;
    }

    return String(value);
  }

  // #region Analytics

  /**
   * Hook for worker-side analytics on an incoming request.
   * Currently a placeholder; failures are logged and never propagated.
   */
  static async RequestAnalytics(_request: I_ProcessorRequest) {
    try {

      /*
       *  Do worker-side analytics and task monitoring here
       */

    } catch (e: unknown) {
      console.warn("Processor::RequestAnalytics() error! Continueing with request...", TaskProcessor.ErrorMessage(e))
    }
  }

  /**
   * Hook for worker-side analytics once a request has been handled.
   * Currently a placeholder; failures are logged and never propagated.
   */
  static async ResponseAnalytics(_time_taken: number, _request: I_ProcessorRequest) {
    try {

      /*
       *  Do worker-side analytics and task monitoring here
       */

    } catch (e: unknown) {
      console.warn("Processor::ResponseAnalytics() error! Continueing with request...", TaskProcessor.ErrorMessage(e))
    }
  }

  // #region Handlers

  /**
   * Queues a request received from the host on the current socket.
   * Errors thrown while queueing are logged, not rethrown.
   */
  static ServeHandler = async (request: I_ProcessorRequest) => {
    try {
      TaskQueue.add(this.socket, request);
    } catch (e: unknown) {
      console.error(`Could not start the requested task!`, TaskProcessor.ErrorMessage(e));
    }
  }

  /**
   * Opens a new socket to the host and wires up its lifecycle listeners.
   * On a socket error a new handshake is scheduled: immediately for the
   * first retry, after three seconds for subsequent ones.
   */
  private static Handshake() {
    console.log(`Attempting handshake...`);
    this.socket = this.Connect();

    this.socket.addEventListener('open', () => {
      this.CONNECTED = true;
      this.RETRY_COUNT = 0;

      console.log(`♾️  Processor connected to Host`);
    })

    this.socket.addEventListener('close', () => {
      // An 'error' listener run has already cleared the flag and logged.
      if (!this.CONNECTED) return;

      // NOTE(review): no reconnect is scheduled here, so a close that is not
      // preceded by an 'error' event leaves the processor disconnected.
      // Confirm whether that is intended.
      this.CONNECTED = false;
      console.log(`♾️X Lost connection to the Host`);
    })

    this.socket.addEventListener('message', (ev: MessageEvent) => {
      try {
        this.ServeHandler(JSON.parse(ev.data));
      } catch (e: unknown) {
        console.error(`Could not parse message from host!`, TaskProcessor.ErrorMessage(e));
      }
    })

    this.socket.addEventListener('error', (ev: Event) => {
      // The event may not carry a string `message`; ErrorMessage() never
      // throws, so the retry below is always reached.
      const reason = TaskProcessor.ErrorMessage(ev);

      if (reason.includes('status code: 403')) {
        console.error(`Signature invalid! Exiting.`);
        Deno.exit(9);
      }

      if (this.CONNECTED) {
        console.log(`♾️X Lost connection to the host.`);
      } else {
        console.log(`Could not handshake with host.`);
      }

      this.CONNECTED = false;

      // First retry after a failure is immediate; later ones wait 3 seconds.
      const seconds = this.RETRY_COUNT === 0 ? 0 : 3;

      if (seconds === 0) {
        console.log('Retrying immediately')
      } else {
        console.log(`Waiting ${seconds} seconds before retrying.`)
      }

      setTimeout(() => this.Handshake(), seconds * 1000);

      this.RETRY_COUNT++;
    })
  }

  /**
   * Creates the WebSocket for the handshake endpoint, resolved against
   * `HOST_URL`. `SIGNATURE` and `State.SIGNATURE` are passed as the
   * requested sub-protocols.
   */
  protected static Connect() {
    const endpoint = new URL(`/__worker__/handshake?uuid=${this.uuid}`, this.HOST_URL);

    return new WebSocket(endpoint.href, [`SIGNATURE`, State.SIGNATURE]);
  }

  /**
   * Starts the processor: registers it on `State`, loads the entity classes
   * and begins the handshake with the host. Calls made after a successful
   * (or still in-progress) start are ignored.
   *
   * @param import_meta_dirname Directory forwarded to `FetchInfinityClasses`.
   * @throws Whatever `FetchInfinityClasses` rejects with; the started flag is
   *         reset in that case so `Start()` can be called again.
   */
  static async Start(import_meta_dirname: string) {
    if (this.STARTED) return;
    this.STARTED = true;

    State.PROCESSOR = this;
    State.Environment = 'PROCESSOR';

    try {
      TaskQueue.Entities = await FetchInfinityClasses(import_meta_dirname);
    } catch (e: unknown) {
      // Without this reset a failed start would make every later call a no-op.
      this.STARTED = false;
      throw e;
    }

    console.log(`♾️  Infinity processor has started [${TaskProcessor.uuid}]`);

    TaskProcessor.Handshake();
  }

  private constructor() {}
}

// Fail fast at module load when the HOST_URL environment variable is missing or empty.
if (!TaskProcessor.HOST_URL) {
  throw new Error(`HOST_URL not found!`);
}