import { Replace } from '@cp/common/protocol/Common';
import {
  MIN_WEBSOCKET_CONNECTION_DURATION_IN_MILLIS,
  SubscriptionDetails,
  UpdateAccessTokenRequest,
  WebSocketsAcknowledgeMessageRequest,
  WebSocketsClientMessage,
  WebSocketsMessage,
  WebSocketsRequest,
  WebSocketsSubscribeRequest
} from '@cp/common/protocol/WebSocket';
import { assertTruthy, truthy } from '@cp/common/utils/Assert';
import { MILLIS_PER_MINUTE } from '@cp/common/utils/DateTimeUtils';
import { getOrSetDefault } from '@cp/common/utils/MiscUtils';
import { sleep } from '@cp/common/utils/TestUtils';
import { isEmpty } from '@cp/common/utils/ValidationUtils';
import { v4 as uuidv4 } from 'uuid';

/**
 * Builds a decorator that runs an optional `middleware` callback before the decorated function.
 *
 * @param middleware Optional hook invoked with the same argument as the decorated function. Its
 *   return value is ignored.
 * @returns A function that accepts `fn` and produces a wrapper which first calls `middleware`
 *   (when one was supplied) and then returns the result of `fn(e)`.
 */
const withMiddleWare = <T, R>(middleware?: (e: T) => void) => {
  return (fn: (e: T) => R) => {
    return (e: T): R => {
      if (!middleware) {
        return fn(e);
      }
      middleware(e);
      return fn(e);
    };
  };
};

/** Options accepted by `connect` / `getOrCreateWsClient`. Every field is optional. */
export interface ConnectOptions {
  /**
   * Supplies the access token used to open the connection. When omitted, `connect` keeps using
   * the getter it was given previously.
   */
  getAccessToken?: () => Promise<string | null>;
  /**
   * WebSocket endpoint URL; `connect` appends its query parameters to it. When omitted, the
   * previously supplied endpoint is kept.
   */
  webSocketsEndpoint?: string;
  /** Extends the client onopen function. It is run whenever the onopen function is run */
  onOpen?: () => void;
  /** Extends the client onclose function. It is run whenever the onclose function is run */
  onClose?: (c: CloseEvent) => void;
  /** Extends the client onmessage function. It is run whenever the onmessage function is run */
  onMessage?: (m: MessageEvent<string>) => void;
  /** Extends the client onerror function. It is run whenever the onerror function is run */
  onError?: (e: Event) => Promise<void>;
  /** Extends the client send function. It is run whenever the send function is run */
  onSendMessage?: <T>(m: WebSocketsClientMessage<T>) => void;
}

/**
 * Callback invoked with a notification whose `payload` field is typed as `PayloadType`
 * (all other fields come from `WebSocketsMessage`).
 */
export type WebSocketsNotificationHandler<PayloadType> = (
  notification: Replace<WebSocketsMessage, 'payload', PayloadType>
) => void;

/** A registered notification handler together with the metadata needed to route and remove it. */
interface WebSocketsNotificationHandlerWithMeta<PayloadType = any> {
  /** Callback invoked for notifications that match `subscriptionDetails`. */
  handler: WebSocketsNotificationHandler<PayloadType>;
  /** Unique listener id; used to remove exactly this listener again. */
  id: string;
  /** Subscription whose `type` and `objId` are compared against incoming notifications. */
  subscriptionDetails: SubscriptionDetails;
}

// Module-level singleton state: every function in this file operates on these shared variables.

