import { Row } from 'shared/src/clickhouse/types';

import { JSONArrayValue, jsonIsArray, jsonIsObject, JSONValue, parseJSON } from 'shared/src/json';
import { WakeServiceResponse } from 'shared/src/types/service';
import { parseQuery } from 'src/lib/clickhouse/parseQuery';
import config from 'src/lib/config';
import { Credential } from 'src/state/connection';

import { errorMessage } from 'src/lib/errors/errorMessage';
import { HttpClient } from 'src/lib/http';

import { selectTimeoutSeconds } from 'src/lib/query/constants';
import { LineReader } from 'src/lib/query/LineReader';
import { dumpClientState } from 'src/lib/dumpClientState';

export class ResponseFormatError extends Error {}

function parseColumn(col: JSONValue): Column {
  if (
    !(
      jsonIsObject(col) &&
      'type' in col &&
      'name' in col &&
      typeof col.type === 'string' &&
      typeof col.name === 'string'
    )
  ) {
    throw new ResponseFormatError('Invalid Column');
  }
  return {
    name: col.name,
    type: col.type
  };
}

function parseColumnsMessage(message: JSONArrayValue): Array<Column> {
  if (message.length < 2 || message[0] !== 'columns') {
    throw new ResponseFormatError('Invalid columns message');
  }
  const columns = message[1];
  if (!jsonIsArray(columns)) {
    throw new ResponseFormatError('Invalid columns message payload');
  }
  return columns.map(parseColumn);
}

function parseDataElement(element: JSONValue): string | null {
  if (element !== null && typeof element !== 'string') {
    throw new ResponseFormatError('Invalid data message');
  }
  return element;
}

function parseDataMessage(message: JSONArrayValue): Row {
  if (message.length < 2 || message[0] !== 'row') {
    throw new ResponseFormatError('Invalid data message');
  }
  const columns = message[1];
  if (!jsonIsArray(columns)) {
    throw new ResponseFormatError('Invalid data message payload');
  }
  return columns.map(parseDataElement);
}

function parseTotals(message: JSONArrayValue): Row {
  if (message.length < 2 || message[0] !== 'totals') {
    throw new ResponseFormatError('Invalid totals');
  }
  const columns = message[1];
  if (!jsonIsArray(columns)) {
    throw new ResponseFormatError('Invalid data message payload');
  }
  return columns.map(parseDataElement);
}

function parseMetricsMessage(message: JSONArrayValue): QueryMetrics {
  if (message.length < 2 || message[0] !== 'metrics') {
    throw new ResponseFormatError('Invalid metrics message');
  }
  const metricsObj = message[1];
  if (
    !jsonIsObject(metricsObj) ||
    typeof metricsObj.rows !== 'number' ||
    typeof metricsObj.bytes !== 'number' ||
    typeof metricsObj.durationMS !== 'number'
  ) {
    throw new ResponseFormatError('Invalid metrics message');
  }
  return {
    rows: metricsObj.rows,
    bytes: metricsObj.bytes,
    durationMS: metricsObj.durationMS
  };
}

function parseStatusMessage(message: JSONArrayValue): boolean {
  if (
    message.length < 2 ||
    message[0] !== 'status' ||
    !jsonIsObject(message[1]) ||
    !('isAwake' in message[1] || typeof message[1].isAwake !== 'number')
  ) {
    throw new ResponseFormatError('Invalid status message');
  }

  return message[1].isAwake !== 0;
}

function parseErrorContext(context: JSONValue | undefined): ErrorContext {
  const result: ErrorContext = { type: 'server' };
  if (context !== undefined && jsonIsObject(context)) {
    if (typeof context.statementIndex === 'number') {
      result.statementIndex = context.statementIndex;
    }
  }
  return result;
}

