// packages/client/src/uploads.ts

// Per-upload reactive handle exposed at `page.<name>` via the proxy.
//
// `UploadHandleImpl` is a stable mutable object whose reference is kept in
// `connection.uploads` for the connection lifetime. Upload ops mutate the
// handle in place; subscribers run after each batch so React (or any other
// adapter) can re-render without losing identity on the handle.

import type {
  ChannelLike,
  PushLike,
  RootConnection
} from "./runtime"
import type {
  EntryStatus,
  ExternalUploaderArgs,
  StoreId,
  UploadConfig,
  UploadEntry,
  UploadError,
  UploadHandle,
  UploadOp,
  UploadStatus
} from "./types"
import { uploadStoreKey } from "./types"

// Client-side fallback limits. A handle starts from a copy of these values and
// replaces them with whatever the server sends in the `allow_upload` reply or
// in a `config` op.
const DEFAULT_CONFIG: UploadConfig = {
  accept: "any",
  maxEntries: 1,
  maxFileSize: 8 * 1000 * 1000,
  chunkSize: 64 * 1000
}

// Mutable per-entry record kept in `UploadHandleImpl.internalEntries`, keyed by
// `ref`. The public `UploadEntry` objects are projections of these records.
interface InternalEntry {
  // Entry reference (`entry_ref` from the `allow_upload` reply / `ref` on ops).
  ref: string
  // File name, size and MIME type as taken from the selected `File`, or from
  // the `add` op when the entry is first seen through the op stream.
  clientName: string
  clientSize: number
  clientType: string
  // Upload progress as a percentage; 100 means complete.
  progress: number
  status: EntryStatus
  errors: UploadError[]

  // Transport state. `file`, `uploader` and `meta` are stashed by `select()`;
  // `channel` is set while a channel-mode transfer runs and `abortController`
  // while an external uploader runs.
  file: File | undefined
  channel: ChannelLike | undefined
  // Name of the registered external uploader; when set, `start()` routes the
  // entry to the external path instead of the chunked channel path.
  uploader: string | undefined
  // Opaque per-entry payload from the server, handed to the external uploader.
  meta: unknown
  abortController: AbortController | undefined
}

/**
 * Reactive, identity-stable handle for one named upload on a store.
 *
 * The handle is mutated in place by its own methods and by `applyOps()`;
 * subscribers registered through `subscribe()` are called after each change so
 * an adapter can re-render. `entries` is replaced by a fresh projection of the
 * internal entry index every time that index changes.
 */
export class UploadHandleImpl implements UploadHandle {
  readonly storeId: StoreId
  readonly uploadName: string

  // Reactive surface — read by the proxy/snapshot path.
  config: UploadConfig = { ...DEFAULT_CONFIG }
  status: UploadStatus = "idle"
  entries: UploadEntry[] = []
  errors: UploadError[] = []

  // Internal — not exposed on the public surface.
  private internalEntries: Map<string, InternalEntry> = new Map()
  // Join tokens for channel-mode entries, keyed by entry ref. A token is
  // stashed by `select()` and dropped once the transfer ends or is cancelled.
  private pendingTokens: Map<string, string> = new Map()
  private listeners: Set<() => void> = new Set()
  private connection: RootConnection

  constructor(connection: RootConnection, storeId: StoreId, uploadName: string) {
    this.connection = connection
    this.storeId = storeId
    this.uploadName = uploadName
  }

  // ---- Reactive computed fields ------------------------------------------

  /** Mean progress over all entries, rounded to an integer; 0 with no entries. */
  get progress(): number {
    if (this.entries.length === 0) return 0
    const total = this.entries.reduce((sum, e) => sum + e.progress, 0)
    return Math.round(total / this.entries.length)
  }

  get isIdle(): boolean { return this.status === "idle" }
  get isSelecting(): boolean { return this.status === "selecting" }
  get isUploading(): boolean { return this.status === "uploading" }
  get isSuccess(): boolean { return this.status === "success" }
  get isError(): boolean { return this.status === "error" }

  // ---- Subscriptions -----------------------------------------------------