/** The currently held WebSocket, or `undefined` when there is no connection. */
let ws: WebSocket | undefined;
/** Listener count per canonical `SubscriptionDetails` object (incremented on subscribe, decremented on unsubscribe). */
const subscriptions = new Map<SubscriptionDetails, number>();
/** Canonical `SubscriptionDetails` instance per subscription key (see `getSubscriptionMapKey`). */
const uniqueSubscriptions = new Map<string, SubscriptionDetails>();
/** Client identifier sent to the server; regenerated whenever subscriptions are cleared. */
let clientId = uuidv4();
/** Ids of notifications already dispatched to listeners; used to drop duplicates. Not cleared anywhere in this file. */
const seenMessages = new Set<string>();
/** True while the reconnect loop in `onError` is running. */
let retrying = false;
/** Application name sent in the connection URL and in token-update / resubscribe requests. */
const application = 'control-plane';
/** Access token obtained by the most recent `connect` call (empty string when none was available). */
let accessToken: string | null;
/** Getter for the access token, remembered from the last `connect` call that supplied one. */
let getAccessToken: () => Promise<string | null>;
/** Endpoint URL, remembered from the last `connect` call that supplied one. */
let webSocketsEndpoint: string;
/** All registered notification listeners. Replaced (never mutated in place) on add/remove. */
let listeners: Array<WebSocketsNotificationHandlerWithMeta> = [];
/** Resolves the current connection-ready promise; assigned by `initConnectionReadyPromise`. */
let setConnectionReady: (value: string) => void;
/** Rejects the current connection-ready promise and clears it; assigned by `initConnectionReadyPromise`. */
let setConnectionNotReady: (value: string) => void;
/** Settles once the socket is open (resolve) or the connection is torn down (reject); `undefined` after a rejection. */
let isConnectionReadyPromise: Promise<string> | undefined;
/** Handle of the periodic token-refresh timer, or `undefined` when the timer is not running. */
let refreshIntervalId: ReturnType<typeof setInterval> | undefined;
/** Parts received so far for multi-part notifications, keyed by message id. */
const notificationParts: Record<string, Array<string>> = {};

/**
 * Wraps a client message into the wire envelope and sends it over the current WebSocket.
 *
 * Declared with `let` because `connect` replaces it with a middleware-wrapped version.
 *
 * @param message The RPC action name and its request body.
 * NOTE(review): `truthy` presumably throws when `ws` is undefined - confirm in `@cp/common/utils/Assert`.
 */
let sendMessage = <T>(message: WebSocketsClientMessage<T>): void => {
  const { rpcAction, request } = message;
  console.log('WebSocketService: sending message to the server: ', rpcAction, request);

  // The WebSocket 'action' must stay in sync with 'websocket.route' in serverless.yml
  // (see the description of 'messageHandler' there).
  // A single 'websocketMessage' action is shared by every WS action except connect/disconnect,
  // because connect/disconnect events use different routes in AWS Gateway API.
  const envelope: WebSocketsRequest<T> = { action: 'websocketMessage', rpcAction, request };

  const socket = truthy(ws, 'Invalid Web Socket connection');
  socket.send(JSON.stringify(envelope));
};

/**
 * Creates the shared connection-ready promise if none exists yet.
 *
 * Side effects: assigns `setConnectionReady` (resolves the promise) and `setConnectionNotReady`
 * (rejects the promise and clears `isConnectionReadyPromise` so that a later call creates a new
 * one). A logging `catch` handler is attached so a rejection is always handled.
 */
function initConnectionReadyPromise(): void {
  if (isConnectionReadyPromise) {
    return;
  }

  const readyPromise = new Promise<string>((resolve, reject) => {
    setConnectionReady = resolve;
    setConnectionNotReady = (reason: string): void => {
      reject(reason);
      isConnectionReadyPromise = undefined;
    };
  });

  isConnectionReadyPromise = readyPromise;
  readyPromise.catch(console.log);
}

/**
 * WebSocket `open` handler: re-sends the existing subscriptions (if any) and then resolves the
 * connection-ready promise.
 */
function onOpen(): void {
  const openedMessage = 'WebSocket connection was opened successfully';

  // Resubscribe first so that work queued on the ready promise runs after it.
  resubscribeIfNeeded();
  setConnectionReady(openedMessage);
}

/**
 * Records one frame of a (possibly multi-part) notification and reports whether every part of
 * the message has now been received.
 *
 * Each frame carries a string `payload` chunk plus optional 1-based `part` / `totalParts` fields
 * (both default to 1). Chunks are collected in `partsStorage` under the frame's `messageId`. Once
 * no slot is empty, the chunks are concatenated in part order, parsed as JSON, and the storage
 * entry is removed.
 *
 * A frame whose `part` is not an integer within `1..<number of expected parts>` is ignored: it is
 * not stored and the message is reported as incomplete. Previously such a frame was written into
 * the parts array, which either leaked a storage entry that could never complete or caused the
 * stray chunk to be joined into the final payload.
 *
 * @param rawNotification The frame received from the server.
 * @param partsStorage Mutable storage for partially received messages, keyed by message id.
 *   Slots that have not been received yet hold `null`.
 * @returns `isNotificationComplete: true` with the parsed payload when all parts are present,
 *   otherwise `isNotificationComplete: false` with a shallow copy of the frame.
 * @throws SyntaxError when the concatenated payload is not valid JSON.
 */
