# iPaaS System Architecture and Logic Flow Documentation

## Table of Contents
1. [System Overview](#system-overview)
2. [Core Components](#core-components)
3. [Flow Execution Lifecycle](#flow-execution-lifecycle)
4. [Job Types and Responsibilities](#job-types-and-responsibilities)
5. [Node Types and Processing](#node-types-and-processing)
6. [API Connector Strategy Pattern](#api-connector-strategy-pattern)
7. [Snowflake Async Polling and Timeout Handling](#snowflake-async-polling-and-timeout-handling)
8. [Pagination and Data Processing](#pagination-and-data-processing)
9. [Subflow Processing](#subflow-processing)
10. [Duplicate Prevention System](#duplicate-prevention-system)
11. [Flow Completion and Metrics](#flow-completion-and-metrics)
12. [Database Architecture](#database-architecture)
13. [Error Handling and Recovery](#error-handling-and-recovery)
14. [Preview vs Full Flow Execution](#preview-vs-full-flow-execution)
15. [Development Guidelines](#development-guidelines)

---

## System Overview

The iPaaS (Integration Platform as a Service) is a flow-based data processing system that orchestrates data movement and transformation between external APIs and internal systems. The system operates on a multi-tenant architecture with tenant-isolated databases and uses Redis for state management and duplicate prevention.

### Key Architectural Principles
- **Multi-tenant isolation**: Each tenant has isolated database connections and data
- **Queue-based processing**: All operations are asynchronous using Laravel queues
- **Idempotent operations**: Duplicate prevention ensures safe retry and recovery
- **Run-level isolation**: Each flow execution has a unique run ID for isolation
- **Event-driven completion**: Flow completion is managed through event dispatching

---

## Core Components

### Primary Job Classes
- **ProcessFlow**: Entry point for flow execution
- **ProcessFlowPage**: Handles API pagination
- **ProcessNode**: Executes individual node logic
- **SubProcessApiNodeJob**: Manages subflow processing

### Node Types (NodeTypeEnum)
- **ApiNode**: External API requests and responses
- **MapNode**: Data field mapping operations
- **TransformNode**: Data transformation logic
- **BranchNode**: Conditional flow control
- **FileNode**: File reading operations
- **BranchChildNode**: Branch logic flows

### Support Services
- **FlowCounterService**: Tracks job completion status
- **FlowMetricsService**: Collects performance and execution metrics
- **NodeLoaderService**: Dynamically loads node models
- **DuplicatePreventionTrait**: Prevents duplicate record processing
- **CleanupFlowDataService**: Manages Redis key cleanup

---

## Flow Execution Lifecycle

### 1. Flow Initiation
```
Trigger (Manual/Scheduled)
    ↓
ProcessFlow Job Dispatched
    ↓
Tenant Connection Setup
    ↓
Flow Metrics Initialization
    ↓
Run ID Generation (unique per execution)
```

### 2. Initial Data Processing
```
API Node Execution
    ↓
Response Data Extraction
    ↓
Pagination Check
    ↓
Branch: Pagination Enabled/Disabled
```

### 3A. Non-Paginated Flow
```
Process Records Directly
    ↓
Batch Processing (if >10 records)
    ↓
ProcessNode Jobs Dispatched
    ↓
Flow Completion
```

### 3B. Paginated Flow
```
ProcessFlowPage Dispatched
    ↓
Iterative API Calls
    ↓
Data Collection
    ↓
ProcessNode Jobs Dispatched
    ↓
Check for Next Page
    ↓
Flow Completion Detection
```

### 4. Record Processing Chain
```
ProcessNode Execution
    ↓
Node Type Identification
    ↓
Node Logic Application
    ↓
Data Transformation/API Call
    ↓
Next Node Dispatch (if exists)
    ↓
Flow Counter Update
```

---

## Job Types and Responsibilities

### ProcessFlow
**Purpose**: Main entry point and initial data acquisition
**Key Responsibilities**:
- Establishes tenant database connection
- Generates unique run ID for execution isolation
- Initializes flow and node metrics tracking
- Executes the first API node to fetch data
- Determines pagination requirements
- Dispatches appropriate next jobs (ProcessFlowPage or ProcessNode)
- Manages flow completion for non-paginated scenarios

**Key Features**:
- Dual-counter system for job tracking (`jobs_expected` / `jobs_completed`)
- Redis-based state management with run-level isolation
- Comprehensive logging and metrics collection
- Error handling with graceful degradation

### ProcessFlowPage
**Purpose**: Handles paginated API responses
**Key Responsibilities**:
- Iterates through paginated API responses
- Supports multiple pagination types (generic, token, GraphQL)
- Processes each page of data
- Dispatches ProcessNode jobs for each record
- Detects pagination completion
- Triggers flow completion when all jobs finish

**Pagination Types**:
- **Generic**: Offset/limit based pagination
- **Token**: Next page token based pagination
- **GraphQL**: Cursor-based pagination for GraphQL APIs

### ProcessNode
**Purpose**: Executes individual node logic within the flow
**Key Responsibilities**:
- Loads appropriate node model based on type
- Executes node-specific logic (API, Transform, Map, Branch)
- Updates node data with results
- Dispatches next node in sequence
- Updates flow counters for completion tracking
- Handles node-level error scenarios

**Node Execution Pattern**:
1. Load node data from database
2. Identify and load node model
3. Execute node logic with payload
4. Update metrics and logging
5. Dispatch next node (if exists)
6. Increment completion counter

### SubProcessApiNodeJob
**Purpose**: Handles subflow processing for complex data scenarios
**Key Responsibilities**:
- Breaks down large payloads into manageable subsets
- Processes individual items through subflows
- Manages subflow state and coordination
- Merges results back into main flow
- Provides isolated execution context

---

## Node Types and Processing

### ApiNode
**Purpose**: Manages external API requests using strategy pattern
**Key Features**:
- Connector-based configuration
- Strategy pattern for different authentication types
- Throttling and rate limiting
- Automatic token refresh on authorization errors
- Support for various content types and HTTP methods
- Subflow processing capability

**Execution Flow**:
```
Load Connector Configuration
    ↓
Apply Authentication Strategy
    ↓
Execute API Request (with throttling)
    ↓
Handle Response/Errors
    ↓
Token Refresh (if needed)
    ↓
Return Processed Data
```

### MapNode
**Purpose**: Field mapping and data structure transformation
- Maps data fields between different schemas
- Supports nested object transformation
- Handles array mapping operations

### TransformNode
**Purpose**: Data transformation and business logic application
- Executes custom transformation scripts
- Applies business rules to data
- Formats data for downstream consumption

### BranchNode
**Purpose**: Conditional flow control and routing
- Evaluates branch conditions
- Routes data to appropriate flow paths
- Supports multiple branching scenarios

---

## API Connector Strategy Pattern

### Architecture
The system uses a strategy pattern for handling different API authentication and request methods:

```
AuthContext (Context)
    ↓
ConnectorStrategyInterface (Strategy Interface)
    ↓
ConnectorStrategy (Abstract Strategy)
    ↓
Specific Strategy Implementations
```

### Key Components

#### Connector Model
- Stores connection configuration (base URL, auth type, throttling)
- Maintains authentication credentials
- Defines media types and request settings
- Links to tenant-specific configurations

#### ConnectorStrategy (Abstract)
- Implements common request logic
- Handles HTTP method execution
- Manages headers and content types
- Provides error handling framework
- Implements token refresh mechanisms

#### Strategy Interface
- `authenticate()`: Initial authentication
- `refreshToken()`: Token renewal
- `makeRequest()`: Execute API requests

### Authentication Flow
```
1. Load Connector Configuration
2. Select Authentication Strategy
3. Execute Initial Authentication
4. Make API Request
5. Handle Authorization Errors
6. Refresh Token (if needed)
7. Retry Request
8. Return Response
```
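
The flow above can be sketched in plain PHP. This is a minimal illustration, not the platform's implementation: only the interface name and its three method names come from this document, while the method signatures, the `FakeTokenStrategy` class, the `executeWithAuth()` helper, and the use of a 401 status as the "unauthorized" signal are assumptions made for the example.

```php
<?php
// Method names come from the Strategy Interface section; the signatures are assumed.
interface ConnectorStrategyInterface
{
    public function authenticate(): void;
    public function refreshToken(): void;
    public function makeRequest(array $input): array;
}

// Hypothetical stand-in strategy that simulates an expired token instead of calling a real API.
final class FakeTokenStrategy implements ConnectorStrategyInterface
{
    private string $token = '';
    public int $refreshCount = 0;

    public function authenticate(): void
    {
        $this->token = 'expired-token';
    }

    public function refreshToken(): void
    {
        $this->refreshCount++;
        $this->token = 'fresh-token';
    }

    public function makeRequest(array $input): array
    {
        // Simulated server: only the refreshed token is accepted.
        if ($this->token !== 'fresh-token') {
            return ['httpStatusCode' => 401];
        }
        return ['httpStatusCode' => 200, 'path' => $input['path']];
    }
}

// Hypothetical context helper: authenticate, request, refresh once on 401, retry once.
function executeWithAuth(ConnectorStrategyInterface $strategy, array $input): array
{
    $strategy->authenticate();
    $response = $strategy->makeRequest($input);

    if ($response['httpStatusCode'] === 401) {
        $strategy->refreshToken();
        $response = $strategy->makeRequest($input);
    }

    return $response;
}

$strategy = new FakeTokenStrategy();
$result = executeWithAuth($strategy, ['path' => '/orders']);
```

Limiting the retry to a single refresh keeps a permanently invalid credential from looping; a real strategy would surface the second failure to the caller.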

---

## Snowflake Async Polling and Timeout Handling

### Overview
The iPaaS system integrates with Snowflake's asynchronous query execution API. An HTTP 202 response from Snowflake means the query is still executing asynchronously and its results must be retrieved by polling. This introduces unique timeout considerations for job execution.

### HTTP Client Timeout Defaults (Non-Snowflake Connectors)

All iPaaS HTTP requests made through `IpaasHelper::executeCurl()` use an explicit cURL timeout:

- **Default HTTP timeout**: 60 seconds per request
- **Per-request override**: `input['timeout']` (seconds) can override the default
- **Connector-level override**: `connector_config.timeout` (seconds) can set a custom timeout (capped at 180 seconds)

This timeout applies to all connector strategies (OAuth1, OAuth2, Basic, Password, JWT, and others) except where a job explicitly passes a different value. Snowflake async polling uses its own internal timeout configuration described below.
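
A minimal sketch of how these three settings could combine into one effective timeout. The function name `resolveHttpTimeout()` and the precedence (per-request value first, then connector config, then the default) are assumptions; the document states the values and the 180-second cap but not the order in which overrides are applied. The result would typically be passed to cURL's `CURLOPT_TIMEOUT` option.

```php
<?php
const DEFAULT_HTTP_TIMEOUT = 60;    // default per-request cURL timeout (seconds)
const MAX_CONNECTOR_TIMEOUT = 180;  // cap applied to connector_config.timeout (seconds)

// Hypothetical helper. Assumed precedence: per-request override, connector config, default.
function resolveHttpTimeout(array $input, array $connectorConfig): int
{
    if (isset($input['timeout']) && (int) $input['timeout'] > 0) {
        return (int) $input['timeout'];
    }
    if (isset($connectorConfig['timeout']) && (int) $connectorConfig['timeout'] > 0) {
        // Connector-level values are capped at 180 seconds.
        return min((int) $connectorConfig['timeout'], MAX_CONNECTOR_TIMEOUT);
    }
    return DEFAULT_HTTP_TIMEOUT;
}

$default = resolveHttpTimeout([], []);                                    // no overrides
$perRequest = resolveHttpTimeout(['timeout' => 15], ['timeout' => 120]);  // request wins
$capped = resolveHttpTimeout([], ['timeout' => 600]);                     // cap applies
```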

### Snowflake Async Polling Architecture

#### Async Detection
When Snowflake receives a query that will take longer than expected, it returns:
```json
{
  "httpStatusCode": 202,
  "message": "Asynchronous execution in progress",
  "statementHandle": "01c009cc-0003-c318-0000-8a45210e48ee",
  "statementStatusUrl": "/api/v2/statements/01c009cc-0003-c318-0000-8a45210e48ee"
}
```

#### Polling Implementation
The `SnowflakeService` automatically detects async responses and polls the status URL:

```php
// SnowflakeService.php
private const POLL_INTERVAL_SECONDS = 5;        // Poll every 5 seconds
private const POLL_TIMEOUT_SECONDS = 240;       // Maximum 4 minutes
```

**Polling Behavior**:
- Polls every 5 seconds until query completes
- Maximum polling duration: 240 seconds (4 minutes)
- Handles HTTP 202 (still processing), 200 (success), 422 (error), 429 (rate limit), 5xx (transient errors)
- Retries transient errors with exponential backoff (up to 3 retries)

#### Synchronous Blocking
**Important**: Snowflake async polling is **synchronous and blocking**. The entire polling operation runs within the same job process, blocking the queue worker process for up to 240 seconds.

**Execution Flow**:
```
ApiNode->execute()
  └─> executeSingleRequest()
      └─> IpaasHelper::executeThrottlerRequest() (synchronous)
          └─> ConnectorJWT->makeRequest() (synchronous)
              └─> SnowflakeService->handleResponse() (synchronous)
                  └─> pollAsyncStatement() (blocks for up to 240 seconds)
```

### Job Timeout Requirements

All jobs that can execute API nodes (and thus Snowflake requests) must have sufficient timeout to accommodate async polling:

#### Required Timeouts
| Job | Required Timeout | Rationale |
|-----|------------------|-----------|
| **ProcessFlow** | 300 seconds (5 min) | Must allow for 240s polling + 60s buffer |
| **ProcessFlowPage** | 300 seconds (5 min) | Pagination jobs may also call Snowflake |
| **ProcessNode** | 300 seconds (5 min) | Direct API node execution |
| **SubProcessApiNodeJob** | 300 seconds (5 min) | Subflow nodes may call Snowflake |
| **ProcessApiPreviewJob** | 180 seconds (3 min) | Fast feedback for testing; prevents cURL timeout errors |

**Configuration**:
```php
public $timeout = 300; // 5 minutes - sufficient for Snowflake async polling
```

#### Timeout Calculation
- **Snowflake polling maximum**: 240 seconds
- **Processing overhead**: 30 seconds
- **Network/retry buffer**: 30 seconds
- **Minimum total**: 300 seconds (5 minutes)

**Preview Job Exception**: `ProcessApiPreviewJob` uses 180 seconds (3 minutes) because:
- Preview operations are meant for fast user feedback during testing
- Preview processes only single items, not full datasets
- Longer timeouts can cause cURL timeout errors
- Users can switch to full flow execution if preview times out

### Proactive Requeueing Strategy

The `ProcessNode` job implements proactive requeueing to prevent mid-execution timeouts.

#### Rationale
**Problem**: If setup work consumes significant time, too little time may remain for node execution (especially Snowflake polling), and starting the work would end in a timeout mid-operation.

**Solution**: Check remaining time BEFORE starting node execution and requeue if insufficient time remains.

#### Implementation
```php
// ProcessNode.php - Lines 132-164
$elapsedJobTime = $startTime - $jobStartTime;
$remainingTime = $this->timeout - $elapsedJobTime;

// Require at least 270 seconds (240s Snowflake + 30s buffer)
$minRequiredTime = 270;

if ($remainingTime < $minRequiredTime) {
    // Requeue cleanly BEFORE starting work
    // Prevents:
    // - Mid-execution timeouts
    // - Incomplete metrics state
    // - Partial node data updates
    ProcessNode::dispatch(...)->delay(now()->addSeconds(30));
    return;
}
```

#### Benefits
1. **Prevents Partial State**: Avoids starting node execution that cannot complete
2. **Clean Metrics**: Metrics initialization (line 111) won't be left incomplete
3. **Better Failure Handling**: Clean requeue is preferable to mid-execution timeout
4. **Resource Efficiency**: Avoids wasting time on operations that will time out

### Timeout Error Handling

#### Post-Timeout Retry Logic
```php
// ProcessNode.php - Lines 377-382
if ($isTimeoutError && $totalJobTime < ($this->timeout * 2)) {
    // Only retry if not already a long-running retry
    $this->handleTimeoutError($e, $totalJobTime);
    return;
}
```

**Retry Delay Calculation** (based on elapsed time):
- `< 30 seconds`: 60 second delay
- `30-45 seconds`: 180 second delay
- `> 45 seconds`: 300 second delay
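
The tiers above can be expressed as a small lookup function. This is a sketch with a hypothetical name, `retryDelayForElapsed()`; the list does not say which tier the exact boundaries belong to, so the example assumes that 30 and 45 seconds both fall in the middle tier.

```php
<?php
// Map elapsed job time (seconds) to a retry delay (seconds).
// Assumption: exactly 30 and exactly 45 seconds both use the 180-second delay.
function retryDelayForElapsed(float $elapsedSeconds): int
{
    if ($elapsedSeconds < 30) {
        return 60;
    }
    if ($elapsedSeconds <= 45) {
        return 180;
    }
    return 300;
}

$quickFailure = retryDelayForElapsed(10);
$midFailure = retryDelayForElapsed(40);
$slowFailure = retryDelayForElapsed(120);
```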

### Connector Isolation

**Important**: Snowflake async polling only affects **Snowflake JWT connectors**. Other connector strategies (`ConnectorOAuth2`, `ConnectorBasic`, `ConnectorOAuth1`, `ConnectorPassword`) are unaffected.

**Implementation Location**:
- `ConnectorJWT->processResponse()` - Calls `SnowflakeService->handleResponse()` only for `snowflake_jwt` application code
- Other connector strategies bypass async polling entirely

### Best Practices

1. **Always Set Explicit Timeouts**: Never rely on Laravel's default 60-second timeout for jobs that may call Snowflake
2. **Buffer Time**: Include adequate buffer (30+ seconds) beyond maximum polling time
3. **Monitor Setup Time**: Track how long setup work takes - if consistently high, consider optimization
4. **Log Timeout Events**: Comprehensive logging helps diagnose timeout-related issues
5. **Test with Long Queries**: Verify timeout configurations with queries that trigger async polling

### Troubleshooting Timeout Issues

**Symptoms**:
- Jobs timing out during Snowflake API calls
- Partial metrics or incomplete state
- Jobs marked as failed but queries succeed

**Diagnosis**:
1. Check job timeout configuration (`public $timeout`)
2. Verify Snowflake query is returning 202 status (async mode)
3. Review logs for polling duration (`elapsed_seconds` in logs)
4. Check if setup work is consuming excessive time before node execution

**Solutions**:
- Increase job timeout if consistently hitting limits
- Optimize setup work if it's consuming too much time
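- Verify that worker/Horizon `timeout` settings are at least as long as the job timeouts, and that the queue connection's `retry_after` value is longer than the longest job timeout so a still-running job is not released and picked up a second time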

---

## Pagination and Data Processing

### Pagination Types

#### Generic Pagination (Offset/Limit)
```php
// Calculates next offset based on current data
$nextOffset = $currentOffset + $currentCount;
$limit = $currentCount; // Maintain same limit

// Updates request parameters
$updatedParams = $this->updatePaginationParameters(
    $originalParams,
    $paginationConfig,
    $limit,
    $nextOffset
);
```

#### Token-Based Pagination
```php
// Extracts next page token from response
$token = $this->getValueFromPath($this->payload, $this->paginationConfig['nextPageTokenPath']);

// Validates token and constructs next request
if (!empty($token)) {
    // Continue pagination
} else {
    // End pagination
}
```

#### GraphQL Pagination
```php
// Uses cursor-based pagination for GraphQL APIs
// Manages cursor state and query construction
// Handles GraphQL-specific response formats
```
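
Since the block above only outlines the approach, here is a minimal sketch of cursor-based collection. It assumes the API follows the common Relay-style connection shape (`edges[].node` plus `pageInfo.hasNextPage` and `pageInfo.endCursor`); the actual response paths are configured per flow and may differ. `collectAllPages()` and the `$fetchPage` callable are hypothetical names, and the API is simulated with an in-memory array.

```php
<?php
// Collect all nodes from a cursor-paginated connection.
// $fetchPage(?string $cursor): array stands in for the real GraphQL request.
function collectAllPages(callable $fetchPage, int $maxPages = 100): array
{
    $records = [];
    $cursor = null;

    for ($page = 0; $page < $maxPages; $page++) {
        $connection = $fetchPage($cursor);

        foreach ($connection['edges'] as $edge) {
            $records[] = $edge['node'];
        }

        $pageInfo = $connection['pageInfo'];
        if (empty($pageInfo['hasNextPage']) || empty($pageInfo['endCursor'])) {
            break; // no further pages
        }
        $cursor = $pageInfo['endCursor'];
    }

    return $records;
}

// Simulated two-page API keyed by cursor ('' means the first page).
$pages = [
    '' => [
        'edges' => [['node' => ['id' => 1]], ['node' => ['id' => 2]]],
        'pageInfo' => ['hasNextPage' => true, 'endCursor' => 'c2'],
    ],
    'c2' => [
        'edges' => [['node' => ['id' => 3]]],
        'pageInfo' => ['hasNextPage' => false, 'endCursor' => null],
    ],
];
$records = collectAllPages(function (?string $cursor) use ($pages) {
    return $pages[$cursor ?? ''];
});
```

The `$maxPages` guard is a defensive choice for the sketch: it bounds the loop if an API keeps returning `hasNextPage: true`.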

### Data Processing Strategy
- **Small datasets** (<= 10 records): Sequential processing
- **Large datasets** (> 10 records): Batch processing (chunks of 10)
- **Pagination enabled**: ProcessFlowPage handles data collection
- **Pagination disabled**: Direct ProcessNode dispatch
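
The size threshold can be illustrated with `array_chunk()`. This is a sketch only: `planProcessing()` and the `mode` labels are hypothetical, and the real jobs dispatch `ProcessNode` work rather than returning arrays.

```php
<?php
// Decide the processing mode and split records into chunks of at most $batchSize.
function planProcessing(array $records, int $batchSize = 10): array
{
    $mode = count($records) <= $batchSize ? 'sequential' : 'batch';

    return [
        'mode' => $mode,
        'batches' => array_chunk($records, $batchSize),
    ];
}

$plan = planProcessing(range(1, 25));
$batchSizes = array_map('count', $plan['batches']); // sizes of each chunk
```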

---

## Subflow Processing

### Purpose
Subflows handle scenarios where payload data needs to be broken down into smaller subsets for individual processing, then merged back into the main flow.

### Architecture
```
Main Flow
    ↓
SubProcessApiNodeJob Dispatched
    ↓
Payload Subdivision (via sub_flow_path)
    ↓
Individual Item Processing
    ↓
Result Collection and Merging
    ↓
Return to Main Flow
```

### Key Features
- **Isolated execution**: Each subflow operates independently
- **State management**: Redis-based state tracking
- **Result merging**: Automatic aggregation of subflow results
- **Error isolation**: Subflow errors don't crash main flow
- **Metrics tracking**: Individual subflow performance monitoring

### Execution Pattern
```php
// Extract items using configured path
$items = IpaasHelper::getNestedValue($this->payload, $this->node->sub_flow_path);

// Process each item individually
foreach ($items as $item) {
    // Execute subflow logic
    $result = processSubflowItem($item);

    // Store result with unique key
    $this->stateService->setItemResult($itemId, $result);
}

// Merge results back into payload
$mergedResult = $this->stateService->getMergedResults();
```

---

## Duplicate Prevention System

### Architecture
The system implements a comprehensive duplicate prevention mechanism using Redis and database storage:

- **Redis Layer**: Fast, in-memory deduplication for active flows
- **Database Layer**: Persistent storage for audit trails
- **Record-level**: Prevents duplicate record processing
- **Page-level**: Prevents duplicate page processing
- **Flow-level**: Ensures flow completion events fire only once

### Key Components
- **FlowDuplicatePrevention**: Record-level deduplication
- **FlowCompletionLocks**: Flow completion event prevention
- **FlowPagePrevention**: Page-level deduplication
- **CleanupFlowDataService**: Automated cleanup of expired entries

### Redis Key Patterns
```
flow:{flowId}:processed_record:{recordHash}
flow:{flowId}:processed_page:{pageHash}
flow:{flowId}:completion_lock
flow:{flowId}:run:{runId}:jobs_expected
flow:{flowId}:run:{runId}:jobs_completed
subflow_state:{timestamp}:{flowId}
```
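
A short sketch of how keys following these patterns might be built. The helper names are hypothetical, and the hashing scheme (SHA-256 over the key-sorted JSON payload) is an assumption; the document does not specify how `recordHash` is computed.

```php
<?php
// Build a run-scoped counter key, for example jobs_expected or jobs_completed.
function runCounterKey(int $flowId, string $runId, string $counter): string
{
    return "flow:{$flowId}:run:{$runId}:{$counter}";
}

// Build a record-level deduplication key.
// Assumption: the hash is SHA-256 of the JSON payload with top-level keys sorted.
function processedRecordKey(int $flowId, array $record): string
{
    ksort($record); // top-level key order must not change the hash
    $hash = hash('sha256', json_encode($record));

    return "flow:{$flowId}:processed_record:{$hash}";
}

$expectedKey = runCounterKey(7, 'run_12345', 'jobs_expected');
$keyA = processedRecordKey(7, ['id' => 1, 'name' => 'Acme']);
$keyB = processedRecordKey(7, ['name' => 'Acme', 'id' => 1]); // same record, different key order
```

Sorting keys before hashing means two payloads that differ only in field order map to the same deduplication key; nested arrays would need a recursive sort, which the sketch omits.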

### Cleanup Strategy
- **Production-safe SCAN**: Non-blocking Redis operations
- **KEYS fallback**: Automatic fallback when SCAN fails
- **Batch processing**: Processes keys in batches of 500
- **Prefix handling**: Proper Redis prefix management
- **Error recovery**: Comprehensive retry and error handling

---

## Flow Completion and Metrics

### Dual-Counter System
The system tracks flow completion with two Redis counters per run, one for expected jobs and one for completed jobs:

```php
// Expected jobs: Total jobs that will be processed
Redis::set("flow:{$flowId}:run:{$runId}:jobs_expected", $totalExpectedJobs);

// Completed jobs: Jobs that have finished
Redis::set("flow:{$flowId}:run:{$runId}:jobs_completed", 0);
```

### Job Counting Rules
- **ProcessNode jobs**: Counted towards completion
- **ProcessFlowPage jobs**: NOT counted (pagination coordination)
- **SubProcessApiNodeJob**: Counted independently

### Completion Detection
```php
public static function isFlowReadyForCompletion($flowId, $runId): bool
{
    $expected = Redis::get("flow:{$flowId}:run:{$runId}:jobs_expected") ?? 0;
    $completed = Redis::get("flow:{$flowId}:run:{$runId}:jobs_completed") ?? 0;

    return $expected > 0 && $completed >= $expected;
}
```
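
To make the completion rule concrete, the following sketch replays it against an in-memory stand-in for the two Redis counters. `InMemoryFlowCounter` is a hypothetical class written for this example; only the rule itself (at least one expected job, and completed greater than or equal to expected) is taken from the method above.

```php
<?php
// In-memory stand-in for the jobs_expected / jobs_completed counters of a single run.
final class InMemoryFlowCounter
{
    private int $expected = 0;
    private int $completed = 0;

    public function incrementExpected(int $by = 1): void
    {
        $this->expected += $by;
    }

    public function incrementCompleted(int $by = 1): void
    {
        $this->completed += $by;
    }

    // Same rule as isFlowReadyForCompletion().
    public function isReadyForCompletion(): bool
    {
        return $this->expected > 0 && $this->completed >= $this->expected;
    }
}

$counter = new InMemoryFlowCounter();
$beforeDispatch = $counter->isReadyForCompletion(); // false: nothing expected yet

$counter->incrementExpected(3);                     // three jobs dispatched
$counter->incrementCompleted(2);
$partiallyDone = $counter->isReadyForCompletion();  // false: 2 of 3 finished

$counter->incrementCompleted(1);
$allDone = $counter->isReadyForCompletion();        // true: 3 of 3 finished
```

The `expected > 0` guard is what prevents a run from being reported complete before any job has been registered.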

### Metrics Collection
The FlowMetricsService tracks comprehensive performance data:

#### Flow-Level Metrics
- Execution time and duration
- Total records processed
- Success/failure counts
- Start and completion timestamps

#### Node-Level Metrics
- Individual node execution times
- Input/output data samples and schemas
- Record counts and data sizes
- Configuration and transformation details
- Error tracking and recovery

---

## Database Architecture

### Multi-Tenant Isolation
The system maintains strict tenant isolation through separate database connections:

#### Core Database (`mysql`)
- **Purpose**: Job queue management only
- **Tables**: `jobs`, `failed_jobs`, queue-related tables
- **Access**: Shared across all tenants

#### Tenant Database (`tenant_connection`)
- **Purpose**: All tenant-specific data
- **Tables**: `flows`, `nodes`, `node_data`, `connectors`, metrics tables
- **Access**: Isolated per tenant

### Connection Management
```php
// Setup tenant connection for each job
setupTenantConnection($this->tenantDatabase);

// All models use tenant connection
class Flow extends Model {
    protected $connection = 'tenant_connection';
}
```

### Migration Strategy
- **Core migrations**: `database/migrations/` - shared system tables
- **Tenant migrations**: `database/migrations/tenants/` - tenant-specific tables
- **Isolation**: Never create cross-database foreign keys
- **Command**: `php artisan migrate:tenants` for tenant migrations

---

## Error Handling and Recovery

### Error Classification
1. **Transient Errors**: Network timeouts, API rate limits
2. **Authentication Errors**: Token expiration, invalid credentials
3. **Data Errors**: Invalid JSON, missing fields
4. **System Errors**: Database failures, queue issues

### NetSuite HTTP Timeouts and Concurrency Limits

NetSuite (via the `netsuite_oauth1` connector) has additional behavior that affects how timeouts and rate limits are handled:

- **Concurrent request limits**: NetSuite can return HTTP 429 with `CONCURRENCY_LIMIT_EXCEEDED` when too many requests are in-flight:

```json
{
  "httpStatusCode": 429,
  "errorResponse": {
    "status": 429,
    "o:errorDetails": [
      {
        "detail": "Concurrent request limit exceeded. Request blocked. Verify your concurrency limits at Setup > Integration > Integration Management > Integration Governance.",
        "o:errorCode": "CONCURRENCY_LIMIT_EXCEEDED"
      }
    ]
  }
}
```

- **Current behavior**:
  - These responses are surfaced as `Error HTTP 429` from the connector strategy and logged as errors.
  - They are treated as hard failures for the individual API call, even though NetSuite may continue processing some requests in the background.

- **HTTP timeout configuration**:
  - iPaaS HTTP calls to NetSuite now use a **60 second** default cURL timeout (unless overridden per-request or via connector config).
  - This is intended to reduce false-positive timeouts when NetSuite is slow to respond due to concurrency governance.

- **Planned edge-case handling (future enhancement)**:
  - If NetSuite continues to return timeouts after increasing the HTTP timeout to 60 seconds, a special case may be introduced for specific NetSuite timeout patterns where:
    - Certain timeout responses are treated as **expected behavior** for idempotent operations.
    - These cases would be reported as a logical **success** in iPaaS while still logging the timeout for observability.
  - This behavior is not yet implemented and will be added only after confirming patterns in production logs.

### Recovery Mechanisms

#### Automatic Token Refresh
```php
// Detect unauthorized errors
if (IpaasHelper::isUnauthorizedError($decodedResponse)) {
    $this->refreshToken++;
    $this->refreshToken($this->connector);
    $input['connector'] = $this->connector;
    return $this->makeRequest($input); // Retry with new token
}
```

#### Flow State Management
```php
// Set error state with run isolation
Redis::set("flow:{$flowId}:run:{$runId}:status", "error");
Redis::set("flow:{$flowId}:run:{$runId}:error", $errorMessage);
Redis::set("flow:{$flowId}:run:{$runId}:error_at", now()->toIso8601String());
```

#### Graceful Degradation
- Missing node data: Log error and complete job counter
- API failures: Retry with exponential backoff
- Queue failures: Failed job handling with retry limits
- Pagination errors: Stop pagination, process collected data
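
As an illustration of the exponential backoff mentioned above, the sketch below doubles the delay on each attempt and caps it. The function name, the 2-second base, and the 60-second cap are assumptions for the example, not the platform's configured values.

```php
<?php
// Delay before retry attempt $attempt (1-based): base * 2^(attempt - 1), capped at $maxSeconds.
function backoffDelaySeconds(int $attempt, int $baseSeconds = 2, int $maxSeconds = 60): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempt numbers start at 1');
    }

    // Clamp the exponent so very large attempt numbers cannot overflow the integer range.
    $exponent = min($attempt - 1, 30);

    return min($baseSeconds * (2 ** $exponent), $maxSeconds);
}

$delays = array_map('backoffDelaySeconds', [1, 2, 3, 4, 5, 6]);
```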

### Logging Strategy
Comprehensive logging with structured data:
```php
Log::info('🚀 PROCESS FLOW STARTED', [
    'flow_id' => $this->flow_id,
    'tenant_database' => $this->tenantDatabase,
    'node_id' => $this->node->id,
    'timestamp' => now()->toIso8601String()
]);
```

---

## Preview vs Full Flow Execution

The iPaaS system supports two distinct execution modes: **Preview Mode** for testing and validation, and **Full Flow Execution** for production processing. These modes handle data storage and state management differently to optimize for their respective use cases.

### Execution Mode Comparison

| Aspect | Preview Execution | Full Flow Execution |
|---------|-------------------|-------------------|
| **Entry Point** | `ApiNodeController::execute()` | `ProcessFlow` job |
| **NodeData Records** | ❌ Not created (`nodeDataID = 0`) | ✅ Created & persisted |
| **Data Storage** | ⚡ Redis temporary state only | 💾 Database + Redis state |
| **Payload Source** | 📝 ApiNode model field | 🔄 API response records |
| **Persistence** | 🕒 Temporary (TTL 24h) | 💾 Permanent database records |
| **Recovery** | ❌ No recovery possible | ✅ Can resume from database |
| **Audit Trail** | ❌ Logs only | ✅ Database records + logs |
| **Performance** | ⚡ Faster (no DB writes) | 🐢 Slower (DB persistence) |
| **Use Case** | 🔍 Testing/validation | 🚀 Production processing |

### Preview Execution Flow

**Purpose**: Lightweight testing and validation without database pollution

**Entry Point**:
```
ApiNodeController::execute($id) → ApiNode::execute(null, $runId)
```

**Key Characteristics**:
- **No NodeData Creation**: Uses `nodeDataID = 0` as placeholder
- **Temporary State Management**: Redis-based with custom state keys
- **In-Memory Processing**: Data exists only during execution
- **Fast Feedback**: Immediate UI updates for testing purposes
- **No Database Pollution**: Test runs don't create permanent records

**Data Flow**:
```
User clicks "Preview"
    ↓
ApiNodeController generates unique runId
    ↓
ApiNode uses existing payload from model field
    ↓
SubProcessApiNodeJob processes items temporarily
    ↓
ProcessApiRequest jobs execute with nodeDataID = 0
    ↓
Results stored in temporary Redis state
    ↓
UI receives immediate feedback
```

**State Key Pattern**: `subflow_state:{timestamp}` (e.g., `subflow_state:1756267167873`)

### Full Flow Execution

**Purpose**: Production data processing with complete audit trail and recovery

**Entry Point**:
```
ProcessFlow job → API execution → record processing → job dispatch
```

**Key Characteristics**:
- **NodeData Persistence**: Each record stored in database table
- **Complete Audit Trail**: Full tracking of processed records
- **Recovery Capability**: Can resume from database on failure
- **Batch Processing**: Handles large datasets efficiently
- **Duplicate Prevention**: Redis + database deduplication

**Data Flow**:
```
ProcessFlow starts flow execution
    ↓
Executes first ApiNode to fetch data
    ↓
API response contains records array
    ↓
For each record: NodeData::create() → database insertion
    ↓
ProcessNode::dispatch() with actual nodeData->id
    ↓
Nodes process records through flow chain
    ↓
Completion tracked via dual-counter system
```

**State Key Pattern**: `subflow_state:{runId}:{nodeId}` (e.g., `subflow_state:run_12345:42`)

### Technical Implementation Details

#### ProcessFlow - NodeData Creation
```php
// ProcessFlow creates persistent records
foreach ($decodedOutput['records'] as $record) {
    $nodeData = new NodeData([
        'payload' => json_encode($record),
        'node_id' => $this->node->next_node_id,
        'node_type' => $this->node->next_node_type
    ]);
    $nodeData->save();

    // Dispatch with actual database ID
    ProcessNode::dispatch($this->node->next_node_id, $this->node->next_node_type,
        $nodeData->id, $this->tenantDatabase, $this->flow_id, $queueName, $this->runId);
}
```

#### Preview - No NodeData Creation
```php
// SubProcessApiNodeJob uses temporary processing
foreach ($items as $item) {
    $itemParams['nodeDataID'] = $this->node->nodeDataId ?? 0; // No database record

    $jobs[] = new ProcessApiRequest(
        $itemParams, $this->database, $this->node->sub_flow_attribute_name,
        $this->node->sub_flow_route_response, $this->node->sub_flow_path,
        $item, $this->key, $this->flow_id, null, $this->runId
    );
}
```

### State Management Differences

#### Preview State Management
- **Key Format**: Custom timestamp-based keys from ApiNode
- **Initialization**: `SubflowStateService::initializeStateWithCustomKey()`
- **Updates**: Uses legacy key format for compatibility
- **Cleanup**: Automatic TTL-based expiration (24 hours)
- **Purpose**: Immediate feedback for UI testing

#### Full Flow State Management
- **Key Format**: Standard `runId:nodeId` pattern
- **Initialization**: `SubflowStateService::initializeState()`
- **Updates**: Standard state update methods
- **Cleanup**: Managed cleanup after flow completion
- **Purpose**: Robust tracking for production processing

### Error Handling Differences

#### Preview Error Handling
- **Fast Fail**: Immediate error return to UI
- **No Recovery**: Errors stop processing immediately
- **Temporary State**: Error information not persisted
- **UI Focus**: Designed for immediate developer feedback

#### Full Flow Error Handling
- **Retry Logic**: Automatic retry with exponential backoff
- **State Persistence**: Error information stored in database
- **Recovery**: Can resume from last successful state
- **Production Focus**: Robust error handling for reliability

### Preview Mode Enhancements (2025-08-27)

- **Single-item processing in preview**: Only the first item in `sub_flow_path` is processed to reduce API calls and provide faster feedback.
- **Error-first preview responses**: If any item produces an error, ApiNode now returns an object with:
  - `error`: primary error message
  - `errorResponse` (optional): raw error payload when available
  - `summary`: `{ total_responses, errors_count, successes_count, mixed_results }`
  - `partial_success` (optional): the first successful item payload for context
- **State key compatibility**: Preview initializes state via `SubflowStateService::initializeStateWithCustomKey($customKey, $initialState)` to use ApiNode’s timestamp-based key. `ProcessApiRequest` reads/updates using `getStateByLegacyKey()`/`updateStateByLegacyKey()` ensuring compatibility.
- **Improved preview return semantics**: ApiNode inspects `state['response']` for errors and returns error details immediately; otherwise returns the updated `payload`.
- **Diagnostic logging**: ApiNode logs final state structure and response array metadata to aid troubleshooting.

Example error-first response shape:
```json
{
  "error": "Error HTTP 400",
  "errorResponse": { "title": "Bad Request", "status": 400 },
  "summary": { "total_responses": 2, "errors_count": 1, "successes_count": 1, "mixed_results": true },
  "partial_success": { "netsuiteId": "99698", "...": "..." }
}
```
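
A minimal sketch of how such a response could be assembled from per-item results. `buildPreviewResult()` is a hypothetical helper, and two points are assumptions: that an item counts as an error when it carries an `error` key, and that `mixed_results` is true when both errors and successes are present.

```php
<?php
// Build an error-first preview result from per-item responses.
function buildPreviewResult(array $responses, array $payload): array
{
    $errors = array_values(array_filter($responses, function (array $r) {
        return isset($r['error']);
    }));
    $successes = array_values(array_filter($responses, function (array $r) {
        return !isset($r['error']);
    }));

    if ($errors === []) {
        return $payload; // no errors: return the updated payload
    }

    $result = [
        'error' => $errors[0]['error'],
        'summary' => [
            'total_responses' => count($responses),
            'errors_count' => count($errors),
            'successes_count' => count($successes),
            'mixed_results' => $successes !== [],
        ],
    ];
    if (isset($errors[0]['errorResponse'])) {
        $result['errorResponse'] = $errors[0]['errorResponse'];
    }
    if ($successes !== []) {
        $result['partial_success'] = $successes[0];
    }

    return $result;
}

$result = buildPreviewResult(
    [
        ['netsuiteId' => '99698'],
        ['error' => 'Error HTTP 400', 'errorResponse' => ['title' => 'Bad Request', 'status' => 400]],
    ],
    ['items' => []]
);
```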

---

## Development Guidelines

1. **Tenant Isolation**: Always use `setupTenantConnection()` before tenant operations
2. **Run Isolation**: Use run IDs for Redis keys to prevent cross-execution conflicts
3. **Job Counting**: Count only ProcessNode jobs toward the main completion counters; ProcessFlowPage jobs are excluded and SubProcessApiNodeJob is counted independently
4. **Error Handling**: Implement graceful degradation and comprehensive logging
5. **Duplicate Prevention**: Always check for duplicates before processing
6. **Cleanup**: Implement proper cleanup of Redis keys and temporary data
7. **Testing**: Use SQLite in-memory databases with proper isolation
8. **Metrics**: Track comprehensive metrics for monitoring and optimization

### Common Patterns

#### Job Dispatch Pattern
```php
// Increment expected jobs before dispatch
FlowCounterService::incrementExpectedJobs($flowId, $runId, 1);

// Dispatch job
ProcessNode::dispatch($nodeId, $nodeType, $nodeDataId, $tenantDatabase, $flowId, $queueName, $runId);

// Increment completed jobs in job handler
FlowCounterService::incrementCompletedJobs($flowId, $runId, 1);
```

#### Error Handling Pattern
```php
try {
    // Main logic
} catch (Exception $e) {
    Log::error('❌ ERROR MESSAGE', [
        'flow_id' => $this->flow_id,
        'error' => $e->getMessage(),
        'trace' => $e->getTraceAsString()
    ]);

    // Set error state
    $this->handleError($e);

    // Update counters if needed, so a failed job still counts toward flow completion
    FlowCounterService::incrementCompletedJobs($this->flow_id, $this->runId, 1);
}
```
