Bulk insert CSV to PostgreSQL with Node.js

Importing CSV data into PostgreSQL is a common requirement for data migrations, ETL pipelines, and user-uploaded files. Node.js offers several approaches, each with different performance characteristics. Choosing the right method depends on your file size, validation requirements, and error handling needs.
This guide covers three methods: individual INSERTs (simple but slow), batch INSERT with parameterized queries (balanced), and the PostgreSQL COPY command using pg-copy-streams (fastest). You will learn when to use each approach and see complete, working TypeScript examples.
Prerequisites
- Node.js 18+ (pg supports Node 18.x, 20.x, 22.x, 24.x)
- PostgreSQL database (local or remote)
- Basic understanding of SQL and async/await
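All of the examples below read the connection string from the DATABASE_URL environment variable. A minimal sketch of the expected setup (the credentials and database name here are placeholders):

// Each example constructs its pool like this; DATABASE_URL is expected to be a
// standard connection string, e.g. postgres://postgres:secret@localhost:5432/imports_demo
import { Pool } from 'pg'

const pool = new Pool({
  connectionString: process.env.DATABASE_URL
})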
Why bulk insert performance matters
Individual INSERT statements create significant overhead. Each INSERT requires a round trip to the database, transaction handling, and query parsing. For a 10,000 row CSV file:
| Method | Approximate time | Memory usage |
|---|---|---|
| Individual INSERT | 30-60 seconds | Low |
| Batch INSERT (1000 rows) | 2-5 seconds | Medium |
| COPY command | 0.5-2 seconds | Low (streaming) |
The COPY command is PostgreSQL's native bulk loading mechanism, designed specifically for high-throughput data import. It bypasses much of the per-row overhead of INSERT statements.
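The numbers above are rough; to see where your own files and hardware land, it helps to time each approach against a representative sample. A minimal sketch of a timing helper (it assumes you export the import functions defined later in this guide):

// benchmark.ts - a rough timing helper, not a rigorous benchmark
async function timeImport(
  label: string,
  importFn: (filePath: string) => Promise<number>,
  filePath: string
): Promise<void> {
  const start = Date.now()
  const rows = await importFn(filePath)
  const seconds = ((Date.now() - start) / 1000).toFixed(2)
  console.log(`${label}: ${rows} rows in ${seconds}s`)
}

// Usage (assuming the functions below are exported from their modules):
// await timeImport('Individual INSERT', importCSVIndividual, './users.csv')
// await timeImport('COPY', importCSVCopy, './users.csv')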
Setting up the database
First, create a test table in PostgreSQL:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) NOT NULL,
name VARCHAR(255) NOT NULL,
company VARCHAR(255),
created_at TIMESTAMP DEFAULT NOW()
);
And a sample CSV file (users.csv):
email,name,company
alice@example.com,Alice Johnson,Acme Corp
bob@example.com,Bob Smith,TechStart
carol@example.com,Carol Williams,DataFlow Inc
Install dependencies
npm install pg pg-copy-streams
npm install -D @types/pg @types/pg-copy-streams

The pg package (node-postgres) is the most popular PostgreSQL client for Node.js. The pg-copy-streams package enables streaming data through PostgreSQL's COPY command. The @types packages supply the TypeScript definitions used in the examples below.
Method 1: Individual INSERTs
The simplest approach inserts one row at a time. This works well for small files or when you need per-row validation and error handling.
// src/individual-insert.ts
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { parse } from 'csv-parse'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
interface UserRow {
email: string
name: string
company?: string
}
async function importCSVIndividual(filePath: string): Promise<number> {
const client = await pool.connect()
let insertedCount = 0
try {
await client.query('BEGIN')
const parser = createReadStream(filePath).pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true
})
)
for await (const row of parser) {
const { email, name, company } = row as UserRow
await client.query(
'INSERT INTO users (email, name, company) VALUES ($1, $2, $3)',
[email, name, company || null]
)
insertedCount++
}
await client.query('COMMIT')
return insertedCount
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
// Usage
importCSVIndividual('./users.csv')
.then((count) => console.log(`Inserted ${count} rows`))
.catch(console.error)
  .finally(() => pool.end())

When to use this approach:
- Small files (under 1,000 rows)
- When you need per-row validation or transformation
- When detailed error reporting per row is required (see the savepoint sketch after the limitations below)
Limitations:
- Slow for large files due to round-trip overhead
- Each INSERT is a separate database operation
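When detailed per-row error reporting is the priority, one variation on the code above wraps each INSERT in a savepoint so a failing row is recorded and skipped instead of aborting the whole transaction. A sketch of that pattern (the ImportError shape and file name are assumptions, not part of the pg API):

// src/individual-insert-with-errors.ts - a sketch, building on Method 1 above
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { parse } from 'csv-parse'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

interface UserRow {
  email: string
  name: string
  company?: string
}

interface ImportError {
  line: number
  message: string
}

async function importCSVWithRowErrors(
  filePath: string
): Promise<{ inserted: number; errors: ImportError[] }> {
  const client = await pool.connect()
  const errors: ImportError[] = []
  let inserted = 0
  let line = 1 // line 1 is the header

  try {
    await client.query('BEGIN')
    const parser = createReadStream(filePath).pipe(
      parse({ columns: true, skip_empty_lines: true, trim: true })
    )

    for await (const row of parser) {
      line++
      const { email, name, company } = row as UserRow
      // A savepoint lets us roll back only this row's INSERT on failure,
      // keeping the surrounding transaction usable for the remaining rows.
      await client.query('SAVEPOINT row_sp')
      try {
        await client.query(
          'INSERT INTO users (email, name, company) VALUES ($1, $2, $3)',
          [email, name, company || null]
        )
        await client.query('RELEASE SAVEPOINT row_sp') // free it once the row is safely in
        inserted++
      } catch (err) {
        await client.query('ROLLBACK TO SAVEPOINT row_sp')
        errors.push({
          line, // approximate: assumes no multi-line quoted fields
          message: err instanceof Error ? err.message : String(err)
        })
      }
    }

    await client.query('COMMIT')
    return { inserted, errors }
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}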
Method 2: Batch INSERT with parameterized queries
Batch insertion groups multiple rows into a single INSERT statement, reducing database round trips significantly.
// src/batch-insert.ts
import { Pool, PoolClient } from 'pg'
import { createReadStream } from 'fs'
import { parse } from 'csv-parse'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
interface UserRow {
email: string
name: string
company?: string
}
async function insertBatch(
client: PoolClient,
batch: UserRow[]
): Promise<void> {
if (batch.length === 0) return
// Build parameterized query for batch insert
const values: string[] = []
const params: (string | null)[] = []
batch.forEach((row, index) => {
const offset = index * 3
values.push(`($${offset + 1}, $${offset + 2}, $${offset + 3})`)
params.push(row.email, row.name, row.company || null)
})
const query = `
INSERT INTO users (email, name, company)
VALUES ${values.join(', ')}
`
await client.query(query, params)
}
async function importCSVBatch(
filePath: string,
batchSize: number = 1000
): Promise<number> {
const client = await pool.connect()
let insertedCount = 0
let batch: UserRow[] = []
try {
await client.query('BEGIN')
const parser = createReadStream(filePath).pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true
})
)
for await (const row of parser) {
batch.push(row as UserRow)
if (batch.length >= batchSize) {
await insertBatch(client, batch)
insertedCount += batch.length
batch = []
}
}
// Insert remaining rows
if (batch.length > 0) {
await insertBatch(client, batch)
insertedCount += batch.length
}
await client.query('COMMIT')
return insertedCount
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
// Usage
importCSVBatch('./users.csv', 1000)
.then((count) => console.log(`Inserted ${count} rows`))
.catch(console.error)
  .finally(() => pool.end())

When to use this approach:
- Medium-sized files (1,000 to 100,000 rows)
- When you need to validate or transform data before insertion
- When you want balance between performance and control
Limitations:
- Still slower than COPY for very large files
- Memory usage increases with batch size
- Parameter limits: PostgreSQL's wire protocol allows at most 65,535 bound parameters per statement, so batch size times column count must stay below that (roughly 21,800 rows for a three-column insert)
Method 3: COPY command with pg-copy-streams
The PostgreSQL COPY command is the fastest method for bulk loading data. The pg-copy-streams package lets you stream data directly to PostgreSQL without loading the entire file into memory.
// src/copy-insert.ts
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
async function importCSVCopy(filePath: string): Promise<number> {
const client = await pool.connect()
try {
await client.query('BEGIN')
const ingestStream = client.query(
copyFrom(`
COPY users (email, name, company)
FROM STDIN
WITH (FORMAT csv, HEADER true, NULL '')
`)
)
const sourceStream = createReadStream(filePath)
await pipeline(sourceStream, ingestStream)
await client.query('COMMIT')
return ingestStream.rowCount ?? 0
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
// Usage
importCSVCopy('./users.csv')
.then((count) => console.log(`Imported ${count} rows`))
.catch(console.error)
  .finally(() => pool.end())

Key COPY options explained:
- FORMAT csv - Parse input as CSV format
- HEADER true - First line contains column names (skip it)
- NULL '' - Treat empty strings as NULL values
- DELIMITER ',' - Column separator (comma is default for CSV)
- QUOTE '"' - Quote character for strings containing delimiters
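These options can be combined to match files that deviate from the defaults. For example, a sketch for a semicolon-delimited export that marks missing values with NA (the file conventions here are assumptions):

// Inside a connected client and transaction, as in Method 3 above
const ingestStream = client.query(
  copyFrom(`
    COPY users (email, name, company)
    FROM STDIN
    WITH (FORMAT csv, HEADER true, DELIMITER ';', NULL 'NA', QUOTE '"')
  `)
)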
When to use this approach:
- Large files (10,000+ rows)
- When data is already validated or comes from a trusted source
- Maximum insertion speed is critical
- Memory efficiency matters (streaming, no full file load)
Transaction handling
PostgreSQL isolates transactions to individual clients. When performing bulk operations, you must use the same client instance for all statements within a transaction. From the node-postgres documentation:
"You must use the same client instance for all statements within a transaction. PostgreSQL isolates a transaction to individual clients."
The pattern for transactional bulk operations:
import { Pool } from 'pg'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
async function transactionalImport(): Promise<void> {
const client = await pool.connect()
try {
await client.query('BEGIN')
// All operations use the same client
// ...bulk insert operations...
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}

With pg-copy-streams, the stream operates within the transaction context. If an error occurs during streaming, calling destroy() on the stream sends a CopyFail message to PostgreSQL, which rolls back the operation. The pipeline() function handles this automatically when it detects an error.
Error recovery patterns
Handling errors during COPY streaming
When using pg-copy-streams, errors can occur during the streaming process. The stream works within a transaction, so partial data is not committed on failure.
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'
import { Transform } from 'stream'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
async function importWithErrorHandling(filePath: string): Promise<number> {
const client = await pool.connect()
try {
await client.query('BEGIN')
const ingestStream = client.query(
copyFrom(`
COPY users (email, name, company)
FROM STDIN
WITH (FORMAT csv, HEADER true)
`)
)
const sourceStream = createReadStream(filePath)
// Optional: transform stream for logging or validation.
// rowCount below is an approximate line count based on newlines in each chunk
// (it includes the CSV header); the authoritative figure is ingestStream.rowCount
// once the COPY completes.
let rowCount = 0
const countingStream = new Transform({
transform(chunk, encoding, callback) {
rowCount += chunk.toString().split('\n').length - 1
callback(null, chunk)
}
})
await pipeline(sourceStream, countingStream, ingestStream)
await client.query('COMMIT')
console.log(`Successfully imported ${ingestStream.rowCount} rows`)
return ingestStream.rowCount ?? 0
} catch (error) {
await client.query('ROLLBACK')
console.error('Import failed, transaction rolled back:', error)
throw error
} finally {
client.release()
}
}

Aborting a COPY operation mid-stream
If you need to abort the COPY operation before it completes (for example, detecting invalid data), call destroy() on the ingest stream:
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { from as copyFrom } from 'pg-copy-streams'
import { Transform } from 'stream'
const pool = new Pool({
connectionString: process.env.DATABASE_URL
})
async function importWithValidation(filePath: string): Promise<void> {
const client = await pool.connect()
try {
await client.query('BEGIN')
const ingestStream = client.query(
copyFrom(`COPY users (email, name, company) FROM STDIN WITH CSV HEADER`)
)
const sourceStream = createReadStream(filePath)
// Validation transform
const validator = new Transform({
transform(chunk, encoding, callback) {
const content = chunk.toString()
// Example validation: check for forbidden characters
if (content.includes('<script>')) {
// Abort the stream
callback(new Error('Invalid content detected'))
return
}
callback(null, chunk)
}
})
sourceStream.pipe(validator).pipe(ingestStream)
await new Promise<void>((resolve, reject) => {
ingestStream.on('finish', resolve)
ingestStream.on('error', reject)
validator.on('error', (err) => {
// destroy() sends CopyFail to PostgreSQL
ingestStream.destroy(err)
reject(err)
})
})
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}

From the pg-copy-streams documentation:
"If you have not yet finished ingesting data into a copyFrom stream and you want to ask postgresql to abort the process, you can call
destroy() on the stream. This will send a CopyFail message to the backend that will rollback the operation."
Recovery after failed COPY operations
If a large COPY operation fails, PostgreSQL may have invisible dead rows occupying disk space. The PostgreSQL documentation notes:
"COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space."
After a failed large COPY, run VACUUM to reclaim space:
VACUUM users;
Or in Node.js:
async function cleanupAfterFailedImport(): Promise<void> {
const client = await pool.connect()
try {
await client.query('VACUUM users')
} finally {
client.release()
}
}

Complete working example
Here is a full implementation combining streaming CSV parsing with pg-copy-streams and proper error handling:
// src/csv-to-postgresql.ts
import { Pool, PoolClient } from 'pg'
import { createReadStream, statSync } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom, CopyStreamQuery } from 'pg-copy-streams'
interface ImportResult {
success: boolean
rowCount: number
error?: string
}
interface ImportOptions {
tableName: string
columns: string[]
delimiter?: string
nullValue?: string
encoding?: string
}
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
// Connection pool settings for bulk operations
max: 10,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000
})
function buildCopyQuery(options: ImportOptions): string {
  // Note: the table name, column list, and option values are interpolated directly
  // into the SQL text (COPY options cannot be supplied as bind parameters),
  // so only pass trusted, application-defined values here.
const { tableName, columns, delimiter = ',', nullValue = '', encoding = 'UTF8' } = options
const columnList = columns.join(', ')
return `
COPY ${tableName} (${columnList})
FROM STDIN
WITH (
FORMAT csv,
HEADER true,
DELIMITER '${delimiter}',
NULL '${nullValue}',
ENCODING '${encoding}'
)
`
}
async function importCSV(
filePath: string,
options: ImportOptions
): Promise<ImportResult> {
// Verify file exists
try {
statSync(filePath)
} catch {
return {
success: false,
rowCount: 0,
error: `File not found: ${filePath}`
}
}
const client = await pool.connect()
try {
await client.query('BEGIN')
const copyQuery = buildCopyQuery(options)
const ingestStream: CopyStreamQuery = client.query(copyFrom(copyQuery))
const sourceStream = createReadStream(filePath, { encoding: 'utf8' })
await pipeline(sourceStream, ingestStream)
await client.query('COMMIT')
return {
success: true,
rowCount: ingestStream.rowCount ?? 0
}
} catch (error) {
await client.query('ROLLBACK')
const message = error instanceof Error ? error.message : 'Unknown error'
return {
success: false,
rowCount: 0,
error: message
}
} finally {
client.release()
}
}
// Main execution
async function main(): Promise<void> {
const result = await importCSV('./users.csv', {
tableName: 'users',
columns: ['email', 'name', 'company']
})
if (result.success) {
console.log(`Import completed: ${result.rowCount} rows inserted`)
} else {
console.error(`Import failed: ${result.error}`)
// Clean up dead tuples after failed import
const client = await pool.connect()
try {
await client.query('VACUUM users')
console.log('Vacuum completed')
} finally {
client.release()
}
}
await pool.end()
}
main().catch(console.error)

Troubleshooting
Async/await in stream event handlers does not work
Using await inside a .on('data') handler does not pause the stream: the stream keeps emitting data while the promise is pending, so inserts race one another, backpressure is lost, and rejections can go unhandled.
Problem:
// This does not work as expected
stream.on('data', async (row) => {
await client.query('INSERT INTO users ...', row) // Won't wait
})

Solution: Use pipeline() or for await...of loops instead:
// Correct approach with for await
for await (const row of parser) {
await client.query('INSERT INTO users ...', row)
}
// Or use pipeline for streaming
await pipeline(sourceStream, transformStream, destinationStream)

Memory issues with large files
Loading an entire CSV into memory before inserting causes out-of-memory errors on large files.
Solution: Stream the data directly to PostgreSQL:
// Stream directly - no memory buildup
const ingestStream = client.query(
copyFrom('COPY users FROM STDIN WITH CSV HEADER')
)
createReadStream('large_file.csv').pipe(ingestStream)

Column mismatch errors
If your CSV columns do not match the table columns, explicitly specify which columns to import:
// Explicitly list columns to import
copyFrom('COPY users (email, name, company) FROM STDIN WITH CSV HEADER')

Encoding issues with special characters
Special characters may be corrupted if encoding is not specified:
copyFrom(`
COPY users FROM STDIN
WITH (FORMAT csv, HEADER true, ENCODING 'UTF8')
`)

Connection pool exhaustion
Long-running imports can exhaust the connection pool. Use dedicated clients for bulk operations:
const client = await pool.connect()
try {
// Long-running import
} finally {
client.release() // Always release
}

Statement timeout on large datasets
For very large imports, the default statement timeout may be too short:
await client.query('SET statement_timeout = 0') // No timeout
// Perform import
await client.query('RESET statement_timeout')

Performance tips
- Disable indexes during import: Drop non-essential indexes before bulk import, recreate after
- Disable triggers: Use ALTER TABLE users DISABLE TRIGGER ALL for faster imports
- Increase work_mem: Temporarily increase PostgreSQL's work_mem for sorting
- Use unlogged tables: For temporary import tables, CREATE UNLOGGED TABLE is faster
- Batch size tuning: For batch INSERT, 1000-5000 rows per batch is typically optimal
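As a sketch of how the trigger and work_mem tips combine with a COPY import (this assumes your role owns the users table; disabling ALL triggers also skips foreign-key enforcement, so only do it for trusted data):

// src/tuned-import.ts - a sketch combining several of the tips above
import { Pool } from 'pg'
import { createReadStream } from 'fs'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

async function tunedImport(filePath: string): Promise<void> {
  const client = await pool.connect()
  try {
    // Requires table ownership (and superuser if the table has FK constraint triggers)
    await client.query('ALTER TABLE users DISABLE TRIGGER ALL')

    await client.query('BEGIN')
    // SET LOCAL reverts automatically at COMMIT or ROLLBACK
    await client.query("SET LOCAL work_mem = '256MB'")

    const ingestStream = client.query(
      copyFrom('COPY users (email, name, company) FROM STDIN WITH (FORMAT csv, HEADER true)')
    )
    await pipeline(createReadStream(filePath), ingestStream)

    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    // Re-enable triggers whether or not the import succeeded
    await client.query('ALTER TABLE users ENABLE TRIGGER ALL').catch(() => undefined)
    client.release()
  }
}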
Next steps
- Add data validation before streaming to PostgreSQL
- Implement progress reporting for large file imports
- Create retry logic for transient failures
- Build a queue system for processing multiple imports
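For the progress-reporting item, one lightweight approach is a byte-counting Transform placed between the file stream and the COPY stream. A minimal sketch (progress is measured in bytes read, not rows inserted):

// src/progress-import.ts - a sketch of byte-based progress reporting
import { Pool } from 'pg'
import { createReadStream, statSync } from 'fs'
import { Transform } from 'stream'
import { pipeline } from 'stream/promises'
import { from as copyFrom } from 'pg-copy-streams'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

async function importWithProgress(filePath: string): Promise<void> {
  const totalBytes = statSync(filePath).size
  let readBytes = 0

  // Passes chunks through unchanged while reporting how much of the file has been read
  const progress = new Transform({
    transform(chunk, _encoding, callback) {
      readBytes += chunk.length
      const percent = ((readBytes / totalBytes) * 100).toFixed(1)
      process.stdout.write(`\rImporting... ${percent}%`)
      callback(null, chunk)
    }
  })

  const client = await pool.connect()
  try {
    await client.query('BEGIN')
    const ingestStream = client.query(
      copyFrom('COPY users (email, name, company) FROM STDIN WITH (FORMAT csv, HEADER true)')
    )
    await pipeline(createReadStream(filePath), progress, ingestStream)
    await client.query('COMMIT')
    process.stdout.write('\n')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}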
Simplifying CSV imports in your application
When building user-facing CSV import features, handling the frontend presents additional challenges: file selection, column mapping when headers do not match your schema, validation feedback, and progress indication.
ImportCSV is an open-source React component that handles these frontend concerns. It parses CSV and Excel files in the browser, provides a visual column mapping interface, and validates data before sending it to your backend.
import { CSVImporter } from '@importcsv/react'
<CSVImporter
onComplete={async (data) => {
// data.rows contains validated, mapped records
// Send to your Node.js backend for PostgreSQL insertion
await fetch('/api/import', {
method: 'POST',
body: JSON.stringify(data.rows)
})
}}
columns={[
{ label: 'Email', key: 'email', required: true },
{ label: 'Name', key: 'name', required: true },
{ label: 'Company', key: 'company' }
]}
/>

Combine ImportCSV on the frontend with the pg-copy-streams patterns from this guide on the backend for a complete CSV import pipeline.
Wrap-up
CSV imports shouldn't slow you down. ImportCSV aims to fit into your workflow, whether you're building data import flows, handling customer uploads, or processing large datasets.
If that sounds like the kind of tooling you want to use, give ImportCSV a try.