import {
  type Server as PartyServer,
  type Room,
  type Request,
  type ConnectionContext,
  type Connection,
  Storage,
} from "partykit/server";
import z from "zod";
import {
  QuestionsClientMessage,
  QuestionsClientMessageSchema,
  QuestionsServerMessage,
  Speaker,
  Question,
  QuestionSchema,
} from "./questionsMessageTypes";
import OpenAI from "openai";
import { AxiomLogger } from "./utils/axiom";
import { TranscriptUpdate } from "./types";
import { Queue } from "./utils/queue";
import { openAiWithQueue } from "./utils/openAiWithQueue";
import { zodResponseFormat } from "openai/helpers/zod.mjs";
import type { ParsedChatCompletion } from "openai/resources/beta/chat/completions.mjs";
const THROTTLED_LLM_DETECTION = 5000;
const CUSTOMER_ONLY = false;
/* The QuestionsServer handles receiving transcript updates and speaker information from the client.
   It incrementally processes the transcript and detects questions, which are then sent to the client
   to display.
*/

export const QuestionContextSchema = z.object({
  question: z.string(),
  context: z.string(),
});

type QuestionContext = z.infer<typeof QuestionContextSchema>;

export const TranscriptQuestionClassificationSchema = z.enum([
  "meaningful_question",
  "irrelevant_question",
  "non_question",
  "unknown",
]);

export type TranscriptQuestionClassification = z.infer<
  typeof TranscriptQuestionClassificationSchema
>;

export default class QuestionsServer implements PartyServer {
  private transcriptQueue: Queue<TranscriptUpdate>;
  openai: OpenAI;
  axiom: AxiomLogger;
  transcript: TranscriptUpdate[] = [];
  speakers: Speaker[] = [];
  questions: Question[] = [];
  private lastProcessedTime: number = 0;
  private windowDuration: number = 30;
  storage: Storage;
  shouldProcess: boolean = false;
  private classificationQueue: TranscriptUpdate[] = [];

  constructor(readonly room: Room) {
    this.room = room;
    this.openai = new OpenAI({
      apiKey: this.room.env.OPEN_AI_API_KEY as string,
    });
    this.transcriptQueue = new Queue<TranscriptUpdate>();
    this.axiom = new AxiomLogger({
      orgId: "not_set",
      roomId: room.id,
    });
    this.storage = room.storage;
    this.loadQuestionsFromStorage();
    //this.startAsyncProcessing();
  }

  async onConnect(conn: Connection, ctx: ConnectionContext) {
    console.log("Questions connection established", conn.id);
    try {
      await this.loadQuestionsFromStorage();
      this.sendQuestionsToClient(conn);
    } catch (e) {
      console.error(
        "Questions: onConnect: Error loading and sending questions to client"
      );
      this.reportError(e, "questions.onConnect", { connectionId: conn.id });
    }
    this.shouldProcess = true;
  }

  async onMessage(message: string, sender: Connection) {
    console.log("Questions message received", message);
    try {
      const parsedMessage = QuestionsClientMessageSchema.parse(
        JSON.parse(message)
      );
      const { type, payload } = parsedMessage;
      this.axiom.log({
        event: "questions.onMessage",
        message: parsedMessage,
        connectionId: sender.id,
        state: sender.state,
      });
      console.log("Question server received:", type, payload);

      switch (type) {
        case "TRANSCRIPT_UPDATE":
          await this.addTranscript(payload);
          break;
        case "FULL_TRANSCRIPT":
          this.transcript = payload;
          this.shouldProcess = true;
          break;
        case "SPEAKERS":
          this.speakers = payload;
          break;
        case "ANSWER_QUESTION":
          const question = this.questions.find(
            (q) => q.id === payload.questionId
          );
          if (question) {
            question.isAnswered = true;
            await this.broadcastQuestion(question);
            await this.saveQuestionsToStorage();
          }
          break;
      }
    } catch (error) {
      console.error("Error processing message:", error);
      const errorMessage =
        error instanceof Error ? error.message : "An unknown error occurred";
      sender.send(
        JSON.stringify({
          type: "ERROR",
          payload: { message: errorMessage },
        })
      );
      this.reportError(error, "questions.onMessage.error", {
        connectionId: sender.id,
        state: sender.state,
      });
    }
  }