export function parseQueryProgress(message: JSONArrayValue): QueryProgress {
  if (message.length < 2 || message[0] !== 'progress') {
    throw new ResponseFormatError('Invalid progress message');
  }
  const progressObj = message[1];
  if (
    !jsonIsObject(progressObj) ||
    typeof progressObj.read_rows !== 'number' ||
    typeof progressObj.read_bytes !== 'number' ||
    typeof progressObj.written_rows !== 'number' ||
    typeof progressObj.written_bytes !== 'number' ||
    typeof progressObj.total_rows_to_read !== 'number' ||
    typeof progressObj.result_rows !== 'number' ||
    typeof progressObj.result_bytes !== 'number' ||
    typeof progressObj.elapsed_ns !== 'number' ||
    typeof progressObj.peak_memory_usage !== 'number'
  ) {
    throw new ResponseFormatError('Invalid progress message');
  }
  return {
    read_rows: progressObj.read_rows,
    read_bytes: progressObj.read_bytes,
    written_rows: progressObj.written_rows,
    written_bytes: progressObj.written_bytes,
    total_rows_to_read: progressObj.total_rows_to_read,
    result_rows: progressObj.result_rows,
    result_bytes: progressObj.result_bytes,
    elapsed_ns: progressObj.elapsed_ns,
    peak_memory_usage: progressObj.peak_memory_usage
  };
}

interface Column {
  name: string;
  type: string;
}

type StreamingQueryResponse = WakeServiceResponse;

export interface StreamingQuery {
  runId: string;
  abort: () => Promise<void>;
  completion: Promise<StreamingQueryResponse>;
}

export interface QueryArgs {
  connectionCredentials: Credential | null;
  http: HttpClient;
  isStopped: boolean;
  serviceId: string;
  orgId: string;
  runId: string;
  skipConnectionInit?: boolean;
  sql: string;
  variables?: Record<string, string>;
  wakeService: boolean;
}
export type StreamingQueryArgs = QueryArgs & QueryCallbacks;

export interface QueryMetrics {
  rows: number;
  bytes: number;
  durationMS: number;
}

export type QueryProgress = {
  // Number of rows read.
  read_rows: number;
  // Volume of data read in bytes.
  read_bytes: number;
  // Number of rows written.
  written_rows: number;
  // Volume of data written in bytes.
  written_bytes: number;
  // Total number of rows to be read.
  total_rows_to_read: number;
  result_rows: number;
  result_bytes: number;
  elapsed_ns: number;
  peak_memory_usage: number;
};

export type QueryErrorType =
  // User aborted the query
  | 'aborted'

  // We failed to parse the response from the server
  | 'responseFormat'

  // other error internal to the web querying code
  | 'clientInternal'

  // Error connecting to the API
  | 'network'

  // Error sent from the API. We should eventually distinguish between types of errors coming
  // from server
  | 'server';

export interface ErrorContext {
  statementIndex?: number;
  type: QueryErrorType;
}

export interface QueryCallbacks {
  onColumns: (columns: Column[]) => Promise<void>;
  onData: (rows: Row) => Promise<void>;
  onRawData: (data: string) => Promise<void>;
  onEnd: () => Promise<void>;
  onError: (e: string, errorContext: ErrorContext) => Promise<void>;
  onMessage: (result: string) => Promise<void>;
  onMetrics: (metrics: QueryMetrics) => Promise<void>;
  onStatus: (isAwake: boolean) => Promise<void>;
  onTotals: (totals: Row) => Promise<void>;
  onProgress: (progress: QueryProgress) => Promise<void>;
}

async function dispatchMessage(line: string, callbacks: QueryCallbacks): Promise<boolean> {
  const message = parseJSON(line) as string[];
  if (!Array.isArray(message) || message.length < 1) {
    await callbacks.onError('Invalid message', { type: 'responseFormat' });
    return false;
  }
  const messageType = message[0];
  switch (messageType) {
    case 'columns':
      await callbacks.onColumns(parseColumnsMessage(message));
      break;
    case 'row':
      await callbacks.onData(parseDataMessage(message));
      break;
    case 'raw-data':
      await callbacks.onRawData(message[1]);
      break;
    case 'error':
      if (message.length < 2 || typeof message[1] !== 'string') {
        throw new ResponseFormatError('Invalid error message');
      }

      await callbacks.onError(message[1], parseErrorContext(message[2]));
      break;
    case 'message':
      if (message.length < 2 || typeof message[1] !== 'string') {
        throw new ResponseFormatError('Invalid message');
      }
      await callbacks.onMessage(message[1]);
      break;
    case 'metrics':
      await callbacks.onMetrics(parseMetricsMessage(message));
      break;
    case 'status':
      await callbacks.onStatus(parseStatusMessage(message));
      break;
    case 'totals':
      await callbacks.onTotals(parseTotals(message));
      break;
    case 'progress':
      await callbacks.onProgress(parseQueryProgress(message));
      break;
    default:
      if (typeof messageType === 'string') {
        console.warn(`Unknown result message type ${messageType}`);
      } else {
        console.warn(`Unknown result message type ${JSON.stringify(messageType)}`);
      }
  }
  return true;
}

