Перейти к содержанию

Sync Cycle — Полный код движка

Уровень: L3 (deep-dive) | ← Назад к Sync Engine

Файл: frontend/src/offline/sync/syncEngine.ts

Ядро синхронизации: подписка на network events, последовательная обработка очереди, маршрутизация операций.

import { operationQueue } from './operationQueue';
import { networkStatus } from '../network/networkStatus';
import { itemsApi } from '../../api/items';
import { outfitsApi } from '../../api/outfits';
import { mediaApi } from '../../api/media';
import { embeddingRepository } from '../../embeddings/embeddingRepository';
import { outfitEmbeddingService } from '../../embeddings/outfitEmbeddingService';
import { similarityService } from '../../embeddings/similarityService';
import { queryClient } from '../../queryClient';
import { getOutfitIdsContainingItem } from '../repositories/outfitsRepository';
import { itemsRepository } from '../repositories/itemsRepository';
import { db } from '../db/dexie';
import type { PendingOperation } from './syncTypes';

type SyncListener = () => void;

class SyncEngine {
  private _running = false;
  private _listeners = new Set<SyncListener>();
  private _unsubNetwork: (() => void) | null = null;

  /** Запуск: подписка на online/offline и об��аботка очереди */
  start() {
    if (this._unsubNetwork) return;  // Уже запущен

    this._unsubNetwork = networkStatus.subscribe((online) => {
      if (online) {
        this.processQueue();  // Возобновить при появлении сети
      }
    });

    // Обработать сразу если online
    if (networkStatus.online) {
      this.processQueue();
    }
  }

  /** Остановка движка */
  stop() {
    this._unsubNetwork?.();
    this._unsubNetwork = null;
  }

  /** Подписка на изменения статуса (для UI badge) */
  onChange(listener: SyncListener): () => void {
    this._listeners.add(listener);
    return () => this._listeners.delete(listener);
  }

  private _notify() {
    this._listeners.forEach((fn) => fn());
  }

  /** Основной цикл обработки очереди */
  async processQueue(): Promise<void> {
    if (this._running) return;  // Prevent concurrent runs
    if (!networkStatus.online) return;

    this._running = true;
    this._notify();

    try {
      let ready = await operationQueue.getReady();

      // Цикл: обработать все готовые, проверить новые разблокированные
      while (ready.length > 0 && networkStatus.online) {
        for (const op of ready) {
          if (!networkStatus.online) break;

          await operationQueue.markRunning(op.id);
          try {
            await this._executeOperation(op);
            await operationQueue.markDone(op.id);

            // Инвалидация React Query кешей
            if (op.entity_type === 'item' || op.entity_type === 'media') {
              queryClient.invalidateQueries({ queryKey: ['items'] });
              if (op.operation_type === 'ITEM_DELETE') {
                queryClient.invalidateQueries({ queryKey: ['outfits'] });
                queryClient.invalidateQueries({ queryKey: ['shared-outfits'] });
              }
            }
            if (op.entity_type === 'outfit') {
              queryClient.invalidateQueries({ queryKey: ['outfits'] });
              if (op.operation_type === 'OUTFIT_DELETE') {
                queryClient.invalidateQueries({ queryKey: ['shared-outfits'] });
              }
            }
          } catch (err) {
            const message = err instanceof Error ? err.message : String(err);
            console.warn(`[Sync] Operation ${op.operation_type} failed:`, message);
            await operationQueue.markFailed(op.id, message);
          }
          this._notify();
        }

        // Проверить новые разблокированные операции
        ready = await operationQueue.getReady();
      }
    } finally {
      this._running = false;
      this._notify();
    }
  }

  /** Enqueue + сразу попробовать обработать */
  async enqueueAndProcess(params: Parameters<typeof operationQueue.enqueue>[0]): Promise<PendingOperation> {
    const op = await operationQueue.enqueue(params);
    this._notify();
    this.processQueue();  // Не await — фоновая обработка
    return op;
  }

  get isRunning() {
    return this._running;
  }

  // ─── DISPATCH ──────────���────────────────────────��──────────────────────────

  private async _executeOperation(op: PendingOperation): Promise<void> {
    switch (op.operation_type) {
      case 'ITEM_MEDIA_UPLOAD':
        await this._execItemMediaUpload(op);
        break;
      case 'ITEM_CREATE':
        await this._execItemCreate(op);
        break;
      case 'ITEM_UPDATE':
        await this._execItemUpdate(op);
        break;
      case 'ITEM_DELETE':
        await this._execItemDelete(op);
        break;
      case 'OUTFIT_CREATE':
        await this._execOutfitCreate(op);
        break;
      case 'OUTFIT_UPDATE':
        await this._execOutfitUpdate(op);
        break;
      case 'OUTFIT_DELETE':
        await this._execOutfitDelete(op);
        break;
      case 'EMBEDDING_REFRESH':
        await this._execEmbeddingRefresh(op);
        break;
      default:
        console.warn(`[Sync] Unknown operation type: ${op.operation_type}`);
    }
  }

