import type { StoreId, StreamEntry, StreamOp } from "./types"
import { storeIdKey, storeKeyFromStreamStoreKey, streamStoreKey, streamStoreKeyPrefix } from "./types"
export type MaterializedStreamMap = Map<string, readonly StreamEntry<unknown>[]>
export function applyStreamOps(
current: ReadonlyMap<string, readonly StreamEntry<unknown>[]>,
ops: readonly StreamOp[]
): MaterializedStreamMap {
const next = new Map(current)
for (const op of ops) {
const key = streamStoreKey(op.store_id, op.stream)
const entries = [...(next.get(key) ?? [])]
switch (op.op) {
case "reset":
next.set(key, [])
break
case "delete":
next.set(
key,
entries.filter((entry) => entry.itemKey !== op.item_key)
)
break
case "insert":
next.set(key, applyInsert(entries, op))
break
}
}
return next
}
// Stable empty singleton — `useSyncExternalStore` consumers treat a new []
// reference as a state change and re-render forever. Share one empty list.
const EMPTY_STREAM: readonly StreamEntry<unknown>[] = Object.freeze([])
export function getStream<T>(
streams: ReadonlyMap<string, readonly StreamEntry<unknown>[]>,
storeId: StoreId,
streamName: string
): readonly StreamEntry<T>[] {
return (streams.get(streamStoreKey(storeId, streamName)) ?? EMPTY_STREAM) as readonly StreamEntry<T>[]
}
export function pruneStreams(
streams: ReadonlyMap<string, readonly StreamEntry<unknown>[]>,
validStoreIds: ReadonlySet<string>
): MaterializedStreamMap {
const next = new Map<string, readonly StreamEntry<unknown>[]>()
for (const [key, value] of streams) {
const storeKey = storeKeyFromStreamStoreKey(key)
if (validStoreIds.has(storeKey)) {
next.set(key, value)
}
}
return next
}
export function touchedStoreKeys(ops: readonly StreamOp[]): ReadonlySet<string> {
return new Set(ops.map((op) => storeIdKey(op.store_id)))
}
export function hasStreamKeyForStore(
streams: ReadonlyMap<string, readonly StreamEntry<unknown>[]>,
storeId: StoreId
): boolean {
const prefix = streamStoreKeyPrefix(storeId)
for (const key of streams.keys()) {
if (key.startsWith(prefix)) {
return true
}
}
return false
}
function applyInsert(
entries: StreamEntry<unknown>[],
op: Extract<StreamOp, { op: "insert" }>
): readonly StreamEntry<unknown>[] {
const existingIndex = entries.findIndex((entry) => entry.itemKey === op.item_key)
if (existingIndex >= 0) {
entries.splice(existingIndex, 1)
}
const nextEntry: StreamEntry<unknown> = {
itemKey: op.item_key,
item: op.item
}
const insertionIndex = resolveInsertionIndex(op.at, entries.length)
entries.splice(insertionIndex, 0, nextEntry)
return trimEntries(entries, op.limit, op.at)
}
function resolveInsertionIndex(at: number, length: number): number {
if (at <= 0) {
return at === -1 ? length : 0
}
return Math.min(at, length)
}
function trimEntries(
entries: StreamEntry<unknown>[],
limit: number | null,
at: number
): readonly StreamEntry<unknown>[] {
if (limit === null) {
return entries
}
const size = Math.abs(limit)
if (size === 0) {
return []
}
if (entries.length <= size) {
return entries
}
const overflow = entries.length - size
if (at === 0) {
entries.splice(entries.length - overflow, overflow)
return entries
}
entries.splice(0, overflow)
return entries
}