// packages/react/src/index.tsx

import {
  createContext,
  useCallback,
  useContext,
  useEffect,
  useMemo,
  useRef,
  useState
} from "react"
import type { FC, ReactNode } from "react"
import { useSyncExternalStoreWithSelector } from "use-sync-external-store/shim/with-selector"

import {
  connect as baseConnect,
  MusubiCommandError,
  type ConnectOptions,
  type MountStoreOptions,
  type MountedStore,
  type MusubiConnection,
  type SocketLike,
  type StoreModule,
  type StoreProxy,
  type StoreSnapshot,
  type CommandName,
  type CommandPayload,
  type CommandReply
} from "@musubi/client"

import { shallowEqual } from "./shallow"

export { shallowEqual } from "./shallow"
export { MusubiCommandError, keyOf } from "@musubi/client"

export type {
  AsyncResult,
  CommandName,
  CommandPayload,
  CommandReply,
  ConnectionPatchEnvelope,
  ExternalUploader,
  ExternalUploaderArgs,
  MountStoreOptions,
  MountedStore,
  MusubiCommandErrorKind,
  MusubiCommandErrorOptions,
  MusubiConnection,
  PatchEnvelope,
  StoreId,
  StoreModule,
  StoreProxy,
  StoreSnapshot,
  StreamEntry,
  StreamOp,
  UploadConfig,
  UploadEntry,
  UploadError,
  UploadHandle,
  UploadStatus
} from "@musubi/client"

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/**
 * Result of mounting a root store, modelled as a discriminated union on
 * `status`. `store` is non-null only in the `"ready"` variant and `error` is
 * non-null only in the `"error"` variant, so narrowing on `status` is enough
 * to access either safely.
 */
export type MusubiRootMount<M extends StoreModule<R>, R> =
  | { status: "loading"; store: null; error: null }
  | { status: "ready"; store: StoreProxy<M, R>; error: null }
  | { status: "error"; store: null; error: Error }

/**
 * Options accepted by `useMusubiRoot` / `useMusubiRootSuspense`: the client's
 * `MountStoreOptions` plus a hook-level cleanup switch.
 */
export type UseMusubiRootOptions<
  M extends StoreModule<R>,
  R
> = MountStoreOptions<M, R> & {
  // Both hooks treat an omitted value as `true`. When the last consumer of a
  // shared mount releases it, `true` schedules the store's `unmount()`;
  // `false` only drops the hook-side bookkeeping entry.
  unmountOnCleanup?: boolean
}

/**
 * Connection lifecycle as published through the provider's context,
 * discriminated on `state`. `connection` is non-null only when `"ready"`;
 * `error` exists only on the `"error"` variant.
 */
export type MusubiConnectionStatus<R> =
  | { state: "connecting"; connection: null }
  | { state: "ready"; connection: MusubiConnection<R> }
  | { state: "error"; connection: null; error: Error }

/**
 * Provider props for callers that already hold a connection. `socket` and
 * `topic` are typed `never` so this shape cannot be mixed with the
 * socket-based one at compile time.
 */
type ConnectionProviderProps<R> = {
  connection: MusubiConnection<R>
  socket?: never
  topic?: never
  children: ReactNode
}

/**
 * Provider props for callers that hand over a socket and let the provider
 * open (and later disconnect) the connection itself. `connection` is typed
 * `never` so this shape cannot be mixed with the connection-based one.
 */
type SocketProviderProps = {
  socket: SocketLike
  // Forwarded to the client's connect options only when defined.
  topic?: string
  // Forwarded to the client's connect options only when defined.
  uploaders?: Record<string, import("@musubi/client").ExternalUploader>
  connection?: never
  children: ReactNode
}

/**
 * Props of `<MusubiProvider>`: exactly one of `connection` (pre-built) or
 * `socket` (provider-managed) must be supplied.
 */
export type MusubiProviderProps<R> =
  | ConnectionProviderProps<R>
  | SocketProviderProps

/**
 * The set of components and hooks returned by `createMusubi<R>()`, all typed
 * against the same store registry `R`.
 */
