PostgreSQL Sync

Sync your PostgreSQL tables to AACsearch Engine in real time using pg_notify, polling, or logical replication.

PostgreSQL Sync Connector

The PostgreSQL Sync Connector keeps your AACsearch indexes in sync with your PostgreSQL database tables. It supports three deployment strategies: pg_notify triggers, a polling cursor, and Sequin CDC.

Strategy 1: pg_notify

Low-latency trigger-based sync using PostgreSQL's built-in LISTEN/NOTIFY.

Setup

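First, create a trigger function and attach it to your table. As written, the trigger fires on INSERT and UPDATE only, so it sends no notification when a row is deleted: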

CREATE OR REPLACE FUNCTION notify_aacsearch()
RETURNS trigger AS $$ BEGIN
  PERFORM pg_notify('aacsearch_sync', row_to_json(NEW)::text);
  RETURN NEW;
END; $$ LANGUAGE plpgsql;

CREATE TRIGGER products_aacsearch_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION notify_aacsearch();

Connect

import { startPgNotifyListener } from "@aacsearch/postgres-sync";

const pg = await startPgNotifyListener({
	aacsearch: {
		baseUrl: process.env.AACSEARCH_URL!,
		token: process.env.AACSEARCH_TOKEN!,
		projectId: process.env.AACSEARCH_PROJECT_ID!,
	},
	connectionString: process.env.DATABASE_URL!,
	table: "products",
	idColumn: "id",
	initialFullSync: true,
	debug: true,
});

process.on("SIGTERM", () => pg.end());

The listener will:

  1. Perform an initial full sync of all existing rows
  2. Subscribe to pg_notify events for real-time updates
  3. Push each change to AACsearch as it arrives
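
As a rough illustration of step 3, the sketch below shows how a single notification payload (the JSON text produced by row_to_json(NEW) in the trigger) could be decoded into a document before it is pushed. It is not the connector's actual implementation: the payloadToDocument helper and the SyncDocument type are hypothetical names introduced here, and the external_id field is borrowed from the custom mapping example later on this page.

```typescript
// Hypothetical types for illustration; the connector's real types may differ.
type Row = Record<string, unknown>;

interface SyncDocument {
	external_id: string;
	[field: string]: unknown;
}

// Decode one NOTIFY payload (JSON text from row_to_json(NEW)) into a
// document keyed by the configured id column.
function payloadToDocument(payload: string, idColumn: string): SyncDocument {
	const row = JSON.parse(payload) as Row;
	const id = row[idColumn];
	if (id === undefined || id === null) {
		// Without an id the document cannot be addressed in the index.
		throw new Error(`Payload has no "${idColumn}" column`);
	}
	// Keep every column and add a string id alongside them.
	return { ...row, external_id: String(id) };
}
```

One caveat when sending whole rows: in the default PostgreSQL configuration a NOTIFY payload must be shorter than 8000 bytes, so very wide rows may cause the pg_notify call in the trigger to fail. If that is a concern, one option is to notify with only the id and fetch the row separately.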

Strategy 2: Polling cursor

Polling is useful when you cannot add triggers to your database tables.

import { startPollingSync } from "@aacsearch/postgres-sync";

const poller = startPollingSync({
	aacsearch: {
		baseUrl: process.env.AACSEARCH_URL!,
		token: process.env.AACSEARCH_TOKEN!,
		projectId: process.env.AACSEARCH_PROJECT_ID!,
	},
	connectionString: process.env.DATABASE_URL!,
	table: "orders",
	cursorColumn: "updated_at",
	pollIntervalMs: 5000,
});

// Stop later:
// poller.stop();

At each interval (pollIntervalMs), the poller runs a query of the form SELECT * FROM <table> WHERE <cursorColumn> > $1 ORDER BY <cursorColumn>, where $1 is the highest cursor value seen so far, and pushes the new and updated rows to AACsearch.
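
The cursor logic can be sketched without a database. The example below is a simplified model, not the connector's code: pollOnce, FetchAfter, and CursorRow are hypothetical names, the cursor is assumed to be numeric (for example an epoch-millisecond updated_at), and an in-memory array stands in for the SQL query.

```typescript
// Hypothetical row type: every row carries a numeric cursor column.
interface CursorRow {
	id: number;
	updated_at: number;
}

// Stand-in for the SQL query: rows whose updated_at is greater than the cursor.
type FetchAfter = (cursor: number) => CursorRow[];

// Run one polling cycle: fetch rows past the cursor, hand them to `push`,
// and return the cursor to use on the next cycle.
function pollOnce(
	fetchAfter: FetchAfter,
	cursor: number,
	push: (rows: CursorRow[]) => void,
): number {
	const rows = fetchAfter(cursor);
	if (rows.length === 0) return cursor; // nothing new; keep the old cursor
	push(rows);
	// Advance to the largest cursor value in this batch.
	return rows.reduce((max, row) => Math.max(max, row.updated_at), cursor);
}

// In-memory table used only to exercise the sketch.
const table: CursorRow[] = [
	{ id: 1, updated_at: 100 },
	{ id: 2, updated_at: 200 },
];

const fetchFromTable: FetchAfter = (cursor) =>
	table
		.filter((row) => row.updated_at > cursor)
		.sort((a, b) => a.updated_at - b.updated_at);
```

Because the comparison is strict, a row written later with a cursor value equal to or below the saved cursor would be skipped. A cursor column that increases on every write avoids this.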

Strategy 3: Sequin CDC

For production-grade logical replication without triggers, use Sequin to stream CDC events:

import { startSequinCdcSync } from "@aacsearch/postgres-sync";

const sequin = startSequinCdcSync({
	aacsearch: {
		baseUrl: process.env.AACSEARCH_URL!,
		token: process.env.AACSEARCH_TOKEN!,
		projectId: process.env.AACSEARCH_PROJECT_ID!,
	},
	streamUrl: "https://api.sequin.io/streams/YOUR_STREAM_ID",
	accessToken: "seq_xxx",
	table: "products",
});

Custom document mapping

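All strategies accept a custom mapper function that converts a database row into the document to index. The example passes it as the second argument to startPgNotifyListener: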

startPgNotifyListener(config, (row) => ({
	external_id: String(row.id),
	title: row.title,
	content: row.body || row.description,
	price: row.price,
	category: row.category_name,
	tags: row.tags ? JSON.parse(row.tags) : [],
}));
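
The mapper above calls JSON.parse on row.tags directly, which throws if the value is malformed or if it has already been decoded into an array (this depends on the column type and the driver; node-postgres, for example, parses json and jsonb columns by default). A more defensive version might look like the sketch below, where parseTags is a hypothetical helper rather than part of the connector.

```typescript
// Normalize a tags value that may arrive as a JSON string, an array, or null.
function parseTags(value: unknown): string[] {
	// Already decoded: just coerce each entry to a string.
	if (Array.isArray(value)) return value.map(String);
	// null, undefined, non-strings, and blank strings all mean "no tags".
	if (typeof value !== "string" || value.trim() === "") return [];
	try {
		const parsed: unknown = JSON.parse(value);
		return Array.isArray(parsed) ? parsed.map(String) : [];
	} catch {
		return []; // malformed JSON: index the row without tags
	}
}
```

Inside the mapper, the tags line would then read tags: parseTags(row.tags).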

Callbacks

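Monitor sync activity with callbacks, passed as the third argument. Pass undefined as the second argument when you do not need a custom mapper: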

startPgNotifyListener(config, undefined, {
	onSync: (event) => {
		console.log(`Synced ${event.action} on ${event.table}`);
	},
	onError: (error, context) => {
		console.error(`Error on ${context?.table}:`, error);
	},
	onConnected: () => console.log("Connected to PostgreSQL"),
	onDisconnected: () => console.log("Disconnected from PostgreSQL"),
});
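
Beyond logging, the same callbacks can feed simple counters. The sketch below is one possible approach, not part of the connector: createSyncMetrics is a hypothetical helper, and the SyncEvent and ErrorContext shapes are assumptions based only on the fields used in the example above (event.action, event.table, context?.table).

```typescript
// Assumed shapes, inferred from the fields used in the callbacks example.
interface SyncEvent {
	action: string;
	table: string;
}

interface ErrorContext {
	table?: string;
}

// Build a set of callbacks that count sync events and errors.
function createSyncMetrics() {
	const counts = { synced: 0, errors: 0 };
	const byTable = new Map<string, number>();

	const callbacks = {
		onSync: (event: SyncEvent) => {
			counts.synced += 1;
			byTable.set(event.table, (byTable.get(event.table) ?? 0) + 1);
		},
		onError: (_error: unknown, _context?: ErrorContext) => {
			counts.errors += 1;
		},
	};

	return { counts, byTable, callbacks };
}
```

The returned callbacks object would be passed in the third position, for example startPgNotifyListener(config, undefined, metrics.callbacks), and counts can then be read from a health check or logged periodically.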

Strategy comparison

| Strategy | Latency | Requires trigger | Overhead | Best for |
| --- | --- | --- | --- | --- |
| pg_notify | ~100 ms | Yes | Minimal (NOTIFY is async) | Production tables you control |
| Polling cursor | ~5-60 s | No | SELECT per interval | Tables without write access |
| Sequin CDC | ~1-5 s | No | Logical replication slot | High-availability, zero-config |
