MassTransit Observability and Message Flow Monitoring

This document outlines a plan to implement comprehensive observability for MassTransit message flow through SyRF application components, with a focus on the systematic search upload workflow.

Problem Statement

Currently, there is limited visibility into:

  1. Message flow between services (API, Project Management, S3 Notifier)
  2. State machine transitions in SearchImportJobStateMachine
  3. Processing times for message consumption and saga operations
  4. Error rates and fault patterns in the messaging pipeline

This makes production issues hard to debug, particularly in the systematic search upload flow, which spans multiple services, S3 events, and state machine orchestration.

Current State

SyRF has a partial OpenTelemetry setup in SyrfConfigureServices.cs:

services.AddOpenTelemetry()
    .WithTracing(tracerProviderBuilder =>
    {
        tracerProviderBuilder
            .ConfigureResource(resource => resource.AddService("SyRF API"))
            .AddSource("MongoDB.Driver.Core.Extensions.DiagnosticSources")
            .AddSource(DiagnosticHeaders.DefaultListenerName)  // MassTransit tracing
            .AddAllSyrfAssemblySources()
            .AddAspNetCoreInstrumentation();
        if (customSentryConfig.Enabled && customSentryConfig.OpenTelemetryTracing)
        {
            tracerProviderBuilder.AddSentry();
        }
    });

What exists:

  • MassTransit distributed tracing via DiagnosticHeaders.DefaultListenerName
  • MongoDB operation tracing via custom DiagnosticsActivityEventSubscriber
  • Sentry export (when enabled)

What's missing:

  • MassTransit metrics collection (counters, gauges, histograms)
  • State machine observers for SearchImportJobStateMachine
  • Message flow observers (consume, send, publish)
  • Dedicated observability dashboard

Proposed Solution

Implement a layered observability stack:

┌─────────────────────────────────────────────────────────────────────┐
│                        VISUALIZATION LAYER                          │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌────────────┐ │
│  │   Grafana   │  │   Sentry    │  │   Jaeger    │  │ RabbitMQ   │ │
│  │  (Metrics)  │  │  (Traces)   │  │  (Traces)   │  │    UI      │ │
│  └─────────────┘  └─────────────┘  └─────────────┘  └────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                        COLLECTION LAYER                             │
│  ┌─────────────────────┐  ┌─────────────────────────────────────┐  │
│  │ Prometheus Metrics  │  │ OpenTelemetry Traces                │  │
│  │ - receive/consume   │  │ - Distributed trace context         │  │
│  │ - send/publish      │  │ - Span correlation                  │  │
│  │ - saga operations   │  │ - Error capture                     │  │
│  └─────────────────────┘  └─────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                       INSTRUMENTATION LAYER                         │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
│  │ MassTransit      │  │ State Machine    │  │ Message Flow     │  │
│  │ Built-in Metrics │  │ Observers        │  │ Observers        │  │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
└─────────────────────────────────────────────────────────────────────┘

Implementation Plan

Phase 1: Metrics Collection

Priority: High
Effort: Low
Impact: Immediate visibility into message throughput and error rates

1.1 Add MassTransit Metrics to OpenTelemetry

Update the OpenTelemetry configuration to include metrics:

File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs

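using MassTransit.Logging;     // DiagnosticHeaders
using MassTransit.Monitoring;  // InstrumentationOptions
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;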

public static void AddOpenTelemetryConfig(this ServiceRegistry services, IConfiguration configuration)
{
    var customSentryConfig = configuration.GetSection("CustomSentryConfig").Get<CustomSentryConfig>() ??
                             throw new InvalidOperationException("CustomSentryConfig not found");

    services.AddOpenTelemetry()
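        // If this shared helper is used by every service, they all report as "SyRF API";
        // a per-service name would keep API, Project Management and S3 Notifier telemetry apart.
        .ConfigureResource(resource => resource.AddService("SyRF API"))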
        .WithTracing(tracerProviderBuilder =>
        {
            tracerProviderBuilder
                .AddSource("MongoDB.Driver.Core.Extensions.DiagnosticSources")
                .AddSource(DiagnosticHeaders.DefaultListenerName)
                .AddAllSyrfAssemblySources()
                .AddAspNetCoreInstrumentation();

            if (customSentryConfig.Enabled && customSentryConfig.OpenTelemetryTracing)
            {
                tracerProviderBuilder.AddSentry();
            }
        })
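        .WithMetrics(meterProviderBuilder =>
        {
            meterProviderBuilder
                .AddMeter(InstrumentationOptions.MeterName)  // MassTransit metrics
                .AddAspNetCoreInstrumentation()
                // Needs the OpenTelemetry.Exporter.Prometheus.AspNetCore package. The exporter does not
                // serve /metrics on its own: the app must also call
                // app.UseOpenTelemetryPrometheusScrapingEndpoint() (or MapPrometheusScrapingEndpoint()).
                .AddPrometheusExporter();
        });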
}
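AddMeter(...) works as an allow-list: the meter provider only collects instruments from meters whose names were added, which is why MassTransit metrics appear once InstrumentationOptions.MeterName is registered. The BCL-only sketch below (System.Diagnostics.Metrics, .NET 6 or later) mimics that filter with a MeterListener. It assumes the MassTransit meter is named "MassTransit"; the second meter is a hypothetical stand-in for any library that was not added.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Totals per instrument name, filled in by the listener below.
var collected = new Dictionary<string, long>();

using var listener = new MeterListener();

// Subscribe only to instruments from allow-listed meters, mirroring AddMeter(...).
// Assumption: "MassTransit" is the value of InstrumentationOptions.MeterName.
var allowedMeters = new HashSet<string> { "MassTransit" };
listener.InstrumentPublished = (instrument, l) =>
{
    if (allowedMeters.Contains(instrument.Meter.Name))
        l.EnableMeasurementEvents(instrument);
};

// Counter<long>.Add(...) reports synchronously through this callback.
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
    collected.TryGetValue(instrument.Name, out var current);
    collected[instrument.Name] = current + value;
});
listener.Start();

// Instruments created after Start() are still offered to InstrumentPublished.
using var massTransitMeter = new Meter("MassTransit");
using var otherMeter = new Meter("Hypothetical.OtherLibrary");
massTransitMeter.CreateCounter<long>("messaging.masstransit.consume").Add(3);
otherMeter.CreateCounter<long>("hypothetical.requests").Add(5);

foreach (var (name, total) in collected)
    Console.WriteLine($"{name} = {total}");   // prints: messaging.masstransit.consume = 3
```

The same reasoning applies to any SyRF-specific meters added later: they are only exported if their names are also passed to AddMeter.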

1.2 Key Metrics to Monitor

Metric                                   Description                     Use case
---------------------------------------  ------------------------------  ----------------------
messaging.masstransit.receive            Messages received by transport  Queue throughput
messaging.masstransit.receive.errors     Receive faults                  Transport issues
messaging.masstransit.consume            Messages consumed               Consumer throughput
messaging.masstransit.consume.errors     Consumer faults                 Processing failures
messaging.masstransit.saga               Saga messages processed         State machine activity
messaging.masstransit.saga.errors        Saga faults                     State machine failures
messaging.masstransit.send               Messages sent                   Outbound traffic
messaging.masstransit.receive.active     Active receive operations       Concurrency
messaging.masstransit.consume.duration   Consumer processing time        Performance
messaging.masstransit.delivery.duration  End-to-end delivery time        Latency
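These are OpenTelemetry instrument names; Prometheus only allows [a-zA-Z0-9_:] in metric names, so the exporter renames them, and counters gain a _total suffix (histograms such as consume.duration appear as _bucket, _sum and _count series). The C# sketch below models just those two rules as an assumption; depending on the exporter version a unit suffix may also be appended, so confirm the exact names on /metrics before writing queries.

```csharp
using System;
using System.Text.RegularExpressions;

// Simplified: replace characters Prometheus does not allow in metric names with '_'
// and add "_total" to monotonic counters. Assumption: unit suffixes are ignored here.
string ToPrometheusName(string otelName, bool isCounter)
{
    var name = Regex.Replace(otelName, "[^a-zA-Z0-9_:]", "_");
    return isCounter && !name.EndsWith("_total") ? name + "_total" : name;
}

Console.WriteLine(ToPrometheusName("messaging.masstransit.consume", isCounter: true));
// prints: messaging_masstransit_consume_total
Console.WriteLine(ToPrometheusName("messaging.masstransit.receive.active", isCounter: false));
// prints: messaging_masstransit_receive_active
```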

1.3 Prometheus Scrape Configuration

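Add Prometheus scrape annotations to the Kubernetes deployments. These annotations are a convention, not a Kubernetes feature: they only take effect if the Prometheus scrape configuration relabels on them (the Prometheus Operator ignores them and needs a ServiceMonitor or PodMonitor instead). The port must be the one that serves /metrics: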

# In Helm values
podAnnotations:
  prometheus.io/scrape: "true"
  prometheus.io/port: "8080"
  prometheus.io/path: "/metrics"
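Once the endpoint is scraped, a quick check for the Phase 1 criterion is to fetch /metrics (for example through kubectl port-forward) and look for MassTransit series. The C# sketch below runs that check on a response body; the sample body is hypothetical, and the messaging_masstransit_ prefix assumes the exporter's usual dot-to-underscore renaming.

```csharp
using System;
using System.Linq;

// Hypothetical /metrics excerpt (illustration only; real names and labels may differ).
var scrapeBody = string.Join("\n",
    "# TYPE messaging_masstransit_consume_total counter",
    "messaging_masstransit_consume_total{label=\"example\"} 12",
    "# TYPE http_server_request_duration_seconds histogram",
    "http_server_request_duration_seconds_count 40");

// True when at least one sample line (not a # HELP/# TYPE comment) belongs to a MassTransit metric.
bool HasMassTransitSeries(string body) =>
    body.Split('\n')
        .Select(line => line.Trim())
        .Where(line => line.Length > 0 && !line.StartsWith("#"))
        .Select(line => line.Split('{', ' ')[0])   // the metric name ends at the labels or the value
        .Any(name => name.StartsWith("messaging_masstransit_"));

Console.WriteLine(HasMassTransitSeries(scrapeBody));   // prints: True
```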

Phase 2: State Machine Observers

Priority: High
Effort: Medium
Impact: Real-time visibility into SearchImportJob state transitions

2.1 Create State Observer

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Observers/SearchImportStateObserver.cs

using MassTransit;
using Microsoft.Extensions.Logging;
using SyRF.ProjectManagement.Endpoint.Sagas;

namespace SyRF.ProjectManagement.Endpoint.Observers;

public class SearchImportStateObserver : IStateObserver<SearchImportJobState>
{
    private readonly ILogger<SearchImportStateObserver> _logger;

    public SearchImportStateObserver(ILogger<SearchImportStateObserver> logger)
    {
        _logger = logger;
    }

    // MassTransit v8 passes a BehaviorContext and exposes the saga instance as context.Saga
    // (InstanceContext and context.Instance are the older Automatonymous API).
    public Task StateChanged(BehaviorContext<SearchImportJobState> context,
        State currentState, State previousState)
    {
        _logger.LogInformation(
            "SearchImportJob state transition: {CorrelationId} | {PreviousState} → {CurrentState} | " +
            "ProjectId={ProjectId} SearchName={SearchName}",
            context.Saga.CorrelationId,
            previousState?.Name ?? "Initial",
            currentState.Name,
            context.Saga.ProjectId,
            context.Saga.SearchName);

        return Task.CompletedTask;
    }
}
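Because every transition is logged with its correlation ID, a per-job timeline can be rebuilt from the logs, which is what the Phase 2 criteria (timeline visibility, alerts on stuck jobs) rely on. The C# sketch below takes one job's transitions, already extracted and ordered, and computes how long the job spent in each state. The sample data is hypothetical, and any state name other than Uploading and Parsing is a placeholder.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical transitions for one job in log order: (time, previous state, new state).
// Uploading and Parsing come from the MongoDB status comment; "Completed" is a placeholder.
var jobTransitions = new List<(DateTime At, string From, string To)>
{
    (new DateTime(2024, 1, 1, 10, 0, 0, DateTimeKind.Utc), "Initial", "Uploading"),
    (new DateTime(2024, 1, 1, 10, 2, 0, DateTimeKind.Utc), "Uploading", "Parsing"),
    (new DateTime(2024, 1, 1, 10, 9, 30, DateTimeKind.Utc), "Parsing", "Completed"),
};

// Time in a state = time from entering it until the next transition; re-entries are summed.
// The latest state has no end yet, so it is not included.
Dictionary<string, TimeSpan> DwellTimes(IReadOnlyList<(DateTime At, string From, string To)> ordered) =>
    ordered.Zip(ordered.Skip(1), (enter, leave) => (State: enter.To, Duration: leave.At - enter.At))
           .GroupBy(x => x.State)
           .ToDictionary(g => g.Key, g => TimeSpan.FromTicks(g.Sum(x => x.Duration.Ticks)));

foreach (var (state, spent) in DwellTimes(jobTransitions))
    Console.WriteLine($"{state}: {spent}");
// prints "Uploading: 00:02:00" and "Parsing: 00:07:30"
```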

2.2 Create Event Observer

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Observers/SearchImportEventObserver.cs

using MassTransit;
using Microsoft.Extensions.Logging;
using SyRF.ProjectManagement.Endpoint.Sagas;

namespace SyRF.ProjectManagement.Endpoint.Observers;

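// MassTransit v8 event observers receive a BehaviorContext and implement
// PreExecute/PostExecute/ExecuteFault, each with an untyped and a typed (event data) overload.
// PreConsume/PostConsume/ConsumeFault belong to IConsumeObserver, not IEventObserver.
public class SearchImportEventObserver : IEventObserver<SearchImportJobState>
{
    private readonly ILogger<SearchImportEventObserver> _logger;

    public SearchImportEventObserver(ILogger<SearchImportEventObserver> logger)
    {
        _logger = logger;
    }

    public Task PreExecute(BehaviorContext<SearchImportJobState> context)
    {
        _logger.LogDebug("SearchImportJob receiving event: {EventName} | CorrelationId={CorrelationId}",
            context.Event.Name, context.Saga.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PreExecute<T>(BehaviorContext<SearchImportJobState, T> context) where T : class
    {
        _logger.LogDebug(
            "SearchImportJob receiving event: {EventName} ({MessageType}) | CorrelationId={CorrelationId}",
            context.Event.Name, typeof(T).Name, context.Saga.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PostExecute(BehaviorContext<SearchImportJobState> context)
    {
        _logger.LogInformation("SearchImportJob processed event: {EventName} | CorrelationId={CorrelationId}",
            context.Event.Name, context.Saga.CorrelationId);
        return Task.CompletedTask;
    }

    public Task PostExecute<T>(BehaviorContext<SearchImportJobState, T> context) where T : class
    {
        // ElapsedTime counts from when the transport received the message
        _logger.LogInformation(
            "SearchImportJob processed event: {EventName} ({MessageType}) | CorrelationId={CorrelationId} | Duration={Duration}ms",
            context.Event.Name, typeof(T).Name, context.Saga.CorrelationId,
            context.ReceiveContext.ElapsedTime.TotalMilliseconds);
        return Task.CompletedTask;
    }

    public Task ExecuteFault(BehaviorContext<SearchImportJobState> context, Exception exception)
    {
        _logger.LogError(exception, "SearchImportJob event fault: {EventName} | CorrelationId={CorrelationId}",
            context.Event.Name, context.Saga.CorrelationId);
        return Task.CompletedTask;
    }

    public Task ExecuteFault<T>(BehaviorContext<SearchImportJobState, T> context, Exception exception)
        where T : class
    {
        _logger.LogError(exception,
            "SearchImportJob event fault: {EventName} ({MessageType}) | CorrelationId={CorrelationId}",
            context.Event.Name, typeof(T).Name, context.Saga.CorrelationId);
        return Task.CompletedTask;
    }
}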

2.3 Register Observers

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs (or wherever MassTransit is configured)

services.AddStateObserver<SearchImportJobState, SearchImportStateObserver>();
services.AddEventObserver<SearchImportJobState, SearchImportEventObserver>();

Phase 3: Message Flow Observers

Priority: Medium
Effort: Medium
Impact: Complete visibility into message flow across all services

3.1 Create Consume Observer

File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Observers/MessageConsumeObserver.cs

using MassTransit;
using Microsoft.Extensions.Logging;

namespace SyRF.WebHostConfig.Common.Observers;

public class MessageConsumeObserver : IConsumeObserver
{
    private readonly ILogger<MessageConsumeObserver> _logger;

    public MessageConsumeObserver(ILogger<MessageConsumeObserver> logger)
    {
        _logger = logger;
    }

    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Consuming message: {MessageType} | MessageId={MessageId} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.MessageId,
            context.CorrelationId);

        return Task.CompletedTask;
    }

    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Consumed message: {MessageType} | MessageId={MessageId} | Duration={Duration}ms",
            typeof(T).Name,
            context.MessageId,
            context.ReceiveContext.ElapsedTime.TotalMilliseconds);

        return Task.CompletedTask;
    }

    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message consume fault: {MessageType} | MessageId={MessageId} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.MessageId,
            context.CorrelationId);

        return Task.CompletedTask;
    }
}

3.2 Create Send/Publish Observers

File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Observers/MessageSendObserver.cs

using MassTransit;
using Microsoft.Extensions.Logging;

namespace SyRF.WebHostConfig.Common.Observers;

public class MessageSendObserver : ISendObserver
{
    private readonly ILogger<MessageSendObserver> _logger;

    public MessageSendObserver(ILogger<MessageSendObserver> logger)
    {
        _logger = logger;
    }

    public Task PreSend<T>(SendContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Sending message: {MessageType} | Destination={Destination} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.DestinationAddress,
            context.CorrelationId);

        return Task.CompletedTask;
    }

    public Task PostSend<T>(SendContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Sent message: {MessageType} | Destination={Destination} | MessageId={MessageId}",
            typeof(T).Name,
            context.DestinationAddress,
            context.MessageId);

        return Task.CompletedTask;
    }

    public Task SendFault<T>(SendContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message send fault: {MessageType} | Destination={Destination}",
            typeof(T).Name,
            context.DestinationAddress);

        return Task.CompletedTask;
    }
}

public class MessagePublishObserver : IPublishObserver
{
    private readonly ILogger<MessagePublishObserver> _logger;

    public MessagePublishObserver(ILogger<MessagePublishObserver> logger)
    {
        _logger = logger;
    }

    public Task PrePublish<T>(PublishContext<T> context) where T : class
    {
        _logger.LogDebug(
            "Publishing message: {MessageType} | CorrelationId={CorrelationId}",
            typeof(T).Name,
            context.CorrelationId);

        return Task.CompletedTask;
    }

    public Task PostPublish<T>(PublishContext<T> context) where T : class
    {
        _logger.LogInformation(
            "Published message: {MessageType} | MessageId={MessageId}",
            typeof(T).Name,
            context.MessageId);

        return Task.CompletedTask;
    }

    public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class
    {
        _logger.LogError(exception,
            "Message publish fault: {MessageType}",
            typeof(T).Name);

        return Task.CompletedTask;
    }
}

3.3 Register Message Flow Observers

// In MassTransit configuration
services.AddConsumeObserver<MessageConsumeObserver>();
services.AddSendObserver<MessageSendObserver>();
services.AddPublishObserver<MessagePublishObserver>();

Phase 4: Distributed Tracing Dashboard

Priority: Low
Effort: High
Impact: Complete end-to-end trace visualization

4.1 Add Jaeger/Tempo Exporter

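// Add to the existing WithTracing(...) configuration; requires the
// OpenTelemetry.Exporter.OpenTelemetryProtocol package.
.WithTracing(b => b
    .AddSource(DiagnosticHeaders.DefaultListenerName)
    .AddOtlpExporter(o =>
    {
        // The exporter defaults to OTLP over gRPC; 4317 is the standard OTLP/gRPC port
        o.Endpoint = new Uri(configuration["Tracing:OtlpEndpoint"]
            ?? "http://tempo:4317");
    })
)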

4.2 Deploy Grafana Tempo (Helm)

Add to cluster-gitops infrastructure:

# cluster-gitops/infrastructure/tempo/values.yaml
tempo:
  storage:
    trace:
      backend: gcs
      gcs:
        bucket_name: syrf-traces

4.3 Grafana Dashboard

Create a dashboard showing:

  • Message flow between services
  • State machine transition timeline
  • Error rates and latency percentiles
  • Search import job progress
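For the error-rate and latency panels, it helps to pin down what the numbers mean. The C# sketch below uses hypothetical counts and samples: error rate is consume faults divided by messages consumed over the same window, and percentiles use the nearest-rank method on raw samples. Prometheus histograms only keep bucket counts, so Grafana percentile panels (typically built with histogram_quantile) are estimates rather than exact nearest-rank values.

```csharp
using System;
using System.Linq;

// Hypothetical counter increases over one dashboard window
// (for example of messaging.masstransit.consume and messaging.masstransit.consume.errors).
long consumedInWindow = 480, consumeErrorsInWindow = 12;

// Error rate = faults / messages consumed in the same window (12 / 480 = 0.025).
double ErrorRate(long errors, long total) => total == 0 ? 0 : (double)errors / total;

// Nearest-rank percentile over raw duration samples (ms); NaN when there are no samples.
double Percentile(double[] samples, double p)
{
    if (samples.Length == 0) return double.NaN;
    var sorted = samples.OrderBy(x => x).ToArray();
    var rank = (int)Math.Ceiling(p / 100.0 * sorted.Length);
    return sorted[Math.Clamp(rank, 1, sorted.Length) - 1];
}

// Hypothetical consume durations in milliseconds.
var consumeDurationsMs = new double[] { 12, 15, 18, 22, 25, 31, 40, 55, 90, 400 };
Console.WriteLine($"error rate = {ErrorRate(consumeErrorsInWindow, consumedInWindow)}");
Console.WriteLine($"p50 = {Percentile(consumeDurationsMs, 50)} ms, p95 = {Percentile(consumeDurationsMs, 95)} ms");
```

A single slow message (400 ms here) dominates p95 while leaving p50 untouched, which is why the dashboard should show both.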

Quick Wins (No Code Changes)

These can be used immediately for debugging:

RabbitMQ Management UI

# Port-forward to RabbitMQ management UI
kubectl port-forward svc/rabbitmq 15672:15672 -n syrf

# Access at http://localhost:15672
# Credentials are defined in the RabbitMQ Helm values

Key queues to monitor:

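  • search-import-job-state - State machine events
  • start-parsing-reference-file-command - Parse job commands
  • *_error queues - Messages that faulted after all retries; MassTransit moves them here from the matching queue (messages with no consumer on the endpoint go to *_skipped)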
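Because each error queue is named after its receive endpoint plus _error, checking for faulted messages can be scripted against the RabbitMQ management HTTP API (GET /api/queues, reachable through the same port-forward). The C# sketch below parses a hypothetical, trimmed response and lists the *_error queues that currently hold messages; it reads only the name and messages fields and treats a missing count as empty.

```csharp
using System;
using System.Linq;
using System.Text.Json;

// Hypothetical, trimmed GET /api/queues response from the RabbitMQ management API.
var queuesJson =
    "[{\"name\":\"search-import-job-state\",\"messages\":0}," +
    "{\"name\":\"search-import-job-state_error\",\"messages\":3}," +
    "{\"name\":\"start-parsing-reference-file-command_error\",\"messages\":0}]";

// Names of *_error queues holding messages; "messages" may be absent, so it is read defensively.
string[] NonEmptyErrorQueues(string body)
{
    using var doc = JsonDocument.Parse(body);
    return doc.RootElement.EnumerateArray()
        .Where(q => (q.GetProperty("name").GetString() ?? "").EndsWith("_error")
                    && q.TryGetProperty("messages", out var count) && count.GetInt64() > 0)
        .Select(q => q.GetProperty("name").GetString() ?? "")
        .ToArray();
}

Console.WriteLine(string.Join(", ", NonEmptyErrorQueues(queuesJson)));
// prints: search-import-job-state_error
```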

MongoDB Queries

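// Check SearchImportJob status. CSUUID() is not built into mongosh; it comes from helper
// scripts such as uuidhelpers.js (C# legacy GUIDs). Use UUID() for standard-representation GUIDs.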
db.pmProject.find(
    {"SearchImportJobs.Id": CSUUID("your-search-id")},
    {"SearchImportJobs.$": 1}
)

// Count studies by search
db.pmStudy.countDocuments({
    SystematicSearchId: CSUUID("your-search-id")
})

// Find stuck jobs (Uploading or Parsing for > 1 hour)
db.pmProject.aggregate([
    {$unwind: "$SearchImportJobs"},
    {$match: {
        "SearchImportJobs.Status": {$in: [0, 1]},  // Uploading, Parsing
        "SearchImportJobs.CreatedAt": {$lt: new Date(Date.now() - 3600000)}
    }},
    {$project: {
        searchId: "$SearchImportJobs.Id",
        status: "$SearchImportJobs.Status",
        name: "$SearchImportJobs.Name",
        createdAt: "$SearchImportJobs.CreatedAt"
    }}
])
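For alerting (a Phase 2 success criterion), the stuck-job rule in the last query could also run as a scheduled check. The C# sketch below restates that rule over in-memory tuples shaped like the projected fields; the sample jobs are hypothetical, and the status codes are taken from the query's comment rather than from the SyRF source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Status codes follow the query's comment (0 = Uploading, 1 = Parsing); confirm them
// against the actual SearchImportJob status enum before relying on this.
var inProgressStatuses = new HashSet<int> { 0, 1 };

// Same rule as the aggregation: still in progress and created before (now - threshold).
List<(Guid Id, int Status, string Name, DateTime CreatedAt)> FindStuckJobs(
    IEnumerable<(Guid Id, int Status, string Name, DateTime CreatedAt)> candidates,
    DateTime nowUtc, TimeSpan threshold) =>
    candidates.Where(j => inProgressStatuses.Contains(j.Status) && j.CreatedAt < nowUtc - threshold)
              .ToList();

// Hypothetical jobs; status 2 stands for any status that is not in progress.
var referenceNow = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var jobs = new List<(Guid Id, int Status, string Name, DateTime CreatedAt)>
{
    (Guid.NewGuid(), 1, "old-parsing", referenceNow.AddHours(-3)),
    (Guid.NewGuid(), 0, "recent-upload", referenceNow.AddMinutes(-10)),
    (Guid.NewGuid(), 2, "old-finished", referenceNow.AddHours(-5)),
};

foreach (var job in FindStuckJobs(jobs, referenceNow, TimeSpan.FromHours(1)))
    Console.WriteLine($"{job.Name} (status {job.Status}, created {job.CreatedAt:u})");
```

The one-hour threshold mirrors the query; large reference files that legitimately parse for longer would need a higher value to avoid false alerts.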

Enable Debug Logging

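Temporarily enable verbose MassTransit logging, for example in appsettings.json or through environment variables in the Helm values (ASP.NET Core maps Logging__LogLevel__MassTransit=Debug to the same setting):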

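{
  "Logging": {
    "LogLevel": {
      "MassTransit": "Debug",
      "SyRF.ProjectManagement.Endpoint.Sagas": "Debug",
      "SyRF.ProjectManagement.Endpoint.Observers": "Debug",
      "SyRF.WebHostConfig.Common.Observers": "Debug"
    }
  }
}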
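ILogger&lt;T&gt; uses the full type name as its category, so a category only gets Debug output if one of the configured keys is a prefix of it. The C# sketch below is a simplified model of that selection (longest matching prefix wins, otherwise Default; the real selector also weighs provider-specific sections and '*' wildcards). It uses the MassTransit and Sagas entries plus an assumed "Default": "Information", and shows that the observers' namespace is not covered by the Sagas entry, so their Debug messages need an entry of their own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// The MassTransit and Sagas entries from the config, plus an assumed "Default": "Information".
var configuredLevels = new Dictionary<string, string>
{
    ["Default"] = "Information",
    ["MassTransit"] = "Debug",
    ["SyRF.ProjectManagement.Endpoint.Sagas"] = "Debug",
};

// Simplified rule selection: the longest configured category prefix wins, else "Default".
string EffectiveLevel(string category) =>
    configuredLevels.Keys
        .Where(k => k != "Default" && category.StartsWith(k, StringComparison.OrdinalIgnoreCase))
        .OrderByDescending(k => k.Length)
        .Select(k => configuredLevels[k])
        .DefaultIfEmpty(configuredLevels["Default"])
        .First();

// "MassTransit.ExampleCategory" is a hypothetical category name.
Console.WriteLine(EffectiveLevel("MassTransit.ExampleCategory"));   // prints: Debug
Console.WriteLine(EffectiveLevel(
    "SyRF.ProjectManagement.Endpoint.Observers.SearchImportEventObserver"));   // prints: Information
```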

Service Logs

# Watch Project Management logs for state transitions
# (deploy/<name> follows a single pod; select pods with -l <label> to cover every replica)
kubectl logs -f deploy/project-management -n syrf | grep -E "(SearchImport|StateMachine)"

# Watch API logs for presigning events
kubectl logs -f deploy/api -n syrf | grep "SearchUploadStarted"
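Once the Phase 2 state observer is deployed, its transition messages can be turned into per-job timelines straight from kubectl logs output. The C# sketch below matches lines in the shape produced by the SearchImportStateObserver message template and groups states by correlation ID. The sample lines are hypothetical; the regex searches inside each line, so console-formatter prefixes are tolerated, but it assumes the "→" character survives in the log output, and JSON-formatted logs would need to be parsed as JSON first.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Matches the rendered SearchImportStateObserver message (assumes UTF-8 log output keeps "→").
var transitionLine = new Regex(
    @"SearchImportJob state transition: (?<id>[0-9a-fA-F-]{36}) \| (?<from>\S+) → (?<to>\S+)");

// Rebuilds each job's sequence of states, keyed by correlation ID, in log order.
Dictionary<string, List<string>> BuildTimelines(IEnumerable<string> lines)
{
    var timelines = new Dictionary<string, List<string>>();
    foreach (var line in lines)
    {
        var match = transitionLine.Match(line);
        if (!match.Success) continue;   // e.g. the "info: <category>[0]" header line
        var id = match.Groups["id"].Value;
        if (!timelines.TryGetValue(id, out var states))
            timelines[id] = states = new List<string> { match.Groups["from"].Value };
        states.Add(match.Groups["to"].Value);
    }
    return timelines;
}

// Hypothetical log excerpt; the correlation ID and project fields are placeholders.
var sampleLines = new[]
{
    "info: SyRF.ProjectManagement.Endpoint.Observers.SearchImportStateObserver[0]",
    "      SearchImportJob state transition: 3f2b8c1e-0000-4000-8000-000000000001 | Initial → Uploading | ProjectId=p1 SearchName=s1",
    "      SearchImportJob state transition: 3f2b8c1e-0000-4000-8000-000000000001 | Uploading → Parsing | ProjectId=p1 SearchName=s1",
};

foreach (var (jobId, jobStates) in BuildTimelines(sampleLines))
    Console.WriteLine($"{jobId}: {string.Join(" -> ", jobStates)}");
// prints: 3f2b8c1e-0000-4000-8000-000000000001: Initial -> Uploading -> Parsing
```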

Success Criteria

Phase 1 Complete When

  • Prometheus metrics endpoint exposed on all services
  • MassTransit metrics visible in Prometheus/Grafana
  • Basic dashboard showing message throughput and error rates

Phase 2 Complete When

  • State machine transitions logged with correlation IDs
  • State transition timeline visible in logs
  • Alerts configured for stuck state machines

Phase 3 Complete When

  • All message flow logged with correlation IDs
  • End-to-end message latency visible
  • Consumer processing times tracked

Phase 4 Complete When

  • Distributed traces visible in Grafana/Jaeger
  • Search import flow traceable from API to completion
  • Custom dashboard for systematic search upload monitoring