export interface MusubiFactory<R> {
  /** Opens a connection over `socket`; delegates to the client's `connect`. */
  connect: (socket: SocketLike, options?: ConnectOptions) => Promise<MusubiConnection<R>>
  /** Context provider accepting either a ready `connection` or a `socket`. */
  MusubiProvider: FC<MusubiProviderProps<R>>
  /** Returns the ready connection; throws outside a provider or when not ready. */
  useMusubiConnection: () => MusubiConnection<R>
  /** Returns the current connecting/ready/error status; throws outside a provider. */
  useMusubiConnectionStatus: () => MusubiConnectionStatus<R>
  /** Mounts a root store and reports loading/ready/error as state. */
  useMusubiRoot: <M extends StoreModule<R>>(
    options: UseMusubiRootOptions<M, R>
  ) => MusubiRootMount<M, R>
  /** Suspense variant: throws the pending promise or the mount error during render. */
  useMusubiRootSuspense: <M extends StoreModule<R>>(
    options: UseMusubiRootOptions<M, R>
  ) => StoreProxy<M, R>
  /**
   * Subscribes to a store proxy. Without a selector the whole snapshot is
   * returned; with one, only the selected slice (compared with `equalityFn`,
   * or a shallow comparison when `equalityFn` is omitted).
   */
  useMusubiSnapshot: {
    <M extends StoreModule<R>>(proxy: StoreProxy<M, R>): StoreSnapshot<M, R>
    <M extends StoreModule<R>, Selected>(
      proxy: StoreProxy<M, R>,
      selector: (snapshot: StoreSnapshot<M, R>) => Selected,
      equalityFn?: (a: Selected, b: Selected) => boolean
    ): Selected
  }
  /** Binds the command `name` of `proxy` to a dispatcher with request state. */
  useMusubiCommand: <M extends StoreModule<R>, K extends CommandName<M, R>>(
    proxy: StoreProxy<M, R>,
    name: K
  ) => MusubiCommandResult<M, K, R>
}

/**
 * Value returned by `useMusubiCommand`: a dispatcher plus the state of the
 * most recent dispatch.
 */
export interface MusubiCommandResult<
  M extends StoreModule<R>,
  K extends CommandName<M, R>,
  R
> {
  /** Sends the command; resolves with the reply or rejects with a `MusubiCommandError`. */
  dispatch: (payload: CommandPayload<M, K, R>) => Promise<CommandReply<M, K, R>>
  /** `true` from the start of a dispatch until the latest one settles or `reset` is called. */
  isPending: boolean
  /** Error of the latest dispatch, or `null`. Cleared when a new dispatch starts. */
  error: MusubiCommandError | null
  /** Reply of the latest dispatch, or `null`. Cleared when a new dispatch starts. */
  data: CommandReply<M, K, R> | null
  /** Clears `isPending`/`error`/`data` and makes any in-flight dispatch stop updating state. */
  reset: () => void
}

// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------

/**
 * Returns a Musubi API closed over the registry `R`.
 *
 * Usage:
 *
 *     export const {
 *       connect,
 *       MusubiProvider,
 *       useMusubiRoot,
 *       useMusubiRootSuspense,
 *       useMusubiSnapshot,
 *       useMusubiCommand
 *     } = createMusubi<Musubi.Stores>()
 */
