Operation Queue — Полный код¶
Уровень: L3 (deep-dive) | ← Назад к Sync Engine
Файл: frontend/src/offline/sync/operationQueue.ts¶
Очередь операций с зависимостями, retry-логикой и rollback.
import { db } from '../db/dexie';
import { queryClient } from '../../queryClient';
import { syncRepository } from '../repositories/syncRepository';
import { shouldRetry, getNextRetryAt, getRetryDelay } from './retryPolicy';
import type { PendingOperation, OperationType, SyncEntityType, OperationStatus, SyncStatus } from './syncTypes';
/** Генерация уникального ID (Web Crypto API) */
function uuid(): string {
return crypto.randomUUID();
}
export const operationQueue = {
/**
* Добавить опер��цию в очередь.
* Если depends_on указан — операция создаётся в статусе 'blocked'.
*/
async enqueue(params: {
operation_type: OperationType;
entity_type: SyncEntityType;
entity_id: string;
payload: Record<string, unknown>;
depends_on?: string;
}): Promise<PendingOperation> {
const now = new Date().toISOString();
const op: PendingOperation = {
id: uuid(),
operation_type: params.operation_type,
entity_type: params.entity_type,
entity_id: params.entity_id,
payload: params.payload,
depends_on: params.depends_on ?? null,
status: params.depends_on ? 'blocked' : 'queued', // blocked если есть зависимость
retry_count: 0,
next_retry_at: null,
last_error: null,
idempotency_key: uuid(), // Для дедупликации на сервере
created_at: now,
updated_at: now,
};
await syncRepository.enqueue(op);
return op;
},
/**
* Получить готовые к выполнению операции:
* - status = 'queued'
* - Зависимости разрешены (depends_on = done)
* - Время retry прошло (next_retry_at <= now)
*/
async getReady(): Promise<PendingOperation[]> {
const ready = await syncRepository.getReady();
const now = Date.now();
return ready.filter((op) => {
if (op.next_retry_at && new Date(op.next_retry_at).getTime() > now) {
return false; // Ещё не время для retry
}
return true;
});
},
/** Пометить как выполняемую */
async markRunning(id: string): Promise<void> {
await syncRepository.updateStatus(id, 'running');
},
/** Пометить как завершённую + разблокировать зависимые */
async markDone(id: string): Promise<void> {
await syncRepository.updateStatus(id, 'done');
// Найти и разблокировать операции, ожидающие эту
const blocked = await db.pending_operations
.where('depends_on').equals(id)
.toArray();
for (const dep of blocked) {
if (dep.status === 'blocked') {
await syncRepository.updateStatus(dep.id, 'queued');
}
}
},
/** Пометить как неудачную с логикой retry */
async markFailed(id: string, error: string): Promise<void> {
const op = await db.pending_operations.get(id);
if (!op) return;
const newRetryCount = op.retry_count + 1;
if (shouldRetry(op.retry_count)) {
// Можно повторить — запланировать retry
const delayMs = getRetryDelay(op.retry_count);
await db.pending_operations.update(id, {
status: 'failed' as OperationStatus,
retry_count: newRetryCount,
next_retry_at: getNextRetryAt(op.retry_count),
last_error: error,
updated_at: new Date().toISOString(),
});
// Через delay перевести обратно в queued и разбудить engine
setTimeout(async () => {
const current = await db.pending_operations.get(id);
if (current && current.status === 'failed') {
await syncRepository.updateStatus(id, 'queued');
const { syncEngine } = await import('./syncEngine');
syncEngine.processQueue();
}
}, delayMs);
} else {
// Максимум попыток исчерпан — перманентный fail
await db.pending_operations.update(id, {
status: 'failed' as OperationStatus,
retry_count: newRetryCount,
last_error: error,
updated_at: new Date().toISOString(),
});
// Откатить soft-delete чтобы пользователь не потерял данные
await this._rollbackIfDelete(op);
}
},
/** Откат soft-delete при перманентном сбое удаления */
async _rollbackIfDelete(op: PendingOperation): Promise<void> {
if (op.operation_type === 'ITEM_DELETE') {
const item = await db.items.get(op.entity_id);
if (item && item.deleted_locally) {
await db.items.update(op.entity_id, {
deleted_locally: false,
sync_status: 'synced' as SyncStatus,
});
console.warn(`[Sync] Rolled back soft-delete for item ${op.entity_id}`);
queryClient.invalidateQueries({ queryKey: ['items'] });
}
} else if (op.operation_type === 'OUTFIT_DELETE') {
const outfit = await db.outfits.get(op.entity_id);
if (outfit && outfit.deleted_locally) {
await db.outfits.update(op.entity_id, {
deleted_locally: false,
sync_status: 'synced' as SyncStatus,
});
console.warn(`[Sync] Rolled back soft-delete for outfit ${op.entity_id}`);
queryClient.invalidateQueries({ queryKey: ['outfits'] });
}
}
},
/** Количество pending операций (для UI badge) */
async getPendingCount(): Promise<number> {
return syncRepository.getPendingCount();
},
/** Получить упавшие операции (для debug/profile) */
async getFailed(): Promise<PendingOperation[]> {
return syncRepository.getFailed();
},
/** Ручной retry конкретной операции */
async retryOne(id: string): Promise<void> {
await db.pending_operations.update(id, {
status: 'queued' as OperationStatus,
next_retry_at: null,
updated_at: new Date().toISOString(),
});
},
/** Отменить операцию */
async cancel(id: string): Promise<void> {
await syncRepository.updateStatus(id, 'cancelled');
},
};
Структура операции (PendingOperation)¶
{
id: "550e8400-e29b-41d4-a716-446655440000", // UUID
operation_type: "ITEM_CREATE", // Тип действия
entity_type: "item", // Тип сущности
entity_id: "660e8400-...", // ID сущности (local UUID)
payload: { title: "Джинсы", category_id: 5 }, // Данные для API
depends_on: "770e8400-...", // ID зависимой операции (или null)
status: "queued", // Текущий статус
retry_count: 0, // Счётчик попыток
next_retry_at: null, // Вре��я следующей попытки
last_error: null, // Последняя ошибка
idempotency_key: "880e8400-...", // Ключ идемпотент��ости
created_at: "2026-04-12T10:00:00.000Z",
updated_at: "2026-04-12T10:00:00.000Z"
}
Граф зависимостей (пример)¶
ITEM_MEDIA_UPLOAD (media, blob_123)
└── ITEM_CREATE (item, item_456) [depends_on: upload.id]
├── ITEM_UPDATE (item, item_456) [depends_on: create.id]
└── ITEM_DELETE (item, item_456) [depends_on: create.id]
Когда ITEM_MEDIA_UPLOAD завершается (markDone), ITEM_CREATE автоматически разблокируется и переходит из blocked в queued.