  async onRequest(req: Request) {
    console.log("Questions request received");

    if (req.method === "POST") {
      let payload: any;
      try {
        payload = await req.json();
        // console.log("Questions: received payload:", JSON.stringify(payload));
        // const parsedMessage = QuestionsClientMessageSchema.safeParse(payload);

        // if (parsedMessage.error) {
        //   console.log("Question: onRequest error", parsedMessage.error);
        // }
        // console.log("Questions: parsed message", parsedMessage);
        // if (parsedMessage.success) {
        await this.processClientMessage(payload as QuestionsClientMessage);
        return new Response(JSON.stringify({ result: "Done" }), {
          status: 200,
        });
        // } else {
        //   return new Response(
        //     JSON.stringify({ error: "Invalid request body" }),
        //     {
        //       status: 400,
        //     }
        //   );
        // }
      } catch (error) {
        this.reportError(error, "questions.onRequest.error", { payload });
        return new Response(JSON.stringify({ error: "Invalid request body" }), {
          status: 400,
        });
      }
    }
    return new Response("Not found", { status: 404 });
  }

  private async processClientMessage(message: QuestionsClientMessage) {
    switch (message.type) {
      case "TRANSCRIPT_UPDATE":
        this.addTranscript(message.payload);
        break;
      case "FULL_TRANSCRIPT":
        // Handle full transcript update
        this.transcript = message.payload;
        break;
      case "SPEAKERS":
        // Handle speakers update
        this.speakers = message.payload;
        break;
      case "DETECTED_QUESTIONS":
        const { questions } = message.payload;
        for (const question of questions) {
          await this.processDetectedQuestion(question);
          this.lastProcessedTime = Math.max(
            this.lastProcessedTime,
            question.end_time
          );
        }
        break;
      // Add other cases as needed
      default:
        console.warn(`Unhandled message type: ${message?.type}`);
    }
  }

  private async startAsyncProcessing() {
    const existingAlarm = await this.room.storage.getAlarm();
    if (!existingAlarm) {
      await this.room.storage.put("roomId", this.room.id);
      // If there is no alarm, set an alarm to do LLM detection on the transcript
      await this.room.storage.setAlarm(
        new Date(new Date().getTime() + THROTTLED_LLM_DETECTION)
      );
    }
  }

  async onAlarm(): Promise<void> {
    try {
      // We throttle the calls to use an LLM for detecting script completion
      // This alarm will be called every few seconds
      this.axiom.log({
        event: "questions.onAlarm",
        shouldProcess: this.shouldProcess,
      });
      if (this.shouldProcess) {
        await this.processTranscript();
      }
    } catch (e) {
      this.reportError(e, "questions.onAlarm");
    } finally {
      const existingAlarm = await this.room.storage.getAlarm();
      if (!existingAlarm) {
        await this.room.storage.setAlarm(
          new Date(new Date().getTime() + THROTTLED_LLM_DETECTION)
        );
      }
    }
  }

