Delivery

ExpressCSV validates imported rows in the browser and delivers them to your onData callback as sequential chunks. On your side:

  • POST each chunk to your backend
  • Stage the payload in Postgres
  • Write to your business tables only after the last chunk for that delivery arrives

POST each chunk

import { CSVImporter, x } from "@expresscsv/sdk";

// Share this schema with your API (via @expresscsv/schemas) so both sides agree on row shape.
export const importedUserSchema = x.row({
  name: x.string().label("Full Name"),
  email: x.string().email().label("Email Address"),
});

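// fetchSessionToken() and accessToken stand in for your app's own auth helpers; they are not part of the SDK.
const importer = new CSVImporter({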
  schema: importedUserSchema,
  getSessionToken: async () => fetchSessionToken(),
  importNamespace: "user-import",
});

importer.open({
  // Split large files into chunks so each POST stays small and retryable.
  chunkSize: { unit: "kb", value: 500 },
  onData: async (chunk, next) => {
    // Your API owns persistence. The importer only validates in the browser.
    const response = await fetch("/your-api/import-users/chunks", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${accessToken}`,
      },
      body: JSON.stringify({
        // sessionId: one import session while the iframe is open.
        sessionId: chunk.sessionId,
        // deliveryId: one "Finish" attempt. Retries get a new deliveryId.
        deliveryId: chunk.deliveryId,
        // chunkIndex: 0-based position in this delivery.
        chunkIndex: chunk.chunkIndex,
        // totalChunks: how many chunks this delivery will send.
        totalChunks: chunk.totalChunks,
        // totalRecords: row count across the whole delivery (logging, validation).
        totalRecords: chunk.totalRecords,
        // records: validated rows for this chunk only.
        records: chunk.records,
      }),
    });

    if (!response.ok) {
      // Throwing stops delivery; the importer then calls onError, which hits the server cleanup route below.
      throw new Error("Backend rejected this import chunk");
    }

    // The importer waits here until you call next().
    // Only call next() after Postgres has committed this chunk to staging.
    next();
  },
  onCancel: async ({ sessionId }) => {
    // User closed the importer. Clean up any partial staging rows.
    await fetch("/your-api/import-users/abort", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${accessToken}`,
      },
      body: JSON.stringify({ sessionId }),
    });
  },
  onError: async (_error, context) => {
    // Chunk POST failed or onData threw. Drop partial staging for this attempt.
    await fetch("/your-api/import-users/abort", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${accessToken}`,
      },
      body: JSON.stringify({
        sessionId: context.sessionId,
        deliveryId: context.deliveryId ?? null,
      }),
    });
  },
});

Each chunk includes sessionId, deliveryId, chunkIndex, totalChunks, totalRecords, and records (validated rows typed from your schema).

Delivery is sequential: chunks arrive in order, and the importer waits for next() before sending the next one (see Backpressure).

Put importedUserSchema in a shared package (for example packages/schemas) and import it from both the browser and the API. The server code below imports it from @/schemas/user and derives row types with Infer from @expresscsv/schemas.

Secure your chunk API

The Hono examples below accept sessionId and deliveryId from the request body. In production, treat those as untrusted until you tie them to your app:

  1. Require your normal app auth on POST /your-api/import-users/chunks and POST /your-api/import-users/abort (session cookie, JWT, API key scoped to the tenant, and so on).
  2. When you mint an ExpressCSV session token (Session tokens), persist a mapping such as expresscsvSessionId → yourUserId (or embed a signed claim you verify on every chunk POST).
  3. Reject chunks whose sessionId is not owned by the authenticated caller (403).
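
Steps 2 and 3 can be sketched as follows. This is a minimal in-memory sketch under stated assumptions: recordSessionOwner, assertSessionOwnedByUser, and SessionOwnershipError are hypothetical names, a Map stands in for a persisted table, and this version takes a user id string rather than the user object the Hono route below passes.

```typescript
// Hypothetical helpers: a Map stands in for a persisted
// expresscsvSessionId -> yourUserId table.
const sessionOwners = new Map<string, string>();

class SessionOwnershipError extends Error {
  readonly status = 403;
}

// Call when you mint an ExpressCSV session token for the signed-in user.
function recordSessionOwner(sessionId: string, userId: string): void {
  sessionOwners.set(sessionId, userId);
}

// Call on every chunk and abort POST before touching staging.
function assertSessionOwnedByUser(sessionId: string, userId: string): void {
  const owner = sessionOwners.get(sessionId);
  if (owner === undefined || owner !== userId) {
    // Unknown and foreign sessions get the same 403.
    throw new SessionOwnershipError(`Session ${sessionId} is not owned by this user`);
  }
}
```

Rejecting unknown session ids the same way as foreign ones avoids revealing which sessions exist.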

Use the same importNamespace in CSVImporter and when you create the session token. A mismatch causes confusing session behavior.

How you persist imported rows is up to you. A recommended pattern:

  1. Stage each chunk in a dedicated table (not your business tables).
  2. Upsert with onConflictDoUpdate on (sessionId, deliveryId, chunkIndex) so chunk retries are safe.
  3. Track delivery status so that a retried last chunk does not promote twice and insert duplicate business rows.
  4. Promote inside a transaction: read staged chunks in order, insert into your business table, mark the delivery promoted.
  5. Delete staged rows on abort or error when the browser tells you delivery failed or was canceled.
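
Steps 3 to 5 amount to a small state machine over the three statuses the schema below uses (staging, promoted, aborted). A minimal sketch, assuming promoted and aborted are terminal and that repeating promote or abort is a no-op; nextStatus and DeliveryEvent are illustrative names, not part of ExpressCSV:

```typescript
type DeliveryStatus = "staging" | "promoted" | "aborted";
type DeliveryEvent = "chunk" | "promote" | "abort";

// Returns the status after the event, or null when the event must be rejected (409).
function nextStatus(current: DeliveryStatus, event: DeliveryEvent): DeliveryStatus | null {
  switch (current) {
    case "staging":
      // Chunks keep arriving; promote and abort are the only ways out.
      if (event === "chunk") return "staging";
      return event === "promote" ? "promoted" : "aborted";
    case "promoted":
    case "aborted":
      // Terminal: late chunks are rejected; repeated promote/abort change nothing.
      return event === "chunk" ? null : current;
  }
}
```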

Schema

Keep staging data separate from live tables. A small import_delivery row per deliveryId makes promotion idempotent when the last chunk POST retries.

// src/db/schema.ts
import {
  index,
  integer,
  jsonb,
  pgTable,
  text,
  timestamp,
  unique,
} from "drizzle-orm/pg-core";

// Staging table: temporary storage for chunk payloads.
// Nothing here should trigger emails, webhooks, or other production side effects.
export const importChunkStaging = pgTable(
  "import_chunk_staging",
  {
    // Ties rows to one importer session in the browser.
    sessionId: text("session_id").notNull(),
    // Ties rows to one "Finish" attempt (retries start a new deliveryId).
    deliveryId: text("delivery_id").notNull(),
    // Which slice of the file this row holds (0, 1, 2, ...).
    chunkIndex: integer("chunk_index").notNull(),
    // Same array as chunk.records from onData. We keep it untyped in Postgres.
    records: jsonb("records").notNull(),
    createdAt: timestamp("created_at", { mode: "date" }).defaultNow().notNull(),
  },
  (t) => [
    // If the same chunk POST arrives twice, update the existing row instead of inserting twice.
    unique("import_chunk_staging_session_delivery_index_unique").on(
      t.sessionId,
      t.deliveryId,
      t.chunkIndex,
    ),
    // Speed up "load all chunks for this delivery" during promotion.
    index("import_chunk_staging_delivery_id_idx").on(t.deliveryId),
  ],
);

// One row per deliveryId so we can tell "already promoted" from a retried last chunk.
export const importDeliveries = pgTable(
  "import_delivery",
  {
    deliveryId: text("delivery_id").primaryKey(),
    sessionId: text("session_id").notNull(),
    // staging: chunks still arriving. promoted: copied to app_user. aborted: discarded.
    status: text("status").notNull().default("staging"),
    // Copied from the chunk payload so we know when the last chunk has landed.
    totalChunks: integer("total_chunks"),
    // Optional: total row count for this delivery (from chunk.totalRecords).
    totalRecords: integer("total_records"),
    promotedAt: timestamp("promoted_at", { mode: "date" }),
    createdAt: timestamp("created_at", { mode: "date" }).defaultNow().notNull(),
  },
  (t) => [
    // Speed up abort when the user cancels an entire session.
    index("import_delivery_session_id_idx").on(t.sessionId),
  ],
);

// Your real table. Column names can differ from the importer schema (see mapping in promote).
export const appUsers = pgTable("app_user", {
  id: text("id")
    .primaryKey()
    .$defaultFn(() => crypto.randomUUID()),
  fullName: text("full_name").notNull(),
  workEmail: text("work_email").notNull(),
  createdAt: timestamp("created_at", { mode: "date" }).defaultNow().notNull(),
});

// src/db/index.ts
import { drizzle } from "drizzle-orm/neon-serverless";
import { Pool } from "@neondatabase/serverless";
import * as schema from "./schema";

// The WebSocket Pool driver supports interactive transactions, which promoteAllStagedChunks relies on.
const pool = new Pool({ connectionString: process.env.DATABASE_URL! });

// Pass schema so db.query.* is typed and relational helpers know your tables.
export const db = drizzle(pool, { schema });
export type Db = typeof db;

Key identifiers in the staging tables:

  • sessionId: groups one import run in the browser.
  • deliveryId: one finish attempt. A new id is used if the user presses Finish again after a failure.
  • chunkIndex / totalChunks: this chunk's position in the delivery and the delivery's size in chunks (each chunk carries both).

If the user retries and a new deliveryId appears, staging rows from the earlier attempt for the same sessionId may still exist. Delete them in your abort handler, or expire them with a scheduled cleanup based on created_at (a TTL).
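
One way to implement the TTL, sketched under assumptions: a scheduled job finds deliveries whose newest staged chunk is older than a chosen TTL, then deletes their staging rows. findExpiredDeliveries and the TTL value are hypothetical; in production you would run the same filter as SQL over import_chunk_staging.created_at instead of loading rows into memory.

```typescript
interface StagedChunkRow {
  sessionId: string;
  deliveryId: string;
  chunkIndex: number;
  createdAt: Date;
}

// Returns deliveryIds whose most recently staged chunk is older than ttlMs.
function findExpiredDeliveries(rows: StagedChunkRow[], now: Date, ttlMs: number): string[] {
  const newestByDelivery = new Map<string, number>();
  for (const row of rows) {
    const t = row.createdAt.getTime();
    const prev = newestByDelivery.get(row.deliveryId);
    if (prev === undefined || t > prev) newestByDelivery.set(row.deliveryId, t);
  }
  const cutoff = now.getTime() - ttlMs;
  return Array.from(newestByDelivery.entries())
    .filter(([, newest]) => newest < cutoff)
    .map(([deliveryId]) => deliveryId);
}
```

Keying on the newest chunk rather than the oldest keeps a slow delivery that is still receiving chunks from being expired mid-flight.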

Promote on the last chunk POST (server-authoritative):

  • onComplete is optional and only for client-side UX (toast, analytics, redirect)
  • Do not promote from onComplete: it shares the same best-effort lifecycle as onError and onCancel (see Errors)

Checks on every chunk POST

Each chunk body includes sessionId, deliveryId, chunkIndex, and totalChunks (plus optional totalRecords). The importer sends them in order, but your API still needs to enforce delivery state:

  • Networks retry POSTs
  • Users can cancel mid-flight
  • A buggy client could send the wrong index or an early "last" chunk

Run these checks in your chunk handler before you write to staging (see assertChunkMetadata below). They are separate from validating the row shape inside records (see Validate the chunk body).

| Check | Why |
|---|---|
| chunkIndex < totalChunks | Reject out-of-range indices (400) |
| The first chunk for a deliveryId sets totalChunks; later chunks for that deliveryId must send the same value | Stops a client from shrinking totalChunks to promote early (400) |
| Reject chunk POSTs when import_delivery.status is promoted or aborted | 409 Conflict: retries after success or cleanup should not redo work |
| Store totalRecords on the delivery row when the chunk includes it | Logging, progress UI, and sanity checks after promotion |
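
For unit tests, the same checks can be written as a pure function that takes the existing delivery row instead of querying Postgres. A sketch only: checkChunk and its result type are hypothetical; assertChunkMetadata below performs these checks against the database by throwing errors.

```typescript
interface ChunkMeta {
  chunkIndex: number;
  totalChunks: number;
}

interface DeliveryRow {
  status: "staging" | "promoted" | "aborted";
  totalChunks: number | null;
}

type CheckResult = { ok: true } | { ok: false; status: 400 | 409; message: string };

// `delivery` is undefined when this is the first chunk seen for the deliveryId.
function checkChunk(meta: ChunkMeta, delivery: DeliveryRow | undefined): CheckResult {
  if (!Number.isInteger(meta.chunkIndex) || meta.chunkIndex < 0 || meta.chunkIndex >= meta.totalChunks) {
    return { ok: false, status: 400, message: "chunkIndex out of range" };
  }
  if (delivery && delivery.status !== "staging") {
    return { ok: false, status: 409, message: `Delivery is ${delivery.status}` };
  }
  if (delivery?.totalChunks != null && delivery.totalChunks !== meta.totalChunks) {
    return { ok: false, status: 400, message: "totalChunks does not match this delivery" };
  }
  return { ok: true };
}
```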

Validate the chunk body

Validate the POST body on the server even though the importer already validated rows in the browser:

  • Networks, proxies, and old clients can send malformed JSON
  • The schema below checks only the envelope fields; re-parse each row with your schema when promoting staged rows to business tables

// src/lib/import-user-delivery.ts
import { z } from "zod";

export const importChunkBodySchema = z.object({
  sessionId: z.string().min(1),
  deliveryId: z.string().min(1),
  chunkIndex: z.number().int().nonnegative(),
  totalChunks: z.number().int().positive(),
  totalRecords: z.number().int().nonnegative().optional(),
  // Opaque here on purpose. We parse with importedUserSchema when promoting.
  records: z.array(z.unknown()),
});

export type ImportChunkBody = z.infer<typeof importChunkBodySchema>;

Stage

Upsert each chunk into staging and record delivery metadata. Reject chunks when the delivery is already promoted or aborted.

// src/lib/import-user-delivery.ts (continued)
import { and, asc, eq } from "drizzle-orm";
import { type Infer } from "@expresscsv/schemas";
import { importedUserSchema } from "@/schemas/user";
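// src/db/index.ts exports db; the table definitions live in src/db/schema.ts.
import { db } from "@/db";
import {
  appUsers,
  importChunkStaging,
  importDeliveries,
} from "@/db/schema";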

// Same type the importer used in the browser for chunk.records.
export type ImportedUser = Infer<typeof importedUserSchema>;

export class ImportDeliveryConflictError extends Error {}

export async function assertChunkMetadata(body: ImportChunkBody) {
  const { deliveryId, chunkIndex, totalChunks } = body;

  if (chunkIndex >= totalChunks) {
    throw new Error("chunkIndex must be less than totalChunks");
  }

  const [delivery] = await db
    .select()
    .from(importDeliveries)
    .where(eq(importDeliveries.deliveryId, deliveryId))
    .limit(1);

  if (delivery?.status === "promoted" || delivery?.status === "aborted") {
    throw new ImportDeliveryConflictError(
      "Delivery is no longer accepting chunks",
    );
  }

  if (delivery?.totalChunks != null && delivery.totalChunks !== totalChunks) {
    throw new Error("totalChunks does not match this delivery");
  }
}

export async function upsertStagedChunk(body: ImportChunkBody) {
  const { sessionId, deliveryId, chunkIndex, totalChunks, totalRecords, records } =
    body;

  await assertChunkMetadata(body);

  // First chunk for this deliveryId sets totalChunks. Retries overwrite the same row.
  await db
    .insert(importDeliveries)
    .values({
      deliveryId,
      sessionId,
      totalChunks,
      totalRecords: totalRecords ?? null,
      status: "staging",
    })
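    .onConflictDoUpdate({
      target: importDeliveries.deliveryId,
      set: {
        // totalChunks was already checked to match, so rewriting it is a no-op
        // that keeps this SET clause non-empty when totalRecords is absent.
        totalChunks,
        ...(totalRecords != null ? { totalRecords } : {}),
      },
    });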

  // Store the payload. A retry of the same chunk overwrites records instead of duplicating.
  await db
    .insert(importChunkStaging)
    .values({ sessionId, deliveryId, chunkIndex, records })
    .onConflictDoUpdate({
      target: [
        importChunkStaging.sessionId,
        importChunkStaging.deliveryId,
        importChunkStaging.chunkIndex,
      ],
      set: { records },
    });
}

Promote

When the last chunk POST succeeds, copy staged rows into your business table in one transaction. Retries on that last POST must not insert twice.

// src/lib/import-user-delivery.ts (continued)
export async function promoteAllStagedChunks({
  sessionId,
  deliveryId,
}: {
  sessionId: string;
  deliveryId: string;
}) {
  // Wrap promotion in a transaction so we never half-insert users or half-delete staging.
  await db.transaction(async (tx) => {
    // Lock the delivery row so concurrent retries of the last chunk run one at a time.
    const [delivery] = await tx
      .select()
      .from(importDeliveries)
      .where(eq(importDeliveries.deliveryId, deliveryId))
      .limit(1)
      .for("update");

    // Nothing to do if this delivery was aborted mid-flight.
    if (!delivery || delivery.status === "aborted") {
      return;
    }

    // The last chunk POST can run twice. Do not insert into app_user twice.
    if (delivery.status === "promoted") {
      return;
    }

    // Load every chunk for this delivery. orderBy keeps rows in the same order the importer sent.
    const staged = await tx
      .select()
      .from(importChunkStaging)
      .where(
        and(
          eq(importChunkStaging.sessionId, sessionId),
          eq(importChunkStaging.deliveryId, deliveryId),
        ),
      )
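      .orderBy(asc(importChunkStaging.chunkIndex));

    // A client could send the last index early, so confirm every chunk is staged.
    if (delivery.totalChunks != null && staged.length !== delivery.totalChunks) {
      throw new Error(
        `Expected ${delivery.totalChunks} staged chunks, found ${staged.length}`,
      );
    }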

    // Flatten chunk JSON into business-table rows.
    // Re-parse each row: browser validation is not a trust boundary for your API.
    const rows = staged.flatMap((chunk) => {
      const records = chunk.records as unknown[];
      return records.map((raw) => {
        const row = importedUserSchema.parse(raw) as ImportedUser;
        return {
          fullName: row.name,
          workEmail: row.email,
        };
      });
    });

    if (rows.length > 0) {
      await tx.insert(appUsers).values(rows);
    }

    // Mark the delivery promoted in the same transaction, so a retried last chunk sees "promoted" and skips the insert.
    await tx
      .update(importDeliveries)
      .set({ status: "promoted", promotedAt: new Date() })
      .where(eq(importDeliveries.deliveryId, deliveryId));

    // Staged payloads are no longer needed once the delivery is promoted.
    await tx
      .delete(importChunkStaging)
      .where(
        and(
          eq(importChunkStaging.sessionId, sessionId),
          eq(importChunkStaging.deliveryId, deliveryId),
        ),
      );
  });
}
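
The ordering and completeness step of promotion can be isolated as a pure function, which makes it easy to test that a delivery is promoted only once every chunk index from 0 to totalChunks - 1 is staged (a buggy client could send the last index early). assembleDelivery is a hypothetical helper working on in-memory rows shaped like import_chunk_staging:

```typescript
interface StagedChunk {
  chunkIndex: number;
  records: unknown[];
}

// Returns all rows in importer order, or throws if any chunk is missing.
function assembleDelivery(staged: StagedChunk[], totalChunks: number): unknown[] {
  const byIndex = new Map<number, unknown[]>();
  for (const chunk of staged) byIndex.set(chunk.chunkIndex, chunk.records);

  const rows: unknown[] = [];
  for (let i = 0; i < totalChunks; i++) {
    const records = byIndex.get(i);
    if (records === undefined) {
      throw new Error(`Chunk ${i} of ${totalChunks} has not been staged`);
    }
    // Push one at a time: spreading a huge array into push() can exceed argument limits.
    for (const record of records) rows.push(record);
  }
  return rows;
}
```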

Abort

Wire onError and onCancel to POST /your-api/import-users/abort:

  • onError: receives { sessionId, deliveryId }; once delivery has started, deliveryId is usually present, so cleanup can target that attempt
  • onCancel: session-scoped and may run before any chunk lands, so it may send only sessionId

Mark the delivery aborted and delete staging rows for the scope you receive.

// src/lib/import-user-delivery.ts (continued)
export async function abortImport({
  sessionId,
  deliveryId,
}: {
  sessionId: string;
  deliveryId?: string | null;
}) {
  // onError usually includes deliveryId. Scope cleanup to that failed attempt.
  if (deliveryId) {
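    // Only deliveries still staging can be aborted. A late onError after a
    // successful promotion (for example a 409 on a retried last chunk) must not
    // relabel that delivery as aborted.
    await db
      .update(importDeliveries)
      .set({ status: "aborted" })
      .where(
        and(
          eq(importDeliveries.deliveryId, deliveryId),
          eq(importDeliveries.status, "staging"),
        ),
      );

    await db
      .delete(importChunkStaging)
      .where(
        and(
          eq(importChunkStaging.sessionId, sessionId),
          eq(importChunkStaging.deliveryId, deliveryId),
        ),
      );
    return;
  }

  // onCancel is session-wide. User may cancel before any deliveryId exists.
  await db
    .update(importDeliveries)
    .set({ status: "aborted" })
    .where(
      and(
        eq(importDeliveries.sessionId, sessionId),
        eq(importDeliveries.status, "staging"),
      ),
    );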

  await db
    .delete(importChunkStaging)
    .where(eq(importChunkStaging.sessionId, sessionId));
}
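
The scoping rule in abortImport can be stated as a pure filter over staging rows, which is handy for unit tests. A sketch only: rowsAfterAbort is a hypothetical helper that mirrors the two branches above in memory.

```typescript
interface StagingRow {
  sessionId: string;
  deliveryId: string;
  chunkIndex: number;
}

// Returns the staging rows that survive an abort.
// With a deliveryId (onError): drop only that attempt. Without one (onCancel): drop the whole session.
function rowsAfterAbort(rows: StagingRow[], sessionId: string, deliveryId?: string | null): StagingRow[] {
  return rows.filter((row) => {
    if (row.sessionId !== sessionId) return true;
    return deliveryId ? row.deliveryId !== deliveryId : false;
  });
}
```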

Hono routes

Wire the handlers to /your-api/import-users/chunks and /your-api/import-users/abort. Return a non-2xx status from the chunk route if staging fails; onData then throws and the importer runs onError.

// src/routes/import-users.ts
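import { Hono } from "hono";
import { HTTPException } from "hono/http-exception";
import {
  abortImport,
  ImportDeliveryConflictError,
  importChunkBodySchema,
  promoteAllStagedChunks,
  upsertStagedChunk,
} from "@/lib/import-user-delivery";
// Hypothetical module: your app's own auth helpers (not part of ExpressCSV or Hono).
import {
  assertSessionOwnedByUser,
  getAuthenticatedUser,
  type AppUser,
} from "@/lib/auth";

// Group import routes under one router, then mount at /your-api/import-users.
// Declaring Variables lets c.set("user") and c.get("user") type-check.
const importUsers = new Hono<{ Variables: { user: AppUser } }>();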

// Require your app auth on every import route (cookie, JWT, etc.).
importUsers.use("*", async (c, next) => {
  const user = await getAuthenticatedUser(c);
  if (!user) {
    throw new HTTPException(401, { message: "Unauthorized" });
  }
  c.set("user", user);
  await next();
});

importUsers.post("/chunks", async (c) => {
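  // c.req.json() throws on malformed JSON; treat that as an invalid payload (400), not a 500.
  const json: unknown = await c.req.json().catch(() => null);
  const parsed = importChunkBodySchema.safeParse(json);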
  if (!parsed.success) {
    // Bad JSON shape. Return 4xx so the client does not call next().
    throw new HTTPException(400, { message: "Invalid chunk payload" });
  }

  const { sessionId, deliveryId, chunkIndex, totalChunks } = parsed.data;

  // Reject chunks for sessions this user did not start (see Secure your chunk API).
  await assertSessionOwnedByUser(sessionId, c.get("user"));

  try {
    await upsertStagedChunk(parsed.data);

    // Chunks are sequential. When this is the last index, promote the whole delivery.
    if (chunkIndex + 1 === totalChunks) {
      await promoteAllStagedChunks({ sessionId, deliveryId });
    }
  } catch (error) {
    if (error instanceof ImportDeliveryConflictError) {
      throw new HTTPException(409, { message: error.message });
    }
    if (error instanceof Error && error.message.includes("totalChunks")) {
      throw new HTTPException(400, { message: error.message });
    }
    console.error("Failed to stage import chunk", {
      sessionId,
      deliveryId,
      chunkIndex,
      totalChunks,
      error,
    });
    // 5xx makes onData throw so the user sees an error and onError can clean up.
    throw new HTTPException(500, { message: "Failed to save import chunk" });
  }

  return c.json({ ok: true });
});

importUsers.post("/abort", async (c) => {
  const body = await c.req.json<{
    sessionId?: string;
    deliveryId?: string | null;
  }>();

  if (!body.sessionId) {
    throw new HTTPException(400, { message: "sessionId is required" });
  }

  await assertSessionOwnedByUser(body.sessionId, c.get("user"));

  await abortImport({
    sessionId: body.sessionId,
    deliveryId: body.deliveryId,
  });

  return c.json({ ok: true });
});

export { importUsers };

// src/index.ts
import { Hono } from "hono";
import { importUsers } from "./routes/import-users";

const app = new Hono();

// Full paths become /your-api/import-users/chunks and /your-api/import-users/abort.
app.route("/your-api/import-users", importUsers);

export default app;
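
The chunk route above maps errors to status codes partly by matching error.message, which silently breaks if a message is reworded. A sketch of an alternative, assuming you throw dedicated error classes from assertChunkMetadata: ImportChunkValidationError and statusForImportError are hypothetical names (only ImportDeliveryConflictError appears above, and it is redeclared here so the block is self-contained).

```typescript
class ImportDeliveryConflictError extends Error {}
class ImportChunkValidationError extends Error {}

// Map domain errors to HTTP status codes by type, not by message text.
function statusForImportError(error: unknown): 400 | 409 | 500 {
  if (error instanceof ImportDeliveryConflictError) return 409;
  if (error instanceof ImportChunkValidationError) return 400;
  // Anything else is unexpected: a 5xx makes onData throw so onError can clean up.
  return 500;
}
```

In the route's catch block, a single throw new HTTPException(statusForImportError(error), { message: "..." }) could then replace the two if checks.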

Backpressure

Chunks are sequential:

  • ExpressCSV waits for next() before sending the next chunk
  • Call next() only after the API has committed this chunk to your import chunk staging table
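
To unit-test an onData handler against this contract without the iframe, a stand-in for the delivery loop helps. This is a hypothetical sketch, not the ExpressCSV implementation: deliverSequentially and Chunk are illustrative names, and it splits by row count, whereas the SDK's chunkSize is configured in kilobytes.

```typescript
interface Chunk<T> {
  sessionId: string;
  deliveryId: string;
  chunkIndex: number;
  totalChunks: number;
  totalRecords: number;
  records: T[];
}

type OnData<T> = (chunk: Chunk<T>, next: () => void) => Promise<void>;

// Hypothetical stand-in for the importer: one chunk at a time, gated on next().
async function deliverSequentially<T>(
  sessionId: string,
  deliveryId: string,
  rows: T[],
  rowsPerChunk: number,
  onData: OnData<T>,
): Promise<number> {
  const totalChunks = Math.max(1, Math.ceil(rows.length / rowsPerChunk));
  for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
    const chunk: Chunk<T> = {
      sessionId,
      deliveryId,
      chunkIndex,
      totalChunks,
      totalRecords: rows.length,
      records: rows.slice(chunkIndex * rowsPerChunk, (chunkIndex + 1) * rowsPerChunk),
    };
    // Wait until the handler calls next(). If it throws or rejects first,
    // the error propagates and no further chunks are sent.
    await new Promise<void>((resolve, reject) => {
      Promise.resolve()
        .then(() => onData(chunk, () => resolve()))
        .catch(reject);
    });
  }
  return totalChunks;
}
```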

Errors

  • If onData throws or rejects before next(), delivery stops and onError(error, { sessionId, deliveryId }) runs. Use deliveryId to fail that attempt or delete rows from your import chunk staging table.
  • onError can arrive with only { sessionId } before delivery starts. Make abort tolerate a missing deliveryId.
  • onCancel is session-scoped: the user may cancel before any delivery starts, so clean up with { sessionId }.
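
Because the abort route serves both callbacks, its body parsing has to accept a missing or null deliveryId. A minimal sketch of that tolerance: parseAbortBody is a hypothetical helper (the Hono route above does the equivalent inline), and rejecting a non-string deliveryId is a choice made here so a malformed body never widens into a session-wide cleanup.

```typescript
interface AbortRequest {
  sessionId: string;
  // null means "clean up the whole session" (onCancel, or onError before delivery).
  deliveryId: string | null;
}

// Returns null when the body is unusable so the route can answer 400.
function parseAbortBody(body: unknown): AbortRequest | null {
  if (typeof body !== "object" || body === null) return null;
  const { sessionId, deliveryId } = body as Record<string, unknown>;
  if (typeof sessionId !== "string" || sessionId.length === 0) return null;
  if (deliveryId === undefined || deliveryId === null || deliveryId === "") {
    return { sessionId, deliveryId: null };
  }
  // A non-string deliveryId is malformed; reject instead of aborting the whole session.
  if (typeof deliveryId !== "string") return null;
  return { sessionId, deliveryId };
}
```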