January 12, 2026

Bulk insert CSV to PostgreSQL with Node.js

Importing CSV data into PostgreSQL is a common requirement for data migrations, ETL pipelines, and user-uploaded files. Node.js offers several approaches, each with different performance characteristics. Choosing the right method depends on your file size, validation requirements, and error handling needs.

This guide covers three methods: individual INSERTs (simple but slow), batch INSERT with parameterized queries (balanced), and the PostgreSQL COPY command using pg-copy-streams (fastest). You will learn when to use each approach and see complete, working TypeScript examples.

Prerequisites

  • Node.js 18+ (pg supports Node 18.x, 20.x, 22.x, 24.x)
  • PostgreSQL database (local or remote)
  • Basic understanding of SQL and async/await

Why bulk insert performance matters

Individual INSERT statements create significant overhead. Each INSERT requires a round trip to the database, transaction handling, and query parsing. For a 10,000 row CSV file:

Method                     Approximate time   Memory usage
Individual INSERT          30-60 seconds      Low
Batch INSERT (1000 rows)   2-5 seconds        Medium
COPY command               0.5-2 seconds      Low (streaming)

The COPY command is PostgreSQL's native bulk loading mechanism, designed specifically for high-throughput data import. It bypasses much of the per-row overhead of INSERT statements.
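
To make the difference concrete, here is a rough sketch that counts client-to-server round trips for each method. It assumes one round trip per statement and ignores BEGIN/COMMIT; the function name estimateRoundTrips is made up for this illustration, and real timings also depend on network latency and server load.

```typescript
// Rough count of round trips for each import method.
// Assumption: one round trip per statement; BEGIN/COMMIT are ignored.
type Method = 'individual' | 'batch' | 'copy'

function estimateRoundTrips(
  rowCount: number,
  method: Method,
  batchSize = 1000
): number {
  switch (method) {
    case 'individual':
      return rowCount // one INSERT per row
    case 'batch':
      return Math.ceil(rowCount / batchSize) // one INSERT per batch
    case 'copy':
      return 1 // a single COPY statement; rows are streamed inside it
  }
}

console.log(estimateRoundTrips(10_000, 'individual')) // 10000
console.log(estimateRoundTrips(10_000, 'batch')) // 10
console.log(estimateRoundTrips(10_000, 'copy')) // 1
```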

Setting up the database

First, create a test table in PostgreSQL:

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  email VARCHAR(255) NOT NULL,
  name VARCHAR(255) NOT NULL,
  company VARCHAR(255),
  created_at TIMESTAMP DEFAULT NOW()
);

And a sample CSV file (users.csv):

email,name,company
alice@example.com,Alice Johnson,Acme Corp
bob@example.com,Bob Smith,TechStart
carol@example.com,Carol Williams,DataFlow Inc

Install dependencies

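npm install pg pg-copy-streams csv-parse
npm install -D @types/pg @types/pg-copy-streams

The pg package (node-postgres) is the most popular PostgreSQL client for Node.js. The pg-copy-streams package enables streaming data through PostgreSQL's COPY command. The csv-parse package is used in Methods 1 and 2 to parse the CSV file row by row.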

Method 1: Individual INSERTs

The simplest approach inserts one row at a time. This works well for small files or when you need per-row validation and error handling.

// src/individual-insert.ts
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { parse } from 'csv-parse'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

interface UserRow {
  email: string
  name: string
  company?: string
}

async function importCSVIndividual(filePath: string): Promise<number> {
  const client = await pool.connect()
  let insertedCount = 0

  try {
    await client.query('BEGIN')

    const parser = createReadStream(filePath).pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        trim: true
      })
    )

    for await (const row of parser) {
      const { email, name, company } = row as UserRow

      await client.query(
        'INSERT INTO users (email, name, company) VALUES ($1, $2, $3)',
        [email, name, company || null]
      )
      insertedCount++
    }

    await client.query('COMMIT')
    return insertedCount
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

// Usage
importCSVIndividual('./users.csv')
  .then((count) => console.log(`Inserted ${count} rows`))
  .catch(console.error)
  .finally(() => pool.end())

When to use this approach:

  • Small files (under 1,000 rows)
  • When you need per-row validation or transformation
  • When detailed error reporting per row is required

Limitations:

  • Slow for large files due to round-trip overhead
  • Each INSERT is a separate database operation
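
Per-row validation is the main reason to accept the slower path. A minimal sketch of what such a check might look like follows; the rules (a loose email pattern, a required name, the 255-character limit from the table definition) and the function name validateUserRow are illustrative assumptions, not part of any library.

```typescript
interface UserRow {
  email: string
  name: string
  company?: string
}

// Hypothetical rules for illustration; adjust them to your own schema.
function validateUserRow(row: UserRow): string[] {
  const errors: string[] = []

  // Deliberately loose check: something@something.something
  if (!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(row.email ?? '')) {
    errors.push(`invalid email: "${row.email}"`)
  }

  if (!row.name || row.name.trim() === '') {
    errors.push('name is required')
  }

  // The users table declares these columns as VARCHAR(255)
  for (const key of ['email', 'name', 'company'] as const) {
    const value = row[key]
    if (value && value.length > 255) {
      errors.push(`${key} exceeds 255 characters`)
    }
  }

  return errors
}

console.log(validateUserRow({ email: 'alice@example.com', name: 'Alice Johnson' }))
console.log(validateUserRow({ email: 'not-an-email', name: '' }))
```

Inside the for await loop, a row with a non-empty error list can be skipped and reported with its line number instead of aborting the whole import.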

Method 2: Batch INSERT with parameterized queries

Batch insertion groups multiple rows into a single INSERT statement, reducing database round trips significantly.

// src/batch-insert.ts
import { Pool, PoolClient } from 'pg'
import { createReadStream } from 'fs'
import { parse } from 'csv-parse'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

interface UserRow {
  email: string
  name: string
  company?: string
}

async function insertBatch(
  client: PoolClient,
  batch: UserRow[]
): Promise<void> {
  if (batch.length === 0) return

  // Build parameterized query for batch insert
  const values: string[] = []
  const params: (string | null)[] = []

  batch.forEach((row, index) => {
    const offset = index * 3
    values.push(`($${offset + 1}, $${offset + 2}, $${offset + 3})`)
    params.push(row.email, row.name, row.company || null)
  })

  const query = `
    INSERT INTO users (email, name, company)
    VALUES ${values.join(', ')}
  `

  await client.query(query, params)
}

async function importCSVBatch(
  filePath: string,
  batchSize: number = 1000
): Promise<number> {
  const client = await pool.connect()
  let insertedCount = 0
  let batch: UserRow[] = []

  try {
    await client.query('BEGIN')

    const parser = createReadStream(filePath).pipe(
      parse({
        columns: true,
        skip_empty_lines: true,
        trim: true
      })
    )

    for await (const row of parser) {
      batch.push(row as UserRow)

      if (batch.length >= batchSize) {
        await insertBatch(client, batch)
        insertedCount += batch.length
        batch = []
      }
    }

    // Insert remaining rows
    if (batch.length > 0) {
      await insertBatch(client, batch)
      insertedCount += batch.length
    }

    await client.query('COMMIT')
    return insertedCount
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

// Usage
importCSVBatch('./users.csv', 1000)
  .then((count) => console.log(`Inserted ${count} rows`))
  .catch(console.error)
  .finally(() => pool.end())

When to use this approach:

  • Medium-sized files (1,000 to 100,000 rows)
  • When you need to validate or transform data before insertion
  • When you want balance between performance and control

Limitations:

  • Still slower than COPY for very large files
  • Memory usage increases with batch size
  • Parameter limits (a single statement can bind at most 65,535 parameters, so a 3-column insert tops out at 21,845 rows per batch)
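
The upper bound on batch size follows from the number of columns. The sketch below computes it and generalizes the placeholder construction from insertBatch to any column count. The helper names maxRowsPerBatch and buildPlaceholders are made up for this example; the 65,535 figure comes from the 16-bit parameter count in PostgreSQL's wire protocol.

```typescript
// The wire protocol encodes the bind parameter count as a 16-bit integer
const MAX_BIND_PARAMS = 65_535

// Largest number of rows that fits into one parameterized INSERT
function maxRowsPerBatch(columnCount: number): number {
  if (!Number.isInteger(columnCount) || columnCount < 1) {
    throw new RangeError('columnCount must be a positive integer')
  }
  return Math.floor(MAX_BIND_PARAMS / columnCount)
}

// Builds "($1, $2, $3), ($4, $5, $6), ..." for the VALUES clause
function buildPlaceholders(rowCount: number, columnCount: number): string {
  const groups: string[] = []
  for (let r = 0; r < rowCount; r++) {
    const base = r * columnCount
    const cols = Array.from(
      { length: columnCount },
      (_, c) => `$${base + c + 1}`
    )
    groups.push(`(${cols.join(', ')})`)
  }
  return groups.join(', ')
}

console.log(maxRowsPerBatch(3)) // 21845
console.log(buildPlaceholders(2, 3)) // ($1, $2, $3), ($4, $5, $6)
```

Clamping batchSize to maxRowsPerBatch(columnCount) before the import starts avoids a confusing protocol error on wide tables.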

Method 3: COPY command with pg-copy-streams

The PostgreSQL COPY command is the fastest method for bulk loading data. The pg-copy-streams package lets you stream data directly to PostgreSQL without loading the entire file into memory.

// src/copy-insert.ts
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

async function importCSVCopy(filePath: string): Promise<number> {
  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    const ingestStream = client.query(
      copyFrom(`
        COPY users (email, name, company)
        FROM STDIN
        WITH (FORMAT csv, HEADER true, NULL '')
      `)
    )

    const sourceStream = createReadStream(filePath)

    await pipeline(sourceStream, ingestStream)

    await client.query('COMMIT')

    return ingestStream.rowCount ?? 0
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

// Usage
importCSVCopy('./users.csv')
  .then((count) => console.log(`Imported ${count} rows`))
  .catch(console.error)
  .finally(() => pool.end())

Key COPY options explained:

  • FORMAT csv - Parse input as CSV format
  • HEADER true - First line contains column names (skip it)
  • NULL '' - Treat unquoted empty fields as NULL (this is already the default in CSV format; a quoted "" stays an empty string)
  • DELIMITER ',' - Column separator (comma is default for CSV)
  • QUOTE '"' - Quote character for strings containing delimiters

When to use this approach:

  • Large files (10,000+ rows)
  • When data is already validated or comes from a trusted source
  • Maximum insertion speed is critical
  • Memory efficiency matters (streaming, no full file load)
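
COPY is not limited to files that are already on disk. If rows are validated or transformed in Node.js first, they can be serialized back to CSV and streamed through the same COPY statement. The following is a minimal sketch of that serialization, assuming the default comma delimiter, double-quote quoting, and NULL '' settings; toCsvField and toCsvLine are hypothetical helper names.

```typescript
type CsvValue = string | number | null | undefined

// Encodes one value for COPY ... WITH (FORMAT csv)
function toCsvField(value: CsvValue): string {
  // An unquoted empty field is read as NULL
  if (value === null || value === undefined) return ''

  const text = String(value)

  // Quote empty strings so they stay distinct from NULL, and quote anything
  // containing the delimiter, a quote character, or a line break
  if (text === '' || /[",\r\n]/.test(text)) {
    return `"${text.replace(/"/g, '""')}"`
  }
  return text
}

// Encodes one row, terminated by a newline
function toCsvLine(values: CsvValue[]): string {
  return values.map(toCsvField).join(',') + '\n'
}

// Writes: bob@example.com,"Smith, Bob",
process.stdout.write(toCsvLine(['bob@example.com', 'Smith, Bob', null]))
```

The resulting lines can be fed to the COPY stream, for example with Readable.from() over a generator that yields one line per row. If you generate the data this way, drop HEADER true from the COPY options or emit a header line first.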

Transaction handling

In node-postgres, a transaction belongs to the connection that issued BEGIN, so every statement in a bulk operation must run on the same checked-out client rather than through pool.query(), which may use a different connection for each call. From the node-postgres documentation:

"You must use the same client instance for all statements within a transaction. PostgreSQL isolates a transaction to individual clients."

The pattern for transactional bulk operations:

import { Pool } from 'pg'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

async function transactionalImport(): Promise<void> {
  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    // All operations use the same client
    // ...bulk insert operations...

    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

With pg-copy-streams, the stream operates within the transaction context. If an error occurs during streaming, calling destroy() on the stream sends a CopyFail message to PostgreSQL, which rolls back the operation. The pipeline() function handles this automatically when it detects an error.
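
Because every method in this guide repeats the same BEGIN / COMMIT / ROLLBACK / release sequence, it can be factored into a helper. The sketch below is one possible shape, not an API provided by pg: withTransaction, TxClient, and TxPool are names invented here, and the two interfaces are minimal structural types that a pg Pool and PoolClient are assumed to satisfy. A fake pool is included so the control flow can be observed without a database.

```typescript
// Minimal structural types (assumed to be compatible with pg's Pool and PoolClient)
interface TxClient {
  query(sql: string): Promise<unknown>
  release(): void
}

interface TxPool {
  connect(): Promise<TxClient>
}

// Runs `work` on a single client inside one transaction
async function withTransaction<T>(
  pool: TxPool,
  work: (client: TxClient) => Promise<T>
): Promise<T> {
  const client = await pool.connect()
  try {
    await client.query('BEGIN')
    const result = await work(client)
    await client.query('COMMIT')
    return result
  } catch (error) {
    try {
      await client.query('ROLLBACK')
    } catch {
      // Ignore rollback failures so the original error is the one reported
    }
    throw error
  } finally {
    client.release()
  }
}

// Fake pool that records every statement, for demonstration only
function createFakePool(log: string[]): TxPool {
  return {
    async connect() {
      return {
        async query(sql: string) {
          log.push(sql)
        },
        release() {
          log.push('release')
        }
      }
    }
  }
}

const demoLog: string[] = []
withTransaction(createFakePool(demoLog), async (client) => {
  await client.query('INSERT ...')
}).then(() => console.log(demoLog.join(' -> ')))
```

One difference from the inline pattern above: a failing ROLLBACK is swallowed so that it cannot mask the error that caused it.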

Error recovery patterns

Handling errors during COPY streaming

When using pg-copy-streams, errors can occur during the streaming process. The stream works within a transaction, so partial data is not committed on failure.

import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
import { Transform } from 'stream'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

async function importWithErrorHandling(filePath: string): Promise<number> {
  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    const ingestStream = client.query(
      copyFrom(`
        COPY users (email, name, company)
        FROM STDIN
        WITH (FORMAT csv, HEADER true)
      `)
    )

    const sourceStream = createReadStream(filePath)

    // Optional: pass-through transform that counts lines for logging.
    // It counts newline characters, so the header line is included.
    let linesSeen = 0
    const countingStream = new Transform({
      transform(chunk, _encoding, callback) {
        linesSeen += chunk.toString().split('\n').length - 1
        callback(null, chunk)
      }
    })

    await pipeline(sourceStream, countingStream, ingestStream)

    await client.query('COMMIT')
    console.log(`Read ${linesSeen} lines, imported ${ingestStream.rowCount} rows`)

    return ingestStream.rowCount ?? 0
  } catch (error) {
    await client.query('ROLLBACK')
    console.error('Import failed, transaction rolled back:', error)
    throw error
  } finally {
    client.release()
  }
}

Aborting a COPY operation mid-stream

If you need to abort the COPY operation before it completes (for example, detecting invalid data), call destroy() on the ingest stream:

import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { from as copyFrom } from 'pg-copy-streams'
import { Transform } from 'stream'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})

async function importWithValidation(filePath: string): Promise<void> {
  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    const ingestStream = client.query(
      copyFrom(`COPY users (email, name, company) FROM STDIN WITH CSV HEADER`)
    )

    const sourceStream = createReadStream(filePath)

    // Validation transform
    const validator = new Transform({
      transform(chunk, encoding, callback) {
        const content = chunk.toString()

        // Example validation: check for forbidden characters.
        // Chunks are arbitrary byte ranges, so a match split across two chunks is missed.
        if (content.includes('<script>')) {
          // Abort the stream
          callback(new Error('Invalid content detected'))
          return
        }

        callback(null, chunk)
      }
    })

    sourceStream.pipe(validator).pipe(ingestStream)

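    await new Promise<void>((resolve, reject) => {
      ingestStream.on('finish', resolve)
      ingestStream.on('error', reject)

      // pipe() does not forward errors, so handle each upstream stage explicitly
      const abort = (err: Error) => {
        // destroy() sends CopyFail to PostgreSQL
        ingestStream.destroy(err)
        reject(err)
      }
      sourceStream.on('error', abort)
      validator.on('error', abort)
    })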

    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}
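
The validator above inspects one chunk at a time, and chunk boundaries fall wherever the file read happens to stop, so a row can be split across two chunks. To validate whole rows, the chunks first need to be reassembled into lines. The sketch below shows one way to do that; LineBuffer is a name invented for this example, and it assumes rows contain no quoted fields with embedded line breaks.

```typescript
// Reassembles arbitrary string chunks into complete lines
class LineBuffer {
  private remainder = ''

  // Accepts the next chunk and returns every line it completes
  push(chunk: string): string[] {
    const parts = (this.remainder + chunk).split('\n')
    // The last piece has no newline yet, so keep it for the next chunk
    this.remainder = parts.pop() ?? ''
    return parts
  }

  // Returns the final unterminated line, if there is one
  end(): string[] {
    const rest = this.remainder
    this.remainder = ''
    return rest === '' ? [] : [rest]
  }
}

const buffer = new LineBuffer()
console.log(buffer.push('alice@example.com,Al')) // []
console.log(buffer.push('ice,Acme\nbob@exam')) // ['alice@example.com,Alice,Acme']
console.log(buffer.end()) // ['bob@exam']
```

Inside a Transform, transform() would call push(chunk.toString()) and validate each returned line, while flush() would call end() for the last line. Files with Windows line endings leave a trailing \r on each line, which the validation needs to tolerate or strip.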

From the pg-copy-streams documentation:

"If you have not yet finished ingesting data into a copyFrom stream and you want to ask postgresql to abort the process, you can call destroy() on the stream. This will send a CopyFail message to the backend that will rollback the operation."

Recovery after failed COPY operations

If a large COPY operation fails, PostgreSQL may have invisible dead rows occupying disk space. The PostgreSQL documentation notes:

"COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space."

After a failed large COPY, run VACUUM to reclaim space:

VACUUM users;

Or in Node.js:

async function cleanupAfterFailedImport(): Promise<void> {
  const client = await pool.connect()
  try {
    await client.query('VACUUM users')
  } finally {
    client.release()
  }
}

Complete working example

Here is a full implementation combining streaming CSV parsing with pg-copy-streams and proper error handling:

// src/csv-to-postgresql.ts
import { Pool, PoolClient } from 'pg'
import { createReadStream, statSync } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom, CopyStreamQuery } from 'pg-copy-streams'

interface ImportResult {
  success: boolean
  rowCount: number
  error?: string
}

interface ImportOptions {
  tableName: string
  columns: string[]
  delimiter?: string
  nullValue?: string
  encoding?: string
}

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  // Connection pool settings for bulk operations
  max: 10,
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 5000
})

function buildCopyQuery(options: ImportOptions): string {
  const { tableName, columns, delimiter = ',', nullValue = '', encoding = 'UTF8' } = options

  const columnList = columns.join(', ')

  return `
    COPY ${tableName} (${columnList})
    FROM STDIN
    WITH (
      FORMAT csv,
      HEADER true,
      DELIMITER '${delimiter}',
      NULL '${nullValue}',
      ENCODING '${encoding}'
    )
  `
}

async function importCSV(
  filePath: string,
  options: ImportOptions
): Promise<ImportResult> {
  // Verify file exists
  try {
    statSync(filePath)
  } catch {
    return {
      success: false,
      rowCount: 0,
      error: `File not found: ${filePath}`
    }
  }

  const client = await pool.connect()

  try {
    await client.query('BEGIN')

    const copyQuery = buildCopyQuery(options)
    const ingestStream: CopyStreamQuery = client.query(copyFrom(copyQuery))
    const sourceStream = createReadStream(filePath, { encoding: 'utf8' })

    await pipeline(sourceStream, ingestStream)

    await client.query('COMMIT')

    return {
      success: true,
      rowCount: ingestStream.rowCount ?? 0
    }
  } catch (error) {
    await client.query('ROLLBACK')

    const message = error instanceof Error ? error.message : 'Unknown error'
    return {
      success: false,
      rowCount: 0,
      error: message
    }
  } finally {
    client.release()
  }
}

// Main execution
async function main(): Promise<void> {
  const result = await importCSV('./users.csv', {
    tableName: 'users',
    columns: ['email', 'name', 'company']
  })

  if (result.success) {
    console.log(`Import completed: ${result.rowCount} rows inserted`)
  } else {
    console.error(`Import failed: ${result.error}`)

    // Clean up dead tuples after failed import
    const client = await pool.connect()
    try {
      await client.query('VACUUM users')
      console.log('Vacuum completed')
    } finally {
      client.release()
    }
  }

  await pool.end()
}

main().catch(console.error)
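
One caveat about buildCopyQuery above: it interpolates tableName, columns, and the delimiter directly into the SQL text, which is only safe while those values are hard-coded. If any of them can come from user input (for example, a column mapping chosen in a UI), they should be quoted first. The following sketch shows the quoting rules; quoteIdent, quoteLiteral, and buildSafeCopyQuery are names made up here, and quoteLiteral assumes standard_conforming_strings is on (the PostgreSQL default) and simply rejects backslashes.

```typescript
// Quotes an SQL identifier: wrap in double quotes, double any embedded quotes
function quoteIdent(name: string): string {
  if (name.length === 0 || name.includes('\0')) {
    throw new Error('Invalid identifier')
  }
  return `"${name.replace(/"/g, '""')}"`
}

// Quotes an SQL string literal: wrap in single quotes, double any embedded quotes.
// Assumption: standard_conforming_strings = on; backslashes are rejected outright.
function quoteLiteral(value: string): string {
  if (value.includes('\0') || value.includes('\\')) {
    throw new Error('Unsupported character in literal')
  }
  return `'${value.replace(/'/g, "''")}'`
}

function buildSafeCopyQuery(
  table: string,
  columns: string[],
  delimiter = ','
): string {
  if (delimiter.length !== 1) {
    throw new Error('COPY delimiter must be a single character')
  }
  const columnList = columns.map((column) => quoteIdent(column)).join(', ')
  return (
    `COPY ${quoteIdent(table)} (${columnList}) FROM STDIN ` +
    `WITH (FORMAT csv, HEADER true, DELIMITER ${quoteLiteral(delimiter)})`
  )
}

console.log(buildSafeCopyQuery('users', ['email', 'name', 'company']))
// COPY "users" ("email", "name", "company") FROM STDIN WITH (FORMAT csv, HEADER true, DELIMITER ',')
```

Quoted identifiers are case-sensitive, and a schema-qualified name such as public.users has to be quoted part by part. node-postgres also provides escapeIdentifier() and escapeLiteral() on the client, which can replace the hand-written helpers.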

Troubleshooting

Async/await in stream event handlers does not work

Using await inside .on('data') handlers does not pause the stream. This causes race conditions and missed data.

Problem:

// This does not work as expected
stream.on('data', async (row) => {
  await client.query('INSERT INTO users ...', row)  // Won't wait
})

Solution: Use pipeline() or for await...of loops instead:

// Correct approach with for await
for await (const row of parser) {
  await client.query('INSERT INTO users ...', row)
}

// Or use pipeline for streaming
await pipeline(sourceStream, transformStream, destinationStream)

Memory issues with large files

Loading an entire CSV into memory before inserting causes out-of-memory errors on large files.

Solution: Stream the data directly to PostgreSQL:

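// Stream directly - no memory buildup
const ingestStream = client.query(
  copyFrom('COPY users (email, name, company) FROM STDIN WITH CSV HEADER')
)
// pipeline() applies backpressure and rejects if either stream fails
await pipeline(createReadStream('large_file.csv'), ingestStream)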

Column mismatch errors

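If your CSV columns do not match the table columns, list the target columns explicitly. Without a column list, COPY expects a value for every column of the table, in table order, so the three-column sample file would fail against the five-column users table: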

// Explicitly list columns to import
copyFrom('COPY users (email, name, company) FROM STDIN WITH CSV HEADER')

Encoding issues with special characters

Special characters may be corrupted if encoding is not specified:

copyFrom(`
  COPY users (email, name, company) FROM STDIN
  WITH (FORMAT csv, HEADER true, ENCODING 'UTF8')
`)

Connection pool exhaustion

Long-running imports can exhaust the connection pool. Use dedicated clients for bulk operations:

const client = await pool.connect()
try {
  // Long-running import
} finally {
  client.release()  // Always release
}

Statement timeout on large datasets

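PostgreSQL's statement_timeout is disabled (0) by default, but if one is set on the server, the role, or the client, a long-running import can be cancelled partway through. Lift it for the duration of the import: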

await client.query('SET statement_timeout = 0')  // No timeout
// Perform import
await client.query('RESET statement_timeout')

Performance tips

  1. Disable indexes during import: Drop non-essential indexes before bulk import, recreate after
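  2. Disable triggers: ALTER TABLE users DISABLE TRIGGER ALL speeds up imports, but it also skips foreign-key checks and requires superuser privileges, so re-enable triggers and verify the data afterward
  3. Increase maintenance_work_mem: Temporarily raising it speeds up the CREATE INDEX and ALTER TABLE ADD FOREIGN KEY steps that follow a bulk load (it does little for COPY itself)
  4. Use unlogged tables: For temporary import tables, CREATE UNLOGGED TABLE is faster because it skips the write-ahead log, at the cost of losing the table's contents after a crash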
  5. Batch size tuning: For batch INSERT, 1000-5000 rows per batch is typically optimal
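
As an illustration of tip 4, the sketch below lists the statements for a staging-table import: load into an unlogged copy of the target table, then move the rows across with a single INSERT ... SELECT. The function name stagingImportPlan and the _staging suffix are assumptions made for this example, and the identifiers are interpolated as-is, so it is only suitable for trusted, hard-coded names.

```typescript
// Returns the SQL statements for a staging-table import, in execution order.
// Assumption: `target` and `columns` are trusted, hard-coded identifiers.
function stagingImportPlan(target: string, columns: string[]): string[] {
  const staging = `${target}_staging`
  const cols = columns.join(', ')

  return [
    // Unlogged copy of the target's columns and defaults (no indexes)
    `CREATE UNLOGGED TABLE ${staging} (LIKE ${target} INCLUDING DEFAULTS)`,
    // Pass this statement to copyFrom() and stream the file into it
    `COPY ${staging} (${cols}) FROM STDIN WITH (FORMAT csv, HEADER true)`,
    // Move the rows into the real table in one statement
    `INSERT INTO ${target} (${cols}) SELECT ${cols} FROM ${staging}`,
    `DROP TABLE ${staging}`
  ]
}

for (const statement of stagingImportPlan('users', ['email', 'name', 'company'])) {
  console.log(statement)
}
```

The staging step also gives you a place to clean or deduplicate rows in SQL before they reach the target table.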

Next steps

  • Add data validation before streaming to PostgreSQL
  • Implement progress reporting for large file imports
  • Create retry logic for transient failures
  • Build a queue system for processing multiple imports
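
As a starting point for progress reporting, the sketch below turns byte counts into percentage updates and only fires the callback when the whole-number percentage changes. createProgressTracker is a hypothetical helper; in practice totalBytes could come from statSync(filePath).size, and the returned function would be called with chunk.length from a pass-through Transform placed between the file stream and the COPY stream.

```typescript
// Creates a function that accepts byte counts and reports whole-percent progress
function createProgressTracker(
  totalBytes: number,
  onProgress: (percent: number) => void
): (chunkBytes: number) => void {
  let seenBytes = 0
  let lastReported = -1

  return (chunkBytes: number): void => {
    seenBytes += chunkBytes
    const percent =
      totalBytes > 0
        ? Math.min(100, Math.floor((seenBytes / totalBytes) * 100))
        : 100

    // Report only when the percentage actually changes
    if (percent !== lastReported) {
      lastReported = percent
      onProgress(percent)
    }
  }
}

const reported: number[] = []
const track = createProgressTracker(200, (percent) => reported.push(percent))
track(50)
track(50)
track(1)
track(99)
console.log(reported) // [25, 50, 100]
```

This measures bytes read from the file rather than rows committed, so 100% means the file has been fully sent, not that COMMIT has succeeded.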

Simplifying CSV imports in your application

When building user-facing CSV import features, handling the frontend presents additional challenges: file selection, column mapping when headers do not match your schema, validation feedback, and progress indication.

ImportCSV is an open-source React component that handles these frontend concerns. It parses CSV and Excel files in the browser, provides a visual column mapping interface, and validates data before sending it to your backend.

import { CSVImporter } from '@importcsv/react'

<CSVImporter
  onComplete={async (data) => {
    // data.rows contains validated, mapped records
    // Send to your Node.js backend for PostgreSQL insertion
    await fetch('/api/import', {
      method: 'POST',
      body: JSON.stringify(data.rows)
    })
  }}
  columns={[
    { label: 'Email', key: 'email', required: true },
    { label: 'Name', key: 'name', required: true },
    { label: 'Company', key: 'company' }
  ]}
/>

Combine ImportCSV on the frontend with the pg-copy-streams patterns from this guide on the backend for a complete CSV import pipeline.

Wrap-up

CSV imports shouldn't slow you down. ImportCSV aims to fit into your workflow, whether you're building data import flows, handling customer uploads, or processing large datasets.

If that sounds like the kind of tooling you want to use, try ImportCSV.