export function createMusubi<R>(): MusubiFactory<R> {
  const StatusContext = createContext<MusubiConnectionStatus<R> | null>(null)

  /**
   * Context provider for the Musubi connection status.
   *
   * Accepts exactly one of:
   * - `connection`: an already-open connection, published as `"ready"`.
   * - `socket` (plus optional `topic` / `uploaders`): delegated to
   *   `SocketProvider`, which opens and owns the connection.
   *
   * Throws when both or neither of `connection` / `socket` are supplied.
   */
  const MusubiProvider: FC<MusubiProviderProps<R>> = (props) => {
    const { connection } = props

    // Memoize the context value so its identity only changes when the
    // connection does. An inline object literal would be a new value on every
    // render and force every context consumer to re-render. The hook is called
    // before validation so it runs unconditionally on every render.
    const readyStatus = useMemo<MusubiConnectionStatus<R> | null>(
      () => (connection !== undefined ? { state: "ready", connection } : null),
      [connection]
    )

    if (props.connection !== undefined && props.socket !== undefined) {
      throw new Error(
        "<MusubiProvider> accepts either `connection` or `socket`, not both"
      )
    }

    if (props.connection === undefined && props.socket === undefined) {
      throw new Error(
        "<MusubiProvider> requires either `connection` or `socket`"
      )
    }

    if (props.connection !== undefined) {
      // `readyStatus` is non-null here: it is derived from the same
      // `connection` prop that this branch just checked.
      return (
        <StatusContext.Provider value={readyStatus}>
          {props.children}
        </StatusContext.Provider>
      )
    }

    return <SocketProvider {...props}>{props.children}</SocketProvider>
  }

  /**
   * Provider variant that owns the connection: it connects over `socket`
   * whenever `socket`, `topic` or `uploaders` change, publishes the
   * connecting/ready/error status through context, and disconnects the
   * connection it opened when the effect is cleaned up.
   */
  const SocketProvider: FC<SocketProviderProps> = (providerProps) => {
    const { socket, topic, uploaders, children } = providerProps
    const [status, setStatus] = useState<MusubiConnectionStatus<R>>({
      state: "connecting",
      connection: null
    })

    useEffect(() => {
      // `active` flips to false on cleanup; `opened` tracks the connection
      // this effect run is responsible for closing.
      let active = true
      let opened: MusubiConnection<R> | null = null

      setStatus({ state: "connecting", connection: null })

      const connectOptions: ConnectOptions = {}
      if (topic !== undefined) connectOptions.topic = topic
      if (uploaders !== undefined) connectOptions.uploaders = uploaders

      const handleConnected = (connection: MusubiConnection<R>): void => {
        if (!active) {
          // The effect was cleaned up while connecting (unmount, or a change
          // of socket/topic/uploaders). Close the new connection right away
          // so the channel is not left open.
          void connection.disconnect()
          return
        }

        opened = connection
        setStatus({ state: "ready", connection })
      }

      const handleFailure = (cause: unknown): void => {
        if (!active) return
        setStatus({
          state: "error",
          connection: null,
          error: cause instanceof Error ? cause : new Error(String(cause))
        })
      }

      baseConnect<R>(socket, connectOptions).then(handleConnected).catch(handleFailure)

      return () => {
        active = false
        const toClose = opened
        opened = null
        if (toClose) {
          void toClose.disconnect()
        }
      }
    }, [socket, topic, uploaders])

    return <StatusContext.Provider value={status}>{children}</StatusContext.Provider>
  }

  /**
   * Reads the connection status published by the nearest `<MusubiProvider>`.
   *
   * @returns The current connecting/ready/error status.
   * @throws Error when no provider is present above the caller.
   */
  function useMusubiConnectionStatus(): MusubiConnectionStatus<R> {
    const current = useContext(StatusContext)
    if (current) {
      return current
    }
    throw new Error(
      "useMusubiConnectionStatus must be used inside <MusubiProvider>"
    )
  }

  /**
   * Returns the ready connection from the nearest `<MusubiProvider>`.
   *
   * @returns The connection, only when the provider's state is `"ready"`.
   * @throws Error when used outside a provider, or while the provider is
   *   still connecting or has failed.
   */
  function useMusubiConnection(): MusubiConnection<R> {
    const current = useContext(StatusContext)

    if (!current) {
      throw new Error(
        "useMusubiConnection must be used inside <MusubiProvider> (or call useMusubiConnectionStatus() to observe connecting/error states)"
      )
    }

    if (current.state === "ready") {
      return current.connection
    }

    throw new Error(
      `useMusubiConnection requires a ready connection (current state: ${current.state}). Use useMusubiConnectionStatus() to observe connecting/error states.`
    )
  }

  /**
   * Registry-typed wrapper around the client's `connect`: forwards `socket`
   * and `options` unchanged and returns the resulting connection promise.
   */
  function connect(
    socket: SocketLike,
    options?: ConnectOptions
  ): Promise<MusubiConnection<R>> {
    const pendingConnection = baseConnect<R>(socket, options)
    return pendingConnection
  }

  /**
   * Mounts (or joins) the shared root store described by `options` and
   * reports progress as state.
   *
   * The mount is shared per connection and key, and ref-counted: the effect
   * takes a reference on commit and releases it on cleanup. The effect
   * re-runs when the connection, module, id, canonicalized params or
   * `unmountOnCleanup` change, resetting the state to `"loading"` first.
   *
   * @returns `"loading"`, then `"ready"` with the store proxy or `"error"`
   *   with the failure normalized to an `Error`.
   */
  function useMusubiRoot<M extends StoreModule<R>>(
    options: UseMusubiRootOptions<M, R>
  ): MusubiRootMount<M, R> {
    const connection = useMusubiConnection()
    const [mountState, setMountState] = useState<MusubiRootMount<M, R>>({
      status: "loading",
      store: null,
      error: null
    })

    // Stable string form of params, used as an effect dependency below.
    const paramsKey = canonicalStringify(options.params ?? null)

    useEffect(() => {
      let active = true
      const shouldUnmount = options.unmountOnCleanup ?? true
      const mountOptions: MountStoreOptions<M, R> = {
        module: options.module,
        id: options.id,
        ...(options.params !== undefined ? { params: options.params } : {})
      }

      setMountState({ status: "loading", store: null, error: null })

      const shared = ensureRootMount<M, R>(connection, mountOptions)
      bumpMountRef(connection, shared.key)

      shared.promise
        .then((mounted) => {
          if (active) {
            setMountState({
              status: "ready",
              store: mounted.store as StoreProxy<M, R>,
              error: null
            })
          }
        })
        .catch((cause: unknown) => {
          if (!active) return
          setMountState({
            status: "error",
            store: null,
            error: cause instanceof Error ? cause : new Error(String(cause))
          })
        })

      return () => {
        active = false
        releaseRootMount(connection, shared.key, shouldUnmount)
      }
      // Depending on paramsKey instead of the params object means params that
      // are equal by value (e.g. {a:1,b:2} and {b:2,a:1}) do not re-run the
      // effect and resolve to the same shared mount.
    }, [connection, options.module, options.id, paramsKey, options.unmountOnCleanup])

    return mountState
  }

  /**
   * Suspense variant of `useMusubiRoot`.
   *
   * During render it looks up or creates the shared mount for `options`, then:
   * - throws the stored error if the mount failed (for an error boundary),
   * - throws the pending promise if the mount has not settled (for Suspense),
   * - otherwise returns the mounted store's proxy.
   *
   * The reference on the shared mount is taken in an effect (commit phase)
   * and released on cleanup, so discarded renders never hold a reference.
   *
   * @returns The store proxy once the mount has resolved.
   * @throws The mount's `Error` when it failed, or the mount promise while pending.
   */
  function useMusubiRootSuspense<M extends StoreModule<R>>(
    options: UseMusubiRootOptions<M, R>
  ): StoreProxy<M, R> {
    const connection = useMusubiConnection()
    const unmountOnCleanup = options.unmountOnCleanup ?? true

    const mountOptions: MountStoreOptions<M, R> = {
      module: options.module,
      id: options.id,
      ...(options.params !== undefined ? { params: options.params } : {})
    }

    // Render-phase: lookup-or-create. DO NOT bump refs here — that happens
    // in the commit-phase effect below. Suspense may discard this render.
    const sharedMount = ensureRootMount<M, R>(connection, mountOptions)
    // Tracks whether the effect below is currently committed. It is written
    // by the effect and its cleanup but not read anywhere in this function.
    const committedRef = useRef(false)

    useEffect(() => {
      // Commit-phase: this render won; take a ref.
      bumpMountRef(connection, sharedMount.key)
      committedRef.current = true
      return () => {
        committedRef.current = false
        releaseRootMount(connection, sharedMount.key, unmountOnCleanup)
      }
      // eslint-disable-next-line react-hooks/exhaustive-deps
    }, [connection, sharedMount.key, unmountOnCleanup])

    // Schedule orphan-cleanup sweep regardless of success/failure: if no
    // commit-phase effect bumps refs by the time the promise settles, tear
    // the mount down so a discarded Suspense render doesn't leak a live
    // root, and clear the failed entry so a retry can run.
    // NOTE(review): the sweep checks refs on a zero-delay timer after the
    // promise settles. If that timer can fire before the effect above has
    // committed, the entry would be dropped (and the store unmounted) while
    // this component still uses it — confirm the ordering is guaranteed.
    scheduleSuspenseOrphanSweep(connection, sharedMount.key, unmountOnCleanup)

    // A failed mount surfaces its error to the nearest error boundary.
    if (sharedMount.failed) {
      throw sharedMount.error
    }

    // Still pending: suspend on the shared promise.
    if (!sharedMount.settled) {
      throw sharedMount.promise
    }

    // Settled and not failed, so `value` was set by the mount's success handler.
    return (sharedMount.value as unknown as MountedStore<M, R>).store
  }

  /**
   * Shared implementation behind both `useMusubiSnapshot` overloads.
   *
   * Subscribes to `proxy` and returns either the full snapshot (no selector)
   * or the selected slice. With a selector and no `equalityFn`, selections
   * are compared with `shallowEqual`; with neither, no equality function is
   * passed to the underlying store hook.
   */
  function useMusubiSnapshotImpl<M extends StoreModule<R>, Selected>(
    proxy: StoreProxy<M, R>,
    selector?: (snapshot: StoreSnapshot<M, R>) => Selected,
    equalityFn?: (a: Selected, b: Selected) => boolean
  ): Selected | StoreSnapshot<M, R> {
    const subscribe = useCallback(
      (notify: () => void) => proxy.subscribe(notify),
      [proxy]
    )
    const readSnapshot = useCallback(() => proxy.snapshot(), [proxy])

    // Without a selector the snapshot itself is the selection.
    const select =
      selector != null
        ? selector
        : (snapshot: StoreSnapshot<M, R>) => snapshot as unknown as Selected

    // An explicit equalityFn wins. Otherwise a selector gets shallowEqual so
    // selectors that build a new object or tuple each time do not cause a
    // re-render on every patch.
    let isEqual: ((a: Selected, b: Selected) => boolean) | undefined
    if (equalityFn != null) {
      isEqual = equalityFn
    } else if (selector) {
      isEqual = shallowEqual as (a: Selected, b: Selected) => boolean
    }

    // The same reader is passed as both the client and server snapshot getter.
    return useSyncExternalStoreWithSelector(
      subscribe,
      readSnapshot,
      readSnapshot,
      select,
      isEqual
    )
  }

  // Public, overloaded view of the implementation above.
  const useMusubiSnapshot = useMusubiSnapshotImpl as MusubiFactory<R>["useMusubiSnapshot"]

  /**
   * Binds command `name` of `proxy` to a `dispatch` function and tracks the
   * state of the latest dispatch.
   *
   * Only the most recent dispatch may update state: each dispatch (and each
   * `reset`) advances a request counter, and a settling dispatch writes state
   * only if the component is still mounted and its counter value is current.
   * `dispatch` always resolves or rejects for its own caller regardless.
   *
   * @returns `dispatch`, `reset`, and the `isPending` / `error` / `data` state.
   */
  function useMusubiCommand<M extends StoreModule<R>, K extends CommandName<M, R>>(
    proxy: StoreProxy<M, R>,
    name: K
  ): MusubiCommandResult<M, K, R> {
    type Reply = CommandReply<M, K, R>
    type CommandState = {
      isPending: boolean
      error: MusubiCommandError | null
      data: Reply | null
    }

    const [commandState, setCommandState] = useState<CommandState>({
      isPending: false,
      error: null,
      data: null
    })

    const latestRequestRef = useRef(0)
    const aliveRef = useRef(true)

    useEffect(() => {
      aliveRef.current = true
      return () => {
        aliveRef.current = false
      }
    }, [])

    const dispatch = useCallback(
      async (payload: CommandPayload<M, K, R>): Promise<Reply> => {
        latestRequestRef.current += 1
        const ticket = latestRequestRef.current
        // True while the component is mounted and no newer dispatch/reset ran.
        const mayUpdate = (): boolean =>
          aliveRef.current && ticket === latestRequestRef.current

        if (aliveRef.current) {
          setCommandState({ isPending: true, error: null, data: null })
        }

        try {
          const reply = (await proxy.dispatchCommand(name, payload)) as Reply
          if (mayUpdate()) {
            setCommandState({ isPending: false, error: null, data: reply })
          }
          return reply
        } catch (cause) {
          // Anything that is not already a MusubiCommandError is wrapped as a
          // "failed" error that keeps the original as `cause`.
          const error = MusubiCommandError.is(cause)
            ? cause
            : new MusubiCommandError({
                kind: "failed",
                command: String(name),
                storeId: [...proxy.__musubi_store_id__],
                reply: { error: cause instanceof Error ? cause.message : String(cause) },
                cause
              })

          if (mayUpdate()) {
            setCommandState({ isPending: false, error, data: null })
          }
          throw error
        }
      },
      [proxy, name]
    )

    const reset = useCallback(() => {
      // Advancing the counter stops any in-flight dispatch from writing state.
      latestRequestRef.current += 1
      if (aliveRef.current) {
        setCommandState({ isPending: false, error: null, data: null })
      }
    }, [])

    return {
      dispatch,
      isPending: commandState.isPending,
      error: commandState.error,
      data: commandState.data,
      reset
    }
  }

  return {
    connect,
    MusubiProvider,
    useMusubiConnection,
    useMusubiConnectionStatus,
    useMusubiRoot,
    useMusubiRootSuspense,
    useMusubiSnapshot,
    useMusubiCommand
  }
}

