import { useTenantNamespace } from './useTenantNamespace'
/* Types */
import { useCloudApi } from '@/composables/cloudApi'
import { useAnalytics } from '@/composables/analytics'
import type {
  V1alpha1PulsarCluster,
  V1alpha1AuditLog,
  V1ObjectMeta,
  V1alpha1PulsarInstance,
  V1alpha1MaintenanceWindow
} from '@streamnative/cloud-api-client-typescript'
import { i18n } from '@/lang'
import type { ComputedRef, Ref } from 'vue'
import { cpuParser, memoryParser } from 'k8s-resource-parser'
import AsyncLock from 'async-lock'
import { computedAsync, useIntervalFn } from '@vueuse/core'
import { getErrorMessage } from '@/utils/apiHelper'
import type { PulsarState } from './usePulsarState'
import brokerOptions from '@/data/brokerOptions.json'
import bookieOptions from '@/data/bookieOptions.json'
import Decimal from 'decimal.js'
import { view, PulsarCluster, adminTenant, PulsarClusterWrapper, useRbac } from './useRbac'
import { isEqual } from 'lodash-es'
import { isIstioEnabledForInstance } from '@/composables/useInstance'
import dayjs from 'dayjs'
import { ConnectorClientFactory } from '@/factory/connector-client-factory'

// Module-level memo of an organization / cluster name; both start out unset.
// NOTE(review): neither variable is read or written in the visible part of this file — confirm
// where they are used.
let lastOrg: string | undefined = undefined
let lastCluster: string | undefined = undefined

// Translation function of the global i18n instance; used below for user-facing error messages.
const { t } = i18n.global
// Shared async lock instance.
// NOTE(review): not used in the visible part of this file — confirm what it guards.
const lock = new AsyncLock()

// Memory that makes up one resource unit, in bytes (8 * 1024^3); getCu/getSu divide by this value.
export const memoryPerUnit = 8589934592 // 8gb
export const memoryPerUnitGb = 8
export const cpuPerUnit = 2000 // 2 full cpu
// see here https://github.com/streamnative/cloud-manager/issues/2974 for details
// CPU count that makes up one resource unit; getCu/getSu divide by this value.
export const cpuPerUnitRaw = 2
export const maxBrokerCu = 8
export const maxBookieSu = 8
// getClusterResource derives throughput as `units * throughputPerSu / bkReplFactor`.
export const throughputPerSu = 125
export const bkReplFactor = 2
export const memoryGbPerSu = 8
// Node-count and unit limits.
// NOTE(review): not referenced in the visible part of this file — presumably consumed by the UI forms.
export const brokerMinimumNodeCount = 2
export const bookieMinimumNodeCount = 3
export const brokerMaximumNodeCount = 15
export const bookieMaximumNodeCount = 15
export const minThroughput = 5
export const minBrokerCu = 0.5
export const minFreeBrokerCu = 0.1
export const minBookieSu = 0.5
// Smallest unit size accepted for either component (the lower of the broker and bookie minimums).
export const minResourceUnit = Math.min(minBrokerCu, minBookieSu)
export const minFreeCU = 0.0298
// Resource preset with the minimum free broker CU, a single broker node and no bookie resources.
// NOTE(review): presumably the defaults applied to free clusters — confirm against callers.
export const freeResources = {
  brokerCU: minFreeBrokerCu,
  brokerNodeCount: 1,
  bookieSU: undefined as undefined | number,
  bookieNodeCount: undefined as undefined | number,
  config: {} as ClusterConfig
}

// Annotation that, when set to 'true', makes a cluster count as ready regardless of its reported
// conditions, so that access to the cluster page is not blocked in some cases.
export const annotationClusterReady = 'cloud.streamnative.io/ready'

// For serverless, use the pre-created cluster as the tenant's allowed cluster.
export const preCreatedClusterNameLabel = 'cloud.streamnative.io/pre-created-pulsar-cluster'

// NOTE(review): not referenced in the visible part of this file — presumably a DNS suffix check
// for pool members; confirm against its usage.
const CN_POOLMEMBER_DNS = '.cn'
/**
 * A PulsarCluster API object as used by the UI: `metadata.name` and `metadata.namespace` are
 * guaranteed to be present, and `generated` carries values derived on the client side by
 * `formatClusterModelWithInstance` (service URLs, condition flags, versions, feature switches).
 */
export interface Cluster extends V1alpha1PulsarCluster {
  metadata: V1ObjectMeta & {
    name: string
    namespace: string
  }
  generated: {
    webServiceUrl?: string // 'https://${dnsName}'
    brokerServiceUrl?: string // 'pulsar+ssl://${dnsName}:6651'
    websocketServiceUrl?: string // 'wss://${dnsName}:9443'
    kopServiceUrl?: string
    kafkaSchemaRegistryUrl?: string
    mqttServiceUrl?: string
    conditions: ClusterConditions
    // true when the API object carries a deletion timestamp or was deleted through this module
    deleted: boolean
    brokerVersion?: string // ie. '2.8.0.9'
    bookKeeperVersion?: string // ie. '2.8.0.9'
    isKopEnabled?: boolean
    isMopEnabled?: boolean
    isAmqpEnabled?: boolean
    functionEnabled?: boolean
    recurrenceMap?: Record<string, boolean>
    // eslint-disable-next-line prettier/prettier
    bookieNodeType: (typeof bookieOptions)[number] | undefined
    // eslint-disable-next-line prettier/prettier
    brokerNodeType: (typeof brokerOptions)[number] | undefined
    kafkaConnectConfig?: KafkaConnectConfig
    // NOTE(review): the four counters below are not populated by formatClusterModelWithInstance;
    // presumably filled in elsewhere.
    topicsCount?: number
    subscriptionsCount?: number
    producersCount?: number
    consumersCount?: number
  }
}

/**
 * Kafka Connect related settings picked out of a cluster's `spec.config.custom` map by
 * `getKafkaConnectConfig`. Every value is carried as the raw string stored in that map.
 */
interface KafkaConnectConfig {
  additionalServlets?: string
  kafkaConnectCustomLabels?: string
  kafkaConnectDefaultNamespace?: string
  kafkaConnectDefaultServiceAccountName?: string
  kafkaConnectDefaultTenant?: string
  kafkaConnectKubernetesJobNamespace?: string
  snRBACAuthInterceptorEnabled?: string
}

/**
 * UI-side shape of a cluster's `spec.config`.
 *
 * Numbers are stringified before a config is sent on create, and `custom` values are sent through
 * `String(...)` when patched, which is why the `custom` fields accept both strings and numbers.
 */
export interface ClusterConfig {
  websocketEnabled?: boolean
  // todo: these are never used anywhere?
  transactionEnabled?: boolean
  functionEnabled?: boolean
  auditLog?: V1alpha1AuditLog
  auditLogEnabled?: boolean
  auditLogDescribing?: boolean
  auditLogProducing?: boolean
  auditLogConsuming?: boolean
  // free-form settings stored under `spec.config.custom`
  custom?: {
    backlogQuotaDefaultLimitBytes?: string | number
    backlogQuotaDefaultRetentionPolicy?: string
    maxProducersPerTopic?: string | number
    maxConsumersPerTopic?: string | number
    maxConsumersPerSubscription?: string | number
    dispatchThrottlingRatePerTopicInByte?: string | number
    dispatchThrottlingRatePerTopicInMsg?: string | number
    dispatchThrottlingRatePerSubscriptionInByte?: string | number
    dispatchThrottlingRatePerSubscriptionInMsg?: string | number
    subscribeThrottlingRatePerConsumer?: string | number
    topicLevelPoliciesEnabled?: string | boolean
    snAuditLogConfig?: string
  }
  // a protocol counts as enabled when its key is present (see the `is*Enabled` generated flags)
  protocols?: {
    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    kafka?: {}

    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    mqtt?: {}

    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    amqp?: {}
  }
}

