import { useCloudApi } from '@/composables/cloudApi'
import { useAnalytics } from '@/composables/analytics'
import type { Location } from '@/api/organizations'
import commonConfig from 'Config/common_config.json'
import type {
  V1alpha1PulsarCluster,
  V1alpha1AuditLog,
  V1ObjectMeta,
  V1alpha1PulsarInstance,
  V1alpha1MaintenanceWindow
} from '@streamnative/cloud-api-client-typescript'
import { i18n } from '@/lang'
import type { ComputedRef, Ref } from 'vue'
import { cpuParser, memoryParser } from 'k8s-resource-parser'
import AsyncLock from 'async-lock'
import { useIntervalFn } from '@vueuse/core'
import { getErrorMessage } from '@/utils/apiHelper'
import type { PulsarState } from './usePulsarState'
import brokerOptions from '@/data/brokerOptions.json'
import bookieOptions from '@/data/bookieOptions.json'
import Decimal from 'decimal.js'
import { isEqual } from 'lodash-es'
import { isIstioEnabledForInstance } from '@/composables/useInstance'
import dayjs from 'dayjs'
import { ConnectorClientFactory } from '@/factory/connector-client-factory'
import {
  fetchTenantsForSpecificInstance,
  fetchTenantAdminWithSpecificInstance
} from '@/api/tenants'
import type { TenantInfo } from '@streamnative/pulsar-admin-client-typescript'
import _ from 'lodash'

let lastOrg: string | undefined = undefined

const { t } = i18n.global
const lock = new AsyncLock()

export const memoryPerUnit = 8589934592 // 8gb
export const memoryPerUnitGb = 8
export const cpuPerUnit = 2000 // 2 full cpu
// see here https://github.com/streamnative/cloud-manager/issues/2974 for details
export const cpuPerUnitRaw = 2
export const maxBrokerCu = 8
export const maxBookieSu = 8
export const throughputPerSu = 125
export const bkReplFactor = 2
export const memoryGbPerSu = 8
export const brokerMinimumNodeCount = 2
export const bookieMinimumNodeCount = 3
export const brokerMaximumNodeCount = 15
export const bookieMaximumNodeCount = 15
export const minThroughput = 5
export const minBrokerCu = 0.5
export const minFreeBrokerCu = 0.1
export const minBookieSu = 0.5
export const minResourceUnit = Math.min(minBrokerCu, minBookieSu)
export const minFreeCU = 0.0298
export const freeResources = {
  brokerCU: minFreeBrokerCu,
  brokerNodeCount: 1,
  bookieSU: undefined as undefined | number,
  bookieNodeCount: undefined as undefined | number,
  config: {} as ClusterConfig
}

// Allow cluster ready for don't block cluster page access on some cases
export const annotationClusterReady = 'cloud.streamnative.io/ready'

// For serverless, let's use pre created cluster as tenant allowed cluster
export const preCreatedClusterNameLabel = 'cloud.streamnative.io/pre-created-pulsar-cluster'

const CN_POOLMEMBER_DNS = '.cn'
export interface Cluster extends V1alpha1PulsarCluster {
  metadata: V1ObjectMeta & {
    name: string
    namespace: string
  }
  generated: {
    serviceEndpointPromises: Promise<ServiceEndpoint>[]
    conditions: ClusterConditions
    deleted: boolean
    brokerVersion?: string // ie. '2.8.0.9'
    bookKeeperVersion?: string // ie. '2.8.0.9'
    isKopEnabled?: boolean
    isMopEnabled?: boolean
    isAmqpEnabled?: boolean
    functionEnabled?: boolean
    recurrenceMap?: Record<string, boolean>
    // eslint-disable-next-line prettier/prettier
    bookieNodeType: (typeof bookieOptions)[number] | undefined
    // eslint-disable-next-line prettier/prettier
    brokerNodeType: (typeof brokerOptions)[number] | undefined
    kafkaConnectConfig?: KafkaConnectConfig
    topicsCount?: number
    subscriptionsCount?: number
    producersCount?: number
    consumersCount?: number
  }
}

export interface ServiceEndpoint {
  access?: string
  gatewayDisplayName: string
  gatewayName?: string
  webServiceUrl?: string // 'https://${dnsName}'
  brokerServiceUrl?: string // 'pulsar+ssl://${dnsName}:6651'
  websocketServiceUrl?: string // 'wss://${dnsName}:9443'
  kopServiceUrl?: string
  kafkaSchemaRegistryUrl?: string
  mqttServiceUrl?: string
}

interface KafkaConnectConfig {
  additionalServlets?: string
  kafkaConnectCustomLabels?: string
  kafkaConnectDefaultNamespace?: string
  kafkaConnectDefaultServiceAccountName?: string
  kafkaConnectDefaultTenant?: string
  kafkaConnectKubernetesJobNamespace?: string
  snRBACAuthInterceptorEnabled?: string
}

export interface ClusterConfig {
  websocketEnabled?: boolean
  // todo: these are never used anywhere?
  transactionEnabled?: boolean
  functionEnabled?: boolean
  auditLog?: V1alpha1AuditLog
  auditLogEnabled?: boolean
  auditLogDescribing?: boolean
  auditLogProducing?: boolean
  auditLogConsuming?: boolean
  custom?: {
    backlogQuotaDefaultLimitBytes?: string | number
    backlogQuotaDefaultRetentionPolicy?: string
    maxProducersPerTopic?: string | number
    maxConsumersPerTopic?: string | number
    maxConsumersPerSubscription?: string | number
    dispatchThrottlingRatePerTopicInByte?: string | number
    dispatchThrottlingRatePerTopicInMsg?: string | number
    dispatchThrottlingRatePerSubscriptionInByte?: string | number
    dispatchThrottlingRatePerSubscriptionInMsg?: string | number
    subscribeThrottlingRatePerConsumer?: string | number
    topicLevelPoliciesEnabled?: string | boolean
    snAuditLogConfig?: string
  }
  protocols?: {
    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    kafka?: {}

    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    mqtt?: {}

    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    amqp?: {}
  }
}

// TODO add kafkaconnect config to cluster
export interface ClusterResourcePayload {
  organization: string
  instance: string
  name: string
  location: string
  poolMemberRef?: {
    namespace: string
    name: string
  }
  brokerCpu: string
  brokerMemory: string
  brokerPods: number
  bookieCpu?: string
  bookieMemory?: string
  bookiePods?: number
  brokerAutoScailingPolicy?: ClusterAutoScalingPolicy
  config?: ClusterConfig
  releaseChannel?: string
  maintenanceWindow?: V1alpha1MaintenanceWindow
  displayName?: string
  isServerless?: boolean
  isURSAFeaturePossible?: boolean
  engine?: 'ursa' | 'classic'
  volume?: string
  catalogs?: string[]
  tableFormat?: string
}

export interface ClusterAutoScalingPolicy {
  enabled: boolean
  replicas: number[]
}

export interface ClusterConditions {
  Ready: boolean
  ServiceEndpointReady: boolean
  ZookeeperReady: boolean
  BookKeeperReady: boolean
  PulsarBrokerReady: boolean
  PulsarProxyReady: boolean
  OxiaReady: boolean
  // true if BookKeeperAvailable && ZookeeperAvailable && PulsarBrokerAvailable when using independent bookkeeper and zookeeper components
  // or true if PulsarBrokerAvailable when using a shared oxia component
  Available: boolean
  BookKeeperAvailable: boolean // true if >= 2
  ZookeeperAvailable: boolean // true if >= 2.  Technically "total node count / 2 + 1" but our node count sound be 3
  PulsarBrokerAvailable: boolean // true if >= 1
}