// ---------------------------------------------------------------------------
// Shared mount ref-counting across hook callers
// ---------------------------------------------------------------------------

/**
 * Bookkeeping record for one root mount shared by every hook caller that uses
 * the same connection and mount key.
 */
type SharedRootMount = {
  // Number of committed consumers currently holding this mount.
  refs: number
  // Promise for the mount; shared by all consumers.
  promise: Promise<MountedStore<never, unknown>>
  // True once `promise` has either resolved or rejected.
  settled: boolean
  // True when `promise` rejected; `error` is then set.
  failed: boolean
  // The mounted store once `promise` resolved, otherwise null.
  value: MountedStore<never, unknown> | null
  // The rejection normalized to an Error, otherwise null.
  error: Error | null
  // Handle of the deferred cleanup scheduled when refs drop to zero, if any.
  cleanupTimer: ReturnType<typeof setTimeout> | null
  // Guards against scheduling more than one Suspense orphan sweep at a time.
  orphanSweepScheduled: boolean
}

// Registry of shared root mounts: connection -> (mount key -> entry).
// Despite the name, entries stay here after their promise settles and are
// only removed when released or swept. Keyed weakly by connection, so the
// registry itself does not keep a connection alive.
const pendingRootMounts: WeakMap<
  MusubiConnection<unknown>,
  Map<string, SharedRootMount>