  private async startClassificationWorker() {
    if (this.classificationQueue.length === 0) return;

    const item = this.classificationQueue.shift();
    if (item) {
      try {
        const classification = await this.classifyAsQuestion(item);
        // Update the classification attribute of the item in the transcript array
        const transcriptIndex = this.transcript.findIndex(
          (transcriptItem) => transcriptItem.id === item.id
        );
        if (transcriptIndex !== -1) {
          this.transcript[transcriptIndex].classification = classification;
        } else {
          console.warn(
            `Transcript item with id ${item.id} not found in transcript array`
          );
        }
        if (classification === "meaningful_question") {
          console.log("Detected a meaningful question");
          const context = await this.addContextToQuestion(item);
          console.log("Added context to question", context);
          if (context) {
            const question: Question = {
              id: item.id,
              speaker_id: item.speaker_id,
              question: context.question,
              start_time: item.words[0].start_time,
              end_time: item.words[item.words.length - 1].end_time,
              context: context.context,
              isAnswered: false,
            };
            this.questions.push(question);
            this.broadcastQuestion(question);
            this.saveQuestionsToStorage();
          }
        }
      } catch (error) {
        console.error("Error classifying transcript item:", error);
        this.reportError(error, "questions.classificationWorker", { item });
      }
    }

    // Schedule the next classification worker iteration
    setTimeout(() => this.startClassificationWorker(), 0);
  }

  async addTranscript(payload: TranscriptUpdate) {
    this.classificationQueue.push(payload);
    this.startClassificationWorker();
    this.axiom.log({ event: "questions.addTranscript", payload: payload });
    const existingIndex = this.transcript.findIndex(
      (item) => item.id === payload.id
    );
    if (existingIndex !== -1) {
      this.transcript[existingIndex] = payload;
    } else {
      this.transcript.push(payload);
    }
    this.transcript.sort((a, b) => {
      const aStartTime = a.words[0]?.start_time ?? 0;
      const bStartTime = b.words[0]?.start_time ?? 0;
      return aStartTime - bStartTime;
    });
    this.shouldProcess = true;
  }

  private async addContextToQuestion(
    item: TranscriptUpdate
  ): Promise<QuestionContext | null> {
    const systemMessage = `You are a question context extractor. Given a transcript and a question, extract the full question text and relevant context.`;

    const precedingItems = this.transcript
      .slice(
        0,
        this.transcript.findIndex((i) => i.id === item.id)
      )
      .reverse()
      .slice(0, 50);

    const precedingTranscript = precedingItems
      .map((i) => this.itemToTranscriptText(i))
      .filter((text) => text !== undefined)
      .join("\n");

    const itemText = item.words.map((w) => w.text).join(" ");

    const message = `Given the following transcript:
<preceding-transcript>
${precedingTranscript}
</preceding-transcript>

And the following question which has been detected as being asked:
<question>
${itemText}
</question>

Generate a full question, which might need to include text from the preceding transcript and provide a summary of the relevant context.
The question should be formatted as a question with a question mark, and should be standalone.
For instance, if the transcript is incorrect and is "And you tell me how fast the Cybertruck goes.", the question should be "How fast does the Cybertruck go?".
Use short, businesslike, concise language. Bullet points are allowed and should be in markdown if needed.
Do not make quality judgments about the question or the content.
Be useful and informative.
Imagine the salesperson is trying to understand the question asked by the customer and wants a short reminder of what the question is about.
Do not mention the transcript itself.
The result will be displayed to the user as a quick summary of what the question is about so that it easy to understand afterwards.
Return the result as a JSON object with "question" and "context" properties.`;

    try {
      const result = await this.openai.beta.chat.completions.parse({
        model: "gpt-4o-mini",
        messages: [
          { role: "system", content: systemMessage },
          { role: "user", content: message },
        ],
        temperature: 0.7,
        response_format: zodResponseFormat(QuestionContextSchema, "question"),
      });

      const parsed = result?.choices?.[0].message?.parsed;

      return parsed;
    } catch (error) {
      console.error("Error extracting question context:", error);
      this.reportError(error, "questions.addContextToQuestion", { item });
      return null;
    }
  }