// TODO add kafkaconnect config to cluster
/**
 * Flat, UI-side description of a cluster that is converted into a `V1alpha1PulsarCluster` (create)
 * or into a JSON patch (update).
 *
 * `organization` is used as the Kubernetes namespace and `instance` as `spec.instanceName`.
 * The bookie fields are optional because free instances are created without bookkeeper resources.
 */
export interface ClusterResourcePayload {
  organization: string
  instance: string
  name: string
  location: string
  poolMemberRef?: {
    namespace: string
    name: string
  }
  brokerCpu: string
  brokerMemory: string
  brokerPods: number
  bookieCpu?: string
  bookieMemory?: string
  bookiePods?: number
  // the spelling of this property is part of the public interface; do not "fix" it in isolation
  brokerAutoScailingPolicy?: ClusterAutoScalingPolicy
  config?: ClusterConfig
  releaseChannel?: string
  maintenanceWindow?: V1alpha1MaintenanceWindow
  displayName?: string
  // serverless clusters are validated by `displayName` instead of `name` and are created without config
  isServerless?: boolean
  // the 'ursa' engine annotation is only written when this flag is set and `engine` is 'ursa'
  isURSAFeaturePossible?: boolean
  engine?: 'ursa' | 'classic'
}

/**
 * Broker auto-scaling settings chosen in the UI.
 * `replicas[0]` is sent as `minReplicas` and `replicas[1]` as `maxReplicas`.
 */
export interface ClusterAutoScalingPolicy {
  enabled: boolean
  replicas: number[]
}

/**
 * Condition flags of a cluster. The `*Ready` flags mirror the conditions reported in the cluster
 * status (true when the reported status is 'True'); the `*Available` flags are computed on the
 * client from the ready-replica counts.
 */
export interface ClusterConditions {
  Ready: boolean
  ServiceEndpointReady: boolean
  ZookeeperReady: boolean
  BookKeeperReady: boolean
  PulsarBrokerReady: boolean
  PulsarProxyReady: boolean
  OxiaReady: boolean
  // true if BookKeeperAvailable && ZookeeperAvailable && PulsarBrokerAvailable when using independent bookkeeper and zookeeper components
  // or true if PulsarBrokerAvailable when using a shared oxia component
  Available: boolean
  BookKeeperAvailable: boolean // true if >= 2
  ZookeeperAvailable: boolean // true if >= 2.  Technically "total node count / 2 + 1" but our node count should be 3
  PulsarBrokerAvailable: boolean // true if >= 1
}

/* State */
// Clusters of the loaded organization, grouped by the name of the instance they belong to
// (the key is the cluster's `spec.instanceName`).
const clusterMap: Ref<Record<string, Array<Cluster>>> = ref({})

/**
 * Resource summary (CU, SU and the throughput derived from each) of the cluster that is currently
 * selected in the Pulsar state. Every field is undefined when no instance or cluster is selected,
 * or when the selected cluster is not present in the cluster map.
 */
const currentClusterResource: ComputedRef<{
  cu: number | undefined
  su: number | undefined
  cuThroughput: number | undefined
  suThroughput: number | undefined
}> = computed(() => {
  const selectedInstance = usePulsarState().instance.value
  const selectedUid = usePulsarState().clusterUid.value
  const nothingSelected = {
    cu: undefined,
    su: undefined,
    cuThroughput: undefined,
    suThroughput: undefined
  }
  if (!selectedInstance || !selectedUid) {
    return nothingSelected
  }

  const instanceClusters = clusterMap.value[selectedInstance] || []
  const selected = instanceClusters.find(candidate => candidate.metadata?.uid === selectedUid)
  return selected ? getClusterResource(selected) : nothingSelected
})

/**
 * Computes the compute units (CU) and storage units (SU) of a cluster together with the
 * throughput figure derived from each of them.
 *
 * @param cluster cluster to inspect
 * @returns `cu`/`su` as returned by `getCu`/`getSu`; the matching throughput is rounded to one
 *   decimal and is undefined whenever the unit count is undefined or 0
 */
const getClusterResource = (cluster: Cluster) => {
  // throughput = units * throughputPerSu / bkReplFactor, rounded to one decimal
  const throughputOf = (units: number | undefined) =>
    units ? Math.round(((units * throughputPerSu) / bkReplFactor) * 10) / 10 : undefined

  const cu = getCu(cluster)
  const su = getSu(cluster)
  return { cu, su, cuThroughput: throughputOf(cu), suThroughput: throughputOf(su) }
}

/**
 * Compute units (CU) of a cluster's broker, rounded to two decimals.
 *
 * Explicit broker resources on the spec take precedence; otherwise the broker node type resolved in
 * `generated.brokerNodeType` is looked up in the broker options. The unit count is the larger of
 * `cpu / cpuPerUnitRaw` and `memory / memoryPerUnit`.
 *
 * @param cluster cluster to inspect
 * @returns the CU count, or undefined when neither source is available
 */
const getCu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  const toUnits = (cpu: number, memory: number) =>
    Math.round(Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit) * 100) / 100

  const declared = cluster.spec?.broker.resources
  if (declared) {
    return toUnits(cpuParser(declared.cpu), memoryParser(declared.memory))
  }

  const preset = brokerOptions.find(
    option => option.name === cluster.generated.brokerNodeType?.name
  )
  if (!preset) {
    // this most likely means error
    return undefined
  }
  return toUnits(preset.cpu, memoryParser(preset.memory))
}

/**
 * Storage units (SU) of a cluster's bookkeeper, rounded to two decimals.
 *
 * A cluster that references a shared bookkeeper set reports 0. Otherwise explicit bookkeeper
 * resources on the spec take precedence over the bookie node type resolved in
 * `generated.bookieNodeType`. The unit count is the larger of `cpu / cpuPerUnitRaw` and
 * `memory / memoryPerUnit`.
 *
 * @param cluster cluster to inspect
 * @returns the SU count, or undefined when no resource source is available
 */
const getSu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  const toUnits = (cpu: number, memory: number) =>
    Math.round(Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit) * 100) / 100

  if (cluster.spec?.bookKeeperSetRef) {
    // using shared bookie, probably free cluster, returning 0
    return 0
  }

  const declared = cluster.spec?.bookkeeper?.resources
  if (declared) {
    return toUnits(cpuParser(declared.cpu), memoryParser(declared.memory))
  }

  const preset = bookieOptions.find(
    option => option.name === cluster.generated.bookieNodeType?.name
  )
  if (!preset) {
    // this most likely means error
    return undefined
  }
  return toUnits(preset.cpu, memoryParser(preset.memory))
}

/** Name of the selected instance, as returned by the Pulsar state's `mustInstance()`. */
const mustGetSelectedInstance = (): string => usePulsarState().mustInstance()

/**
 * Clusters of the currently selected instance, ordered by name.
 * Yields an empty array when no instance is selected or the instance has no entry in the
 * cluster map.
 */
const clusters: ComputedRef<Array<Cluster>> = computed(() => {
  const { instance } = usePulsarState()
  if (!instance.value || !clusterMap.value[instance.value]) {
    return []
  }
  // Sort a copy: Array.prototype.sort works in place, so sorting the array held by clusterMap
  // would mutate shared reactive state from inside a computed getter.
  return [...clusterMap.value[instance.value]].sort((a, b) => {
    return a.metadata.name.localeCompare(b.metadata.name)
  })
})

/* Getters */
/**
 * Names of the clusters of the currently selected instance in default string sort order;
 * empty when no instance is selected or the instance has no entry in the cluster map.
 */
const clusterNames: ComputedRef<Array<string>> = computed(() => {
  const selectedInstance = usePulsarState().instance.value
  const instanceClusters = selectedInstance ? clusterMap.value[selectedInstance] : undefined
  if (!instanceClusters) {
    return []
  }

  const names = instanceClusters.map(entry => entry.metadata.name)
  names.sort()
  return names
})

