import { Draft } from 'immer';
import { useAtom } from 'jotai';
import { createToast } from 'primitives';
import { ReactElement, useEffect, useRef, useState } from 'react';
import { Alert, Button, Container, Select, Spacer } from '@clickhouse/click-ui';

import { useTables } from 'src/metadata/metadataState';
import { useApiClient } from 'src/lib/controlPlane/client';
import { errorMessage } from 'src/lib/errors/errorMessage';
import { schemaRegistryAtom } from 'src/state/dataLoading';
import { ClickPipeKafkaImport } from 'shared/src/dataLoading';
import { DataPreviewer } from 'src/lib/dataLoading/DataPreviewer';
import { useCurrentInstanceId } from 'src/instance/instanceController';
import { useSelectedDatabaseValue } from 'src/metadata/selectedDatabase';
import { useClickPipeOffsetOverride } from 'src/lib/dataLoading/feature';
import { KafkaOffsetStrategy, KafkaTopicFormat } from 'shared/src/dataLoading';
import { getKafkaConfigWithCredentials } from 'src/lib/dataLoading/clickpipes/kafka';
import Sample from 'src/components/ImportView/ClickPipesImportForm/IncomingData/Sample';
import Schema from 'src/components/ImportView/ClickPipesImportForm/IncomingData/Schema';
import { useClickPipesImport } from 'src/components/ImportView/ClickPipesImportForm/hooks';
import TwoColsContainer from 'src/components/ImportView/ClickPipesImportForm/TwoColsContainer';
import TopicSelector from 'src/components/ImportView/ClickPipesImportForm/IncomingData/TopicSelector';
import OffsetSelector from 'src/components/ImportView/ClickPipesImportForm/IncomingData/OffsetSelector';
import {
  convertSampleSchemaToColumns,
  isNoDataReceivedError
} from 'src/lib/dataLoading/clickpipes/samples';

/**
 * "Incoming data" step of the ClickPipes Kafka import form.
 *
 * Renders the topic and offset selectors, fetches a sample message for the
 * selected topic through `api.getKafkaSampleSchema`, and shows the result in
 * `<Sample>`. A successful fetch also writes the detected schema and the rows
 * preview onto the import model and regenerates the target table config.
 *
 * Navigation: "Back" sets `step` to 1, "Next" sets it to 3 (only when a sample
 * is available). When the current error is a "no data received" error the
 * "Next" button is replaced by a "Retry" button that re-runs the fetch.
 */