export function buildNotification(
  rawNotification: WebSocketsMessage,
  partsStorage: Record<string, Array<string>>
): {
  isNotificationComplete: boolean;
  notification: WebSocketsMessage;
} {
  const frame = rawNotification as Omit<WebSocketsMessage, 'payload'> & { payload: string };
  const { messageId, payload } = frame;
  const part = frame.part ?? 1;
  const totalParts = frame.totalParts ?? 1;

  // Unfilled slots are stored as `null`, which the declared storage type does not express.
  const storage = partsStorage as Record<string, Array<string | null>>;

  const expectedParts = Number.isInteger(totalParts) && totalParts > 0 ? totalParts : 1;
  const parts: Array<string | null> = storage[messageId] ?? Array.from({ length: expectedParts }, () => null);

  const isValidPart = Number.isInteger(part) && part >= 1 && part <= parts.length;
  if (!isValidPart) {
    console.warn('WebSocketService: ignoring notification part with an invalid index', messageId, part, totalParts);
    return { isNotificationComplete: false, notification: { ...rawNotification } };
  }

  parts[part - 1] = payload;
  storage[messageId] = parts;

  if (parts.some((chunk) => chunk === null)) {
    return { isNotificationComplete: false, notification: { ...rawNotification } };
  }

  // Drop the entry before parsing so a malformed payload does not stay in storage forever.
  delete storage[messageId];

  return {
    isNotificationComplete: true,
    notification: { ...rawNotification, payload: JSON.parse(parts.join('')) as unknown }
  };
}

/**
 * WebSocket `message` handler.
 *
 * Parses the frame, feeds it to `buildNotification`, and once a notification is complete:
 * acknowledges it to the server, drops it if its `messageId` was already dispatched, and otherwise
 * invokes every listener whose subscription `type` and `objId` match the notification's.
 *
 * Frames that are empty, are not valid JSON, lack a `messageId`, or whose assembled payload cannot
 * be parsed are logged and ignored instead of throwing out of the event handler. A notification
 * without a `subscription` is acknowledged but dispatched to no listener.
 *
 * @param message The raw message event; `data` is expected to hold a JSON document.
 */
function onMessage(message: MessageEvent<string>): void {
  if (isEmpty(message.data)) {
    return;
  }

  let rawNotification: WebSocketsMessage;
  try {
    rawNotification = JSON.parse(message.data) as WebSocketsMessage;
  } catch (e) {
    console.warn('WebSocketService: ignoring a frame that is not valid JSON', e);
    return;
  }

  // `JSON.parse` may legitimately yield `null`, hence the optional chaining.
  if (!rawNotification?.messageId) {
    return;
  }

  let assembled: ReturnType<typeof buildNotification>;
  try {
    assembled = buildNotification(rawNotification, notificationParts);
  } catch (e) {
    console.warn('WebSocketService: could not assemble notification payload', rawNotification.messageId, e);
    return;
  }

  const { isNotificationComplete, notification } = assembled;
  if (!isNotificationComplete) {
    return;
  }

  // Acknowledge before de-duplicating so that a re-delivered message is acknowledged again.
  acknowledgeMessage(notification);

  console.debug('WebSocketService: got notification', notification?.subscription?.type, notification);

  if (seenMessages.has(notification.messageId)) {
    return;
  }

  seenMessages.add(notification.messageId);

  const { subscription } = notification;
  if (!subscription) {
    // Nothing to route on; matching against listeners would otherwise throw.
    return;
  }

  listeners.forEach((listener) => {
    if (
      subscription.objId === listener.subscriptionDetails.objId &&
      subscription.type === listener.subscriptionDetails.type
    ) {
      listener.handler(notification as Replace<WebSocketsMessage, 'payload', unknown>);
    }
  });
}

