# Phase B: Proactive Broadcasting & Live Sync

**Status**: Not Started  
**Priority**: High (Reduces conflict friction)  
**Estimated Effort**: 5-7 days  
**Dependencies**: Phase A complete (409 guard must be in place)

---

## Overview

Implement the **proactive layer**: real-time notifications via Laravel Broadcasting that warn users *before* they attempt a save, plus live table sync that keeps data fresh without requiring manual reloads.

This phase dramatically improves UX by reducing surprise 409 conflicts. Users learn about concurrent changes in real-time and can choose when to reload.

---

## Goals

- Notify record editors when external updates occur (sticky banner, only when dirty)
- Enable silent table refresh (Gantt-style) to prevent stale data in lists
- Reuse existing Gantt broadcasting infrastructure (`BroadcastHelper`, channel patterns)
- Maintain tenant isolation and robust fallback when websockets unavailable

---

## Prerequisites

- Phase A complete (409 guard operational)
- Broadcasting configured (`config/broadcasting.php`)
- Laravel Echo + Pusher (or Redis) working (verify via Gantt if deployed)
- `BroadcastHelper` class available (`app/Helpers/BroadcastHelper.php`)

---

## Technical Implementation

### Backend Changes

#### 1. Create `RecordUpdated` Broadcast Event

**File**: `src/Domain/Broadcasting/Events/RecordUpdated.php`

```php
<?php

namespace Domain\Broadcasting\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class RecordUpdated implements ShouldBroadcastNow
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public string $recordType;
    public int $recordId;
    public string $updatedAtIso;
    public ?int $updatedById;
    public ?string $updatedByName;
    public ?string $tabId;
    public int $tenantId;

    public function __construct(
        string $recordType,
        int $recordId,
        string $updatedAtIso,
        ?int $updatedById = null,
        ?string $updatedByName = null,
        ?string $tabId = null,
        int $tenantId = null
    ) {
        $this->recordType = $recordType;
        $this->recordId = $recordId;
        $this->updatedAtIso = $updatedAtIso;
        $this->updatedById = $updatedById;
        $this->updatedByName = $updatedByName;
        $this->tabId = $tabId;
        $this->tenantId = $tenantId ?? app(TenantService::class)->getCurrentTenantId();
    }

    public function broadcastOn(): Channel
    {
        return new PrivateChannel("record.{$this->recordType}.{$this->recordId}");
    }

    public function broadcastAs(): string
    {
        return 'record.updated';
    }

    public function broadcastWith(): array
    {
        return [
            'tenant_id' => $this->tenantId,
            'record_type' => $this->recordType,
            'record_id' => $this->recordId,
            'updated_at_iso' => $this->updatedAtIso,
            'updated_by_id' => $this->updatedById,
            'updated_by_name' => $this->updatedByName,
            'tab_id' => $this->tabId,
        ];
    }
}
```

#### 2. Emit Broadcast After Record Updates

**Option A: Model Observers** (recommended for models with frequent updates):

```php
// src/Domain/Flows/Models/Flow.php (example)
namespace Domain\Flows\Models;

use Domain\Broadcasting\Events\RecordUpdated;
use App\Helpers\BroadcastHelper;

class Flow extends Model
{
    protected static function booted()
    {
        static::updated(function ($flow) {
            // Skip if import flag is set (reuse Gantt pattern)
            if (method_exists($flow, 'isImport') && $flow->isImport()) {
                return;
            }

            // Skip if already broadcast in this request
            if ($flow->alreadyBroadcast ?? false) {
                return;
            }

            $flow->alreadyBroadcast = true;

            try {
                BroadcastHelper::toOthers(
                    broadcast(new RecordUpdated(
                        recordType: 'flow',
                        recordId: $flow->id,
                        updatedAtIso: $flow->updated_at->toISOString(),
                        updatedById: $flow->updated_by,
                        updatedByName: $flow->updatedByUser?->name,
                        tabId: $flow->broadcastTabId ?? null
                    ))
                );
            } catch (\Exception $e) {
                // Log and continue (broadcast failures must not block saves)
                logger()->error('Failed to broadcast RecordUpdated', [
                    'record_type' => 'flow',
                    'record_id' => $flow->id,
                    'error' => $e->getMessage(),
                ]);
            }
        });
    }
}
```