/**
 * The cluster of the selected instance whose UID matches the selected cluster UID, or undefined
 * when no instance is selected, the instance has no entry in the cluster map, or no cluster matches.
 */
const activeCluster: ComputedRef<Cluster | undefined> = computed(() => {
  const { instance, clusterUid } = usePulsarState()
  const selectedInstance = instance.value
  if (!selectedInstance) {
    return undefined
  }

  const candidates = clusterMap.value[selectedInstance]
  if (!candidates) {
    return undefined
  }

  return candidates.find(candidate => candidate.metadata?.uid === clusterUid.value)
})

/**
 * Readiness of the cluster called `name` inside `instance`.
 * An unknown instance or cluster name is reported as not ready.
 */
const isClusterReadyByNameFn = (name: string, instance: string) => {
  const instanceClusters = clusterMap.value[instance]
  const match = instanceClusters?.find(candidate => candidate.metadata.name === name)
  return isClusterReadyFn(match)
}

/**
 * Availability of the cluster called `name` inside `instance`.
 * An unknown instance or cluster name is reported as not available.
 */
const isClusterAvailableByNameFn = (name: string, instance: string) => {
  const instanceClusters = clusterMap.value[instance]
  const match = instanceClusters?.find(candidate => candidate.metadata.name === name)
  return isClusterAvailableFn(match)
}

/**
 * Tells whether a cluster counts as ready.
 *
 * @param cluster cluster to check; undefined is treated as not ready
 * @returns true when the ready-override annotation is set to 'true', otherwise the cluster's
 *   `Ready` condition
 */
const isClusterReadyFn = (cluster: Cluster | undefined) => {
  if (!cluster) {
    return false
  }
  // the manual override annotation wins over whatever the conditions report
  if (cluster.metadata?.annotations?.[annotationClusterReady] === 'true') {
    return true
  }
  return cluster.generated.conditions.Ready
}

/**
 * Tells whether a cluster counts as available: a ready cluster is always available, otherwise
 * the computed `Available` condition decides.
 *
 * @param cluster cluster to check; undefined is treated as not available
 */
const isClusterAvailableFn = (cluster: Cluster | undefined) => {
  if (!cluster) {
    return false
  }
  if (isClusterReadyFn(cluster)) {
    return true
  }
  return cluster.generated.conditions.Available
}

/** Readiness of the active cluster; false while there is no active cluster. */
const isActiveClusterReady: ComputedRef<boolean> = computed(() =>
  isClusterReadyFn(activeCluster.value)
)

/** Availability of the active cluster; false while there is no active cluster. */
const isActiveClusterAvailable: ComputedRef<boolean> = computed(() =>
  isClusterAvailableFn(activeCluster.value)
)

/**
 * True only when there is an active cluster whose `spec.config.functionEnabled` is truthy.
 */
const functionEnabled: ComputedRef<boolean> = computed(() =>
  Boolean(activeCluster.value?.spec?.config?.functionEnabled)
)

/* Mutations */
/**
 * Stores `cluster` in the cluster map of `instanceName`.
 *
 * A cluster with the same name is replaced in place; a cluster that is not yet known is appended,
 * so the other clusters of the instance are kept. The list is created when the instance has no
 * entry yet.
 *
 * @param cluster cluster model to store
 * @param instanceName instance the cluster belongs to; defaults to the selected instance
 */
const setCluster = (cluster: Cluster, instanceName: string = mustGetSelectedInstance()) => {
  const instanceClusters = clusterMap.value[instanceName]
  if (!instanceClusters) {
    clusterMap.value[instanceName] = [cluster]
    return
  }

  const clusterIndex = instanceClusters.findIndex(
    clusterItem => clusterItem.metadata.name === cluster.metadata.name
  )
  if (clusterIndex > -1) {
    instanceClusters.splice(clusterIndex, 1, cluster)
  } else {
    // previously the whole list was overwritten with `[cluster]`, dropping every sibling cluster
    instanceClusters.push(cluster)
  }
}

/** Flags the given cluster model as deleted by setting `generated.deleted`. */
const setActiveClusterDeleted = (cluster: Cluster) => {
  const { generated } = cluster
  generated.deleted = true
}

/**
 * Tells whether a cluster model is flagged as deleted.
 *
 * @param cluster cluster to check; undefined is treated as not deleted
 * @returns true only when `generated.deleted` is exactly `true`
 */
const isClusterDeletedFn = (cluster: Cluster | undefined) => {
  return cluster ? cluster.generated.deleted === true : false
}

/**
 * Tells whether the cluster called `name` inside `instance` is flagged as deleted.
 * An unknown instance or cluster name is reported as not deleted.
 */
const isClusterDeletedByNameFn = (name: string, instance: string) => {
  const instanceClusters = clusterMap.value[instance]
  const match = instanceClusters?.find(candidate => candidate.metadata.name === name)
  return isClusterDeletedFn(match)
}

/**
 * Validates the payload, creates the cluster through the cloud API and stores the resulting model
 * in the cluster map of `payload.instance`.
 *
 * @param payload UI description of the cluster to create
 * @returns the formatted model of the created cluster
 * @throws Error with the duplicate-cluster message when the API answers 409, otherwise an Error
 *   carrying the message extracted by `getErrorMessage` (this includes validation failures)
 */
const createClusterByResources = async (payload: ClusterResourcePayload): Promise<Cluster> => {
  try {
    const body = await validateAndGetClusterPayloadResource(payload, true)
    const { data: created } = await useCloudApi().createNamespacedPulsarCluster(
      payload.organization,
      body
    )
    const newCluster = formatClusterModel(created)
    setCluster(newCluster, payload.instance)
    return newCluster
  } catch (e) {
    // `e` is unknown; only an HTTP-like error exposes `response.status`
    const status = (e as { response?: { status?: unknown } } | null | undefined)?.response?.status
    if (status === 409) {
      throw Error(t('cluster.duplicateClusterError'))
    }
    throw Error(getErrorMessage(e))
  }
}

/**
 * Builds the JSON Patch operation(s) needed to bring one field of `cluster` in line with
 * `updatePayload`.
 *
 * `path` is a slash-separated path into the cluster (e.g. `/spec/config/protocols/kafka`). The same
 * path is followed inside the payload, except that the `spec` segment is skipped there because the
 * payload has no `spec` level.
 *
 * @param path path of the field inside the cluster object; also used as the patch path
 * @param cluster current cluster model
 * @param updatePayload desired state
 * @returns an empty array when both values are deeply equal; a single `remove` operation when the
 *   current value is truthy and the desired one falsy, or when the payload carries no value at all;
 *   a single `replace` operation otherwise
 */
const getJsonPatch = (path: string, cluster: Cluster, updatePayload: ClusterResourcePayload) => {
  const tokenizedPath = path.split('/')
  const lastPath = tokenizedPath.pop()

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  let tempClusterValue: any = cluster
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  let tempUpdatePayloadValue: any = updatePayload
  tokenizedPath.forEach(p => {
    if (!p) {
      // leading slash produces an empty first token
      return
    } else if (p === 'spec') {
      // only the cluster has a `spec` level
      tempClusterValue = tempClusterValue?.[p]
    } else {
      tempClusterValue = tempClusterValue?.[p]
      tempUpdatePayloadValue = tempUpdatePayloadValue?.[p]
    }
  })

  const desiredValue = tempUpdatePayloadValue?.[lastPath as string]
  const currentValue = tempClusterValue?.[lastPath as string]

  if (isEqual(currentValue, desiredValue)) {
    // nothing to do
    return []
  } else if ((currentValue && !desiredValue) || desiredValue === undefined) {
    // value is removed. The `undefined` case covers a falsy current value (false, 0, ''): a
    // `replace` without a value is dropped from the serialized operation and is not a valid patch.
    return [{ op: 'remove', path }]
  } else {
    // value is updated
    return [{ op: 'replace', path, value: desiredValue }]
  }
}