export interface Lakehouse {
  enabled: boolean
  bucketMode?: string
  roleArn?: string
  bucketName?: string
  bucketLocation?: string
  region?: string
}

export interface Unity {
  uri: string
  name: string
  schemaName: string
  secret: string
}

export interface Iceberg {
  uri: string
  warehouse: string
  secret: string
}

export interface Catalog {
  enabled: boolean
  authType?: string
  provider?: string
  unity?: Unity
  tabular?: Iceberg
  polaris?: Iceberg
  s3Table?: Iceberg
  openCatalog?: Iceberg
  tableMode?: string
}

/* State */
const clusterMap: Ref<Record<string, Array<Cluster>>> = ref({})

const currentClusterResource: ComputedRef<{
  cu: number | undefined
  su: number | undefined
  cuThroughput: number | undefined
  suThroughput: number | undefined
}> = computed(() => {
  const instanceName = usePulsarState().instance.value
  const clusterUid = usePulsarState().clusterUid.value
  const cluster = (clusterMap.value[instanceName || ''] || []).find(clu => {
    return clu.metadata?.uid === clusterUid
  })
  if (!instanceName || !clusterUid || !cluster) {
    return { cu: undefined, su: undefined, cuThroughput: undefined, suThroughput: undefined }
  }

  return getClusterResource(cluster)
})

const getClusterResource = (cluster: Cluster) => {
  const cu = getCu(cluster)
  const su = getSu(cluster)
  const cuThroughput = cu
    ? Math.round(((cu * throughputPerSu) / bkReplFactor) * 10) / 10
    : undefined
  const suThroughput = su
    ? Math.round(((su * throughputPerSu) / bkReplFactor) * 10) / 10
    : undefined
  return { cu, su, cuThroughput, suThroughput }
}

const getCu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  if (cluster.spec?.broker.resources) {
    const cpu = cpuParser(cluster.spec?.broker.resources.cpu)
    const memory = memoryParser(cluster.spec?.broker.resources.memory)
    const cu = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(cu * 100) / 100
  }
  const brokerOption = brokerOptions.find(
    brokerOption => brokerOption.name === cluster.generated.brokerNodeType?.name
  )
  if (brokerOption) {
    const cpu = brokerOption.cpu
    const memory = memoryParser(brokerOption.memory)
    const cu = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(cu * 100) / 100
  }

  // this most likely means error
  return undefined
}

const getSu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  if (cluster.spec?.bookKeeperSetRef) {
    // using shared bookie, probably free cluster, returning 0
    return 0
  }
  if (cluster.spec?.bookkeeper?.resources) {
    const cpu = cpuParser(cluster.spec?.bookkeeper?.resources.cpu)
    const memory = memoryParser(cluster.spec?.bookkeeper?.resources.memory)
    const cu = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(cu * 100) / 100
  }
  const bookieOption = bookieOptions.find(
    bookieOption => bookieOption.name === cluster.generated.bookieNodeType?.name
  )
  if (bookieOption) {
    const cpu = bookieOption.cpu
    const memory = memoryParser(bookieOption.memory)
    const cu = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(cu * 100) / 100
  }

  // this most likely means error
  return undefined
}

const mustGetSelectedInstance = (): string => {
  return usePulsarState().mustInstance()
}

const clusters: ComputedRef<Array<Cluster>> = computed(() => {
  const { instance } = usePulsarState()
  if (!instance.value || !clusterMap.value[instance.value]) {
    return []
  }
  return clusterMap.value[instance.value].sort((a, b) => {
    return a.metadata.name.localeCompare(b.metadata.name)
  })
})

/* Getters */
const clusterNames: ComputedRef<Array<string>> = computed(() => {
  const { instance } = usePulsarState()
  if (!instance.value || !clusterMap.value[instance.value]) {
    return []
  }

  return clusterMap.value[instance.value].map(clu => clu.metadata.name).sort()
})

const activeCluster: ComputedRef<Cluster | undefined> = computed(() => {
  const { instance, clusterUid } = usePulsarState()
  if (!instance.value) {
    return undefined
  }

  if (!clusterMap.value[instance.value]?.values) {
    return undefined
  }

  const targetCluster = clusterMap.value[instance.value]?.find(clu => {
    return clu.metadata?.uid === clusterUid.value
  })

  if (targetCluster) {
    return targetCluster
  }

  return undefined
})

const activeClusterName = computed(() => activeCluster.value?.metadata?.name)

const isClusterReadyByNameFn = (name: string, instance: string) => {
  const cluster = clusterMap.value[instance]?.find(
    clusterItem => clusterItem.metadata.name === name
  )
  return isClusterReadyFn(cluster)
}

const isClusterAvailableByNameFn = (name: string, instance: string) => {
  const cluster = clusterMap.value[instance]?.find(
    clusterItem => clusterItem.metadata.name === name
  )
  return isClusterAvailableFn(cluster)
}

const isClusterReadyFn = (cluster: Cluster | undefined) => {
  if (!cluster) {
    return false
  }
  const annotations = cluster?.metadata?.annotations
  const assumeClusterReady = annotations && annotations[annotationClusterReady] === 'true'
  if (assumeClusterReady) {
    return true
  }
  return cluster.generated.conditions.Ready
}

const isClusterAvailableFn = (cluster: Cluster | undefined) => {
  if (!cluster) {
    return false
  }
  return isClusterReadyFn(cluster) || cluster.generated.conditions.Available
}

const isActiveClusterReady: ComputedRef<boolean> = computed(() => {
  return isClusterReadyFn(activeCluster.value)
})

const isActiveClusterAvailable: ComputedRef<boolean> = computed(() => {
  return isClusterAvailableFn(activeCluster.value)
})

const functionEnabled: ComputedRef<boolean> = computed(() => {
  if (
    activeCluster.value &&
    activeCluster.value.spec?.config &&
    activeCluster.value.spec?.config.functionEnabled
  ) {
    return true
  }
  return false
})

const ursaEnabled: ComputedRef<boolean> = computed(() => {
  if (
    activeCluster.value &&
    activeCluster.value.metadata.annotations &&
    activeCluster.value.metadata.annotations['cloud.streamnative.io/engine'] === 'ursa'
  ) {
    return true
  }
  return false
})