  private async classifyAsQuestion(
    payload: TranscriptUpdate
  ): Promise<TranscriptQuestionClassification> {
    console.log("classifying as question", payload);
    //return Promise.resolve("unknown");
    const systemMessage = `You are a helpful assistant that classifies questions.`;

    const precedingItems = this.transcript
      .slice(
        0,
        this.transcript.findIndex((item) => item.id === payload.id)
      )
      .reverse()
      .filter(
        (item) =>
          item.speaker_id === payload.speaker_id &&
          item.classification === "non_question"
      )
      .slice(0, 3);

    const precedingText = precedingItems
      .map((item) => item.words.map((w) => w.text).join(" "))
      .join("\n");

    const itemText = payload.words.map((w) => w.text).join(" ");

    const message = `${precedingText} ${itemText}`;

    try {
      const result = await this.openai.chat.completions.create({
        model: "ft:gpt-4o-mini-2024-07-18:frontier-ai::AS195Esx",
        messages: [
          { role: "system", content: systemMessage },
          { role: "user", content: message },
        ],
        temperature: 0.7,
      });

      const classification = TranscriptQuestionClassificationSchema.parse(
        result?.choices?.[0]?.message.content ?? "unknown"
      );
      console.log("Classification result", classification);

      return classification;
    } catch (error) {
      console.error("Error detecting questions:", error);
      this.reportError(error, "questions.detectQuestions", {
        transcript: window,
      });
      return "unknown";
    }
  }

  private async processTranscript() {
    console.log("Questions: process transcript");
    const unprocessedTranscript = this.transcript.filter((item) => {
      const lastWordTime = item.words[item.words.length - 1]?.end_time ?? 0;
      return lastWordTime > this.lastProcessedTime;
    });

    if (unprocessedTranscript.length === 0) return;

    const success = await this.detectQuestionsWithDynamicWindow(
      unprocessedTranscript
    );

    this.shouldProcess = false;
    return success;
  }

  private async processDetectedQuestion(question: Question) {
    console.log("Detected question:", question);
    const existingQuestion = this.questions.find((q) => q.id === question.id);

    if (!existingQuestion) {
      // New question detected
      this.questions.push(question);
      await this.broadcastQuestion(question);
    } else if (JSON.stringify(existingQuestion) !== JSON.stringify(question)) {
      // Question updated
      Object.assign(existingQuestion, question);
      await this.broadcastQuestion(existingQuestion);
    }

    // Save questions to storage after processing
    await this.saveQuestionsToStorage();
  }
  private async detectQuestionsWithDynamicWindow(
    transcript: TranscriptUpdate[]
  ): Promise<boolean> {
    const windowStart = this.lastProcessedTime;
    const windowEnd =
      transcript[transcript.length - 1].words[
        transcript[transcript.length - 1].words.length - 1
      ].end_time;

    const window = transcript.filter((item) => {
      const itemStartTime = item.words[0]?.start_time ?? 0;
      const itemEndTime = item.words[item.words.length - 1]?.end_time ?? 0;
      return (
        (itemStartTime >= windowStart && itemStartTime < windowEnd) ||
        (itemEndTime > windowStart && itemEndTime <= windowEnd) ||
        (itemStartTime <= windowStart && itemEndTime >= windowEnd)
      );
    });

    if (window.length === 0) {
      // No new items in the window
      return true;
    }

    const success = await this.detectQuestions(window);

    // Update the last processed time to the end of the window
    this.lastProcessedTime = windowEnd;

    return success;
  }