/**
 * Updates an existing cluster by sending a JSON patch built from the difference between the
 * cluster currently held in the cluster map and `payload`, then stores the returned model in the
 * cluster map of `payload.instance`.
 *
 * The patch covers, in this order: missing `config`/`custom` containers, the `custom` entries,
 * the `protocols` container and its entries, a set of config switches, broker/bookkeeper replicas,
 * broker resources, the broker auto-scaling policy and bookkeeper resources.
 *
 * @param payload desired state of the cluster
 * @returns the formatted model returned by the API
 * @throws Error 'cluster to update is not found' when the instance or cluster is not in the map,
 *   or an Error carrying the message extracted by `getErrorMessage` when the request fails
 */
const updateClusterConfigByResource = async (payload: ClusterResourcePayload): Promise<Cluster> => {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const patchArray: Record<string, any> = []
  // optional chaining: an instance missing from the map must end in the "not found" error below
  // instead of a TypeError
  const clusterObj = clusterMap.value[payload.instance]?.find(c => c.metadata.name === payload.name)
  if (!clusterObj) {
    throw Error('cluster to update is not found')
  }

  // containers have to exist before entries below them can be patched
  if (!clusterObj?.spec?.config) {
    patchArray.push({
      op: 'add',
      path: `/spec/config`,
      value: {}
    })
  }

  if (!clusterObj?.spec?.config?.custom) {
    patchArray.push({
      op: 'add',
      path: `/spec/config/custom`,
      value: {}
    })
  }

  // 'custom' attrs numbers must be strings in request
  Object.entries(payload.config?.custom || {}).forEach(([key, value]) => {
    if (value === undefined) {
      // only remove entries the cluster actually has
      if ((clusterObj?.spec?.config?.custom ?? {})[key]) {
        patchArray.push({
          op: 'remove',
          path: `/spec/config/custom/${key}`
        })
      }
    } else {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/custom/${key}`,
        value: String(value)
      })
    }
  })

  if (!clusterObj.spec?.config?.protocols) {
    patchArray.push({
      op: 'add',
      path: '/spec/config/protocols',
      value: {}
    })
  }
  patchArray.push(...getJsonPatch(`/spec/config/protocols/kafka`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/protocols/mqtt`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/protocols/amqp`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/websocketEnabled`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/auditLog`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/transactionEnabled`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/functionEnabled`, clusterObj, payload))
  // NOTE(review): this compares `spec.config.maintenanceWindow` with `payload.config.maintenanceWindow`,
  // while the payload declares `maintenanceWindow` at its top level and the create path writes it to
  // `spec.maintenanceWindow`. As written this looks like a no-op — confirm the intended path.
  patchArray.push(...getJsonPatch(`/spec/config/maintenanceWindow`, clusterObj, payload))

  patchArray.push({
    op: 'replace',
    path: '/spec/broker/replicas',
    value: payload.brokerPods
  })
  patchArray.push({
    op: 'replace',
    path: '/spec/bookkeeper/replicas',
    value: payload.bookiePods
  })

  if (clusterObj?.spec?.broker?.resources) {
    patchArray.push({
      op: 'replace',
      path: '/spec/broker/resources/cpu',
      value: payload.brokerCpu
    })
    patchArray.push({
      op: 'replace',
      path: '/spec/broker/resources/memory',
      value: payload.brokerMemory
    })
  } else {
    patchArray.push({
      op: 'add',
      path: '/spec/broker/resources',
      value: { cpu: payload.brokerCpu, memory: payload.brokerMemory }
    })
  }

  const hasAutoScalingPolicy = !!clusterObj?.spec?.broker.autoScalingPolicy
  if (payload.brokerAutoScailingPolicy?.enabled) {
    // replace an existing policy, add a new one otherwise; the value is the same either way
    patchArray.push({
      op: hasAutoScalingPolicy ? 'replace' : 'add',
      path: '/spec/broker/autoScalingPolicy',
      value: {
        minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
        maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
      }
    })
  } else if (hasAutoScalingPolicy) {
    patchArray.push({
      op: 'remove',
      path: '/spec/broker/autoScalingPolicy'
    })
  }

  if (clusterObj?.spec?.bookkeeper?.resources) {
    patchArray.push({
      op: 'replace',
      path: '/spec/bookkeeper/resources/cpu',
      value: payload.bookieCpu
    })
    patchArray.push({
      op: 'replace',
      path: '/spec/bookkeeper/resources/memory',
      value: payload.bookieMemory
    })
  } else {
    patchArray.push({
      op: 'add',
      path: '/spec/bookkeeper/resources',
      value: { cpu: payload.bookieCpu, memory: payload.bookieMemory }
    })
  }

  try {
    const { data } = await useCloudApi().patchNamespacedPulsarCluster(
      payload.name,
      payload.organization,
      patchArray
    )
    const clusterModel = formatClusterModel(data)
    // store under the instance the cluster was looked up in, not under whatever instance happens
    // to be selected right now
    setCluster(clusterModel, payload.instance)
    return clusterModel
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Deletes the cluster through the cloud API and, once the request succeeded, flags the given
 * model as deleted.
 *
 * @param cluster cluster to delete; its name and namespace identify the API object
 */
const deleteCluster = async (cluster: Cluster) => {
  const { name, namespace } = cluster.metadata
  await useCloudApi().deleteNamespacedPulsarCluster(name, namespace)
  setActiveClusterDeleted(cluster)
}

/**
 * Deletes the cluster called `name` of `instance` through the cloud API.
 *
 * Nothing happens when the instance or the cluster is not in the cluster map. The local model is
 * only flagged as deleted when the deleted cluster has the same name as the active cluster.
 *
 * @param name name of the cluster to delete
 * @param instance name of the instance the cluster belongs to
 */
const deleteClusterByName = async (name: string, instance: string) => {
  // optional chaining keeps an unknown instance a no-op, like the other *ByName helpers
  const _cluster = clusterMap.value[instance]?.find(item => item.metadata.name === name)
  if (!_cluster) {
    return
  }

  await useCloudApi().deleteNamespacedPulsarCluster(
    _cluster.metadata.name,
    _cluster.metadata.namespace
  )
  if (_cluster.metadata.name === activeCluster.value?.metadata.name) {
    setActiveClusterDeleted(_cluster)
  }
}

/**
 * Loads every cluster of the organization and replaces the whole cluster map with the result.
 */
const getClusterMap = async ({ organization }: { organization: string }) => {
  clusterMap.value = await getClusterMapRaw({ organization })
}

/**
 * Fetches the clusters and instances of an organization and groups the formatted cluster models
 * by instance name, without touching the shared cluster map.
 *
 * Clusters without `spec.instanceName` are skipped with a warning. Each cluster is formatted
 * together with the instance of the same name, when one was returned.
 *
 * @param organization namespace to list clusters and instances from
 * @returns formatted clusters keyed by instance name
 */
const getClusterMapRaw = async ({ organization }: { organization: string }) => {
  // the two listings are independent, so request them concurrently
  const [{ data }, { data: instanceRes }] = await Promise.all([
    useCloudApi().listNamespacedPulsarCluster(organization),
    useCloudApi().listNamespacedPulsarInstance(organization)
  ])

  useAnalytics().identifyUser()

  const cm: Record<string, Array<Cluster>> = {}
  data.items.forEach(cluster => {
    const instanceName = cluster.spec?.instanceName
    if (!instanceName) {
      // invalid instance name is passed in; log identifiers, interpolating the object itself
      // would only print "[object Object]"
      console.warn(
        `invalid instance name is passed in: ${cluster.metadata?.namespace}/${cluster.metadata?.name}`
      )
      return
    }

    if (!cm[instanceName]) {
      cm[instanceName] = []
    }

    // for every organization only _1_ instance should have the same name, or there is a bug
    cm[instanceName].push(
      formatClusterModelWithInstance(
        cluster,
        instanceRes.items.find(i => i.metadata?.name === instanceName)
      )
    )
  })

  return cm
}

/**
 * Converts a quantity written with the milli suffix (`m`) into its plain value, keeping one
 * decimal place (e.g. '1500m' becomes '1.5'). Values without a trailing `m` are returned unchanged.
 *
 * @param value quantity string such as '2000m', '16' or '32Gi'
 * @returns the converted quantity as a string
 */
const convertMillis = (value: string) => {
  const isMilli = value.endsWith('m')
  if (!isMilli) {
    return value
  }
  // allow 10th decimal: divide by 100, round, then divide by 10
  const millis = new Decimal(value.replace('m', ''))
  const tenths = millis.div(100).round()
  return tenths.div(10).toString()
}

/**
 * Validates and converts UI's custom pulsar cluster object to V1alpha1PulsarCluster
 *
 * The instance is read from the API to find out whether it is a free instance: free instances get
 * no config and no bookkeeper, and may not request more broker CPU than the free limit or any
 * bookie CPU. Numbers inside the config are converted to strings and falsy `custom` entries are
 * dropped before the config is used.
 *
 * @param payload Custom object that represents a pulsar cluster in UI
 * @param isCreate if is for create or update cluster
 * @returns V1alpha1PulsarCluster our API accepts and we should be using
 * @throws Error when the name (display name for serverless), the location or, for non-free
 *   instances, the bookie resources are missing, when a free instance requests too many resources,
 *   or when the cluster read for an update has no spec
 */
const validateAndGetClusterPayloadResource = async (
  payload: ClusterResourcePayload,
  isCreate: boolean
): Promise<V1alpha1PulsarCluster> => {
  if (payload.isServerless) {
    // serverless cluster does not require name, just use display name
    if (!payload.displayName) {
      throw Error(t('cluster.clusterNameIsRequired'))
    }
  } else {
    if (!payload.name) {
      throw Error(t('cluster.clusterNameIsRequired'))
    }
  }

  if (!payload.location) {
    throw Error(t('cluster.clusterLocationIsRequired'))
  }

  const { data } = await useCloudApi().readNamespacedPulsarInstance(
    payload.instance,
    payload.organization
  )
  const isFree = data.spec?.type === 'free'

  // TODO need to scope this out better
  if (isFree && (Number(payload.brokerCpu) > 0.2 || payload.bookieCpu)) {
    throw Error(t('cluster.unableToCreateFreeBrokerResource')) // TODO refine this error message
  }

  // `config` is optional: JSON.stringify(undefined) yields undefined, which JSON.parse rejects, so
  // a missing config must not go through the stringify/parse round trip.
  const configValue =
    isFree || payload.config === undefined
      ? undefined
      : JSON.parse(JSON.stringify(payload.config, (k, v) => (typeof v !== 'number' ? v : '' + v)))

  // drop custom entries without a value
  Object.entries(configValue?.custom ?? {}).forEach(([key, value]) => {
    if (!value) {
      delete configValue.custom[key]
    }
  })

  if (isCreate) {
    if (
      !isFree &&
      (payload.bookieCpu === undefined ||
        payload.bookieMemory === undefined ||
        payload.bookiePods === undefined)
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // For create, return minimal cluster object needed to create
    const cluster: V1alpha1PulsarCluster = {
      kind: 'PulsarCluster',
      metadata: {
        name: payload.name,
        namespace: payload.organization
      },
      spec: {
        instanceName: payload.instance,
        displayName: payload.displayName,
        location: payload.location,
        poolMemberRef: payload.poolMemberRef,
        config: configValue,
        broker: {
          replicas: payload.brokerPods,
          resources: {
            cpu: payload.brokerCpu,
            memory: payload.brokerMemory
          }
        },
        bookkeeper:
          isFree ||
          payload.bookieCpu === undefined ||
          payload.bookieMemory === undefined ||
          payload.bookiePods === undefined
            ? undefined
            : {
                replicas: payload.bookiePods,
                resources: {
                  cpu: payload.bookieCpu,
                  memory: payload.bookieMemory
                }
              }
      }
    }

    if (payload.isURSAFeaturePossible && payload.engine === 'ursa') {
      // mark the cluster as an ursa engine cluster
      cluster.metadata = {
        ...cluster.metadata,
        annotations: {
          'cloud.streamnative.io/engine': 'ursa'
        }
      }
    }

    if (payload.brokerAutoScailingPolicy?.enabled && cluster.spec) {
      cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
        autoScalingPolicy: {
          minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
          maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
        }
      })
    }
    if (payload.releaseChannel && cluster.spec) {
      cluster.spec.releaseChannel = payload.releaseChannel
    }
    if (payload.maintenanceWindow && cluster.spec) {
      cluster.spec.maintenanceWindow = payload.maintenanceWindow
    }
    if (payload.isServerless) {
      // serverless clusters are created without any config
      delete cluster.spec?.config
    }
    return cluster
  }

  // For updates, there are some immutable values, such as poolRef, and we should maintain
  // those values.  Thus we are fetching the base model and modifying what we can on top.
  const cluster = (
    await useCloudApi().readNamespacedPulsarCluster(payload.name, payload.organization)
  ).data

  if (!cluster.spec) {
    throw Error(t('cluster.specIsMissing'))
  }

  cluster.spec.config = configValue
  if (isFree) {
    // free
    cluster.spec.bookkeeper = undefined
  } else {
    if (
      payload.bookieCpu === undefined ||
      payload.bookieMemory === undefined ||
      payload.bookiePods === undefined
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // not free
    cluster.spec.bookkeeper = Object.assign(cluster.spec.bookkeeper || {}, {
      replicas: payload.bookiePods,
      resources: Object.assign(cluster.spec?.bookkeeper?.resources || {}, {
        cpu: payload.bookieCpu,
        memory: payload.bookieMemory
      })
    })
  }
  cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
    replicas: payload.brokerPods,
    resources: Object.assign(cluster.spec?.broker?.resources || {}, {
      cpu: payload.brokerCpu,
      memory: payload.brokerMemory
    })
  })

  if (payload.brokerAutoScailingPolicy?.enabled && cluster.spec) {
    cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
      autoScalingPolicy: {
        minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
        maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
      }
    })
  }
  if (payload.releaseChannel && cluster.spec) {
    cluster.spec.releaseChannel = payload.releaseChannel
  }
  if (payload.maintenanceWindow && cluster.spec) {
    cluster.spec.maintenanceWindow = payload.maintenanceWindow
  }
  return cluster
}

/**
 * Converts a PulsarCluster API object into the UI's `Cluster` model.
 *
 * Besides copying the API object, this derives the `generated` section: condition flags, service
 * URLs built from the DNS name of the `service` endpoint, component versions taken from the image
 * tags, node types and protocol/feature switches.
 *
 * Note that milli-suffixed cpu/memory quantities on the passed `cluster.spec` are rewritten in
 * place to their plain form.
 *
 * @param cluster API object to convert
 * @param instance instance the cluster belongs to; when given it decides the websocket port,
 *   otherwise the istio setting exposed by `useInstance()` is used
 * @returns the UI cluster model
 * @throws Error when the cluster has no UID
 */
export const formatClusterModelWithInstance = (
  cluster: V1alpha1PulsarCluster,
  instance?: V1alpha1PulsarInstance
): Cluster => {
  if (!cluster.metadata?.uid) {
    throw new Error('Cluster is missing UID')
  }

  const brokerReadyReplicas = cluster?.status?.broker?.readyReplicas
  const bookkeeperReadyReplicas = cluster?.status?.bookkeeper?.readyReplicas
  const zookeeperReadyReplicas = cluster?.status?.zookeeper?.readyReplicas

  // a reported condition is true only when its status is exactly 'True'
  const reportedConditions: Record<string, boolean> = {}
  cluster.status?.conditions?.forEach(condition => {
    reportedConditions[condition.type] = condition.status === 'True'
  })

  // Start from all-false defaults so that every flag is a real boolean even when the API reports
  // only some conditions (or none); reported conditions override the defaults.
  const conditions: ClusterConditions = {
    Ready: false,
    ServiceEndpointReady: false,
    ZookeeperReady: false,
    BookKeeperReady: false,
    PulsarBrokerReady: false,
    PulsarProxyReady: false,
    OxiaReady: false,
    Available: false,
    BookKeeperAvailable: false,
    ZookeeperAvailable: false,
    PulsarBrokerAvailable: false,
    ...reportedConditions
  }

  conditions.PulsarBrokerAvailable = brokerReadyReplicas !== undefined && brokerReadyReplicas > 0
  conditions.BookKeeperAvailable =
    bookkeeperReadyReplicas !== undefined && bookkeeperReadyReplicas > 1
  conditions.ZookeeperAvailable = zookeeperReadyReplicas !== undefined && zookeeperReadyReplicas > 1

  // scenario using shared oxia
  if (bookkeeperReadyReplicas === undefined && zookeeperReadyReplicas === undefined) {
    conditions.Available = conditions.PulsarBrokerAvailable
  } else if (zookeeperReadyReplicas === undefined) {
    // bookkeeper reported but no zookeeper: oxia takes the place of zookeeper
    conditions.Available =
      conditions.PulsarBrokerAvailable && conditions.BookKeeperAvailable && conditions.OxiaReady
  } else {
    conditions.Available =
      conditions.PulsarBrokerAvailable &&
      conditions.BookKeeperAvailable &&
      conditions.ZookeeperAvailable
  }

  if (cluster.metadata?.annotations?.[annotationClusterReady] === 'true') {
    // manual override annotation is set: force the flags below to true
    conditions.Ready = true
    conditions.ServiceEndpointReady = true
    conditions.ZookeeperReady = true
    conditions.BookKeeperReady = true
    conditions.PulsarBrokerReady = true
    conditions.PulsarProxyReady = true
    conditions.Available = true
    conditions.BookKeeperAvailable = true
    conditions.ZookeeperAvailable = true
    conditions.PulsarBrokerAvailable = true
  }

  // the last endpoint of type 'service' provides the DNS name; empty when there is none
  let dnsName = ''
  cluster.spec?.serviceEndpoints?.forEach(endpoint => {
    if (endpoint.type === 'service') {
      dnsName = endpoint.dnsName
    }
  })
  let port = useInstance().istioEnabled.value ? 443 : 9443
  if (instance) {
    port = isIstioEnabledForInstance(instance) ? 443 : 9443
  }

  // We allow k8s resource syntaxes such as `32Gi`, `2000m`, `16` etc.  However, larger numeric
  // values cause the underlying library we depend on to fail its calculation,
  // i.e. `require('k8s-resource-parser').memoryParser('30923764531200m') === NaN`
  // Because of this, we reduce the probability of running into this issue by converting from milli.
  if (cluster.spec?.broker?.resources?.cpu) {
    cluster.spec.broker.resources.cpu = convertMillis(cluster.spec.broker.resources.cpu)
  }
  if (cluster.spec?.broker?.resources?.memory) {
    cluster.spec.broker.resources.memory = convertMillis(cluster.spec.broker.resources.memory)
  }
  if (cluster.spec?.bookkeeper?.resources?.cpu) {
    cluster.spec.bookkeeper.resources.cpu = convertMillis(cluster.spec.bookkeeper.resources.cpu)
  }
  if (cluster.spec?.bookkeeper?.resources?.memory) {
    cluster.spec.bookkeeper.resources.memory = convertMillis(
      cluster.spec.bookkeeper.resources.memory
    )
  }

  const res = {
    ...cluster,
    metadata: {
      // this is an annoying part of the generated V1alpha1PulsarCluster and other kubernetes objects.
      // although name and namespace are required fields, they are marked as optional.  Overriding this
      // optionality so that we don't have to do the same check over and over when using name and namespace.
      ...cluster.metadata,
      name: cluster.metadata.name as string,
      namespace: cluster.metadata.namespace as string
    },
    generated: {
      conditions,
      webServiceUrl: dnsName && `https://${dnsName}`,
      brokerServiceUrl: dnsName && `pulsar+ssl://${dnsName}:6651`,
      websocketServiceUrl:
        cluster.spec?.config?.websocketEnabled && dnsName ? `wss://${dnsName}:${port}` : undefined,
      kopServiceUrl:
        !!cluster.spec?.config?.protocols?.kafka && dnsName ? `${dnsName}:9093` : undefined,
      kafkaSchemaRegistryUrl:
        !!cluster.spec?.config?.protocols?.kafka && dnsName
          ? `https://${dnsName}/kafka`
          : undefined,
      mqttServiceUrl:
        !!cluster.spec?.config?.protocols?.mqtt && dnsName ? `${dnsName}:8883` : undefined,
      deleted: !!cluster.metadata?.deletionTimestamp,
      // version is whatever follows the first ':' of the image reference
      brokerVersion: cluster.spec?.broker?.image?.split(':')[1],
      bookKeeperVersion: cluster.spec?.bookkeeper?.image?.split(':')[1],
      bookieNodeType: bookieOptions.find(
        bo => bo.name === cluster.spec?.bookkeeper?.resourceSpec?.nodeType
      ),
      brokerNodeType: brokerOptions.find(
        bo => bo.name === cluster.spec?.broker?.resourceSpec?.nodeType
      ),
      isKopEnabled: !!cluster.spec?.config?.protocols?.kafka,
      functionEnabled: cluster.spec?.config?.functionEnabled,
      recurrenceMap: convertRecurrenceFromStringToMap(cluster.spec?.maintenanceWindow?.recurrence),
      isMopEnabled: !!cluster.spec?.config?.protocols?.mqtt,
      isAmqpEnabled: !!cluster.spec?.config?.protocols?.amqp,
      kafkaConnectConfig: getKafkaConnectConfig(cluster.spec?.config?.custom)
    }
  }
  return res
}

/**
 * Returns the cluster of `instance` with the earliest `creationTimestamp`,
 * or `undefined` when `clusterMap` holds no clusters for that instance.
 *
 * The returned object is a JSON deep copy, not the entry stored in `clusterMap`.
 */
const firstClusterFn = (instance: string) => {
  const instanceClusters = clusterMap.value[instance] as Cluster[]
  if (!instanceClusters?.length) {
    return undefined
  }
  // Important: sort a deep clone, never the stored array. Sorting the stored
  // clusters in place updates them, which re-runs watchers and can freeze the UI.
  const snapshot = JSON.parse(JSON.stringify(instanceClusters)) as Cluster[]
  const createdAtSeconds = (cls: Cluster) => dayjs(cls.metadata.creationTimestamp).unix()
  snapshot.sort((clsA, clsB) => createdAtSeconds(clsA) - createdAtSeconds(clsB))
  return snapshot[0]
}

/**
 * Tells whether `clusterName` is the name of the first cluster of `instance`
 * (as determined by `firstClusterFn`). Returns `true` when the instance has no clusters.
 */
const isFirstClusterByNameFn = (instance: string, clusterName: string) => {
  const oldest = firstClusterFn(instance)
  return oldest ? oldest.metadata.name === clusterName : true
}

/**
 * Tells whether `cluster` has the same name as the first cluster of `instance`
 * (as determined by `firstClusterFn`). Returns `true` when the instance has no clusters.
 */
const isFirstClusterFn = (instance: string, cluster: Cluster) => {
  const oldest = firstClusterFn(instance)
  return !oldest || oldest.metadata.name === cluster.metadata.name
}

/**
 * Result of `isClusterAvailableFn` for the first cluster of `instance`.
 * Returns `true` when the instance has no clusters.
 */
const isFirstClusterAvailableFn = (instance: string) => {
  const oldest = firstClusterFn(instance)
  return oldest ? isClusterAvailableFn(oldest) : true
}

/**
 * Result of `isClusterReadyFn` for the first cluster of `instance`.
 * Returns `true` when the instance has no clusters.
 */
const isFirstClusterReadyFn = (instance: string) => {
  const oldest = firstClusterFn(instance)
  return oldest ? isClusterReadyFn(oldest) : true
}

/**
 * Picks the Kafka Connect related entries out of a cluster's custom config map.
 *
 * @param custom - the custom key/value config of the cluster, if any
 * @returns `undefined` when `custom` is missing; otherwise an object that always
 *          carries all seven Kafka Connect keys (a key absent from `custom` is
 *          present with the value `undefined`)
 */
const getKafkaConnectConfig = (
  custom: Record<string, string> | undefined
): KafkaConnectConfig | undefined => {
  if (!custom) {
    return undefined
  }

  const {
    additionalServlets,
    kafkaConnectCustomLabels,
    kafkaConnectDefaultNamespace,
    kafkaConnectDefaultServiceAccountName,
    kafkaConnectDefaultTenant,
    kafkaConnectKubernetesJobNamespace,
    snRBACAuthInterceptorEnabled
  } = custom

  return {
    additionalServlets,
    kafkaConnectCustomLabels,
    kafkaConnectDefaultNamespace,
    kafkaConnectDefaultServiceAccountName,
    kafkaConnectDefaultTenant,
    kafkaConnectKubernetesJobNamespace,
    snRBACAuthInterceptorEnabled
  }
}

/**
 * Converts a raw API cluster into the local `Cluster` model.
 * Delegates to `formatClusterModelWithInstance`, passing `undefined` as the instance.
 */
export const formatClusterModel = (cluster: V1alpha1PulsarCluster): Cluster =>
  formatClusterModelWithInstance(cluster, undefined)

/**
 * Locally scoped composable that polls the active cluster every 5 seconds.
 *
 * Each tick reads the active cluster through the cloud API, stores the raw
 * response in `cluster`, and pushes the formatted model into the shared state
 * via `setCluster`. On a failed read the message is stored in `error` and the
 * polling is paused; callers can restart it with `resume`.
 */
export const usePingCluster = () => {
  const cluster = ref<V1alpha1PulsarCluster>()
  const error = ref('')

  const conditions = computed(() => cluster.value?.status?.conditions ?? [])
  const brokerReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.broker?.readyReplicas
  )
  const bookkeeperReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.bookkeeper?.readyReplicas
  )
  const zookeeperReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.zookeeper?.readyReplicas
  )

  const pingActiveCluster = async () => {
    const target = activeCluster.value
    if (!target) {
      // cluster fetch may not be ready, simply retry at next interval
      return
    }
    const { name, namespace, uid } = target.metadata
    const lockKey = uid || ''

    // a previous ping for the same cluster is still in flight: skip this tick
    if (lock.isBusy(lockKey)) {
      return
    }

    await lock.acquire(lockKey, async () => {
      try {
        const response = await useCloudApi().readNamespacedPulsarCluster(name, namespace)
        cluster.value = response.data
        const formatted = formatClusterModel(response.data)
        setCluster(formatted, formatted.spec?.instanceName)
      } catch (e) {
        error.value = getErrorMessage(e)
        pause()
      }
    })
  }

  const { resume, pause, isActive } = useIntervalFn(pingActiveCluster, 5000)

  return {
    cluster,
    conditions,
    brokerReadyReplicas,
    bookkeeperReadyReplicas,
    zookeeperReadyReplicas,
    resume,
    pause,
    isActive,
    error
  }
}