/* Mutations */
const setCluster = (cluster: Cluster, instanceName: string = mustGetSelectedInstance()) => {
  const clusterIndex = clusterMap.value[instanceName]
    ? clusterMap.value[instanceName].findIndex(
        clusterItem => clusterItem.metadata.name === cluster.metadata.name
      )
    : -1
  if (clusterIndex > -1) {
    clusterMap.value[instanceName].splice(clusterIndex, 1, cluster)
  } else {
    clusterMap.value[instanceName] = [cluster]
  }
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const setActiveClusterDeleted = (cluster: Cluster) => {
  cluster.generated.deleted = true
}

const isClusterDeletedFn = (cluster: Cluster | undefined) => {
  if (!cluster) {
    return false
  }
  return cluster.metadata.deletionTimestamp || cluster.generated.deleted === true
}

const isClusterDeletedByNameFn = (name: string, instance: string) => {
  const cluster = clusterMap.value[instance]?.find(
    clusterItem => clusterItem.metadata.name === name
  )
  return isClusterDeletedFn(cluster)
}

const createClusterByResources = async (payload: ClusterResourcePayload): Promise<Cluster> => {
  try {
    const body = await validateAndGetClusterPayloadResource(payload, true)
    const newCluster = formatClusterModel(
      (await useCloudApi().createNamespacedPulsarCluster(payload.organization, body)).data
    )
    setCluster(newCluster, payload.instance)
    return newCluster
  } catch (e) {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore: Object is of type 'unknown'
    if (e && e.response && e.response.status === 409) {
      throw Error(t('cluster.duplicateClusterError'))
    }
    throw Error(getErrorMessage(e))
  }
}

const getJsonPatch = (path: string, cluster: Cluster, updatePayload: ClusterResourcePayload) => {
  const tokenizedPath = path.split('/')
  const lastPath = tokenizedPath.pop()

  let tempClusterValue: any = cluster
  let tempUpdatePayloadValue: any = updatePayload
  tokenizedPath.forEach(p => {
    if (!p) {
      return
    } else if (p === 'spec') {
      tempClusterValue = tempClusterValue?.[p]
    } else {
      tempClusterValue = tempClusterValue?.[p]
      tempUpdatePayloadValue = tempUpdatePayloadValue?.[p]
    }
  })

  const desiredValue = tempUpdatePayloadValue?.[lastPath as string]
  const currentValue = tempClusterValue?.[lastPath as string]

  if (isEqual(currentValue, desiredValue)) {
    // nothing to do
    return []
  } else if (currentValue && !desiredValue) {
    // value is removed
    return [{ op: 'remove', path }]
  } else {
    // value is updated
    return [{ op: 'replace', path, value: desiredValue }]
  }
}

const updateClusterConfigByResource = async (payload: ClusterResourcePayload): Promise<Cluster> => {
  // 'custom' attrs numbers must be strings in request
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const patchArray: Record<string, any> = []
  const clusterObj = clusterMap.value[payload.instance].find(c => c.metadata.name === payload.name)
  if (!clusterObj) {
    throw Error('cluster to update is not found')
  }
  if (!clusterObj?.spec?.config) {
    patchArray.push({
      op: 'add',
      path: `/spec/config`,
      value: {}
    })
  }

  if (!clusterObj?.spec?.config?.custom) {
    patchArray.push({
      op: 'add',
      path: `/spec/config/custom`,
      value: {}
    })
  }

  Object.entries(payload.config?.custom || {}).map(([key, value]) => {
    if (value === undefined) {
      if ((clusterObj?.spec?.config?.custom ?? {})[key]) {
        patchArray.push({
          op: 'remove',
          path: `/spec/config/custom/${key}`
        })
      }
    } else {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/custom/${key}`,
        value: String(value)
      })
    }
  })

  if (!clusterObj.spec?.config?.protocols) {
    patchArray.push({
      op: 'add',
      path: '/spec/config/protocols',
      value: {}
    })
  }
  patchArray.push(...getJsonPatch(`/spec/config/protocols/kafka`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/protocols/mqtt`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/protocols/amqp`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/websocketEnabled`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/auditLog`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/transactionEnabled`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/config/functionEnabled`, clusterObj, payload))
  patchArray.push(...getJsonPatch(`/spec/maintenanceWindow`, clusterObj, payload))

  patchArray.push({
    op: 'replace',
    path: '/spec/broker/replicas',
    value: payload.brokerPods
  })
  if (payload.engine !== 'ursa') {
    patchArray.push({
      op: 'replace',
      path: '/spec/bookkeeper/replicas',
      value: payload.bookiePods
    })
  }

  if (clusterObj?.spec?.broker?.resources) {
    patchArray.push({
      op: 'replace',
      path: '/spec/broker/resources/cpu',
      value: payload.brokerCpu
    })
    patchArray.push({
      op: 'replace',
      path: '/spec/broker/resources/memory',
      value: payload.brokerMemory
    })
  } else {
    patchArray.push({
      op: 'add',
      path: '/spec/broker/resources',
      value: { cpu: payload.brokerCpu, memory: payload.brokerMemory }
    })
  }

  if (payload.brokerAutoScailingPolicy?.enabled) {
    if (clusterObj?.spec?.broker.autoScalingPolicy) {
      patchArray.push({
        op: 'replace',
        path: '/spec/broker/autoScalingPolicy',
        value: {
          minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
          maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
        }
      })
    } else {
      patchArray.push({
        op: 'add',
        path: '/spec/broker/autoScalingPolicy',
        value: {
          minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
          maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
        }
      })
    }
  } else if (clusterObj?.spec?.broker.autoScalingPolicy) {
    patchArray.push({
      op: 'remove',
      path: '/spec/broker/autoScalingPolicy'
    })
  }

  if (payload.engine !== 'ursa') {
    if (clusterObj?.spec?.bookkeeper?.resources) {
      patchArray.push({
        op: 'replace',
        path: '/spec/bookkeeper/resources/cpu',
        value: payload.bookieCpu
      })
      patchArray.push({
        op: 'replace',
        path: '/spec/bookkeeper/resources/memory',
        value: payload.bookieMemory
      })
    } else {
      patchArray.push({
        op: 'add',
        path: '/spec/bookkeeper/resources',
        value: { cpu: payload.bookieCpu, memory: payload.bookieMemory }
      })
    }
  }
  try {
    const { data } = await useCloudApi().patchNamespacedPulsarCluster(
      payload.name,
      payload.organization,
      patchArray
    )
    const clusterModel = formatClusterModel(data)
    setCluster(clusterModel)
    return clusterModel
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

const deleteCluster = async (cluster: Cluster) => {
  await useCloudApi().deleteNamespacedPulsarCluster(
    cluster.metadata.name,
    cluster.metadata.namespace
  )
  setActiveClusterDeleted(cluster)
}

const deleteClusterByName = async (name: string, instance: string) => {
  const _cluster = clusterMap.value[instance].find(item => item.metadata.name === name)
  if (_cluster) {
    await deleteCluster(_cluster)
  }
}

const getClusterMap = async ({
  organization,
  instances
}: {
  organization: string
  instances?: V1alpha1PulsarInstance[]
}) => {
  const cm = await getClusterMapRaw({ organization, instances })
  clusterMap.value = cm
}

export const isLoadingClusters = ref(false)

const getClusterMapRaw = async ({
  organization,
  instances
}: {
  organization: string
  instances?: V1alpha1PulsarInstance[]
}) => {
  isLoadingClusters.value = true
  try {
    const { data } = await useCloudApi().listNamespacedPulsarCluster(organization)
    if (!instances) {
      const {
        data: { items }
      } = await useCloudApi().listNamespacedPulsarInstance(organization)
      instances = items
    }

    useAnalytics().identifyUser()

    const cm: Record<string, Array<Cluster>> = {}
    data.items?.forEach(cluster => {
      if (!cluster.spec?.instanceName) {
        // invalid instance name is passed in
        console.warn(`invalid instance name is passed in: ${cluster}`)
        return
      }
      const instanceName = cluster.spec?.instanceName

      if (!cm[instanceName]) {
        cm[instanceName] = []
      }

      // for every organization only _1_ instance should have the same name, or there is a bug
      cm[instanceName].push(
        formatClusterModelWithInstance(
          cluster,
          instances?.filter(i => i.metadata?.name === instanceName)[0]
        )
      )
    })

    return cm
  } catch (e) {
    throw Error(getErrorMessage(e))
  } finally {
    isLoadingClusters.value = false
  }
}

const convertMillis = (value: string) => {
  if (!value.endsWith('m')) {
    return value
  }
  // allow 10th decimal
  return new Decimal(value.replace('m', '')).div(100).round().div(10).toString()
}

/**
 * Validates and converts UI's custom pulsar cluster object to V1alpha1PulsarCluster
 * @param payload Custom object that represents a pulsar cluster in UI
 * @param isCreate if is for create or update cluster
 * @returns V1alpha1PulsarCluster our API accepts and we should be using
 */
const validateAndGetClusterPayloadResource = async (
  payload: ClusterResourcePayload,
  isCreate: boolean
): Promise<V1alpha1PulsarCluster> => {
  if (payload.isServerless) {
    // serverless cluster does not require name, just use display name
    if (!payload.displayName) {
      throw Error(t('cluster.clusterNameIsRequired'))
    }
  } else {
    if (!payload.name) {
      throw Error(t('cluster.clusterNameIsRequired'))
    }
  }

  if (!payload.location) {
    throw Error(t('cluster.clusterLocationIsRequired'))
  }

  const { data } = await useCloudApi().readNamespacedPulsarInstance(
    payload.instance,
    payload.organization
  )
  const isFree = data.spec?.type === 'free'

  // TODO need to scope this out better
  if (isFree && (Number(payload.brokerCpu) > 0.2 || payload.bookieCpu)) {
    throw Error(t('cluster.unableToCreateFreeBrokerResource')) // TODO refine this error message
  }

  const configValue = isFree
    ? undefined
    : JSON.parse(JSON.stringify(payload.config, (k, v) => (typeof v !== 'number' ? v : '' + v)))

  Object.entries(configValue?.custom ?? {}).forEach(([key, value]) => {
    if (!value) {
      delete configValue.custom[key]
    }
  })

  if (isCreate) {
    if (
      !isFree &&
      (payload.bookieCpu === undefined ||
        payload.bookieMemory === undefined ||
        payload.bookiePods === undefined)
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // For create, return minimal cluster object needed to create
    const cluster: V1alpha1PulsarCluster = {
      kind: 'PulsarCluster',
      metadata: {
        name: payload.isServerless ? '' : payload.name,
        namespace: payload.organization
      },
      spec: {
        instanceName: payload.instance,
        displayName: payload.displayName,
        location: payload.location,
        poolMemberRef: payload.poolMemberRef,
        config: configValue,
        broker: {
          replicas: payload.brokerPods,
          resources: {
            cpu: payload.brokerCpu,
            memory: payload.brokerMemory
          }
        },
        bookkeeper:
          isFree ||
          payload.isServerless ||
          (payload.isURSAFeaturePossible && payload.engine === 'ursa') ||
          payload.bookieCpu === undefined ||
          payload.bookieMemory === undefined ||
          payload.bookiePods === undefined
            ? undefined
            : {
                replicas: payload.bookiePods,
                resources: {
                  cpu: payload.bookieCpu,
                  memory: payload.bookieMemory
                }
              }
      }
    }

    if (payload.isURSAFeaturePossible && payload.engine === 'ursa') {
      // eslint-disable-next-line @typescript-eslint/ban-ts-comment
      // @ts-ignore defined on line 31
      cluster.metadata.annotations = {
        'cloud.streamnative.io/engine': 'ursa'
      }
      if (payload.volume && payload.volume !== '') {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore defined on line 31
        cluster.spec.volume = {
          name: payload.volume
        }
      }
      if (payload.catalogs && payload.catalogs.length > 0) {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore defined on line 31
        cluster.spec.catalogs = payload.catalogs
      }
      if (payload.tableFormat && payload.tableFormat !== '') {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-ignore defined on line 31
        cluster.spec.tableFormat = payload.tableFormat
      }
    }

    if (payload.brokerAutoScailingPolicy?.enabled && cluster.spec) {
      cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
        autoScalingPolicy: {
          minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
          maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
        }
      })
    }
    if (payload.releaseChannel && cluster.spec) {
      cluster.spec.releaseChannel = payload.releaseChannel
    }
    if (payload.maintenanceWindow && cluster.spec) {
      cluster.spec.maintenanceWindow = payload.maintenanceWindow
    }
    if (payload.isServerless) {
      delete cluster.spec?.config
    }
    return cluster
  }

  // For updates, there are some immutable values, such as poolRef, and we should maintain
  // those values.  Thus we are fetching the base model and modifying what we can on top.
  const cluster = (
    await useCloudApi().readNamespacedPulsarCluster(payload.name, payload.organization)
  ).data

  if (!cluster.spec) {
    throw Error(t('cluster.specIsMissing'))
  }

  cluster.spec.config = configValue
  if (isFree) {
    // free
    cluster.spec.bookkeeper = undefined
  } else {
    if (
      payload.bookieCpu === undefined ||
      payload.bookieMemory === undefined ||
      payload.bookiePods === undefined
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // not free
    cluster.spec.bookkeeper = Object.assign(cluster.spec.bookkeeper || {}, {
      replicas: payload.bookiePods,
      resources: Object.assign(cluster.spec?.bookkeeper?.resources || {}, {
        cpu: payload.bookieCpu,
        memory: payload.bookieMemory
      })
    })
  }
  cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
    replicas: payload.brokerPods,
    resources: Object.assign(cluster.spec?.broker?.resources || {}, {
      cpu: payload.brokerCpu,
      memory: payload.brokerMemory
    })
  })

  if (payload.brokerAutoScailingPolicy?.enabled && cluster.spec) {
    cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
      autoScalingPolicy: {
        minReplicas: payload.brokerAutoScailingPolicy.replicas[0],
        maxReplicas: payload.brokerAutoScailingPolicy.replicas[1]
      }
    })
  }
  if (payload.releaseChannel && cluster.spec) {
    cluster.spec.releaseChannel = payload.releaseChannel
  }
  if (payload.maintenanceWindow && cluster.spec) {
    cluster.spec.maintenanceWindow = payload.maintenanceWindow
  }
  return cluster
}

export const formatClusterModelWithInstance = (
  cluster: V1alpha1PulsarCluster,
  instance?: V1alpha1PulsarInstance
): Cluster => {
  if (!cluster.metadata?.uid) {
    throw new Error('Cluster is missing UID')
  }

  const brokerReadyReplicas = cluster?.status?.broker?.readyReplicas
  const bookkeeperReadyReplicas = cluster?.status?.bookkeeper?.readyReplicas
  const zookeeperReadyReplicas = cluster?.status?.zookeeper?.readyReplicas

  const conditions = (cluster.status?.conditions?.reduce(
    (curr: Record<string, boolean> = {}, condition) => {
      curr[condition.type] = condition.status === 'True'
      return curr
    },
    {}
  ) ?? {
    Ready: false,
    ServiceEndpointReady: false,
    ZookeeperReady: false,
    BookKeeperReady: false,
    PulsarBrokerReady: false,
    PulsarProxyReady: false,
    Available: false,
    BookKeeperAvailable: false,
    ZookeeperAvailable: false,
    PulsarBrokerAvailable: false
  }) as unknown as ClusterConditions

  conditions.PulsarBrokerAvailable = brokerReadyReplicas !== undefined && brokerReadyReplicas > 0
  conditions.BookKeeperAvailable =
    bookkeeperReadyReplicas !== undefined && bookkeeperReadyReplicas > 1
  conditions.ZookeeperAvailable = zookeeperReadyReplicas !== undefined && zookeeperReadyReplicas > 1

  // senario using shared oxia
  if (bookkeeperReadyReplicas === undefined && zookeeperReadyReplicas === undefined) {
    conditions.Available = conditions.PulsarBrokerAvailable
  } else if (zookeeperReadyReplicas === undefined) {
    conditions.Available =
      conditions.PulsarBrokerAvailable && conditions.BookKeeperAvailable && conditions.OxiaReady
  } else {
    conditions.Available =
      conditions.PulsarBrokerAvailable &&
      conditions.BookKeeperAvailable &&
      conditions.ZookeeperAvailable
  }

  if (cluster.metadata?.annotations?.[annotationClusterReady] === 'true') {
    // override to all true if manual override annotation is set
    conditions.Ready = true
    conditions.ServiceEndpointReady = true
    conditions.ZookeeperReady = true
    conditions.BookKeeperReady = true
    conditions.PulsarBrokerReady = true
    conditions.PulsarProxyReady = true
    conditions.Available = true
    conditions.BookKeeperAvailable = true
    conditions.ZookeeperAvailable = true
    conditions.PulsarBrokerAvailable = true
  }

  let port = useInstance().istioEnabled.value ? 443 : 9443
  if (instance) {
    port = isIstioEnabledForInstance(instance) ? 443 : 9443
  }

  const webServiceEndpoints = _.cloneDeep(
    (cluster.spec?.serviceEndpoints || [])
      .filter(endpoint => endpoint?.type === 'service' && endpoint.dnsName)
      .map(endpoint => {
        if (endpoint.gateway === 'default') {
          endpoint.gateway = undefined
        }
        return endpoint
      })
      .sort((a, b) => (a.gateway || '').localeCompare(b.gateway || ''))
  )
  const serviceEndpointPromises = webServiceEndpoints.map(
    async (_endpoint, index): Promise<ServiceEndpoint> => {
      const endpoint = _.clone(_endpoint)
      const dnsName = endpoint.dnsName
      let access = ''
      if (endpoint.gateway && endpoint.gateway !== 'default') {
        try {
          const gateway = await useCloudApi().readNamespacedPulsarGateway(
            endpoint.gateway,
            cluster.metadata?.namespace as string
          )
          access = gateway.data.spec?.access || 'public'
        } catch (e) {
          console.warn(`failed to get gateway ${endpoint.gateway}`, e)
        }
      }
      return {
        access: access,
        gatewayName: endpoint.gateway || 'default',
        gatewayDisplayName: index === 0 ? 'default' : `${access} gateway [${index}]`,
        webServiceUrl: dnsName && `https://${dnsName}`,
        brokerServiceUrl: dnsName && `pulsar+ssl://${dnsName}:6651`,
        websocketServiceUrl:
          cluster.spec?.config?.websocketEnabled && dnsName
            ? `wss://${dnsName}:${port}`
            : undefined,
        kopServiceUrl:
          !!cluster.spec?.config?.protocols?.kafka && dnsName ? `${dnsName}:9093` : undefined,
        kafkaSchemaRegistryUrl:
          !!cluster.spec?.config?.protocols?.kafka && dnsName
            ? `https://${dnsName}/kafka`
            : undefined,
        mqttServiceUrl:
          !!cluster.spec?.config?.protocols?.mqtt && dnsName ? `${dnsName}:8883` : undefined
      }
    }
  )

  // we allow k8 resource syntaxes, such as `32Gi`, `2000m`, `16` and etc.  However, for larger numeric values
  // causees underlying library we depends on to error on calculation.
  // i.e. `require('k8s-resource-parser').memoryParser('30923764531200m') === NaN`
  // Because of this, we are reducing probability of running into this issue by converting from milli.
  if (cluster.spec?.broker?.resources?.cpu) {
    cluster.spec.broker.resources.cpu = convertMillis(cluster.spec.broker.resources.cpu)
  }
  if (cluster.spec?.broker?.resources?.memory) {
    cluster.spec.broker.resources.memory = convertMillis(cluster.spec.broker.resources.memory)
  }
  if (cluster.spec?.bookkeeper?.resources?.cpu) {
    cluster.spec.bookkeeper.resources.cpu = convertMillis(cluster.spec.bookkeeper.resources.cpu)
  }
  if (cluster.spec?.bookkeeper?.resources?.memory) {
    cluster.spec.bookkeeper.resources.memory = convertMillis(
      cluster.spec.bookkeeper.resources.memory
    )
  }

  const res = {
    ...cluster,
    metadata: {
      // this is an annoying part of the generated V1alpha1PulsarCluster and other kubernete objects.
      // although name and namespace is a required field, it is marked as optional.  Overriding this
      // optionality so that we don't have to do same check over and over when using name and namespace.
      ...cluster.metadata,
      name: cluster.metadata.name as string,
      namespace: cluster.metadata.namespace as string
    },
    generated: {
      conditions,
      serviceEndpointPromises,
      deleted: !!cluster.metadata?.deletionTimestamp,
      brokerVersion: cluster.spec?.broker?.image?.split(':')[1],
      bookKeeperVersion: cluster.spec?.bookkeeper?.image?.split(':')[1],
      bookieNodeType: bookieOptions.find(
        bo => bo.name === cluster.spec?.bookkeeper?.resourceSpec?.nodeType
      ),
      brokerNodeType: brokerOptions.find(
        bo => bo.name === cluster.spec?.broker?.resourceSpec?.nodeType
      ),
      isKopEnabled: !!cluster.spec?.config?.protocols?.kafka,
      functionEnabled: cluster.spec?.config?.functionEnabled,
      recurrenceMap: convertRecurrenceFromStringToMap(cluster.spec?.maintenanceWindow?.recurrence),
      isMopEnabled: !!cluster.spec?.config?.protocols?.mqtt,
      isAmqpEnabled: !!cluster.spec?.config?.protocols?.amqp,
      kafkaConnectConfig: getKafkaConnectConfig(cluster.spec?.config?.custom)
    }
  }
  return res
}

const firstClusterFn = (instance: string) => {
  const _clusters = clusterMap.value[instance] as Cluster[]
  if (!_clusters || !_clusters.length) {
    return undefined
  }
  // here is important, deep clone the cluster to avoid the clusters sort update cause the watch exec cause UI stuck
  return (JSON.parse(JSON.stringify(_clusters)) as Cluster[]).sort((clsA, clsB) => {
    return (
      dayjs(clsA.metadata.creationTimestamp).unix() - dayjs(clsB.metadata.creationTimestamp).unix()
    )
  })[0]
}

const isFirstClusterByNameFn = (instance: string, clusterName: string) => {
  const first = firstClusterFn(instance)
  if (!first) {
    return true
  }
  return first.metadata.name === clusterName
}

const isFirstClusterFn = (instance: string, cluster: Cluster) => {
  const first = firstClusterFn(instance)
  if (!first) {
    return true
  }
  return first.metadata.name === cluster?.metadata?.name
}

const isFirstClusterAvailableFn = (instance: string) => {
  const first = firstClusterFn(instance)
  if (!first) {
    return true
  }
  return isClusterAvailableFn(first)
}

const isFirstClusterReadyFn = (instance: string) => {
  const first = firstClusterFn(instance)
  if (!first) {
    return true
  }
  return isClusterReadyFn(first)
}

const getKafkaConnectConfig = (
  custom: Record<string, string> | undefined
): KafkaConnectConfig | undefined => {
  if (!custom) {
    return undefined
  }

  const kafkaConnectConfig: KafkaConnectConfig = {}
  kafkaConnectConfig.additionalServlets = custom?.additionalServlets
  kafkaConnectConfig.kafkaConnectCustomLabels = custom?.kafkaConnectCustomLabels
  kafkaConnectConfig.kafkaConnectDefaultNamespace = custom?.kafkaConnectDefaultNamespace
  kafkaConnectConfig.kafkaConnectDefaultServiceAccountName =
    custom?.kafkaConnectDefaultServiceAccountName
  kafkaConnectConfig.kafkaConnectDefaultTenant = custom?.kafkaConnectDefaultTenant
  kafkaConnectConfig.kafkaConnectKubernetesJobNamespace = custom?.kafkaConnectKubernetesJobNamespace
  kafkaConnectConfig.snRBACAuthInterceptorEnabled = custom?.snRBACAuthInterceptorEnabled
  return kafkaConnectConfig
}

export const formatClusterModel = (cluster: V1alpha1PulsarCluster): Cluster => {
  return formatClusterModelWithInstance(cluster, undefined)
}

// locally scoped composable for pinging a cluster
export const usePingCluster = () => {
  const cluster = ref<V1alpha1PulsarCluster>()
  const conditions = computed(() => cluster.value?.status?.conditions ?? [])
  const brokerReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.broker?.readyReplicas
  )
  const bookkeeperReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.bookkeeper?.readyReplicas
  )
  const zookeeperReadyReplicas = computed<number | undefined>(
    () => cluster.value?.status?.zookeeper?.readyReplicas
  )
  const error = ref('')

  const { resume, pause, isActive } = useIntervalFn(async () => {
    if (!activeCluster.value) {
      // cluster fetch may not be ready, simply retry at next interval
      return
    }
    const clusterName = activeCluster.value.metadata.name
    const organization = activeCluster.value.metadata.namespace
    const clusterUid = activeCluster.value.metadata.uid || ''

    if (!lock.isBusy(clusterUid)) {
      await lock.acquire(clusterUid, async () => {
        try {
          const { data } = await useCloudApi().readNamespacedPulsarCluster(
            clusterName,
            organization
          )
          cluster.value = data
          const formatedCluster = formatClusterModel(data)
          setCluster(formatedCluster, formatedCluster.spec?.instanceName)
        } catch (e) {
          error.value = getErrorMessage(e)
          pause()
        }
      })
    }
  }, 5000)

  return {
    cluster,
    conditions,
    brokerReadyReplicas,
    bookkeeperReadyReplicas,
    zookeeperReadyReplicas,
    resume,
    pause,
    isActive,
    error
  }
}