export function KafkaSample(): ReactElement {
  const [loading, setLoading] = useState(false);
  // A fresh array identity on every Retry click re-triggers the fetch effect.
  const [enforceState, enforceUpdate] = useState<unknown[]>([]);
  const [error, setError] = useState<string | null>(null);
  const { importModel, updateImportModel, setTableConfig } =
    useClickPipesImport<ClickPipeKafkaImport>();
  const clickPipeOffsetOverrideEnabled = useClickPipeOffsetOverride();
  const tables = useTables();
  const api = useApiClient();
  const database = useSelectedDatabaseValue();
  const serviceId = useCurrentInstanceId() ?? '';
  const dataPreviewer = new DataPreviewer(api, serviceId, tables, database);

  // Id of the most recently started sample request. Responses belonging to an
  // older id are discarded so a slow request cannot overwrite newer results.
  const latestRequestId = useRef(0);

  const [sample, setSample] = useState<string | null>(null);
  const [flattenSample, setFlattenSample] = useState<string | null>(null);
  const [offset, setOffset] = useState<string>();
  const [partition, setPartition] = useState<string>();
  const [timestamp, setTimestamp] = useState<string>();

  const topic = importModel.data.topic;
  const format = importModel.data.format;
  const offsetStrategy = importModel.data.offsetStrategy;
  const offsetStrategyOverride = importModel.data.offsetOverride;
  const offsetValue = importModel.data.offsetValue;

  const [schemaRegistrySchema] = useAtom(schemaRegistryAtom);
  const schemaIsEnabled = importModel.data.schemaRegistryEnabled;

  /** Clears the current error and stores the "flatten" choice on the model. */
  function setFlatten(value: boolean): void {
    // Kept outside the producer: the producer should only mutate the draft.
    setError(null);
    updateImportModel<Draft<ClickPipeKafkaImport>>((model) => {
      model.data.flatten = value;
    });
  }

  /**
   * Stores the offset override picked in the "Consume from offset" select.
   *
   * @throws Error when `value` is not one of the three known strategies.
   */
  const updateOffsetSelectionOverride = (value: string): void =>
    updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
      if (
        value === 'from_beginning' ||
        value === 'from_latest' ||
        value === 'from_selected'
      ) {
        draft.data.offsetOverride = value;
      } else {
        throw Error('Invalid offset strategy');
      }
    });

  /**
   * Fetches a sample for the current topic/format/offset settings and
   * publishes it to local state and to the import model.
   *
   * Failures are reported through the `error` state; the returned promise
   * does not reject. It resolves once the request has finished.
   *
   * @param value Offset value to use for this request instead of the model's
   *   `offsetValue`; `undefined` keeps the model's value.
   */
  const updateSampleFunc = async (value: string | undefined): Promise<void> => {
    const requestId = ++latestRequestId.current;
    const isLatest = (): boolean => requestId === latestRequestId.current;

    if (!topic || !format) {
      // Nothing to fetch. Any in-flight request has just been superseded and
      // will no longer clear the spinner itself, so clear it here.
      setSample(null);
      setLoading(false);
      return;
    }

    const flatten = importModel.data.flatten;
    setLoading(true);
    setError(null);

    try {
      // Inside the try so that a failure while building the config is
      // reported like any other error and the loading flag is still cleared.
      const configWithCredentials = getKafkaConfigWithCredentials(importModel);
      const { samples, schema, sampleValues, metadata } =
        await api.getKafkaSampleSchema(
          {
            ...configWithCredentials,
            flatten,
            topic,
            format,
            offsetStrategy,
            offsetValue: value ?? offsetValue
          },
          serviceId
        );

      if (!isLatest()) {
        return;
      }

      updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
        draft.data.sourceSchema = [...schema];
        draft.data.rowsPreview = sampleValues;
      });

      const tableConfig = dataPreviewer.generateTableConfig(
        'newTable',
        topic,
        convertSampleSchemaToColumns(schema)
      );
      setTableConfig(tableConfig);

      // Only the first sample is displayed, pretty-printed as JSON.
      const firstSample =
        samples.length > 0 ? JSON.stringify(samples[0], null, 2) : null;
      if (flatten) {
        setFlattenSample(firstSample);
      }
      setSample(firstSample);

      if (metadata.kafka !== undefined) {
        setOffset(metadata.kafka.offset);
        setPartition(metadata.kafka.partition);
        setTimestamp(metadata.kafka.timestamp);
      }
    } catch (caught) {
      if (isLatest()) {
        setError(errorMessage(caught));
        setSample(null);
      }
    } finally {
      // A superseded request must not hide the spinner of the newer one.
      if (isLatest()) {
        setLoading(false);
      }
    }
  };

  // NOTE(review): `updateSampleFunc` and `offsetValue` are not listed as
  // dependencies; presumably deliberate so the fetch only re-runs on the
  // values below — confirm before "fixing" the dependency list.
  useEffect(() => {
    // reset when an action is taken so it doesn't linger confusing the user
    setOffset(undefined);
    setPartition(undefined);
    setTimestamp(undefined);

    updateSampleFunc(undefined).catch((error) => {
      console.error(error);
    }); // this should never happen, but just in case
  }, [
    topic,
    format,
    offsetStrategy,
    importModel.id,
    enforceState,
    importModel.data.flatten
  ]);

  /** Returns to step 1 of the form. */
  const back = (): void => {
    updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
      draft.data.step = 1;
    });
  };

  /**
   * Advances to step 3 when a sample, topic and format are all present;
   * otherwise shows an error toast. Does nothing while a fetch is running.
   */
  const next = (): void => {
    if (loading) {
      return;
    }
    if (sample && topic && format) {
      updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
        draft.data.step = 3;
      });
    } else {
      createToast(
        'Error',
        'alert',
        "Can't proceed with current configuration. No data found"
      );
    }
  };

  return (
    <Container orientation="vertical" gap="md" maxWidth="800px">
      <Schema
        schema={schemaRegistrySchema}
        enabled={schemaIsEnabled}
        consumerGroup={importModel.data.group}
      />

      <TwoColsContainer>
        <TopicSelector />

        <OffsetSelector
          topicDefined={topic !== undefined}
          updateSampleFunc={updateSampleFunc}
        />
      </TwoColsContainer>

      {!schemaIsEnabled && (
        <Alert
          customIcon="information"
          showIcon
          size="small"
          state="info"
          text="If a schema registry is not provided, we expect JSON format in the topic"
          title="We expect JSON format"
          type="default"
        />
      )}

      {clickPipeOffsetOverrideEnabled && offsetStrategy !== 'from_beginning' && (
        <Container orientation="vertical" gap="md">
          <Select
            data-testid="offset-select"
            value={offsetStrategyOverride}
            onSelect={updateOffsetSelectionOverride}
            label="Consume from offset"
            className="fs-exclude"
          >
            <Select.Item value="from_selected" className="fs-exclude">
              Start from selected
            </Select.Item>
            <Select.Item value="from_beginning" className="fs-exclude">
              Start from beginning
            </Select.Item>
            <Select.Item value="from_latest" className="fs-exclude">
              Start from latest
            </Select.Item>
          </Select>
        </Container>
      )}

      <Sample
        setFlatten={setFlatten}
        flattenSample={flattenSample}
        flatten={importModel.data.flatten}
        format={format}
        sample={sample}
        loading={loading}
        error={error}
        offsetInfo={{ offset, partition, timestamp }}
      />

      <Spacer size="xs" />

      <Container gap="md">
        <Button type="secondary" onClick={back}>
          Back
        </Button>
        {error && isNoDataReceivedError(error) ? (
          <Button onClick={(): void => enforceUpdate([])}>Retry</Button>
        ) : (
          <Button
            onClick={next}
            loading={loading}
            data-testid="incoming-data-next-step"
            disabled={Boolean(loading || error || !topic)}
          >
            Next: Parse Information
          </Button>
        )}
      </Container>
    </Container>
  );
}
