SearchImportJob Saga Duplicate Event Handling¶
Problem Statement¶
The SearchImportJobStateMachine saga experiences race conditions when duplicate S3 event notifications arrive. AWS S3 uses at-least-once delivery, meaning duplicate SearchUploadSaved events can be sent for the same file upload.
Symptoms¶
- MongoDB duplicate key error (E11000): When duplicate events arrive simultaneously, both attempt to create a saga instance with the same CorrelationId
- Duplicate parse jobs: If the saga already exists, duplicate events trigger StartParseJobsActivity multiple times
- State machine rejection: Events arriving in unexpected states cause "Not accepted in state X" errors
Affected Users¶
Large projects with many annotation questions are disproportionately affected because:
- Longer document processing time widens the race condition window
- Example: Project with 2,023 annotation questions (1.45 MB document) consistently fails
- Projects with <100 annotation questions rarely experience the issue
Root Cause Analysis¶
Timeline of race condition:
T0: User initiates upload → API publishes SearchUploadStarted
T1: File uploaded to S3 → S3 sends notification to Lambda
T2: Lambda publishes SearchUploadSaved to RabbitMQ
T3: S3 retries notification (at-least-once) → Lambda publishes DUPLICATE
T4: Both messages arrive at saga consumer nearly simultaneously
T5: Thread A creates saga, Thread B gets duplicate key error
OR Thread A processes, Thread B re-triggers activities
Current Mitigations and Gaps¶
The saga already has some defenses, but critical pieces are missing:
What's in place¶
| Mitigation | Location | Effect |
|---|---|---|
| Message partitioner | SearchImportJobStateMachineDefinition.ConfigureSaga | CreatePartitioner(1) serializes messages per SearchId on a single consumer instance. Does NOT protect across multiple service replicas (MassTransit #5713). |
| Idempotent job creation | ProjectManagementService.GetOrCreateSearchImportJobAsync | Checks if job exists before creating. Safe to call multiple times. |
| Idempotent file received | ProjectManagementService.ReceivedReferenceFilesForSearchImportJob | Skips update if status is not Uploading. Safe to call multiple times. |
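The two idempotent service methods above can be sketched roughly as follows. This is an illustrative sketch only, not the actual SyRF implementation; the repository abstraction and type members (`_jobRepository`, `SearchImportJobStatus`, the method signatures) are assumptions:

```csharp
// Sketch of idempotent job creation: a duplicate caller finds the existing
// job and returns it instead of inserting a second one.
public async Task<SearchImportJob> GetOrCreateSearchImportJobAsync(Guid searchId, Guid projectId)
{
    var existing = await _jobRepository.FindBySearchIdAsync(searchId);
    if (existing is not null)
        return existing;

    return await _jobRepository.CreateAsync(new SearchImportJob
    {
        SearchId = searchId,
        ProjectId = projectId,
        Status = SearchImportJobStatus.Uploading
    });
}

// Sketch of idempotent file-received handling: only the first call observes
// the Uploading status; later duplicates skip the update entirely.
public async Task ReceivedReferenceFilesForSearchImportJob(Guid searchId)
{
    var job = await _jobRepository.FindBySearchIdAsync(searchId);
    if (job is null || job.Status != SearchImportJobStatus.Uploading)
        return;

    job.Status = SearchImportJobStatus.Uploaded;
    await _jobRepository.UpdateAsync(job);
}
```

Note that a find-then-create pattern like this is only idempotent, not atomic; concurrent first calls can still race, which is exactly why the saga-level defenses below are needed.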
What's missing¶
| Gap | Impact |
|---|---|
| No UseMessageRetry | E11000 errors go straight to the error queue. No chance to recover. |
| No UseInMemoryOutbox | StartParseJobsActivity sends IStartParsingReferenceFileCommand directly to RabbitMQ before saga state is persisted. If persist fails (E11000), the command is already sent. On retry, it is sent again -- duplicate parse commands. |
| No Ignore() handlers | Duplicate events reaching the saga in terminal or irrelevant states cause unhandled event exceptions. |
Note: SyrfAutoConfigureEndpoints in MassTransitHelpers.cs includes UseInMemoryOutbox() for state machine endpoints, but this method is dead code -- it is never called. The PM service uses MassTransit's default ConfigureEndpoints(provider), which does not add the outbox.
Implemented Solution¶
Three layers of defense plus TTL-based cleanup, all configured in the saga definition and state machine:
Layer 1: Scoped Retry Policy for Concurrency Exceptions¶
A scoped retry policy ensures that only MongoDB duplicate key errors (E11000) are retried, not all exceptions. This prevents retrying business logic failures that would never succeed.
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Sagas/SearchImportJobStateMachine.cs (embedded SearchImportJobStateMachineDefinition)
```csharp
protected override void ConfigureSaga(IReceiveEndpointConfigurator endpointConfigurator,
    ISagaConfigurator<SearchImportJobState> sagaConfigurator)
{
    // Layer 1: Retry ONLY on concurrency exceptions (E11000 duplicate key)
    endpointConfigurator.UseMessageRetry(r =>
    {
        r.Handle<MongoDbConcurrencyException>();
        r.Intervals(100, 200, 500);
    });

    // Layer 2: Buffer outgoing commands until saga state persists successfully
    endpointConfigurator.UseInMemoryOutbox();

    base.ConfigureSaga(endpointConfigurator, sagaConfigurator);

    var partition = endpointConfigurator.CreatePartitioner(1);
    // ... existing partitioner configuration unchanged
}
```
Why scoped retry matters: Without r.Handle<MongoDbConcurrencyException>(), the retry would catch ALL exceptions (including business logic failures, null references, etc.), delaying their arrival at the error queue and potentially causing confusing retry behavior. With the filter, only E11000 concurrency conflicts are retried -- exactly the class of error that benefits from retry.
Why this order matters: MassTransit middleware nests outside-in. Retry wraps the outbox, which wraps saga processing. If persist fails, the outbox discards buffered commands, then retry re-executes the entire pipeline cleanly.
Layer 2: In-Memory Outbox¶
Added in the same ConfigureSaga method above. The outbox buffers all commands and events published during saga processing. They are only delivered to the transport after the saga state persists successfully. If persist fails (E11000), the buffer is discarded -- no duplicate commands reach consumers.
Layer 3: State Machine Ignore Handlers for Terminal and Non-Applicable States¶
Use MassTransit's Ignore() to silently discard duplicate events in states where they should not trigger processing. All 6 events are ignored in terminal states (Completed, Error):
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Sagas/SearchImportJobStateMachine.cs
```csharp
// Terminal states: ignore ALL events to prevent re-processing
During(Completed,
    Ignore(SearchUploadStarted),
    Ignore(SearchUploadSaved),
    Ignore(ReferenceFileParsingCompleted),
    Ignore(ReferenceFileFaulted),
    Ignore(StartParsingFaulted),
    Ignore(UploadTimeout.Received)
);

During(Error,
    Ignore(SearchUploadStarted),
    Ignore(SearchUploadSaved),
    Ignore(ReferenceFileParsingCompleted),
    Ignore(ReferenceFileFaulted),
    Ignore(StartParsingFaulted),
    Ignore(UploadTimeout.Received)
);
```
Additionally, SearchUploadSaved is ignored in non-terminal states where it has already been processed:
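A minimal sketch of these non-terminal ignore handlers (state and event names follow this document; each state's other handlers are elided and would remain unchanged):

```csharp
// SearchUploadSaved has already been processed by the time the saga reaches
// Uploaded or Parsing, so late duplicates are silently discarded there too.
During(Uploaded,
    Ignore(SearchUploadSaved)
    // ... this state's other event handlers unchanged
);

During(Parsing,
    Ignore(SearchUploadSaved)
    // ... this state's other event handlers unchanged
);
```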
UploadTimeout hazard note: The 60-minute timeout is scheduled in Initially(When(SearchUploadStarted)) and only unscheduled in the Uploading state when SearchUploadSaved arrives. If the timeout message is already in flight when the saga transitions out of Uploading, it can arrive in Uploaded or Parsing. Without explicit Ignore handlers, this would cause an unhandled event exception. The ignore handlers in terminal states cover this, and UploadTimeout.Received is also handled (or harmless) in non-terminal states through normal state machine flow.
DuringAny refactoring note: The original state machine used DuringAny(When(SearchUploadStarted)) to handle SearchUploadStarted in all states. This was refactored to explicit During(Uploading, Uploaded, Parsing) blocks for non-terminal states only. This makes event routing explicit and avoids the subtle interaction between DuringAny and During + Ignore (which per MassTransit discussion #3461 are additive and could cause unpredictable behavior when combined with Ignore).
TTL-Based Cleanup (Replacing SetCompleted)¶
The original state machine called SetCompleted() in terminal states (Completed, Error) to immediately remove the saga instance from MongoDB. This was removed and replaced with TTL-based cleanup:
Why SetCompleted was removed: When a saga instance is deleted on reaching a terminal state, the Ignore() handlers in During(Completed, ...) and During(Error, ...) become dead code -- the saga no longer exists in MongoDB, so late-arriving duplicate events would hit Initially instead and potentially recreate the saga or fail.
How TTL cleanup works:
- SearchImportJobState has a CompletedAt property (DateTime?)
- WhenEnter(Completed) and WhenEnter(Error) hooks set CompletedAt = DateTime.UtcNow
- A MongoDB TTL index named ttl_CompletedAt_7d on the CompletedAt field expires documents 7 days after completion
- The CreateSagaTtlIndex hosted service creates this index at application startup
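A hosted-service sketch of the index creation, assuming the official MongoDB .NET driver; the index and collection names follow this document, but everything else here is illustrative rather than the actual CreateSagaTtlIndex implementation:

```csharp
// Illustrative sketch: creates the TTL index at application startup.
// MongoDB's TTL monitor deletes documents once CompletedAt is older than
// ExpireAfter; documents with a null CompletedAt (active sagas) are untouched.
public class CreateSagaTtlIndex : IHostedService
{
    private readonly IMongoDatabase _database;

    public CreateSagaTtlIndex(IMongoDatabase database) => _database = database;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var collection = _database.GetCollection<SearchImportJobState>("pmSearchImportJobState");

        var index = new CreateIndexModel<SearchImportJobState>(
            Builders<SearchImportJobState>.IndexKeys.Ascending(s => s.CompletedAt),
            new CreateIndexOptions
            {
                Name = "ttl_CompletedAt_7d",
                ExpireAfter = TimeSpan.FromDays(7)
            });

        // CreateOneAsync is idempotent when the same index definition already exists
        await collection.Indexes.CreateOneAsync(index, cancellationToken: cancellationToken);
    }

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
```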
Benefits:
- Saga instances persist in terminal states, so Ignore() handlers work correctly for late duplicates
- MongoDB automatically cleans up stale saga instances after 7 days
- No manual cleanup needed; operational overhead is minimal
Accepted risk: After TTL expiry (7 days), a replayed message could theoretically recreate the saga via Initially. This is acceptable because:
- S3 duplicate notifications happen within minutes/hours, not days
- RabbitMQ message TTL would expire long before 7 days
- The 7-day window provides ample coverage for all realistic retry scenarios
SearchId Is One-Shot¶
SearchId is a one-shot identifier -- there is no re-import support. Once a search import completes (or errors), the same SearchId will never be legitimately reused. Any events arriving for a SearchId that has already reached a terminal state are always duplicates. This simplifies the ignore logic: there is no valid reason to process events in terminal states.
Dead IStartSearchImportJobCommand Removal¶
IStartSearchImportJobCommand was removed from the codebase. Investigation confirmed that no producer existed anywhere in the monorepo -- the command interface was defined but never published by any service. The state machine's Initially(When(StartSearchImportJobCommand)) handler was dead code. Removing it simplifies the state machine and eliminates a confusing entry point.
Metadata Validation via IfElse Branching¶
The implementation includes two layers of metadata validation:
Service Layer Validation¶
ProjectManagementService.GetOrCreateSearchImportJobAsync validates all metadata fields (ProjectId, SearchId, SearchName, ReferenceFiles) before creating or returning a SearchImportJob. This catches missing or invalid data early.
Saga Layer Validation (IfElse Branching)¶
The state machine uses MassTransit's IfElse branching to validate metadata at the saga level:
- Fatal validation in During(Uploading, When(SearchUploadSaved)): If metadata is missing or invalid (e.g., no ReferenceFiles), the saga transitions to the Error state. This is fatal because the upload event carries the file metadata needed for parsing -- without it, the saga cannot proceed.
- Non-fatal validation in During(Uploading, Uploaded, Parsing, When(SearchUploadStarted)): If metadata is missing on a SearchUploadStarted event arriving late (after SearchUploadSaved has already provided valid metadata), the saga logs a warning but continues. This is non-fatal because the saga already has valid metadata from the S3 event.
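The two branches can be sketched with MassTransit's IfElse as below. The event, state, and activity names follow this document, but the predicates, message properties, and saga assignments are assumptions for illustration, not the exact implementation:

```csharp
// Fatal branch: without ReferenceFiles metadata the saga cannot start parsing.
During(Uploading,
    When(SearchUploadSaved)
        .IfElse(context => context.Message.ReferenceFiles?.Any() == true,
            valid => valid
                .Activity(x => x.OfType<StartParseJobsActivity>())
                .TransitionTo(Parsing),
            invalid => invalid.TransitionTo(Error)));

// Non-fatal branch: a late SearchUploadStarted with missing metadata is logged
// and skipped; the saga already holds valid metadata from the S3 event.
During(Uploading, Uploaded, Parsing,
    When(SearchUploadStarted)
        .IfElse(context => context.Message.SearchName is not null,
            valid => valid.Then(ctx => ctx.Saga.SearchName = ctx.Message.SearchName),
            invalid => invalid.Then(ctx => LogContext.Warning?.Log(
                "SearchUploadStarted missing metadata for {SearchId}",
                ctx.Saga.CorrelationId))));
```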
How the Layers Work Together¶
Scenario A: Concurrent duplicate events (cross-replica race)¶
Event A arrives at Replica 1:
+-- Initially(When(SearchUploadSaved))
+-- CreateSearchImportJobActivity -> GetOrCreate (creates job)
+-- SetFileReceivedActivity -> marks files received
+-- StartParseJobsActivity -> command BUFFERED in outbox (not sent yet)
+-- Transition to Parsing
+-- MongoDB saga INSERT -> SUCCESS
+-- Outbox delivers IStartParsingReferenceFileCommand
Event B (duplicate) arrives at Replica 2:
+-- Initially(When(SearchUploadSaved))
+-- CreateSearchImportJobActivity -> GetOrCreate (returns existing)
+-- SetFileReceivedActivity -> status not Uploading, skips
+-- StartParseJobsActivity -> command BUFFERED in outbox
+-- Transition to Parsing
+-- MongoDB saga INSERT -> E11000 DUPLICATE KEY
+-- Outbox DISCARDS buffered command (no duplicate parse!)
+-- UseMessageRetry catches MongoDbConcurrencyException, waits 100ms, retries
+-- On retry: saga loaded from DB (state = Parsing)
+-- During(Parsing, Ignore(SearchUploadSaved)) -> event silently dropped
+-- Done. No errors. No duplicate commands.
Scenario B: Late-arriving duplicate (non-concurrent)¶
Event A processed normally -> saga reaches Parsing/Completed state
Event B (late duplicate) arrives:
+-- Saga loaded from DB (state = Parsing or Completed)
+-- During(Parsing/Completed, Ignore(SearchUploadSaved))
+-- Event silently dropped. No retry needed.
Files Modified¶
| File | Change |
|---|---|
| SearchImportJobStateMachine.cs (state machine class) | Added Ignore() for all 6 events in terminal states; Ignore(SearchUploadSaved) in Uploaded/Parsing; replaced DuringAny with explicit During blocks; removed SetCompleted; added WhenEnter hooks for CompletedAt; removed IStartSearchImportJobCommand handler; added IfElse metadata validation |
| SearchImportJobStateMachine.cs (embedded definition class) | Added scoped UseMessageRetry with Handle&lt;MongoDbConcurrencyException&gt;() and UseInMemoryOutbox before base.ConfigureSaga |
| SearchImportJobState.cs | Added CompletedAt property (DateTime?) for TTL cleanup |
| CreateSagaTtlIndex.cs (new) | Hosted service that creates the ttl_CompletedAt_7d TTL index on the pmSearchImportJobState collection at startup |
| IStartSearchImportJobCommand.cs | Removed (dead code -- no producer existed) |
| ProjectManagementService.cs | Metadata validation in GetOrCreateSearchImportJobAsync |
| Integration test files | New test suite (see Testing Strategy below) |
Why ParseCommandSent flag was removed¶
The original spec proposed adding a ParseCommandSent boolean to SearchImportJobState and checking it in StartParseJobsActivity. This was removed for two reasons:
- It wouldn't work: The flag is part of saga state. When the saga insert fails (E11000), the state is NOT persisted. On retry, the saga is loaded fresh from the database, where ParseCommandSent is still false. The guard never fires.
- It's redundant: With UseInMemoryOutbox, commands are buffered and only delivered on successful persist. With Ignore() handlers, duplicate events never reach the activity on retry. The flag adds complexity with no benefit.
Testing Strategy¶
Implemented Test Suite¶
The following tests validate the duplicate event handling implementation:
| Test | Purpose |
|---|---|
| Smoke test | Verifies the normal happy-path flow (SearchUploadStarted -> SearchUploadSaved -> parse -> complete) works end-to-end |
| Retry rejection test | Confirms that after E11000 retry, the saga correctly ignores the duplicate event via Ignore() handlers |
| Completed-state test | Verifies all 6 events are silently ignored when saga is in Completed state |
| Behavioral duplicate test | Tests that two SearchUploadSaved events for the same SearchId result in only one parse command being sent |
| Concurrency tests (Testcontainers) | Integration tests using real MongoDB via Testcontainers to validate E11000 handling under concurrent event delivery |
| TTL index tests | Verifies CreateSagaTtlIndex creates the correct TTL index on the pmSearchImportJobState collection |
Manual Verification¶
- Deploy to staging environment
- Upload systematic search to large project (e.g., project with many annotation questions)
- Monitor logs for:
- No duplicate key errors (E11000)
- No "Not accepted in state" errors
- Search import completes successfully
- Verify only one parse job is created per upload
Alternatives Considered¶
Lambda-Level Deduplication (Not Chosen)¶
Store S3 request IDs in DynamoDB to deduplicate at source.
Pros: Eliminates duplicates before they reach the saga
Cons: Requires DynamoDB setup, adds Lambda complexity, additional AWS costs
Custom MongoDB Repository with Upsert (Not Chosen)¶
Implement upsert-based saga creation instead of insert.
Pros: True idempotent creation at database level
Cons: Requires forking/extending MassTransit MongoDB provider, high maintenance burden
ParseCommandSent Flag (Not Chosen)¶
Add a boolean flag to saga state to track if the parse command was already sent.
Pros: Simple concept
Cons: Doesn't work -- the flag isn't persisted when E11000 occurs, so it resets on retry. Redundant when InMemoryOutbox is configured. See "Why ParseCommandSent flag was removed" above.
References¶
- MassTransit Saga Guidance -- recommended retry + outbox + partitioner pattern
- MassTransit State Machine -- Ignore() method documentation
- MassTransit Discussion #3461 -- DuringAny + During are additive
- MassTransit Discussion #5713 -- partitioner doesn't protect across replicas
- AWS S3 Event Notification Best Practices
- MongoDB Duplicate Key Error E11000
- MongoDB TTL Indexes
Appendix: Investigation Data¶
Affected Project Statistics¶
From MongoDB analysis on 2026-01-22:
| Metric | Value |
|---|---|
| Affected project annotation questions | 2,023 |
| Affected project document size | ~1.45 MB |
| Projects larger than affected | 11 |
| Projects smaller than affected | 2,163 |
| Rank by annotation questions | #1 (6x more than #2) |
Error Log Pattern¶
MassTransit.MongoDbConcurrencyException: Saga exception on initial Insert
---> MongoDB.Driver.MongoWriteException: E11000 duplicate key error
CorrelationId: 30ce826a-xxxx-xxxx-xxxx-xxxxxxxxxxxx
MassTransit.UnhandledEventException: The SearchUploadSaved event is not handled
during the Uploading state for the SearchImportJobStateMachine