> = new WeakMap()

/**
 * Looks up the shared mount entry for `options` on `connection`, creating it
 * (and starting the mount) when none exists. Safe to call during render.
 *
 * The reference count is never touched here; callers take their reference on
 * commit. Finding an existing entry cancels its pending cleanup timer, so a
 * re-mount inside the cleanup grace period reuses the live mount.
 *
 * @returns The shared entry, with its registry `key` attached.
 */
function ensureRootMount<M extends StoreModule<R>, R>(
  connection: MusubiConnection<R>,
  options: MountStoreOptions<M, R>
): SharedRootMount & { key: string } {
  const key = rootMountKey(options)
  const registry = rootMountsFor(connection)
  const cached = registry.get(key)

  if (cached !== undefined) {
    if (cached.cleanupTimer) {
      clearTimeout(cached.cleanupTimer)
      cached.cleanupTimer = null
    }
    return Object.assign(cached, { key })
  }

  const entry: SharedRootMount = {
    refs: 0,
    // Placeholder; replaced just below once the handlers can refer to `entry`.
    promise: Promise.resolve(null as never),
    settled: false,
    failed: false,
    value: null,
    error: null,
    cleanupTimer: null,
    orphanSweepScheduled: false
  }

  entry.promise = connection
    .mountStore(options)
    .then((mounted) => {
      const stored = mounted as unknown as MountedStore<never, unknown>
      entry.settled = true
      entry.value = stored
      return stored
    })
    .catch((cause: unknown) => {
      entry.settled = true
      entry.failed = true
      entry.error = cause instanceof Error ? cause : new Error(String(cause))
      // The failed entry is intentionally left in the registry so consumers
      // (effect-based and Suspense) can still read the failure. It is removed
      // later by the release / sweep paths, after which a new mount attempt
      // starts from scratch.
      throw entry.error
    })
  // Attach a no-op handler so the shared promise never reports an unhandled
  // rejection; consumers chaining their own handlers still see the error.
  entry.promise.catch(() => undefined)

  registry.set(key, entry)
  return Object.assign(entry, { key })
}

