import { useState, useRef } from 'react';
import { css } from '@emotion/react';
import { Draft } from 'immer';
import { useAtom } from 'jotai';
import { useApiClient } from 'src/lib/controlPlane/client';
import {
  getKafkaConfigWithCredentials,
  getSchemaRegistryWithCredentials
} from 'src/lib/dataLoading/clickpipes/kafka';
import {
  ClickPipeKafkaImport,
  ClickPipeSourceType,
  ConfluentImport,
  KafkaImport,
  KafkaSaslMechanism,
  KafkaTopicFormat
} from 'shared/src/dataLoading/types';
import { schemaRegistryAtom, kafkaTopicsAtom } from 'src/state/dataLoading';
import { getCurrentServiceIdOrFail } from 'src/state/service';
import { useClickPipesImport } from 'src/components/ImportView/ClickPipesImportForm/hooks';
import TwoColsContainer from 'src/components/ImportView/ClickPipesImportForm/TwoColsContainer';
import {
  Alert,
  Button,
  Container,
  Icon,
  Link,
  PasswordField,
  Spacer,
  Switch,
  Panel,
  Text,
  TextField
} from '@clickhouse/click-ui';
import { UsernameInput } from 'src/components/ImportView/ClickPipesImportForm/SetupConnection/components/UsernameInput';
import { SaslMechanismSelect } from 'src/components/ImportView/ClickPipesImportForm/SetupConnection/components/SaslMechanismSelect';
import PanelCard from 'src/components/ImportView/ImportTypeSelector/PanelCard';

type ConnectionStatus = 'unknown' | 'error' | 'success';
type FlattenedFieldKey =
  | 'integrationName'
  | 'description'
  | 'group'
  | 'brokers'
  | 'registryUrl'
  | 'registrySecret'
  | 'registryKey';

type FieldKey = FlattenedFieldKey | 'username' | 'password' | 'authentication';

const isPresent = (value: string | undefined): boolean =>
  typeof value === 'string' && value.length > 0;

const defaultErrorState: Record<string, string | undefined> = {};

function validateConnectionObject(
  importModel: ClickPipeKafkaImport
): Map<FieldKey, string> {
  const { group, brokers, credentials, authentication } = importModel.data;
  const errors: Map<FieldKey, string> = new Map();

  if (!isPresent(group)) {
    errors.set('group', 'Field is required');
  }

  if (!isPresent(brokers)) {
    errors.set('brokers', 'Field is required');
  }

  if (!isPresent(credentials?.username)) {
    errors.set('username', 'Field is required');
  }

  if (!isPresent(credentials?.password)) {
    errors.set('password', 'Field is required');
  }

  if (!isPresent(authentication)) {
    errors.set('authentication', 'Field is required');
  }

  if (importModel.data.schemaRegistryEnabled) {
    if (!isPresent(importModel.data.registryKey)) {
      errors.set('registryKey', 'Field is required');
    }

    if (importModel.data.registryKey?.includes(' ')) {
      errors.set('registryKey', 'Invalid registry key');
    }

    if (!isPresent(importModel.data.registrySecret)) {
      errors.set('registrySecret', 'Field is required');
    }

    if (importModel.data.registrySecret?.includes(' ')) {
      errors.set('registrySecret', 'Invalid registry secret');
    }

    if (!isPresent(importModel.data.registryUrl)) {
      errors.set('registryUrl', 'Field is required');
    }

    if (importModel.data.registryUrl?.includes(' ')) {
      errors.set('registryUrl', 'Invalid registry URL');
    }

    if (!importModel.data.registryUrl?.startsWith('https://')) {
      errors.set(
        'registryUrl',
        'Invalid registry URL. Must begin with https://'
      );
    }
  }

  return errors;
}

const docsLink = 'https://clickhouse.com/docs/en/integrations/clickpipes';