/**
 * WebSocket `close` handler.
 *
 * Events that do not originate from the currently held socket are only logged. For the current
 * socket the connection is torn down (subscriptions are kept) and the reconnect logic in
 * `onError` is started.
 *
 * @param e The close event.
 */
async function onClose(e: CloseEvent): Promise<void> {
  console.log('Got a websockets close event', e);

  const isFromCurrentSocket = Boolean(ws) && e.target === ws;
  if (isFromCurrentSocket) {
    disconnect(false);
    await onError(e);
  }
}

/**
 * WebSocket `error` handler.
 *
 * Mirrors `onClose`: an event from a socket other than the currently held one is only logged;
 * otherwise the connection is torn down (subscriptions are kept) and `onError` takes over.
 *
 * @param e The error event.
 */
async function onErrorMessage(e: Event): Promise<void> {
  console.log('Got a websockets error event', e);

  const currentSocket = ws;
  const isStaleEvent = !currentSocket || e.target !== currentSocket;
  if (isStaleEvent) {
    return;
  }

  disconnect(false);
  await onError(e);
}

/**
 * Opens the WebSocket connection, or refreshes the access token on a connection that already
 * exists.
 *
 * Flow:
 *  1. Remember `getAccessToken` / `webSocketsEndpoint` (falling back to the previously supplied
 *     values) and fetch a token.
 *  2. Without a token: tear the connection down (subscriptions are kept) and return.
 *  3. With an existing socket: wait until it is ready, send the new token to the server, return.
 *     If that fails, fall through and open a new socket.
 *  4. Otherwise create a new socket, install the handlers (each optionally extended by the
 *     matching `on*` option), wait for the connection to be ready, and start the periodic token
 *     refresh.
 *
 * Never rejects because the socket failed to open; that case is only logged.
 *
 * NOTE(review): the `onOpen` / `onClose` / `onMessage` / `onError` extensions are not remembered
 * between calls, so the reconnect performed by `onError` (which passes only `getAccessToken`)
 * installs handlers without them - confirm whether that is intended.
 *
 * @param options See `ConnectOptions`.
 * @throws when no endpoint is known (via `assertTruthy`), or when `getAccessToken` rejects.
 */
async function connect({
  getAccessToken: getAccessTokenProp,
  webSocketsEndpoint: webSocketsEndpointProp,
  onOpen: onOpenProp,
  onClose: onCloseProp,
  onMessage: onMessageProp,
  onError: onErrorProp,
  onSendMessage: onSendMessageProp
}: ConnectOptions = {}): Promise<void> {
  getAccessToken = getAccessTokenProp ?? getAccessToken;
  webSocketsEndpoint = webSocketsEndpointProp ?? webSocketsEndpoint;
  assertTruthy(webSocketsEndpoint, 'Websocket endpoint not defined');
  accessToken = (await getAccessToken?.()) ?? '';

  initConnectionReadyPromise();

  if (!accessToken) {
    disconnect(false);
    return;
  }

  if (ws) {
    try {
      await isConnectionReadyPromise;
      updateAccessToken(accessToken);
      return;
    } catch (e) {
      console.log(e);
    }
  }

  // A rejection above clears the ready promise. Re-create it (no-op when it still exists) so that
  // the new socket's `open` event resolves a live promise and `subscribe` has something to wait on.
  initConnectionReadyPromise();

  console.log(`Connecting to websockets, clientId: ${clientId}`);
  // The token is URL-encoded so that characters such as '+', '/' or '=' survive the query string.
  ws = new WebSocket(
    `${webSocketsEndpoint}?at=${encodeURIComponent(accessToken)}&ci=${clientId}&ap=${application}`
  );

  ws.onopen = withMiddleWare(onOpenProp)(onOpen);

  ws.onmessage = withMiddleWare(onMessageProp)(onMessage);

  ws.onclose = withMiddleWare(onCloseProp)(onClose);

  ws.onerror = withMiddleWare(onErrorProp)(onErrorMessage);

  // Only wrap when there is middleware to add; wrapping unconditionally would add one more
  // pass-through layer around `sendMessage` on every reconnect.
  if (onSendMessageProp) {
    const defaultSendMessage = sendMessage;
    sendMessage = withMiddleWare(onSendMessageProp)(defaultSendMessage);
  }

  try {
    await isConnectionReadyPromise;
  } catch (e) {
    console.log('Could not establish a websocket connection. Disconnected');
  }

  keepConnectionAlive();
}

