# Epic 7: Three-Way Merge & Orchestrator — Design Spec

## Business Justification & Problem Statement

In a bi-directional sync between SuiteX and NetSuite, four distinct event sources -- NetSuite User Event scripts (UE), NetSuite polling, SuiteX-originated changes, and reconciliation/backfill jobs -- feed changes into a shared event stream. Race conditions are inevitable. If a user in SuiteX changes a Project Name at 10:00:01 AM, and an accountant in NetSuite changes that same Project's Status at 10:00:02 AM, both systems will fire events into our `events.raw` Pub/Sub topic.

Without a merge layer, these events will overwrite each other, causing data loss. Furthermore, circular update loops are a critical risk: SuiteX updates NetSuite, which triggers a UE script that emits an event back to SuiteX, which then writes to NetSuite again -- an infinite ping-pong loop that will exhaust NetSuite governance limits and corrupt state. We also face duplicate events from overlapping UE and polling coverage of the same NetSuite change.

We need a multi-layered system of stateless orchestrator workers that: maintain an authoritative per-record projection of current state, perform three-way merges against the target system's live state, resolve field-level conflicts using tenant-configured policies, suppress circular updates through attribution-based detection, and kill redundant writes before they leave the system.

## Proposed Architectural Solution

We will build a set of stateless Laravel background workers that consume from Pub/Sub subscriptions. The system comprises five distinct consumers:

1. **Merge Service** -- subscribes to `events.raw`, normalizes, deduplicates (fingerprint-based UE vs. polling arbitration), coalesces events within a short time window, and publishes canonical `MergedEvent` records to `events.merged`.
2. **NetSuite Writer** -- subscribes to `events.merged`, filters for SuiteX-originated changes requiring NetSuite writes, applies governance-aware batching/throttling, and issues RESTlet calls with idempotency keys and marker fields (`custbody_suitex_write_id`, `custbody_suitex_write_source`).
3. **SuiteX Writer** -- subscribes to `events.merged`, applies NetSuite-originated changes to SuiteX tenant databases and updates the `current_state` projection.
4. **Reconciliation Service** -- runs in the background, detects drift between `current_state` and actual NetSuite state, and emits corrective events.
5. **Error/DLQ Handler** -- consumes from `events.error` and `events.dlq`, manages retries and escalation.

**State management**: Rather than replaying historical events on every message, the system maintains a materialized `current_state` projection table (keyed by `account_id, record_type, record_id`) that is updated incrementally after each successful merge. The `events` table serves as the immutable audit log, not the runtime data source.

**Merge algorithm**: When a raw event arrives, the Merge Service performs a three-way merge:
1. Load `Base` -- the projection version the event was derived from (via `baseVersion`).
2. Fetch `RemoteCurrent` -- the current state of the target system with its version (see **RemoteCurrent Fetch Mechanism** below).
3. If `remoteVersion == baseVersion`, apply `E.changes` directly (no concurrent changes).
4. Otherwise, compute `remoteDelta = diff(Base, RemoteCurrent)` and `ourDelta = E.changes`. Run a per-field three-way merge using the tenant's `field_metadata.conflict_policy` rules. If conflicts cannot be auto-resolved, enqueue to the `conflicts` table and lock the record for human review.
5. On success, atomically update `current_state.version` and `current_state.state`.
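
The merge steps above can be sketched as follows. This is a minimal, illustrative Python sketch, not the Laravel implementation: `three_way_merge`, `MergeResult`, the `our_ts`/`remote_ts` parameters, and the fallback to `manual` for fields without a configured policy are all assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

State = Dict[str, Any]


@dataclass
class MergeResult:
    merged: State
    conflicts: List[str] = field(default_factory=list)  # fields needing human review


def diff(base: State, current: State) -> State:
    """Return the fields whose value in `current` differs from `base`."""
    return {k: v for k, v in current.items() if base.get(k) != v}


def three_way_merge(base: State, remote_current: State, our_changes: State,
                    policies: Dict[str, str],
                    our_ts: float = 0.0, remote_ts: float = 0.0) -> MergeResult:
    remote_delta = diff(base, remote_current)
    result = MergeResult(merged=dict(remote_current))
    for name, ours in our_changes.items():
        if base.get(name) == ours:
            continue  # our side did not actually change this field
        if name not in remote_delta or remote_delta[name] == ours:
            result.merged[name] = ours  # unilateral (or identical) change: no policy lookup
            continue
        policy = policies.get(name, "manual")  # assumed default when no policy is configured
        if policy == "suitex-wins":
            result.merged[name] = ours
        elif policy == "netsuite-wins":
            pass  # keep the value already copied from RemoteCurrent
        elif policy == "last-write-wins":
            if our_ts > remote_ts:
                result.merged[name] = ours
        else:
            result.conflicts.append(name)  # caller enqueues to `conflicts` and locks the record
    return result


# Values taken from Scenario 3: SuiteX changed `name`, NetSuite changed `status`.
outcome = three_way_merge(
    base={"name": "Alpha", "status": "Active"},
    remote_current={"name": "Alpha", "status": "Closed"},
    our_changes={"name": "Beta"},
    policies={},
)
# outcome.merged == {"name": "Beta", "status": "Closed"}, outcome.conflicts == []
```

A non-empty `conflicts` list corresponds to the "cannot be auto-resolved" branch in step 4; the atomic `current_state` update in step 5 is left to the caller.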

**RemoteCurrent Fetch Mechanism**: `RemoteCurrent` is obtained via a live `GET` read from the NetSuite API immediately before applying writes — it is not a stored snapshot or a custom record in NetSuite. The Writer performs a **read-verify-update** pattern:
1. Before writing, the Writer issues a RESTlet or SuiteTalk REST API call (e.g., `record.load()` via RESTlet, or `GET /record/v1/{recordType}/{recordId}` via REST API) to retrieve the target record's current field values.
2. The response constitutes `RemoteCurrent` — the authoritative live state of the record in NetSuite at the moment of fetch.
3. The Writer compares `RemoteCurrent.version` (derived from `lastModifiedDate` or an explicit version field) against `baseVersion` to determine whether concurrent changes have occurred.
4. If concurrent changes are detected, the three-way merge runs against the fetched `RemoteCurrent` — not against any cached or projected state.
5. The merged result is then written back to NetSuite in the same logical operation.

This approach guarantees the merge operates on the true current state of the record, but carries governance cost: every write requires at least one additional API call (the read). The Writer must account for this in governance budget calculations (each write effectively consumes 2 API calls: 1 GET + 1 PUT/POST). SOAP is not used for this fetch — NetSuite is sunsetting SOAP Web Services, and all new integrations must use RESTlet or SuiteTalk REST API endpoints.
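
To make the governance cost concrete, the read-verify-update pattern can be sketched against an in-memory stand-in for the NetSuite client. Everything named here (`InMemoryClient`, `read_verify_update`, the `merge` callback, the integer version counter) is hypothetical and exists only to show that one logical write costs one read plus one write.

```python
from typing import Any, Callable, Dict, Tuple

Fields = Dict[str, Any]


class InMemoryClient:
    """Hypothetical stand-in for a RESTlet/REST client; counts calls for budget accounting."""

    def __init__(self, version: int, fields: Fields) -> None:
        self.version = version
        self.fields = dict(fields)
        self.calls = 0

    def get(self, record_type: str, record_id: str) -> Tuple[int, Fields]:
        self.calls += 1
        return self.version, dict(self.fields)

    def put(self, record_type: str, record_id: str, fields: Fields) -> None:
        self.calls += 1
        self.fields.update(fields)
        self.version += 1


def read_verify_update(client: InMemoryClient, record_type: str, record_id: str,
                       base_version: int, changes: Fields,
                       merge: Callable[[Fields, Fields], Fields]) -> Fields:
    # 1. Live read: this response is RemoteCurrent.
    remote_version, remote_current = client.get(record_type, record_id)
    # 2. Verify: equal versions mean no concurrent change, so apply directly.
    if remote_version == base_version:
        to_write = dict(changes)
    else:
        # 3. Concurrent change: merge against the fetched state, not a cached one.
        to_write = merge(remote_current, changes)
    # 4. Write the result back.
    client.put(record_type, record_id, to_write)
    return to_write


client = InMemoryClient(version=6, fields={"name": "Alpha", "status": "Closed"})
read_verify_update(client, "job", "42", base_version=5,
                   changes={"name": "Beta"},
                   merge=lambda remote, ours: dict(ours))
# client.calls is now 2: one GET plus one PUT for a single logical write.
```

A governance budget that counts only writes would therefore underestimate consumption by half under this pattern.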

**Circular update prevention**: Six enforcement layers work together to prevent update loops:
- **Event attribution metadata** (`sourceSystem`, `writeId`, `transactionGroupId`) on every event identifies the originating system.
- **NetSuite marker fields** (`custbody_suitex_write_id`, `custbody_suitex_write_source`) cause UE scripts to suppress re-emission of SuiteX-originated writes.
- **Write Ledger** (`write_ledger` table) records per-record/field the last `write_id`, `source_system`, and `write_timestamp`, enabling duplicate and circular echo detection.
- **Merge engine suppression** compares incoming `writeId`/`sourceSystem` against the Write Ledger; if the event matches a recent write from the same source within a configurable time threshold (e.g., 15 seconds), it is dropped from the merge path.
- **Polling suppression** checks marker fields during discovery to skip SuiteX-originated changes.
- **SuiteX inbound filtering** rejects events where `source_system == 'suitex'` arriving from NetSuite.

All suppressed events are still recorded in the `event_audit_log` for auditability.
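
The Write Ledger comparison used by the merge engine suppression layer might look like the sketch below. It is illustrative only: `LedgerEntry`, `suppression_reason`, and the returned reason strings are hypothetical names, and the 15-second default simply mirrors the example threshold mentioned above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class LedgerEntry:
    write_id: str
    source_system: str
    write_timestamp: datetime


def suppression_reason(event_write_id: Optional[str], event_source: str,
                       event_time: datetime, last_write: Optional[LedgerEntry],
                       threshold: timedelta = timedelta(seconds=15)) -> Optional[str]:
    """Return why an event should be dropped from the merge path, or None to process it."""
    if last_write is None:
        return None  # no prior write recorded for this record/field
    if event_write_id is not None and event_write_id == last_write.write_id:
        return "idempotent-duplicate"
    within_window = abs(event_time - last_write.write_timestamp) <= threshold
    if event_source == last_write.source_system and within_window:
        return "circular-echo"
    return None


t0 = datetime(2026, 3, 12, 10, 0, 0, tzinfo=timezone.utc)
ledger = LedgerEntry(write_id="w-1", source_system="suitex", write_timestamp=t0)
reason = suppression_reason("w-2", "suitex", t0 + timedelta(seconds=3), ledger)
# reason == "circular-echo"; the caller would still record the event in event_audit_log.
```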

Finally, the resolved `MergedEvent` is published to the `events.merged` Pub/Sub topic for downstream delivery to the destination system.

### Event Lifecycle Flowchart

```mermaid
flowchart TD
    RAW["<b>events.raw</b><br/>Pub/Sub topic"]

    subgraph MS["<b>Merge Service</b>"]
        V{{"Validate against<br/>ChangeEvent Schema"}}
        AUD["Record in events table<br/>(immutable audit log)"]
        FP{{"Fingerprint<br/>deduplication"}}
        CIR{{"Circular update check<br/>via write_ledger"}}
        COAL["Coalescing buffer<br/>(Redis-backed, 5s window)"]
    end

    MERGED["<b>events.merged</b><br/>Pub/Sub topic"]

    subgraph NW["<b>NetSuite Writer</b><br/>(SuiteX → NetSuite)"]
        FF{{"Feature Flag<br/>netsuite_writer_enabled?"}}
        BYPASS["Update current_state<br/>Record in write_ledger<br/>ACK — skip HTTP call"]
        FETCH["Fetch RemoteCurrent<br/>from NetSuite"]
        VER{{"remoteVersion<br/>== baseVersion?"}}
        APPLY_DIRECT["Apply changes directly"]
        TWM["Three-Way Merge<br/>per field"]
        FIELD{{"Both sides changed<br/>same field?"}}
        UNI["Apply change —<br/>no conflict to resolve"]
        POL{{"conflict_policy?"}}
        NSW["Keep NetSuite value"]
        SXW["Keep SuiteX value"]
        LWW["Keep later timestamp"]
        MAN["Create conflict record<br/>Lock record via record_lock"]
        REST["Write to NetSuite<br/>via RESTlet with<br/>idempotency key"]
        NS_STATE["Update current_state<br/>Record in write_ledger"]
    end

    subgraph SW["<b>SuiteX Writer</b><br/>(NetSuite → SuiteX)"]
        SX_APPLY["Apply changes to<br/>tenant database"]
        SX_STATE["Update current_state<br/>Record in write_ledger"]
    end

    subgraph ERR["<b>Error / DLQ Handler</b>"]
        ECLASS{{"Error class?"}}
        RETRY["Retry with backoff<br/>(events.error)"]
        HUMAN["sync_error_queue<br/>(human review)"]
        EXHAUST["events.dlq +<br/>sync_error_queue<br/>status = exhausted"]
    end

    DONE(("✓ Done"))

    RAW --> V
    V -->|"Fail"| HUMAN
    V -->|"Pass"| AUD --> FP
    FP -->|"Duplicate<br/>(polling confirms UE)"| DONE
    FP -->|"New event"| CIR
    CIR -->|"Circular echo —<br/>suppress & log"| DONE
    CIR -->|"Not circular"| COAL
    COAL -->|"Flush"| MERGED

    MERGED -->|"source = suitex"| FF
    MERGED -->|"source = netsuite-*<br/>or reconciliation"| SX_APPLY

    FF -->|"Disabled (bypass)"| BYPASS --> DONE
    FF -->|"Enabled"| FETCH --> VER
    VER -->|"Equal — no<br/>concurrent changes"| APPLY_DIRECT
    VER -->|"Different — concurrent<br/>changes detected"| TWM --> FIELD
    FIELD -->|"Only one side<br/>changed the field"| UNI --> APPLY_DIRECT
    FIELD -->|"Both sides changed<br/>the field (conflict)"| POL
    POL -->|"netsuite-wins"| NSW --> APPLY_DIRECT
    POL -->|"suitex-wins"| SXW --> APPLY_DIRECT
    POL -->|"last-write-wins"| LWW --> APPLY_DIRECT
    POL -->|"manual"| MAN --> DONE

    APPLY_DIRECT --> REST --> NS_STATE --> DONE

    SX_APPLY --> SX_STATE --> DONE

    REST -.->|"Error"| ECLASS
    ECLASS -->|"Transient / 429 / Auth"| RETRY
    ECLASS -->|"Validation"| HUMAN
    RETRY -->|"Exhausted"| EXHAUST
```

## Explicit Functional Requirements

**A. Ingestion & Validation:**

Consume messages from the `events.raw` Pub/Sub subscription. Validate the incoming payload against the `ChangeEvent` JSON Schema (Appendix C1). Query `field_metadata` to verify the tenant's field mappings, normalization rules, and sync configuration. If validation fails, route immediately to `events.dlq`. If `fullSnapshotRef` is present, download the external payload from GCS into memory.

**B. The Immutable Audit Write:**

Immediately INSERT the validated raw event into the `events` table (immutable store) and record its fingerprint in `event_audit_log`. This guarantees a complete historical audit trail regardless of the eventual merge outcome. Even events that are later suppressed as circular or duplicate must be recorded.

**C. Deduplication & UE/Polling Arbitration:**

Before merging, check the event's fingerprint (`hash(accountId, recordType, recordId, lastModifiedDate, operation)`) against `event_audit_log`. The `accountId` component keeps fingerprints from colliding across tenants, because NetSuite `internalId` values are unique only within an account. If a polling event's fingerprint matches a prior UE event, treat it as a confirmation (no duplicate logical change), optionally enriching the projection with additional fields from the polling payload. If no match exists, treat it as a new authoritative NetSuite change.
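
A fingerprint of this shape could be computed as in the sketch below. The spec does not name a hash function, so SHA-256 is an assumption, as is the colon-joined canonical string (modelled on the example fingerprint in Scenario 4, which carries an account prefix); `event_fingerprint` is a hypothetical helper name.

```python
import hashlib


def event_fingerprint(account_id: str, record_type: str, record_id: str,
                      last_modified_date: str, operation: str) -> str:
    """Hash the identifying fields of a change so UE and polling events can be matched."""
    canonical = ":".join([account_id, record_type, record_id, last_modified_date, operation])
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


ue = event_fingerprint("act-123", "customer", "123", "2026-03-12T10:00:00Z", "update")
poll = event_fingerprint("act-123", "customer", "123", "2026-03-12T10:00:00Z", "update")
later = event_fingerprint("act-123", "customer", "123", "2026-03-12T10:05:00Z", "update")
# ue == poll (polling confirms the UE event); later differs (a new logical change).
```

Because `lastModifiedDate` is part of the input, both emitters must report it in the same normalized form, otherwise the same NetSuite change would produce two different fingerprints.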

**D. Three-Way Merge & Conflict Resolution:**

Load the `Base` snapshot from `current_state` using the event's `baseVersion`. Fetch `RemoteCurrent` from the target system. Apply the three-way merge per-field using `field_metadata.conflict_policy`:
- `netsuite-wins`: On conflict, accept the NetSuite value. Typical use: system-managed fields where NetSuite is the authority.
- `suitex-wins`: On conflict, accept the SuiteX value. Typical use: fields where SuiteX is the primary editing surface.
- `last-write-wins`: On conflict, accept whichever change has the later timestamp. Use sparingly and document risks.
- `manual`: On conflict, queue for human review; lock the record via `record_lock` to block automated writes until resolved.

Note: these policies only activate when both systems have changed the same field since the shared base version. A unilateral change (only one side changed the field) is always applied regardless of the policy value.

Apply `field_metadata.normalization_rule` before comparison to handle NetSuite field type quirks (checkbox `T`/`F` vs `true`/`false`, multiselect ordering, date timezone normalization).
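
The three quirks named above can be illustrated with small normalizers. This is a sketch under assumptions: the function names are hypothetical, multiselect values are compared as sorted strings, and a timestamp without an offset is treated as UTC, which the spec does not state.

```python
from datetime import datetime, timezone
from typing import Iterable, List, Union


def normalize_checkbox(value: Union[bool, str]) -> bool:
    """Map NetSuite 'T'/'F' and JSON true/false onto a single boolean form."""
    if isinstance(value, bool):
        return value
    token = str(value).strip().upper()
    if token in ("T", "TRUE"):
        return True
    if token in ("F", "FALSE"):
        return False
    raise ValueError(f"not a checkbox value: {value!r}")


def normalize_multiselect(values: Iterable[object]) -> List[str]:
    """Make comparison order-insensitive by sorting the selected ids."""
    return sorted(str(v) for v in values)


def normalize_datetime(value: str) -> str:
    """Convert an ISO 8601 timestamp to UTC so equal instants compare equal."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
    return parsed.astimezone(timezone.utc).isoformat()
```

Running both sides of a comparison through the same normalizer avoids false conflicts, for example a checkbox reported as `T` by NetSuite and `true` by SuiteX.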

**E. Circular Update Suppression & Egress:**

Before publishing, consult the `write_ledger` and `idempotency_keys` tables:
- If the incoming event's `writeId` matches an existing entry, skip (idempotent duplicate).
- If the incoming event's `sourceSystem` matches the previous write's source and the timestamp is within the configured threshold, skip (circular echo).
- If the merged changes produce zero delta against `current_state`, ACK the message and stop.

If data changed, publish the final resolved `MergedEvent` to the `events.merged` Pub/Sub topic. Update `current_state`, `write_ledger`, and `event_audit_log` atomically.

## Architectural Boundaries & Technical Constraints

**Strict Ordering:** The system must respect Pub/Sub Message Ordering using ordering key `recordType:recordId`. Events for the same record must be processed serially. Concurrent processing of events for the same record will corrupt the `current_state` projection.

**Statelessness:** All worker processes must remain entirely stateless. After processing each message, no in-memory state carries over. Workers rely on Cloud SQL (`current_state`, `events`, `write_ledger`, `idempotency_keys`, `field_metadata`) and the target system's live state as their sources of truth.

**Idempotency:** All writes (to SuiteX and NetSuite) must include idempotency keys in the canonical format `<accountId>:<source>:<recordType>:<recordId>:<eventTimestamp>:<operation>`. Keys are persisted in the `idempotency_keys` table with a minimum 30-day retention window. The `accountId` prefix is required because NetSuite `internalId` values are per-account (not globally unique) — without it, two tenants with the same `recordType:recordId` updating in the same second would produce identical keys, causing the second event to be silently dropped as a false duplicate.
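
As an illustration of the canonical key format, a builder might look like this. `build_idempotency_key` is a hypothetical helper, and rejecting empty components is an assumption rather than a stated requirement.

```python
def build_idempotency_key(account_id: str, source: str, record_type: str,
                          record_id: str, event_timestamp: str, operation: str) -> str:
    """Join the six components in the canonical order, refusing blank parts."""
    parts = [account_id, source, record_type, record_id, event_timestamp, operation]
    if any(not part for part in parts):
        raise ValueError("every idempotency key component is required")
    return ":".join(parts)


tenant_a = build_idempotency_key("act-123", "suitex", "project", "42",
                                 "2026-03-12T10:00:00Z", "update")
tenant_b = build_idempotency_key("act-456", "suitex", "project", "42",
                                 "2026-03-12T10:00:00Z", "update")
# tenant_a == "act-123:suitex:project:42:2026-03-12T10:00:00Z:update"
# tenant_a != tenant_b, so the second tenant's event is not dropped as a false duplicate.
```

Note that ISO 8601 timestamps contain colons themselves, so a key in this format is suitable for equality lookups but should not be parsed back into components by splitting on `:`.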

**Governance Awareness:** The NetSuite Writer must enforce per-account adaptive concurrency and must never sustain more than 50% of the account's concurrent request limit. A circuit breaker activates when the error rate exceeds 50% and pauses writes for 5 minutes.
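
One way to read the two thresholds is sketched below. The spec does not say over what window the error rate is measured, so the rolling window of recent outcomes is an assumption, as are the names `CircuitBreaker` and `max_sustained_concurrency` and the choice to round the concurrency cap down with a floor of 1.

```python
import time
from collections import deque
from typing import Callable, Deque, Optional


def max_sustained_concurrency(account_limit: int) -> int:
    """Cap sustained concurrency at 50% of the account's concurrent request limit."""
    return max(1, account_limit // 2)


class CircuitBreaker:
    def __init__(self, threshold: float = 0.5, pause_seconds: float = 300.0,
                 window: int = 20, clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self.pause_seconds = pause_seconds
        self.outcomes: Deque[bool] = deque(maxlen=window)  # assumed rolling window
        self.clock = clock
        self.open_until: Optional[float] = None

    def allow(self) -> bool:
        """Return True when writes may proceed."""
        if self.open_until is None:
            return True
        if self.clock() >= self.open_until:
            self.open_until = None  # pause elapsed: close the breaker
            self.outcomes.clear()
            return True
        return False

    def record(self, success: bool) -> None:
        """Track one write outcome and open the breaker if the error rate is too high."""
        self.outcomes.append(success)
        window_full = len(self.outcomes) == self.outcomes.maxlen
        error_rate = self.outcomes.count(False) / len(self.outcomes)
        if window_full and error_rate > self.threshold:
            self.open_until = self.clock() + self.pause_seconds
```

Injecting `clock` keeps the 5-minute pause testable without sleeping; a worker would call `allow()` before each NetSuite write and `record()` after it.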

**Development Against the Mock Server (Epic 3):** All Epic 7 development and integration testing must target the Epic 3 Node.js Mock Server rather than live NetSuite. The mock server's current scope (RESTlet write endpoints, cursor-based poller endpoint, chaos middleware) must be extended to support the interactions required by Epic 7:

1. **Record Read Endpoint (RemoteCurrent):** The mock server must expose a `GET /mock-netsuite/restlet/{recordType}/{recordId}` endpoint that returns the record's current field values, `lastModifiedDate`, and a monotonic version number. This is the target for the NetSuite Writer's read-verify-update pattern. The endpoint must accept an optional `fields` query parameter so tests can verify that the Writer only fetches synced fields.
2. **Marker Field Storage & Inspection:** Write endpoints must accept and persist `custbody_suitex_write_id` and `custbody_suitex_write_source` marker fields. The poller endpoint must return these fields so circular update prevention and polling suppression can be tested end-to-end.
3. **Concurrent Modification Simulation:** The mock server must support a test mode where a record's state can be externally mutated between a GET and a subsequent PUT, allowing the Writer to encounter a version mismatch and exercise the `ConcurrentModification` retry path.
4. **UE Event Simulation:** The mock server must provide an endpoint (e.g., `POST /mock-netsuite/simulate/ue-event`) that generates a canonical `ChangeEvent` payload simulating a NetSuite User Event script emission and publishes it to `events.raw`. This allows the Merge Service to be tested with realistic NetSuite-originated events without deploying actual UE scripts.
5. **Polling Event Simulation:** Similarly, a `POST /mock-netsuite/simulate/poll-event` endpoint must generate polling-style events (with `source: "netsuite-poll"`) for the same record to test UE/polling deduplication and fingerprint arbitration.
6. **Error Injection:** Beyond the existing chaos middleware (latency, 429s), the mock server should support targeted error injection per record or per request type — validation errors, auth errors, governance limit errors — so all six error classes in Epic 7's retry policy can be tested deterministically.

These mock server extensions are a prerequisite for Stage 3 (Consumers) and Stage 5 (Integration Tests). The extensions should be coordinated with the Epic 3 owner and may be delivered as a separate Jira task under Epic 3.

## Testable Acceptance Criteria

**Scenario 1 (Circular Update Suppression):**

**Given** SuiteX previously wrote Project A's name to "Acme" (recorded in `write_ledger` with `source_system = 'suitex'` and a `write_id`)

**When** a raw event arrives from NetSuite (via UE or polling) with the same `write_id` or matching `source_system = 'suitex'` within the configured time threshold

**Then** the Merge Service suppresses the event from the merge path, records it in `event_audit_log` as a circular echo, and does not publish to `events.merged`.

**Scenario 2 (Conflict Resolution with Per-Field Policies):**

**Given** `field_metadata` defines `conflict_policy = 'suitex-wins'` for "Project Name" and `conflict_policy = 'netsuite-wins'` for "Status"

**When** a SuiteX event (baseVersion=5) changes both Name to "Acme" and Status to "Active", **and** RemoteCurrent (version 6) shows NetSuite has concurrently changed Status to "Closed" since baseVersion 5

**Then** the three-way merge detects a conflict on Status (both sides changed it since base), applies `netsuite-wins` to keep "Closed", accepts the non-conflicting Name change to "Acme" (no conflict — only SuiteX changed it, so the policy is not consulted), and forwards the merged result `{name: "Acme", status: "Closed"}` at version 7. Note: if NetSuite had NOT changed Status, the SuiteX Status change to "Active" would succeed regardless of the `netsuite-wins` policy — `conflict_policy` only activates when both systems have changed the same field.

**Scenario 3 (Three-Way Merge with Concurrent Changes):**

**Given** `current_state` has Project B at version 5 with `{name: "Alpha", status: "Active"}`

**When** a SuiteX event (baseVersion=5) changes `name` to "Beta", AND the target system's RemoteCurrent is at version 6 with `{name: "Alpha", status: "Closed"}`

**Then** the Merge Service detects non-conflicting concurrent changes (SuiteX changed `name`, NetSuite changed `status`), merges to `{name: "Beta", status: "Closed"}`, and updates `current_state` to version 7.

**Scenario 4 (UE/Polling Deduplication):**

**Given** a UE event for Customer 123 was already recorded in `event_audit_log` with fingerprint `hash("act-123:customer:123:2026-03-12T10:00:00Z:update")`

**When** a polling event arrives for the same Customer 123 with the same fingerprint

**Then** the Merge Service treats it as a confirmation, does not produce a duplicate `MergedEvent`, and optionally enriches the projection with any additional polling fields.

**Scenario 5 (Conflict Queuing for Manual Policy):**

**Given** a field "Budget" has `conflict_policy = 'manual'`

**When** both SuiteX and NetSuite change "Budget" concurrently (different values since `baseVersion`)

**Then** the Merge Service creates a record in the `conflicts` table, locks the record via `record_lock`, and blocks automated writes until human resolution.

## Deliverables (Staged per V10)

**Stage 1 -- Event Model & Durable Event Store:**
- `ChangeEvent` JSON schema and `events` table migration.
- Producers: UE emitter, Polling emitter, SuiteX emitter (transactional outbox pattern).
- Integration tests demonstrating replayability and per-record ordering.

**Stage 2 -- Projections & Versioning:**
- `current_state` projection table, version logic, and projection API.
- Migration of snapshot-based projection into `current_state`.

**Stage 3 -- Orchestrator + Three-Way Merge:**
- Merge Service consumer (`ProcessRawEvent` job) with three-way merge algorithm.
- `ConflictResolver` service class with per-field policy engine driven by `field_metadata`.
- `WriteLedgerService` for circular update tracking.
- Idempotency key generation and enforcement via `idempotency_keys` table.
- `write_ledger` table and suppression logic.
- Comprehensive Pest tests covering: no-concurrency merge, non-conflicting concurrent changes, conflicting fields with configured policy, conflict queueing, circular update suppression, UE/polling deduplication, and idempotent retries.

**Stage 4 -- Backpressure, Batching & Adaptive Concurrency:**
- NetSuite Writer with governance-aware throttling and adaptive per-account concurrency.
- Debounce/coalescing logic for high-frequency SuiteX UI updates.
- Load test demonstrating graceful handling under throttling.

**Stage 5 -- Conflict UI & Human Workflows:**
- Conflict queue UI showing Base, SuiteX value, NetSuite value, suggested merge, timestamps, and audit trail.
- Resolution APIs and record locking semantics.

**Stage 6 -- Observability, Runbooks & Cutover:**
- Dashboards (events/sec, backlog, 429s, conflicts, error rates per account).
- Alerts, runbooks, and migration plan from snapshot pipeline to event-driven pipeline.
- Feature flag bypass switch for progressive per-account rollout.
- Shadow event emission for legacy API coexistence (Strangler Fig pattern).