export const useResourceCalculation = (params: {
  brokerNodeCount: Ref<number | undefined>
  brokerCUCount: Ref<number | undefined>
  bookieNodeCount: Ref<number | undefined>
  bookieSUCount: Ref<number | undefined>
  zookieSuCount: Ref<number | undefined>
  brokerAutoscaling?: Ref<{ enabled: boolean; replicas: number[] } | undefined>
  isIstio: Ref<boolean>
  isZookeeperCharged: Ref<boolean>
}) => {
  const {
    brokerNodeCount,
    brokerCUCount,
    bookieNodeCount,
    bookieSUCount,
    zookieSuCount,
    brokerAutoscaling,
    isIstio,
    isZookeeperCharged
  } = params
  const getCuTotal = (brokerNodeCount?: number, brokerCUCount?: number) => {
    if (!brokerNodeCount || !brokerCUCount) {
      return NaN
    }
    return Math.round(brokerNodeCount * brokerCUCount * 10) / 10
  }
  const getSuTotal = (bookieNodeCount?: number, bookieSUCount?: number) => {
    if (!bookieNodeCount || !bookieSUCount) {
      return NaN
    }
    return Math.round(bookieNodeCount * bookieSUCount * 10) / 10
  }
  const getBrokerThroughput = (cuTotal: number) => {
    return Math.round(cuTotal * throughputPerSu * 10) / 10
  }
  const getBrokerCPUCores = (cuTotal: number) => {
    return (cuTotal * cpuPerUnit) / 1000
  }
  const getBrokerMemory = (cuTotal: number) => {
    return cuTotal * memoryPerUnitGb
  }
  const getBookieThroughput = (suTotal: number) => {
    return Math.round(suTotal * throughputPerSu * 10) / 10
  }
  const getBookieCPUCores = (suTotal: number) => {
    return (suTotal * cpuPerUnit) / 1000
  }
  const getBookieMemory = (suTotal: number) => {
    return suTotal * memoryGbPerSu
  }

  const getTotalThroughput = (brokerThroughput: number, suTotal: number) => {
    if (!suTotal) {
      return brokerThroughput
    }
    // Min(broker nodes * cus per broker node * broker write throughput, bookie nodes * SUs per bookie node * bookie throughput / 3
    return Math.min(
      brokerThroughput,
      Math.round(((suTotal * throughputPerSu) / bkReplFactor) * 10) / 10
    )
  }

  const autoscaleMinMax = computed<{ min: number; max: number } | undefined>(() => {
    if (brokerAutoscaling?.value?.enabled && brokerAutoscaling?.value?.replicas?.length === 2) {
      return {
        min: brokerAutoscaling.value.replicas[0],
        max: brokerAutoscaling.value.replicas[1]
      }
    }
  })

  const minCUTotal = computed<number>(() => {
    if (autoscaleMinMax.value) {
      return getCuTotal(autoscaleMinMax.value.min, brokerCUCount.value)
    }
    return getCuTotal(brokerNodeCount.value, brokerCUCount.value)
  })
  const maxCUTotal = computed<number>(() => {
    if (autoscaleMinMax.value) {
      return getCuTotal(autoscaleMinMax.value.max, brokerCUCount.value)
    }
    return getCuTotal(brokerNodeCount.value, brokerCUCount.value)
  })
  const suTotal = computed<number>(() => {
    return getSuTotal(bookieNodeCount.value, bookieSUCount.value)
  })
  const minBrokerThroughput = computed<number>(() => {
    return getBrokerThroughput(minCUTotal.value)
  })
  const maxBrokerThroughput = computed<number>(() => {
    return getBrokerThroughput(maxCUTotal.value)
  })
  const minBrokerCPUCores = computed<number>(() => {
    return getBrokerCPUCores(minCUTotal.value)
  })
  const maxBrokerCPUCores = computed<number>(() => {
    return getBrokerCPUCores(maxCUTotal.value)
  })
  const minBrokerMemory = computed<number>(() => {
    return getBrokerMemory(minCUTotal.value)
  })
  const maxBrokerMemory = computed<number>(() => {
    return getBrokerMemory(maxCUTotal.value)
  })
  const bookieThroughput = computed<number>(() => {
    return getBookieThroughput(suTotal.value)
  })
  const bookieCPUCores = computed<number>(() => {
    return getBookieCPUCores(suTotal.value)
  })
  const bookieMemory = computed<number>(() => {
    return getBookieMemory(suTotal.value)
  })
  const maxTotalThroughput = computed<number>(() => {
    return getTotalThroughput(maxBrokerThroughput.value, suTotal.value)
  })
  const compactPrintResource = (v: number) => {
    return Intl.NumberFormat('en-US', {
      notation: 'compact',
      maximumFractionDigits: 0
    }).format(v)
  }

  const cuDisplay = computed<string>(() => {
    if (minCUTotal.value !== maxCUTotal.value) {
      return `${minCUTotal.value} ~ ${maxCUTotal.value}`
    }
    return `${minCUTotal.value}`
  })
  const brokerCPUCoresDisplay = computed<string>(() => {
    if (minBrokerCPUCores.value !== maxBrokerCPUCores.value) {
      return `${minBrokerCPUCores.value} ~ ${maxBrokerCPUCores.value}`
    }
    return `${minBrokerCPUCores.value}`
  })
  const brokerMemoryDisplay = computed<string>(() => {
    if (minBrokerMemory.value !== maxBrokerMemory.value) {
      return `${minBrokerMemory.value} ~ ${maxBrokerMemory.value}`
    }
    return `${minBrokerMemory.value}`
  })
  const brokerThroughputDisplay = computed<string>(() => {
    if (minBrokerThroughput.value !== maxBrokerThroughput.value) {
      return `${compactPrintResource(minBrokerThroughput.value)} ~ ${compactPrintResource(
        maxBrokerThroughput.value
      )}`
    }
    return `${compactPrintResource(minBrokerThroughput.value)}`
  })
  const totalThroughputDisplay = computed<string>(() => {
    if (minBrokerThroughput.value !== maxBrokerThroughput.value) {
      const min = getTotalThroughput(minBrokerThroughput.value, suTotal.value)
      const max = getTotalThroughput(maxBrokerThroughput.value, suTotal.value)
      return `${min} ~ ${max}`
    }
    return `${getTotalThroughput(minBrokerThroughput.value, suTotal.value)}`
  })
  const totalCUDisplay = computed<string>(() => {
    if (minCUTotal.value !== maxCUTotal.value) {
      return `${minCUTotal.value} ~ ${maxCUTotal.value}`
    }
    return `${minCUTotal.value}`
  })
  const totalSUDisplay = computed<string>(() => {
    return `${suTotal.value}`
  })

  const proxyCUTotal = computed<number | undefined>(() => {
    if (isIstio.value) {
      return undefined
    }
    return maxCUTotal.value
  })
  const proxyCPUCores = computed<number | undefined>(() => {
    if (isIstio.value) {
      return undefined
    }
    return maxBrokerCPUCores.value
  })
  const proxyMemory = computed<number | undefined>(() => {
    if (isIstio.value) {
      return undefined
    }
    return maxBrokerMemory.value
  })
  const zkSUTotal = computed<number | undefined>(() => {
    if (isZookeeperCharged.value) {
      return getSuTotal(3, zookieSuCount.value ?? bookieSUCount.value)
    }
    return undefined
  })
  const zkCPUCores = computed<number | undefined>(() => {
    if (!zkSUTotal.value) {
      return undefined
    }
    return getBookieCPUCores(zkSUTotal.value)
  })
  const zkMemory = computed<number | undefined>(() => {
    if (!zkSUTotal.value) {
      return undefined
    }
    return getBookieMemory(zkSUTotal.value)
  })

  return {
    suTotal,
    minCUTotal,
    maxCUTotal,
    minBrokerThroughput,
    maxBrokerThroughput,
    minBrokerCPUCores,
    maxBrokerCPUCores,
    minBrokerMemory,
    maxBrokerMemory,
    bookieThroughput,
    bookieCPUCores,
    bookieMemory,
    maxTotalThroughput,
    totalThroughputDisplay,
    totalCUDisplay,
    totalSUDisplay,
    cuDisplay,
    brokerCPUCoresDisplay,
    brokerMemoryDisplay,
    brokerThroughputDisplay,
    proxyCUTotal,
    proxyCPUCores,
    proxyMemory,
    zkSUTotal,
    zkCPUCores,
    zkMemory
  }
}

