Перейти к содержанию

Operation Queue — Полный код

Уровень: L3 (deep-dive) | ← Назад к Sync Engine

Файл: frontend/src/offline/sync/operationQueue.ts

Очередь операций с зависимостями, retry-логикой и rollback.

import { db } from '../db/dexie';
import { queryClient } from '../../queryClient';
import { syncRepository } from '../repositories/syncRepository';
import { shouldRetry, getNextRetryAt, getRetryDelay } from './retryPolicy';
import type { PendingOperation, OperationType, SyncEntityType, OperationStatus, SyncStatus } from './syncTypes';

/** Генерация уникального ID (Web Crypto API) */
function uuid(): string {
  return crypto.randomUUID();
}

export const operationQueue = {
  /**
   * Добавить опер��цию в очередь.
   * Если depends_on указан — операция создаётся в статусе 'blocked'.
   */
  async enqueue(params: {
    operation_type: OperationType;
    entity_type: SyncEntityType;
    entity_id: string;
    payload: Record<string, unknown>;
    depends_on?: string;
  }): Promise<PendingOperation> {
    const now = new Date().toISOString();
    const op: PendingOperation = {
      id: uuid(),
      operation_type: params.operation_type,
      entity_type: params.entity_type,
      entity_id: params.entity_id,
      payload: params.payload,
      depends_on: params.depends_on ?? null,
      status: params.depends_on ? 'blocked' : 'queued',  // blocked если есть зависимость
      retry_count: 0,
      next_retry_at: null,
      last_error: null,
      idempotency_key: uuid(),  // Для дедупликации на сервере
      created_at: now,
      updated_at: now,
    };
    await syncRepository.enqueue(op);
    return op;
  },

  /**
   * Получить готовые к выполнению операции:
   * - status = 'queued'
   * - Зависимости разрешены (depends_on = done)
   * - Время retry прошло (next_retry_at <= now)
   */
  async getReady(): Promise<PendingOperation[]> {
    const ready = await syncRepository.getReady();
    const now = Date.now();
    return ready.filter((op) => {
      if (op.next_retry_at && new Date(op.next_retry_at).getTime() > now) {
        return false;  // Ещё не время для retry
      }
      return true;
    });
  },

  /** Пометить как выполняемую */
  async markRunning(id: string): Promise<void> {
    await syncRepository.updateStatus(id, 'running');
  },

  /** Пометить как завершённую + разблокировать зависимые */
  async markDone(id: string): Promise<void> {
    await syncRepository.updateStatus(id, 'done');
    // Найти и разблокировать операции, ожидающие эту
    const blocked = await db.pending_operations
      .where('depends_on').equals(id)
      .toArray();
    for (const dep of blocked) {
      if (dep.status === 'blocked') {
        await syncRepository.updateStatus(dep.id, 'queued');
      }
    }
  },

  /** Пометить как неудачную с логикой retry */
  async markFailed(id: string, error: string): Promise<void> {
    const op = await db.pending_operations.get(id);
    if (!op) return;

    const newRetryCount = op.retry_count + 1;

    if (shouldRetry(op.retry_count)) {
      // Можно повторить — запланировать retry
      const delayMs = getRetryDelay(op.retry_count);
      await db.pending_operations.update(id, {
        status: 'failed' as OperationStatus,
        retry_count: newRetryCount,
        next_retry_at: getNextRetryAt(op.retry_count),
        last_error: error,
        updated_at: new Date().toISOString(),
      });
      // Через delay перевести обратно в queued и разбудить engine
      setTimeout(async () => {
        const current = await db.pending_operations.get(id);
        if (current && current.status === 'failed') {
          await syncRepository.updateStatus(id, 'queued');
          const { syncEngine } = await import('./syncEngine');
          syncEngine.processQueue();
        }
      }, delayMs);
    } else {
      // Максимум попыток исчерпан — перманентный fail
      await db.pending_operations.update(id, {
        status: 'failed' as OperationStatus,
        retry_count: newRetryCount,
        last_error: error,
        updated_at: new Date().toISOString(),
      });
      // Откатить soft-delete чтобы пользователь не потерял данные
      await this._rollbackIfDelete(op);
    }
  },

  /** Откат soft-delete при перманентном сбое удаления */
  async _rollbackIfDelete(op: PendingOperation): Promise<void> {
    if (op.operation_type === 'ITEM_DELETE') {
      const item = await db.items.get(op.entity_id);
      if (item && item.deleted_locally) {
        await db.items.update(op.entity_id, {
          deleted_locally: false,
          sync_status: 'synced' as SyncStatus,
        });
        console.warn(`[Sync] Rolled back soft-delete for item ${op.entity_id}`);
        queryClient.invalidateQueries({ queryKey: ['items'] });
      }
    } else if (op.operation_type === 'OUTFIT_DELETE') {
      const outfit = await db.outfits.get(op.entity_id);
      if (outfit && outfit.deleted_locally) {
        await db.outfits.update(op.entity_id, {
          deleted_locally: false,
          sync_status: 'synced' as SyncStatus,
        });
        console.warn(`[Sync] Rolled back soft-delete for outfit ${op.entity_id}`);
        queryClient.invalidateQueries({ queryKey: ['outfits'] });
      }
    }
  },

  /** Количество pending операций (для UI badge) */
  async getPendingCount(): Promise<number> {
    return syncRepository.getPendingCount();
  },

  /** Получить упавшие операции (для debug/profile) */
  async getFailed(): Promise<PendingOperation[]> {
    return syncRepository.getFailed();
  },

  /** Ручной retry конкретной операции */
  async retryOne(id: string): Promise<void> {
    await db.pending_operations.update(id, {
      status: 'queued' as OperationStatus,
      next_retry_at: null,
      updated_at: new Date().toISOString(),
    });
  },

  /** Отменить операцию */
  async cancel(id: string): Promise<void> {
    await syncRepository.updateStatus(id, 'cancelled');
  },
};

Структура операции (PendingOperation)

{
  id: "550e8400-e29b-41d4-a716-446655440000",  // UUID
  operation_type: "ITEM_CREATE",                 // Тип действия
  entity_type: "item",                           // Тип сущности
  entity_id: "660e8400-...",                     // ID сущности (local UUID)
  payload: { title: "Джинсы", category_id: 5 }, // Данные для API
  depends_on: "770e8400-...",                    // ID зависимой операции (или null)
  status: "queued",                              // Текущий статус
  retry_count: 0,                                // Счётчик попыток
  next_retry_at: null,                           // Вре��я следующей попытки
  last_error: null,                              // Последняя ошибка
  idempotency_key: "880e8400-...",               // Ключ идемпотент��ости
  created_at: "2026-04-12T10:00:00.000Z",
  updated_at: "2026-04-12T10:00:00.000Z"
}

Граф зависимостей (пример)

ITEM_MEDIA_UPLOAD (media, blob_123)
    └── ITEM_CREATE (item, item_456)  [depends_on: upload.id]
            ├── ITEM_UPDATE (item, item_456)  [depends_on: create.id]
            └── ITEM_DELETE (item, item_456)   [depends_on: create.id]

Когда ITEM_MEDIA_UPLOAD завершается (markDone), ITEM_CREATE автоматически разблокируется и переходит из blocked в queued.