  // ─── ОПЕРАЦИИ: ITEMS ───────────────────────────────────────────────────────

  /** Создание вещи: отправить на сервер, замапить local→server ID */
  private async _execItemCreate(op: PendingOperation) {
    const payload = op.payload as {
      title?: string; category_id: number;
      primary_color_id?: number; primary_media_id?: string;
      style_node_ids?: number[];
    };
    const serverItem = await itemsApi.create(payload);

    // Обновить локальную запись: маппинг ID
    await db.items.update(op.entity_id, {
      server_id: serverItem.id,
      local_only: false,
      sync_status: 'synced',
      primary_media_url: serverItem.primary_media_url ?? null,
      server_updated_at: serverItem.updated_at,
    });

    // Сохранить embedding если вернулся с сервера
    if (serverItem.embedding) {
      await embeddingRepository.upsertFromServer('item', op.entity_id, serverItem.embedding);
      similarityService.invalidateCache('item');
    }
  }

  /** Обновление вещи на сервере */
  private async _execItemUpdate(op: PendingOperation) {
    const serverId = (await db.items.get(op.entity_id))?.server_id;
    if (!serverId) throw new Error('No server_id for item update');
    const payload = op.payload as Record<string, unknown>;
    const serverItem = await itemsApi.update(serverId, payload);
    await db.items.update(op.entity_id, {
      sync_status: 'synced',
      primary_media_url: serverItem.primary_media_url ?? null,
      server_updated_at: serverItem.updated_at,
    });
  }

  /** ��даление вещи: каскадное удаление образов + embedding cleanup */
  private async _execItemDelete(op: PendingOperation) {
    // 1. Найти образы, содержащие эту вещь
    const affectedOutfitIds = await getOutfitIdsContainingItem(op.entity_id);

    // 2. Удалить на сервере
    const local = await db.items.get(op.entity_id);
    const serverId = local?.server_id;
    if (serverId) {
      await itemsApi.delete(serverId);
    }

    // 3. Удалить embedding'и затронутых образов
    for (const outfitId of affectedOutfitIds) {
      await embeddingRepository.remove('outfit', outfitId);
    }
    if (affectedOutfitIds.length > 0) similarityService.invalidateCache('outfit');

    // 4. Каскадное удаление образов из IndexedDB
    await db.transaction('rw', [db.outfits, db.outfit_items], async () => {
      for (const outfitId of affectedOutfitIds) {
        await db.outfit_items.where('outfit_id').equals(outfitId).delete();
        await db.outfits.delete(outfitId);
      }
    });

    // 5. Удалить embedding и саму вещь
    await embeddingRepository.remove('item', op.entity_id);
    similarityService.invalidateCache('item');
    await itemsRepository.removeAfterSync(op.entity_id);
  }

  // ─── ОПЕРАЦ��И: OUTFITS ─────────────────────���───────────────────────────────

  /** Создание образа: resolve local item IDs → server IDs */
  private async _execOutfitCreate(op: PendingOperation) {
    const payload = op.payload as {
      title?: string;
      outfit_items?: Array<{ item_id: string; slot_id: number; layer_index?: number }>;
    };

    // Заменить локальные UUID вещей на серверные
    if (payload.outfit_items) {
      for (const oi of payload.outfit_items) {
        const item = await db.items.get(oi.item_id);
        if (item?.server_id) {
          oi.item_id = item.server_id;
        }
      }
    }

    const serverOutfit = await outfitsApi.create(payload);
    await db.outfits.update(op.entity_id, {
      server_id: serverOutfit.id,
      local_only: false,
      sync_status: 'synced',
      title: serverOutfit.title ?? null,
      cover_media_url: serverOutfit.cover_media_url ?? null,
      server_updated_at: serverOutfit.updated_at,
    });

    // Вычислить производный embedding образа
    await outfitEmbeddingService.computeAndStore(op.entity_id);
    similarityService.invalidateCache('outfit');
  }

  private async _execOutfitUpdate(op: PendingOperation) {
    const serverId = (await db.outfits.get(op.entity_id))?.server_id;
    if (!serverId) throw new Error('No server_id for outfit update');
    const payload = op.payload as Record<string, unknown>;
    const serverOutfit = await outfitsApi.update(serverId, payload);
    await db.outfits.update(op.entity_id, {
      sync_status: 'synced',
      cover_media_url: serverOutfit.cover_media_url ?? null,
      server_updated_at: serverOutfit.updated_at,
    });
  }