/**
 * Takes one reference on the shared mount registered under `key` for
 * `connection`, cancelling any pending cleanup timer first. Does nothing when
 * no such entry exists.
 */
function bumpMountRef<R>(connection: MusubiConnection<R>, key: string): void {
  const entry = pendingRootMounts
    .get(connection as MusubiConnection<unknown>)
    ?.get(key)

  if (entry === undefined) {
    return
  }

  if (entry.cleanupTimer) {
    clearTimeout(entry.cleanupTimer)
    entry.cleanupTimer = null
  }

  entry.refs += 1
}

/**
 * Drops one reference on the shared mount registered under `key`.
 *
 * While other references remain nothing else happens. When the last one is
 * released:
 * - with `unmountOnCleanup` false, the entry is removed immediately and the
 *   store is left mounted;
 * - otherwise cleanup is deferred to a zero-delay timer. If nobody re-took a
 *   reference by then, the entry is removed and, unless the mount failed, the
 *   store is unmounted (waiting for the mount promise if it is still pending).
 */
function releaseRootMount<R>(
  connection: MusubiConnection<R>,
  key: string,
  unmountOnCleanup: boolean
): void {
  const registry = pendingRootMounts.get(connection as MusubiConnection<unknown>)
  if (!registry) {
    return
  }

  const entry = registry.get(key)
  if (!entry) {
    return
  }

  entry.refs -= 1
  if (entry.refs > 0) {
    return
  }

  if (!unmountOnCleanup) {
    registry.delete(key)
    return
  }

  const finalize = (): void => {
    // A consumer re-took a reference during the grace period: keep the mount.
    if (entry.refs > 0) {
      return
    }

    registry.delete(key)

    // A failed mount has nothing to unmount.
    if (entry.failed) {
      return
    }

    if (entry.value) {
      void entry.value.unmount()
      return
    }

    // Still mounting: unmount once it resolves, ignoring a later failure.
    void entry.promise.then((mounted) => mounted.unmount()).catch(() => undefined)
  }

  entry.cleanupTimer = setTimeout(finalize, 0)
}