**Option B: Controller Emission** (for models without observers):

```php
public function update(Request $request, $id)
{
    $record = YourModel::findOrFail($id);
    $this->checkConcurrency($record, $request->input('client_version'));
    
    $record->update($validated);

    // Broadcast after successful save
    try {
        BroadcastHelper::toOthers(
            broadcast(new RecordUpdated(
                recordType: 'your_model',
                recordId: $record->id,
                updatedAtIso: $record->updated_at->toISOString(),
                updatedById: $record->updated_by,
                updatedByName: $record->updatedByUser?->name,
                tabId: $request->input('tab_id')
            ))
        );
    } catch (\Exception $e) {
        logger()->error('Broadcast failed', ['record_id' => $record->id, 'error' => $e->getMessage()]);
    }

    return response()->json(['success' => true]);
}
```

#### 3. Add Channel Authorization

**File**: `routes/channels.php`

```php
use Illuminate\Support\Facades\Broadcast;

// Generic record channel authorization
Broadcast::channel('record.{recordType}.{recordId}', function ($user, $recordType, $recordId) {
    // 1. User must be authenticated
    if (!$user) {
        return false;
    }

    // 2. Tenant context must be set
    $currentDb = config('database.connections.tenant_connection.database');
    if (!$currentDb) {
        return false;
    }

    // 3. Map record type to model class
    $modelClass = match($recordType) {
        'flow' => \Domain\Flows\Models\Flow::class,
        'node' => \Domain\Nodes\Models\Node::class,
        'project' => \Domain\Projects\Models\Project::class,
        'projecttask' => \Domain\ProjectTasks\Models\ProjectTask::class,
        // Add other record types...
        default => null,
    };

    if (!$modelClass) {
        return false;
    }

    // 4. Record must exist
    $record = $modelClass::find($recordId);
    if (!$record) {
        return false;
    }

    // 5. User must have permission to view this record type
    // Adjust this to your permission system
    if (!$user->hasPermissionForRecordType('read', $recordType)) {
        return false;
    }

    return true;
});
```

### Frontend Changes

#### 1. Add Dirty Tracking to Record Editors

**Livewire Component**:

```php
class FlowEditor extends Component
{
    public Flow $flow;
    public array $formData = [];
    public bool $isDirty = false;
    public bool $showExternalUpdateBanner = false;
    public array $externalUpdateMeta = [];

    public function mount(Flow $flow)
    {
        $this->flow = $flow;
        $this->formData = $flow->toArray();
    }

    public function updated($propertyName)
    {
        if (str_starts_with($propertyName, 'formData.')) {
            $this->isDirty = true;
            $this->dispatch('dirty-state-changed', isDirty: true);
        }
    }

    public function handleExternalUpdate($event)
    {
        // Filter out same-tab events
        if (!empty($event['tab_id']) && $event['tab_id'] === $this->tabId) {
            return;
        }

        // Only show banner if we're dirty
        if (!$this->isDirty) {
            return;
        }

        // Check if update is newer than our loaded version
        $externalTime = new \DateTime($event['updated_at_iso']);
        $loadedTime = new \DateTime($this->flow->updated_at->toISOString());

        if ($externalTime > $loadedTime) {
            $this->externalUpdateMeta = $event;
            $this->showExternalUpdateBanner = true;
        }
    }

    public function reloadLatestFromBanner()
    {
        $this->flow->refresh();
        $this->formData = $this->flow->toArray();
        $this->isDirty = false;
        $this->showExternalUpdateBanner = false;
        $this->dispatch('notify', message: 'Reloaded latest version');
    }

    public function dismissBanner()
    {
        $this->showExternalUpdateBanner = false;
    }
}
```

**Blade Template with Echo Subscription**:

```blade
<div x-data="{
    tabId: 'tab_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9),
    echoChannel: null,
    
    init() {
        // Subscribe to record channel
        this.echoChannel = window.Echo.private('record.flow.{{ $flow->id }}')
            .listen('.record.updated', (event) => {
                @this.call('handleExternalUpdate', event);
            });
    },
    
    destroy() {
        if (this.echoChannel) {
            this.echoChannel.stopListening('.record.updated');
            window.Echo.leave('record.flow.{{ $flow->id }}');
        }
    }
}">
    
    {{-- External Update Banner --}}
    @if($showExternalUpdateBanner)
    <div class="sticky top-0 z-40 bg-yellow-50 border-b border-yellow-200 p-4">
        <div class="flex items-center justify-between">
            <div class="flex items-center gap-3">
                <svg class="w-5 h-5 text-yellow-600" fill="currentColor" viewBox="0 0 20 20">
                    <path fill-rule="evenodd" d="M8.257 3.099c.765-1.36 2.722-1.36 3.486 0l5.58 9.92c.75 1.334-.213 2.98-1.742 2.98H4.42c-1.53 0-2.493-1.646-1.743-2.98l5.58-9.92zM11 13a1 1 0 11-2 0 1 1 0 012 0zm-1-8a1 1 0 00-1 1v3a1 1 0 002 0V6a1 1 0 00-1-1z" clip-rule="evenodd"/>
                </svg>
                <span class="text-sm text-gray-900">
                    This record was updated by 
                    <strong>{{ $externalUpdateMeta['updated_by_name'] ?? 'another user' }}</strong>
                    while you have unsaved changes.
                </span>
            </div>
            <div class="flex gap-2">
                <button 
                    wire:click="reloadLatestFromBanner"
                    class="px-3 py-1 bg-yellow-600 text-white text-sm rounded hover:bg-yellow-700">
                    Reload Latest
                </button>
                <button 
                    wire:click="dismissBanner"
                    class="px-3 py-1 text-gray-600 text-sm hover:text-gray-900">
                    Dismiss
                </button>
            </div>
        </div>
    </div>
    @endif

    {{-- Form Fields --}}
    <form wire:submit.prevent="save">
        {{-- ... form inputs ... --}}
    </form>
</div>
```

#### 2. Add Table Sync (Gantt-Style Throttled Refresh)

**React Table Component** (example for Project Tasks table):

```jsx
import { useEffect, useState, useRef } from 'react';
import { useQueryClient } from '@tanstack/react-query';

export function ProjectTasksTable({ projectId }) {
    const queryClient = useQueryClient();
    const [pendingUpdates, setPendingUpdates] = useState(new Set());
    const [hasInlineEdit, setHasInlineEdit] = useState(false);
    const [showReloadPrompt, setShowReloadPrompt] = useState(false);
    const throttleTimer = useRef(null);

    useEffect(() => {
        // Subscribe to project-level channel (reuse Gantt channel)
        const channel = window.Echo.private(`project.${projectId}`)
            .listen('.task.updated', (event) => {
                handleExternalTaskUpdate(event);
            });

        return () => {
            channel.stopListening('.task.updated');
            window.Echo.leave(`project.${projectId}`);
        };
    }, [projectId]);

    function handleExternalTaskUpdate(event) {
        // Filter out same-tab events
        const myTabId = window.tabId; // Set on app init
        if (event.tab_id && event.tab_id === myTabId) {
            return;
        }

        // Accumulate pending updates
        setPendingUpdates(prev => new Set(prev).add(event.task_id));

        // Clear and reset throttle timer
        clearTimeout(throttleTimer.current);
        throttleTimer.current = setTimeout(() => {
            if (hasInlineEdit) {
                // User is editing; prompt instead of auto-refresh
                setShowReloadPrompt(true);
            } else {
                // Auto-refresh silently
                refreshTable();
            }
        }, 3000); // 3 second throttle
    }

    function refreshTable() {
        queryClient.invalidateQueries(['project-tasks', projectId]);
        setPendingUpdates(new Set());
        setShowReloadPrompt(false);
    }

    return (
        <div>
            {showReloadPrompt && (
                <div className="bg-blue-50 border-b border-blue-200 p-3 flex items-center justify-between">
                    <span className="text-sm text-gray-700">
                        {pendingUpdates.size} task(s) were updated while you're editing.
                    </span>
                    <div className="flex gap-2">
                        <button 
                            onClick={refreshTable}
                            className="px-3 py-1 bg-blue-600 text-white text-sm rounded">
                            Reload Now
                        </button>
                        <button 
                            onClick={() => setShowReloadPrompt(false)}
                            className="px-3 py-1 text-gray-600 text-sm">
                            Keep Editing
                        </button>
                    </div>
                </div>
            )}

            <table>
                {/* Table rows with inline edit handlers that set hasInlineEdit */}
            </table>
        </div>
    );
}
```

