# Feature: MongoDB Change Stream Resilience & Auto-Recovery

## Summary
Improve the MongoDB change stream implementation to automatically recover from oplog resumability failures, and add health checks that enable Kubernetes to auto-restart pods when change streams are unhealthy.
## Problem Statement

### What Happened
On 2025-01-20, real-time push notifications (SignalR) stopped working in production without any code changes. Investigation revealed:
- Cause: Database activity (testing preview environment database snapshots) caused rapid oplog growth
- Effect: Change stream resume tokens became invalid as oplog entries were purged
- Symptom: The retry mechanism kept retrying with stale tokens instead of starting fresh
- Impact: Users stopped receiving live updates (screening decisions, project changes, etc.)
### Why It Wasn't Detected
- No health check monitors change stream health
- Error logging existed but wasn't surfaced to alerts
- Kubernetes had no signal to restart the affected pods
### Current Workaround
Manual pod restart clears in-memory state and restores functionality.
## Solution Overview
| Part | Description | Impact |
|---|---|---|
| 1 | Expand error code handling | Fixes the root cause: invalid tokens are cleared |
| 2 | Add health check infrastructure | Enables monitoring of stream health |
| 3 | Wire to Kubernetes liveness probe | Enables automatic pod restart |
| 4 | Add structured logging | Enables proactive alerting |
## Detailed Implementation

### Part 1: Expand Error Code Handling
File: src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs
Location: Around line 290

Current Code:

```csharp
catch (MongoCommandException cmd) when (cmd.Code is 136 or 280 or 286)
{
    _logger.LogWarning(cmd, "Invalid resume token for {Key}", key);
    await _resumeRepo.ClearResumePointAsync(key, CancellationToken.None);
    obs.OnError(cmd);
}
```
New Code:

```csharp
catch (MongoCommandException cmd) when (IsInvalidResumeTokenError(cmd))
{
    _logger.LogWarning(
        Events.ChangeStreamResumeTokenInvalid,
        cmd,
        "Invalid resume token for {Key}, clearing and will retry from current time",
        key);
    await _resumeRepo.ClearResumePointAsync(key, CancellationToken.None);
    obs.OnError(cmd);
}

// Add helper method:
private static bool IsInvalidResumeTokenError(MongoCommandException cmd)
{
    // Known error codes for invalid/expired resume tokens
    if (cmd.Code is 136 or 260 or 280 or 286 or 346)
        return true;

    // Fallback: check error message for oplog-related failures
    var message = cmd.Message;
    return message.Contains("resume point may no longer be in the oplog", StringComparison.OrdinalIgnoreCase)
        || message.Contains("resume token was not found", StringComparison.OrdinalIgnoreCase)
        || message.Contains("the resume token was invalidated", StringComparison.OrdinalIgnoreCase);
}
```
Error Codes Reference:
| Code | Meaning |
|---|---|
| 136 | CappedPositionLost |
| 260 | InvalidResumeToken |
| 280 | ChangeStreamFatalError |
| 286 | ChangeStreamHistoryLost |
| 346 | ChangeStreamInvalidated |
### Part 2: Add Change Stream Health Check Infrastructure

#### 2a. Create Health Tracker
New File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace SyRF.Mongo.Common;

/// <summary>
/// Tracks the health of MongoDB change streams across all collections.
/// Used by ChangeStreamHealthCheck to determine if streams are functioning.
/// </summary>
public class ChangeStreamHealthTracker
{
    private readonly ConcurrentDictionary<string, StreamHealthState> _streamStates = new();
    private readonly int _maxConsecutiveFailures;

    public ChangeStreamHealthTracker(int maxConsecutiveFailures = 5)
    {
        _maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /// <summary>
    /// Record a successful change stream operation (document received or retry succeeded).
    /// </summary>
    public void RecordSuccess(string streamKey)
    {
        _streamStates.AddOrUpdate(
            streamKey,
            _ => new StreamHealthState(LastSuccess: DateTime.UtcNow),
            (_, _) => new StreamHealthState(LastSuccess: DateTime.UtcNow));
    }

    /// <summary>
    /// Record a change stream failure (retry attempt).
    /// </summary>
    public void RecordFailure(string streamKey, Exception? error = null)
    {
        _streamStates.AddOrUpdate(
            streamKey,
            _ => new StreamHealthState(
                ConsecutiveFailures: 1,
                LastFailure: DateTime.UtcNow,
                LastError: error?.Message),
            (_, existing) => new StreamHealthState(
                ConsecutiveFailures: existing.ConsecutiveFailures + 1,
                LastFailure: DateTime.UtcNow,
                LastError: error?.Message));
    }

    /// <summary>
    /// Get the overall health status of all change streams.
    /// </summary>
    public HealthCheckResult GetHealthStatus()
    {
        var unhealthyStreams = _streamStates
            .Where(kvp => kvp.Value.ConsecutiveFailures >= _maxConsecutiveFailures)
            .Select(kvp => kvp.Key)
            .ToList();

        var degradedStreams = _streamStates
            .Where(kvp => kvp.Value.ConsecutiveFailures > 0
                && kvp.Value.ConsecutiveFailures < _maxConsecutiveFailures)
            .Select(kvp => kvp.Key)
            .ToList();

        if (unhealthyStreams.Count > 0)
        {
            return HealthCheckResult.Unhealthy(
                $"Change streams unhealthy: {string.Join(", ", unhealthyStreams)}",
                data: new Dictionary<string, object>
                {
                    ["unhealthyStreams"] = unhealthyStreams,
                    ["degradedStreams"] = degradedStreams
                });
        }

        if (degradedStreams.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Change streams degraded: {string.Join(", ", degradedStreams)}");
        }

        return HealthCheckResult.Healthy("All change streams operational");
    }

    // Immutable state record — safe for concurrent reads from GetHealthStatus()
    // while AddOrUpdate replaces values atomically.
    private sealed record StreamHealthState(
        DateTime? LastSuccess = null,
        DateTime? LastFailure = null,
        int ConsecutiveFailures = 0,
        string? LastError = null);
}
```
#### 2b. Create Health Check
New File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs
```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

namespace SyRF.Mongo.Common;

/// <summary>
/// Health check that reports the status of MongoDB change streams.
/// Register with tags ["live", "ready"] to affect Kubernetes probes.
/// </summary>
public class ChangeStreamHealthCheck : IHealthCheck
{
    private readonly ChangeStreamHealthTracker _tracker;

    public ChangeStreamHealthCheck(ChangeStreamHealthTracker tracker)
    {
        _tracker = tracker;
    }

    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        return Task.FromResult(_tracker.GetHealthStatus());
    }
}
```
#### 2c. Wire Tracker into MongoContext
File: src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs
Add dependency injection for the tracker and call it during retry:
```csharp
// Constructor addition:
private readonly ChangeStreamHealthTracker _healthTracker;

public MongoContext(
    MongoConnectionSettings mongoConnectionSettings,
    ILogger<MongoContext> logger,
    IResumePointRepository resumeRepo,
    ChangeStreamHealthTracker healthTracker) // NEW
{
    // ...
    _healthTracker = healthTracker;
}

// In GetCachedCollectionChangeStream, modify the retry callback:
.RetryWithExponentialBackoff(
    maxRetries: 15,
    initialDelay: TimeSpan.FromSeconds(2),
    maxDelay: TimeSpan.FromSeconds(180),
    shouldRetry: ex => ex is MongoException or TimeoutException or ChangeStreamClosedException,
    onRetry: (ex, attempt, delay) =>
    {
        _healthTracker.RecordFailure(key, ex); // NEW
        _logger.LogWarning(/* existing logging */);
    },
    // onSuccess only fires after recovering from a failure (not on every document)
    onSuccess: () => _healthTracker.RecordSuccess(key), // NEW
    logger: _logger
)
```
### Part 3: Wire Health Check to Kubernetes Liveness Probe
File: src/services/api/SyRF.API.Endpoint/Program.cs
```csharp
// In service registration (around line 76):
registry.AddSyrfInstrumentationAndHealthChecks(context, hcb =>
{
    hcb.AddCheck<ChangeStreamHealthCheck>(
        "change-streams",
        failureStatus: HealthStatus.Unhealthy,
        tags: new[] { "live", "ready" });
});

// Also register the tracker as a singleton:
registry.AddSingleton<ChangeStreamHealthTracker>();
```
File: src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs
Same registration if PM service uses change streams directly.
Effect: when change streams are unhealthy:

- `/health/live` returns HTTP 503
- Kubernetes detects the liveness probe failure
- After `failureThreshold` (default 3) consecutive failures, the pod is restarted
- The fresh pod starts with clean state and new change streams
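For context, the restart behaviour above is driven by the Deployment's liveness probe. A typical probe spec is sketched below; the path follows from the `live` tag on the health check, but the port and timing values here are assumptions about this deployment, not values from the repo:

```yaml
livenessProbe:
  httpGet:
    path: /health/live   # ASP.NET Core health endpoint filtered to the "live" tag
    port: 8080           # assumed container port
  periodSeconds: 10
  failureThreshold: 3    # pod restarts after 3 consecutive failed probes
```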
### Part 4: Add Structured Logging for Observability
New File: src/libs/mongo/SyRF.Mongo.Common/Events.cs
```csharp
using Microsoft.Extensions.Logging;

namespace SyRF.Mongo.Common;

/// <summary>
/// Structured logging event IDs for MongoDB operations.
/// Enables filtering and alerting in logging systems.
/// </summary>
public static class Events
{
    // Change Stream Events (1000-1099)
    public static readonly EventId ChangeStreamOpened = new(1000, "ChangeStreamOpened");
    public static readonly EventId ChangeStreamClosed = new(1001, "ChangeStreamClosed");
    public static readonly EventId ChangeStreamRetryAttempt = new(1002, "ChangeStreamRetryAttempt");
    public static readonly EventId ChangeStreamRetryExhausted = new(1003, "ChangeStreamRetryExhausted");
    public static readonly EventId ChangeStreamResumeTokenInvalid = new(1004, "ChangeStreamResumeTokenInvalid");
    public static readonly EventId ChangeStreamResumeTokenCleared = new(1005, "ChangeStreamResumeTokenCleared");
    public static readonly EventId ChangeStreamHealthUnhealthy = new(1006, "ChangeStreamHealthUnhealthy");
}
```
Update logging calls in MongoContext.cs:
```csharp
// Example: when opening a stream
_logger.LogInformation(
    Events.ChangeStreamOpened,
    "Opened change stream for {StreamKey} with resume strategy {ResumeStrategy}",
    key,
    resume?.ResumeToken != null ? "token" : resume?.ClusterTime != null ? "time" : "fresh");

// Example: when a retry attempt fails
_logger.LogWarning(
    Events.ChangeStreamRetryAttempt,
    ex,
    "Change stream {StreamKey} retry attempt {Attempt}/{MaxRetries}, next retry in {DelaySeconds}s",
    key, attempt, maxRetries, delay.TotalSeconds);
```
Sentry Alert Configuration (for documentation):

Create alerts for:

- Event ID 1003 (ChangeStreamRetryExhausted): Critical
- Event ID 1006 (ChangeStreamHealthUnhealthy): Warning
- Multiple occurrences of Event ID 1002 within 5 minutes: Warning
## Files Summary
| File | Action | Description |
|---|---|---|
| `src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs` | Modify | Expand error handling, add health tracking hooks |
| `src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs` | Create | Tracks stream health state |
| `src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs` | Create | IHealthCheck implementation |
| `src/libs/mongo/SyRF.Mongo.Common/Events.cs` | Create | Structured logging event IDs |
| `src/services/api/SyRF.API.Endpoint/Program.cs` | Modify | Register health check |
| `src/services/project-management/SyRF.ProjectManagement.Endpoint/Program.cs` | Modify | Register health check (if needed) |
## Testing Plan

### Unit Tests
New File: src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthTrackerTests.cs
Test cases:
- RecordSuccess_ClearsConsecutiveFailures
- RecordFailure_IncrementsConsecutiveFailures
- GetHealthStatus_ReturnsHealthy_WhenNoFailures
- GetHealthStatus_ReturnsDegraded_WhenSomeFailures
- GetHealthStatus_ReturnsUnhealthy_WhenThresholdExceeded
- IsInvalidResumeTokenError_MatchesKnownCodes
- IsInvalidResumeTokenError_MatchesOplogMessage
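As a sketch of the threshold behaviour the tracker tests should pin down (xUnit assumed here; adapt to the repo's actual test framework):

```csharp
using System;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using SyRF.Mongo.Common;
using Xunit;

public class ChangeStreamHealthTrackerThresholdTests
{
    [Fact]
    public void GetHealthStatus_ReturnsUnhealthy_WhenThresholdExceeded()
    {
        var tracker = new ChangeStreamHealthTracker(maxConsecutiveFailures: 3);

        // Three consecutive failures on one stream reach the threshold.
        for (var i = 0; i < 3; i++)
            tracker.RecordFailure("projects", new TimeoutException("simulated"));

        Assert.Equal(HealthStatus.Unhealthy, tracker.GetHealthStatus().Status);

        // A single success replaces the state and clears the failure count.
        tracker.RecordSuccess("projects");
        Assert.Equal(HealthStatus.Healthy, tracker.GetHealthStatus().Status);
    }
}
```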
### Integration Test
- Deploy to staging with new code
- Manually trigger change stream failure (if possible via MongoDB Atlas)
- Verify:
  - Health check reports unhealthy
  - `/health/live` returns 503
  - Pod restarts automatically
  - SignalR reconnects after restart
### Manual Verification
```shell
# Check health endpoint
curl https://api.staging.syrf.org.uk/health/live | jq

# Watch for pod restarts
kubectl get events -n syrf-staging --watch

# Check logs for new event IDs
kubectl logs -n syrf-staging deployment/syrf-api | grep "ChangeStream"
```
## Rollout Plan
- PR Review: Standard review process
- Staging Deploy: Via normal CI/CD pipeline
- Staging Verification: Run integration tests
- Production Deploy: Manual promotion after staging verification
- Monitoring: Watch for health check alerts for 24 hours
## Future Considerations
- Metrics: Add Prometheus metrics for change stream health (retry counts, failure rates)
- Dashboard: Create Grafana dashboard for change stream monitoring
- Persistent Resume Points: Consider Redis-backed resume points for cross-pod resumability (lower priority)
## References
- MongoDB Change Streams Documentation
- MongoDB Error Codes
- ASP.NET Core Health Checks
- Original incident: 2025-01-20 SignalR production outage