/**
 * Guards against mounts created by a Suspense render that never committed.
 *
 * Once the mount promise settles (either way), a zero-delay timer checks the
 * reference count. If no consumer holds a reference, the entry is removed
 * from the registry, and a successfully mounted store is unmounted when
 * `unmountOnCleanup` is true. At most one sweep is outstanding per entry;
 * calls made while one is scheduled are ignored.
 */
function scheduleSuspenseOrphanSweep<R>(
  connection: MusubiConnection<R>,
  key: string,
  unmountOnCleanup: boolean
): void {
  const registry = pendingRootMounts.get(connection as MusubiConnection<unknown>)
  const entry = registry?.get(key)

  if (!registry || !entry) {
    return
  }
  if (entry.orphanSweepScheduled) {
    return
  }
  entry.orphanSweepScheduled = true

  const runSweep = (): void => {
    entry.orphanSweepScheduled = false

    if (entry.refs > 0) {
      return
    }

    // Nobody committed: forget the entry, and unmount a successful mount.
    registry.delete(key)
    if (unmountOnCleanup && !entry.failed && entry.value) {
      void entry.value.unmount()
    }
  }

  const afterSettle = (): void => {
    setTimeout(runSweep, 0)
  }

  entry.promise.then(afterSettle, afterSettle)
}