/**
 * Fetches a fresh access token and sends it to the server over the existing connection.
 *
 * A non-empty token is also stored in the module-level `accessToken`, so that later
 * subscribe / unsubscribe / resubscribe requests carry the refreshed token instead of the one
 * obtained when the connection was opened.
 *
 * @param getAccessToken Getter for the current access token.
 * @throws when there is no active WebSocket (see `updateAccessToken`) or the getter rejects.
 */
async function refreshConnection(getAccessToken: () => Promise<string | null>): Promise<void> {
  const refreshedToken = (await getAccessToken()) || '';

  if (refreshedToken) {
    accessToken = refreshedToken;
  }

  updateAccessToken(refreshedToken);
}

/**
 * Starts the periodic access-token refresh unless it is already running.
 *
 * The timer fires one minute before `MIN_WEBSOCKET_CONNECTION_DURATION_IN_MILLIS` elapses. On each
 * tick the token is refreshed only after the connection-ready promise resolves. When no such
 * promise exists the tick does nothing. Failures are logged with `console.error`.
 */
function keepConnectionAlive(): void {
  if (refreshIntervalId) {
    return;
  }

  const refreshPeriodInMillis = MIN_WEBSOCKET_CONNECTION_DURATION_IN_MILLIS - MILLIS_PER_MINUTE;

  const refreshWhenReady = (): void => {
    isConnectionReadyPromise
      ?.then(() => {
        refreshConnection(getAccessToken).catch(console.error);
      })
      .catch(console.error);
  };

  refreshIntervalId = setInterval(refreshWhenReady, refreshPeriodInMillis);
}

/**
 * Tears the connection down: closes the socket, rejects the connection-ready promise and stops
 * the periodic token refresh.
 *
 * @param shouldClearSubscriptions When true (the default) all subscriptions and listeners are
 *   dropped as well and a new client id is generated. Pass false to keep them for a reconnect.
 */
function disconnect(shouldClearSubscriptions = true): void {
  if (shouldClearSubscriptions) {
    clearSubscriptions();
  }

  destroyClient();
  setConnectionNotReady('Disconnected from WebSockets');

  const activeRefreshTimer = refreshIntervalId;
  refreshIntervalId = undefined;
  clearInterval(activeRefreshTimer);
}

/**
 * Closes the current socket (if any) and rejects the connection-ready promise.
 * Errors raised while closing are logged and not rethrown.
 */
function destroyClient(): void {
  const socket = ws;
  if (!socket) {
    return;
  }

  try {
    // Clear the module-level reference before close(): the close handler checks whether the
    // event belongs to the currently held socket.
    ws = undefined;
    socket.close();
    setConnectionNotReady('Connection closed');
  } catch (e) {
    console.error('Error while disconnecting from websocket', e);
  }
}

/**
 * Forgets every listener and subscription and generates a new client id.
 */
function clearSubscriptions(): void {
  listeners = [];
  uniqueSubscriptions.clear();
  subscriptions.clear();
  clientId = uuidv4();
}

/**
 * Sends an `updateAccessToken` request carrying the given token, the client id and the
 * application name.
 *
 * @param accessToken The token to send to the server.
 * @throws Error when there is no WebSocket.
 */
function updateAccessToken(accessToken: string): void {
  const isConnected = Boolean(ws);
  if (!isConnected) {
    console.debug('not connected to WebSocket during access token update.');
    throw new Error('Not connected to websockets');
  }

  console.debug('Updating access token');

  sendMessage<UpdateAccessTokenRequest>({
    rpcAction: 'updateAccessToken',
    request: { accessToken, clientId, application }
  });
}

/**
 * Registers one more listener for the given subscription once the connection is ready.
 *
 * Subscriptions are reference-counted per canonical details object: the `subscribe` request is
 * sent to the server only when the count goes from 0 to 1. When no connection-ready promise
 * exists nothing happens. If the promise rejects, or sending fails, a message is logged.
 *
 * @param subscriptionDetails The subscription to register.
 */
