AACsearch
Connectoren & Widget

PostgreSQL-Synchronisation

Synchronisieren Sie Ihre PostgreSQL-Tabellen in Echtzeit mit der AACsearch Engine mittels pg_notify, Polling oder logischer Replikation.

PostgreSQL-Synchronisations-Connector

Der PostgreSQL-Synchronisations-Connector hält Ihre AACsearch-Indizes mit Ihren PostgreSQL-Datenbanktabellen synchron. Er unterstützt drei Bereitstellungsstrategien.

Strategie 1: pg_notify (empfohlen)

Niedriglatenz, triggerbasierte Synchronisation mittels der in PostgreSQL integrierten LISTEN/NOTIFY-Funktion.

Einrichtung

Erstellen Sie zunächst eine Trigger-Funktion und weisen Sie diese Ihrer Tabelle zu:

CREATE OR REPLACE FUNCTION notify_aacsearch()
RETURNS trigger AS $$ BEGIN
  PERFORM pg_notify('aacsearch_sync', row_to_json(NEW)::text);
  RETURN NEW;
END; $$ LANGUAGE plpgsql;

CREATE TRIGGER products_aacsearch_trigger
AFTER INSERT OR UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION notify_aacsearch();

Verbinden

import { startPgNotifyListener } from "@aacsearch/postgres-sync";

const pg = await startPgNotifyListener({
	aacsearch: {
		baseUrl: process.env.AACSEARCH_URL!,
		token: process.env.AACSEARCH_TOKEN!,
		projectId: process.env.AACSEARCH_PROJECT_ID!,
	},
	connectionString: process.env.DATABASE_URL!,
	table: "products",
	idColumn: "id",
	initialFullSync: true,
	debug: true,
});

process.on("SIGTERM", () => pg.end());

Der Listener führt folgende Schritte aus:

  1. Durchführung einer anfänglichen vollständigen Synchronisation aller vorhandenen Zeilen
  2. Abonnieren von pg_notify-Ereignissen für Echtzeit-Updates
  3. Weiterleiten jeder Änderung an AACsearch, sobald sie eintrifft

Strategie 2: Polling-Cursor

Polling ist nützlich, wenn Sie keine Trigger zu Ihren Datenbanktabellen hinzufügen können.

import { startPollingSync } from "@aacsearch/postgres-sync";

const poller = startPollingSync({
	aacsearch: { baseUrl: process.env.AACSEARCH_URL!, token, projectId },
	connectionString: process.env.DATABASE_URL!,
	table: "orders",
	cursorColumn: "updated_at",
	pollIntervalMs: 5000,
});

// Später beenden:
// poller.stop();

Der Poller führt in dem konfigurierten Intervall die Abfrage SELECT * FROM table WHERE cursorColumn > $1 ORDER BY cursorColumn aus und überträgt neue/aktualisierte Zeilen an AACsearch.

Strategie 3: Sequin CDC

Für produktionsreife logische Replikation ohne Trigger verwenden Sie Sequin zum Streamen von CDC-Ereignissen:

import { startSequinCdcSync } from "@aacsearch/postgres-sync";

const sequin = startSequinCdcSync({
	aacsearch: { baseUrl: process.env.AACSEARCH_URL!, token, projectId },
	streamUrl: "https://api.sequin.io/streams/YOUR_STREAM_ID",
	accessToken: "seq_xxx",
	table: "products",
});

Benutzerdefinierte Dokumentzuordnung

Alle Strategien akzeptieren eine benutzerdefinierte mapper-Funktion:

startPgNotifyListener(config, (row) => ({
	external_id: String(row.id),
	title: row.title,
	content: row.body || row.description,
	price: row.price,
	category: row.category_name,
	tags: row.tags ? JSON.parse(row.tags) : [],
}));

Callbacks

Überwachen Sie die Synchronisationsaktivität mit Callbacks:

startPgNotifyListener(config, undefined, {
	onSync: (event) => {
		console.log(`Synced ${event.action} on ${event.table}`);
	},
	onError: (error, context) => {
		console.error(`Error on ${context?.table}:`, error);
	},
	onConnected: () => console.log("Connected to PostgreSQL"),
	onDisconnected: () => console.log("Disconnected from PostgreSQL"),
});

Strategievergleich

StrategieLatenzErfordert TriggerOverheadAm besten geeignet für
pg_notify~100msJaMinimal (NOTIFY ist async)Produktionstabellen unter eigener Kontrolle
Polling-Cursor~5-60sNeinSELECT pro IntervallTabellen ohne Schreibzugriff
Sequin CDC~1-5sNeinLogischer Replikations-SlotHohe Verfügbarkeit, null Konfiguration

Verwandte Themen

On this page