#### 3. Add Polling Fallback (for environments without websockets)

**Version Check Endpoint** (follows API structure versioning):

```php
// routes/api.php
Route::prefix('v1')->middleware(['web', 'auth'])->group(function () {
    Route::get('records/{type}/{id}/version', [RecordVersionController::class, 'show']);
});
```

**Controller** (extends BaseApiController):

```php
namespace App\Http\Controllers\Api\v1;

use App\Http\Controllers\Api\BaseApiController;

class RecordVersionController extends BaseApiController
{
    public function show(string $type, int $id)
    {
        // Map type to model class
        $modelClass = $this->getModelClass($type);
        $record = $modelClass::findOrFail($id);

        return $this->successResponse([
            'updated_at' => $record->updated_at->toJSON(),
            'updated_by_name' => $record->updatedByUser?->name,
        ]);
    }

    private function getModelClass(string $type): string
    {
        return match($type) {
            'flow' => \Domain\Flows\Models\Flow::class,
            'node' => \Domain\Nodes\Models\Node::class,
            'project' => \Domain\Projects\Models\Project::class,
            // Add other record types...
            default => throw new \InvalidArgumentException("Unknown record type: {$type}"),
        };
    }
}
```

**Livewire Component**:

```blade
<div x-data="{
    pollingInterval: null,
    loadedVersion: '{{ $record->updated_at->toJSON() }}',
    
    init() {
        if (!window.Echo) {
            // Websockets unavailable; start polling
            this.startPolling();
        }
    },
    
    startPolling() {
        this.pollingInterval = setInterval(() => {
            this.checkForUpdates();
        }, 30000); // 30 seconds
    },
    
    async checkForUpdates() {
        const response = await fetch('/api/v1/records/{{ $recordType }}/{{ $record->id }}/version');
        const data = await response.json();
        
        if (data.data.updated_at !== this.loadedVersion) {
            // Trigger the same handler as Echo events
            @this.call('handleExternalUpdate', {
                updated_at_iso: data.data.updated_at,
                updated_by_name: data.data.updated_by_name,
            });
        }
    },
    
    destroy() {
        if (this.pollingInterval) {
            clearInterval(this.pollingInterval);
        }
    }
}">
    {{-- Component content --}}
</div>
```

---

## Rollout Strategy

### Step 1: Backend Broadcasting (2 days)
1. Create `RecordUpdated` event
2. Add channel authorization for record types
3. Implement broadcast emission in 2-3 pilot models
4. Test with Pusher debug console

### Step 2: Frontend - Record Editors (2 days)
1. Add dirty tracking to Livewire base editor component
2. Create external update banner component
3. Implement Echo subscription pattern
4. Test multi-tab and multi-user scenarios

### Step 3: Frontend - Table Sync (2 days)
1. Add table-level Echo subscriptions (reuse Gantt channel)
2. Implement throttled refresh pattern
3. Add inline-edit detection and reload prompt
4. Test with React Query invalidation

### Step 4: Polling Fallback (1 day)
1. Create `RecordVersionController` extending `BaseApiController` (per API structure)
2. Add versioned route: `GET /api/v1/records/{type}/{id}/version`
3. Implement fallback polling in editors
4. Test in non-websocket environment
5. Document `X-Socket-ID` and `X-Tab-ID` headers in API structure