  /**
   * Registers a change listener.
   *
   * @returns a function that removes the listener again.
   */
  subscribe(listener: () => void): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }

  /** Calls every registered listener. */
  notify(): void {
    for (const listener of this.listeners) listener()
  }

  // ---- Public API --------------------------------------------------------

  /**
   * Announces the given files to the server (`allow_upload`) and records the
   * accepted ones, together with their transport details, for `start()`.
   *
   * An empty selection is a no-op that returns the current entries.
   *
   * @param files files picked by the user.
   * @returns the projected entries after the reply has been applied.
   * @throws Error("Connection is not open") when the connection has no channel;
   *   the handle's state is left untouched in that case.
   * @throws the push failure when the server rejects or times out; the status
   *   is rolled back to what it was before the call.
   */
  async select(files: FileList | File[]): Promise<readonly UploadEntry[]> {
    type AllowUploadReply = {
      ref: string
      config: {
        accept: string[] | "any"
        max_entries: number
        max_file_size: number
        chunk_size: number
      }
      entries: Record<string, {
        type: "channel" | "external"
        entry_ref: string
        token?: string
        uploader?: string
        meta?: unknown
      }>
      errors: { client_ref: string; error: UploadError }[]
    }

    const arr: File[] = Array.from(files as FileList | File[])

    if (arr.length === 0) return this.entries

    // Check the connection before touching reactive state so that a closed
    // connection cannot strand the handle in "selecting".
    const channel = this.connection.channel
    if (!channel) throw new Error("Connection is not open")

    const previousStatus = this.status
    this.status = "selecting"
    this.errors = []
    this.notify()

    // `client_ref` is the file's index in `arr`; the reply keys its accepted
    // entries by the same value.
    const entriesPayload = arr.map((file, index) => ({
      client_ref: String(index),
      name: file.name,
      size: file.size,
      type: file.type
    }))

    let reply: AllowUploadReply
    try {
      reply = await pushReceive<AllowUploadReply>(
        channel.push("allow_upload", {
          root_id: this.connection.id,
          store_id: [...this.storeId],
          name: this.uploadName,
          entries: entriesPayload
        }) as PushLike
      )
    } catch (err) {
      // Nothing was selected: undo the optimistic status flip, then surface
      // the failure to the caller.
      if (this.status === "selecting") this.status = previousStatus
      this.notify()
      throw err
    }

    this.config = {
      accept: reply.config.accept,
      maxEntries: reply.config.max_entries,
      maxFileSize: reply.config.max_file_size,
      chunkSize: reply.config.chunk_size
    }

    // The server already emits `{op: add}` ops over `upload_ops`; selecting
    // also stashes the matching File and per-entry transport meta on the
    // handle's internal index so `start()` can pick them up.
    for (const [clientRef, accepted] of Object.entries(reply.entries)) {
      const file = arr[Number(clientRef)]

      const merged: InternalEntry = this.internalEntries.get(accepted.entry_ref) ?? {
        ref: accepted.entry_ref,
        clientName: file?.name ?? "",
        clientSize: file?.size ?? 0,
        clientType: file?.type ?? "",
        progress: 0,
        status: "pending",
        errors: [],
        file: undefined,
        channel: undefined,
        uploader: undefined,
        meta: undefined,
        abortController: undefined
      }

      merged.file = file
      merged.uploader = accepted.uploader
      merged.meta = accepted.meta
      this.internalEntries.set(accepted.entry_ref, merged)

      if (accepted.type === "channel" && accepted.token) {
        this.pendingTokens.set(accepted.entry_ref, accepted.token)
      }
    }

    this.errors = reply.errors.map((e) => e.error)
    this.status = reply.errors.length > 0 ? "error" : "selecting"
    this.refreshEntries()
    this.notify()

    return this.entries
  }

  /**
   * Starts the transfer of every known entry — through its registered external
   * uploader when the entry names one, otherwise through a chunked upload
   * channel — and resolves once all transfers have settled.
   *
   * The final status is "error" when any transfer rejected or any entry is in
   * the "error" state, otherwise "success". Transfer failures are reflected in
   * the status rather than rethrown.
   *
   * @throws Error("Connection is not open") when the connection has no channel;
   *   the handle's state is left untouched in that case.
   */
  async start(): Promise<void> {
    if (!this.connection.channel) throw new Error("Connection is not open")

    this.status = "uploading"
    this.notify()

    const socket = this.connection.connection.socket

    const tasks = Array.from(this.internalEntries.values(), (entry) =>
      entry.uploader ? this.startExternal(entry) : this.startChannel(socket, entry)
    )

    // Wait for every transfer to settle: `Promise.all` would publish a
    // terminal status on the first rejection while siblings are still running.
    const results = await Promise.allSettled(tasks)
    const transferFailed = results.some((result) => result.status === "rejected")
    const entryFailed = this.entries.some((e) => e.status === "error")

    this.status = transferFailed || entryFailed ? "error" : "success"
    this.notify()
  }

  /**
   * Cancels one entry, or every entry when `entryRef` is omitted.
   *
   * Local teardown (aborting the external uploader, leaving the transfer
   * channel, dropping the join token) runs synchronously and always happens,
   * even when the root channel is closed. The server is then told via
   * `cancel_upload` for each ref if a channel is available.
   *
   * @param entryRef ref of the entry to cancel; all entries when omitted.
   * @throws the first `cancel_upload` failure, after every ref was attempted.
   */
  async cancel(entryRef?: string): Promise<void> {
    const refs = entryRef
      ? [entryRef]
      : Array.from(this.internalEntries.keys())

    for (const ref of refs) {
      this.pendingTokens.delete(ref)

      const entry = this.internalEntries.get(ref)
      if (!entry) continue

      entry.abortController?.abort()

      if (entry.channel) {
        // Best effort: the transfer channel may already be closed.
        try { entry.channel.leave() } catch { /* noop */ }
        entry.channel = undefined
      }
    }

    const channel = this.connection.channel
    if (!channel) return

    // The async wrapper turns a synchronous `push` failure into a rejection so
    // that one bad ref cannot prevent the remaining refs from being cancelled.
    const results = await Promise.allSettled(
      refs.map(async (ref) => {
        await pushReceive(
          channel.push("cancel_upload", {
            root_id: this.connection.id,
            store_id: [...this.storeId],
            name: this.uploadName,
            ref
          }) as PushLike
        )
      })
    )

    const failure = results.find(
      (result): result is PromiseRejectedResult => result.status === "rejected"
    )
    if (failure) throw failure.reason
  }

  /**
   * Cancels everything and returns the handle to its initial "idle" state.
   *
   * Local state is cleared even when the server-side cancel fails; the failure
   * is still propagated to the caller.
   */
  async reset(): Promise<void> {
    try {
      await this.cancel()
    } finally {
      this.internalEntries.clear()
      this.pendingTokens.clear()
      this.entries = []
      this.errors = []
      this.status = "idle"
      this.notify()
    }
  }

  // ---- Op application (called by runtime) --------------------------------

  /**
   * Applies a batch of server ops to this handle, then re-projects `entries`
   * and notifies subscribers once if anything was touched.
   */
  applyOps(ops: UploadOp[]): void {
    let touched = false

    for (const op of ops) {
      switch (op.op) {
        case "config":
          this.config = {
            accept: op.config.accept,
            maxEntries: op.config.max_entries,
            maxFileSize: op.config.max_file_size,
            chunkSize: op.config.chunk_size
          }
          touched = true
          break

        case "add": {
          const wire = op.entry
          // Reuse the record created by `select()` if there is one so the
          // stashed File / transport details survive.
          const next: InternalEntry = this.internalEntries.get(op.ref) ?? {
            ref: op.ref,
            clientName: wire.client_name,
            clientSize: wire.client_size,
            clientType: wire.client_type,
            progress: wire.progress,
            status: wire.status,
            errors: wire.errors ?? [],
            file: undefined,
            channel: undefined,
            uploader: undefined,
            meta: undefined,
            abortController: undefined
          }
          next.progress = wire.progress
          next.status = wire.status
          next.errors = wire.errors ?? []
          this.internalEntries.set(op.ref, next)
          touched = true
          break
        }

        case "progress": {
          const entry = this.internalEntries.get(op.ref)
          if (entry) {
            entry.progress = op.progress
            entry.status = op.progress >= 100 ? "success" : "uploading"
            touched = true
          }
          break
        }

        case "complete": {
          const entry = this.internalEntries.get(op.ref)
          if (entry) {
            entry.progress = 100
            entry.status = "success"
            touched = true
          }
          break
        }

        case "error": {
          // An error with a ref belongs to that entry; one without a ref is a
          // handle-level error.
          if (op.ref) {
            const entry = this.internalEntries.get(op.ref)
            if (entry) {
              entry.status = "error"
              entry.errors = [...entry.errors, op.error]
            }
          } else {
            this.errors = [...this.errors, op.error]
          }
          touched = true
          break
        }

        case "cancel": {
          this.internalEntries.delete(op.ref)
          // Drop the join token too, otherwise it outlives its entry.
          this.pendingTokens.delete(op.ref)
          touched = true
          break
        }

        case "reset": {
          this.internalEntries.clear()
          this.pendingTokens.clear()
          this.errors = []
          touched = true
          break
        }
      }
    }

    if (touched) {
      this.refreshEntries()
      this.notify()
    }
  }

  // ---- Internals ---------------------------------------------------------

  /** Rebuilds the public `entries` array from the internal entry index. */
  private refreshEntries(): void {
    this.entries = Array.from(this.internalEntries.values()).map((e) => projectEntry(e))
  }

  /**
   * Uploads one entry in chunks over its own `musubi_upload:<ref>` channel.
   *
   * Does nothing when the entry has no File or no join token. The token is
   * dropped when the transfer ends, whatever the outcome; on failure the
   * transfer channel is left before the error is rethrown.
   */
  private async startChannel(
    socket: { channel: (topic: string, payload?: object) => ChannelLike },
    entry: InternalEntry
  ): Promise<void> {
    const file = entry.file
    const token = this.pendingTokens.get(entry.ref)
    if (!file || !token) return

    // A zero, negative or NaN chunk size would never advance the offset
    // correctly; fail the entry instead of looping forever.
    const chunkSize = this.config.chunkSize
    if (!(chunkSize > 0)) {
      this.pendingTokens.delete(entry.ref)
      throw new Error(`Invalid upload chunk size: ${chunkSize}`)
    }

    const topic = `musubi_upload:${entry.ref}`
    const channel = socket.channel(topic, { token })
    entry.channel = channel

    try {
      await pushReceive(channel.join() as PushLike)

      // Server completes on the final chunk: when bytes_written reaches
      // client_size, it replies `progress: 100` and stops the channel.
      // No separate "close" event — the reply to the last chunk is the
      // completion signal.
      // NOTE(review): a zero-byte file sends no chunk at all, so it never
      // produces that completion reply — confirm how the server treats it.
      for (let offset = 0; offset < file.size; offset += chunkSize) {
        const slice = await file.slice(offset, offset + chunkSize).arrayBuffer()
        await pushReceive(channel.push("chunk", slice as ArrayBuffer) as PushLike)
      }
    } catch (err) {
      // Leave explicitly so a failed transfer does not keep its channel around.
      try { channel.leave() } catch { /* noop */ }
      throw err
    } finally {
      this.pendingTokens.delete(entry.ref)
      if (entry.channel === channel) entry.channel = undefined
    }
  }

  /**
   * Hands one entry to the external uploader registered under
   * `entry.uploader`, relaying its progress to the server.
   *
   * Does nothing when the entry has no File or no uploader name.
   *
   * @throws Error when no uploader is registered under that name or the
   *   connection has no channel; rethrows whatever the uploader rejects with
   *   after reporting it to the server as `external_failed`.
   */
  private async startExternal(entry: InternalEntry): Promise<void> {
    if (!entry.file || !entry.uploader) return

    const uploaderName = entry.uploader
    const uploader = this.connection.connection.uploaders?.[uploaderName]

    if (!uploader) {
      throw new Error(`No registered uploader for "${uploaderName}"`)
    }

    const channel = this.connection.channel
    if (!channel) throw new Error("Connection is not open")

    // Kept on the entry so `cancel()` can abort the in-flight upload.
    const controller = new AbortController()
    entry.abortController = controller

    const args: ExternalUploaderArgs = {
      entry: projectEntry(entry),
      file: entry.file,
      meta: entry.meta,
      onProgress: (pct: number) => {
        channel.push("upload_progress", {
          root_id: this.connection.id,
          store_id: [...this.storeId],
          name: this.uploadName,
          ref: entry.ref,
          // Clamp to an integer percentage in [0, 100].
          progress: Math.max(0, Math.min(100, Math.round(pct)))
        })
      },
      signal: controller.signal
    }

    try {
      await uploader(args)
      // Report completion in case the uploader itself never reported 100.
      args.onProgress(100)
    } catch (err) {
      // Wire the failure back to the server so the page server can
      // emit `{op: error, code: "external_failed"}` and the matching
      // entry transitions to `:error` (spec/domains/uploads/features/
      // external.feature § "Registered uploader rejects the PUT").
      channel.push("upload_error", {
        root_id: this.connection.id,
        store_id: [...this.storeId],
        name: this.uploadName,
        ref: entry.ref,
        code: "external_failed",
        message: err instanceof Error ? err.message : String(err)
      })

      throw err
    } finally {
      entry.abortController = undefined
    }
  }
}