/**
 * Derives aggregate resource figures (unit totals, throughput, CPU, memory)
 * from per-node broker and bookie settings.
 *
 * @param params - refs holding node counts and per-node CU/SU counts; defaults
 *                 to the values of the active cluster
 * @returns computed refs; `cuTotal` / `suTotal` are `NaN` when either of their
 *          inputs is missing or zero, and that `NaN` propagates to the values
 *          derived from them
 */
export const useResourceCalculation = (
  params: {
    brokerNodeCount: Ref<number | undefined>
    brokerCUCount: Ref<number | undefined>
    bookieNodeCount: Ref<number | undefined>
    bookieSUCount: Ref<number | undefined>
  } = {
    brokerNodeCount: computed(() => activeCluster.value?.spec?.broker.replicas),
    brokerCUCount: computed(() => currentClusterResource.value.cu),
    bookieNodeCount: computed(() => activeCluster.value?.spec?.bookkeeper?.replicas),
    bookieSUCount: computed(() => currentClusterResource.value.su)
  }
) => {
  const { brokerNodeCount, brokerCUCount, bookieNodeCount, bookieSUCount } = params

  // rounds to one decimal place
  const roundToTenth = (value: number) => Math.round(value * 10) / 10

  // nodes * units-per-node, rounded; NaN while either input is unset or zero
  const unitTotal = (nodes: Ref<number | undefined>, unitsPerNode: Ref<number | undefined>) =>
    computed<number>(() =>
      nodes.value && unitsPerNode.value ? roundToTenth(nodes.value * unitsPerNode.value) : NaN
    )

  const cuTotal = unitTotal(brokerNodeCount, brokerCUCount)
  const suTotal = unitTotal(bookieNodeCount, bookieSUCount)

  const brokerThroughput = computed<number>(() => roundToTenth(cuTotal.value * throughputPerSu))
  const brokerCPUCores = computed<number>(() => (cuTotal.value * cpuPerUnit) / 1000)
  const brokerMemory = computed<number>(() => cuTotal.value * memoryPerUnitGb)

  const bookieThroughput = computed<number>(() => roundToTenth(suTotal.value * throughputPerSu))
  const bookieCPUCores = computed<number>(() => (suTotal.value * cpuPerUnit) / 1000)
  const bookieMemory = computed<number>(() => suTotal.value * memoryGbPerSu)

  const totalThroughput = computed<number>(() => {
    if (!bookieThroughput.value) {
      // for free bookie will not be set and throughput will be NaN
      return brokerThroughput.value
    }

    // min(broker throughput, raw bookie throughput / bkReplFactor)
    const replicatedBookieThroughput = roundToTenth(
      (suTotal.value * throughputPerSu) / bkReplFactor
    )
    return Math.min(brokerThroughput.value, replicatedBookieThroughput)
  })

  return {
    cuTotal,
    suTotal,
    brokerThroughput,
    brokerCPUCores,
    brokerMemory,
    bookieThroughput,
    bookieCPUCores,
    bookieMemory,
    totalThroughput
  }
}