  private itemToTranscriptText(item: TranscriptUpdate): string | undefined {
    try {
      const id = item.speaker_id;
      const startTime = item.words[0].start_time.toFixed(0);
      const endTime = item.words[item.words.length - 1].end_time.toFixed(0);
      const text = item.words.map((w) => w.text).join(" ");
      return `${id} : ${startTime} : ${endTime} : ${text}`;
    } catch (error) {
      console.error("Error converting item to transcript text:", error);
      this.reportError(error, "questions.itemToTranscriptText", { item });
      return undefined;
    }
  }
  private async detectQuestions(window: TranscriptUpdate[]): Promise<boolean> {
    const speakerInfo = this.speakers
      .map((s) => `Speaker ${s.id}: ${s.role}`)
      .join("\n");
    const transcriptText = window
      .map((item) => this.itemToTranscriptText(item))
      .filter((text) => text !== undefined)
      .join("\n");

    const windowStartIndex = this.transcript.findIndex(
      (item) => item === window[0]
    );
    const precedingItems = this.transcript.slice(
      Math.max(0, windowStartIndex - 50),
      windowStartIndex
    );
    const precedingTranscript = precedingItems
      .map((item) => this.itemToTranscriptText(item))
      .filter((text) => text !== undefined)
      .join("\n");

    const systemMessage = `You are a question classifier service. Given a transcript and speaker information, detect any questions asked by customers. For each question, provide the speaker ID, question text, start and end times, and relevant context.`;
    const recentQuestions = JSON.stringify(this.questions.slice(-5));
    const message = `Given the following transcript and speaker information:
The format of the transcript is as follows:
<speaker_id> : <start_time-end_time> : <text>
<speakers>
${speakerInfo}
</speakers>
<new-transcript>
${transcriptText}
</new-transcript>
<preceding-transcript
${precedingTranscript}
<recent-questions>
${recentQuestions}
</recent-questions>

Detect any new questions asked by customers on the call in the newly detected transcript.
The preceding transcript is provided to give context for the questions.
Treat the start_time as a unique identifier for each question, so if an existing question is detected, do not include it in the results.
For the last items in the transcript, only process them if there is a complete sentence ending in punctuation.
For each question, provide:
1. id: the ID of the question. This is the start_time of the question.
2. speaker_id: The ID of the speaker asking the question
3. question: The full text of the question
4. start_time: The start time of the question
5. end_time: The end time of the question
6. transcript_start_time: The start time of the first relevant context item
7. transcript_end_time: The end time of the last relevant context item
8. context: A summary of the relevant context

${CUSTOMER_ONLY ? 'Only include questions from speakers with the "Customer" role. Include context from all speakers.' : ""}

Do not refer to "The salesperson" or "The customer" in the questions context.
Use short, businesslike, concise language.
Do not make quality judgments about the questions or the content.
Be useful and informative.
Imagine the salesperson is trying to understand the questions asked by the customer and wants a short reminder of what the question is about.
Return the results as a JSON array of objects with these properties.
If no new questions are detected, return an empty array.`;

    console.log(systemMessage);
    console.log(message);

    const DetectionResponseSchema = z.object({
      questions: z.array(QuestionSchema),
    });

    try {
      const messages = [
        { role: "system", content: systemMessage },
        { role: "user", content: message },
      ];

      await openAiWithQueue({
        room: this.room,
        model: "gpt-4o-mini",
        messages,
        schema: DetectionResponseSchema,
        rootKey: "detected_questions",
        temperature: 0.7,
        callbackMessageType: "DETECTED_QUESTIONS",
      });
      return true;
    } catch (error) {
      console.error("Error detecting questions:", error);
      this.reportError(error, "questions.detectQuestions", {
        transcript: window,
      });
      return false;
    }
  }

  private async broadcastQuestion(question: Question) {
    const message: QuestionsServerMessage = {
      type: "QUESTION",
      payload: question,
    };
    this.room.broadcast(JSON.stringify(message));
  }

  private async sendQuestionsToClient(conn: Connection) {
    const message: QuestionsServerMessage = {
      type: "QUESTIONS",
      payload: this.questions,
    };
    conn.send(JSON.stringify(message));
  }

  private async saveQuestionsToStorage() {
    try {
      await this.storage.put<Question[]>("questions", this.questions);
    } catch (error) {
      this.reportError(error, "questions.saveQuestionsToStorage");
    }
  }

  private async loadQuestionsFromStorage() {
    try {
      const storedQuestions = await this.storage.get<Question[]>("questions");
      if (storedQuestions) {
        this.questions = storedQuestions;
      }
    } catch (error) {
      this.reportError(error, "questions.loadQuestionsFromStorage");
    }
  }
  private reportError(
    error: unknown,
    event: string,
    additionalInfo: { schema?: string; [key: string]: any } = {}
  ) {
    this.axiom.error(error, event, additionalInfo);
  }
}
