PostgreSQL Sync
Sync your PostgreSQL tables to AACsearch Engine in real-time using pg_notify, polling, or logical replication.
PostgreSQL Sync Connector
The PostgreSQL Sync Connector keeps your AACsearch indexes in sync with your PostgreSQL database tables. It supports three deployment strategies.
Strategy 1: pg_notify (recommended)
Low-latency trigger-based sync using PostgreSQL's built-in LISTEN/NOTIFY.
Setup
First, create a trigger function and attach it to your table:
CREATE OR REPLACE FUNCTION notify_aacsearch()
RETURNS trigger AS $$ BEGIN
PERFORM pg_notify('aacsearch_sync', row_to_json(NEW)::text);
RETURN NEW;
END; $$ LANGUAGE plpgsql;
CREATE TRIGGER products_aacsearch_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION notify_aacsearch();
Connect
import { startPgNotifyListener } from "@aacsearch/postgres-sync";
const pg = await startPgNotifyListener({
aacsearch: {
baseUrl: process.env.AACSEARCH_URL!,
token: process.env.AACSEARCH_TOKEN!,
projectId: process.env.AACSEARCH_PROJECT_ID!,
},
connectionString: process.env.DATABASE_URL!,
table: "products",
idColumn: "id",
initialFullSync: true,
debug: true,
});
process.on("SIGTERM", () => pg.end());
The listener will:
- Perform an initial full sync of all existing rows
- Subscribe to pg_notify events for real-time updates
- Push each change to AACsearch as it arrives
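To make the real-time step concrete, here is a minimal sketch of how a payload produced by the trigger above (row_to_json(NEW)::text) could be turned into a document before it is pushed. The names parseNotifyPayload and SyncDocument are hypothetical and are not part of @aacsearch/postgres-sync; the connector's internal handling may differ.

```typescript
// Hypothetical helper for illustration; not part of @aacsearch/postgres-sync.
type Row = Record<string, unknown>;

interface SyncDocument {
  external_id: string;
  fields: Row;
}

// Parse the JSON text sent by pg_notify('aacsearch_sync', row_to_json(NEW)::text)
// and key the resulting document by the configured id column.
function parseNotifyPayload(payload: string, idColumn: string): SyncDocument {
  const parsed: unknown = JSON.parse(payload);
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error("Expected a JSON object produced by row_to_json");
  }
  const row = parsed as Row;
  const id = row[idColumn];
  if (id === undefined || id === null) {
    throw new Error(`Payload has no value for id column "${idColumn}"`);
  }
  return { external_id: String(id), fields: row };
}

const doc = parseNotifyPayload('{"id": 42, "title": "Desk lamp"}', "id");
console.log(doc.external_id); // prints 42
```

Keep in mind that, in its default configuration, PostgreSQL only accepts NOTIFY payloads shorter than 8000 bytes, so very wide rows may not fit in a single notification.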
Strategy 2: Polling cursor
Polling is useful when you cannot add triggers to your database tables.
import { startPollingSync } from "@aacsearch/postgres-sync";
const poller = startPollingSync({
aacsearch: {
baseUrl: process.env.AACSEARCH_URL!,
token: process.env.AACSEARCH_TOKEN!,
projectId: process.env.AACSEARCH_PROJECT_ID!,
},
connectionString: process.env.DATABASE_URL!,
table: "orders",
cursorColumn: "updated_at",
pollIntervalMs: 5000,
});
// Stop later:
// poller.stop();
The poller queries SELECT * FROM table WHERE cursorColumn > $1 ORDER BY cursorColumn at the configured interval and pushes new and updated rows to AACsearch.
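The cursor logic described above can be sketched independently of the connector. The following is a minimal illustration, assuming a hypothetical QueryFn that runs a parameterized query and returns rows; buildCursorQuery and pollOnce are illustrative names, not part of @aacsearch/postgres-sync. It shows how the highest cursor value seen in one poll becomes the $1 parameter of the next.

```typescript
// Hypothetical sketch of cursor-based polling; not the connector's actual code.
type Row = Record<string, unknown>;
type QueryFn = (sql: string, params: unknown[]) => Promise<Row[]>;

// Quote an identifier for PostgreSQL by doubling embedded double quotes.
function quoteIdent(name: string): string {
  return `"${name.replace(/"/g, '""')}"`;
}

function buildCursorQuery(table: string, cursorColumn: string): string {
  const col = quoteIdent(cursorColumn);
  return `SELECT * FROM ${quoteIdent(table)} WHERE ${col} > $1 ORDER BY ${col}`;
}

// Run one poll: fetch rows newer than `cursor`, hand them to `push`,
// and return the cursor value to use for the next poll.
async function pollOnce<C>(
  query: QueryFn,
  push: (rows: Row[]) => Promise<void>,
  table: string,
  cursorColumn: string,
  cursor: C,
): Promise<C> {
  const rows = await query(buildCursorQuery(table, cursorColumn), [cursor]);
  const last = rows[rows.length - 1];
  if (last === undefined) return cursor; // nothing new, keep the old cursor
  await push(rows);
  // Rows are ordered by the cursor column, so the last row holds the maximum.
  return last[cursorColumn] as C;
}
```

Because the comparison is strict (>), a row that shares its cursor value with the last row already seen but is committed later can be skipped. If that matters for your data, a cursor column that increases on every write is a safer choice than a coarse timestamp.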
Strategy 3: Sequin CDC
For production-grade logical replication without triggers, use Sequin to stream CDC events:
import { startSequinCdcSync } from "@aacsearch/postgres-sync";
const sequin = startSequinCdcSync({
aacsearch: {
baseUrl: process.env.AACSEARCH_URL!,
token: process.env.AACSEARCH_TOKEN!,
projectId: process.env.AACSEARCH_PROJECT_ID!,
},
streamUrl: "https://api.sequin.io/streams/YOUR_STREAM_ID",
accessToken: "seq_xxx",
table: "products",
});
Custom document mapping
All strategies accept a custom mapper function:
startPgNotifyListener(config, (row) => ({
external_id: String(row.id),
title: row.title,
content: row.body || row.description,
price: row.price,
category: row.category_name,
tags: row.tags ? JSON.parse(row.tags) : [],
}));
Callbacks
Monitor sync activity with callbacks:
startPgNotifyListener(config, undefined, {
onSync: (event) => {
console.log(`Synced ${event.action} on ${event.table}`);
},
onError: (error, context) => {
console.error(`Error on ${context?.table}:`, error);
},
onConnected: () => console.log("Connected to PostgreSQL"),
onDisconnected: () => console.log("Disconnected from PostgreSQL"),
});
Strategy comparison
| Strategy | Latency | Requires trigger | Overhead | Best for |
|---|---|---|---|---|
| pg_notify | ~100ms | Yes | Minimal (NOTIFY is async) | Production tables you control |
| Polling cursor | ~5-60s (depends on pollIntervalMs) | No | SELECT per interval | Tables where you cannot add triggers |
| Sequin CDC | ~1-5s | No | Logical replication slot | High-availability, zero-config |