function subscribe(subscriptionDetails: SubscriptionDetails): void {
  const registerSubscription = (): void => {
    const mapKey = getSubscriptionMapKey(subscriptionDetails);
    const canonicalDetails = getUniqueSubscriptionDetails(mapKey, subscriptionDetails);
    const updatedCount = (subscriptions.get(canonicalDetails) || 0) + 1;
    const isFirstListener = updatedCount === 1;

    if (isFirstListener) {
      console.log(`subscribing: ${clientId}`);
      sendMessage({
        rpcAction: 'subscribe',
        request: {
          clientId,
          subscriptions: [canonicalDetails],
          accessToken
        }
      });
    }

    subscriptions.set(canonicalDetails, updatedCount);
  };

  isConnectionReadyPromise
    ?.then(registerSubscription)
    .catch(() => console.log('Could not subscribe. Client not connected'));
}

/**
 * Releases one listener of the given subscription once the connection is ready.
 *
 * The reference count is decremented. When it reaches zero an `unsubscribe` request is sent and
 * the bookkeeping entries are removed. A subscription with no recorded count is ignored, and so
 * is the call when no connection-ready promise exists. Failures are logged.
 *
 * @param subscriptionDetails The subscription to release.
 */
function unsubscribe(subscriptionDetails: SubscriptionDetails): void {
  const releaseSubscription = (): void => {
    const mapKey = getSubscriptionMapKey(subscriptionDetails);
    const canonicalDetails = getUniqueSubscriptionDetails(mapKey, subscriptionDetails);
    const currentCount = subscriptions.get(canonicalDetails);

    if (!currentCount) {
      return;
    }

    const remainingCount = currentCount - 1;
    if (remainingCount) {
      // Other listeners still depend on this subscription.
      subscriptions.set(canonicalDetails, remainingCount);
      return;
    }

    sendMessage({
      rpcAction: 'unsubscribe',
      request: {
        clientId,
        subscriptions: [canonicalDetails],
        accessToken
      }
    });
    subscriptions.delete(canonicalDetails);
    uniqueSubscriptions.delete(mapKey);
  };

  isConnectionReadyPromise?.then(releaseSubscription).catch(console.log);
}

/**
 * Builds the string key that identifies a subscription: its `type`, followed by `_<objId>` when
 * `objId` is truthy.
 *
 * @param subscriptionDetails The subscription to build the key for.
 * @returns The key, e.g. `type_objId`, or just `type` when there is no object id.
 */
function getSubscriptionMapKey(subscriptionDetails: SubscriptionDetails): string {
  const { type, objId } = subscriptionDetails;
  const objectSuffix = objId ? `_${objId}` : '';
  return type + objectSuffix;
}

/**
 * Looks up the canonical `SubscriptionDetails` object for a subscription key in
 * `uniqueSubscriptions`, so that equal subscriptions share one `Map` key in `subscriptions`.
 *
 * NOTE(review): relies on `getOrSetDefault` - presumably it returns the stored entry, or stores
 * and returns `subscriptionDetails` when the key is missing; confirm in `MiscUtils`.
 *
 * @param subscriptionMapKey Key produced by `getSubscriptionMapKey`.
 * @param subscriptionDetails Details passed to `getOrSetDefault` as the default value.
 * @returns The value returned by `getOrSetDefault` for that key.
 */
function getUniqueSubscriptionDetails(
  subscriptionMapKey: string,
  subscriptionDetails: SubscriptionDetails
): SubscriptionDetails {
  const canonicalDetails = getOrSetDefault(uniqueSubscriptions, subscriptionMapKey, subscriptionDetails);
  return canonicalDetails;
}

/**
 * Reconnect loop, started after the socket was closed or reported an error.
 *
 * While no socket is held, waits five seconds and calls `connect` again. Only one loop runs at a
 * time (guarded by `retrying`), and nothing is attempted without an access token.
 *
 * The token is re-read on every iteration: `connect` clears it when no token can be obtained any
 * more (e.g. after sign-out), and the loop then stops. Previously the authentication state was
 * captured once before the loop, so that exit could never be taken and the loop retried forever.
 *
 * @param error The event or error that triggered the reconnect; only logged.
 * @throws whatever `connect` throws; the `retrying` flag is reset in that case too.
 */