  private async _execOutfitDelete(op: PendingOperation) {
    const local = await db.outfits.get(op.entity_id);
    const serverId = local?.server_id;
    if (serverId) {
      await outfitsApi.delete(serverId);
    }
    await embeddingRepository.remove('outfit', op.entity_id);
    await db.transaction('rw', [db.outfits, db.outfit_items], async () => {
      await db.outfit_items.where('outfit_id').equals(op.entity_id).delete();
      await db.outfits.delete(op.entity_id);
    });
  }

  // ─── ОПЕРАЦИИ: MEDIA ─────────────────────���─────────────────────────────────

  /** Загрузка фото: blob из IndexedDB → compress → upload → update item */
  private async _execItemMediaUpload(op: PendingOperation) {
    const payload = op.payload as { item_local_id: string; mime_type: string };
    const mediaLocalId = op.entity_id;

    // 1. Получить blob (IndexedDB + legacy localStorage fallback)
    const blobRecord = await db.blobs.get(mediaLocalId);
    const base64DataUrl = blobRecord?.data ?? localStorage.getItem(`plechiki_blob_${mediaLocalId}`);
    if (!base64DataUrl) {
      throw new Error('Blob not found in storage');
    }

    // 2. Конвертировать base64 → File
    const resp = await fetch(base64DataUrl);
    const blob = await resp.blob();
    const file = new File([blob], `photo_${mediaLocalId}.png`, {
      type: payload.mime_type || 'image/png',
    });

    // 3. Загрузить на сервер (с автоматической компрессией в mediaApi)
    const result = await mediaApi.upload(file, 'item_photo');

    // 4. Обновить локальную вещь с серверным media_id и URL
    const itemLocalId = payload.item_local_id;
    await db.items.update(itemLocalId, {
      primary_media_id: result.id,
      primary_media_url: result.url ?? null,
    });

    // 5. Обновить запись media
    await db.media.update(mediaLocalId, {
      server_media_id: result.id,
      server_url: result.url ?? null,
      status: 'uploaded',
    });

    // 6. Обновить payload будущей ITEM_CREATE с media_id
    const createOps = await db.pending_operations
      .where('[entity_type+entity_id]')
      .equals(['item', itemLocalId])
      .toArray();
    for (const createOp of createOps) {
      if (createOp.operation_type === 'ITEM_CREATE' && createOp.status === 'blocked') {
        const updatedPayload = { ...createOp.payload, primary_media_id: result.id };
        await db.pending_operations.update(createOp.id, { payload: updatedPayload });
      }
    }

    // 7. Очистить blob из storage
    await db.blobs.delete(mediaLocalId);
    localStorage.removeItem(`plechiki_blob_${mediaLocalId}`);
  }

  // ─── ОПЕРАЦИИ: EMBEDDINGS ──────────────────────────────────────────────────

  private async _execEmbeddingRefresh(op: PendingOperation) {
    const payload = op.payload as { entity_type: 'item' | 'outfit' };
    if (payload.entity_type === 'item') {
      const local = await db.items.get(op.entity_id);
      const serverId = local?.server_id;
      if (serverId) {
        const serverItem = await itemsApi.get(serverId);
        if (serverItem.embedding) {
          await embeddingRepository.upsertFromServer('item', op.entity_id, serverItem.embedding);
        }
      }
    } else if (payload.entity_type === 'outfit') {
      await outfitEmbeddingService.computeAndStore(op.entity_id);
    }
  }
}

/** Singleton — единственный экземпляр движка синхронизации */
export const syncEngine = new SyncEngine();

Жизненный цикл синхронизации

1. Инициализация (main.tsx)

import('./offline/sync/syncEngine').then(({ syncEngine }) => {
  syncEngine.start();  // Подписка на network events
});

2. Пользовательское действие → очередь

User clicks "Delete" → offlineItemsService.delete()
  → itemsRepository.markDeleted()      ← мгновенно в UI
  → syncEngine.enqueueAndProcess()     ← в очередь
    → processQueue()                   ← обработка (фон)

3. Восстановление сети

networkStatus → online event → syncEngine.processQueue()
  → getReady() → execute each → markDone/markFailed

4. React Query инвалидация

После каждой успешной о��ерации: - queryClient.invalidateQueries({ queryKey: ['items'] }) - UI перечитывает данные из IndexedDB → обновляется

Ключевые решения

Решение Обоснование
Последовательная обработка Предотвращает race conditions между зависимыми операциями
_running flag Один processQueue() в момент времени
ID маппинг в OUTFIT_CREATE Вещи созданы offline → нужен local→server ID resolve
Каскад в ITEM_DELETE Образы без вещи бессмысленны → удаляем локально
Media upload → Create chain Фото загружается первым, затем create получает media_id
Embedding invalidation После каждого CRUD обновляем similarity cache