import { callEin } from "spread/ein";

/**
 * Delay, in milliseconds, used both before the first status poll of a flow
 * execution and between subsequent polls.
 */
const POLL_INTERVAL = 2000;

/**
 * Status values a flow execution can report.
 *
 * "SUCCESS", "FAILURE" and "TERMINATED" are treated as final by
 * `isFlowFinalStatus`; "QUEUED" and "PROCESSING" cause polling to continue.
 */
export type FlowExecutionStatus =
  | "QUEUED"
  | "PROCESSING"
  | "SUCCESS"
  | "FAILURE"
  | "TERMINATED";
/**
 * Flow execution record, mirroring the fields selected by the
 * `FlowExecutionResultFragment` GraphQL fragment in `executeFlowWithPolling`.
 */
export interface FlowExecutionResult {
  /** Execution identifier; used to look the execution up again while polling. */
  id: string;
  /** Current status of the execution. */
  status: FlowExecutionStatus;
  /** Value returned by `runFlow` when the status is "SUCCESS"; its shape is not constrained here. */
  resultPayload: unknown;
  // NOTE(review): presumably a timestamp string — confirm the format against the API schema.
  createdAt: string;
  // NOTE(review): presumably a timestamp string; may be null/absent while the
  // execution has not finished — confirm against the API schema.
  finishedAt: string;
}

/**
 * Execute a flow and return its result payload.
 *
 * Delegates to `executeFlowWithPolling` and waits for the execution to reach a
 * final status.
 *
 * @param id - Identifier of the flow to execute.
 * @param parameters - Optional named parameters forwarded to the flow.
 * @returns The execution's `resultPayload` when it finished with status "SUCCESS".
 * @throws Error when starting or polling the execution fails, or when the
 *   execution ends with any status other than "SUCCESS". The message has the
 *   form `[Flow "<id>" (<parameters as JSON>)] Execution error: <reason>`; the
 *   parenthesised part is omitted when no parameters were given.
 */
export async function runFlow(
  id: string,
  parameters?: Record<string, unknown>,
): Promise<unknown> {
  try {
    const executionResult = await executeFlowWithPolling(id, parameters);

    if (executionResult.status !== "SUCCESS") {
      throw new Error(
        `Flow execution finished with status "${executionResult.status}".`,
      );
    }

    return executionResult.resultPayload;
  } catch (e) {
    // Serialising the parameters must never mask the original failure
    // (JSON.stringify throws on circular references and BigInt values).
    let stringifiedParams = "";
    if (parameters) {
      try {
        stringifiedParams = JSON.stringify(parameters);
      } catch {
        stringifiedParams = "<unserializable parameters>";
      }
    }

    // Anything can be thrown, not only Error instances; extract a readable
    // reason without assuming a `message` property exists.
    let reason: string;
    if (e instanceof Error) {
      reason = e.message;
    } else if (typeof e === "object" && e !== null && "message" in e) {
      reason = String((e as { message: unknown }).message);
    } else {
      reason = String(e);
    }

    const paramsSuffix = stringifiedParams ? ` (${stringifiedParams})` : "";
    throw new Error(`[Flow "${id}"${paramsSuffix}] Execution error: ${reason}`);
  }
}

/**
 * Call `fn` repeatedly, waiting `interval` milliseconds between attempts,
 * until it resolves to a truthy value.
 *
 * The first attempt is made immediately. If `fn` rejects on any attempt, the
 * returned promise rejects with that same error. Polling has no upper bound on
 * the number of attempts.
 *
 * @param fn - Async probe; resolve to a falsy value (e.g. `null`) to keep polling.
 * @param interval - Delay between attempts, in milliseconds.
 * @returns The first truthy value `fn` resolves to.
 */