/**
 * Tells whether `dnsName` ends with `CN_POOLMEMBER_DNS`.
 * A missing (`undefined`) DNS name is reported as `true`.
 */
export const isCNPoolMember = (dnsName: string | undefined) =>
  dnsName === undefined || dnsName.endsWith(CN_POOLMEMBER_DNS)

/**
 * Wires cluster loading to the pulsar state: runs `valueChanged` once with the
 * initial state and again whenever organization, cluster uid or the RBAC
 * `abilityUpdating` flag changes.
 *
 * @param initialState - state used for the first, immediate run
 * @param can - permission check; `true` always allows the fetch, a function is
 *              called with `(view, PulsarCluster)`, and when omitted no fetch
 *              is performed at all
 * @returns the promise of the initial `valueChanged` run
 */
export const init = (
  initialState: PulsarState,
  can?: ((action: string, type: any, conditions?: any) => boolean) | true
) => {
  const { organization, instance, clusterUid } = usePulsarState()
  const { abilityUpdating } = useRbac()

  // TODO shouldn't this also watch instance?? why would you not reset cluster if
  //      you changed instance??
  const valueChanged = async ([org, clusUid, ab]: [
    string | undefined,
    string | undefined,
    boolean | undefined
  ]) => {
    // no organization selected: drop all cached clusters and the remembered selection
    if (!org) {
      clusterMap.value = {}
      lastOrg = undefined
      lastCluster = undefined
      return
    }
    // abilities are still being updated: do nothing now; the watcher runs this
    // again when `abilityUpdating` changes
    if (ab) {
      return
    }

    const activeCluster = clusters.value.find(clu => clu.metadata.uid === clusUid)
    const isActiveClusterMissing = clusUid ? !activeCluster : true

    // NOTE(review): `ab` is always falsy here because of the early return above,
    // so `!ab` is always true and this whole condition always passes; the
    // `lastOrg` / `isActiveClusterMissing` checks currently have no effect.
    // Confirm whether refetching on every run is intended.
    if (lastOrg !== org || isActiveClusterMissing || !ab) {
      // only fetch when the caller is allowed to view pulsar clusters
      if (can && (can === true || can(view, PulsarCluster))) {
        await getClusterMap({ organization: org })
      }
    }

    // remember what this run was for (updated even when no fetch happened)
    lastOrg = org
    lastCluster = clusUid
  }

  watch([organization, clusterUid, abilityUpdating], valueChanged)
  return valueChanged([initialState.organization, initialState.clusterUid, abilityUpdating.value])
}