export const isCNPoolMember = (dnsName: string | undefined) => {
  if (dnsName === undefined) {
    return true
  }
  return dnsName.endsWith(CN_POOLMEMBER_DNS)
}

export const init = (initialState: PulsarState) => {
  const { organization } = usePulsarState()
  const { isRbacUpdating } = rbacHelper()

  const valueChanged = async ([org, ab]: [string | undefined, boolean | undefined]) => {
    if (!org) {
      clusterMap.value = {}
      lastOrg = undefined
      return
    }
    if (ab) {
      return
    }

    if (lastOrg !== org) {
      const { canDescribeClusterList } = rbacManager()
      if (canDescribeClusterList()) {
        await getClusterMap({ organization: org })
      }
    }

    lastOrg = org
  }

  watch([organization, isRbacUpdating], valueChanged)

  return valueChanged([initialState.organization, isRbacUpdating.value])
}

const getClusterFromUid = (uid: string) => {
  let cluster = undefined
  for (const instanceName in clusterMap.value) {
    cluster = clusterMap.value[instanceName].find(clu => clu.metadata.uid === uid)
    if (cluster) {
      break
    }
  }
  return cluster
}

const getClusterUidFromName = (name: string) => {
  let cluster = undefined

  for (const instanceName in clusterMap.value) {
    cluster = clusterMap.value[instanceName].find(clu => clu.metadata.name === name)
    if (cluster) {
      break
    }
  }
  return cluster?.metadata.uid
}
export const convertRecurrenceFromMapToString = (recurrence?: Record<string, boolean>) => {
  return Object.entries(recurrence ?? {})
    .filter(([, v]) => v)
    .map(([k]) => {
      switch (k) {
        case 'sunday':
          return '0'
        case 'monday':
          return '1'
        case 'tuesday':
          return '2'
        case 'wednesday':
          return '3'
        case 'thursday':
          return '4'
        case 'friday':
          return '5'
        case 'saturday':
          return '6'
        default:
          return ''
      }
    })
    .join(',')
}

