import { Row } from 'shared/src/clickhouse/types';

import { ChunkedQueryCallbacks } from 'src/lib/query/runningQueryTypes';
import { QueryArgs, StreamingQuery, streamingQuery } from 'src/lib/query/streamingQuery';

// Row-count threshold: the row buffer in streamingChunkedQuery is flushed as
// soon as it holds at least this many rows.
const maxChunkRows = 1000;
// Delay, in milliseconds, passed to setTimeout between timer-driven flushes of
// the row buffer.
const maxChunkMilliseconds = 1000;

/**
 * Arguments accepted by `streamingChunkedQuery`: every field of `QueryArgs`
 * combined with the callbacks declared by `ChunkedQueryCallbacks`.
 */
export type StreamingChunkedQueryArgs = QueryArgs & ChunkedQueryCallbacks;

/**
 * Runs `streamingQuery` while batching the rows it emits.
 *
 * Rows received one at a time are collected in a buffer and handed to
 * `callbacks.onData` as an array. The buffer is flushed:
 *  - when it reaches `maxChunkRows` rows,
 *  - every `maxChunkMilliseconds` via a self-rescheduling timer,
 *  - before `onColumns`, `onMessage`, `onError` and `onEnd` are forwarded, so
 *    buffered rows are delivered ahead of those events.
 *
 * Flushes are serialized: a new flush waits for the previous one to settle, so
 * `callbacks.onData` is never invoked concurrently with itself and the events
 * above are forwarded only after any in-flight chunk has been handled.
 *
 * All remaining callbacks (`onRawData`, `onMetrics`, `onStatus`, `onTotals`,
 * `onProgress`) are forwarded unchanged.
 *
 * @returns The value returned by the underlying `streamingQuery` call.
 */
export function streamingChunkedQuery({
  http,
  sql,
  runId,
  variables,
  connectionCredentials,
  serviceId,
  orgId,
  wakeService,
  isStopped,
  ...callbacks
}: StreamingChunkedQueryArgs): StreamingQuery {
  let buffer: Row[] = [];

  // Set once the query has ended (or failed to start) so that a timer-driven
  // flush that is still in flight does not re-arm the timer afterwards.
  let finished = false;

  // Tail of the flush chain; always settles successfully so that one failed
  // flush does not block the flushes queued after it.
  let pendingFlush: Promise<void> = Promise.resolve();

  const flushRows = (): Promise<void> => {
    const flush = pendingFlush.then(async () => {
      if (buffer.length === 0) {
        return;
      }
      // Swap the buffer out before awaiting so rows that arrive while the
      // consumer is busy land in a fresh array.
      const rows = buffer;
      buffer = [];
      await callbacks.onData(rows);
    });
    pendingFlush = flush.catch(() => undefined);
    // The caller still observes a rejection from its own flush.
    return flush;
  };

  const periodicFlush = (): void => {
    void flushRows()
      .catch((e: unknown) => {
        console.error('Error flushing query buffer', e);
      })
      .then(() => {
        // Keep time-based flushing alive even after a failed flush, but never
        // re-arm once the query has ended.
        if (!finished) {
          timer = setTimeout(periodicFlush, maxChunkMilliseconds);
        }
      });
  };

  let timer = setTimeout(periodicFlush, maxChunkMilliseconds);

  const stopTimer = (): void => {
    finished = true;
    clearTimeout(timer);
  };

  try {
    return streamingQuery({
      http,
      sql,
      serviceId,
      runId,
      connectionCredentials,
      variables,
      onColumns: async (columns) => {
        await flushRows();
        await callbacks.onColumns(columns);
      },
      onMessage: async (message) => {
        await flushRows();
        await callbacks.onMessage(message);
      },
      onError: async (error, context) => {
        await flushRows();
        await callbacks.onError(error, context);
      },
      onEnd: async () => {
        stopTimer();
        await flushRows();
        await callbacks.onEnd();
      },
      onData: async (row) => {
        buffer.push(row);
        if (buffer.length >= maxChunkRows) {
          await flushRows();
        }
      },
      onRawData: async (data) => {
        await callbacks.onRawData(data);
      },
      onMetrics: async (metrics) => {
        await callbacks.onMetrics(metrics);
      },
      onStatus: async (isAwake) => {
        await callbacks.onStatus(isAwake);
      },
      onTotals: async (totals: Row) => {
        await callbacks.onTotals(totals);
      },
      onProgress: async (progress) => {
        await callbacks.onProgress(progress);
      },
      wakeService,
      orgId,
      isStopped
    });
  } catch (e: unknown) {
    // The query never started; do not leave the flush timer running.
    stopTimer();
    throw e;
  }
}