/**
 * Looks through every instance in `clusterMap` and returns the first cluster
 * whose `metadata.uid` equals `uid`, or `undefined` when none matches.
 */
const getClusterFromUid = (uid: string) => {
  for (const instanceClusters of Object.values(clusterMap.value)) {
    const match = instanceClusters.find(clu => clu.metadata.uid === uid)
    if (match) {
      return match
    }
  }
  return undefined
}

/**
 * Looks through every instance in `clusterMap` for the first cluster whose
 * `metadata.name` equals `name` and returns its `metadata.uid`.
 * Returns `undefined` when no cluster matches.
 */
const getClusterUidFromName = (name: string) => {
  for (const instanceClusters of Object.values(clusterMap.value)) {
    const match = instanceClusters.find(clu => clu.metadata.name === name)
    if (match) {
      return match.metadata.uid
    }
  }
  return undefined
}
/**
 * Serializes a weekday selection map into a comma-separated list of day
 * indices (`sunday` = 0 … `saturday` = 6).
 *
 * Only entries with a truthy value are emitted, in the map's own key order.
 * Keys that are not weekday names are skipped, so the result never contains
 * empty segments such as `"1,,5"` or a trailing comma.
 *
 * @param recurrence - map of lowercase weekday name to "selected" flag
 * @returns e.g. `"1,3,5"`; an empty string when nothing is selected
 */