const convertRecurrenceFromStringToMap = (recurrence?: string) => {
  const _recurrence = (recurrence || '0,1,2,3,4,5,6').split(',')
  return {
    sunday: _recurrence.includes('0'),
    monday: _recurrence.includes('1'),
    tuesday: _recurrence.includes('2'),
    wednesday: _recurrence.includes('3'),
    thursday: _recurrence.includes('4'),
    friday: _recurrence.includes('5'),
    saturday: _recurrence.includes('6')
  }
}

const recurrenceShowLabel = (recurrence: string) => {
  return recurrence
    .split(',')
    .filter(val => !!val)
    .map(([k]) => {
      switch (k) {
        case '1':
          return t('common.monday')
        case '2':
          return t('common.tuesday')
        case '3':
          return t('common.wednesday')
        case '4':
          return t('common.thursday')
        case '5':
          return t('common.friday')
        case '6':
          return t('common.saturday')
        case '0':
          return t('common.sunday')
        default:
          return 'Anytime'
      }
    })
    .join(', ')
    .replace(/,(\s*[^,]+)$/, ' and$1')
}

const connectorServerInfo = async () => {
  const { organization } = usePulsarState()
  const { audience } = useInstance()
  if (!audience.value || !organization.value || !activeCluster.value) {
    return undefined
  }

  try {
    const client = ConnectorClientFactory.createConnectorClient(
      organization.value,
      activeCluster.value as Cluster,
      audience.value
    ).client
    const { data } = await client.serverInfo()
    return data
  } catch (e) {
    console.error(e)
    return undefined
  }
}

