MassTransit Observability and Message Flow Monitoring¶
This document outlines a plan to implement comprehensive observability for MassTransit message flow through SyRF application components, with a focus on the systematic search upload workflow.
Table of Contents¶
- Problem Statement
- Current State
- Proposed Solution
- Implementation Plan
- Phase 1: Metrics Collection
- Phase 2: State Machine Observers
- Phase 3: Message Flow Observers
- Phase 4: Distributed Tracing Dashboard
- Quick Wins (No Code Changes)
- Success Criteria
- Related Documentation
Problem Statement¶
Currently, there is limited visibility into:
- Message flow between services (API, Project Management, S3 Notifier)
- State machine transitions in SearchImportJobStateMachine
- Processing times for message consumption and saga operations
- Error rates and fault patterns in the messaging pipeline
This makes debugging production issues difficult, particularly for the systematic search upload flow, which involves multiple services, S3 events, and state machine orchestration.
Current State¶
SyRF has a partial OpenTelemetry setup in SyrfConfigureServices.cs:
services.AddOpenTelemetry()
    .WithTracing(tracerProviderBuilder =>
    {
        tracerProviderBuilder
            .ConfigureResource(resource => resource.AddService("SyRF API"))
            .AddSource("MongoDB.Driver.Core.Extensions.DiagnosticSources")
            .AddSource(DiagnosticHeaders.DefaultListenerName) // MassTransit tracing
            .AddAllSyrfAssemblySources()
            .AddAspNetCoreInstrumentation();

        if (customSentryConfig.Enabled && customSentryConfig.OpenTelemetryTracing)
        {
            tracerProviderBuilder.AddSentry();
        }
    });
What exists:
- MassTransit distributed tracing via DiagnosticHeaders.DefaultListenerName
- MongoDB operation tracing via custom DiagnosticsActivityEventSubscriber
- Sentry export (when enabled)
What's missing:
- MassTransit metrics collection (counters, gauges, histograms)
- State machine observers for SearchImportJobStateMachine
- Message flow observers (consume, send, publish)
- Dedicated observability dashboard
Proposed Solution¶
Implement a layered observability stack:
┌─────────────────────────────────────────────────────────────────────┐
│ VISUALIZATION LAYER │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌────────────┐ │
│ │ Grafana │ │ Sentry │ │ Jaeger │ │ RabbitMQ │ │
│ │ (Metrics) │ │ (Traces) │ │ (Traces) │ │ UI │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
▲
┌─────────────────────────────────────────────────────────────────────┐
│ COLLECTION LAYER │
│ ┌─────────────────────┐ ┌─────────────────────────────────────┐ │
│ │ Prometheus Metrics │ │ OpenTelemetry Traces │ │
│ │ - receive/consume │ │ - Distributed trace context │ │
│ │ - send/publish │ │ - Span correlation │ │
│ │ - saga operations │ │ - Error capture │ │
│ └─────────────────────┘ └─────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
▲
┌─────────────────────────────────────────────────────────────────────┐
│ INSTRUMENTATION LAYER │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐ │
│ │ MassTransit │ │ State Machine │ │ Message Flow │ │
│ │ Built-in Metrics │ │ Observers │ │ Observers │ │
│ └──────────────────┘ └──────────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Implementation Plan¶
Phase 1: Metrics Collection¶
Priority: High | Effort: Low | Impact: Immediate visibility into message throughput and error rates
1.1 Add MassTransit Metrics to OpenTelemetry¶
Update the OpenTelemetry configuration to include metrics:
File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs
using MassTransit.Logging;    // DiagnosticHeaders
using MassTransit.Monitoring; // InstrumentationOptions

public static void AddOpenTelemetryConfig(this ServiceRegistry services, IConfiguration configuration)
{
    var customSentryConfig = configuration.GetSection("CustomSentryConfig").Get<CustomSentryConfig>() ??
        throw new InvalidOperationException("CustomSentryConfig not found");

    services.AddOpenTelemetry()
        // Set the resource once so traces and metrics share the same service name
        .ConfigureResource(resource => resource.AddService("SyRF API"))
        .WithTracing(tracerProviderBuilder =>
        {
            tracerProviderBuilder
                .AddSource("MongoDB.Driver.Core.Extensions.DiagnosticSources")
                .AddSource(DiagnosticHeaders.DefaultListenerName)
                .AddAllSyrfAssemblySources()
                .AddAspNetCoreInstrumentation();

            if (customSentryConfig.Enabled && customSentryConfig.OpenTelemetryTracing)
            {
                tracerProviderBuilder.AddSentry();
            }
        })
        .WithMetrics(meterProviderBuilder =>
        {
            meterProviderBuilder
                .AddMeter(InstrumentationOptions.MeterName) // MassTransit metrics
                .AddAspNetCoreInstrumentation()
                // Registers the exporter only; the /metrics scraping endpoint must
                // also be mapped in the ASP.NET Core pipeline
                .AddPrometheusExporter();
        });
}
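Registering the Prometheus exporter does not by itself serve any metrics; the scraping endpoint also has to be mapped in the request pipeline. The following is a minimal sketch of that step, assuming the OpenTelemetry.Exporter.Prometheus.AspNetCore package and a minimal-hosting Program.cs. The actual SyRF host setup (which uses ServiceRegistry) may need the equivalent call in a different place.

```csharp
using MassTransit.Monitoring;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry.Metrics;

var builder = WebApplication.CreateBuilder(args);

// Assumption: metrics are registered here only for illustration; in SyRF this
// would already be done by AddOpenTelemetryConfig.
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics
        .AddMeter(InstrumentationOptions.MeterName)
        .AddPrometheusExporter());

var app = builder.Build();

// Maps the scraping endpoint; the default path is /metrics.
app.MapPrometheusScrapingEndpoint();

app.Run();
```

Hosts that configure middleware through IApplicationBuilder can call UseOpenTelemetryPrometheusScrapingEndpoint() instead.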
1.2 Key Metrics to Monitor¶
| Metric | Description | Use Case |
|---|---|---|
| messaging.masstransit.receive | Messages received by transport | Queue throughput |
| messaging.masstransit.receive.errors | Receive faults | Transport issues |
| messaging.masstransit.consume | Messages consumed | Consumer throughput |
| messaging.masstransit.consume.errors | Consumer faults | Processing failures |
| messaging.masstransit.saga | Saga messages processed | State machine activity |
| messaging.masstransit.saga.errors | Saga faults | State machine failures |
| messaging.masstransit.send | Messages sent | Outbound traffic |
| messaging.masstransit.receive.active | Active receive operations | Concurrency |
| messaging.masstransit.consume.duration | Consumer processing time | Performance |
| messaging.masstransit.delivery.duration | End-to-end delivery time | Latency |
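Before Prometheus is wired up, it can be useful to confirm locally that these instruments are actually emitting values. The sketch below uses the built-in System.Diagnostics.Metrics.MeterListener to print every measurement from a single meter. MeterConsoleDump is a hypothetical helper name, not something that exists in SyRF or MassTransit.

```csharp
using System;
using System.Diagnostics.Metrics;

// Hypothetical debugging helper: forwards every measurement published by the
// named meter to the supplied writer (for example Console.WriteLine).
public static class MeterConsoleDump
{
    public static MeterListener Start(string meterName, Action<string> write)
    {
        var listener = new MeterListener();

        // Subscribe only to instruments that belong to the requested meter.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == meterName)
                l.EnableMeasurementEvents(instrument);
        };

        // Handle both integer-valued and floating-point instruments.
        listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) => write($"{instrument.Name} = {value}"));
        listener.SetMeasurementEventCallback<double>(
            (instrument, value, tags, state) => write($"{instrument.Name} = {value}"));

        listener.Start();
        return listener;
    }
}
```

A possible usage during local debugging is `using var dump = MeterConsoleDump.Start(InstrumentationOptions.MeterName, Console.WriteLine);`, disposed once the check is done. Instruments of other numeric types would need additional callbacks.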
1.3 Prometheus Scrape Configuration¶
Add Prometheus annotations to Kubernetes deployments:
# In Helm values
podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/port: "8080"
  prometheus.io/path: "/metrics"
Phase 2: State Machine Observers¶
Priority: High | Effort: Medium | Impact: Real-time visibility into SearchImportJob state transitions
2.1 Create State Observer¶
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Observers/SearchImportStateObserver.cs
using MassTransit;
using Microsoft.Extensions.Logging;
using SyRF.ProjectManagement.Endpoint.Sagas;

namespace SyRF.ProjectManagement.Endpoint.Observers;

public class SearchImportStateObserver : IStateObserver<SearchImportJobState>
{
    private readonly ILogger<SearchImportStateObserver> _logger;

    public SearchImportStateObserver(ILogger<SearchImportStateObserver> logger)
    {
        _logger = logger;
    }

    // MassTransit v8 passes a BehaviorContext<TSaga> and exposes the saga via context.Saga
    public Task StateChanged(BehaviorContext<SearchImportJobState> context,
        State currentState, State previousState)
    {
        _logger.LogInformation(
            "SearchImportJob state transition: {CorrelationId} | {PreviousState} → {CurrentState} | " +
            "ProjectId={ProjectId} SearchName={SearchName}",
            context.Saga.CorrelationId,
            previousState?.Name ?? "Initial",
            currentState.Name,
            context.Saga.ProjectId,
            context.Saga.SearchName);

        return Task.CompletedTask;
    }
}
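Log lines are hard to alert on, so the state observer could additionally record each transition as a metric. The following is a minimal sketch using System.Diagnostics.Metrics. The SearchImportMetrics class, the meter name, and the instrument name are all hypothetical and would need to be agreed on before use.

```csharp
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical meter and instrument names; nothing here exists in SyRF today.
public static class SearchImportMetrics
{
    public const string MeterName = "SyRF.SearchImport";

    private static readonly Meter Meter = new(MeterName);

    private static readonly Counter<long> Transitions = Meter.CreateCounter<long>(
        "syrf.search_import.state_transitions",
        description: "SearchImportJob state transitions");

    // previousState may be null; the fallback mirrors the observer's log message.
    public static void RecordTransition(string? previousState, string currentState)
    {
        Transitions.Add(1,
            new KeyValuePair<string, object?>("previous_state", previousState ?? "Initial"),
            new KeyValuePair<string, object?>("current_state", currentState));
    }
}
```

Under these assumptions, StateChanged would call `SearchImportMetrics.RecordTransition(previousState?.Name, currentState.Name)`, and the meter would be exported by adding `.AddMeter(SearchImportMetrics.MeterName)` to the WithMetrics configuration. State names are a small, fixed set, so using them as tags keeps cardinality low; correlation IDs should stay in logs rather than tags.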
2.2 Create Event Observer¶
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Observers/SearchImportEventObserver.cs
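using MassTransit;
using Microsoft.Extensions.Logging;
using SyRF.ProjectManagement.Endpoint.Sagas;

namespace SyRF.ProjectManagement.Endpoint.Observers;

// IEventObserver<TSaga> defines PreExecute/PostExecute/ExecuteFault;
// PreConsume/PostConsume/ConsumeFault belong to IConsumeObserver.
public class SearchImportEventObserver : IEventObserver<SearchImportJobState>
{
    private readonly ILogger<SearchImportEventObserver> _logger;

    public SearchImportEventObserver(ILogger<SearchImportEventObserver> logger)
    {
        _logger = logger;
    }

    // Overloads without T handle events that carry no message data
    public Task PreExecute(BehaviorContext<SearchImportJobState> context) => LogReceiving(context);

    public Task PreExecute<T>(BehaviorContext<SearchImportJobState, T> context) where T : class =>
        LogReceiving(context);

    public Task PostExecute(BehaviorContext<SearchImportJobState> context) => LogProcessed(context);

    public Task PostExecute<T>(BehaviorContext<SearchImportJobState, T> context) where T : class =>
        LogProcessed(context);

    public Task ExecuteFault(BehaviorContext<SearchImportJobState> context, Exception exception) =>
        LogFault(context, exception);

    public Task ExecuteFault<T>(BehaviorContext<SearchImportJobState, T> context, Exception exception)
        where T : class => LogFault(context, exception);

    private Task LogReceiving(BehaviorContext<SearchImportJobState> context)
    {
        _logger.LogDebug(
            "SearchImportJob receiving event: {EventType} | CorrelationId={CorrelationId}",
            context.Event.Name,
            context.Saga.CorrelationId);
        return Task.CompletedTask;
    }

    private Task LogProcessed(BehaviorContext<SearchImportJobState> context)
    {
        _logger.LogInformation(
            "SearchImportJob processed event: {EventType} | CorrelationId={CorrelationId} | Duration={Duration}ms",
            context.Event.Name,
            context.Saga.CorrelationId,
            context.ReceiveContext.ElapsedTime.TotalMilliseconds);
        return Task.CompletedTask;
    }

    private Task LogFault(BehaviorContext<SearchImportJobState> context, Exception exception)
    {
        _logger.LogError(exception,
            "SearchImportJob event fault: {EventType} | CorrelationId={CorrelationId}",
            context.Event.Name,
            context.Saga.CorrelationId);
        return Task.CompletedTask;
    }
}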
2.3 Register Observers¶
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs (or MassTransit configuration)
services.AddStateObserver<SearchImportJobState, SearchImportStateObserver>();
services.AddEventObserver<SearchImportJobState, SearchImportEventObserver>();
Phase 3: Message Flow Observers¶
Priority: Medium | Effort: Medium | Impact: Complete visibility into all message flow across services
3.1 Create Consume Observer¶
File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Observers/MessageConsumeObserver.cs
using MassTransit;
using Microsoft.Extensions.Logging;

namespace SyRF.WebHostConfig.Common.Observers;

public class MessageConsumeObserver : IConsumeObserver
{
    private readonly ILogger<MessageConsumeObserver> _logger;

    public MessageConsumeObserver(ILogger<MessageConsumeObserver> logger)
    {
        _logger = logger;
    }

    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Consuming message: {MessageType} | MessageId={MessageId} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.MessageId,
            context.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Consumed message: {MessageType} | MessageId={MessageId} | Duration={Duration}ms",
            typeof(T).Name,
            context.MessageId,
            context.ReceiveContext.ElapsedTime.TotalMilliseconds);
        return Task.CompletedTask;
    }

    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message consume fault: {MessageType} | MessageId={MessageId} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.MessageId,
            context.CorrelationId);
        return Task.CompletedTask;
    }
}
3.2 Create Send/Publish Observers¶
File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Observers/MessageSendObserver.cs
using MassTransit;
using Microsoft.Extensions.Logging;

namespace SyRF.WebHostConfig.Common.Observers;

public class MessageSendObserver : ISendObserver
{
    private readonly ILogger<MessageSendObserver> _logger;

    public MessageSendObserver(ILogger<MessageSendObserver> logger)
    {
        _logger = logger;
    }

    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Sending message: {MessageType} | Destination={Destination} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.DestinationAddress,
            context.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Sent message: {MessageType} | Destination={Destination} | MessageId={MessageId}",
            typeof(T).Name,
            context.DestinationAddress,
            context.MessageId);
        return Task.CompletedTask;
    }

    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message send fault: {MessageType} | Destination={Destination}",
            typeof(T).Name,
            context.DestinationAddress);
        return Task.CompletedTask;
    }
}

public class MessagePublishObserver : IPublishObserver
{
    private readonly ILogger<MessagePublishObserver> _logger;

    public MessagePublishObserver(ILogger<MessagePublishObserver> logger)
    {
        _logger = logger;
    }

    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Publishing message: {MessageType} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Published message: {MessageType} | MessageId={MessageId}",
            typeof(T).Name,
            context.MessageId);
        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message publish fault: {MessageType}",
            typeof(T).Name);
        return Task.CompletedTask;
    }
}
3.3 Register Message Flow Observers¶
// In MassTransit configuration
services.AddConsumeObserver<MessageConsumeObserver>();
services.AddSendObserver<MessageSendObserver>();
services.AddPublishObserver<MessagePublishObserver>();
Phase 4: Distributed Tracing Dashboard¶
Priority: Low | Effort: High | Impact: Complete end-to-end trace visualization
4.1 Add Jaeger/Tempo Exporter¶
.WithTracing(b => b
    .AddSource(DiagnosticHeaders.DefaultListenerName)
    .AddOtlpExporter(o =>
    {
        o.Endpoint = new Uri(configuration["Tracing:OtlpEndpoint"]
            ?? "http://tempo:4317");
    })
)
4.2 Deploy Grafana Tempo (Helm)¶
Add to cluster-gitops infrastructure:
# cluster-gitops/infrastructure/tempo/values.yaml
tempo:
  storage:
    trace:
      backend: gcs
      gcs:
        bucket_name: syrf-traces
4.3 Grafana Dashboard¶
Create a dashboard showing:
- Message flow between services
- State machine transition timeline
- Error rates and latency percentiles
- Search import job progress
Quick Wins (No Code Changes)¶
These can be used immediately for debugging:
RabbitMQ Management UI¶
# Port-forward to RabbitMQ management UI
kubectl port-forward svc/rabbitmq 15672:15672 -n syrf
# Access at http://localhost:15672
# Default credentials in Helm values
Key queues to monitor:
- search-import-job-state - State machine events
- start-parsing-reference-file-command - Parse job commands
- *_error queues - Dead letter messages
MongoDB Queries¶
// Check SearchImportJob status
db.pmProject.find(
  {"SearchImportJobs.Id": CSUUID("your-search-id")},
  {"SearchImportJobs.$": 1}
)

// Count studies by search
db.pmStudy.countDocuments({
  SystematicSearchId: CSUUID("your-search-id")
})

// Find stuck jobs (Uploading or Parsing for > 1 hour)
db.pmProject.aggregate([
  {$unwind: "$SearchImportJobs"},
  {$match: {
    "SearchImportJobs.Status": {$in: [0, 1]}, // Uploading, Parsing
    "SearchImportJobs.CreatedAt": {$lt: new Date(Date.now() - 3600000)}
  }},
  {$project: {
    searchId: "$SearchImportJobs.Id",
    status: "$SearchImportJobs.Status",
    name: "$SearchImportJobs.Name",
    createdAt: "$SearchImportJobs.CreatedAt"
  }}
])
Enable Debug Logging¶
Temporarily enable verbose MassTransit logging:
{
  "Logging": {
    "LogLevel": {
      "MassTransit": "Debug",
      "SyRF.ProjectManagement.Endpoint.Sagas": "Debug"
    }
  }
}
Service Logs¶
# Watch Project Management logs for state transitions
kubectl logs -f deploy/project-management -n syrf | grep -E "(SearchImport|StateMachine)"
# Watch API logs for presigning events
kubectl logs -f deploy/api -n syrf | grep "SearchUploadStarted"
Success Criteria¶
Phase 1 Complete When¶
- Prometheus metrics endpoint exposed on all services
- MassTransit metrics visible in Prometheus/Grafana
- Basic dashboard showing message throughput and error rates
Phase 2 Complete When¶
- State machine transitions logged with correlation IDs
- State transition timeline visible in logs
- Alerts configured for stuck state machines
Phase 3 Complete When¶
- All message flow logged with correlation IDs
- End-to-end message latency visible
- Consumer processing times tracked
Phase 4 Complete When¶
- Distributed traces visible in Grafana/Jaeger
- Search import flow traceable from API to completion
- Custom dashboard for systematic search upload monitoring
Related Documentation¶
- Systematic Search Upload Flow - Complete data flow documentation
- MassTransit Observability Docs - Official documentation
- OpenTelemetry .NET - OpenTelemetry instrumentation guide