# Concurrent Wave Execution Fix

## Problem Summary

During large imports, multiple waves (up to 30) were dispatching simultaneously instead of sequentially, causing:
- **Queue Exhaustion**: 1,500+ jobs queued at once (30 waves × 50 batches)
- **System Instability**: 3 manual queue restarts were required during the import
- **Violation of Design**: Only ONE wave should execute at a time; the next wave may dispatch only once the current wave reaches the 90% completion threshold

## Root Cause Analysis

### Evidence from Database (sx_db_1754515181)

Wave dispatch timestamps revealed concurrent execution:

```
Wave 2:  Dispatched 2025-09-30 13:13:41
Wave 3:  Dispatched 2025-09-30 13:13:57
Wave 4:  Dispatched 2025-09-30 13:13:57
Waves 5-34: ALL dispatched at 2025-09-30 13:13:58 (30 WAVES SIMULTANEOUSLY!)
Wave 35: Dispatched 2025-09-30 13:20:47
Waves 36-37: Dispatched 2025-09-30 13:20:51
```

**30 waves dispatched in the same second** = catastrophic queue overload.
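The clustering in the timestamps above can be checked mechanically. The following is a minimal sketch in plain PHP, assuming the dispatch times have already been read into an array keyed by wave number; the function name and input shape are hypothetical and not part of `WaveCoordinator`.

```php
<?php
declare(strict_types=1);

/**
 * Group wave dispatch timestamps by second and return only the seconds in
 * which more than one wave was dispatched.
 * The input shape (wave number => 'Y-m-d H:i:s') is an assumption made for
 * illustration; it is not the actual table layout.
 *
 * @param array<int, string> $dispatchedAt
 * @return array<string, list<int>>
 */
function findSimultaneousDispatches(array $dispatchedAt): array
{
    $bySecond = [];
    foreach ($dispatchedAt as $wave => $timestamp) {
        $bySecond[$timestamp][] = $wave;
    }

    // Keep only seconds in which two or more waves were dispatched.
    return array_filter(
        $bySecond,
        static fn (array $waves): bool => count($waves) > 1
    );
}

// Timestamps copied from the evidence listing above.
$dispatchedAt = [
    2 => '2025-09-30 13:13:41',
    3 => '2025-09-30 13:13:57',
    4 => '2025-09-30 13:13:57',
];
foreach (range(5, 34) as $wave) {
    $dispatchedAt[$wave] = '2025-09-30 13:13:58';
}

$clusters = findSimultaneousDispatches($dispatchedAt);
foreach ($clusters as $second => $waves) {
    printf("%s: %d waves\n", $second, count($waves));
}
```

Any second that reports more than one wave indicates that the sequential design was bypassed.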

### Code Root Cause

The root cause is in `src/App/Services/ImportJobs/WaveCoordinator.php`, where **TWO unguarded dispatch paths** allowed concurrent execution:

#### Path #1: DB-RECONCILE (lines 1108-1113)
**BEFORE FIX:**
```php
if ($nextPending !== null) {
    $currentMainCompletion = $this->checkWaveCompletion($jobId, max(1, $currentWave));
    if ($currentMainCompletion['completed'] || $currentWave === 0) {
        Log::info('DB-RECONCILE: Dispatching next pending main wave', [
            'job_id' => $jobId,
            'current_wave' => $currentWave,
            'next_pending' => $nextPending
        ]);
        $this->dispatchWave($jobId, (int) $nextPending); // ← NO GUARDS!
        // ...
    }
}
```

#### Path #2: CATCH-UP (lines 1933-1938)
**BEFORE FIX:**
```php
$lowestPending = DB::connection($this->connectionName)
    ->table('wave_coordination')
    ->where('job_id', $jobId)
    ->where('dependency_level', -1)
    ->where('status', 'pending')
    ->orderBy('wave_number')
    ->value('wave_number');

if ($lowestPending) {
    Log::warning('CATCH-UP: Dispatching lowest pending main wave due to stall', [
        'job_id' => $jobId,
        'lowest_pending_wave' => $lowestPending
    ]);
    $this->dispatchWave($jobId, (int) $lowestPending); // ← NO GUARDS!
    // ...
}
```

**The Problem:**

1. **All waves created at once**: `createMainRecordTypeWaves()` creates all main waves in a transaction
2. **Only first dispatched initially**: Line 1617 dispatches wave 1, leaves 2-N pending
3. **Rapid batch completions**: When batches complete quickly, multiple `BatchJobCompleted` events fire simultaneously
4. **Concurrent progression calls**: Each event calls `checkAndTriggerNextWave()` → `handleMainTypeWaveProgression()`
5. **No active wave check**: The DB-RECONCILE and catch-up paths see pending waves and dispatch WITHOUT checking:
   - Is another wave already processing?
   - Has current wave reached 90% completion threshold?
6. **Mass dispatch**: Multiple concurrent calls to `dispatchWave()` fire for all pending waves
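The sequence above can be reproduced with a small in-memory model. This is a sketch only: `unguardedProgression()` is a hypothetical stand-in for the pre-fix dispatch paths, and the status change stands in for `dispatchWave()`.

```php
<?php
declare(strict_types=1);

/**
 * Unguarded progression, modelled in memory: dispatch the lowest pending
 * wave without checking whether another wave is already active.
 *
 * @param array<int, string> $waves wave number => status
 */
function unguardedProgression(array &$waves): ?int
{
    foreach ($waves as $number => $status) {
        if ($status === 'pending') {
            $waves[$number] = 'processing'; // stands in for dispatchWave()
            return $number;
        }
    }

    return null;
}

// Wave 4 is still processing; waves 5-34 are pending.
$waves = [4 => 'processing'] + array_fill(5, 30, 'pending');

// Thirty completion events arrive in quick succession.
for ($event = 0; $event < 30; $event++) {
    unguardedProgression($waves);
}

$active = count(array_filter(
    $waves,
    static fn (string $status): bool => $status === 'processing'
));
echo "Active waves: {$active}\n";
```

Because nothing checks for an active wave, every event promotes another pending wave, and all 30 pending waves end up active alongside wave 4.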

## The Fix

Added guards at **THREE dispatch points** (the two unguarded paths above plus the normal progression path) to prevent concurrent wave execution:

### Fix #1: DB-RECONCILE Path Guards (Lines 1106-1169)

**Active Wave Check (Lines 1107-1127):**
```php
// CRITICAL: Before dispatching, ensure no other wave is currently processing
$activeWave = DB::connection($this->connectionName)
    ->table('wave_coordination')
    ->where('job_id', $jobId)
    ->where('wave_number', '!=', $currentWave)
    ->whereIn('status', ['dispatching', 'processing'])
    ->value('wave_number');

if ($activeWave) {
    Log::info('DB-RECONCILE: Skipping dispatch - another wave is actively processing', [
        'job_id' => $jobId,
        'active_wave' => $activeWave,
        'current_wave' => $currentWave,
        'next_pending' => $nextPending
    ]);
    return [
        'status' => 'monitoring',
        'message' => 'Waiting for active wave to complete',
        'job_id' => $jobId,
        'active_wave' => $activeWave
    ];
}
```

**Threshold Check (Lines 1132-1168):**
```php
// CRITICAL: Only dispatch if current wave reached threshold OR is first wave
if ($currentMainCompletion['completed'] ||
    ($currentMainCompletion['progress'] >= $this->completionThreshold) ||
    $currentWave === 0) {

    Log::info('DB-RECONCILE: Dispatching next pending main wave (safe - no active waves, threshold met)', [
        'job_id' => $jobId,
        'current_wave' => $currentWave,
        'current_progress' => $currentMainCompletion['progress'],
        'threshold' => $this->completionThreshold,
        'next_pending' => $nextPending
    ]);
    $this->dispatchWave($jobId, (int) $nextPending);
} else {
    Log::info('DB-RECONCILE: Current wave below threshold, waiting', [
        'job_id' => $jobId,
        'current_wave' => $currentWave,
        'progress' => $currentMainCompletion['progress'],
        'threshold' => $this->completionThreshold
    ]);
    return ['status' => 'monitoring', 'message' => 'Current wave below threshold'];
}
```
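The threshold condition can be isolated as a pure function, which makes the three dispatch cases easier to reason about. This is a sketch, assuming `progress` and the threshold are both percentages on a 0-100 scale; `shouldDispatchNextWave()` is a hypothetical helper, not a method of `WaveCoordinator`.

```php
<?php
declare(strict_types=1);

/**
 * Decide whether the next wave may be dispatched.
 * Mirrors the three-part condition in the DB-RECONCILE threshold check.
 *
 * @param array{completed: bool, progress: float} $completion
 */
function shouldDispatchNextWave(array $completion, int $currentWave, float $threshold): bool
{
    return $completion['completed']            // current wave finished
        || $completion['progress'] >= $threshold // or reached the threshold
        || $currentWave === 0;                   // or no wave has run yet
}

$threshold = 90.0;

var_dump(shouldDispatchNextWave(['completed' => false, 'progress' => 89.9], 3, $threshold));
var_dump(shouldDispatchNextWave(['completed' => false, 'progress' => 90.0], 3, $threshold));
var_dump(shouldDispatchNextWave(['completed' => false, 'progress' => 0.0], 0, $threshold));
```

The first call returns `false` (below threshold), the second `true` (threshold reached), and the third `true` (first wave).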

### Fix #2: Normal Progression Path Guard (Lines 1888-1910)

**Active Wave Check:**
```php
if ($nextMainWave) {
    // CRITICAL: Before dispatching, ensure no other wave is currently processing
    $activeWave = DB::connection($this->connectionName)
        ->table('wave_coordination')
        ->where('job_id', $jobId)
        ->where('wave_number', '!=', $currentWave)
        ->whereIn('status', ['dispatching', 'processing'])
        ->value('wave_number');

    if ($activeWave) {
        Log::info('🛑 MAIN WAVE PROGRESSION: Skipping dispatch - another wave is actively processing');
        return ['status' => 'monitoring', 'message' => 'Waiting for active wave to complete'];
    }

    // Safe to dispatch - no active waves
    $this->dispatchWave($jobId, $nextMainWave);
}
```

### Fix #3: Catch-Up Logic Guards (Lines 1926-1992)

**Active Wave Check (Lines 1926-1944):**
```php
// CRITICAL: Ensure no other wave is currently processing
$activeWave = DB::connection($this->connectionName)
    ->table('wave_coordination')
    ->where('job_id', $jobId)
    ->whereIn('status', ['dispatching', 'processing'])
    ->value('wave_number');

if ($activeWave) {
    Log::info('CATCH-UP: Skipping - another wave is actively processing', [
        'job_id' => $jobId,
        'active_wave' => $activeWave,
        'current_wave' => $currentWave
    ]);
    return [
        'status' => 'monitoring',
        'message' => 'Waiting for active wave to complete before catch-up',
        'job_id' => $jobId,
        'active_wave' => $activeWave
    ];
}
```

**Effect**: If ANY wave is in `dispatching` or `processing` status, skip dispatch and wait. Unlike the other two paths, this query does not exclude the current wave.

**Threshold Check (Lines 1946-1961):**
```php
// SECOND: Check if current wave has reached completion threshold
if (!$waveCompletion['completed'] && $waveCompletion['progress'] < $this->completionThreshold) {
    Log::info('CATCH-UP: Current wave has not reached threshold, waiting', [
        'job_id' => $jobId,
        'current_wave' => $currentWave,
        'progress' => $waveCompletion['progress'],
        'threshold' => $this->completionThreshold
    ]);
    return [
        'status' => 'monitoring',
        'message' => 'Current wave below threshold',
        'job_id' => $jobId,
        'current_wave' => $currentWave,
        'progress' => $waveCompletion['progress']
    ];
}
```

**Effect**: Only dispatch next wave if current wave has reached 90% threshold (from `config/waves.php`).

### Fix #2 Full Listing: Normal Progression Path Guard

**Lines 1888-1910 (AFTER FIX):**

The Fix #2 snippet above is abridged. The full active wave check on the normal progression path (when `$nextMainWave` is found), including the log context and return payload:

```php
if ($nextMainWave) {
    // CRITICAL: Before dispatching, ensure no other wave is currently processing
    $activeWave = DB::connection($this->connectionName)
        ->table('wave_coordination')
        ->where('job_id', $jobId)
        ->where('wave_number', '!=', $currentWave) // Exclude current wave
        ->whereIn('status', ['dispatching', 'processing'])
        ->value('wave_number');

    if ($activeWave) {
        Log::info('🛑 MAIN WAVE PROGRESSION: Skipping dispatch - another wave is actively processing', [
            'job_id' => $jobId,
            'active_wave' => $activeWave,
            'current_wave' => $currentWave,
            'next_wave_candidate' => $nextMainWave
        ]);
        return [
            'status' => 'monitoring',
            'message' => 'Waiting for active wave to complete',
            'job_id' => $jobId,
            'active_wave' => $activeWave,
            'next_wave_candidate' => $nextMainWave
        ];
    }

    // Safe to dispatch - no active waves
    $this->dispatchWave($jobId, $nextMainWave);
}
```

**Effect**: Prevents race conditions in normal progression when multiple batch completions trigger simultaneously.

## Expected Behavior After Fix

### Sequential Wave Execution
1. **Wave 1 dispatches** at job start
2. **Wave 1 processes** until 90% threshold
3. **Wave 2 dispatches** only after Wave 1 reaches 90%
4. **Wave 2 processes** until 90% threshold
5. **Wave 3 dispatches** only after Wave 2 reaches 90%
6. And so on...

### Concurrent Call Handling
- **First call**: Sees no active waves, dispatches next wave
- **Subsequent calls**: See active wave, return `status: monitoring` and wait
- **Result**: Only ONE wave executes at a time
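The same in-memory style of model illustrates how the active wave guard handles repeated calls. This is a sketch under assumptions: `guardedProgression()` and the `dispatched` / `idle` status strings are hypothetical, and the calls here run one after another rather than in parallel.

```php
<?php
declare(strict_types=1);

/**
 * Guarded progression, modelled in memory.
 *
 * @param array<int, string> $waves wave number => status
 * @return array{status: string, wave: int|null}
 */
function guardedProgression(array &$waves, int $currentWave): array
{
    // Guard: any wave other than the current one that is dispatching or
    // processing blocks a new dispatch.
    foreach ($waves as $number => $status) {
        if ($number !== $currentWave
            && in_array($status, ['dispatching', 'processing'], true)) {
            return ['status' => 'monitoring', 'wave' => $number];
        }
    }

    // No active wave: dispatch the lowest pending one.
    foreach ($waves as $number => $status) {
        if ($status === 'pending') {
            $waves[$number] = 'dispatching'; // stands in for dispatchWave()
            return ['status' => 'dispatched', 'wave' => $number];
        }
    }

    return ['status' => 'idle', 'wave' => null];
}

$waves = [1 => 'completed', 2 => 'pending', 3 => 'pending', 4 => 'pending'];

$results = [];
for ($call = 0; $call < 5; $call++) {
    $results[] = guardedProgression($waves, 1)['status'];
}
echo implode(', ', $results), "\n";
```

Only the first call dispatches; the remaining four return `monitoring`. The model serializes the calls, so it says nothing about truly parallel workers: there, the guard is only as strong as the atomicity of the status check and the status update, which is worth confirming in `dispatchWave()`.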

### Queue Load Management
- **Before**: 1,500 jobs (30 waves × 50 batches)
- **After**: 50 jobs maximum (1 wave × 50 batches)
- **Result**: Queue stays under control, no exhaustion
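The arithmetic behind these figures, as a short sketch. It assumes one queued job per batch; the `tail` figure (batches of the previous wave that may still be unfinished when the next wave dispatches at the threshold) is an inference from the 90% rule, not a number measured in the import.

```php
<?php
declare(strict_types=1);

/** Jobs queued when $concurrentWaves waves are dispatched together. */
function queuedJobs(int $concurrentWaves, int $batchesPerWave): int
{
    return $concurrentWaves * $batchesPerWave;
}

/**
 * Upper bound on batches of the previous wave still unfinished when the
 * next wave is dispatched at the threshold (assumes one job per batch).
 */
function unfinishedAtThreshold(int $batchesPerWave, int $thresholdPercent): int
{
    return intdiv($batchesPerWave * (100 - $thresholdPercent), 100);
}

$before = queuedJobs(30, 50);
$after  = queuedJobs(1, 50);
$tail   = unfinishedAtThreshold(50, 90);

printf("before=%d after=%d tail=%d\n", $before, $after, $tail);
```

With the default configuration this prints `before=1500 after=50 tail=5`.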

## Configuration

Wave execution controlled by `config/waves.php`:

```php
'wave_size' => env('WAVE_SIZE', 50), // Batches per wave
'completion_threshold' => env('WAVE_COMPLETION_THRESHOLD', 90), // Percentage before next wave
```

## Testing Recommendations

1. **Monitor wave dispatch times**: Verify that each wave dispatches only after the previous wave reaches ~90% completion
2. **Check queue depth**: Should stay near `wave_size` (50 jobs by default), far below the 1,500 seen before the fix
3. **Validate threshold**: Next wave should only trigger after current reaches 90%+
4. **Log analysis**: Look for "Skipping - another wave is actively processing" messages (indicates guards working)

## Validation Criteria

✅ **Single Active Wave**: At any moment, only ONE wave has `status IN ('dispatching', 'processing')`

✅ **Threshold Enforcement**: Next wave only dispatches when current wave ≥ 90% complete

✅ **No Queue Exhaustion**: Queue depth stays under control throughout import

✅ **Completion Time**: May be slightly longer due to sequential execution, but system stability is maintained
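The single active wave criterion can be checked against a snapshot of `wave_coordination` rows. A minimal sketch, assuming the rows have been fetched into arrays with `wave_number` and `status` keys; `activeWaveNumbers()` is a hypothetical helper.

```php
<?php
declare(strict_types=1);

/**
 * Return the wave numbers that count as active in a snapshot.
 *
 * @param list<array{wave_number: int, status: string}> $rows
 * @return list<int>
 */
function activeWaveNumbers(array $rows): array
{
    $active = [];
    foreach ($rows as $row) {
        if (in_array($row['status'], ['dispatching', 'processing'], true)) {
            $active[] = $row['wave_number'];
        }
    }

    return $active;
}

// Example snapshot; the values are illustrative, not taken from the import.
$snapshot = [
    ['wave_number' => 7, 'status' => 'completed'],
    ['wave_number' => 8, 'status' => 'processing'],
    ['wave_number' => 9, 'status' => 'pending'],
];

$active = activeWaveNumbers($snapshot);
if (count($active) <= 1) {
    echo "OK: single active wave\n";
} else {
    echo 'VIOLATION: waves ' . implode(', ', $active) . " are active\n";
}
```

Running this check periodically during an import gives a direct signal for the first criterion, independent of log messages.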

## Related Files

- **Fixed**: `src/App/Services/ImportJobs/WaveCoordinator.php`
  - Lines 1106-1169: DB-RECONCILE path guards (PRIMARY FIX - this was the active path)
  - Lines 1888-1910: Normal progression path guard
  - Lines 1926-1992: Catch-up logic guards
- **Config**: `config/waves.php`
  - `completion_threshold`: 90%
  - `wave_size`: 50 batches

## PHPStan Validation

```bash
./vendor/bin/phpstan analyze src/App/Services/ImportJobs/WaveCoordinator.php --level=5
# Result: ✅ No errors
```

## Additional Fix: Wave Completion Stall (Lines 928-956)

### Problem Discovered
After fixing concurrent execution, a new issue emerged: **waves stopped progressing after wave 7 completed**.

**Root Cause**: Wave progression relies on `BatchJobCompleted` events to call `checkAndTriggerNextWave()`. Once all batches in a wave have completed, no further completion events fire, so nothing triggers the next wave.

**Evidence**:
- Wave 7 marked as `completed` in database
- Wave 8 remained in `pending` status indefinitely
- No more `BatchJobCompleted` events after wave 7's last batch
- No automatic trigger to dispatch wave 8

### Solution: Auto-Trigger on Wave Completion

Added logic in `checkWaveCompletion()` to automatically trigger next wave progression when a wave completes:

```php
// CRITICAL: Immediately trigger next wave progression since this wave just completed
// This ensures we don't rely solely on BatchJobCompleted events which stop after last batch
try {
    Log::info('Triggering next wave progression after wave completion', [
        'job_id' => $jobId,
        'completed_wave' => $waveNumber
    ]);

    // Use a queued job to trigger progression asynchronously to avoid blocking
    dispatch(function() use ($jobId, $waveNumber) {
        try {
            $coordinator = new WaveCoordinator();
            $coordinator->checkAndTriggerNextWave($jobId);
        } catch (\Throwable $e) {
            Log::error('Failed to trigger next wave after completion', [
                'job_id' => $jobId,
                'completed_wave' => $waveNumber,
                'error' => $e->getMessage()
            ]);
        }
    })->onQueue('import')->delay(now()->addSeconds(2)); // 2 second delay to ensure wave status is persisted

} catch (\Throwable $e) {
    Log::error('Failed to queue next wave progression', [
        'job_id' => $jobId,
        'wave_number' => $waveNumber,
        'error' => $e->getMessage()
    ]);
}
```

**Effect**: When wave N completes, the system queues a job to check and dispatch wave N+1 after 2 seconds, ensuring continuous progression even when no more batch events fire.

## Deployment Notes

- **No database changes** required
- **No config changes** required
- **Backward compatible** with existing imports
- **Effect**: Immediate - next import will respect single-wave execution AND auto-progress between waves
- **Monitoring**: Watch for sequential wave dispatch in logs and "Triggering next wave progression after wave completion" messages

## Success Metrics

**Before Fix:**
- 30 waves dispatched simultaneously
- 1,500+ jobs in queue
- 3 manual queue restarts required

**After Fix (Expected):**
- 1 wave active at any time
- ≤50 jobs in queue (1 wave)
- Zero manual interventions
- Stable queue throughout import
