AACsearch
SDK

Серверные хелперы

Пакетная обработка, идемпотентность, стратегия повторных попыток и проверка подписи вебхуков — паттерны, которым должен следовать каждый потребитель серверного SDK при индексации документов и получении вебхуков от AACsearch.

Браузерный SDK — это тонкая обёртка над поисковым API. Использование SDK на серверной стороне — индексация документов, запуск CMS-коннекторов, получение вебхуков — требует большей дисциплины. На этой странице собраны четыре паттерна, которым вы всегда должны следовать.

Пакетная обработка

Запись по одному документу медленна и дорога. Всегда используйте пакетную обработку.

ЭндпоинтМакс. размер пакетаРекомендуемый размер
documents:batch (upsert)1000100–500
documents:batchdelete1000100–1000
sync/full1000500
sync/delta1000100–500
events:batch1000100

Эндпоинты пакетной обработки возвращают массивы ошибок по строкам — частичный успех является нормой.

Хелпер для Node

import { AdminClient } from "@aacsearch/client";

const admin = new AdminClient({
  baseUrl: process.env.AACSEARCH_BASE_URL!,
  apiKey: process.env.AACSEARCH_ADMIN_KEY!,
  projectId: process.env.AACSEARCH_ORG_ID!,
});

async function bulkUpsert(indexId: string, docs: Document[], batchSize = 500) {
  const errors: Array<{ id: string; error: string; message: string }> = [];

  for (let i = 0; i < docs.length; i += batchSize) {
    const batch = docs.slice(i, i + batchSize);
    const result = await admin.batchUpsertDocuments(indexId, batch);
    errors.push(...(result.errors ?? []));
  }

  return { total: docs.length, succeeded: docs.length - errors.length, errors };
}

Хелпер для Python

from aacsearch import AdminClient

admin = AdminClient(
    base_url=os.environ["AACSEARCH_BASE_URL"],
    api_key=os.environ["AACSEARCH_ADMIN_KEY"],
    project_id=os.environ["AACSEARCH_ORG_ID"],
)

def bulk_upsert(index_id, docs, batch_size=500):
    errors = []
    for i in range(0, len(docs), batch_size):
        batch = docs[i:i + batch_size]
        result = admin.batch_upsert_documents(index_id, batch)
        errors.extend(result.get("errors", []))
    return {"total": len(docs), "succeeded": len(docs) - len(errors), "errors": errors}

PHP (CMS-коннектор)

<?php
function bulkUpsert(string $baseUrl, string $token, string $projectId, array $products, int $batchSize = 500): array {
    $errors = [];
    foreach (array_chunk($products, $batchSize) as $batch) {
        $ch = curl_init("$baseUrl/api/projects/$projectId/sync/delta");
        curl_setopt_array($ch, [
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_HTTPHEADER => [
                "Authorization: Bearer $token",
                'Content-Type: application/json',
            ],
            CURLOPT_POSTFIELDS => json_encode(['products' => $batch]),
        ]);
        $res = json_decode(curl_exec($ch), true);
        curl_close($ch);
        $errors = array_merge($errors, $res['errors'] ?? []);
    }
    return ['total' => count($products), 'errors' => $errors];
}

Для пакетной обработки, специфичной для PrestaShop / Bitrix, см. Жизненный цикл API коннектора.

Идемпотентность

ID документов детерминированы из external_id. Повторная отправка того же документа безопасна — он перезаписывается. Именно это делает индексацию идемпотентной.

// Запускайте этот скрипт сколько угодно раз — результат одинаков
await admin.batchUpsertDocuments(indexId, [
  { external_id: "product-123", title: "Shoes", price: 49.99 },
]);

То же самое верно для sync/full и sync/delta — коннектор может повторно отправить любой пакет без создания дубликатов.

Почему это важно

CMS-коннекторы работают в ненадёжных средах (тайм-ауты cron, сетевые сбои, перезапуски сервера в середине задачи). С идемпотентностью стратегия восстановления — «просто перезапустите неудачный пакет». Без неё вам пришлось бы отслеживать, какая часть каждого пакета была обработана, и повторять только её.

Когда идемпотентность НЕ покрывает

  • Удаления не идемпотентны по отношению к вновь созданным документам с тем же external_id. Если ваша задача удаляет product-123, а отдельная задача создаёт его, порядок имеет значение.
  • Изменения схемы между запусками. Повторная отправка старой нагрузки после миграции схемы может вызвать ошибки валидации по строкам.
  • events:track — at-least-once, а не exactly-once. Отслеживайте event_id (UUID) на стороне коннектора и дедуплицируйте в вашем аналитическом пайплайне.