export const convertRecurrenceFromMapToString = (recurrence?: Record<string, boolean>) => {
  // a Map (rather than a plain object) so inherited keys like "toString" never match
  const dayIndexByName = new Map<string, string>([
    ['sunday', '0'],
    ['monday', '1'],
    ['tuesday', '2'],
    ['wednesday', '3'],
    ['thursday', '4'],
    ['friday', '5'],
    ['saturday', '6']
  ])

  const indices: string[] = []
  for (const [day, enabled] of Object.entries(recurrence ?? {})) {
    const index = enabled ? dayIndexByName.get(day) : undefined
    if (index !== undefined) {
      indices.push(index)
    }
  }
  return indices.join(',')
}

/**
 * Parses a comma-separated list of day indices (`0` = sunday … `6` = saturday)
 * into a map with one boolean per weekday. Tokens are matched exactly (no
 * trimming); a missing or empty input yields all `false`.
 */
const convertRecurrenceFromStringToMap = (recurrence?: string) => {
  const selectedDays = new Set((recurrence || '').split(','))
  return {
    sunday: selectedDays.has('0'),
    monday: selectedDays.has('1'),
    tuesday: selectedDays.has('2'),
    wednesday: selectedDays.has('3'),
    thursday: selectedDays.has('4'),
    friday: selectedDays.has('5'),
    saturday: selectedDays.has('6')
  }
}

/**
 * Builds a human-readable label from a comma-separated list of day indices
 * (`0` = sunday … `6` = saturday), e.g. `"1,3,5"` becomes
 * "<monday>, <wednesday> and <friday>" using the translated weekday names.
 *
 * Empty tokens are ignored; a token that is not exactly one of `0`-`6` is
 * rendered as `Anytime`. An empty input yields an empty string.
 *
 * @param recurrence - comma-separated day indices
 * @returns the joined label, with the last separator replaced by " and"
 */
const recurrenceShowLabel = (recurrence: string) => {
  return recurrence
    .split(',')
    .filter(day => !!day)
    // match on the whole token: destructuring it (`[k]`) would only look at its
    // first character and treat e.g. "12" as monday
    .map(day => {
      switch (day) {
        case '1':
          return t('common.monday')
        case '2':
          return t('common.tuesday')
        case '3':
          return t('common.wednesday')
        case '4':
          return t('common.thursday')
        case '5':
          return t('common.friday')
        case '6':
          return t('common.saturday')
        case '0':
          return t('common.sunday')
        default:
          return 'Anytime'
      }
    })
    .join(', ')
    // turn the final ", X" into " and X"
    .replace(/,(\s*[^,]+)$/, ' and$1')
}

/**
 * Async computed holding the connector server info for the active cluster.
 * Evaluates to `undefined` while audience or organization is unset, and also
 * when creating the client or the `serverInfo()` call fails (the error is
 * logged to the console).
 */
const connectorServerInfo = computedAsync(async () => {
  const { organization: organizationRef } = usePulsarState()
  const { audience: audienceRef } = useInstance()

  if (!audienceRef.value) {
    return undefined
  }
  if (!organizationRef.value) {
    return undefined
  }

  try {
    const { client } = ConnectorClientFactory.createConnectorClient(
      organizationRef.value,
      activeCluster.value as Cluster,
      audienceRef.value
    )
    const response = await client.serverInfo()
    return response.data
  } catch (e) {
    console.error(e)
    return undefined
  }
})

/**
 * Composable entry point of this module: hands out the module-level cluster
 * state together with the helper functions defined in this file. Every call
 * returns a new object that references the same shared bindings.
 */
export const useCluster = () => ({
  // state and node-type option lists
  clusters,
  bookieOptions,
  brokerOptions,
  clusterMap,
  clusterNames,
  activeCluster,

  // ready / available checks
  isClusterReadyFn,
  isClusterAvailableFn,
  isClusterReadyByNameFn,
  isClusterAvailableByNameFn,
  isActiveClusterReady,
  isActiveClusterAvailable,
  functionEnabled,
  currentClusterResource,

  // lifecycle and API operations
  init,
  updateClusterConfigByResource,
  deleteCluster,
  deleteClusterByName,
  getClusterMap,
  createClusterByResources,
  isCNPoolMember,
  getClusterMapRaw,

  // lookups
  getClusterFromUid,
  getClusterUidFromName,

  // maintenance-window recurrence helpers
  convertRecurrenceFromMapToString,
  recurrenceShowLabel,

  // deletion checks
  isClusterDeletedByNameFn,
  isClusterDeletedFn,

  // first-cluster helpers
  firstClusterFn,
  isFirstClusterFn,
  isFirstClusterByNameFn,
  isFirstClusterAvailableFn,
  isFirstClusterReadyFn,

  connectorServerInfo
})