/**
 * Builds the public, point-in-time view of an internal entry.
 *
 * Every field — including the `is*` flags — reflects the entry as it was when
 * the projection was made. The flags are derived from the captured status
 * rather than the live record, so a snapshot held across a later mutation can
 * never disagree with its own `status` field.
 *
 * @param entry internal record to project.
 * @returns a new `UploadEntry` with its own copy of the `errors` array.
 */
function projectEntry(entry: InternalEntry): UploadEntry {
  const status = entry.status

  return {
    ref: entry.ref,
    clientName: entry.clientName,
    clientSize: entry.clientSize,
    clientType: entry.clientType,
    progress: entry.progress,
    status,
    errors: [...entry.errors],
    get isPending() { return status === "pending" },
    get isUploading() { return status === "uploading" },
    get isSuccess() { return status === "success" },
    get isError() { return status === "error" },
    get isCancelled() { return status === "cancelled" }
  }
}

// ---------------------------------------------------------------------------
// Connection helpers (registry of UploadHandle instances per connection)
// ---------------------------------------------------------------------------

/**
 * Returns the handle registered on `connection.uploads` for the given store
 * and upload name, creating and registering it on first use so that repeated
 * lookups yield the same object.
 */
export function getUploadHandle(
  connection: RootConnection,
  storeId: StoreId,
  uploadName: string
): UploadHandleImpl {
  const registryKey = uploadStoreKey(storeId, uploadName)

  let handle = connection.uploads.get(registryKey)
  if (!handle) {
    handle = new UploadHandleImpl(connection, storeId, uploadName)
    connection.uploads.set(registryKey, handle)
  }

  return handle
}