---

## Testing

### Backend Tests

```php
describe('RecordUpdated broadcast', function () {
    it('broadcasts on model update', function () {
        Event::fake([RecordUpdated::class]);

        $flow = Flow::factory()->create();
        $flow->update(['name' => 'Updated']);

        Event::assertDispatched(RecordUpdated::class, function ($event) use ($flow) {
            return $event->recordType === 'flow' 
                && $event->recordId === $flow->id;
        });
    });

    it('does not broadcast during imports', function () {
        Event::fake([RecordUpdated::class]);

        $flow = Flow::factory()->create();
        $flow->setImportFlag(true);
        $flow->update(['name' => 'Import update']);

        Event::assertNotDispatched(RecordUpdated::class);
    });

    it('includes tab_id when provided', function () {
        $flow = Flow::factory()->create();
        $flow->broadcastTabId = 'test_tab_123';
        
        Event::fake([RecordUpdated::class]);
        $flow->update(['name' => 'Test']);

        Event::assertDispatched(RecordUpdated::class, function ($event) {
            return $event->tabId === 'test_tab_123';
        });
    });
});
```

### Frontend Tests (Cypress/Playwright)

```javascript
describe('Dirty banner notification', () => {
    it('shows banner when record updated elsewhere and form is dirty', () => {
        // User A opens record editor
        cy.visit('/flows/1/edit');
        cy.get('[name="name"]').type('My changes'); // Make dirty

        // Simulate User B updating the record (trigger broadcast)
        cy.task('updateRecordViaApi', { id: 1, name: 'User B update' });

        // Banner should appear
        cy.contains('This record was updated by').should('be.visible');
        cy.contains('User B').should('be.visible');
    });

    it('does not show banner when form is not dirty', () => {
        cy.visit('/flows/1/edit');
        // Don't make changes

        cy.task('updateRecordViaApi', { id: 1, name: 'Update' });

        // Banner should NOT appear
        cy.contains('This record was updated by').should('not.exist');
    });
});
```

---

## Acceptance Criteria

### Record Editors
- [ ] Dirty state tracked accurately (changes to any field set `isDirty = true`)
- [ ] Echo subscription established on component mount
- [ ] External update events filtered by `tab_id`
- [ ] Sticky banner shows when: dirty + external update + newer version
- [ ] Banner includes updater name and timestamp
- [ ] "Reload Latest" refreshes form and clears dirty state
- [ ] "Dismiss" hides banner without reloading
- [ ] Polling fallback activates when Echo unavailable
- [ ] Polling uses versioned endpoint: `/api/v1/records/{type}/{id}/version`

### Table Sync
- [ ] Table subscribes to project/list-level channel
- [ ] External task updates accumulate and throttle (3-5s)
- [ ] Auto-refresh occurs when no inline edit active
- [ ] Reload prompt shows when inline edit is active
- [ ] "Reload Now" invalidates and refetches
- [ ] "Keep Editing" queues updates and keeps prompt visible

### Backend
- [ ] `RecordUpdated` broadcasts after model saves
- [ ] Broadcasts suppressed during imports
- [ ] `BroadcastHelper::toOthers` used to avoid socket ID errors
- [ ] Channel authorization enforces tenant + permission checks
- [ ] Broadcast failures logged but don't block saves
- [ ] `RecordVersionController` extends `BaseApiController` and uses `successResponse()`
- [ ] API documentation updated with optional headers (`X-Socket-ID`, `X-Tab-ID`)

---

## Performance Considerations

- **Broadcast volume**: Human edits only (low frequency); imports suppressed
- **Throttling**: 3-5 second throttle on table refreshes prevents storms
- **Payload size**: Keep events < 1KB (only metadata, not full models)
- **Channel subscriptions**: One per open editor/table (scales with active users)

---

## Next Steps

After Phase B is complete, proceed to **Phase C** to add Redis-backed presence tracking for "Who's viewing" UI and richer collaboration features.