async function poll<T>(
  fn: () => Promise<T | null>,
  interval: number,
): Promise<T> {
  // A plain loop keeps every `fn()` call inside this async function, so a
  // rejection on any attempt propagates to the caller instead of being lost
  // inside a timer callback.
  for (;;) {
    const result = await fn();
    if (result) {
      return result;
    }

    await new Promise<void>((resolve) => setTimeout(resolve, interval));
  }
}

/**
 * Tell whether `status` is one an execution does not leave again:
 * "SUCCESS", "FAILURE" or "TERMINATED". Any other status yields `false`.
 */
function isFlowFinalStatus(status: FlowExecutionStatus) {
  switch (status) {
    case "SUCCESS":
    case "FAILURE":
    case "TERMINATED":
      return true;
    default:
      return false;
  }
}

/**
 * Start a flow execution and wait until it reaches a final status.
 *
 * Sends the `ExecuteFlow` mutation through `callEin`. If the execution
 * returned by the mutation is already final it is returned as is; otherwise
 * the function waits `POLL_INTERVAL` milliseconds and then re-queries the
 * execution every `POLL_INTERVAL` milliseconds until `isFlowFinalStatus`
 * accepts its status. There is no upper bound on how long polling continues.
 *
 * @param id - Identifier of the flow to execute.
 * @param params - Optional flow parameters. Entries whose value is `undefined`
 *   are dropped; when `params` is omitted, `parameters: null` is sent.
 * @returns The execution record in a final status. A "FAILURE" or
 *   "TERMINATED" execution is returned, not thrown.
 * @throws Error when a polling query returns no execution for the started
 *   execution's id. Errors raised by `callEin` propagate unchanged.
 */
export async function executeFlowWithPolling(
  id: string,
  params?: Record<string, unknown>,
): Promise<FlowExecutionResult> {
  const FLOW_EXECUTION_RESULT_FRAGMENT = /* GraphQL */ `
    fragment FlowExecutionResultFragment on FlowExecution {
      id
      status
      resultPayload
      createdAt
      finishedAt
    }
  `;

  // The API takes parameters as a list of { name, value } pairs.
  const parameters = params
    ? Object.entries(params)
        // filter out undefined values
        .filter(([, value]) => value !== undefined)
        .map(([name, value]) => ({
          name,
          value,
        }))
    : null;

  const { executeFlow: startExecutionResult } = await callEin<{
    executeFlow: FlowExecutionResult;
  }>(
    /* GraphQL */ `
      ${FLOW_EXECUTION_RESULT_FRAGMENT}

      mutation ExecuteFlow($flowId: ID!, $input: ExecuteFlowInput!) {
        executeFlow(id: $flowId, input: $input) {
          ...FlowExecutionResultFragment
        }
      }
    `,
    {
      flowId: id,
      input: { parameters },
    },
  );

  // The mutation may already report a final status; no polling needed then.
  if (isFlowFinalStatus(startExecutionResult.status)) {
    return startExecutionResult;
  }

  // wait before starting to poll
  await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL));

  const finalResult = await poll(async () => {
    // Note: the query variable is named `$flowId`, but the value passed is the
    // id of the execution started above, not the id of the flow.
    const { flowExecutions } = await callEin<{
      flowExecutions: { items: FlowExecutionResult[] };
    }>(
      /* GraphQL */ `
        ${FLOW_EXECUTION_RESULT_FRAGMENT}

        query GetFlowExecution($flowId: ID!) {
          flowExecutions(where: { id: $flowId }) {
            items {
              ...FlowExecutionResultFragment
            }
          }
        }
      `,
      { flowId: startExecutionResult.id },
    );
    const execution = flowExecutions.items[0];

    // An empty result would otherwise surface as an opaque TypeError on
    // `execution.status`; fail with a message that names the execution.
    if (!execution) {
      throw new Error(
        `Flow execution "${startExecutionResult.id}" was not found while polling.`,
      );
    }

    if (!isFlowFinalStatus(execution.status)) {
      // continue polling
      return null;
    }

    return execution;
  }, POLL_INTERVAL);

  return finalResult;
}