/**
 * Returns the mount registry for `connection`, creating and storing an empty
 * one on first use.
 */
function rootMountsFor<R>(
  connection: MusubiConnection<R>
): Map<string, SharedRootMount> {
  const registryKey = connection as MusubiConnection<unknown>
  let registry = pendingRootMounts.get(registryKey)

  if (!registry) {
    registry = new Map<string, SharedRootMount>()
    pendingRootMounts.set(registryKey, registry)
  }

  return registry
}

/**
 * Builds the registry key that identifies a shared root mount from its id,
 * module and params.
 *
 * The three parts are encoded as a JSON array of strings rather than joined
 * with a delimiter, so a part containing the delimiter cannot make two
 * different mounts collide (e.g. id `a|b` + module `c` versus id `a` +
 * module `b|c`). `id` and `module` are coerced to strings exactly as a
 * template literal would; params are canonicalized so key order does not
 * matter.
 *
 * @returns An opaque string, equal for mounts that should be shared.
 */
function rootMountKey<M extends StoreModule<R>, R>(
  options: MountStoreOptions<M, R>
): string {
  return JSON.stringify([
    `${options.id}`,
    `${options.module}`,
    canonicalStringify(options.params ?? null)
  ])
}

/**
 * Serializes `value` to a deterministic JSON-style string: object keys are
 * sorted, so values that are equal up to key order produce the same string.
 *
 * Follows native `JSON.stringify` conventions where they matter for keys:
 * - an object's `toJSON()` is honored (so two different `Date`s yield
 *   different strings instead of both collapsing to `{}`);
 * - `undefined`, functions and symbols render as `null` at the top level and
 *   inside arrays, and are dropped when they are object property values;
 * - holes in sparse arrays render as `null`.
 *
 * Unlike native `JSON.stringify`, the result is always a string. Circular
 * structures are not detected.
 *
 * @param value Any value, typically mount params.
 * @returns The canonical string form of `value`.
 */
function canonicalStringify(value: unknown): string {
  // Values that JSON cannot represent.
  const isOmitted = (candidate: unknown): boolean =>
    candidate === undefined ||
    typeof candidate === "function" ||
    typeof candidate === "symbol"

  // Apply toJSON once, as native JSON.stringify does, then serialize the
  // result without re-applying it at this level.
  let current = value
  if (current !== null && typeof current === "object") {
    const { toJSON } = current as { toJSON?: unknown }
    if (typeof toJSON === "function") {
      current = toJSON.call(current) as unknown
    }
  }

  if (isOmitted(current)) return "null"
  if (current === null || typeof current !== "object") return JSON.stringify(current)

  if (Array.isArray(current)) {
    // Array.from visits holes (as undefined), unlike Array.prototype.map.
    const items = Array.from(current, (item) => canonicalStringify(item))
    return `[${items.join(",")}]`
  }

  const obj = current as Record<string, unknown>
  const keys = Object.keys(obj)
    .filter((k) => !isOmitted(obj[k]))
    .sort()
  return `{${keys
    .map((k) => `${JSON.stringify(k)}:${canonicalStringify(obj[k])}`)
    .join(",")}}`
}