const readyClusters = computed(() => {
  return Object.values(clusterMap.value)
    .map(clusters => clusters.filter(c => isClusterReadyFn(c)))
    .flat()
})

const availableClusters = computed(() => {
  return Object.values(clusterMap.value)
    .map(clusters => clusters.filter(c => isClusterAvailableFn(c)))
    .flat()
})

// right now, this variable is only used at the users page rbac.  Tenants page will fetch and use tenant info
// on demand not using this case.  Because of how user's rbac page requires fetching tenants info on all tenants
// we are maintaining cache of tenant infor for all tenants and all clusters.  It would be recommended to not use
// this variable and we really need to redesign how the rbac works at the pulsar
const clusterToTenantInfoPromMap = ref<{
  [key: string]:
    | Promise<{
        tenants: string[]
        tenantInfoMap: Record<string, TenantInfo>
        clusterName: string
        instanceName: string
      }>
    | undefined
}>({})

const fetchTenantAdminForCluster = async (
  instance: string,
  clusterUid: string,
  clusterName: string
) => {
  const tenants = (
    await fetchTenantsForSpecificInstance({
      organization: usePulsarState().mustOrganization(),
      instance: instance,
      clusterUid: clusterUid
    })
  ).data

  const tenantInfoMap: Record<string, TenantInfo> = {}
  for (const tenant of tenants) {
    tenantInfoMap[tenant] = (
      await fetchTenantAdminWithSpecificInstance({
        organization: usePulsarState().mustOrganization(),
        clusterUid: clusterUid,
        instance,
        tenant
      })
    ).data
  }
  return { tenants, tenantInfoMap, clusterName, instanceName: instance }
}

