SearchImportJob Saga Duplicate Event Handling

Problem Statement

The SearchImportJobStateMachine saga experiences race conditions when duplicate S3 event notifications arrive. AWS S3 uses at-least-once delivery, meaning duplicate SearchUploadSaved events can be sent for the same file upload.

Symptoms

  1. MongoDB duplicate key error (E11000): When duplicate events arrive simultaneously, both attempt to create a saga instance with the same CorrelationId
  2. Duplicate parse jobs: If the saga already exists, duplicate events trigger StartParseJobsActivity multiple times
  3. State machine rejection: Events arriving in unexpected states cause "Not accepted in state X" errors

Affected Users

Large projects with many annotation questions are disproportionately affected because:

  • Longer document processing time widens the race condition window
  • Example: Project with 2,023 annotation questions (1.45 MB document) consistently fails
  • Projects with <100 annotation questions rarely experience the issue

Root Cause Analysis

Timeline of race condition:

T0: User initiates upload → API publishes SearchUploadStarted
T1: File uploaded to S3 → S3 sends notification to Lambda
T2: Lambda publishes SearchUploadSaved to RabbitMQ
T3: S3 retries notification (at-least-once) → Lambda publishes DUPLICATE
T4: Both messages arrive at saga consumer nearly simultaneously
T5: Thread A creates saga, Thread B gets duplicate key error
    OR Thread A processes, Thread B re-triggers activities

Current Mitigations and Gaps

The saga already has some defenses, but critical pieces are missing:

What's in place

| Mitigation | Location | Effect |
| --- | --- | --- |
| Message partitioner | SearchImportJobStateMachineDefinition.ConfigureSaga | CreatePartitioner(1) serializes messages per SearchId on a single consumer instance. Does NOT protect across multiple service replicas (MassTransit #5713). |
| Idempotent job creation | ProjectManagementService.GetOrCreateSearchImportJobAsync | Checks if the job exists before creating it. Safe to call multiple times. |
| Idempotent file received | ProjectManagementService.ReceivedReferenceFilesForSearchImportJob | Skips the update if the status is not Uploading. Safe to call multiple times. |
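
The idempotency pattern behind the two service methods above can be sketched as follows. This is an illustrative sketch only -- the repository API and member names (_repository, FindBySearchIdAsync, Status) are assumptions, not the actual ProjectManagementService code:

```csharp
// Illustrative sketch of the check-before-write idempotency guard; repository
// and property names here are hypothetical.
public async Task<SearchImportJob> GetOrCreateSearchImportJobAsync(
    Guid projectId, Guid searchId, string searchName)
{
    // Second and later calls find the existing job and return it unchanged.
    var existing = await _repository.FindBySearchIdAsync(searchId);
    if (existing is not null)
        return existing;

    var job = new SearchImportJob
    {
        ProjectId = projectId,
        SearchId = searchId,
        SearchName = searchName,
        Status = SearchImportJobStatus.Uploading
    };
    await _repository.InsertAsync(job);
    return job;
}
```

Note that check-then-insert alone is not safe under true concurrency: two callers can both miss the lookup and insert. That race is exactly what the saga-level defenses (retry, outbox, and Ignore handlers) described in this document address.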

What's missing

| Gap | Impact |
| --- | --- |
| No UseMessageRetry | E11000 errors go straight to the error queue. No chance to recover. |
| No UseInMemoryOutbox | StartParseJobsActivity sends IStartParsingReferenceFileCommand directly to RabbitMQ before saga state is persisted. If persist fails (E11000), the command is already sent. On retry, it's sent again -- duplicate parse commands. |
| No Ignore() handlers | Duplicate events reaching the saga in terminal or irrelevant states cause unhandled event exceptions. |

Note: SyrfAutoConfigureEndpoints in MassTransitHelpers.cs includes UseInMemoryOutbox() for state machine endpoints, but this method is dead code -- never called. The PM service uses MassTransit's default ConfigureEndpoints(provider) which does not add the outbox.

Implemented Solution

Three layers of defense plus TTL-based cleanup, all configured in the saga definition and state machine:

Layer 1: Scoped Retry Policy for Concurrency Exceptions

Add a scoped retry policy so only MongoDB duplicate key errors (E11000) are retried, not all exceptions. This prevents retrying business logic failures that would never succeed.

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Sagas/SearchImportJobStateMachine.cs (embedded SearchImportJobStateMachineDefinition)

protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
    ISagaConfigurator<SearchImportJobState> sagaConfigurator)
{
    // Layer 1: Retry ONLY on concurrency exceptions (E11000 duplicate key)
    endpointConfigurator.UseMessageRetry(r =>
    {
        r.Handle<MongoDbConcurrencyException>();
        r.Intervals(100, 200, 500);
    });

    // Layer 2: Buffer outgoing commands until saga state persists successfully
    endpointConfigurator.UseInMemoryOutbox();

    base.ConfigureSaga(endpointConfigurator, sagaConfigurator);
    var partition = endpointConfigurator.CreatePartitioner(1);
    // ... existing partitioner configuration unchanged
}

Why scoped retry matters: Without r.Handle<MongoDbConcurrencyException>(), the retry would catch ALL exceptions (including business logic failures, null references, etc.), delaying their arrival at the error queue and potentially causing confusing retry behavior. With the filter, only E11000 concurrency conflicts are retried -- exactly the class of error that benefits from retry.

Why this order matters: MassTransit middleware nests outside-in. Retry wraps the outbox, which wraps saga processing. If persist fails, the outbox discards buffered commands, then retry re-executes the entire pipeline cleanly.

Layer 2: In-Memory Outbox

Added in the same ConfigureSaga method above. The outbox buffers all commands and events published during saga processing. They are only delivered to the transport after the saga state persists successfully. If persist fails (E11000), the buffer is discarded -- no duplicate commands reach consumers.

Layer 3: State Machine Ignore Handlers for Terminal and Non-Applicable States

Use MassTransit's Ignore() to silently discard duplicate events in states where they should not trigger processing. All 6 events are ignored in terminal states (Completed, Error):

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Sagas/SearchImportJobStateMachine.cs

// Terminal states: ignore ALL events to prevent re-processing
During(Completed,
    Ignore(SearchUploadStarted),
    Ignore(SearchUploadSaved),
    Ignore(ReferenceFileParsingCompleted),
    Ignore(ReferenceFileFaulted),
    Ignore(StartParsingFaulted),
    Ignore(UploadTimeout.Received)
);

During(Error,
    Ignore(SearchUploadStarted),
    Ignore(SearchUploadSaved),
    Ignore(ReferenceFileParsingCompleted),
    Ignore(ReferenceFileFaulted),
    Ignore(StartParsingFaulted),
    Ignore(UploadTimeout.Received)
);

Additionally, SearchUploadSaved is ignored in non-terminal states where it has already been processed:

During(Uploaded,
    Ignore(SearchUploadSaved)
);

During(Parsing,
    Ignore(SearchUploadSaved)
);

UploadTimeout hazard note: The 60-minute timeout is scheduled in Initially(When(SearchUploadStarted)) and only unscheduled in the Uploading state when SearchUploadSaved arrives. If the timeout message is already in-flight when the saga transitions out of Uploading, it can arrive in Uploaded or Parsing. Without explicit Ignore handlers, this would cause an unhandled event exception. The ignore handlers in terminal states cover this, and UploadTimeout.Received is also handled (or harmless) in non-terminal states through normal state machine flow.
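
For reference, the timeout wiring described above follows MassTransit's Schedule pattern. The sketch below is hedged: the schedule token property (UploadTimeoutTokenId) and the timeout message type (UploadTimeoutExpired) are assumed names, not taken from the actual saga:

```csharp
// Hedged sketch of the timeout wiring; UploadTimeoutTokenId and
// UploadTimeoutExpired are assumptions, not names from the real saga.
Schedule(() => UploadTimeout, saga => saga.UploadTimeoutTokenId,
    s => s.Delay = TimeSpan.FromMinutes(60));

Initially(
    When(SearchUploadStarted)
        // The 60-minute timeout is scheduled as soon as the upload starts.
        .Schedule(UploadTimeout, ctx => new UploadTimeoutExpired(ctx.Saga.CorrelationId))
        .TransitionTo(Uploading));

During(Uploading,
    When(SearchUploadSaved)
        // Unschedule cancels the future message, but a timeout already in
        // flight can still be delivered after the saga leaves Uploading --
        // hence the Ignore handlers discussed above.
        .Unschedule(UploadTimeout)
        .TransitionTo(Uploaded));
```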

DuringAny refactoring note: The original state machine used DuringAny(When(SearchUploadStarted)) to handle SearchUploadStarted in all states. This was refactored to explicit During(Uploading, Uploaded, Parsing) blocks for non-terminal states only. This makes event routing explicit and avoids the subtle interaction between DuringAny and During + Ignore (which per MassTransit discussion #3461 are additive and could cause unpredictable behavior when combined with Ignore).

TTL-Based Cleanup (Replacing SetCompleted)

The original state machine called SetCompleted() in terminal states (Completed, Error) to immediately remove the saga instance from MongoDB. This was removed and replaced with TTL-based cleanup:

Why SetCompleted was removed: When a saga instance is deleted on reaching a terminal state, the Ignore() handlers in During(Completed, ...) and During(Error, ...) become dead code -- the saga no longer exists in MongoDB, so late-arriving duplicate events would hit Initially instead and potentially recreate the saga or fail.

How TTL cleanup works:

  1. SearchImportJobState has a CompletedAt property (DateTime?)
  2. WhenEnter(Completed) and WhenEnter(Error) hooks set CompletedAt = DateTime.UtcNow
  3. A MongoDB TTL index named ttl_CompletedAt_7d on the CompletedAt field expires documents 7 days after completion
  4. The CreateSagaTtlIndex hosted service creates this index at application startup
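
Steps 2-4 above can be sketched as follows. The WhenEnter hooks and the hosted service use standard MassTransit and MongoDB.Driver APIs, but the dependency wiring (how the typed collection is resolved) is an assumption:

```csharp
// Step 2 (state machine constructor): stamp CompletedAt on entering a
// terminal state.
WhenEnter(Completed, x => x.Then(ctx => ctx.Saga.CompletedAt = DateTime.UtcNow));
WhenEnter(Error, x => x.Then(ctx => ctx.Saga.CompletedAt = DateTime.UtcNow));

// Steps 3-4: hosted service that creates the TTL index at startup.
public class CreateSagaTtlIndex : IHostedService
{
    private readonly IMongoCollection<SearchImportJobState> _collection;

    public CreateSagaTtlIndex(IMongoCollection<SearchImportJobState> collection)
        => _collection = collection;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // MongoDB expires a document roughly 7 days after its CompletedAt
        // value. Active sagas (CompletedAt unset/null) are never expired,
        // because TTL indexes only act on date-valued fields.
        var model = new CreateIndexModel<SearchImportJobState>(
            Builders<SearchImportJobState>.IndexKeys.Ascending(s => s.CompletedAt),
            new CreateIndexOptions
            {
                Name = "ttl_CompletedAt_7d",
                ExpireAfter = TimeSpan.FromDays(7)
            });

        // Creating an index that already exists with identical options is a no-op.
        await _collection.Indexes.CreateOneAsync(model,
            cancellationToken: cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```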

Benefits:

  • Saga instances persist in terminal states, so Ignore() handlers work correctly for late duplicates
  • MongoDB automatically cleans up stale saga instances after 7 days
  • No manual cleanup needed; operational overhead is minimal

Accepted risk: After TTL expiry (7 days), a replayed message could theoretically recreate the saga via Initially. This is acceptable because:

  • S3 duplicate notifications happen within minutes/hours, not days
  • RabbitMQ message TTL would expire long before 7 days
  • The 7-day window provides ample coverage for all realistic retry scenarios

SearchId Is One-Shot

SearchId is a one-shot identifier -- there is no re-import support. Once a search import completes (or errors), the same SearchId will never be legitimately reused. Any events arriving for a SearchId that has already reached a terminal state are always duplicates. This simplifies the ignore logic: there is no valid reason to process events in terminal states.

Dead IStartSearchImportJobCommand Removal

IStartSearchImportJobCommand was removed from the codebase. Investigation confirmed that no producer existed anywhere in the monorepo -- the command interface was defined but never published by any service. The state machine's Initially(When(StartSearchImportJobCommand)) handler was dead code. Removing it simplifies the state machine and eliminates a confusing entry point.

Metadata Validation via IfElse Branching

The implementation includes two layers of metadata validation:

Service Layer Validation

ProjectManagementService.GetOrCreateSearchImportJobAsync validates all metadata fields (ProjectId, SearchId, SearchName, ReferenceFiles) before creating or returning a SearchImportJob. This catches missing or invalid data early.

Saga Layer Validation (IfElse Branching)

The state machine uses MassTransit's IfElse branching to validate metadata at the saga level:

  1. Fatal validation in During(Uploading, When(SearchUploadSaved)): If metadata is missing or invalid (e.g., no ReferenceFiles), the saga transitions to Error state. This is fatal because the upload event carries the file metadata needed for parsing -- without it, the saga cannot proceed.

  2. Non-fatal validation in During(Uploading, Uploaded, Parsing, When(SearchUploadStarted)): If metadata is missing on the SearchUploadStarted event arriving late (after SearchUploadSaved already provided valid metadata), the saga logs a warning but continues. This is non-fatal because the saga already has valid metadata from the S3 event.
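
The two branches can be sketched with MassTransit's IfElse. The validation predicates and activity wiring shown here are assumptions about shape, not the actual implementation:

```csharp
// Fatal branch: SearchUploadSaved without usable metadata ends the saga.
During(Uploading,
    When(SearchUploadSaved)
        .IfElse(ctx => ctx.Message.ReferenceFiles is { Count: > 0 },  // assumed check
            valid => valid
                .Activity(x => x.OfType<StartParseJobsActivity>())
                .TransitionTo(Parsing),
            invalid => invalid
                .TransitionTo(Error)));

// Non-fatal branch: a late SearchUploadStarted with missing metadata only
// warns, because the saga already holds valid metadata from SearchUploadSaved.
During(Uploading, Uploaded, Parsing,
    When(SearchUploadStarted)
        .IfElse(ctx => !string.IsNullOrEmpty(ctx.Message.SearchName),  // assumed check
            valid => valid.Then(ctx => { /* normal handling */ }),
            invalid => invalid.Then(ctx =>
                LogContext.Warning?.Log(
                    "Missing metadata on late SearchUploadStarted for {0}",
                    ctx.Saga.CorrelationId))));
```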

How the Layers Work Together

Scenario A: Concurrent duplicate events (cross-replica race)

Event A arrives at Replica 1:
  +-- Initially(When(SearchUploadSaved))
  +-- CreateSearchImportJobActivity -> GetOrCreate (creates job)
  +-- SetFileReceivedActivity -> marks files received
  +-- StartParseJobsActivity -> command BUFFERED in outbox (not sent yet)
  +-- Transition to Parsing
  +-- MongoDB saga INSERT -> SUCCESS
  +-- Outbox delivers IStartParsingReferenceFileCommand

Event B (duplicate) arrives at Replica 2:
  +-- Initially(When(SearchUploadSaved))
  +-- CreateSearchImportJobActivity -> GetOrCreate (returns existing)
  +-- SetFileReceivedActivity -> status not Uploading, skips
  +-- StartParseJobsActivity -> command BUFFERED in outbox
  +-- Transition to Parsing
  +-- MongoDB saga INSERT -> E11000 DUPLICATE KEY
  +-- Outbox DISCARDS buffered command (no duplicate parse!)
  +-- UseMessageRetry catches MongoDbConcurrencyException, waits 100ms, retries
  +-- On retry: saga loaded from DB (state = Parsing)
  +-- During(Parsing, Ignore(SearchUploadSaved)) -> event silently dropped
  +-- Done. No errors. No duplicate commands.

Scenario B: Late-arriving duplicate (non-concurrent)

Event A processed normally -> saga reaches Parsing/Completed state

Event B (late duplicate) arrives:
  +-- Saga loaded from DB (state = Parsing or Completed)
  +-- During(Parsing/Completed, Ignore(SearchUploadSaved))
  +-- Event silently dropped. No retry needed.

Files Modified

| File | Change |
| --- | --- |
| SearchImportJobStateMachine.cs (state machine class) | Added Ignore() for all 6 events in terminal states; Ignore(SearchUploadSaved) in Uploaded/Parsing; replaced DuringAny with explicit During blocks; removed SetCompleted; added WhenEnter hooks for CompletedAt; removed IStartSearchImportJobCommand handler; added IfElse metadata validation |
| SearchImportJobStateMachine.cs (embedded definition class) | Added scoped UseMessageRetry with Handle<MongoDbConcurrencyException>() and UseInMemoryOutbox before base.ConfigureSaga |
| SearchImportJobState.cs | Added CompletedAt property (DateTime?) for TTL cleanup |
| CreateSagaTtlIndex.cs (new) | Hosted service that creates the ttl_CompletedAt_7d TTL index on the pmSearchImportJobState collection at startup |
| IStartSearchImportJobCommand.cs | Removed (dead code -- no producer existed) |
| ProjectManagementService.cs | Metadata validation in GetOrCreateSearchImportJobAsync |
| Integration test files | New test suite (see Testing Strategy below) |

Why ParseCommandSent flag was removed

The original spec proposed adding a ParseCommandSent boolean to SearchImportJobState and checking it in StartParseJobsActivity. This was removed for two reasons:

  1. It wouldn't work: The flag is part of saga state. When the saga insert fails (E11000), the state is NOT persisted. On retry, the saga is loaded fresh from the database where ParseCommandSent is still false. The guard never fires.

  2. It's redundant: With UseInMemoryOutbox, commands are buffered and only delivered on successful persist. With Ignore() handlers, duplicate events never reach the activity on retry. The flag adds complexity with no benefit.

Testing Strategy

Implemented Test Suite

The following tests validate the duplicate event handling implementation:

| Test | Purpose |
| --- | --- |
| Smoke test | Verifies the normal happy-path flow (SearchUploadStarted -> SearchUploadSaved -> parse -> complete) works end-to-end |
| Retry rejection test | Confirms that after an E11000 retry, the saga correctly ignores the duplicate event via Ignore() handlers |
| Completed-state test | Verifies all 6 events are silently ignored when the saga is in the Completed state |
| Behavioral duplicate test | Tests that two SearchUploadSaved events for the same SearchId result in only one parse command being sent |
| Concurrency tests (Testcontainers) | Integration tests using real MongoDB via Testcontainers to validate E11000 handling under concurrent event delivery |
| TTL index tests | Verifies CreateSagaTtlIndex creates the correct TTL index on the pmSearchImportJobState collection |
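
The concurrency tests can follow the shape below. This is a hedged sketch using Testcontainers.MongoDb and the MassTransit test harness; the event contract (a SearchUploadSaved record taking a SearchId) and the exact repository wiring are assumptions:

```csharp
// Hedged sketch of a concurrency test against real MongoDB; event shape and
// wiring details are assumptions, not the actual test suite.
await using var mongo = new MongoDbBuilder().Build();
await mongo.StartAsync();

await using var provider = new ServiceCollection()
    .AddMassTransitTestHarness(x =>
    {
        x.AddSagaStateMachine<SearchImportJobStateMachine, SearchImportJobState,
                SearchImportJobStateMachineDefinition>()
            .MongoDbRepository(r =>
            {
                r.Connection = mongo.GetConnectionString();
                r.DatabaseName = "saga-tests";
            });
    })
    .BuildServiceProvider(true);

var harness = provider.GetRequiredService<ITestHarness>();
await harness.Start();

// Publish the same event twice to simulate S3 at-least-once delivery.
var searchId = NewId.NextGuid();
await Task.WhenAll(
    harness.Bus.Publish(new SearchUploadSaved(searchId)),   // assumed record
    harness.Bus.Publish(new SearchUploadSaved(searchId)));

// Assert on the harness: exactly one IStartParsingReferenceFileCommand should
// have been sent despite the duplicate (count it via
// harness.Sent.SelectAsync<IStartParsingReferenceFileCommand>()).
```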

Manual Verification

  1. Deploy to staging environment
  2. Upload systematic search to large project (e.g., project with many annotation questions)
  3. Monitor logs for:
     • No duplicate key errors (E11000)
     • No "Not accepted in state" errors
     • Search import completes successfully
  4. Verify only one parse job is created per upload

Alternatives Considered

Lambda-Level Deduplication (Not Chosen)

Store S3 request IDs in DynamoDB to deduplicate at source.

Pros: Eliminates duplicates before they reach the saga.
Cons: Requires DynamoDB setup, adds Lambda complexity, additional AWS costs.

Custom MongoDB Repository with Upsert (Not Chosen)

Implement upsert-based saga creation instead of insert.

Pros: True idempotent creation at database level.
Cons: Requires forking/extending the MassTransit MongoDB provider, high maintenance burden.

ParseCommandSent Flag (Not Chosen)

Add a boolean flag to saga state to track if the parse command was already sent.

Pros: Simple concept.
Cons: Doesn't work -- the flag isn't persisted when E11000 occurs, so it resets on retry. Redundant when InMemoryOutbox is configured. See "Why ParseCommandSent flag was removed" above.

Appendix: Investigation Data

Affected Project Statistics

From MongoDB analysis on 2026-01-22:

| Metric | Value |
| --- | --- |
| Affected project annotation questions | 2,023 |
| Affected project document size | ~1.45 MB |
| Projects larger than affected | 11 |
| Projects smaller than affected | 2,163 |
| Rank by annotation questions | #1 (6x more than #2) |

Error Log Pattern

MassTransit.MongoDbConcurrencyException: Saga exception on initial Insert
---> MongoDB.Driver.MongoWriteException: E11000 duplicate key error
    CorrelationId: 30ce826a-xxxx-xxxx-xxxx-xxxxxxxxxxxx

MassTransit.UnhandledEventException: The SearchUploadSaved event is not handled
    during the Uploading state for the SearchImportJobStateMachine