Systematic Search Upload and Study Creation Flow¶
This document describes the complete data flow for uploading systematic search reference files through the SyRF platform, from the Angular frontend through to the creation of Study records in MongoDB.
Table of Contents¶
- Overview
- Architecture Diagram
- Sequence Diagram
- Detailed Flow
- Phase 1: Frontend File Selection and Validation
- Phase 2: S3 Presigning Deep Dive
- Phase 3: Direct S3 Upload
- Phase 4: S3 Event Notification (Lambda)
- Phase 5: State Machine Orchestration
- Phase 6: Reference File Parsing
- Phase 7: Study Creation
- Phase 8: Systematic Search Creation
- Message Contracts
- Domain Models
- Example Payloads
- Error Handling
- Troubleshooting
- Performance Characteristics
- Implementation Critique
- Related Documentation
Overview¶
The systematic search upload process is a multi-service, event-driven workflow that:
- Uploads reference library files (CSV, TSV, Endnote XML, PubMed XML, Living Search JSON) to S3
- Triggers a state machine in the Project Management service
- Parses the uploaded files to extract study metadata
- Creates Study aggregate roots in MongoDB
Architecture Diagram¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ ANGULAR FRONTEND │
│ ┌──────────────────┐ ┌─────────────────┐ ┌────────────────────────┐ │
│ │ CreateSearch │───▶│ ProjectDetail │───▶│ S3FileService │ │
│ │ Component │ │ Effects │ │ (presigned upload) │ │
│ └──────────────────┘ └─────────────────┘ └──────────┬─────────────┘ │
└──────────────────────────────────────────────────────────────┼──────────────┘
│
┌─────────────────────────────────────┼──────────────┐
│ API SERVICE │ │
│ ┌──────────────────┐ │ │
│ │ SearchController │ │ │
│ │ GetS3Signature() │──────┐ │ │
│ └──────────────────┘ │ │ │
└────────────────────────────┼────────┼──────────────┘
│ │
▼ ▼
┌─────────────────────────────┐ ┌────────────────────────────────┐
│ RABBITMQ │◀────────────▶│ AWS S3 │
│ ISearchUploadStartedEvent │ │ (Reference File Storage) │
│ ISearchUploadSavedToS3Event│ └────────────────┬───────────────┘
└──────────────┬──────────────┘ │
│ │
│ ┌─────────────────────────┼───────────────┐
│ │ S3 NOTIFIER LAMBDA │ │
│ │ ┌─────────────────────────────────┐ │
│ │ │ S3FileReceivedHandler │ │
│ │ │ (S3:ObjectCreated:Put trigger) │────┘
│ │ └─────────────────────────────────┘
│ └─────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ PROJECT MANAGEMENT SERVICE │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ SearchImportJobStateMachine │ │
│ │ ┌──────────┐ ┌──────────┐ ┌─────────┐ ┌───────────┐ │ │
│ │ │ Initial │───▶│Uploading │───▶│Uploaded │───▶│ Parsing │──┬──────┤ │
│ │ └──────────┘ └──────────┘ └─────────┘ └───────────┘ │ │ │
│ │ │ │ │ │
│ │ │ (S3 event first) │ │ │
│ │ └───────────────────────────────────────▶ (to Parsing) │ │ │
│ │ ┌────────────┐ ┌─────────┐ │ │ │
│ │ │ Completed │◀─┤ │ │ │ │
│ │ └────────────┘ │ Error │◀──┘ │ │
│ │ └─────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌────────────────────────────────────────────────────────────────────────┐ │
│ │ ReferenceFileParseJobConsumer │ │
│ │ ┌──────────────────────┐ ┌────────────────────┐ │ │
│ │ │ ProjectManagement │───▶│ StudyReferenceFile │───▶ MongoDB │ │
│ │ │ Service │ │ Parser │ (Studies) │ │
│ │ └──────────────────────┘ └────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
Sequence Diagram¶
sequenceDiagram
participant User
participant Angular as Angular Frontend
participant API as API Service
participant S3
participant Lambda as S3 Notifier Lambda
participant RabbitMQ
participant PM as Project Management
participant MongoDB
User->>Angular: Select reference file
Angular->>Angular: Validate file format
Angular->>Angular: Calculate SHA-256 hash
Angular->>API: POST /api/projects/{id}/searches/getSignature
Note over API: Generate ReferenceFileId (UUID)
Note over API: Build S3 metadata headers
Note over API: Sign with AWS Sig V4
API->>RabbitMQ: Publish ISearchUploadStartedEvent
API-->>Angular: Return presigned URL + headers
RabbitMQ->>PM: Deliver ISearchUploadStartedEvent
PM->>PM: Create SearchImportJob (Uploading state)
PM->>PM: Schedule 60-min timeout
Angular->>S3: PUT file (presigned URL)
S3-->>Angular: 200 OK
S3->>Lambda: S3:ObjectCreated:Put event
Lambda->>Lambda: Extract metadata from headers
Lambda->>RabbitMQ: Publish ISearchUploadSavedToS3Event
RabbitMQ->>PM: Deliver ISearchUploadSavedToS3Event
PM->>PM: Transition to Uploaded state
PM->>PM: Execute StartParseJobsActivity
PM->>RabbitMQ: Publish IStartParsingReferenceFileCommand
loop For each reference file
RabbitMQ->>PM: Deliver parse command
PM->>S3: Download file
PM->>PM: Parse records
PM->>MongoDB: Batch insert Studies (5000/batch)
PM->>RabbitMQ: Publish IReferenceFileParsingCompletedEvent
end
PM->>PM: All files parsed → Completed state
PM->>MongoDB: Create SystematicSearch
PM-->>User: Search ready for screening
Detailed Flow¶
Phase 1: Frontend File Selection and Validation¶
Component: CreateSearchComponent (src/services/web/src/app/project/project-overview/create-search/create-search.component.ts)
- User selects a reference library file in the Create Search dialog
- Frontend validates the file format:
- CSV/TSV: Checks for required column headers (Title, Authors, PublicationName, etc.)
- XML: Validates Endnote or PubMed XML structure
- User optionally configures screening import settings (mapping columns to investigators)
- User submits the form, dispatching projectDetailActions.startSearchImport()
Supported File Types (LibraryFileType enum):
| Value | Type | Description |
|---|---|---|
| 0 | CSV | Comma-separated values |
| 1 | TSV | Tab-separated values |
| 2 | EndnoteXml | Endnote XML export |
| 3 | PubmedXml | PubMed XML format |
| 4 | LivingSearchJson | Living search JSON format |
Required CSV/TSV Columns:
| Column | Description |
|---|---|
| title | Study title |
| authors | Author list |
| publicationName | Journal/publication name |
| alternateName | Alternative publication name |
| abstract | Study abstract |
| url | Link to study |
| authorAddress | Author contact information |
| year | Publication year |
| doi | Digital Object Identifier |
| referenceType | Type of reference |
| pdfRelativePath | Path to PDF file |
| keywords | Study keywords |
| customId | User-defined identifier |
Phase 2: S3 Presigning Deep Dive¶
The presigning process allows the frontend to upload files directly to S3 without the file passing through our API servers. This is critical for large files and reduces server load.
Effect: project-detail.effects.ts (addSearch$ effect) (src/services/web/src/app/core/services/project/project-detail.effects.ts)
- Effect generates SHA-256 hash of the file content
- Calls SearchService.getS3RequestSignature() with file metadata
API Endpoint: POST /api/projects/{projectId}/searches/getSignature
Controller: SearchController.cs (lines 92-156) (src/services/api/SyRF.API.Endpoint/Controllers/SearchController.cs)
AWS Signature Version 4 Process¶
The API uses AWS Signature Version 4 (SigV4) to create presigned URLs. This is implemented in S3PostSigner.cs (src/libs/appservices/SyRF.AppServices/Services/S3PostSigner.cs).
Signing Algorithm (AWS4-HMAC-SHA256):
StringToSign = Algorithm + '\n' +
RequestDateTime + '\n' +
CredentialScope + '\n' +
HashedCanonicalRequest
CredentialScope = Date + '/' + Region + '/' + Service + '/aws4_request'
Signature = HMAC-SHA256(SigningKey, StringToSign)
Key Derivation Chain:
kSecret = "AWS4" + SecretAccessKey
kDate = HMAC-SHA256(kSecret, Date)
kRegion = HMAC-SHA256(kDate, Region) // "eu-west-1"
kService = HMAC-SHA256(kRegion, Service) // "s3"
kSigning = HMAC-SHA256(kService, "aws4_request")
Canonical Request Format:
CanonicalRequest = HTTPMethod + '\n' +
CanonicalURI + '\n' +
CanonicalQueryString + '\n' +
CanonicalHeaders + '\n' +
SignedHeaders + '\n' +
HashedPayload
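The derivation chain and string-to-sign above can be expressed in a few lines. The following Python sketch illustrates the SigV4 algorithm as described; it is not the S3PostSigner.cs implementation, and the function names are chosen for illustration only.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    """One HMAC-SHA256 step in the key derivation chain."""
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date: str, region: str, service: str) -> bytes:
    """kSecret -> kDate -> kRegion -> kService -> kSigning, per AWS SigV4."""
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date)
    k_region = _hmac_sha256(k_date, region)        # e.g. "eu-west-1"
    k_service = _hmac_sha256(k_region, service)    # e.g. "s3"
    return _hmac_sha256(k_service, "aws4_request")


def sign(secret_key: str, date: str, region: str, service: str,
         request_datetime: str, hashed_canonical_request: str) -> str:
    """Build StringToSign and return the hex-encoded signature."""
    scope = f"{date}/{region}/{service}/aws4_request"
    string_to_sign = "\n".join([
        "AWS4-HMAC-SHA256",
        request_datetime,
        scope,
        hashed_canonical_request,
    ])
    signing_key = derive_signing_key(secret_key, date, region, service)
    return hmac.new(signing_key, string_to_sign.encode("utf-8"),
                    hashlib.sha256).hexdigest()
```

Note that the signing key depends only on the date, region, and service, so it can be cached and reused across requests within the same day.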
What the API Does¶
- Validates the search ID and search name
- Generates a unique referenceFileId (UUID) for tracking
- Constructs S3 key: Projects/{projectId}/Imported Search Libraries/SyRF Library - {searchId}.{ext}
- Builds metadata headers (x-amz-meta-*):
  - x-amz-meta-projectid: Project GUID
  - x-amz-meta-searchid: Search GUID
  - x-amz-meta-searchname: URL-encoded search name
  - x-amz-meta-description: URL-encoded description
  - x-amz-meta-referencefiles: JSON-serialized ReferenceFileInfo[]
- Signs the request using AWS SigV4
- Publishes ISearchUploadStartedEvent to RabbitMQ (starts state machine)
- Returns presigned URL, headers, and metadata to frontend
S3 Configuration¶
- Bucket: Configured via AWS_S3_BUCKET_NAME environment variable
- Region: eu-west-1
- CORS: Must allow PUT from web origins
- Presign expiry: Configurable (default implementation)
Phase 3: Direct S3 Upload¶
Service: S3FileService (src/services/web/src/app/core/services/s3-file.service.ts)
- Frontend performs HTTP PUT directly to S3 using presigned URL
- Includes all metadata headers from signature response
- Tracks upload progress via HttpEventType.UploadProgress
- S3 stores the file with metadata in x-amz-meta-* headers
Phase 4: S3 Event Notification (Lambda)¶
Lambda Function: S3FileReceivedFunction.cs (src/services/s3-notifier/SyRF.S3FileSavedNotifier.Endpoint/S3FileReceivedFunction.cs)
When S3 receives the file:
- S3 triggers S3:ObjectCreated:Put event
- Lambda function syrfAppUploadS3Notifier receives the event
- Extracts metadata from S3 object headers
- Publishes ISearchUploadSavedToS3Event to RabbitMQ with:
  - ProjectId, SearchId, SearchName, Description
  - ReferenceFiles (deserialized from JSON header)
  - DateTimeEventOccurred
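Conceptually, the Lambda reverses what the API did at presign time: it URL-decodes the name and description headers and deserializes the reference-file JSON. A minimal Python sketch of that transformation (the real handler is C#; the function name and payload shape here are illustrative):

```python
import json
from urllib.parse import unquote


def build_saved_event(metadata: dict) -> dict:
    """Turn S3 object user metadata (the x-amz-meta-* values, as surfaced
    without their prefix by a HEAD request) into an
    ISearchUploadSavedToS3Event-shaped payload.
    """
    return {
        "projectId": metadata["projectid"],
        "searchId": metadata["searchid"],
        # Name and description were URL-encoded by the API before signing
        "searchName": unquote(metadata["searchname"]),
        "description": unquote(metadata.get("description", "")),
        # The reference file list travels as a JSON string in one header
        "referenceFiles": json.loads(metadata["referencefiles"]),
    }
```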
Phase 5: State Machine Orchestration¶
State Machine: SearchImportJobStateMachine.cs (src/services/project-management/SyRF.ProjectManagement.Endpoint/Sagas/SearchImportJobStateMachine.cs)
The MassTransit state machine manages the import workflow.
States¶
| State | Description |
|---|---|
| Initial | State machine not yet started |
| Uploading | API has been notified, waiting for S3 confirmation |
| Uploaded | File successfully stored in S3 |
| Parsing | Reference files being parsed into studies |
| Completed | All parsing complete, SystematicSearch created. Saga persists with CompletedAt timestamp; cleaned up by TTL index after 7 days. |
| Error | Failure occurred at any stage. Saga persists with CompletedAt timestamp; cleaned up by TTL index after 7 days. |
SearchImportJobStatus Enum:
| Value | Status | Description |
|---|---|---|
| 0 | Uploading | Initial upload in progress |
| 1 | Parsing | File parsing underway |
| 2 | Complete | Successfully finished |
| 3 | Error | Failed with error |
Dual-Path Entry (Race Condition Handling)¶
The state machine handles a potential race condition where events can arrive in either order:
Path A (Normal flow - API event first):
Path B (S3 event arrives first):
In Path B, SearchUploadSaved arriving first goes directly through Initially to Parsing (via CreateSearchImportJobActivity, SetFileReceivedActivity, and StartParseJobsActivity). When SearchUploadStarted arrives later, it is handled by During(Uploading, Uploaded, Parsing) which sets metadata but does not re-trigger parsing.
This ensures the workflow completes correctly regardless of which event arrives first.
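The ordering guarantee can be modelled with a tiny state table. This is a conceptual Python sketch of the dual-path behaviour, not the MassTransit saga itself:

```python
class SearchImportSaga:
    """Minimal model of dual-path entry: parsing is kicked off exactly
    once, whichever of the two events arrives first."""

    def __init__(self):
        self.state = "Initial"
        self.parse_commands_sent = 0

    def on_upload_started(self):
        if self.state == "Initial":
            self.state = "Uploading"   # Path A: API event first
        # Late arrival (Path B): sets metadata only, no re-trigger

    def on_upload_saved(self):
        if self.state == "Initial":
            # Path B: S3 event first -> create job, go straight to Parsing
            self.state = "Parsing"
            self.parse_commands_sent += 1
        elif self.state == "Uploading":
            # Path A: Uploading -> Uploaded -> Parsing
            self.state = "Parsing"
            self.parse_commands_sent += 1


# Both orderings converge on Parsing with exactly one parse kick-off
for order in [("started", "saved"), ("saved", "started")]:
    saga = SearchImportSaga()
    for evt in order:
        saga.on_upload_started() if evt == "started" else saga.on_upload_saved()
    assert saga.state == "Parsing" and saga.parse_commands_sent == 1
```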
Events and Transitions¶
ISearchUploadStartedEvent
Initial ──────────────────────────────────────────▶ Uploading
│ │
│ ISearchUploadSavedToS3Event │ ISearchUploadSavedToS3Event
│ (S3 event arrives first) │ (normal flow)
│ ▼
│ Uploaded
│ │
│ │ StartParseJobsActivity
│ ▼
└──────────────────────────────────────────────▶ Parsing
│
┌───────────────────────────────────┼───────────────────────────────┐
│ │ │
IReferenceFileParsingCompletedEvent │ IReferenceFileParsingFaultedEvent
│ │ │
▼ ▼ ▼
(all done?) (still pending) (mark fault)
│ │
┌───────────┴───────────┐ │
│ │ │
(no faults) (has faults) │
│ │ │
▼ ▼ │
Completed ◀───────────── Error ◀────────────────────────────────────────────────────┘
Terminal states (Completed, Error) persist with CompletedAt timestamp.
MongoDB TTL index (ttl_CompletedAt_7d) cleans up after 7 days.
All 6 events are Ignore()'d in terminal states to handle late duplicates.
Activities Executed¶
| Activity | Trigger | Purpose |
|---|---|---|
| CreateSearchImportJobActivity | SearchUploadStarted, SearchUploadSaved | Creates SearchImportJob aggregate with ReferenceFileParseJob entities |
| SetFileReceivedActivity | SearchUploadSaved | Sets upload completion timestamp |
| StartParseJobsActivity | Transition to Parsing | Publishes IStartParsingReferenceFileCommand for each reference file |
| CompleteSearchJobActivity | All parse jobs complete (no faults) | Creates SystematicSearch aggregate |
| FailSearchJobActivity | Any parse job faulted | Marks import job with error status |
Timeout Handling¶
- 60-minute timeout scheduled when entering Uploading state
- If S3 confirmation not received, transitions to Error state
- Publishes ISearchImportJobErrorEvent with timeout message
Phase 6: Reference File Parsing¶
Consumer: ReferenceFileParseJobConsumer.cs (src/services/project-management/SyRF.ProjectManagement.Endpoint/Consumers/ReferenceFileParseJobConsumer.cs)
For each reference file, the consumer:
- Receives IStartParsingReferenceFileCommand
- Calls ProjectManagementService.ParseReferenceFile()
- On success: Publishes IReferenceFileParsingCompletedEvent
- On failure: Publishes IReferenceFileParsingFaultedEvent
Configuration:
- Job timeout: 5 minutes
- Retry policy: 4 incremental retries (1 min + 1 min increments)
- Concurrent job limit: 5
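Assuming the retry policy is the incremental style (initial interval of 1 minute, stepped up by 1 minute per attempt), the wait before each retry works out as follows. A sketch of the arithmetic, not the actual endpoint configuration:

```python
def incremental_retry_delays(retry_count: int = 4,
                             initial_s: int = 60,
                             increment_s: int = 60) -> list[int]:
    """Delay before each retry attempt, in seconds.

    With 4 retries, a 60 s initial interval, and a 60 s increment,
    a persistently failing parse job waits 1, 2, 3, then 4 minutes
    between attempts before being faulted.
    """
    return [initial_s + i * increment_s for i in range(retry_count)]
```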
Service: ProjectManagementService.cs (lines 129-199) (src/libs/project-management/SyRF.ProjectManagement.Core/Services/ProjectManagementService.cs)
The ParseReferenceFile method:
- Retrieves the ReferenceFileParseJob from the project
- Sets up progress tracking with throttled updates (every 2 seconds)
- Calls StudyReferenceFileParser.ParseStudiesAsync()
- Handles progress updates and final completion/error states
Phase 7: Study Creation¶
Parser Router: StudyReferenceFileParser.cs (src/libs/project-management/SyRF.ProjectManagement.Core/Services/StudyReferenceFileParser.cs)
- Selects appropriate parser implementation based on LibraryFileType
- Creates new SyRF reference file URL for storing parsed output
- Invokes parser's ParseStudiesAsync() method
Parser Implementations:
| Parser | File Types | Location |
|---|---|---|
| SpreadsheetParseImplementation | CSV, TSV | SpreadsheetParseImplementation.cs |
| EndnoteXmlParseImplementation | Endnote XML | EndnoteXmlParseImplementation.cs |
| PubmedXmlParseImplementation | PubMed XML | PubmedXmlParseImplementation.cs |
Record Processors:
| Processor | Purpose |
|---|---|
| StudySpreadsheetRecordProcessor | Converts CSV/TSV rows to Study objects |
| StudyEndnoteRecordProcessor | Converts Endnote XML records to Study objects |
| StudyPubmedRecordProcessor | Converts PubMed XML records to Study objects |
Each parser:
- Downloads the original file from S3
- Parses records using the appropriate record processor
- Creates Study aggregate roots with metadata:
  - Title, Authors, Abstract, Year
  - DOI, URL, Keywords
  - PublicationName, ReferenceType
  - Project and Search associations
- Batches studies (5000 at a time) for MongoDB insertion
- Creates a new SyRF reference file with Study IDs added
- Reports progress back to the state machine
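The 5,000-study batching step can be sketched as a simple chunking generator. Illustrative Python only; the real code inserts Study documents through the MongoDB driver, and insert_many here is a stand-in for that call:

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int = 5000) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items, so a large parse
    never materialises every study in memory before the first insert."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


def insert_studies(studies, insert_many, batch_size: int = 5000) -> int:
    """Insert studies batch-by-batch, returning the total inserted."""
    inserted = 0
    for batch in batched(studies, batch_size):
        insert_many(batch)      # one round-trip per 5,000 studies
        inserted += len(batch)
    return inserted
```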
Phase 8: Systematic Search Creation¶
Domain Model: SearchImportJob.cs (line 41) (src/libs/project-management/SyRF.ProjectManagement.Core/Model/ProjectAggregate/SearchImportJob.cs)
When all ReferenceFileParseJob entities complete:
public SystematicSearch CompleteSearchImportJob()
{
Status = SearchImportJobStatus.Complete;
var search = new SystematicSearch(Id, Name, Description,
ReferenceFileParseJobs.Select(rfj => rfj.CreateStudyReferenceFile()),
ProjectId, LivingSearchId);
return search;
}
Result: A SystematicSearch aggregate is created containing:
- Search ID (same as SearchImportJob ID)
- Name and Description
- Collection of StudyReferenceFile objects
- Project association
- Optional Living Search association
Message Contracts¶
Events¶
| Event | Publisher | Subscribers | Purpose |
|---|---|---|---|
| ISearchUploadStartedEvent | API Service | PM State Machine | Notifies upload has begun |
| ISearchUploadSavedToS3Event | S3 Notifier Lambda | PM State Machine | Confirms file stored in S3 |
| IReferenceFileParsingCompletedEvent | ReferenceFileParseJobConsumer | PM State Machine | Parse job succeeded |
| IReferenceFileParsingFaultedEvent | ReferenceFileParseJobConsumer | PM State Machine | Parse job failed |
| ISearchImportJobErrorEvent | PM State Machine | Error handlers | Import job failed |
Commands¶
| Command | Publisher | Consumer | Purpose |
|---|---|---|---|
| IStartParsingReferenceFileCommand | StartParseJobsActivity | ReferenceFileParseJobConsumer | Triggers parsing of individual file |
Shared Interfaces¶
ICanStartSearchImportJob (src/libs/kernel/SyRF.SharedKernel/Interfaces/ICanStartSearchImportJob.cs)
Base interface implemented by ISearchUploadStartedEvent and ISearchUploadSavedToS3Event (the two events that can initiate a search import job). Note: IStartSearchImportJobCommand was removed as it had no producer anywhere in the monorepo.
public interface ICanStartSearchImportJob
{
Guid ProjectId { get; }
Guid SearchId { get; }
string SearchName { get; }
string Description { get; }
IEnumerable<ReferenceFileInfo> ReferenceFiles { get; }
}
ReferenceFileInfo (src/libs/kernel/SyRF.SharedKernel/ValueObjects/ReferenceFileInfo.cs)
Value object containing file metadata:
public record ReferenceFileInfo(
Guid ReferenceFileId,
string FileUrl,
LibraryFileType LibraryFileType,
ScreeningImportSettings? ScreeningImportSettings
);
Domain Models¶
SearchImportJob¶
Location: SearchImportJob.cs
MongoDB Collection: Projects (embedded in Project aggregate)
Aggregate root tracking the import process:
| Property | Type | Description |
|---|---|---|
| Id | Guid | Same as SearchId |
| Name | string | User-provided search name |
| Description | string | User-provided description |
| Status | SearchImportJobStatus | Uploading, Parsing, Complete, Error |
| ReferenceFileParseJobs | IEnumerable&lt;ReferenceFileParseJob&gt; | Individual file parsing jobs |
| TotalNumberOfStudies | int? | Sum across all parse jobs |
| NumberOfParsedStudies | int | Progress counter |
| Errors | List&lt;string&gt; | Error messages |
ReferenceFileParseJob¶
Location: ReferenceFileParseJob.cs
MongoDB Collection: Embedded in SearchImportJob
Entity tracking individual file parsing:
| Property | Type | Description |
|---|---|---|
| Id | Guid | ReferenceFileId |
| LibraryType | LibraryFileType | File format |
| OriginalFileUrl | string | S3 location of uploaded file |
| SyrfReferenceFileUrl | string? | S3 location of parsed file with IDs |
| TotalNumberOfStudies | int? | Total studies in file |
| NumberOfParsedStudies | int | Parsed count |
| ScreeningImportSettings | ScreeningImportSettings? | Column-to-investigator mapping |
| IsComplete | bool | Parsed == Total |
Study¶
Location: Study.cs
MongoDB Collection: Studies
Aggregate root for individual studies:
| Property | Type | Description |
|---|---|---|
| Id | Guid | Unique study identifier |
| Title | string | Study title |
| Authors | IEnumerable&lt;Author&gt; | Author list |
| Abstract | string? | Study abstract |
| Year | int? | Publication year |
| DOI | string? | Digital Object Identifier |
| PublicationName | PublicationName | Journal information |
| Keywords | List&lt;string&gt; | Study keywords |
| ProjectId | Guid | Owning project |
| SystematicSearchId | Guid | Owning search |
| ReferenceFileId | Guid | Source file |
| ScreeningInfo | ScreeningInfo | Screening decisions |
| ExtractionInfo | ExtractionInfo | Data extraction |
SystematicSearch¶
Location: SystematicSearch.cs
MongoDB Collection: SystematicSearches
Aggregate root representing a completed search:
| Property | Type | Description |
|---|---|---|
| Id | Guid | Search identifier |
| Name | string | Search name |
| Description | string | Search description |
| SyrfReferenceFiles | IEnumerable&lt;StudyReferenceFile&gt; | Parsed file references |
| ProjectId | Guid? | Owning project |
| NumberOfStudies | int | Total studies across all files |
Example Payloads¶
Presigning Request¶
Request: POST /api/projects/{projectId}/searches/getSignature
{
"searchId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"searchName": "PubMed Search 2024",
"description": "Search results from PubMed for cardiac intervention studies",
"referenceFiles": [
{
"referenceFileId": "f1e2d3c4-b5a6-7890-1234-567890abcdef",
"fileUrl": "Projects/project-123/Imported Search Libraries/SyRF Library - a1b2c3d4.csv",
"libraryFileType": 0,
"screeningImportSettings": null
}
],
"fileHash": "abc123def456...",
"contentType": "text/csv"
}
Presigning Response¶
{
"uploadUrl": "https://syrf-bucket.s3.eu-west-1.amazonaws.com/Projects/...",
"headers": {
"x-amz-meta-projectid": "project-123",
"x-amz-meta-searchid": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"x-amz-meta-searchname": "PubMed%20Search%202024",
"x-amz-meta-description": "Search%20results%20from%20PubMed...",
"x-amz-meta-referencefiles": "[{\"ReferenceFileId\":\"f1e2d3c4...\",\"FileUrl\":\"...\",\"LibraryFileType\":0}]",
"x-amz-content-sha256": "abc123def456...",
"x-amz-date": "20241207T120000Z",
"Authorization": "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20241207/eu-west-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-meta-projectid;x-amz-meta-referencefiles;x-amz-meta-searchid;x-amz-meta-searchname, Signature=..."
},
"referenceFileId": "f1e2d3c4-b5a6-7890-1234-567890abcdef"
}
ISearchUploadStartedEvent¶
{
"messageId": "msg-uuid-123",
"messageType": ["urn:message:SyRF.ProjectManagement.Messages.Events:ISearchUploadStartedEvent"],
"message": {
"projectId": "project-123",
"searchId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"searchName": "PubMed Search 2024",
"description": "Search results from PubMed...",
"referenceFiles": [
{
"referenceFileId": "f1e2d3c4-b5a6-7890-1234-567890abcdef",
"fileUrl": "Projects/project-123/Imported Search Libraries/SyRF Library - a1b2c3d4.csv",
"libraryFileType": 0,
"screeningImportSettings": null
}
],
"dateTimeEventOccurred": "2024-12-07T12:00:00Z"
}
}
ISearchUploadSavedToS3Event¶
{
"messageId": "msg-uuid-456",
"messageType": ["urn:message:SyRF.S3FileSavedNotifier.Messages:ISearchUploadSavedToS3Event"],
"message": {
"projectId": "project-123",
"searchId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"searchName": "PubMed Search 2024",
"description": "Search results from PubMed...",
"referenceFiles": [
{
"referenceFileId": "f1e2d3c4-b5a6-7890-1234-567890abcdef",
"fileUrl": "Projects/project-123/Imported Search Libraries/SyRF Library - a1b2c3d4.csv",
"libraryFileType": 0
}
],
"dateTimeEventOccurred": "2024-12-07T12:00:05Z"
}
}
Error Handling¶
Upload Timeout¶
- 60-minute timeout from upload start
- State machine transitions to Error state
- ISearchImportJobErrorEvent published with "Timed out waiting for file upload"
Parsing Failures¶
- Individual file parsing has 5-minute timeout
- 4 automatic retries with incremental backoff
- On final failure, IReferenceFileParsingFaultedEvent published
- If any file fails, entire import marked as Error
Rollback on Parse Error¶
If parsing fails mid-stream:
- All studies created for that file are deleted
- FileParseResult contains error details
- ReferenceFileParseJob marked with errors
- SearchImportJob status set to Error
Troubleshooting¶
Common Issues¶
| Symptom | Likely Cause | Resolution |
|---|---|---|
| Upload stuck at 0% | CORS misconfiguration on S3 bucket | Verify S3 CORS allows PUT from web origin |
| "Timed out waiting for file upload" | Lambda not triggered or RabbitMQ connectivity | Check Lambda CloudWatch logs, verify S3 event notification |
| Search stuck in "Uploading" | S3 event never reached PM service | Check S3 bucket event notifications, Lambda invocations |
| Parsing errors | Malformed input file | Validate CSV columns, check XML structure |
| Studies not appearing | MongoDB connectivity or batch insert failure | Check PM service logs, MongoDB connection |
Diagnostic Steps¶
- Check RabbitMQ queues: Verify messages are being delivered
  - search-import-job-state queue for state machine events
  - start-parsing-reference-file-command queue for parse jobs
- Check Lambda CloudWatch logs: /aws/lambda/syrfAppUploadS3Notifier
  - Look for S3 event processing errors
  - Verify metadata extraction succeeded
- Check PM service logs: Search for SearchImportJobStateMachine
  - State transitions logged with correlation ID
  - Activity execution results
- MongoDB queries:
// Check SearchImportJob status
db.Projects.find({"SearchImportJobs.Id": UUID("search-id")})
// Count studies for a search
db.Studies.countDocuments({SystematicSearchId: UUID("search-id")})
Debug Mode¶
Enable verbose logging in Project Management service:
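The exact switch depends on the service's logging setup; assuming standard .NET appsettings-based configuration, raising the MassTransit and saga log levels would look roughly like this (category names are illustrative, not confirmed against the codebase):

```json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "MassTransit": "Debug",
      "SyRF.ProjectManagement": "Debug"
    }
  }
}
```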
Screening Import Integration¶
When ScreeningImportSettings is provided:
- CSV/TSV columns are mapped to investigator IDs
- Screening decisions are extracted during parsing
- Studies are created with pre-populated ScreeningInfo
- Allows bulk import of existing screening decisions
ScreeningImportSettings Structure:
public record ScreeningImportSettings(
Guid StageId,
Dictionary<string, Guid> UserColumnMap // column name -> investigator ID
);
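A conceptual sketch of how a UserColumnMap might be applied to one parsed CSV row. Python, illustrative only; the real record processor's decision handling is richer than a string copy:

```python
def extract_screening_decisions(row: dict, user_column_map: dict) -> dict:
    """Map spreadsheet columns to investigator decisions.

    row:             parsed CSV row, column name -> cell value
    user_column_map: column name -> investigator GUID
                     (from ScreeningImportSettings.UserColumnMap)
    Returns investigator GUID -> decision, for non-empty cells only.
    """
    decisions = {}
    for column, investigator_id in user_column_map.items():
        value = (row.get(column) or "").strip()
        if value:               # skip columns with no recorded decision
            decisions[investigator_id] = value
    return decisions
```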
Performance Characteristics¶
| Operation | Typical Duration | Bottleneck |
|---|---|---|
| S3 signature generation | < 100ms | API response |
| S3 upload | Variable (file size) | Network bandwidth |
| S3 notification | 1-5 seconds | Lambda cold start |
| Parse job (CSV, 10K studies) | 30-60 seconds | MongoDB batch writes |
| Parse job (XML, 10K studies) | 60-120 seconds | XML parsing + MongoDB |
Note: Performance numbers are estimates based on typical workloads and may vary.
Optimizations:
- Studies batched in groups of 5,000 for MongoDB insertion
- Progress updates throttled to every 2 seconds
- Up to 5 concurrent parse jobs per instance
- Incremental retry with backoff on transient failures
Implementation Critique¶
This section documents known architectural concerns and potential improvements for the systematic search upload flow.
Strengths¶
- Robust Race Condition Handling: The dual-path state machine entry handles the case where S3 events can arrive before API events. Both SearchUploadStarted and SearchUploadSaved can initiate the saga via Initially, and late-arriving events are handled by explicit During blocks.
- Good Separation of Concerns: The architecture cleanly separates:
  - API (presigning + event publishing)
  - Lambda (S3 event notification)
  - State machine (orchestration)
  - Consumers (parsing work)
- Proper Use of MassTransit Sagas: Using state machines for workflow orchestration is the right pattern for this use case.
- Batching Strategy: The 5,000-study batch size for MongoDB inserts is a reasonable trade-off between memory usage and database round-trips.
Known Issues and Technical Debt¶
1. Presigned URL Security Gap¶
Location: SearchController.cs:92-156
The presigning endpoint publishes ISearchUploadStartedEvent before the file is actually uploaded. If a user requests a signature but never uploads:
- A SearchImportJob gets created in "Uploading" state
- It will time out after 60 minutes and transition to "Error"
- This creates orphaned/failed jobs in the database
Recommendation: Consider a two-phase approach where the job isn't created until the S3 event confirms the upload, or implement cleanup for abandoned uploads.
2. Silent Exception Swallowing¶
Location: ReferenceFileParseJobConsumer.cs:29-32
The exception e is caught but never logged. This makes debugging production issues difficult. The fault event doesn't contain the exception details.
Recommendation: Log the exception with correlation ID before publishing the fault event.
3. State Machine Duplicate Event Handling (Resolved)¶
Location: SearchImportJobStateMachine.cs
Previously, duplicate events could create duplicate jobs or trigger duplicate parse commands. This has been resolved with a three-layer defense: scoped UseMessageRetry (only MongoDbConcurrencyException), UseInMemoryOutbox, and Ignore() handlers for all 6 events in terminal states. Saga instances persist in terminal states and are cleaned up by a MongoDB TTL index after 7 days. See SearchImportJob Saga Duplicate Event Handling for full details.
4. Tight Coupling to S3 Path Convention¶
Location: SearchController.cs
The S3 key pattern Projects/{projectId}/Imported Search Libraries/SyRF Library - {searchId}.{ext} is hard-coded. If this convention changes, both the API and Lambda need coordinated updates.
Recommendation: Extract path construction to a shared utility, or store the full URL in metadata rather than reconstructing it.
5. SystematicSearch Schema Migration Complexity¶
Location: SystematicSearch.cs:72-105
The SyrfReferenceFiles property has complex getter/setter logic handling schema version 0 vs newer versions. This backward compatibility logic makes the domain model harder to understand and maintain.
Recommendation: Consider a one-time data migration script to upgrade all v0 documents, then remove the backward compatibility code.
6. Missing Validation in ReferenceFileParseJob.UpdateProgress¶
Location: ReferenceFileParseJob.cs:95-133
The UpdateProgress method silently returns false when HasError || IsComplete. Callers may not check this return value, leading to silent failures where progress updates are ignored.
Recommendation: Either throw an exception or log a warning when progress updates are attempted on completed/errored jobs.
7. No Circuit Breaker for MongoDB Operations¶
The parsing flow does batch inserts of 5,000 studies without circuit breaker protection. If MongoDB is under load:
- All 5 concurrent parse jobs will pile up
- Retries will compound the problem
- No backpressure mechanism exists
Recommendation: Implement a circuit breaker pattern (Polly) for MongoDB operations, and consider adding backpressure through RabbitMQ prefetch limits.
8. Metadata Size Limit Risk¶
Location: SearchController.cs
S3 object metadata has a 2KB limit. The x-amz-meta-referencefiles header contains JSON-serialized file info. With multiple files or long descriptions, this could exceed the limit.
Recommendation: Store minimal identifiers in metadata and have the Lambda look up full details from a database or use the S3 key itself to encode the correlation ID.
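A pre-flight check at signing time could fail fast instead of discovering the problem at upload. Illustrative Python; the 2 KB figure is S3's documented limit on user-defined metadata (measured as UTF-8 bytes of keys and values, counted here without the x-amz-meta- prefix, though S3's exact accounting may differ):

```python
def user_metadata_size(headers: dict) -> int:
    """Approximate size S3 counts against the user-metadata limit."""
    prefix = "x-amz-meta-"
    total = 0
    for key, value in headers.items():
        if key.lower().startswith(prefix):
            total += len(key[len(prefix):].encode("utf-8"))
            total += len(str(value).encode("utf-8"))
    return total


def check_metadata_fits(headers: dict, limit: int = 2048) -> bool:
    """Return False if the x-amz-meta-* headers would exceed the limit."""
    return user_metadata_size(headers) <= limit
```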
9. Inconsistent Error Handling Strategy¶
- Some errors delete created studies (rollback)
- Some errors just mark the job as failed
- The state machine transitions to Error but existing partial data may remain
Recommendation: Define and document a consistent error handling contract - either always roll back partial data or always keep it for manual review.
10. No Dead Letter Queue Strategy¶
If messages permanently fail, they'll exhaust retries and be moved to error queues, but there's no documented strategy for:
- Monitoring dead letters
- Alerting on accumulated failures
- Manual retry or resolution procedures
Recommendation: Add observability and runbooks for DLQ handling.
Minor Improvements¶
- Progress throttling (2 seconds) could be configurable
- Concurrent job limit (5) should be based on resource constraints, not a magic number
- Add correlation IDs to all log statements for distributed tracing
- Consider using CloudEvents format for better observability tooling compatibility
Operational Notes¶
Saga State Collection Growth¶
The pmSearchImportJobState MongoDB collection stores saga instances for the SearchImportJobStateMachine. With TTL-based cleanup (replacing immediate SetCompleted deletion), saga instances persist for 7 days after reaching a terminal state (Completed or Error). The TTL index ttl_CompletedAt_7d on the CompletedAt field handles automatic cleanup.
Monitor: Collection size should remain proportional to the volume of search imports over the past 7 days. If the collection grows unexpectedly, check:
- TTL index exists: db.pmSearchImportJobState.getIndexes() should show ttl_CompletedAt_7d
- CompletedAt is being set: documents in terminal states should have a non-null CompletedAt value
- MongoDB TTL monitor thread is running (runs every 60 seconds by default)
Related Documentation¶
- Dependency Map - Service dependencies
- System Overview - Overall architecture
- ADR-003: Cluster Architecture - Deployment architecture