function normalizeSchemaType(
  schemaType?: string
): KafkaTopicFormat | undefined {
  // TODO fix schemaType on the clickpipes side https://github.com/ClickHouse/clickpipes-platform/issues/1335
  if (!schemaType) {
    return 'AvroConfluent';
  }
  const type = schemaType.toLowerCase();
  if (type === 'json') return 'JSONEachRow';
  if (type === 'avro_confluent' || type === 'avroconfluent')
    return 'AvroConfluent';
  return undefined;
}

function isSchemaRegistrySupported(model: ClickPipeKafkaImport): boolean {
  if (
    model.type === 'kafka' ||
    model.type === 'confluent' ||
    model.type === 'redpanda' ||
    model.type === 'msk'
  ) {
    return true;
  }
  return false;
}

type KafkaPlaceholder = {
  name: string;
  integrationName: string;
  description: string;
  username: string;
  password: string;
  servers: string;
};

function getKafkaUIData(type: ClickPipeSourceType): KafkaPlaceholder {
  const defaultUsernamePlaceholder = 'Input your username';
  const defaultPasswordPlaceholder = 'Input your password';

  switch (type) {
    case 'kafka':
      return {
        name: 'Apache Kafka',
        integrationName: 'My new Kafka pipe',
        description: 'Custom Kafka ingestion',
        username: defaultUsernamePlaceholder,
        password: defaultPasswordPlaceholder,
        servers: 'Your Kafka broker: ex:localhost:9092'
      };
    case 'confluent':
      return {
        name: 'Confluent',
        integrationName: 'My new Confluent pipe',
        description: 'Custom Confluent Ingestion',
        username: 'Provide Confluent API Key',
        password: 'Provide Confluent API Secret',
        servers:
          'Your Confluent broker: ex:clickpipes.us-east-2.aws.confluent.cloud:9092'
      };
    case 'msk':
      return {
        name: 'MSK',
        integrationName: 'My new MSK pipe',
        description: 'Custom MSK Ingestion',
        username: defaultUsernamePlaceholder,
        password: defaultPasswordPlaceholder,
        servers:
          'Your MSK broker: ex:clickpipes.kafka.us-west-2.amazonaws.com:9196'
      };
    case 'redpanda':
      return {
        name: 'Redpanda',
        integrationName: 'My new Redpanda pipe',
        description: 'Custom Redpanda Ingestion',
        username: defaultUsernamePlaceholder,
        password: defaultPasswordPlaceholder,
        servers:
          'Your Redpanda broker: ex:clickpipes.prd.cloud.redpanda.com:9092'
      };
    case 'upstash':
      return {
        name: 'Upstash',
        integrationName: 'My new Upstash pipe',
        description: 'Custom Upstash Ingestion',
        username: defaultUsernamePlaceholder,
        password: defaultPasswordPlaceholder,
        servers: 'Your Upstash broker: ex:clickpipes.upstash.io:9092'
      };
    case 'warpstream':
      return {
        name: 'WarpStream',
        integrationName: 'My new WarpStream pipe',
        description: 'Custom WarpStream Ingestion',
        username: defaultUsernamePlaceholder,
        password: defaultPasswordPlaceholder,
        servers: 'Your WarpStream broker: ex:serverless.warpstream.com:9092'
      };
    case 'azureeventhub':
      return {
        name: 'Azure Event Hubs',
        integrationName: 'My new Event Hubs pipe',
        description: 'Custom Event Hubs Ingestion',
        username: '',
        password: 'Provide Connection String',
        servers:
          'Your Event Hubs Kafka surface: ex:clickpipes.servicebus.windows.net:9093'
      };
  }

  return {
    name: 'ClickPipe',
    integrationName: 'My new ClickPipe',
    description: 'Custom ClickPipes Ingestion',
    username: 'Select your username',
    password: 'Input your password',
    servers: 'Your Kafka broker: ex:localhost:9092'
  };
}

