
Feature: MongoDB Change Stream Resilience & Auto-Recovery

Summary

Improve the MongoDB change stream implementation to automatically recover from oplog resumability failures, and add health checks that enable Kubernetes to auto-restart pods when change streams are unhealthy.

Problem Statement

What Happened

On 2025-01-20, real-time push notifications (SignalR) stopped working in production without any code changes. Investigation revealed:

  1. Cause: Database activity (testing preview environment database snapshots) caused rapid oplog growth
  2. Effect: Change stream resume tokens became invalid as oplog entries were purged
  3. Symptom: The retry mechanism kept retrying with stale tokens instead of starting fresh
  4. Impact: Users stopped receiving live updates (screening decisions, project changes, etc.)

Why It Wasn't Detected

  • No health check monitors change stream health
  • Error logging existed but wasn't surfaced to alerts
  • Kubernetes had no signal to restart the affected pods

Current Workaround

Manual pod restart clears in-memory state and restores functionality.
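Until the fix ships, the restart can be scripted. A sketch of the workaround, reusing the staging namespace and deployment names that appear in the Manual Verification section below (adjust to whichever service is affected):

```shell
# Restart the affected deployment to clear in-memory change stream state
# (deployment/namespace names are illustrative)
kubectl rollout restart deployment/syrf-api -n syrf-staging

# Wait for the replacement pods to become ready before declaring recovery
kubectl rollout status deployment/syrf-api -n syrf-staging --timeout=120s
```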


Solution Overview

| Part | Description | Impact |
|------|-------------|--------|
| 1 | Expand error code handling | Fixes the root cause: invalid tokens are cleared |
| 2 | Add health check infrastructure | Enables monitoring of stream health |
| 3 | Wire to Kubernetes liveness probe | Enables automatic pod restart |
| 4 | Add structured logging | Enables proactive alerting |

Detailed Implementation

Part 1: Expand Error Code Handling

File: src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs

Location: Around line 290

Current Code:

catch (MongoCommandException cmd) when (cmd.Code is 136 or 280 or 286)
{
    _logger.LogWarning(cmd, "Invalid resume token for {Key}", key);
    await _resumeRepo.ClearResumePointAsync(key, CancellationToken.None);
    obs.OnError(cmd);
}

New Code:

catch (MongoCommandException cmd) when (IsInvalidResumeTokenError(cmd))
{
    _logger.LogWarning(
        Events.ChangeStreamResumeTokenInvalid,
        cmd,
        "Invalid resume token for {Key}, clearing and will retry from current time",
        key);
    await _resumeRepo.ClearResumePointAsync(key, CancellationToken.None);
    obs.OnError(cmd);
}

// Add helper method:
private static bool IsInvalidResumeTokenError(MongoCommandException cmd)
{
    // Known error codes for invalid/expired resume tokens
    if (cmd.Code is 136 or 260 or 280 or 286 or 346)
        return true;

    // Fallback: check error message for oplog-related failures
    var message = cmd.Message;
    return message.Contains("resume point may no longer be in the oplog", StringComparison.OrdinalIgnoreCase)
        || message.Contains("resume token was not found", StringComparison.OrdinalIgnoreCase)
        || message.Contains("the resume token was invalidated", StringComparison.OrdinalIgnoreCase);
}

Error Codes Reference:

| Code | Meaning |
|------|---------|
| 136 | CappedPositionLost |
| 260 | InvalidResumeToken |
| 280 | ChangeStreamFatalError |
| 286 | ChangeStreamHistoryLost |
| 346 | ChangeStreamInvalidated |

Part 2: Add Change Stream Health Check Infrastructure

2a. Create Health Tracker

New File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace SyRF.Mongo.Common;

/// <summary>
/// Tracks the health of MongoDB change streams across all collections.
/// Used by ChangeStreamHealthCheck to determine if streams are functioning.
/// </summary>
public class ChangeStreamHealthTracker
{
    private readonly ConcurrentDictionary<string, StreamHealthState> _streamStates = new();
    private readonly int _maxConsecutiveFailures;

    public ChangeStreamHealthTracker(int maxConsecutiveFailures = 5)
    {
        _maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /// <summary>
    /// Record a successful change stream operation (document received or retry succeeded).
    /// </summary>
    public void RecordSuccess(string streamKey)
    {
        _streamStates.AddOrUpdate(
            streamKey,
            _ => new StreamHealthState(LastSuccess: DateTime.UtcNow),
            (_, _) => new StreamHealthState(LastSuccess: DateTime.UtcNow));
    }

    /// <summary>
    /// Record a change stream failure (retry attempt).
    /// </summary>
    public void RecordFailure(string streamKey, Exception? error = null)
    {
        _streamStates.AddOrUpdate(
            streamKey,
            _ => new StreamHealthState(
                ConsecutiveFailures: 1,
                LastFailure: DateTime.UtcNow,
                LastError: error?.Message),
            (_, existing) => new StreamHealthState(
                ConsecutiveFailures: existing.ConsecutiveFailures + 1,
                LastFailure: DateTime.UtcNow,
                LastError: error?.Message));
    }

    /// <summary>
    /// Get the overall health status of all change streams.
    /// </summary>
    public HealthCheckResult GetHealthStatus()
    {
        var unhealthyStreams = _streamStates
            .Where(kvp => kvp.Value.ConsecutiveFailures >= _maxConsecutiveFailures)
            .Select(kvp => kvp.Key)
            .ToList();

        var degradedStreams = _streamStates
            .Where(kvp => kvp.Value.ConsecutiveFailures > 0
                       && kvp.Value.ConsecutiveFailures < _maxConsecutiveFailures)
            .Select(kvp => kvp.Key)
            .ToList();

        if (unhealthyStreams.Count > 0)
        {
            return HealthCheckResult.Unhealthy(
                $"Change streams unhealthy: {string.Join(", ", unhealthyStreams)}",
                data: new Dictionary<string, object>
                {
                    ["unhealthyStreams"] = unhealthyStreams,
                    ["degradedStreams"] = degradedStreams
                });
        }

        if (degradedStreams.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Change streams degraded: {string.Join(", ", degradedStreams)}");
        }

        return HealthCheckResult.Healthy("All change streams operational");
    }

    // Immutable state record — safe for concurrent reads from GetHealthStatus()
    // while AddOrUpdate replaces values atomically.
    private sealed record StreamHealthState(
        DateTime? LastSuccess = null,
        DateTime? LastFailure = null,
        int ConsecutiveFailures = 0,
        string? LastError = null);
}
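To make the intended state machine concrete, here is a hypothetical console harness (not part of the change) exercising the tracker's transitions:

```csharp
// Hypothetical harness — illustrates Healthy → Degraded → Unhealthy → Healthy transitions.
var tracker = new ChangeStreamHealthTracker(maxConsecutiveFailures: 5);

tracker.RecordFailure("projects", new TimeoutException("cursor timed out"));
Console.WriteLine(tracker.GetHealthStatus().Status); // Degraded: 1 failure, below threshold

for (var i = 0; i < 4; i++)
    tracker.RecordFailure("projects");
Console.WriteLine(tracker.GetHealthStatus().Status); // Unhealthy: 5 consecutive failures

tracker.RecordSuccess("projects");
Console.WriteLine(tracker.GetHealthStatus().Status); // Healthy: success resets the counter
```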

2b. Create Health Check

New File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace SyRF.Mongo.Common;

/// <summary>
/// Health check that reports the status of MongoDB change streams.
/// Register with tags ["live", "ready"] to affect Kubernetes probes.
/// </summary>
public class ChangeStreamHealthCheck : IHealthCheck
{
    private readonly ChangeStreamHealthTracker _tracker;

    public ChangeStreamHealthCheck(ChangeStreamHealthTracker tracker)
    {
        _tracker = tracker;
    }

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_tracker.GetHealthStatus());
    }
}

2c. Wire Tracker into MongoContext

File: src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs

Add dependency injection for the tracker and call it during retry:

// Constructor addition:
private readonly ChangeStreamHealthTracker _healthTracker;

public MongoContext(
    MongoConnectionSettings mongoConnectionSettings,
    ILogger<MongoContext> logger,
    IResumePointRepository resumeRepo,
    ChangeStreamHealthTracker healthTracker)  // NEW
{
    // ...
    _healthTracker = healthTracker;
}

// In GetCachedCollectionChangeStream, modify the retry callback:
.RetryWithExponentialBackoff(
    maxRetries: 15,
    initialDelay: TimeSpan.FromSeconds(2),
    maxDelay: TimeSpan.FromSeconds(180),
    shouldRetry: ex => ex is MongoException or TimeoutException or ChangeStreamClosedException,
    onRetry: (ex, attempt, delay) =>
    {
        _healthTracker.RecordFailure(key, ex);  // NEW
        _logger.LogWarning(/* existing logging */);
    },
    // onSuccess only fires after recovering from a failure (not on every document)
    onSuccess: () => _healthTracker.RecordSuccess(key),  // NEW
    logger: _logger
)

Part 3: Wire Health Check to Kubernetes Liveness Probe

File: src/services/api/SyRF.API.Endpoint/Program.cs

// In service registration (around line 76):
registry.AddSyrfInstrumentationAndHealthChecks(context, hcb =>
{
    hcb.AddCheck<ChangeStreamHealthCheck>(
        "change-streams",
        failureStatus: HealthStatus.Unhealthy,
        tags: new[] { "live", "ready" });
});

// Also register the tracker as singleton:
registry.AddSingleton<ChangeStreamHealthTracker>();

File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs

Add the same registration if the project-management service uses change streams directly.

Effect: When change streams are unhealthy:

  1. /health/live returns HTTP 503
  2. Kubernetes detects liveness probe failure
  3. After failureThreshold (default 3) failures, pod is restarted
  4. Fresh pod starts with clean state and new change streams
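For reference, the liveness probe that drives this behaviour looks roughly like the following. The values shown are illustrative assumptions; the actual probe lives in the existing deployment manifests:

```yaml
# Illustrative liveness probe — port and thresholds are assumptions,
# the real values come from the deployment templates.
livenessProbe:
  httpGet:
    path: /health/live
    port: 8080              # assumed container port
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3       # pod restarts after 3 consecutive failed probes
```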

Part 4: Add Structured Logging for Observability

New File: src/libs/mongo/SyRF.Mongo.Common/Events.cs

using Microsoft.Extensions.Logging;

namespace SyRF.Mongo.Common;

/// <summary>
/// Structured logging event IDs for MongoDB operations.
/// Enables filtering and alerting in logging systems.
/// </summary>
public static class Events
{
    // Change Stream Events (1000-1099)
    public static readonly EventId ChangeStreamOpened = new(1000, "ChangeStreamOpened");
    public static readonly EventId ChangeStreamClosed = new(1001, "ChangeStreamClosed");
    public static readonly EventId ChangeStreamRetryAttempt = new(1002, "ChangeStreamRetryAttempt");
    public static readonly EventId ChangeStreamRetryExhausted = new(1003, "ChangeStreamRetryExhausted");
    public static readonly EventId ChangeStreamResumeTokenInvalid = new(1004, "ChangeStreamResumeTokenInvalid");
    public static readonly EventId ChangeStreamResumeTokenCleared = new(1005, "ChangeStreamResumeTokenCleared");
    public static readonly EventId ChangeStreamHealthUnhealthy = new(1006, "ChangeStreamHealthUnhealthy");
}

Update logging calls in MongoContext.cs:

// Example: When opening stream
_logger.LogInformation(
    Events.ChangeStreamOpened,
    "Opened change stream for {StreamKey} with resume strategy {ResumeStrategy}",
    key,
    resume?.ResumeToken != null ? "token" : resume?.ClusterTime != null ? "time" : "fresh");

// Example: When retry fails
_logger.LogWarning(
    Events.ChangeStreamRetryAttempt,
    ex,
    "Change stream {StreamKey} retry attempt {Attempt}/{MaxRetries}, next retry in {DelaySeconds}s",
    key, attempt, maxRetries, delay.TotalSeconds);

Sentry Alert Configuration (for documentation):

Create alerts for:

  • Event ID 1003 (ChangeStreamRetryExhausted): Critical
  • Event ID 1006 (ChangeStreamHealthUnhealthy): Warning
  • Multiple occurrences of Event ID 1002 (ChangeStreamRetryAttempt) within 5 minutes: Warning


Files Summary

| File | Action | Description |
|------|--------|-------------|
| src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs | Modify | Expand error handling, add health tracking hooks |
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs | Create | Tracks stream health state |
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs | Create | IHealthCheck implementation |
| src/libs/mongo/SyRF.Mongo.Common/Events.cs | Create | Structured logging event IDs |
| src/services/api/SyRF.API.Endpoint/Program.cs | Modify | Register health check |
| src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs | Modify | Register health check (if needed) |

Testing Plan

Unit Tests

New File: src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthTrackerTests.cs

Test cases:

  • RecordSuccess_ClearsConsecutiveFailures
  • RecordFailure_IncrementsConsecutiveFailures
  • GetHealthStatus_ReturnsHealthy_WhenNoFailures
  • GetHealthStatus_ReturnsDegraded_WhenSomeFailures
  • GetHealthStatus_ReturnsUnhealthy_WhenThresholdExceeded
  • IsInvalidResumeTokenError_MatchesKnownCodes
  • IsInvalidResumeTokenError_MatchesOplogMessage
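A sketch of two of these cases, assuming xUnit (swap in the solution's actual test framework):

```csharp
using System;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Xunit;

public class ChangeStreamHealthTrackerTests
{
    [Fact]
    public void GetHealthStatus_ReturnsUnhealthy_WhenThresholdExceeded()
    {
        var tracker = new ChangeStreamHealthTracker(maxConsecutiveFailures: 3);

        // Three consecutive failures hit the threshold
        for (var i = 0; i < 3; i++)
            tracker.RecordFailure("projects", new TimeoutException("simulated"));

        Assert.Equal(HealthStatus.Unhealthy, tracker.GetHealthStatus().Status);
    }

    [Fact]
    public void RecordSuccess_ClearsConsecutiveFailures()
    {
        var tracker = new ChangeStreamHealthTracker(maxConsecutiveFailures: 3);
        tracker.RecordFailure("projects");

        // A single success resets the consecutive-failure counter
        tracker.RecordSuccess("projects");

        Assert.Equal(HealthStatus.Healthy, tracker.GetHealthStatus().Status);
    }
}
```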

Integration Test

  1. Deploy to staging with the new code
  2. Manually trigger a change stream failure (if possible via MongoDB Atlas)
  3. Verify that:
     • The health check reports unhealthy
     • /health/live returns HTTP 503
     • The pod restarts automatically
     • SignalR reconnects after the restart

Manual Verification

# Check health endpoint
curl https://api.staging.syrf.org.uk/health/live | jq

# Watch for pod restarts
kubectl get events -n syrf-staging --watch

# Check logs for new event IDs
kubectl logs -n syrf-staging deployment/syrf-api | grep "ChangeStream"

Rollout Plan

  1. PR Review: Standard review process
  2. Staging Deploy: Via normal CI/CD pipeline
  3. Staging Verification: Run integration tests
  4. Production Deploy: Manual promotion after staging verification
  5. Monitoring: Watch for health check alerts for 24 hours

Future Considerations

  • Metrics: Add Prometheus metrics for change stream health (retry counts, failure rates)
  • Dashboard: Create Grafana dashboard for change stream monitoring
  • Persistent Resume Points: Consider Redis-backed resume points for cross-pod resumability (lower priority)

References