Sync Cycle — Полный код движка¶
Уровень: L3 (deep-dive) | ← Назад к Sync Engine
Файл: frontend/src/offline/sync/syncEngine.ts¶
Ядро синхронизации: подписка на network events, последовательная обработка очереди, маршрутизация операций.
import { operationQueue } from './operationQueue';
import { networkStatus } from '../network/networkStatus';
import { itemsApi } from '../../api/items';
import { outfitsApi } from '../../api/outfits';
import { mediaApi } from '../../api/media';
import { embeddingRepository } from '../../embeddings/embeddingRepository';
import { outfitEmbeddingService } from '../../embeddings/outfitEmbeddingService';
import { similarityService } from '../../embeddings/similarityService';
import { queryClient } from '../../queryClient';
import { getOutfitIdsContainingItem } from '../repositories/outfitsRepository';
import { itemsRepository } from '../repositories/itemsRepository';
import { db } from '../db/dexie';
import type { PendingOperation } from './syncTypes';
type SyncListener = () => void;
class SyncEngine {
private _running = false;
private _listeners = new Set<SyncListener>();
private _unsubNetwork: (() => void) | null = null;
/** Запуск: подписка на online/offline и об��аботка очереди */
start() {
if (this._unsubNetwork) return; // Уже запущен
this._unsubNetwork = networkStatus.subscribe((online) => {
if (online) {
this.processQueue(); // Возобновить при появлении сети
}
});
// Обработать сразу если online
if (networkStatus.online) {
this.processQueue();
}
}
/** Остановка движка */
stop() {
this._unsubNetwork?.();
this._unsubNetwork = null;
}
/** Подписка на изменения статуса (для UI badge) */
onChange(listener: SyncListener): () => void {
this._listeners.add(listener);
return () => this._listeners.delete(listener);
}
private _notify() {
this._listeners.forEach((fn) => fn());
}
/** Основной цикл обработки очереди */
async processQueue(): Promise<void> {
if (this._running) return; // Prevent concurrent runs
if (!networkStatus.online) return;
this._running = true;
this._notify();
try {
let ready = await operationQueue.getReady();
// Цикл: обработать все готовые, проверить новые разблокированные
while (ready.length > 0 && networkStatus.online) {
for (const op of ready) {
if (!networkStatus.online) break;
await operationQueue.markRunning(op.id);
try {
await this._executeOperation(op);
await operationQueue.markDone(op.id);
// Инвалидация React Query кешей
if (op.entity_type === 'item' || op.entity_type === 'media') {
queryClient.invalidateQueries({ queryKey: ['items'] });
if (op.operation_type === 'ITEM_DELETE') {
queryClient.invalidateQueries({ queryKey: ['outfits'] });
queryClient.invalidateQueries({ queryKey: ['shared-outfits'] });
}
}
if (op.entity_type === 'outfit') {
queryClient.invalidateQueries({ queryKey: ['outfits'] });
if (op.operation_type === 'OUTFIT_DELETE') {
queryClient.invalidateQueries({ queryKey: ['shared-outfits'] });
}
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.warn(`[Sync] Operation ${op.operation_type} failed:`, message);
await operationQueue.markFailed(op.id, message);
}
this._notify();
}
// Проверить новые разблокированные операции
ready = await operationQueue.getReady();
}
} finally {
this._running = false;
this._notify();
}
}
/** Enqueue + сразу попробовать обработать */
async enqueueAndProcess(params: Parameters<typeof operationQueue.enqueue>[0]): Promise<PendingOperation> {
const op = await operationQueue.enqueue(params);
this._notify();
this.processQueue(); // Не await — фоновая обработка
return op;
}
get isRunning() {
return this._running;
}
// ─── DISPATCH ──────────���────────────────────────��──────────────────────────
private async _executeOperation(op: PendingOperation): Promise<void> {
switch (op.operation_type) {
case 'ITEM_MEDIA_UPLOAD':
await this._execItemMediaUpload(op);
break;
case 'ITEM_CREATE':
await this._execItemCreate(op);
break;
case 'ITEM_UPDATE':
await this._execItemUpdate(op);
break;
case 'ITEM_DELETE':
await this._execItemDelete(op);
break;
case 'OUTFIT_CREATE':
await this._execOutfitCreate(op);
break;
case 'OUTFIT_UPDATE':
await this._execOutfitUpdate(op);
break;
case 'OUTFIT_DELETE':
await this._execOutfitDelete(op);
break;
case 'EMBEDDING_REFRESH':
await this._execEmbeddingRefresh(op);
break;
default:
console.warn(`[Sync] Unknown operation type: ${op.operation_type}`);
}
}
// ─── ОПЕРАЦИИ: ITEMS ───────────────────────────────────────────────────────
/** Создание вещи: отправить на сервер, замапить local→server ID */
private async _execItemCreate(op: PendingOperation) {
const payload = op.payload as {
title?: string; category_id: number;
primary_color_id?: number; primary_media_id?: string;
style_node_ids?: number[];
};
const serverItem = await itemsApi.create(payload);
// Обновить локальную запись: маппинг ID
await db.items.update(op.entity_id, {
server_id: serverItem.id,
local_only: false,
sync_status: 'synced',
primary_media_url: serverItem.primary_media_url ?? null,
server_updated_at: serverItem.updated_at,
});
// Сохранить embedding если вернулся с сервера
if (serverItem.embedding) {
await embeddingRepository.upsertFromServer('item', op.entity_id, serverItem.embedding);
similarityService.invalidateCache('item');
}
}
/** Обновление вещи на сервере */
private async _execItemUpdate(op: PendingOperation) {
const serverId = (await db.items.get(op.entity_id))?.server_id;
if (!serverId) throw new Error('No server_id for item update');
const payload = op.payload as Record<string, unknown>;
const serverItem = await itemsApi.update(serverId, payload);
await db.items.update(op.entity_id, {
sync_status: 'synced',
primary_media_url: serverItem.primary_media_url ?? null,
server_updated_at: serverItem.updated_at,
});
}
/** ��даление вещи: каскадное удаление образов + embedding cleanup */
private async _execItemDelete(op: PendingOperation) {
// 1. Найти образы, содержащие эту вещь
const affectedOutfitIds = await getOutfitIdsContainingItem(op.entity_id);
// 2. Удалить на сервере
const local = await db.items.get(op.entity_id);
const serverId = local?.server_id;
if (serverId) {
await itemsApi.delete(serverId);
}
// 3. Удалить embedding'и затронутых образов
for (const outfitId of affectedOutfitIds) {
await embeddingRepository.remove('outfit', outfitId);
}
if (affectedOutfitIds.length > 0) similarityService.invalidateCache('outfit');
// 4. Каскадное удаление образов из IndexedDB
await db.transaction('rw', [db.outfits, db.outfit_items], async () => {
for (const outfitId of affectedOutfitIds) {
await db.outfit_items.where('outfit_id').equals(outfitId).delete();
await db.outfits.delete(outfitId);
}
});
// 5. Удалить embedding и саму вещь
await embeddingRepository.remove('item', op.entity_id);
similarityService.invalidateCache('item');
await itemsRepository.removeAfterSync(op.entity_id);
}
// ─── ОПЕРАЦ��И: OUTFITS ─────────────────────���───────────────────────────────
/** Создание образа: resolve local item IDs → server IDs */
private async _execOutfitCreate(op: PendingOperation) {
const payload = op.payload as {
title?: string;
outfit_items?: Array<{ item_id: string; slot_id: number; layer_index?: number }>;
};
// Заменить локальные UUID вещей на серверные
if (payload.outfit_items) {
for (const oi of payload.outfit_items) {
const item = await db.items.get(oi.item_id);
if (item?.server_id) {
oi.item_id = item.server_id;
}
}
}
const serverOutfit = await outfitsApi.create(payload);
await db.outfits.update(op.entity_id, {
server_id: serverOutfit.id,
local_only: false,
sync_status: 'synced',
title: serverOutfit.title ?? null,
cover_media_url: serverOutfit.cover_media_url ?? null,
server_updated_at: serverOutfit.updated_at,
});
// Вычислить производный embedding образа
await outfitEmbeddingService.computeAndStore(op.entity_id);
similarityService.invalidateCache('outfit');
}
private async _execOutfitUpdate(op: PendingOperation) {
const serverId = (await db.outfits.get(op.entity_id))?.server_id;
if (!serverId) throw new Error('No server_id for outfit update');
const payload = op.payload as Record<string, unknown>;
const serverOutfit = await outfitsApi.update(serverId, payload);
await db.outfits.update(op.entity_id, {
sync_status: 'synced',
cover_media_url: serverOutfit.cover_media_url ?? null,
server_updated_at: serverOutfit.updated_at,
});
}
private async _execOutfitDelete(op: PendingOperation) {
const local = await db.outfits.get(op.entity_id);
const serverId = local?.server_id;
if (serverId) {
await outfitsApi.delete(serverId);
}
await embeddingRepository.remove('outfit', op.entity_id);
await db.transaction('rw', [db.outfits, db.outfit_items], async () => {
await db.outfit_items.where('outfit_id').equals(op.entity_id).delete();
await db.outfits.delete(op.entity_id);
});
}
// ─── ОПЕРАЦИИ: MEDIA ─────────────────────���─────────────────────────────────
/** Загрузка фото: blob из IndexedDB → compress → upload → update item */
private async _execItemMediaUpload(op: PendingOperation) {
const payload = op.payload as { item_local_id: string; mime_type: string };
const mediaLocalId = op.entity_id;
// 1. Получить blob (IndexedDB + legacy localStorage fallback)
const blobRecord = await db.blobs.get(mediaLocalId);
const base64DataUrl = blobRecord?.data ?? localStorage.getItem(`plechiki_blob_${mediaLocalId}`);
if (!base64DataUrl) {
throw new Error('Blob not found in storage');
}
// 2. Конвертировать base64 → File
const resp = await fetch(base64DataUrl);
const blob = await resp.blob();
const file = new File([blob], `photo_${mediaLocalId}.png`, {
type: payload.mime_type || 'image/png',
});
// 3. Загрузить на сервер (с автоматической компрессией в mediaApi)
const result = await mediaApi.upload(file, 'item_photo');
// 4. Обновить локальную вещь с серверным media_id и URL
const itemLocalId = payload.item_local_id;
await db.items.update(itemLocalId, {
primary_media_id: result.id,
primary_media_url: result.url ?? null,
});
// 5. Обновить запись media
await db.media.update(mediaLocalId, {
server_media_id: result.id,
server_url: result.url ?? null,
status: 'uploaded',
});
// 6. Обновить payload будущей ITEM_CREATE с media_id
const createOps = await db.pending_operations
.where('[entity_type+entity_id]')
.equals(['item', itemLocalId])
.toArray();
for (const createOp of createOps) {
if (createOp.operation_type === 'ITEM_CREATE' && createOp.status === 'blocked') {
const updatedPayload = { ...createOp.payload, primary_media_id: result.id };
await db.pending_operations.update(createOp.id, { payload: updatedPayload });
}
}
// 7. Очистить blob из storage
await db.blobs.delete(mediaLocalId);
localStorage.removeItem(`plechiki_blob_${mediaLocalId}`);
}
// ─── ОПЕРАЦИИ: EMBEDDINGS ──────────────────────────────────────────────────
private async _execEmbeddingRefresh(op: PendingOperation) {
const payload = op.payload as { entity_type: 'item' | 'outfit' };
if (payload.entity_type === 'item') {
const local = await db.items.get(op.entity_id);
const serverId = local?.server_id;
if (serverId) {
const serverItem = await itemsApi.get(serverId);
if (serverItem.embedding) {
await embeddingRepository.upsertFromServer('item', op.entity_id, serverItem.embedding);
}
}
} else if (payload.entity_type === 'outfit') {
await outfitEmbeddingService.computeAndStore(op.entity_id);
}
}
}
/** Singleton — единственный экземпляр движка синхронизации */
export const syncEngine = new SyncEngine();
Жизненный цикл синхронизации¶
1. Инициализация (main.tsx)¶
import('./offline/sync/syncEngine').then(({ syncEngine }) => {
syncEngine.start(); // Подписка на network events
});
2. Пользовательское действие → очередь¶
User clicks "Delete" → offlineItemsService.delete()
→ itemsRepository.markDeleted() ← мгновенно в UI
→ syncEngine.enqueueAndProcess() ← в очередь
→ processQueue() ← обработка (фон)
3. Восстановление сети¶
networkStatus → online event → syncEngine.processQueue()
→ getReady() → execute each → markDone/markFailed
4. React Query инвалидация¶
После каждой успешной о��ерации:
- queryClient.invalidateQueries({ queryKey: ['items'] })
- UI перечитывает данные из IndexedDB → обновляется
Ключевые решения¶
| Решение | Обоснование |
|---|---|
| Последовательная обработка | Предотвращает race conditions между зависимыми операциями |
_running flag |
Один processQueue() в момент времени |
| ID маппинг в OUTFIT_CREATE | Вещи созданы offline → нужен local→server ID resolve |
| Каскад в ITEM_DELETE | Образы без вещи бессмысленны → удаляем локально |
| Media upload → Create chain | Фото загружается первым, затем create получает media_id |
| Embedding invalidation | После каждого CRUD обновляем similarity cache |