export function SetupKafkaConnection(): JSX.Element | null {
  const api = useApiClient();
  const serviceId = getCurrentServiceIdOrFail();
  const [loading, setLoading] = useState(false);
  const [, setTopics] = useAtom(kafkaTopicsAtom);
  const [, setSchema] = useAtom(schemaRegistryAtom);
  const [useSSLCertificate, setUseSSLCertificate] = useState(false);
  const [fileName, setFileName] = useState('');

  const [connectionStatus, setConnectionStatus] =
    useState<ConnectionStatus>('unknown');
  const [connectionError, setConnectionError] = useState<string | undefined>(
    undefined
  );
  const [schemaRegistryError, setSchemaRegistryError] = useState<
    string | undefined
  >(undefined);
  const [schemaRegistryConnectionStatus, setSchemaRegistryConnectionStatus] =
    useState<ConnectionStatus>('unknown');

  const [errors, setErrors] = useState(defaultErrorState);

  const { importModel, updateImportModel } =
    useClickPipesImport<ClickPipeKafkaImport>();

  const data = importModel.data;

  const isSchemaRegistryEnabled =
    isSchemaRegistrySupported(importModel) ||
    // for old clickpipes relying on JSON_SR for Confluent
    importModel.type === 'confluent';

  const onFieldChangeHandler =
    (field: FlattenedFieldKey, trim: boolean = false) =>
    (value: string) => {
      setConnectionStatus('unknown');
      setSchemaRegistryConnectionStatus('unknown');
      setError(field, undefined);
      updateImportModel<Draft<ConfluentImport>>((model) => {
        const newValue = trim ? value.trim() : value;
        model.data[field] = newValue;
      });
    };
  const handleFileChange = async (
    fileevent: React.ChangeEvent<HTMLInputElement>
  ) => {
    if (
      !fileevent.target.files ||
      fileevent.target.files.length === 0 ||
      fileevent.target.files[0] === null
    ) {
      return;
    }
    var fileEventName = fileevent.target.files[0].name;
    const strFile = await fileToString(fileevent.target.files[0]);
    updateImportModel<Draft<KafkaImport>>((model) => {
      model.data.sslCertificate = strFile;
      setFileName(fileEventName);
    });
  };

  const setIntegrationName = onFieldChangeHandler('integrationName');
  const setDescription = onFieldChangeHandler('description');
  const setBrokers = onFieldChangeHandler('brokers', true);
  const setGroup = onFieldChangeHandler('group');
  const setRegistryUrl = onFieldChangeHandler('registryUrl', true);
  const setRegistryKey = onFieldChangeHandler('registryKey', true);
  const setRegistrySecret = onFieldChangeHandler('registrySecret', true);

  const setSchemaRegistryEnabled = (value: boolean): void => {
    if (!value) {
      // reset state if user disables the schema registry switch
      updateImportModel<Draft<ConfluentImport>>((model) => {
        model.data.registryUrl = undefined;
        model.data.registryKey = undefined;
        model.data.registrySecret = undefined;
      });
    }

    updateImportModel<Draft<ConfluentImport>>((model) => {
      model.data.schemaRegistryEnabled = value;
    });
  };

  const setSaslMechanism = (value: KafkaSaslMechanism): void => {
    setConnectionStatus('unknown');
    updateImportModel<Draft<KafkaImport>>((model) => {
      model.data.authentication = value;
    });
  };

  const setUsername = (username: string): void => {
    setConnectionStatus('unknown');
    setError('username', undefined);
    updateImportModel<Draft<KafkaImport>>((model) => {
      const credentials = {
        username,
        password: model.data.credentials?.password ?? ''
      };

      model.data.credentials = credentials;
    });
  };

  const setPassword = (password: string): void => {
    setConnectionStatus('unknown');
    setError('password', undefined);
    updateImportModel<Draft<KafkaImport>>((model) => {
      const credentials = {
        username: model.data.credentials?.username ?? '',
        password
      };

      model.data.credentials = credentials;
    });
  };

  const fileToString = (file: File): Promise<string> => {
    if (!file || !(file instanceof File)) {
      return Promise.resolve('');
    }
    return new Promise((resolve, reject) => {
      const reader = new FileReader();
      reader.readAsText(file);
      reader.onload = () => resolve(reader.result as string);
      reader.onerror = (error) => reject(error);
    });
  };

  const setPasswordLabel = (importModel: ClickPipeKafkaImport): string => {
    if (importModel.type === 'confluent') {
      return 'API secret';
    } else if (importModel.type === 'azureeventhub') {
      return 'Connection String';
    } else {
      return 'Password';
    }
  };

  const setError = (key: FieldKey, error: string | undefined): void => {
    setErrors((oldErrors) => {
      const newErrors = { ...oldErrors };
      error === undefined ? delete newErrors[key] : (newErrors[key] = error);
      return newErrors;
    });
  };
  const fileInputRef = useRef<HTMLInputElement>(null);
  const validateModel = (): boolean => {
    let valid = true;

    if (!isPresent(importModel.data.integrationName)) {
      valid = false;
      setError('integrationName', 'Field is required');
    }

    const validationErrors = validateConnectionObject(importModel);
    for (const [field, message] of validationErrors.entries()) {
      setError(field, message);
    }

    return valid && validationErrors.size === 0;
  };

  const checkConnection = async (): Promise<boolean> => {
    try {
      const validationErrors = validateConnectionObject(importModel);
      for (const [field, message] of validationErrors.entries()) {
        setError(field, message);
      }
      if (validationErrors.size === 0) {
        const request = getKafkaConfigWithCredentials(importModel);
        const topics = await api.listKafkaTopics(request, serviceId);
        setConnectionError(undefined);
        // TODO handle no topics
        setTopics(topics || []);
        setConnectionStatus('success');
        return true;
      }
    } catch (error) {
      if (error instanceof Error) {
        const message = error.message ?? "Can't connect to the data source.";
        setConnectionError(message);
        setConnectionStatus('error');
      } else {
        setConnectionError("Can't connect to the data source.");
        setConnectionStatus('error');
      }
    }

    return false;
  };

  const checkSchema = async (): Promise<boolean> => {
    if (!importModel.data.schemaRegistryEnabled) {
      setSchema({ schema: '', schemaType: '' });
      return true;
    }

    try {
      // TODO add importModel validation
      const request = getSchemaRegistryWithCredentials(importModel);
      if (request.credentials === undefined || request.url === undefined) {
        console.error('Credentials are not defined');
        return false;
      }
      const schemaRecord = await api.getSchemaRegistry(
        {
          url: request.url,
          credentials: request.credentials
        },
        serviceId
      );

      const schemaType = normalizeSchemaType(schemaRecord.schemaType);
      if (schemaRecord.schema.length && schemaType) {
        setSchemaRegistryError(undefined);
        setSchema({
          schema: schemaRecord.schema,
          schemaType
        });
        updateImportModel<Draft<KafkaImport>>((model) => {
          model.data.format = schemaType;
        });
        setSchemaRegistryConnectionStatus('success');
        return true;
      } else {
        setSchemaRegistryError("Didn't retrieve schema.");
        setSchemaRegistryConnectionStatus('error');
        return false;
      }
    } catch (error) {
      const message = error instanceof Error ? error.message : undefined;
      setSchemaRegistryError(message ?? "Can't retrieve schema.");
      setSchemaRegistryConnectionStatus('error');
    }

    return false;
  };

  const validateSetupConnection = async (): Promise<void> => {
    const valid = validateModel();
    if (!valid) {
      return;
    }
    if (!useSSLCertificate) {
      updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
        draft.data.sslCertificate = '';
      });
    }

    if (connectionStatus === 'unknown') {
      const valid = (await checkConnection()) && (await checkSchema());

      if (!valid) {
        return;
      }
    }

    updateImportModel<Draft<KafkaImport>>((model) => {
      model.data.step = 2;
    });
  };

  const nextStep = async (): Promise<void> => {
    setLoading(true);
    await validateSetupConnection();
    setLoading(false);
  };

  const back = (): void => {
    setTopics([]);
    setSchema({ schema: '', schemaType: '' });
    updateImportModel((draft: Draft<ClickPipeKafkaImport>) => {
      draft.data.step = 0;
    });
  };

  const isNextStepDisabled =
    loading ||
    connectionStatus === 'error' ||
    schemaRegistryConnectionStatus === 'error';

  const labelName = getKafkaUIData(importModel.type).name;

  return (
    <Container orientation="vertical" gap="md">
      <Text>
        If you need any help to connect to {labelName}{' '}
        <Link href={docsLink} target="_blank" rel="noreferrer">
          access the documentation
        </Link>
        for more details.
      </Text>
      <Container orientation="vertical" gap="md" maxWidth="800px">
        <TwoColsContainer>
          <TextField
            label="Integration name"
            error={errors['integrationName']}
            name="integrationName"
            value={data.integrationName ?? ''}
            onChange={setIntegrationName}
            placeholder={getKafkaUIData(importModel.type).integrationName}
            data-testid="integration-name"
            className="fs-exclude"
            autoComplete="off"
          />
          <TextField
            label="Description"
            error={errors['description']}
            name="description"
            value={data.description ?? ''}
            onChange={setDescription}
            placeholder={getKafkaUIData(importModel.type).description}
            data-testid="description"
            className="fs-exclude"
            autoComplete="off"
          />
        </TwoColsContainer>

        <UsernameInput
          type={data.type}
          value={data.credentials?.username ?? ''}
          errors={errors}
          placeholder={getKafkaUIData(importModel.type).username}
          onChange={setUsername}
          setPassword={setPassword}
        />

        <PasswordField
          label={setPasswordLabel(importModel)}
          error={errors['password']}
          name="password"
          placeholder={getKafkaUIData(importModel.type).password}
          value={data.credentials?.password ?? ''}
          onChange={setPassword}
          data-testid="password"
          className="fs-exclude"
        />

        <SaslMechanismSelect
          value={data.authentication}
          type={data.type}
          onSelect={setSaslMechanism}
        />

        <TextField
          label="Consumer group"
          error={errors['group']}
          name="group"
          placeholder="Input your consumer group"
          value={data.group ?? ''}
          onChange={setGroup}
          data-testid="group"
          className="fs-exclude"
          autoComplete="off"
        />

        <TextField
          label="Servers"
          error={errors['brokers']}
          name="brokers"
          placeholder={getKafkaUIData(importModel.type).servers}
          value={data.brokers ?? ''}
          onChange={setBrokers}
          data-testid="brokers"
          className="fs-exclude"
        />
        {!loading && connectionStatus === 'error' && connectionError && (
          <Alert
            text={connectionError}
            state="danger"
            size="medium"
            showIcon={false}
          />
        )}
      </Container>

      <Container orientation="vertical" gap="md" maxWidth="800px">
        <Panel
          alignItems="start"
          color="default"
          hasBorder
          height=""
          orientation="vertical"
          padding="sm"
          radii="sm"
          width="100%"
        >
          <Container
            style={{ justifyContent: 'space-between' }}
            orientation="horizontal"
          >
            <Switch
              checked={useSSLCertificate}
              onClick={() => {
                setUseSSLCertificate(!useSSLCertificate);
              }}
              label="SSL certificate (Beta)"
              data-testid="use-ssl-switch"
              orientation="horizontal"
              dir="end"
            />
            <div>
              Read{' '}
              <Link
                target="_blank"
                href="https://clickhouse.com/docs/en/integrations/clickpipes/kafka#custom-certificates"
              >
                Docs <Icon name="share" size="sm" />
              </Link>
            </div>
          </Container>

          {useSSLCertificate && (
            <Container style={{ padding: '12x' }} orientation="horizontal">
              <input
                type="file"
                hidden={true}
                data-testid="sslCertificateUpload"
                accept=".crt,.pem,.cer,.key"
                onChange={handleFileChange}
                ref={fileInputRef}
              />
              {!fileName && (
                <PanelCard
                  title="Upload a file"
                  subTitle=".crt, .pem, .cer, or .key"
                  iconName="upload"
                  onClick={() => {
                    if (fileInputRef.current) {
                      fileInputRef.current.click();
                    }
                  }}
                />
              )}
              {!!fileName && (
                <PanelCard
                  title={fileName}
                  iconName="document"
                  onClick={() => {
                    if (fileInputRef.current) {
                      fileInputRef.current.click();
                    }
                  }}
                />
              )}
            </Container>
          )}
        </Panel>
      </Container>
      {isSchemaRegistryEnabled && (
        <Container orientation="vertical" gap="md" maxWidth="800px">
          <Panel
            alignItems="start"
            color="default"
            hasBorder
            height=""
            orientation="vertical"
            padding="sm"
            radii="sm"
            width="100%"
          >
            <Container
              style={{ justifyContent: 'space-between' }}
              orientation="horizontal"
            >
              <Switch
                onCheckedChange={setSchemaRegistryEnabled}
                label="Use Schema Registry"
                checked={!!importModel.data.schemaRegistryEnabled}
                data-testid="schema-registry-switch"
                orientation="horizontal"
                dir="end"
              />
              <div>
                Read{' '}
                <Link
                  target="_blank"
                  href="https://clickhouse.com/docs/en/integrations/clickpipes/kafka"
                >
                  Docs <Icon name="share" size="sm" />
                </Link>
              </div>
            </Container>
            {importModel.data.schemaRegistryEnabled && (
              <Container style={{ padding: '12px' }} orientation="vertical">
                <TextField
                  label="Schema URL"
                  error={errors['registryUrl']}
                  name="registryUrl"
                  placeholder="https://psrc-aa00.us-east-2.aws.confluent.cloud/schemas/ids/100004"
                  value={importModel.data.registryUrl ?? ''}
                  onChange={setRegistryUrl}
                  data-testid="registryUrl"
                  disabled={!importModel.data.schemaRegistryEnabled}
                  className="fs-exclude"
                />
                <Spacer size={'sm'} />
                <TextField
                  label="Key"
                  css={css({ position: 'relative' })}
                  error={errors['registryKey']}
                  name="registryKey"
                  placeholder="Type your key"
                  value={importModel.data.registryKey ?? ''}
                  onChange={setRegistryKey}
                  data-testid="registryKey"
                  disabled={!importModel.data.schemaRegistryEnabled}
                  className="fs-exclude"
                />
                <Spacer size={'sm'} />
                <PasswordField
                  label="Secret"
                  error={errors['registrySecret']}
                  name="secret"
                  placeholder="Type your secret"
                  value={importModel.data.registrySecret ?? ''}
                  onChange={setRegistrySecret}
                  data-testid="registrySecret"
                  disabled={!importModel.data.schemaRegistryEnabled}
                  className="fs-exclude"
                />
              </Container>
            )}
            {!loading &&
              schemaRegistryConnectionStatus === 'error' &&
              schemaRegistryError && (
                <Alert
                  text={schemaRegistryError}
                  state="danger"
                  size="medium"
                  showIcon={false}
                />
              )}
          </Panel>
        </Container>
      )}

      <Spacer size="xs" />
      <Container gap="md">
        <Button type="secondary" onClick={back}>
          Back
        </Button>
        <Button
          onClick={(): void => void nextStep()}
          loading={loading}
          data-testid="setup-conn-next-step"
          disabled={isNextStepDisabled}
        >
          Next: Incoming Data
        </Button>
      </Container>
      <Spacer size="xs" />
    </Container>
  );
}