Заголовок Idempotency-Key (для событий)

Для аналитических событий, где идемпотентность важна, включите заголовок Idempotency-Key:

await fetch(`${BASE}/api/events/track`, {
	method: "POST",
	headers: {
		Authorization: `Bearer ${KEY}`,
		"Content-Type": "application/json",
		"Idempotency-Key": eventId, // UUIDv4 с вашей стороны
	},
	body: JSON.stringify({ event: "result_click", properties: { ... } }),
});

Дублирующиеся события с одинаковым Idempotency-Key в течение 24-часового окна дедуплицируются на серверной стороне.

Стратегия повторных попыток

Разные ошибки требуют разных политик повторных попыток. Ошибитесь здесь — и вы либо сдадитесь слишком рано (потеря данных), либо забьёте сервер (каскад ограничений скорости).

HTTP / ошибкаПовторять?Стратегия
4xx кроме 429НетИсправить запрос
429 rate_limit_exceededДаЖдать Retry-After секунд
429 quota_exceededНетОбновить тариф или ждать месячного сброса
502 search_failedДа1 повтор через 1с; если неудача — эскалировать
502 ingest_failedДаЭкспоненциальная задержка: 1с → 2с → 4с
503 service_unavailableДаЭкспоненциальная задержка с jitter
Сетевая ошибкаДаЭкспоненциальная задержка

Хелпер для Node с p-retry

import pRetry, { AbortError } from "p-retry";
import { AacSearchError } from "@aacsearch/client";

async function withRetry<T>(fn: () => Promise<T>): Promise<T> {
  return pRetry(
    async () => {
      try {
        return await fn();
      } catch (err) {
        if (err instanceof AacSearchError) {
          if (
            err.status >= 400 &&
            err.status < 500 &&
            err.code !== "rate_limit"
          ) {
            throw new AbortError(err); // не повторять 4xx
          }
          if (err.code === "quota_exceeded") {
            throw new AbortError(err);
          }
          if (err.code === "rate_limit") {
            const retryAfter = Number(
              err.response?.headers.get("Retry-After") ?? 5,
            );
            await new Promise((r) => setTimeout(r, retryAfter * 1000));
          }
        }
        throw err;
      }
    },
    {
      retries: 3,
      factor: 2,
      minTimeout: 1000,
      maxTimeout: 30_000,
      randomize: true, // jitter
    },
  );
}

// Использование:
const result = await withRetry(() =>
  admin.batchUpsertDocuments(indexId, batch),
);

Хелпер для Python

import time, random
from aacsearch import SdkError

def with_retry(fn, max_retries=3):
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except SdkError as e:
            if e.status and 400 <= e.status < 500 and e.code != "rate_limit":
                raise
            if e.code == "quota_exceeded":
                raise
            if e.code == "rate_limit":
                retry_after = int(e.response_headers.get("Retry-After", 5))
                time.sleep(retry_after)
            else:
                if attempt == max_retries:
                    raise
                backoff = (2 ** attempt) + random.random()
                time.sleep(backoff)
        except Exception:
            if attempt == max_retries:
                raise
            time.sleep((2 ** attempt) + random.random())

Зачем нужен jitter

Без jitter каждый клиент, получивший 429 в один и тот же момент, повторит запрос в тот же момент, создавая лавину запросов, которая вызовет ещё один 429. Jitter распределяет их по времени.

Проверка подписи вебхуков

AACsearch подписывает каждый исходящий вебхук с помощью HMAC-SHA256. Проверяйте подпись до обработки любой нагрузки — любой, у кого есть URL вашего эндпоинта, может отправить мусорный POST.

Как работает подпись

signature = HMAC-SHA256(secret, request_body_bytes)
header X-AACSearch-Signature-256: sha256=<hex>

secret — это тот, который вы настроили в Поиск → Вебхуки → Эндпоинт → «Секрет подписи».

Проверка в Node

import crypto from "node:crypto";
import { Hono } from "hono";

const app = new Hono();

app.post("/webhooks/aacsearch", async (c) => {
  const rawBody = await c.req.text();
  const signature = c.req.header("X-AACSearch-Signature-256") ?? "";

  const expected =
    "sha256=" +
    crypto
      .createHmac("sha256", process.env.AACSEARCH_WEBHOOK_SECRET!)
      .update(rawBody)
      .digest("hex");

  if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))) {
    return c.text("invalid signature", 401);
  }

  const event = JSON.parse(rawBody);
  // безопасно для обработки
  await handleEvent(event);
  return c.text("ok", 200);
});

timingSafeEqual предотвращает атаки по времени. Не используйте === для сравнения подписей.