const updateClusterToTenantInfoPromMap = () => {
  Object.values(clusterMap.value)
    .flat()
    .forEach(c => {
      if (
        !c.spec?.instanceName ||
        !c.metadata.uid ||
        clusterToTenantInfoPromMap.value[c.metadata.uid]
      ) {
        return
      }
      clusterToTenantInfoPromMap.value[c.metadata.uid] = fetchTenantAdminForCluster(
        c.spec?.instanceName,
        c.metadata.uid,
        c.metadata.name
      )
    })
}

const getAvailableLocations = (
  type: string,
  infrastructure: string,
  isUrsaFeaturePossible: boolean,
  engine: 'classic' | 'ursa' | undefined,
  managedPools: Location[],
  managedProPools: Location[],
  hostedPools: Location[]
) => {
  if (type === 'managed') {
    if (isUrsaFeaturePossible && engine === 'ursa') {
      return managedPools.filter(l => l.pool.cloudType === infrastructure && l.pool.features?.URSA)
    }
    return managedPools.concat(managedProPools).filter(l => l.pool.cloudType === infrastructure)
  }
  if (type === 'managed-pro') {
    if (isUrsaFeaturePossible && engine === 'ursa') {
      return managedProPools.filter(
        l => l.pool.cloudType === infrastructure && l.pool.features?.URSA
      )
    }
    return managedProPools.filter(l => l.pool.cloudType === infrastructure)
  }
  if (type === 'serverless') {
    const serverlessLocation = commonConfig.serverlessLocation
    const serverlessLocationList = serverlessLocation.split(',')
    const locations = hostedPools.filter(l => {
      const cloudType = l?.pool?.cloudType
      const poolNamespace = l?.pool?.poolRef?.namespace
      const poolName = l?.pool?.poolRef?.name
      const id = `${cloudType}:${poolNamespace}:${poolName}:${l.location}`
      if (serverlessLocationList.includes(id) && cloudType === infrastructure) {
        return true
      }
    })
    // override label for serverless
    locations.forEach(location => {
      location.label = location.label.replace('dedicated', 'serverless')
    })
    return locations
  }
  if (isUrsaFeaturePossible && engine === 'ursa') {
    return hostedPools.filter(l => {
      if (l.pool.cloudType === infrastructure && l.pool.features?.URSA) {
        return true
      }
    })
  }
  // for hosted
  return hostedPools.filter(l => l.pool.cloudType === infrastructure)
}

const getManagedAndManagedProAvailableLocations = (
  infrastructure: string,
  isUrsaFeaturePossible: boolean,
  engine: 'classic' | 'ursa' | undefined,
  managedPools: Location[],
  managedProPools: Location[]
) => {
  const managedLocations = getAvailableLocations(
    'managed',
    infrastructure,
    isUrsaFeaturePossible,
    engine,
    managedPools,
    managedProPools,
    []
  )
  const managedProLocations = getAvailableLocations(
    'managed-pro',
    infrastructure,
    isUrsaFeaturePossible,
    engine,
    managedPools,
    managedProPools,
    []
  )
  return managedLocations.concat(managedProLocations)
}

const clusterTitleFn = (cluster?: V1alpha1PulsarCluster) => {
  if (cluster?.spec?.displayName) {
    return `${cluster?.spec?.displayName} (${cluster?.metadata?.name})`
  }
  return `${cluster?.metadata?.name}`
}

const activeClusterTitle = computed(() => {
  return clusterTitleFn(activeCluster.value)
})

export const useCluster = () => {
  return {
    clusters,
    bookieOptions,
    brokerOptions,
    clusterMap,
    clusterNames,
    activeCluster,
    activeClusterName,
    isClusterReadyFn,
    isClusterAvailableFn,
    isClusterReadyByNameFn,
    isClusterAvailableByNameFn,
    isActiveClusterReady,
    isActiveClusterAvailable,
    functionEnabled,
    currentClusterResource,
    init,
    updateClusterConfigByResource,
    deleteCluster,
    deleteClusterByName,
    getClusterMap,
    createClusterByResources,
    isCNPoolMember,
    getClusterMapRaw,
    getClusterFromUid,
    getClusterUidFromName,
    convertRecurrenceFromMapToString,
    recurrenceShowLabel,
    isClusterDeletedByNameFn,
    isClusterDeletedFn,
    firstClusterFn,
    isFirstClusterFn,
    isFirstClusterByNameFn,
    isFirstClusterAvailableFn,
    isFirstClusterReadyFn,
    connectorServerInfo,
    readyClusters,
    availableClusters,
    fetchTenantAdminForCluster,
    clusterToTenantInfoPromMap,
    updateClusterToTenantInfoPromMap,
    ursaEnabled,
    getAvailableLocations,
    getManagedAndManagedProAvailableLocations,
    activeClusterTitle,
    clusterTitleFn,
    isLoadingClusters
  }
}