export function isAbortError(e: unknown): e is DOMException {
  return e instanceof DOMException && e.name === 'AbortError';
}

export function streamingQuery({
  connectionCredentials,
  http,
  isStopped,
  serviceId,
  orgId,
  runId,
  skipConnectionInit,
  sql,
  variables,
  wakeService,
  ...callbacks
}: StreamingQueryArgs): StreamingQuery {
  const url = `${config.apiUrl}/queryStream`;
  const abortUrl = `${config.apiUrl}/queryAbort`;
  const queries = parseQuery(sql);
  const abortController = new AbortController();

  let aborted = false;

  const abort = async (): Promise<void> => {
    aborted = true;
    abortController.abort();
    http
      .post(abortUrl, {
        headers: {
          'referrer-url': window.location.href
        },
        body: JSON.stringify({
          runId,
          serviceId,
          credential: connectionCredentials
        })
      })
      .catch((e) => {
        console.error('Error aborting query', e);
      });
  };

  const makeCompletion = async (): Promise<StreamingQueryResponse> => {
    let response: Response;

    const headers: Record<string, string> = {};

    if (wakeService) {
      headers['Wake-Service'] = '1';
    }

    if (skipConnectionInit) {
      headers['Skip-Connection-Init'] = '1';
    }

    try {
      response = await http.post(url, {
        signal: abortController.signal,
        headers: {
          ...headers,
          'referrer-url': window.location.href
        },
        body: JSON.stringify({
          queries,
          runId,
          orgId,
          serviceId,
          credential: connectionCredentials,
          selectTimeoutSeconds,
          queryVariables: variables
        })
      });
    } catch (e) {
      const type = isAbortError(e) ? 'aborted' : 'network';
      void callbacks.onError(errorMessage(e), { type });
      void callbacks.onEnd();
      return {
        wakeServiceConfirmation: false
      };
    }

    void callbacks.onStatus(response.headers.get('X-Database-Is-Awake') !== '0');

    if (response.status === 206) {
      const { data } = (await response.json()) as { data: string };
      return {
        wakeServiceConfirmation: !isStopped && data !== 'Service is stopped'
      };
    }

    if (!response.ok) {
      let errorMessage: string = response.statusText;
      try {
        const errorBody = (await response.json()) as unknown;
        if (
          !!errorBody &&
          typeof errorBody === 'object' &&
          'error' in errorBody &&
          typeof errorBody.error === 'string'
        ) {
          errorMessage = errorBody.error;

          if (
            config.env !== 'production' &&
            errorMessage &&
            errorMessage.includes('Inconsistent client state detected')
          ) {
            dumpClientState();
            console.log(
              JSON.stringify(
                {
                  isStopped,
                  queries,
                  runId,
                  orgId,
                  serviceId,
                  credential: connectionCredentials,
                  selectTimeoutSeconds,
                  queryVariables: variables,
                  headers
                },
                null,
                2
              )
            );
          }
        }
      } catch (e) {
        // eslint-disable-next-line no-empty
      }
      void callbacks.onError(errorMessage, { type: 'network' });
      void callbacks.onEnd();
      return {
        wakeServiceConfirmation: false
      };
    }

    if (!response.body) {
      throw new Error('No response body');
    }

    let line: string | undefined;
    const reader = response.body.getReader();
    const lineReader = new LineReader(reader);
    try {
      while (!aborted) {
        line = await lineReader.getLine();
        if (line === undefined) {
          await callbacks.onEnd();
          break;
        } else if (line.length > 0) {
          if (!(await dispatchMessage(line, callbacks))) {
            break;
          }
        }
      }
    } catch (e) {
      if (isAbortError(e)) {
        await callbacks.onError(String(e), { type: 'aborted' });
      } else if (e instanceof ResponseFormatError) {
        await callbacks.onError(String(e), { type: 'responseFormat' });
      } else {
        await callbacks.onError(String(e), { type: 'clientInternal' });
      }
      await callbacks.onEnd();
    }

    return {
      wakeServiceConfirmation: false
    };
  };

  return {
    runId,
    abort,
    completion: makeCompletion()
  };
}