async function onError(error: unknown): Promise<void> {
  console.warn('WebSockets error: ', error);
  if (retrying) {
    return;
  }

  if (!accessToken) {
    console.debug('on error not authenticated - 1');
    return;
  }

  retrying = true;
  try {
    while (!ws) {
      await sleep(5000);

      // Re-evaluated on purpose: the token may have been cleared by the previous connect attempt.
      if (!accessToken) {
        console.debug('on error not authenticated - 2');
        break;
      }
      console.log('Retrying websockets connection....');

      // Another caller may have connected while this loop was sleeping.
      if (!ws) {
        await connect({ getAccessToken });
      }
    }
  } finally {
    retrying = false;
  }
}

/**
 * Sends an `acknowledgeMessage` request for the given notification. A notification object
 * without any keys is ignored.
 *
 * @param notification The notification to acknowledge.
 */
function acknowledgeMessage(notification: WebSocketsMessage): void {
  const hasContent = Object.keys(notification).length > 0;
  if (!hasContent) {
    return;
  }

  console.debug('WebSocketService: acknowledgeMessage', notification?.subscription?.type, notification);

  sendMessage<WebSocketsAcknowledgeMessageRequest>({
    rpcAction: 'acknowledgeMessage',
    request: { clientId, messageId: notification.messageId }
  });
}

/**
 * Re-sends every known subscription in a single `subscribe` request. Does nothing when there are
 * no subscriptions or no access token.
 */
function resubscribeIfNeeded(): void {
  if (!subscriptions.size || !accessToken) {
    return;
  }

  console.log('Resubscribing after reconnect, subscriptions: ', subscriptions.size);

  sendMessage<WebSocketsSubscribeRequest>({
    rpcAction: 'subscribe',
    request: {
      clientId,
      subscriptions: Array.from(subscriptions.keys()),
      accessToken,
      application
    }
  });
}

/**
 * Registers a handler for notifications of the given subscription and subscribes to it.
 *
 * @param subscriptionDetails The subscription whose notifications the handler should receive.
 * @param handler Callback invoked for each matching notification.
 * @returns A function that removes the listener and releases its subscription. Calling it more
 *   than once has no further effect. Without that guard a repeated call would decrement the
 *   shared subscription count again and could unsubscribe other listeners of the same
 *   subscription.
 */
function addNotificationListener<PayloadType>(
  subscriptionDetails: SubscriptionDetails,
  handler: WebSocketsNotificationHandler<PayloadType>
): () => void {
  const id = uuidv4();
  listeners = [...listeners, { id, subscriptionDetails, handler }];
  subscribe(subscriptionDetails);

  let isRemoved = false;

  return () => {
    if (isRemoved) {
      return;
    }
    isRemoved = true;

    listeners = listeners.filter(({ id: listenerId }) => listenerId !== id);
    unsubscribe(subscriptionDetails);
  };
}

/** Handle returned by `getOrCreateWsClient` for interacting with the WebSocket connection. */
export interface WebSocketsCpClient {
  /** Closes the connection and drops all subscriptions and listeners. */
  disconnect: () => void;
  /**
   * Registers `handler` for notifications matching `subscriptionDetails`.
   * Returns a function that removes the listener again.
   */
  addNotificationListener: <PayloadType>(
    subscriptionDetails: SubscriptionDetails,
    handler: WebSocketsNotificationHandler<PayloadType>
  ) => () => void;
}

/**
 * Connects (or refreshes the token of the existing connection) and returns a client handle.
 *
 * The connection state lives at module level, so every handle returned by this function
 * operates on the same connection.
 *
 * @param connectOptions Options forwarded to `connect`.
 * @returns An object exposing `disconnect` and `addNotificationListener`.
 */
export async function getOrCreateWsClient(connectOptions: ConnectOptions): Promise<WebSocketsCpClient> {
  await connect(connectOptions);

  const client: WebSocketsCpClient = { disconnect, addNotificationListener };
  return client;
}
