Серверные хелперы
Пакетная обработка, идемпотентность, стратегия повторных попыток и проверка подписи вебхуков — паттерны, которым должен следовать каждый потребитель серверного SDK при индексации документов и получении вебхуков от AACsearch.
Браузерный SDK — это тонкая обёртка над поисковым API. Использование SDK на серверной стороне — индексация документов, запуск CMS-коннекторов, получение вебхуков — требует большей дисциплины. На этой странице собраны четыре паттерна, которым вы всегда должны следовать.
Пакетная обработка
Запись по одному документу медленна и дорога. Всегда используйте пакетную обработку.
| Эндпоинт | Макс. размер пакета | Рекомендуемый размер |
|---|---|---|
documents:batch (upsert) | 1000 | 100–500 |
documents:batchdelete | 1000 | 100–1000 |
sync/full | 1000 | 500 |
sync/delta | 1000 | 100–500 |
events:batch | 1000 | 100 |
Эндпоинты пакетной обработки возвращают массивы ошибок по строкам — частичный успех является нормой.
Хелпер для Node
import { AdminClient } from "@aacsearch/client";
const admin = new AdminClient({
baseUrl: process.env.AACSEARCH_BASE_URL!,
apiKey: process.env.AACSEARCH_ADMIN_KEY!,
projectId: process.env.AACSEARCH_ORG_ID!,
});
async function bulkUpsert(indexId: string, docs: Document[], batchSize = 500) {
const errors: Array<{ id: string; error: string; message: string }> = [];
for (let i = 0; i < docs.length; i += batchSize) {
const batch = docs.slice(i, i + batchSize);
const result = await admin.batchUpsertDocuments(indexId, batch);
errors.push(...(result.errors ?? []));
}
return { total: docs.length, succeeded: docs.length - errors.length, errors };
}Хелпер для Python
from aacsearch import AdminClient
admin = AdminClient(
base_url=os.environ["AACSEARCH_BASE_URL"],
api_key=os.environ["AACSEARCH_ADMIN_KEY"],
project_id=os.environ["AACSEARCH_ORG_ID"],
)
def bulk_upsert(index_id, docs, batch_size=500):
errors = []
for i in range(0, len(docs), batch_size):
batch = docs[i:i + batch_size]
result = admin.batch_upsert_documents(index_id, batch)
errors.extend(result.get("errors", []))
return {"total": len(docs), "succeeded": len(docs) - len(errors), "errors": errors}PHP (CMS-коннектор)
<?php
function bulkUpsert(string $baseUrl, string $token, string $projectId, array $products, int $batchSize = 500): array {
$errors = [];
foreach (array_chunk($products, $batchSize) as $batch) {
$ch = curl_init("$baseUrl/api/projects/$projectId/sync/delta");
curl_setopt_array($ch, [
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POST => true,
CURLOPT_HTTPHEADER => [
"Authorization: Bearer $token",
'Content-Type: application/json',
],
CURLOPT_POSTFIELDS => json_encode(['products' => $batch]),
]);
$res = json_decode(curl_exec($ch), true);
curl_close($ch);
$errors = array_merge($errors, $res['errors'] ?? []);
}
return ['total' => count($products), 'errors' => $errors];
}Для пакетной обработки, специфичной для PrestaShop / Bitrix, см. Жизненный цикл API коннектора.
Идемпотентность
ID документов детерминированы из external_id. Повторная отправка того же документа безопасна — он перезаписывается. Именно это делает индексацию идемпотентной.
// Запускайте этот скрипт сколько угодно раз — результат одинаков
await admin.batchUpsertDocuments(indexId, [
{ external_id: "product-123", title: "Shoes", price: 49.99 },
]);То же самое верно для sync/full и sync/delta — коннектор может повторно отправить любой пакет без создания дубликатов.
Почему это важно
CMS-коннекторы работают в ненадёжных средах (тайм-ауты cron, сетевые сбои, перезапуски сервера в середине задачи). С идемпотентностью стратегия восстановления — «просто перезапустите неудачный пакет». Без неё вам пришлось бы отслеживать, какая часть каждого пакета была обработана, и повторять только её.
Когда идемпотентность НЕ покрывает
- Удаления не идемпотентны по отношению к вновь созданным документам с тем же
external_id. Если ваша задача удаляетproduct-123, а отдельная задача создаёт его, порядок имеет значение. - Изменения схемы между запусками. Повторная отправка старой нагрузки после миграции схемы может вызвать ошибки валидации по строкам.
events:track— at-least-once, а не exactly-once. Отслеживайтеevent_id(UUID) на стороне коннектора и дедуплицируйте в вашем аналитическом пайплайне.
Заголовок Idempotency-Key (для событий)
Для аналитических событий, где идемпотентность важна, включите заголовок Idempotency-Key:
await fetch(`${BASE}/api/events/track`, {
method: "POST",
headers: {
Authorization: `Bearer ${KEY}`,
"Content-Type": "application/json",
"Idempotency-Key": eventId, // UUIDv4 с вашей стороны
},
body: JSON.stringify({ event: "result_click", properties: { ... } }),
});Дублирующиеся события с одинаковым Idempotency-Key в течение 24-часового окна дедуплицируются на серверной стороне.
Стратегия повторных попыток
Разные ошибки требуют разных политик повторных попыток. Ошибитесь здесь — и вы либо сдадитесь слишком рано (потеря данных), либо забьёте сервер (каскад ограничений скорости).
| HTTP / ошибка | Повторять? | Стратегия |
|---|---|---|
4xx кроме 429 | Нет | Исправить запрос |
429 rate_limit_exceeded | Да | Ждать Retry-After секунд |
429 quota_exceeded | Нет | Обновить тариф или ждать месячного сброса |
502 search_failed | Да | 1 повтор через 1с; если неудача — эскалировать |
502 ingest_failed | Да | Экспоненциальная задержка: 1с → 2с → 4с |
503 service_unavailable | Да | Экспоненциальная задержка с jitter |
| Сетевая ошибка | Да | Экспоненциальная задержка |
Хелпер для Node с p-retry
import pRetry, { AbortError } from "p-retry";
import { AacSearchError } from "@aacsearch/client";
async function withRetry<T>(fn: () => Promise<T>): Promise<T> {
return pRetry(
async () => {
try {
return await fn();
} catch (err) {
if (err instanceof AacSearchError) {
if (
err.status >= 400 &&
err.status < 500 &&
err.code !== "rate_limit"
) {
throw new AbortError(err); // не повторять 4xx
}
if (err.code === "quota_exceeded") {
throw new AbortError(err);
}
if (err.code === "rate_limit") {
const retryAfter = Number(
err.response?.headers.get("Retry-After") ?? 5,
);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
}
}
throw err;
}
},
{
retries: 3,
factor: 2,
minTimeout: 1000,
maxTimeout: 30_000,
randomize: true, // jitter
},
);
}
// Использование:
const result = await withRetry(() =>
admin.batchUpsertDocuments(indexId, batch),
);Хелпер для Python
import time, random
from aacsearch import SdkError
def with_retry(fn, max_retries=3):
for attempt in range(max_retries + 1):
try:
return fn()
except SdkError as e:
if e.status and 400 <= e.status < 500 and e.code != "rate_limit":
raise
if e.code == "quota_exceeded":
raise
if e.code == "rate_limit":
retry_after = int(e.response_headers.get("Retry-After", 5))
time.sleep(retry_after)
else:
if attempt == max_retries:
raise
backoff = (2 ** attempt) + random.random()
time.sleep(backoff)
except Exception:
if attempt == max_retries:
raise
time.sleep((2 ** attempt) + random.random())Зачем нужен jitter
Без jitter каждый клиент, получивший 429 в один и тот же момент, повторит запрос в тот же момент, создавая лавину запросов, которая вызовет ещё один 429. Jitter распределяет их по времени.
Проверка подписи вебхуков
AACsearch подписывает каждый исходящий вебхук с помощью HMAC-SHA256. Проверяйте подпись до обработки любой нагрузки — любой, у кого есть URL вашего эндпоинта, может отправить мусорный POST.
Как работает подпись
signature = HMAC-SHA256(secret, request_body_bytes)
header X-AACSearch-Signature-256: sha256=<hex>secret — это тот, который вы настроили в Поиск → Вебхуки → Эндпоинт → «Секрет подписи».
Проверка в Node
import crypto from "node:crypto";
import { Hono } from "hono";
const app = new Hono();
app.post("/webhooks/aacsearch", async (c) => {
const rawBody = await c.req.text();
const signature = c.req.header("X-AACSearch-Signature-256") ?? "";
const expected =
"sha256=" +
crypto
.createHmac("sha256", process.env.AACSEARCH_WEBHOOK_SECRET!)
.update(rawBody)
.digest("hex");
if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))) {
return c.text("invalid signature", 401);
}
const event = JSON.parse(rawBody);
// безопасно для обработки
await handleEvent(event);
return c.text("ok", 200);
});timingSafeEqual предотвращает атаки по времени. Не используйте === для сравнения подписей.
Проверка в Python
import hmac, hashlib
from flask import request, abort
@app.route("/webhooks/aacsearch", methods=["POST"])
def aacsearch_webhook():
raw_body = request.get_data()
signature = request.headers.get("X-AACSearch-Signature-256", "")
expected = "sha256=" + hmac.new(
os.environ["AACSEARCH_WEBHOOK_SECRET"].encode(),
raw_body,
hashlib.sha256,
).hexdigest()
if not hmac.compare_digest(signature, expected):
abort(401)
event = request.get_json()
handle_event(event)
return "ok", 200Проверка в PHP
<?php
$rawBody = file_get_contents('php://input');
$signature = $_SERVER['HTTP_X_AACSEARCH_SIGNATURE_256'] ?? '';
$expected = 'sha256=' . hash_hmac('sha256', $rawBody, getenv('AACSEARCH_WEBHOOK_SECRET'));
if (!hash_equals($signature, $expected)) {
http_response_code(401);
exit('invalid signature');
}
$event = json_decode($rawBody, true);
handleEvent($event);
http_response_code(200);hash_equals — это PHP-эквивалент timingSafeEqual.
Защита от повтора
Одна лишь подпись не защищает от повтора (записанный валидный запрос, отправленный повторно позже). Для защиты от повтора проверяйте поле timestamp события и отклоняйте всё, что старше 5 минут:
const event = JSON.parse(rawBody);
const eventTime = new Date(event.timestamp).getTime();
if (Math.abs(Date.now() - eventTime) > 5 * 60 * 1000) {
return c.text("event too old", 401);
}Для доставки ровно один раз дедуплицируйте по event.id в вашем обработчике — AACsearch повторяет попытки при 5xx, так что вы можете получить одно и то же событие дважды.
Читайте сырое тело запроса для проверки HMAC, а не разобранный JSON. Повторная сериализация JSON может изменить порядок байтов, пробелы и экранирование ключей — подпись не совпадёт.
Массовый импорт (начальная синхронизация)
Для самой первой индексации большого каталога (10k–10M документов) правильный инструмент — паттерн экспорт → импорт, а не множество маленьких пакетов.
// 1. Экспортируйте ваш исходный каталог как JSONL
// 2. Потоково передайте через batchUpsert чанками по 500 документов
import { createReadStream } from "node:fs";
import readline from "node:readline";
async function bulkImport(path: string, indexId: string) {
const stream = readline.createInterface({
input: createReadStream(path),
crlfDelay: Infinity,
});
let batch: Document[] = [];
let total = 0;
const errors: any[] = [];
for await (const line of stream) {
batch.push(JSON.parse(line));
if (batch.length >= 500) {
const result = await withRetry(() =>
admin.batchUpsertDocuments(indexId, batch),
);
errors.push(...(result.errors ?? []));
total += batch.length;
batch = [];
if (total % 10_000 === 0) console.log(`импортировано ${total}`);
}
}
if (batch.length) {
const result = await withRetry(() =>
admin.batchUpsertDocuments(indexId, batch),
);
errors.push(...(result.errors ?? []));
total += batch.length;
}
console.log(`готово: ${total} документов, ${errors.length} ошибок`);
}Пропускная способность на типичном тарифе Pro: ~1000 док/сек устойчиво. Каталог из 1M документов импортируется за ~15–20 минут.
Дельта-синхронизация
После начального массового импорта переключитесь на дельта-синхронизацию. Коннектор отслеживает курсор последнего изменения и отправляет только изменения:
async function deltaSync(indexId: string, since: Date) {
const changed = await db.products.findMany({
where: { updatedAt: { gt: since } },
take: 1000,
});
if (!changed.length) return since;
await withRetry(() =>
admin.batchUpsertDocuments(
indexId,
changed.map((p) => ({ external_id: p.id, ...mapToDocument(p) })),
),
);
return changed[changed.length - 1].updatedAt;
}Для удалений запрашивайте отдельно «soft-deleted since»:
const deleted = await db.products.findMany({
where: { deletedAt: { gt: since } },
select: { id: true },
});
if (deleted.length) {
await admin.batchDeleteDocuments(
indexId,
deleted.map((p) => p.id),
);
}Для коннекторов, специфичных для PrestaShop, Bitrix и других CMS, эндпоинты жизненного цикла (sync/full, sync/delta) заменяют прямые вызовы batchUpsert. См. Жизненный цикл API коннектора.
Связанные страницы
- Справочник Node.js SDK — API
AdminClient - Справочник Python SDK
- Жизненный цикл API коннектора — полная синхронизация / дельта / heartbeat
- Обзор вебхуков — справочник исходящих событий
- Ошибки и лимиты запросов — матрица кодов ошибок
- Ошибки индексации — отладка зависшей или сбойной индексации
Мультиязычный каталог
Один товар, несколько языков. Единый индекс с фасетом `locale`, языково-специфичные текстовые поля и токен с ограничением, привязывающий пользователя к его языку.
Обзор миграции
Как мигрировать в AACsearch с разных стеков — БД LIKE, Algolia, Elasticsearch, Meilisearch, self-hosted Typesense.