Проверка в Python

import hmac, hashlib
from flask import request, abort

@app.route("/webhooks/aacsearch", methods=["POST"])
def aacsearch_webhook():
    raw_body = request.get_data()
    signature = request.headers.get("X-AACSearch-Signature-256", "")

    expected = "sha256=" + hmac.new(
        os.environ["AACSEARCH_WEBHOOK_SECRET"].encode(),
        raw_body,
        hashlib.sha256,
    ).hexdigest()

    if not hmac.compare_digest(signature, expected):
        abort(401)

    event = request.get_json()
    handle_event(event)
    return "ok", 200

Проверка в PHP

<?php
$rawBody = file_get_contents('php://input');
$signature = $_SERVER['HTTP_X_AACSEARCH_SIGNATURE_256'] ?? '';

$expected = 'sha256=' . hash_hmac('sha256', $rawBody, getenv('AACSEARCH_WEBHOOK_SECRET'));

if (!hash_equals($signature, $expected)) {
    http_response_code(401);
    exit('invalid signature');
}

$event = json_decode($rawBody, true);
handleEvent($event);
http_response_code(200);

hash_equals — это PHP-эквивалент timingSafeEqual.

Защита от повтора

Одна лишь подпись не защищает от повтора (записанный валидный запрос, отправленный повторно позже). Для защиты от повтора проверяйте поле timestamp события и отклоняйте всё, что старше 5 минут:

const event = JSON.parse(rawBody);
const eventTime = new Date(event.timestamp).getTime();
if (Math.abs(Date.now() - eventTime) > 5 * 60 * 1000) {
  return c.text("event too old", 401);
}

Для доставки ровно один раз дедуплицируйте по event.id в вашем обработчике — AACsearch повторяет попытки при 5xx, так что вы можете получить одно и то же событие дважды.

Читайте сырое тело запроса для проверки HMAC, а не разобранный JSON. Повторная сериализация JSON может изменить порядок байтов, пробелы и экранирование ключей — подпись не совпадёт.

Массовый импорт (начальная синхронизация)

Для самой первой индексации большого каталога (10k–10M документов) правильный инструмент — паттерн экспорт → импорт, а не множество маленьких пакетов.

// 1. Экспортируйте ваш исходный каталог как JSONL
// 2. Потоково передайте через batchUpsert чанками по 500 документов
import { createReadStream } from "node:fs";
import readline from "node:readline";

async function bulkImport(path: string, indexId: string) {
  const stream = readline.createInterface({
    input: createReadStream(path),
    crlfDelay: Infinity,
  });

  let batch: Document[] = [];
  let total = 0;
  const errors: any[] = [];

  for await (const line of stream) {
    batch.push(JSON.parse(line));
    if (batch.length >= 500) {
      const result = await withRetry(() =>
        admin.batchUpsertDocuments(indexId, batch),
      );
      errors.push(...(result.errors ?? []));
      total += batch.length;
      batch = [];
      if (total % 10_000 === 0) console.log(`импортировано ${total}`);
    }
  }
  if (batch.length) {
    const result = await withRetry(() =>
      admin.batchUpsertDocuments(indexId, batch),
    );
    errors.push(...(result.errors ?? []));
    total += batch.length;
  }
  console.log(`готово: ${total} документов, ${errors.length} ошибок`);
}

Пропускная способность на типичном тарифе Pro: ~1000 док/сек устойчиво. Каталог из 1M документов импортируется за ~15–20 минут.

Дельта-синхронизация

После начального массового импорта переключитесь на дельта-синхронизацию. Коннектор отслеживает курсор последнего изменения и отправляет только изменения:

async function deltaSync(indexId: string, since: Date) {
  const changed = await db.products.findMany({
    where: { updatedAt: { gt: since } },
    take: 1000,
  });

  if (!changed.length) return since;

  await withRetry(() =>
    admin.batchUpsertDocuments(
      indexId,
      changed.map((p) => ({ external_id: p.id, ...mapToDocument(p) })),
    ),
  );

  return changed[changed.length - 1].updatedAt;
}

Для удалений запрашивайте отдельно «soft-deleted since»:

const deleted = await db.products.findMany({
  where: { deletedAt: { gt: since } },
  select: { id: true },
});
if (deleted.length) {
  await admin.batchDeleteDocuments(
    indexId,
    deleted.map((p) => p.id),
  );
}

Для коннекторов, специфичных для PrestaShop, Bitrix и других CMS, эндпоинты жизненного цикла (sync/full, sync/delta) заменяют прямые вызовы batchUpsert. См. Жизненный цикл API коннектора.

Связанные страницы

On this page