/**
 * Routes a batch of upload ops to their handles.
 *
 * Ops are grouped per handle (in arrival order) and each handle receives its
 * group in a single `applyOps` call.
 *
 * @returns one `<JSON store id>\0<upload name>` key per upload that received
 *   at least one op.
 */
export function applyUploadOps(
  connection: RootConnection,
  ops: readonly UploadOp[]
): ReadonlySet<string> {
  const touchedKeys = new Set<string>()
  const opsByHandle = new Map<UploadHandleImpl, UploadOp[]>()

  for (const op of ops) {
    const target = getUploadHandle(connection, op.store_id, op.upload)

    const bucket = opsByHandle.get(target)
    if (bucket) {
      bucket.push(op)
    } else {
      opsByHandle.set(target, [op])
    }

    touchedKeys.add(`${JSON.stringify(op.store_id)}\0${op.upload}`)
  }

  opsByHandle.forEach((bucket, target) => {
    target.applyOps(bucket)
  })

  return touchedKeys
}

/**
 * Removes every handle whose store is no longer valid.
 *
 * The store part of a registry key is the text before the first `\0`; a key
 * is kept only when that part is a member of `validStoreIds`.
 */
export function pruneUploads(
  uploads: Map<string, UploadHandleImpl>,
  validStoreIds: ReadonlySet<string>
): void {
  const staleKeys = Array.from(uploads.keys()).filter((registryKey) => {
    const [storePart = ""] = registryKey.split("\0")
    return !validStoreIds.has(storePart)
  })

  staleKeys.forEach((registryKey) => {
    uploads.delete(registryKey)
  })
}

/** Collects the distinct JSON-encoded store ids referenced by the given ops. */
export function touchedStoresFromUploadOps(ops: readonly UploadOp[]): ReadonlySet<string> {
  const storeKeys = new Set<string>()
  for (const op of ops) {
    storeKeys.add(JSON.stringify(op.store_id))
  }
  return storeKeys
}

// ---------------------------------------------------------------------------
// Small helper to await a Phoenix push and produce a Promise
// ---------------------------------------------------------------------------

/**
 * Adapts a push's `receive` callbacks to a Promise.
 *
 * Resolves with the "ok" reply; rejects with an Error carrying the
 * JSON-encoded reply on "error", or with Error("timeout") on "timeout".
 */
function pushReceive<T = unknown>(push: PushLike): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onOk = (reply: unknown): void => {
      resolve(reply as T)
    }
    const onError = (reply: unknown): void => {
      reject(new Error(JSON.stringify(reply)))
    }
    const onTimeout = (): void => {
      reject(new Error("timeout"))
    }

    push.receive("ok", onOk).receive("error", onError).receive("timeout", onTimeout)
  })
}