Feature: Change Stream OTel Metrics & Health Tracker Improvements¶
Summary¶
Fix the ChangeStreamHealthTracker to preserve diagnostic information after recovery, add per-stream detail to the health check data, and add OpenTelemetry metrics for time-series observability. Wire up the dormant OTel metrics pipeline and connect it to Grafana Cloud free tier.
Problem Statement¶
Health Tracker Wipes Diagnostic Info¶
RecordSuccess creates a brand-new StreamHealthState, wiping LastFailure, LastError, and any context about what just happened. RecordFailure similarly wipes LastSuccess. After recovery, there's zero visibility into the preceding failure.
Root cause: Both methods use new StreamHealthState(...) instead of with expressions.
Additionally, GetHealthStatus() only returns stream names in its data dictionary — not per-stream details like timestamps or error messages. Even after fixing the with issue, the preserved state isn't surfaced anywhere operationally. The /health/live response writer only extracts data from the "version" health check entry, so change-stream data would need a separate endpoint or response writer update to be visible via HTTP.
No Time-Series Observability¶
The health check only answers "is it alive now?" (binary healthy/degraded/unhealthy for Kubernetes). There's no way to see:
- How many times a stream has failed and recovered over hours/days
- Whether a stream is flapping (repeated fail-recover cycles)
- Trends in failure rates across deployments
- Gradual degradation before it becomes an outage
Partially Dormant OTel Infrastructure¶
The project has all OTel metrics instrumentation packages installed in WebHostConfig.Common.csproj but only tracing is wired up:
| Package | Status | What it provides |
|---|---|---|
OpenTelemetry.Instrumentation.Runtime |
Installed, not wired | GC pressure, memory, thread pool, exceptions |
OpenTelemetry.Instrumentation.AspNetCore |
Installed, wired for tracing only | Request rate, latency histograms, error rates |
OpenTelemetry.Instrumentation.Http |
Installed, wired for tracing only | Outbound HTTP call metrics (Atlas, Auth0, AWS) |
OpenTelemetry.Exporter.OpenTelemetryProtocol |
Installed, not wired | OTLP export to any compatible backend |
The active AddOpenTelemetryConfig() method in SyrfConfigureServices.cs only calls .WithTracing() — there is no .WithMetrics(). A separate ConfigureOpenTelemetry() method in HostExtensions.cs does include .WithMetrics() but it is unused by any service — it was added as part of an Aspire-style pattern that was never activated.
Service coverage: Only the API service calls AddOpenTelemetryConfig(). The PM and Quartz services do not — this is a pre-existing gap.
Solution Overview¶
| Part | Description | Scope |
|---|---|---|
| 1 | Fix health tracker with expressions + enrich health data |
ChangeStreamHealthTracker.cs |
| 2 | Add OTel metrics instruments | New ChangeStreamMetrics.cs |
| 3 | Wire up OTel metrics pipeline (API service) | SyrfConfigureServices.cs |
| 4 | Connect Grafana Cloud free tier | Helm chart + cluster-gitops |
Design Decisions¶
Why both health tracker fix AND OTel metrics (not one or the other)?
They serve different purposes:
| Concern | Health tracker | OTel metrics |
|---|---|---|
| Kubernetes liveness probes | Yes — drives pod restarts | No |
| Preserve failure context after recovery | Yes — the with fix |
No (counters don't store "last error") |
| Time-series trends and alerting | No | Yes |
| Survive pod restarts | No (in-memory) | Yes (exported to backend) |
| Detect flapping over hours/days | No | Yes |
Why NOT add TotalFailures/TotalRecoveries to the health state?
OTel counters handle this better — they persist in the metrics backend, survive pod restarts, and support proper time-series queries and alerting. Adding lifetime counters to the in-memory health state would be redundant.
Why Grafana Cloud free tier?
- 10K active series, 14-day retention — well within SyRF's scale
- Zero cost, no credit card required
- Native OTLP support
- Built-in dashboards and alerting
- Sentry (current APM) does NOT support OTel metrics
- Elastic APM (installed but disabled) is an alternative if subscription is still active
v1 scope: API service only
Only the API service calls AddOpenTelemetryConfig(). PM and Quartz do not — wiring them up is a follow-up task. The OTLP env var mapping targets API only for v1 to stay consistent.
Part 1: Fix RecordSuccess/RecordFailure + Enrich Health Data¶
File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs
RecordSuccess (line 29)¶
Before (wipes LastFailure, LastError):
After (preserves failure history, uses _timeProvider):
var now = _timeProvider.GetUtcNow().UtcDateTime;
// ...
(_, existing) => existing with
{
LastSuccess = now,
ConsecutiveFailures = 0,
Phase = StreamPhase.Active,
Termination = null,
OutageStartedAt = null
}
RecordFailure (line 46)¶
Before (wipes LastSuccess):
(_, existing) => new StreamHealthState(
ConsecutiveFailures: existing.ConsecutiveFailures + 1,
LastFailure: DateTime.UtcNow,
LastError: error?.Message)
After (preserves success history, uses _timeProvider):
var now = _timeProvider.GetUtcNow().UtcDateTime;
// ...
(_, existing) => existing with
{
ConsecutiveFailures = existing.ConsecutiveFailures + 1,
LastFailure = now,
LastError = error?.Message,
Phase = StreamPhase.Active,
Termination = null,
OutageStartedAt = existing.ConsecutiveFailures == 0 ? now : existing.OutageStartedAt
}
Enrich GetHealthStatus with per-stream details¶
Add a streams entry to HealthCheckResult.Data for all three return paths (Healthy, Degraded, Unhealthy):
// Only include streams that participate in health classification
// Excluded: clean disconnects (0 failures) and unsubscribed streams (nobody using them)
var evaluatedStreams = _streamStates
.Where(kvp => !(kvp.Value.Phase == StreamPhase.Terminated
&& (kvp.Value.ConsecutiveFailures == 0
|| kvp.Value.Termination == TerminationReason.Unsubscribed)));
data["streams"] = evaluatedStreams.ToDictionary(
kvp => kvp.Key,
kvp => (object)new Dictionary<string, object?>
{
["consecutiveFailures"] = kvp.Value.ConsecutiveFailures,
["lastSuccess"] = kvp.Value.LastSuccess,
["lastFailure"] = kvp.Value.LastFailure,
["lastError"] = kvp.Value.LastError,
["phase"] = kvp.Value.Phase.ToString(),
["termination"] = kvp.Value.Termination?.ToString(),
["outageStartedAt"] = kvp.Value.OutageStartedAt,
["healthReason"] = ComputeHealthReason(kvp.Value, now)
});
Each key is the stream key (fully-qualified type name), each value is a Dictionary<string, object?> with eight fields. The phase field ("Active" or "Terminated") identifies stream lifecycle state. The termination field is null for Active-phase streams and "Error" or "Unsubscribed" for Terminated-phase streams (though Unsubscribed streams are filtered out — see below). The outageStartedAt field is the timestamp when the current outage began (first failure after recovery) — null when the stream is healthy. The healthReason field provides a diagnostic classification (e.g., "active_failing", "terminated_retrying", "healthy") covering the 7 non-filtered classification table rows — this gives programmatic consumers richer context than the standard HealthStatus enum while keeping probes simple. Six possible values: healthy, active_failing, active_recovering, active_stale, terminated_retrying, terminated_stale. (active_failing covers both below-threshold and at/above-threshold-within-grace-period cases.) Terminated streams with zero failures (clean disconnect) and unsubscribed streams (all clients left via RefCount() disposal) are filtered out — they have no diagnostic value. Using an explicit dictionary (rather than anonymous objects) ensures tests can assert against individual fields without reflection or dynamic. StreamHealthState is already accessible within GetHealthStatus (private nested record in the same class).
Note: The current /health/live response writer (SyrfHealthCheckExtensions.WriteHealthResponseWithVersion) only extracts data from the "version" entry. Per-stream details will be present in HealthCheckResult.Data for programmatic consumers but won't appear in the HTTP response until the response writer is updated. That's a separate follow-up task.
Security consideration: lastError contains raw Exception.Message text which may include connection strings or internal hostnames. For the HealthCheckResult.Data dictionary (programmatic consumers only, not HTTP-exposed), this is acceptable for v1. When the /health/live response writer is updated to surface per-stream data (see Future Enhancements), error messages must be sanitised — e.g., truncate to type name + first 100 characters, or expose only Exception.GetType().Name.
Stale-failure grace period (two-threshold model)¶
To prevent unnecessary pod restarts from document-gated recovery (see Part 2 caveat) while still detecting genuinely dead streams, GetHealthStatus() applies two time thresholds against LastFailure:
Timeline after last failure:
[0 – 5 min) → Unhealthy (actively failing or just stopped)
[5 min – 30 min) → Degraded (maybe recovered quietly, give it time)
[30 min+) → Unhealthy (no activity at all — stream likely dead)
If a document arrives at any point, RecordSuccess resets ConsecutiveFailures to 0, dropping the stream out of both thresholds entirely.
// Constructor — TimeSpan defaults are not compile-time constants,
// so use nullable + null-coalescing pattern.
// TimeProvider is the single clock source for all timestamps
// (RecordSuccess, RecordFailure, GetHealthStatus). Default
// TimeProvider.System for production; inject FakeTimeProvider in tests.
public ChangeStreamHealthTracker(
int maxConsecutiveFailures = 5,
TimeSpan? staleFailureGracePeriod = null,
TimeSpan? maxStalenessPeriod = null,
TimeProvider? timeProvider = null)
{
_maxConsecutiveFailures = maxConsecutiveFailures;
_staleFailureGracePeriod = staleFailureGracePeriod ?? TimeSpan.FromMinutes(5);
_maxStalenessPeriod = maxStalenessPeriod ?? TimeSpan.FromMinutes(30);
_timeProvider = timeProvider ?? TimeProvider.System;
}
// In GetHealthStatus(), when classifying streams:
var now = _timeProvider.GetUtcNow().UtcDateTime;
foreach (var kvp in _streamStates)
{
// Terminated streams
if (kvp.Value.Phase == StreamPhase.Terminated)
{
// Unsubscribed streams: skip liveness entirely — nobody is using this stream,
// so pod restart has no benefit. Stale failures from the old session are irrelevant.
if (kvp.Value.Termination == TerminationReason.Unsubscribed)
continue;
// Error-terminated streams: classify by outage duration (not LastFailure,
// which refreshes on every outer retry cycle's inner failures)
if (kvp.Value.ConsecutiveFailures > 0)
{
var timeSinceOutageStart = kvp.Value.OutageStartedAt.HasValue
? now - kvp.Value.OutageStartedAt.Value
: TimeSpan.Zero;
if (timeSinceOutageStart >= _maxStalenessPeriod)
→ unhealthyStreams // outage has persisted too long
else
→ degradedStreams // outer retry may still recover
}
// else: error-terminated with 0 failures (clean disconnect) — skip
continue; // Skip Active classification for all Terminated streams
}
// Active streams: two-threshold classification
var timeSinceLastFailure = kvp.Value.LastFailure.HasValue
? now - kvp.Value.LastFailure.Value
: TimeSpan.Zero;
if (kvp.Value.ConsecutiveFailures >= _maxConsecutiveFailures)
{
if (timeSinceLastFailure < _staleFailureGracePeriod
|| timeSinceLastFailure >= _maxStalenessPeriod)
→ unhealthyStreams // actively failing OR stream likely dead
else
→ degradedStreams // grace window — maybe recovered quietly
}
else if (kvp.Value.ConsecutiveFailures > 0)
→ degradedStreams
}
Design note — low-count Active failures: Active streams with ConsecutiveFailures > 0 but below threshold remain Degraded until MarkActive (cursor re-opened) or RecordSuccess (document received) resets ConsecutiveFailures to 0. This is intentional: MarkActive confirms the cursor is open (the stream IS functional), low failures indicate minor hiccups rather than a dead stream, and Degraded doesn't trigger pod restarts. Escalating low-count failures would cause unnecessary restarts for streams that recovered but serve quiet collections.
Why two thresholds are needed: The health check is registered with failureStatus: HealthStatus.Unhealthy and tags: ["live"] (Program.cs:80-83). The liveness probe (values.yaml:194-200) polls /health/live every 10 seconds with no explicit failureThreshold (Kubernetes default: 3).
- Without any grace period: When
WatchAsyncitself keeps failing (cursor never opens,MarkActivenever fires),RecordFailureaccumulatesConsecutiveFailures >= 5→ Unhealthy → pod restart after 30s. This is correct for genuinely broken streams. Transient failures are protected by two mechanisms: (a) the failure threshold (5) — a few failures stay below threshold → Degraded (row 4), and (b)MarkActiveresets failures to 0 when the cursor reconnects, dropping the stream out of both thresholds entirely. - With only a grace period (no max staleness): The inner retry (
MongoContext.cs:312-315) caps at 15 retries with 180s max delay. When exhausted,Observable.Throwterminates the inner sequence (MongoContext.cs:427-428). The API subscription layer has an outer retry (24h max, 10min max interval — see "Retry architecture" below) that resubscribes, but during its backoff the stream isTerminatedand only visible as Degraded. Without max-staleness, the Terminated stream could stay Degraded indefinitely even when the outage is genuinely permanent. - With both thresholds: For Active streams with >= threshold failures,
LastFailurestays fresh during active retries (< 5 min between attempts with 180s max delay), so the stream remains Unhealthy (row 1) — this is correct, the stream IS non-functional. The 5-30 min Degraded window (row 2) is a theoretical fallback: it would only apply if retries paused without transitioning to Terminated (unlikely in practice). The 30+ min Unhealthy window (row 3) catches truly stale Active entries with no activity at all. For Terminated streams, 30+ minutes sinceOutageStartedAt(first failure in the current outage) means the persistent outage has lasted too long → Unhealthy (inner retries triggered by outer resubscriptions refreshLastFailurebut notOutageStartedAt).
Stream lifecycle awareness¶
The two-threshold model assumes every key in _streamStates represents an active stream. But _streamStates (ConcurrentDictionary) never removes entries — once a stream key is added via RecordFailure or RecordSuccess, it persists for the lifetime of the process. Streams are cached via Publish().RefCount() (MongoContext.cs:321-322): when the last subscriber disconnects, RefCount disposes the underlying cursor, but the health tracker state persists.
Additionally, the API subscription layer wraps every change stream with an outer retry (AggregateRootEntitySubscriptionManager.cs:51, MyUtils.cs:596-599: maxTotalTimeToRetry: 24h, maxRetryInterval: 10 min). When inner retries exhaust, the error propagates to this outer retry, which waits up to 10 minutes before resubscribing. The outer retry does not call RecordFailure/RecordSuccess — only the inner retry does.
Simply removing the key on stream termination (as originally proposed) would create false-healthy windows: during the outer retry's backoff (up to 10 min), the health tracker has no entry → reports Healthy, while the stream is actively broken.
Fix: Replace key removal with a phase transition. Add a StreamPhase enum to StreamHealthState:
private enum StreamPhase { Active, Terminated }
public enum TerminationReason { Error, Unsubscribed }
public enum TerminalErrorReason { MaxRetries, NonRetryable }
// Single mapping point — enum → metric tag value:
public static class TerminalErrorReasonExtensions
{
public static string ToMetricTag(this TerminalErrorReason reason) => reason switch
{
TerminalErrorReason.MaxRetries => "max_retries",
TerminalErrorReason.NonRetryable => "non_retryable",
_ => throw new ArgumentOutOfRangeException(nameof(reason), reason,
$"Unmapped TerminalErrorReason '{reason}' — add a case to ToMetricTag() and a corresponding PromQL alert rule before using new enum values")
};
}
// StreamHealthState gains Phase and Termination fields:
private sealed record StreamHealthState(
DateTime? LastSuccess = null,
DateTime? LastFailure = null,
DateTime? OutageStartedAt = null,
int ConsecutiveFailures = 0,
string? LastError = null,
StreamPhase Phase = StreamPhase.Active,
TerminationReason? Termination = null);
Visibility: TerminationReason must be public because it appears in the public void MarkTerminated(string, TerminationReason) signature and is referenced by MongoContext (a different class). TerminalErrorReason must be public because it appears in the public void RecordTerminalError(string, Exception?, TerminalErrorReason) signature and is used in RetryWithExponentialBackoff's onTerminalError callback. TerminalErrorReasonExtensions.ToMetricTag() is the single mapping point from enum values to metric tag strings — all metric emission goes through this method, preventing typo-induced label proliferation. StreamPhase stays private — it's only used within ChangeStreamHealthTracker's own AddOrUpdate delegates and GetHealthStatus(). StreamHealthState stays private — it's the internal storage format, never exposed directly.
Interface: Extract IChangeStreamHealthTracker to enable mocking in MongoContext wiring tests. MongoContext depends on the interface; ChangeStreamHealthTracker implements it:
public interface IChangeStreamHealthTracker
{
void MarkActive(string streamKey);
void MarkTerminated(string streamKey, TerminationReason reason);
void RecordSuccess(string streamKey);
void RecordFailure(string streamKey, Exception? error = null);
void RecordTerminalError(string streamKey, Exception? error = null, TerminalErrorReason reason = TerminalErrorReason.MaxRetries);
HealthCheckResult GetHealthStatus();
}
ChangeStreamHealthCheck also depends on IChangeStreamHealthTracker (for GetHealthStatus()). The interface lives in the same file as the enum and tracker class (ChangeStreamHealthTracker.cs).
DI registration (in SyrfConfigureServices.cs): Use the forwarding pattern to ensure a single tracker instance serves both interface and concrete resolvers:
// Register concrete as singleton
services.AddSingleton<ChangeStreamHealthTracker>();
// Forward interface resolution to the same instance
services.AddSingleton<IChangeStreamHealthTracker>(sp =>
sp.GetRequiredService<ChangeStreamHealthTracker>());
// Retry options — default values match current hardcoded behaviour
services.AddSingleton(new ChangeStreamRetryOptions());
This replaces the current services.AddSingleton<ChangeStreamHealthTracker>() at line 69. Without the forwarding registration, IServiceProvider.GetService<IChangeStreamHealthTracker>() would return null (or a separate instance if registered independently), splitting health state between MongoContext and the health check.
Lamar scan ordering: MongoLamarRegistry.cs:16-23 scans SyRF assemblies with WithDefaultConventions(), which auto-registers concrete types matching I{TypeName} interfaces. If Lamar's scan discovers ChangeStreamHealthTracker → IChangeStreamHealthTracker, it would create a separate singleton registration that competes with the explicit forwarding pattern above.
Implementation rule: In AddSyrfMongoServices (SyrfConfigureServices.cs:60), move the tracker forwarding registration after services.IncludeRegistry<MongoLamarRegistry>() (currently line 70). IServiceCollection uses last-wins semantics, so the explicit forwarding registration will override any scan-generated registration. The current concrete registration at line 69 must move to after line 70 and be expanded to the full forwarding pattern:
public static ServiceRegistry AddSyrfMongoServices(this ServiceRegistry services, IConfiguration configuration)
{
// ... existing setup (MongoConnectionSettings, EnsureLegacyGuidSerializer, etc.) ...
services.IncludeRegistry<MongoLamarRegistry>(); // Lamar scan first
// Forwarding registration AFTER scan — last-wins overrides any scan-generated registration
services.AddSingleton<ChangeStreamHealthTracker>();
services.AddSingleton<IChangeStreamHealthTracker>(sp =>
sp.GetRequiredService<ChangeStreamHealthTracker>());
services.AddSingleton(new ChangeStreamRetryOptions());
return services;
}
The HealthTrackerRegistration_InterfaceAndConcreteResolveSameInstance regression test (see Testing section) catches ordering regressions if someone reorders these lines.
// In ChangeStreamHealthTracker (implements IChangeStreamHealthTracker):
public void MarkActive(string streamKey)
{
// Read state BEFORE update — avoids side effects inside AddOrUpdate delegate
// (CAS loop may invoke the update factory multiple times under contention).
_streamStates.TryGetValue(streamKey, out var previous);
_streamStates.AddOrUpdate(
streamKey,
_ => new StreamHealthState(Phase: StreamPhase.Active),
(_, existing) => existing with
{
Phase = StreamPhase.Active,
ConsecutiveFailures = 0,
Termination = null,
OutageStartedAt = null
});
// Flag pending recovery for deferred emission by RecordSuccess (first document).
// Cursor-open alone is insufficient — in reconnect-fail loops, cursors open
// and immediately fail each cycle, inflating the recovery count. Deferring to
// RecordSuccess ensures the metric only fires when the stream is truly working.
// TryGetValue + post-update flag is a benign TOCTOU: worst case under
// contention is a rare extra/missed flag, acceptable for counters.
if (previous is { ConsecutiveFailures: > 0, Termination: not TerminationReason.Unsubscribed })
_pendingRecoveryMetrics[streamKey] = true;
}
public void MarkTerminated(string streamKey, TerminationReason reason)
{
_streamStates.AddOrUpdate(
streamKey,
_ => new StreamHealthState(Phase: StreamPhase.Terminated, Termination: reason),
(_, existing) => existing with
{
Phase = StreamPhase.Terminated,
Termination = reason
});
_pendingRecoveryMetrics.TryRemove(streamKey, out _); // recovery failed or unsubscribed
}
Phase transitions:
MarkActiveis the primary reactivation signal — called whenWatchAsyncsucceeds (cursor is open), regardless of document flow. ResetsConsecutiveFailures = 0,Termination = null, andOutageStartedAt = nullbecause a new cursor is a fresh start (the previous cursor's failure history is stale). UsesTryGetValueto read pre-update state (avoiding side effects inside theAddOrUpdatedelegate — see Part 2 critical note) and flags a pending recovery metric (via_pendingRecoveryMetrics[streamKey] = true) when transitioning from a failing state, suppressed when prior termination wasUnsubscribed(clean disconnect with stale failures is not a genuine outage). The metric itself is deferred toRecordSuccess(document-gated) to avoid inflating the count during reconnect-fail loops where cursors open but immediately fail. Fires on everyObservable.Deferre-execution (initial connection AND reconnections). Without this reset, a reconnected stream would retain old failures → stay Unhealthy → pod restarts in ~30s (liveness probe: 10s period, failureThreshold 3) before the 5-minute grace period kicks in.RecordFailure/RecordSuccess/RecordTerminalErroralso setPhase = Active, Termination = nullas belt-and-suspenders (they prove the inner retry is running, so the stream must be active and any prior termination state is stale).RecordFailureandRecordTerminalErrorsetOutageStartedAton the first failure (ConsecutiveFailurestransitions from 0 to 1) and preserve it on subsequent failures.RecordSuccessandMarkActiveclearOutageStartedAt(outage is over).RecordSuccessalso emits the deferred recovery metric whenMarkActiveflagged a pending recovery (see "Recovery metric — document-gated" in Part 2).RecordTerminalErrorandMarkTerminatedclear the pending recovery flag (recovery failed or stream unsubscribed).MarkTerminated(key, reason)is called by.Finally()when the inner sequence ends, with aTerminationReasondistinguishing error-termination from clean disposal:TerminationReason.Error— inner retries exhausted (outer retry will attempt recovery)TerminationReason.Unsubscribed— all subscribers disconnected viaRefCount()disposal (nobody is using this stream)
// In MongoContext.GetCachedCollectionChangeStream:
// 1. After WatchAsync succeeds (line 264), before cursor polling loop:
var cursor = await GetCollection<TAggregateRoot, TId>()
.WatchAsync(pipeline, opts, ct);
_healthTracker.MarkActive(key); // Cursor created — stream is active
// 2. After RetryWithExponentialBackoff, before Publish().RefCount():
// Do(onError:) captures whether the source errored before .Finally() fires.
// - Inner retries exhaust → error propagates → Do(onError) sets flag →
// .Finally() fires with terminatedByError = true → MarkTerminated(key, Error)
// - All subscribers leave → RefCount() disposes upstream → Publish() disposes →
// .Finally() fires with terminatedByError = false → MarkTerminated(key, Unsubscribed)
//
// IMPORTANT: terminatedByError MUST be scoped inside Observable.Defer so each
// re-execution gets its own flag. Scoping it outside would share the mutable flag
// across Publish().RefCount() subscription cycles on the cached observable.
Observable.Defer(() =>
{
var terminatedByError = false;
return Observable.Create(async (observer, ct) => { /* ... cursor polling ... */ })
.RetryWithExponentialBackoff(...)
.Do(onError: _ => terminatedByError = true)
.Finally(() =>
{
_healthTracker.MarkTerminated(key,
terminatedByError ? TerminationReason.Error : TerminationReason.Unsubscribed);
});
})
.Publish()
.RefCount()
GetHealthStatus() classifies streams by phase and termination reason. The healthReason field (included in Data["streams"]) provides diagnostic detail while standard HealthStatus drives probes:
| Phase | Termination | ConsecutiveFailures | Time condition ² | Classification | healthReason |
|---|---|---|---|---|---|
| Active | — | >= threshold | LastFailure < 5 min ago | Unhealthy | active_failing |
| Active | — | >= threshold | LastFailure 5–30 min ago | Degraded | active_recovering |
| Active | — | >= threshold | LastFailure > 30 min ago | Unhealthy | active_stale |
| Active | — | > 0, < threshold | any | Degraded | active_failing |
| Active | — | 0 | any | Healthy | healthy |
| Terminated | Error | > 0 | OutageStartedAt < 30 min ago | Degraded | terminated_retrying |
| Terminated | Error | > 0 | OutageStartedAt >= 30 min ago | Unhealthy | terminated_stale |
| Terminated | Error | 0 | any | Skipped | — ¹ |
| Terminated | Unsubscribed | any | any | Skipped | — ¹ |
¹ Skipped streams are filtered from Data["streams"] — no healthReason is emitted. The classification is internal only.
² Active rows measure time since LastFailure (recency of failure activity). Terminated rows measure time since OutageStartedAt (when the first failure in the current outage occurred) — this prevents inner retries triggered by outer resubscriptions from continuously resetting the staleness clock via RecordFailure.
This resolves:
- False-healthy window: Error-terminated streams with failures report Degraded, not invisible
- Stale disconnected subscribers: Terminated streams with 0 failures (clean disconnect) are skipped
- Unnecessary restarts from unsubscribed streams: Streams terminated by
RefCount()disposal (all clients left) are skipped regardless of failure count — nobody is using the stream, so pod restart has no benefit - Prolonged outage escalation: Error-terminated streams escalate to Unhealthy when
OutageStartedAtexceeds 30 min (staleness threshold). UsingOutageStartedAt(set on first failure) instead ofLastFailure(refreshed on every retry) ensures the staleness clock is not reset by the inner retry'sRecordFailurecalls within a single Terminated phase. However, ifWatchAsynckeeps briefly succeeding (cursor opens →MarkActiveclearsOutageStartedAt→ cursor fails → retries exhaust → Terminated), each cycle resets the clock and 30-min escalation is unreachable — Degraded is the steady state (see "Edge case" above). Escalation IS reachable whenWatchAsyncalways fails (cursor never opens,MarkActivenever fires) or when the outer retry gives up (24h timeout)
Edge case — WatchAsync succeeds but cursor immediately fails: If WatchAsync opens a cursor but MoveNextAsync throws immediately on each retry, MarkActive resets both ConsecutiveFailures to 0 and OutageStartedAt to null each cycle. The failure count never exceeds 1, so the Active two-threshold logic (threshold = 5) is never reached. The Terminated path classifies as Degraded each cycle, but OutageStartedAt also resets because MarkActive clears it on cursor creation — so the 30-min staleness escalation is unreachable during active reconnect-fail loops.
Why Degraded is correct here: The stream IS partially working (cursor opens each cycle), so pod restart is unlikely to help — the underlying issue is probably server-side. The terminal_errors OTel counter accumulates across all cycles (one increment per exhaustion), enabling Grafana alerting on persistent reconnect-fail patterns even though the liveness probe stays at Degraded. The 30-min escalation remains reachable when (a) WatchAsync always fails (cursor never opens, MarkActive never fires, OutageStartedAt persists) or (b) the outer retry eventually gives up (24h total timeout).
Recommended alert rules (Grafana Cloud, PromQL):
One inner retry exhaustion takes ~28 min with default settings (15 retries, 2s initial, 180s max: delays = 2+4+8+16+32+64+128+180×8 = 1694s before jitter). Jitter is ±10% (jitterFactor = 0.1 at MongoContext.cs:389): at max delay 180s, each attempt varies by ±18s. With 8 attempts at max delay, worst-case jitter adds ~2.4 min → ~30.4 min per exhaustion. The outer retry (MyUtils.cs:625-627) uses Math.Pow(2, retryCount) seconds capped at 10 min — so early outer backoffs are short (1s, 2s, 4s, 8s...) and only reach 10 min after ~10 cycles. Full cycle times: ~28-30 min (early, including jitter), growing toward ~40 min after many exhaustions. Alerts must account for worst-case jitter and growing backoffs.
- Sustained retry exhaustion (
max_retries):
expr: >-
sum by (stream_key, pod) (
increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="max_retries"}[3h])
) >= 3
Three retry exhaustions per stream per pod in three hours means the inner retry has exhausted its budget three times on a single replica. Each exhaustion takes ~28-40 min with jitter, so 3 × 40 min = 120 min fits within the 180 min window. No for: duration needed — the 3h increase window already smooths transients. Response: investigate MongoDB connectivity/replica set health, check Atlas alerts.
- Non-retryable error (
non_retryable):
expr: >-
sum by (stream_key, pod) (
increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="non_retryable"}[3h])
) >= 1
Any non-retryable error is immediately actionable — these bypass the retry logic entirely (classified by ChangeStreamRetryClassifier.IsRetryable) and fire on the first occurrence. This covers MongoConfigurationException (connection string errors), MongoAuthenticationException (credential failures), and other non-transient MongoException subtypes excluded in the classifier. Threshold is >= 1 (not >= 3) because non-retryable errors don't self-heal — one is enough to investigate. Response: check application logs for the specific exception type and lastError in health check data.
- Flapping stream:
expr: >-
sum by (stream_key, pod) (
increase(syrf_changestream_recoveries_total{service_name="SyRF API", deployment_environment="production"}[3h])
) >= 3
Three recoveries per stream per pod in three hours indicates persistent die-recover-work-die cycling — the stream genuinely processes documents between outages. Because the metric is document-gated (deferred from MarkActive to RecordSuccess), reconnect-fail loops (cursor opens but fails before documents) do NOT inflate this count — those are caught by the terminal_errors alert above. Response: check lastError in structured logs for the stream key, investigate cursor-killing operations (e.g., server-side timeouts, collection drops).
Why no for: duration: With ~28-40 min cycles (including jitter), increase(...[3h]) >= 3 is true for the duration that all 3 events remain within the sliding window. A for: clause risks the condition oscillating as events age in/out of the window — the increase range already provides sufficient smoothing.
Alert overlap: In reconnect-fail loops, only the max_retries alert (1) fires (recovery metric never emits because no documents flow). When alerts 1 and 3 fire simultaneously, it indicates genuine flapping — the stream works briefly between outages (documents flow → RecordSuccess emits recovery) then fails again. The non_retryable alert (2) fires independently — it indicates a fundamentally different problem (bad configuration, not connectivity). Alert 4 (multi-pod) co-fires with alert 1 when multiple pods independently hit their per-pod thresholds — this is expected; alert 4 adds the signal that the outage is infrastructure-wide, not pod-local.
- Cross-pod persistent failure (multi-pod companion):
expr: >-
count by (stream_key) (
sum by (stream_key, pod) (
increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="max_retries"}[3h])
) >= 2
) >= 2
Fires when at least 2 pods each independently accumulate >= 2 retry exhaustions for the same stream in 3h. The inner sum by (stream_key, pod) ... >= 2 produces one row per pod that has failed at least twice; the outer count by (stream_key) ... >= 2 requires at least 2 such pods. This cannot be triggered by a single noisy pod — it structurally requires independent failures on multiple replicas, indicating an infrastructure-level issue (e.g., MongoDB primary failover, network partition) rather than a pod-local problem. Response: same as alert 1 (connectivity/replica set health), but the multi-pod pattern points to cluster-wide or MongoDB-side causes.
Alerts 1-3 page on {service_name, deployment_environment} and group by (stream_key, pod) — per-pod granularity prevents 2-3 pods hitting thresholds faster than intended from the same underlying outage (replica-sensitivity). Alert 4 uses a nested count by (stream_key) over sum by (stream_key, pod) to structurally require failures on multiple pods — it cannot be triggered by a single noisy replica. The deployment_environment="production" label matcher is mandatory on all rules — omitting it allows staging metrics to trigger production alerts. Update service_name when rolling out to PM or Quartz services. For dashboard views, sum by (stream_key) (without the count wrapper) provides a service-wide perspective. Alerts 1, 3, and 4 should fire at warning severity; alert 2 (non_retryable) should fire at critical severity since non-transient errors require immediate investigation. These are starting thresholds — tune after observing baseline rates in staging. Alert configuration is a Grafana Cloud concern (not application code) and can be adjusted without redeployment.
Reachability of Active + >= threshold: With MarkActive resetting ConsecutiveFailures = 0 on cursor creation, Active-phase streams can only accumulate >= threshold failures when WatchAsync itself keeps failing (cursor never opens, MarkActive never fires). In this case, the stream IS non-functional and escalation to Unhealthy is correct. Quiet-but-open streams (cursor opens, no documents) always have 0 failures after MarkActive → classified as Healthy.
Note: _changeStreamCache retains the Publish().RefCount() wrapper, so re-subscription triggers Observable.Defer → fresh cursor + fresh attempt = 0. The phase transition only affects health state, not the cached observable pipeline.
Retry architecture¶
The full retry chain has two tiers. Only the inner tier interacts with the health tracker:
SignalR Subscription Manager (outer)
└── RetryWithExponentialBackoff (MyUtils.cs:596-599)
maxTotalTimeToRetry: 24 hours
maxRetryInterval: 10 minutes
Does NOT call RecordFailure/RecordSuccess
└── MongoContext change stream (inner)
└── RetryWithExponentialBackoff (MongoContext.cs:312-319)
maxRetries: 15
maxDelay: 180 seconds
Calls RecordFailure (onRetry) / RecordTerminalError (onTerminalError) / RecordSuccess (onSuccess)
When inner retries exhaust → Observable.Throw terminates the inner sequence → Do(onError:) sets error flag → .Finally() calls MarkTerminated(key, Error) [clears pending recovery flag] → error propagates to the outer retry → outer retry waits (up to 10 min) → resubscribes → Observable.Defer re-executes → WatchAsync succeeds → MarkActive(key) [Phase = Active, ConsecutiveFailures = 0, Termination = null, flags pending recovery] → cursor polling begins → first document → RecordSuccess [updates LastSuccess, emits deferred Recoveries metric].
When all subscribers disconnect → RefCount() disposes upstream → Publish() disposes → .Finally() calls MarkTerminated(key, Unsubscribed) → stream is skipped in liveness classification (nobody is using it).
The outer retry is invisible to the health tracker by design — it doesn't know about ChangeStreamHealthTracker. The Terminated + Error phase (Degraded classification) bridges this gap: the stream is visibly impaired during the outer retry's backoff without triggering pod restarts. Terminated + Unsubscribed streams are skipped entirely — they represent idle cached observables with no active consumers.
Injectable retry configuration¶
Retry parameters for the inner RetryWithExponentialBackoff in MongoContext.GetCachedCollectionChangeStream are currently hardcoded (maxRetries: 15, initialDelay: 2s, maxDelay: 180s at line 312-315). Extract into an injectable options class so tests can use fast values without waiting through real delays:
// New file: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamRetryOptions.cs
public class ChangeStreamRetryOptions
{
public int MaxRetries { get; init; } = 15;
public TimeSpan InitialDelay { get; init; } = TimeSpan.FromSeconds(2);
public TimeSpan MaxDelay { get; init; } = TimeSpan.FromSeconds(180);
/// <summary>Fail-fast validation — called by MongoContext constructors.</summary>
public void Validate()
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(MaxRetries);
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(InitialDelay, TimeSpan.Zero);
ArgumentOutOfRangeException.ThrowIfLessThan(MaxDelay, InitialDelay);
}
}
Fail-fast validation: Validate() is called by both MongoContext constructors (after _retryOptions = retryOptions ?? new ChangeStreamRetryOptions(); _retryOptions.Validate();). This catches invalid values at service startup rather than at first change stream subscription — when RetryWithExponentialBackoff would throw the same ArgumentOutOfRangeException at MongoContext.cs:399. The MaxDelay >= InitialDelay guard mirrors the existing retry extension guard. Default-constructed instances always pass validation.
Intentional tightening: Validate() rejects InitialDelay = TimeSpan.Zero (ThrowIfLessThanOrEqual), while the existing retry extension guard at MongoContext.cs:398 only rejects negative values (ThrowIfLessThan(initialDelay, TimeSpan.Zero)). This is deliberate — a zero initial delay makes the exponential formula produce Math.Pow(2, n) * 0 = 0 for all attempts, collapsing all retries to instant (no backoff). The extension allows it for flexibility; ChangeStreamRetryOptions forbids it because instant retries are never appropriate for change stream reconnection.
Why InitialDelay must be configurable: RetryWithExponentialBackoff enforces maxDelay >= initialDelay (MongoContext.cs:399). Setting only MaxDelay to a small test value while InitialDelay remains at 2s would throw ArgumentOutOfRangeException.
// In MongoContext — both constructors gain the parameter:
// Protected testing constructor (line 27):
protected MongoContext(ILogger<MongoContext> logger, IResumePointRepository resumeRepo,
IChangeStreamHealthTracker healthTracker,
ChangeStreamRetryOptions? retryOptions = null)
{
...
_retryOptions = retryOptions ?? new ChangeStreamRetryOptions();
_retryOptions.Validate();
}
// Public constructor (line 38):
public MongoContext(
MongoConnectionSettings mongoConnectionSettings,
ILogger<MongoContext> logger, IResumePointRepository resumeRepo,
IChangeStreamHealthTracker healthTracker,
ChangeStreamRetryOptions? retryOptions = null)
{
...
_retryOptions = retryOptions ?? new ChangeStreamRetryOptions();
_retryOptions.Validate();
}
// In GetCachedCollectionChangeStream (line 312-315):
.RetryWithExponentialBackoff(
maxRetries: _retryOptions.MaxRetries,
initialDelay: _retryOptions.InitialDelay,
maxDelay: _retryOptions.MaxDelay,
...)
Both constructors default retryOptions to null (falling back to new ChangeStreamRetryOptions() with production defaults), keeping the parameter optional for existing callers and test subclasses. The protected constructor is used by test fixtures (e.g., MongoDbTestFixture) that bypass MongoClient creation — they can pass fast retry options directly.
DI registration: services.AddSingleton(new ChangeStreamRetryOptions()); (default values match current hardcoded behaviour — zero runtime change). Tests inject options directly via the constructor:
new ChangeStreamRetryOptions { MaxRetries = 1, InitialDelay = TimeSpan.FromMilliseconds(1), MaxDelay = TimeSpan.FromMilliseconds(1) }
Terminal error recording gap: RetryWithExponentialBackoff only calls onRetry (→ RecordFailure) for retryable exceptions (MongoContext.cs:437). Non-retryable exceptions (!shouldRetry(ex) at line 418) and max-retries-exceeded (line 425-428) throw immediately without recording. If a non-retryable error hits on the first attempt, ConsecutiveFailures stays 0, and .Finally() → Terminated + 0 → Skipped (completely invisible). Note: with the current broad shouldRetry predicate (ex is MongoException or TimeoutException or ChangeStreamClosedException), all MongoException subtypes are retried — including non-transient ones like MongoConfigurationException and MongoAuthenticationException. This makes the non_retryable path nearly unreachable. See "Retry predicate narrowing" below for the fix.
Fix: Add an onTerminalError callback to RetryWithExponentialBackoff that fires for both non-retryable exceptions AND max-retries-exceeded, before the error propagates:
// In RetryWithExponentialBackoff (.RetryWhen handler):
if (!shouldRetry(ex))
{
onTerminalError?.Invoke(ex, TerminalErrorReason.NonRetryable); // Record before propagating
return Observable.Throw<long>(ex);
}
attempt++;
if (attempt > maxRetries)
{
onTerminalError?.Invoke(ex, TerminalErrorReason.MaxRetries); // Record before propagating
return Observable.Throw<long>(ex);
}
// In MongoContext wiring (line 312-319):
.RetryWithExponentialBackoff(
...,
onRetry: (ex, attempt, delay) => _healthTracker.RecordFailure(key, ex),
onSuccess: () => _healthTracker.RecordSuccess(key),
onTerminalError: (ex, reason) => _healthTracker.RecordTerminalError(key, ex, reason))
Parameter ordering: onTerminalError should be the final optional parameter in the RetryWithExponentialBackoff extension signature (after onRetry and onSuccess), typed as Action<Exception, TerminalErrorReason>?. Using the TerminalErrorReason enum (not raw strings) prevents typo-induced label proliferation — the compiler rejects invalid values. This minimises positional-call break risk — existing callers using positional arguments for onRetry/onSuccess won't be broken by the new parameter appended at the end. Callers using named arguments are unaffected regardless of ordering.
This ensures ConsecutiveFailures > 0 when the stream terminates from any error, preventing Terminated + 0 → Skipped for genuinely failed streams. RecordTerminalError emits terminal_errors with a terminal.reason tag mapped from the TerminalErrorReason enum (not retry_failures), keeping the two metrics semantically distinct and enabling reason-specific alerting.
Retry predicate narrowing¶
The current shouldRetry predicate (ex is MongoException or TimeoutException or ChangeStreamClosedException at MongoContext.cs:316) retries all MongoException subtypes, including non-transient ones like MongoConfigurationException and MongoAuthenticationException (both inherit from MongoException). This means non_retryable only fires for exceptions outside the Mongo driver hierarchy (e.g., InvalidOperationException, NullReferenceException) — making the terminal.reason tag and the non-retryable alert (alert 2) nearly useless in practice.
Fix: Centralize the retry decision in a single static classifier method. This is the single source of truth for retry eligibility — MongoContext's shouldRetry lambda delegates to it, and tests verify each excluded type individually:
// In ChangeStreamRetryOptions.cs (alongside the options class):
public static class ChangeStreamRetryClassifier
{
/// <summary>
/// Returns true if the exception is transient and should be retried.
/// Non-transient MongoException subtypes are excluded — retrying them
/// wastes retry budget (~28 min per exhaustion cycle) and delays
/// the terminal error alert.
/// </summary>
public static bool IsRetryable(Exception ex) => ex is (MongoException
and not MongoConfigurationException
and not MongoAuthenticationException)
or TimeoutException
or ChangeStreamClosedException;
}
// In MongoContext (line 316) — delegates to classifier:
shouldRetry: ChangeStreamRetryClassifier.IsRetryable,
This retries connection failures (MongoConnectionException), replica set failovers (MongoNotPrimaryException, MongoNodeIsRecoveringException), cursor expiry, command errors, and other transient subtypes — while routing configuration and authentication errors to the non_retryable path for immediate alerting. The exclude-list pattern (broad retry, narrow exclusions) is safer than an include-list: newly added transient MongoException subtypes in driver updates are retried by default rather than silently becoming non-retryable.
MongoInternalException decision: MongoInternalException exists in the driver and indicates internal driver bugs or invariant violations (e.g., unexpected state in connection pooling). Decision: keep retryable (do NOT exclude). Rationale: internal exceptions are often transient — a connection pool race or a corrupted wire message may succeed on the next attempt with a fresh connection. Excluding it would route rare driver bugs to the non_retryable path, generating noisy alerts for conditions that typically self-heal on retry. If a MongoInternalException is truly persistent, the max_retries path catches it after exhausting the retry budget. This decision is locked by IsRetryable_ReturnsTrue_ForMongoInternalException (see Testing section).
Implementation note: The exclude list (MongoConfigurationException, MongoAuthenticationException) covers the known non-transient subtypes where retrying cannot succeed. If future driver versions introduce new non-transient subtypes, add them to the exclude list, add a corresponding test, and update the alert 2 description. Each excluded type must have a dedicated unit test (see Testing section) to lock the decision.
Part 2: OTel Metrics Instruments¶
New file: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamMetrics.cs¶
Static Meter + three Counter<long> instruments. Follows the existing pattern in DiagnosticsActivityEventSubscriber.cs (static instruments from assembly metadata). No new NuGet packages — System.Diagnostics.Metrics is built into .NET 10.
public static class ChangeStreamMetrics
{
private static readonly AssemblyName AssemblyName =
typeof(ChangeStreamMetrics).Assembly.GetName();
public static readonly Meter Meter = new(
AssemblyName.Name!,
AssemblyName.Version?.ToString());
public static readonly Counter<long> RetryFailures = Meter.CreateCounter<long>(
"syrf.changestream.retry_failures",
description: "Number of individual change stream retry attempts that failed");
public static readonly Counter<long> TerminalErrors = Meter.CreateCounter<long>(
"syrf.changestream.terminal_errors",
description: "Number of change stream terminal failures (non-retryable or max retries exceeded)");
public static readonly Counter<long> Recoveries = Meter.CreateCounter<long>(
"syrf.changestream.recoveries",
description: "Number of change stream recovery events (first document after recovery)");
}
Metric semantics¶
retry_failurescounts each individual retry attempt that fails (called fromonRetrycallback inRetryWithExponentialBackoff). If a stream fails 5 times before recovering, that's 5 increments. This measures retry pressure. Does NOT include terminal errors (non-retryable or max-retries-exceeded) — those have their own counter.terminal_errorscounts each terminal failure — either a non-retryable exception or max retries exceeded (called fromonTerminalErrorcallback). Tagged withterminal.reasonviaTerminalErrorReason.ToMetricTag()— the enum produces"non_retryable"or"max_retries"through a single mapping point, preventing typo-induced label proliferation. One terminal error per outage cycle. This measures outage severity — a highterminal_errorscount withmax_retriesindicates streams are exhausting their retry budgets (connectivity/failover issues);non_retryableindicates non-transient errors excluded by the narrowedshouldRetrypredicate (e.g.,MongoConfigurationException,MongoAuthenticationException) that bypass retry logic entirely.recoveriescounts each confirmed recovery — emitted byRecordSuccess(first document) afterMarkActiveflags a pending recovery (cursor created after prior failures, suppressed when prior termination wasUnsubscribed). If a stream fails 5 times then reconnects and processes a document, that's 1 increment. This measures confirmed outage resolutions and is the primary flapping indicator. The metric is document-gated (deferred from cursor creation to first document) to avoid inflating counts during reconnect-fail loops where cursors open but immediately fail.
Recovery metric — document-gated via deferred emission: MarkActive detects recovery conditions (ConsecutiveFailures > 0, prior termination not Unsubscribed) and sets _pendingRecoveryMetrics[streamKey] = true. RecordSuccess (first document) checks _pendingRecoveryMetrics.TryRemove(streamKey, out _) and emits ChangeStreamMetrics.Recoveries only when the flag is present. This two-step pattern ensures recovery is confirmed by document flow, not just cursor creation. In reconnect-fail loops (cursor opens → MarkActive sets flag → cursor fails before documents → RecordTerminalError/MarkTerminated clears flag), the metric never fires — correctly reflecting that the stream never truly recovered. _pendingRecoveryMetrics is a ConcurrentDictionary<string, bool> alongside _streamStates, keeping metric-emission state separate from health-classification state. The TryRemove call in RecordSuccess is atomic, preventing double-emission under concurrency.
Quiet-collection trade-off: Collections that reconnect but produce no documents will not emit a recovery metric until the first document arrives. This is acceptable because: (a) health state is already correct — MarkActive resets ConsecutiveFailures immediately, so the stream classifies as Healthy; (b) terminal_errors still tracks the prior exhaustion event; © for truly quiet collections, the absence of a recovery metric has no alerting impact since there's no flapping to detect.
When inner retries exhaust, .Finally() marks the stream Terminated with TerminationReason.Error → Degraded (visible but not triggering restarts) while the outer retry (see "Retry architecture" in Part 1) attempts to resubscribe. On resubscription, MarkActive transitions the stream back to Active with 0 failures before any documents flow.
To detect flapping: high recoveries rate = stream is repeatedly failing and recovering. To assess severity: high retry_failures / recoveries ratio = many retries per outage (slow recovery).
Emit metrics from ChangeStreamHealthTracker¶
Critical: emit metrics outside the AddOrUpdate delegates — never inside them. ConcurrentDictionary.AddOrUpdate may invoke the update factory multiple times under contention (the CAS loop retries until it wins). Calling Counter.Add() inside the delegate would overcount. Similarly, avoid capturing mutable external state from within delegates (e.g., wasFailing = existing.ConsecutiveFailures > 0). Instead, use TryGetValue before AddOrUpdate to snapshot the pre-update state, then use the snapshot to decide metric emission after the update completes (see MarkActive for the flag-setting pattern and RecordSuccess for the deferred emission pattern). The TryGetValue + post-update flag/emission introduces a benign TOCTOU gap: under contention, the worst case is a rare double/missed recovery count. These counters are SLI-level indicators (trend detection, alerting on sustained patterns) — not exact audit counts. A rare ±1 under high contention does not affect dashboard accuracy or alert thresholds.
Deferred recovery emission: The Recoveries metric uses a two-step deferred pattern via _pendingRecoveryMetrics (ConcurrentDictionary<string, bool>). MarkActive sets the flag when recovery conditions are met; RecordSuccess clears it and emits the counter. RecordTerminalError and MarkTerminated clear the flag without emitting (recovery failed or stream was unsubscribed). This keeps metric-emission state separate from health-classification state (_streamStates).
- RecordFailure: call
AddOrUpdatefirst, then unconditionally incrementChangeStreamMetrics.RetryFailureswithstream.keytag. One call toRecordFailure= exactly one metric increment. - RecordTerminalError: increments
ConsecutiveFailures(ensures Terminated + 0 → Skipped doesn't hide genuinely failed streams) and emitsChangeStreamMetrics.TerminalErrorswith aterminal.reasontag mapped from theTerminalErrorReasonenum viareason.ToMetricTag()(produces"non_retryable"or"max_retries"). Separate fromRecordFailureto avoid conflating retryable and terminal failures in the same metric:
public void RecordTerminalError(string streamKey, Exception? error = null, TerminalErrorReason reason = TerminalErrorReason.MaxRetries)
{
var now = _timeProvider.GetUtcNow().UtcDateTime;
_streamStates.AddOrUpdate(
streamKey,
_ => new StreamHealthState(
ConsecutiveFailures: 1,
LastFailure: now,
OutageStartedAt: now,
LastError: error?.Message,
Phase: StreamPhase.Active),
(_, existing) => existing with
{
ConsecutiveFailures = existing.ConsecutiveFailures + 1,
LastFailure = now,
LastError = error?.Message,
Phase = StreamPhase.Active,
Termination = null,
OutageStartedAt = existing.ConsecutiveFailures == 0 ? now : existing.OutageStartedAt
});
ChangeStreamMetrics.TerminalErrors.Add(1,
new KeyValuePair<string, object?>("stream.key", streamKey),
new KeyValuePair<string, object?>("terminal.reason", reason.ToMetricTag()));
_pendingRecoveryMetrics.TryRemove(streamKey, out _); // recovery failed
}
- RecordSuccess: update
LastSuccesstimestamp and reset failures (belt-and-suspenders —MarkActivealready cleared them). Emits the deferred recovery metric whenMarkActivepreviously flagged a recovery (first document confirms the stream is truly working, not just a cursor that opened and immediately failed in a reconnect-fail loop):
public void RecordSuccess(string streamKey)
{
var now = _timeProvider.GetUtcNow().UtcDateTime;
_streamStates.AddOrUpdate(
streamKey,
_ => new StreamHealthState(LastSuccess: now, Phase: StreamPhase.Active),
(_, existing) => existing with
{
LastSuccess = now,
ConsecutiveFailures = 0,
Phase = StreamPhase.Active,
Termination = null,
OutageStartedAt = null
});
// Emit deferred recovery metric — MarkActive flagged a recovery when the
// cursor opened after prior failures. The first document confirms the stream
// is genuinely working. TryRemove is atomic: exactly one thread emits.
if (_pendingRecoveryMetrics.TryRemove(streamKey, out _))
ChangeStreamMetrics.Recoveries.Add(1,
new KeyValuePair<string, object?>("stream.key", streamKey));
}
Tag conventions: stream.key = the aggregate root's fully-qualified type name (type.FullName ?? type.Name from MongoContext.cs:221, e.g., SyRF.ProjectManagement.Core.Model.ProjectAggregate.Project). Low cardinality (~5-10 values). terminal.reason (on terminal_errors only) = "non_retryable" or "max_retries", mapped from TerminalErrorReason enum via ToMetricTag(). Low cardinality (2 values — ToMetricTag() throws ArgumentOutOfRangeException for unmapped enum members, so adding a new reason requires updating both the mapping and the alert rules).
Part 3: Wire Up OTel Metrics Pipeline¶
File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs
Add .WithMetrics() to AddOpenTelemetryConfig() (line 111) alongside existing .WithTracing():
var environmentName = configuration["RuntimeEnvironment"] ?? "unknown";
var podName = Environment.GetEnvironmentVariable("POD_NAME") ?? "unknown";
services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService("SyRF API")
.AddAttributes(new[]
{
new KeyValuePair<string, object>("deployment.environment", environmentName),
new KeyValuePair<string, object>("k8s.pod.name", podName)
}))
.WithMetrics(metricsBuilder =>
{
metricsBuilder
.AddMeter(ChangeStreamMetrics.Meter.Name) // custom change stream metrics
.AddAspNetCoreInstrumentation() // request rate, latency, errors
.AddHttpClientInstrumentation() // outbound HTTP metrics
.AddRuntimeInstrumentation(); // GC, thread pool, memory
if (!string.IsNullOrWhiteSpace(configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]))
{
metricsBuilder.AddOtlpExporter();
}
})
.WithTracing(tracerProviderBuilder =>
{
// ... existing tracing config — REMOVE .ConfigureResource() from here ...
});
Resource identity: .ConfigureResource() is moved from the .WithTracing() block (where it currently lives at line 119) to the top-level AddOpenTelemetry() call. This ensures traces and metrics share the same service.name, deployment.environment, and k8s.pod.name attributes, which are required for cross-signal correlation, environment-scoped alerting, and per-pod alert grouping in Grafana. The deployment.environment attribute is sourced from configuration["RuntimeEnvironment"] — set by the SYRF__RuntimeEnvironment env var (env-mapping runtime section, env-mapping.yaml:60), which resolves to the environment name from values.yaml (e.g., "staging", "production"). The k8s.pod.name attribute is sourced from the POD_NAME env var, populated via the Kubernetes downward API in the Helm chart (see step 4d below). These are the labels referenced as deployment_environment and pod in the PromQL alert rules (Grafana Cloud's Prometheus backend translates OTel resource attribute dots to underscores).
Required imports: Add using OpenTelemetry.Metrics; to SyrfConfigureServices.cs. The AddMeter(), AddAspNetCoreInstrumentation() (metrics overload), AddHttpClientInstrumentation() (metrics overload), AddRuntimeInstrumentation(), and AddOtlpExporter() (metrics overload) extension methods live in this namespace. using SyRF.Mongo.Common; is already present (line 16) for ChangeStreamMetrics.Meter.Name.
This activates ~50+ built-in metrics from the already-installed packages, plus the 3 custom change stream counters. ChangeStreamMetrics.Meter.Name resolves to the assembly name (SyRF.Mongo.Common) — using the constant avoids a magic string that could silently break if the assembly is renamed. The OTLP exporter only activates when OTEL_EXPORTER_OTLP_ENDPOINT is set.
Scope: Only the API service calls AddOpenTelemetryConfig(). PM and Quartz services are a follow-up task (see Future Enhancements).
Service name footgun: .AddService("SyRF API") is hardcoded for v1 (API-only). When rolling out to PM or Quartz, this must be parameterised (e.g., configuration["ServiceName"] or per-service constant) to avoid all services reporting as "SyRF API". Mislabelled service_name would break per-service alert routing and make dashboards unreliable. The PM/Quartz rollout task should include service name parameterisation as a prerequisite.
Part 4: Grafana Cloud Free Tier¶
4a. Sign up¶
Manual step: create free account at grafana.com. Navigate to the OpenTelemetry configuration page in your Grafana Cloud stack settings to generate:
- OTLP endpoint URL (e.g.,
https://otlp-gateway-prod-eu-west-2.grafana.net/otlp) - Instance ID (numeric)
- API key (for OTLP authentication)
Reference: Grafana Cloud OTLP setup docs
4b. Store credentials in GCP Secret Manager¶
Create a secret grafana-cloud-otlp in project camarades-net with keys:
endpoint: the OTLP gateway URL (e.g.,https://otlp-gateway-prod-eu-west-2.grafana.net/otlp)protocol:http/protobufheaders:Authorization=Basic <base64(instanceId:apiKey)>
Important: The headers value must use key=value format (not just the token). This is what OTEL_EXPORTER_OTLP_HEADERS expects. The base64 payload is instanceId:apiKey encoded per RFC 7617.
4c. Add ExternalSecret (cluster-gitops)¶
Create an ExternalSecret that syncs grafana-cloud-otlp from GCP Secret Manager to a Kubernetes secret. Follow the existing pattern used for sentry and elastic-apm secrets.
4d. Add OTLP env vars to Helm chart¶
File: src/charts/syrf-common/env-mapping.yaml
Add a new section following the existing patterns:
otlp:
services: [api]
displayName: "OpenTelemetry OTLP"
condition: otlp.enabled
envVars:
- name: OTEL_EXPORTER_OTLP_ENDPOINT
secretNamePath: otlp.authSecretName
secretNameDefault: grafana-cloud-otlp
secretKey: endpoint
- name: OTEL_EXPORTER_OTLP_PROTOCOL
secretNamePath: otlp.authSecretName
secretNameDefault: grafana-cloud-otlp
secretKey: protocol
- name: OTEL_EXPORTER_OTLP_HEADERS
secretNamePath: otlp.authSecretName
secretNameDefault: grafana-cloud-otlp
secretKey: headers
After modifying env-mapping.yaml, regenerate the Helm template and update service values:
This does two things: (1) regenerates src/charts/syrf-common/templates/_env-blocks.tpl from the mapping file, and (2) populates otlp.authSecretName: grafana-cloud-otlp into service chart values.yaml files. The --update-values flag is mandatory — omitting it leaves the Helm value undefined, causing an empty secret name at deploy time.
Cross-service scope: The generator iterates all service charts (api, project-management, quartz, web), but each env-mapping section has a services filter (generate-env-blocks.ts:721-725). The OTLP section above specifies services: [api], so OTLP defaults are written only to the API chart's values.yaml. However, sections without a services field (or with services: [api, project-management, quartz]) affect all .NET service charts. If you add other sections in the same generator run, those may insert defaults into charts beyond API. Run a dry-run first to preview:
Review the output, then re-run without --dry-run to apply. Verify diffs are limited to the expected chart(s).
Note: secretNameDefault is a generation-time default, not a runtime fallback. The --update-values flag writes it into the matching service chart's values.yaml, which is the value Helm reads at deploy time. Environment-specific overrides in cluster-gitops (e.g., staging.values.yaml) take precedence via Helm's values merge order.
4e. Add pod name resource attribute (Helm chart)¶
File: src/charts/syrf-common/templates/_helpers.tpl (or the service deployment template)
Add a POD_NAME env var sourced from the Kubernetes downward API. This is not a secret — it uses fieldRef to expose the pod's own name:
This populates the k8s.pod.name OTel resource attribute used in ConfigureResource() (see Part 3). The attribute surfaces as the pod label in Grafana Cloud (Prometheus backend translates k8s_pod_name → pod via standard relabeling). Alert rules 1-3 group by (stream_key, pod) and alert 4 requires per-pod counts — both depend on this label being present.
Implementation: Add to the shared _env-blocks.tpl helper or directly in each service's deployment template. Since this is a fieldRef (not a secret), it doesn't go through the env-mapping generator — add it as a static env block in the common chart template, gated on otlp.enabled (same condition as the OTLP env vars).
4f. Enable per environment (cluster-gitops)¶
# syrf/environments/staging/staging.values.yaml
otlp:
enabled: true
authSecretName: grafana-cloud-otlp # must match ExternalSecret name from step 4c
4g. Verification — Helm render¶
# Verify template renders correctly with otlp enabled
helm template test src/services/api/.chart --set otlp.enabled=true | grep OTEL
# Verify template is clean with otlp disabled (fails if any OTEL var found)
! helm template test src/services/api/.chart --set otlp.enabled=false | grep -q OTEL
Cost Analysis¶
| Component | Cost | Notes |
|---|---|---|
| Code changes (metrics instruments) | $0 | Built into .NET 10, no packages |
| In-app overhead | Negligible | Counter.Add() is nanoseconds, batched export every 60s |
| Grafana Cloud free tier | $0 | 10K active series, 14-day retention |
| OTLP export bandwidth | Negligible | A few KB every 60 seconds |
Cardinality: stream.key tag has ~5-10 unique values (one per watched collection). terminal.reason tag (on terminal_errors only) has 2 values (ToMetricTag() throws for unmapped enum members — new values require updating both the mapping and alert rules). Total active series well within free tier limits.
Files to Modify¶
| File | Change |
|---|---|
src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs |
Extract IChangeStreamHealthTracker interface, with expressions, emit metrics, enrich GetHealthStatus data with healthReason, two-threshold grace period, MarkActive() (flags pending recovery) + RecordSuccess() (emits deferred recovery metric) + MarkTerminated(reason) + RecordTerminalError(TerminalErrorReason) (emits terminal_errors with terminal.reason tag via ToMetricTag()) + StreamPhase + TerminationReason + TerminalErrorReason enum + TerminalErrorReasonExtensions.ToMetricTag(), _pendingRecoveryMetrics dictionary, TimeProvider injection |
src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs |
Depend on IChangeStreamHealthTracker (constructor injection), inject ChangeStreamRetryOptions (see below), add MarkActive(key) after WatchAsync, Do(onError:) + .Finally() with TerminationReason before Publish().RefCount(), narrow shouldRetry via centralized IsRetryable classifier (see below), onTerminalError: Action<Exception, TerminalErrorReason> → RecordTerminalError with enum forwarding in RetryWithExponentialBackoff |
src/libs/mongo/SyRF.Mongo.Common/ChangeStreamRetryOptions.cs |
NEW — Injectable retry configuration class + ChangeStreamRetryClassifier.IsRetryable() static classifier method (single source of truth for retry eligibility) |
src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs |
Change constructor to depend on IChangeStreamHealthTracker instead of concrete ChangeStreamHealthTracker |
src/libs/mongo/SyRF.Mongo.Common/ChangeStreamMetrics.cs |
NEW — Meter + 3 Counter instruments (RetryFailures, TerminalErrors, Recoveries) |
src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs |
Add .WithMetrics() to OTel config, update DI registration for IChangeStreamHealthTracker (see registration snippet below) |
src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthTrackerTests.cs |
~20 new tests (including TerminationReason, healthReason, and TerminalErrorReason enum mapping tests) |
src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamRetryClassifierTests.cs |
NEW — 9 tests for IsRetryable() classifier (one per excluded/included exception type, including MongoNodeIsRecoveringException and MongoInternalException) |
src/libs/mongo/SyRF.Mongo.Common.Tests/RetryWithExponentialBackoffTests.cs |
2 new tests for onTerminalError callback (non-retryable and max-retries-exceeded, using TerminalErrorReason enum) |
src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthCheckTests.cs |
No code change needed — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
src/libs/mongo/SyRF.Mongo.Common.Tests/GetCachedCollectionChangeStreamTests.cs |
New tests use Mock<IChangeStreamHealthTracker>; existing tests keep concrete (implicit cast) |
src/libs/mongo/SyRF.Mongo.Common.Tests/MongoUnitOfWorkBaseTests.cs |
No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
src/libs/testing/SyRF.Testing.Common/Fixtures/MongoDbTestFixture.cs |
No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
src/libs/project-management/SyRF.ProjectManagement.Core.Tests/BsonClassMapTests.cs |
No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
src/libs/mongo/SyRF.Mongo.Common.Tests/SyRF.Mongo.Common.Tests.csproj |
Add Microsoft.Extensions.TimeProvider.Testing test dependency |
src/libs/webhostconfig/SyRF.WebHostConfig.Common.Tests/SyrfConfigureServicesTests.cs |
OTel wiring smoke test |
src/libs/webhostconfig/SyRF.WebHostConfig.Common.Tests/SyRF.WebHostConfig.Common.Tests.csproj |
Add OpenTelemetry.Exporter.InMemory test dependency |
src/charts/syrf-common/env-mapping.yaml |
OTLP env var mapping |
src/charts/syrf-common/templates/_env-blocks.tpl |
GENERATED — via npm run generate:env-blocks -- --update-values |
src/charts/syrf-common/templates/_env-blocks.tpl or deployment template |
Add POD_NAME env var via Kubernetes downward API fieldRef: metadata.name, gated on otlp.enabled (see step 4e) |
src/services/api/.chart/values.yaml |
UPDATED by --update-values — populates otlp.authSecretName (see Helm step in Part 3) |
CLAUDE.md |
Add OTel metrics pipeline, change stream health tracking architecture, ChangeStreamRetryOptions, and deployment.environment resource attribute to project context |
Testing¶
Unit tests¶
New test cases:
- History preservation (verify via
GetHealthStatus().Data["streams"]): RecordSuccess_PreservesLastFailureAndLastErrorRecordFailure_PreservesLastSuccess- Per-stream health data:
GetHealthStatus_IncludesPerStreamDetails_ForAllStatuses- Two-threshold grace period (inject
FakeTimeProvidervia constructor to control clock — requiresMicrosoft.Extensions.TimeProvider.Testingpackage inSyRF.Mongo.Common.Tests.csproj, see Files to Modify): GetHealthStatus_DowngradesToDegraded_WhenFailureIsStale— record enough failures to exceed_maxConsecutiveFailures, advance time past_staleFailureGracePeriodbut within_maxStalenessPeriod, verify Degraded.GetHealthStatus_ReturnsUnhealthy_WhenFailureExceedsMaxStaleness— same setup, advance time past_maxStalenessPeriod, verify Unhealthy (stream presumed dead).- Stream lifecycle and termination reason (inject
FakeTimeProvider): MarkTerminated_Error_SetsPhaseAndReason— record failures, callMarkTerminated(key, Error), verify viaGetHealthStatus().Data["streams"]thatphaseis"Terminated"andterminationis"Error".MarkTerminated_Unsubscribed_ExcludedFromStreamData— record failures, callMarkTerminated(key, Unsubscribed), verify stream is excluded fromData["streams"](Unsubscribed streams are filtered regardless of failure count).MarkActive_ClearsTerminationReason—MarkTerminated(key, Error)thenMarkActive(key), verify viaData["streams"]thatterminationisnullandphaseis"Active"(fresh cursor).GetHealthStatus_ClassifiesErrorTerminatedWithFailuresAsDegraded— record failures on a stream, callMarkTerminated(key, Error), verify overall status is Degraded (not Unhealthy, not Healthy).GetHealthStatus_SkipsErrorTerminatedWithZeroFailures— record success only, callMarkTerminated(key, Error), verify stream is excluded from health data.GetHealthStatus_SkipsUnsubscribedStreams— record failures, callMarkTerminated(key, Unsubscribed), verify stream is excluded from health data regardless of failure count.RecordFailure_ResetsPhaseToActive—MarkTerminated(key, Error)thenRecordFailure, verify phase returns toActiveandTermination = null(outer retry resubscribed).MarkActive_SetsPhaseToActiveFromTerminated—MarkTerminated(key, Error)thenMarkActive, verify phase isActive(cursor created on resubscription).GetHealthStatus_EscalatesErrorTerminatedToUnhealthy_WhenStale— record failures,MarkTerminated(key, Error), advance time past_maxStalenessPeriodfromOutageStartedAt, verify Unhealthy (outage has persisted too long).- OutageStartedAt tracking (inject
FakeTimeProvider): RecordFailure_SetsOutageStartedAt_OnFirstFailure— callRecordFailureon a fresh stream, verify viaGetHealthStatus().Data["streams"]thatoutageStartedAtequals the currentFakeTimeProvidertime.RecordFailure_PreservesOutageStartedAt_OnSubsequentFailures— callRecordFailuretwice with time advancing between calls, verify viaData["streams"]thatoutageStartedAtremains the time of the first failure (not updated by the second).MarkActive_ClearsOutageStartedAt— record failures (setsoutageStartedAt), thenMarkActive, verify viaData["streams"]thatoutageStartedAtisnull.GetHealthStatus_UsesOutageStartedAt_ForTerminatedStaleness— record failures at T=0, advance time 20 min, record more failures (refreshesLastFailureto T+20m),MarkTerminated(key, Error), advance time to T+31m.LastFailureis only 11 min old butOutageStartedAtis 31 min old — verify Unhealthy (provesOutageStartedAtis used, notLastFailure).- Deferred recovery metric emission:
RecordSuccess_EmitsDeferredRecoveryMetric— record failures,MarkActive(sets pending flag), thenRecordSuccess, verifyRecoveriesincrements exactly once. Verify a secondRecordSuccessdoes NOT increment again (flag cleared by first).RecordSuccess_DoesNotEmitRecoveryMetric_WithoutPendingFlag—RecordSuccesson a fresh stream (no priorMarkActivewith failures), verify noRecoveriesincrement.MarkActive_DoesNotEmitRecoveryMetric_AfterUnsubscribedTermination— record failures,MarkTerminated(key, Unsubscribed), thenMarkActive, thenRecordSuccess, verify noRecoveriesmetric increment (stale failures from a clean disconnect are not a genuine outage — pending flag suppressed by Unsubscribed condition).RecordTerminalError_ClearsPendingRecoveryFlag—MarkActive(sets pending flag), thenRecordTerminalError, thenRecordSuccess, verify noRecoveriesincrement (flag cleared by terminal error).MarkTerminated_ClearsPendingRecoveryFlag—MarkActive(sets pending flag), thenMarkTerminated(key, Error), then newMarkActive+RecordSuccess, verify the second cycle emits recovery (flag from first cycle was cleared, second cycle sets new flag).- healthReason field:
GetHealthStatus_IncludesHealthReasonInStreamData— verifyhealthReasonfield is present in each stream's data dictionary with correct values for Active/Healthy ("healthy"), Active/Degraded ("active_failing"), and Terminated/Error/Degraded ("terminated_retrying").- Metric emission (using
MeterListener): RecordFailure_EmitsRetryFailureMetricRecordTerminalError_EmitsTerminalErrorMetric_WithReason— callRecordTerminalError(key, ex, TerminalErrorReason.NonRetryable), verifyTerminalErrorsincrements withterminal.reason = "non_retryable"tag (mapped viaToMetricTag(), notRetryFailures). Repeat withTerminalErrorReason.MaxRetriesto verify both reason values propagate through the mappingReconnectFailLoop_DoesNotEmitRecoveryMetric— record failures,MarkActive(sets pending flag),RecordTerminalError(clears flag), verify noRecoveriesincrement (simulates reconnect-fail loop where cursor opens but fails before documents)
Test isolation: Each metric test should use a unique stream.key value (e.g., $"TestStream_{testName}") and filter in the MeterListener callback by matching the stream.key tag. This prevents cross-test interference from the global static counters. Assert only on measurements with the expected tag value.
OTel wiring test¶
Add a smoke test in SyrfConfigureServicesTests.cs to verify AddOpenTelemetryConfig() registers the metrics pipeline. Add OpenTelemetry.Exporter.InMemory Version="1.9.0" as a test-only dependency to SyRF.WebHostConfig.Common.Tests.csproj (must match existing OTel package versions in SyRF.WebHostConfig.Common.csproj:25-31).
AddOpenTelemetryConfig_RegistersAndCollectsChangeStreamMetrics— callservices.AddOpenTelemetryConfig(configuration)with a testIConfigurationcontainingCustomSentryConfig, then callservices.AddOpenTelemetry().WithMetrics(m => m.AddInMemoryExporter(exportedMetrics))separately to attach the test exporter. This works becauseAddOpenTelemetry()is additive — bothWithMetricsblocks configure the same underlyingMeterProvider. The method returnsvoid, so chaining is not possible. Build the service provider, incrementChangeStreamMetrics.RetryFailures, force a collect viaMeterProvider.ForceFlush(), then assert:exportedMetricscontains a metric namedsyrf.changestream.retry_failures(proves.AddMeter()subscription).- The exported metric's
Resourcecontainsservice.name = "SyRF API"(proves.ConfigureResource()is applied at the top-levelAddOpenTelemetry()scope, not just tracing).
This tests the full pipeline: .WithMetrics() → .AddMeter() subscription → instrument collection → resource identity → export. Unlike MeterListener (which subscribes directly to System.Diagnostics.Metrics.Meter and would pass even without .AddMeter() registration), the in-memory exporter only receives metrics from meters the OTel SDK is subscribed to. The service.name check catches drift if someone moves .ConfigureResource() back into .WithTracing(). Full end-to-end OTLP integration testing is out of scope.
DI singleton forwarding test¶
Add a regression test in SyrfConfigureServicesTests.cs to verify the forwarding registration resolves to a single instance:
HealthTrackerRegistration_InterfaceAndConcreteResolveSameInstance— register services viaAddSyrfMongoServices(configuration)(the method containing theChangeStreamHealthTrackerforwarding registration — see DI registration section above), build the service provider, then assertAssert.Same(sp.GetRequiredService<ChangeStreamHealthTracker>(), sp.GetRequiredService<IChangeStreamHealthTracker>()). This proves the forwarding pattern resolves to a single instance and prevents accidental drift where someone registersIChangeStreamHealthTrackerindependently (creating a second singleton), splitting health state between MongoContext and the health check.
RetryWithExponentialBackoff extension tests¶
The onTerminalError callback is added to RetryWithExponentialBackoff (see Part 1). Add direct unit tests for the extension method itself (in existing RetryWithExponentialBackoffTests or a new test class alongside):
NonRetryableException_InvokesOnTerminalError_WithNonRetryableReason— configureshouldRetryto returnfalse, subscribe, push an error, verifyonTerminalErroris called exactly once with(exception, TerminalErrorReason.NonRetryable), andonRetryis NOT called.MaxRetriesExceeded_InvokesOnTerminalError_WithMaxRetriesReason— setmaxRetries: 2, push 3 errors that passshouldRetry, verifyonRetryis called twice (attempts 1-2), thenonTerminalErroris called once with(exception, TerminalErrorReason.MaxRetries)on the 3rd error. VerifyonRetryis NOT called for the 3rd error.
These test the extension in isolation, ensuring the callback contract is correct before MongoContext wiring tests depend on it.
Retry classifier tests¶
Unit tests for ChangeStreamRetryClassifier.IsRetryable() — each excluded type must have a dedicated test to lock the decision. Add to ChangeStreamRetryOptionsTests or a new ChangeStreamRetryClassifierTests class:
IsRetryable_ReturnsFalse_ForMongoConfigurationException— verifyMongoConfigurationExceptionis non-retryableIsRetryable_ReturnsFalse_ForMongoAuthenticationException— verifyMongoAuthenticationExceptionis non-retryableIsRetryable_ReturnsTrue_ForMongoConnectionException— verify transient connection errors are retriedIsRetryable_ReturnsTrue_ForMongoNotPrimaryException— verify replica set failovers (primary stepdown) are retriedIsRetryable_ReturnsTrue_ForMongoNodeIsRecoveringException— verify replica set failovers (node in recovery state) are retriedIsRetryable_ReturnsTrue_ForMongoInternalException— verify internal driver errors are retried (decision: transient, self-heals on retry with fresh connection — see "MongoInternalException decision" in Part 1)IsRetryable_ReturnsTrue_ForTimeoutException— verify timeouts are retriedIsRetryable_ReturnsTrue_ForChangeStreamClosedException— verify cursor expiry is retriedIsRetryable_ReturnsFalse_ForNonMongoException— verifyInvalidOperationException(outside Mongo hierarchy) is non-retryable
These tests serve as a decision lock — adding a new exclusion requires adding a failing test first, making the exclude-list change deliberate rather than accidental.
TerminalErrorReason mapping tests¶
ToMetricTag_MapsMaxRetries—TerminalErrorReason.MaxRetries.ToMetricTag()returns"max_retries"ToMetricTag_MapsNonRetryable—TerminalErrorReason.NonRetryable.ToMetricTag()returns"non_retryable"
These lock the enum-to-metric-tag mapping — a mismatch would silently break PromQL label filters in alert rules.
Helm template validation¶
MongoContext lifecycle integration¶
The lifecycle hooks (MarkActive after WatchAsync, MarkTerminated(reason) via Do(onError:) + .Finally(), RecordFailure via onRetry, RecordTerminalError via onTerminalError, RecordSuccess via onSuccess) are wired in MongoContext's change stream pipeline. These can be tested with a mock IChangeStreamCursor<T> and a Mock<IChangeStreamHealthTracker>, avoiding the need for a real MongoDB connection.
Unit tests (in GetCachedCollectionChangeStreamTests or similar):
MarkActive_CalledAfterWatchAsyncSuccess— mock cursor that succeeds, verifymockTracker.Verify(t => t.MarkActive(key))is called before any documents are emittedMarkTerminated_CalledWithError_WhenInnerRetriesExhaust— mock cursor that always throws, injectnew ChangeStreamRetryOptions { MaxRetries = 1, InitialDelay = TimeSpan.FromMilliseconds(1), MaxDelay = TimeSpan.FromMilliseconds(1) }, verifymockTracker.Verify(t => t.MarkTerminated(key, TerminationReason.Error))MarkTerminated_CalledWithUnsubscribed_WhenRefCountDropsToZero— subscribe then dispose, verifymockTracker.Verify(t => t.MarkTerminated(key, TerminationReason.Unsubscribed))RecordTerminalError_CalledOnNonRetryableException— mock cursor that throwsMongoConfigurationException(excluded byChangeStreamRetryClassifier.IsRetryable), verifymockTracker.Verify(t => t.RecordTerminalError(key, It.IsAny<Exception>(), TerminalErrorReason.NonRetryable))UnsubscribedStreams_NotClassifiedForLiveness— end-to-end with real tracker: cursor with failures → unsubscribe → verify health check skips the streamConstructor_ThrowsOnInvalidRetryOptions— passnew ChangeStreamRetryOptions { MaxRetries = 0 }(orInitialDelay > MaxDelay), verifyArgumentOutOfRangeExceptionis thrown at construction time (fail-fast validation)
The mock cursor approach: create a Mock<IAsyncCursor<ChangeStreamDocument<T>>> that can be programmed to emit documents (MoveNextAsync returns true, Current returns batch), throw exceptions, or complete. MongoContext depends on IChangeStreamHealthTracker and ChangeStreamRetryOptions (both injected via constructor), so the mock tracker and fast retry options (MaxRetries = 1, InitialDelay = 1ms, MaxDelay = 1ms) are injected directly via the protected testing constructor — no factory override needed.
Complementary verification (non-unit):
- Code review: Confirm
Do(onError:)is positioned betweenRetryWithExponentialBackoffand.Finally(), andterminatedByErroris scoped insideObservable.Defer(one flag per re-execution, not shared across RefCount cycles) - Staging smoke test: Deploy to staging, trigger a change stream reconnection, then observe metrics in Grafana. Check
/health/livefor overall status transition (Healthy → Degraded/Unhealthy → Healthy) — per-stream detail is inHealthCheckResult.Databut not surfaced in the HTTP response until the response writer is updated (see Future Enhancements). To inspect per-stream data during staging, useHealthCheckServiceprogrammatic access or structured logging ofGetHealthStatus()results. Trigger options (in order of preference): - Atlas Test Failover (managed Atlas): In the Atlas UI, navigate to the cluster → click "..." → "Test Failover". This triggers a primary stepdown safely within Atlas's managed environment and is available on M10+ tiers. Alternatively, use the Atlas Admin API:
POST /api/atlas/v2/groups/{groupId}/clusters/{clusterName}/restartPrimaries replSetStepDown(self-managed or Atlas with admin access):db.adminCommand({replSetStepDown: 60}). This may be unavailable on managed Atlas clusters where admin commands are restricted- Application-level cursor invalidation: Drop and recreate a watched collection in a staging-only database (requires staging DB isolation — see MongoDB Testing Strategy). This invalidates the cursor without requiring replica set admin access
Integration verification¶
After deploying to staging with OTLP enabled, built-in metrics should appear in Grafana Cloud's Explore view within ~60 seconds:
http_server_request_duration_seconds— built-in ASP.NET Coreprocess_runtime_dotnet_gc_collections_count— built-in runtime
Custom counters are event-driven — they emit only when a change stream failure, terminal error, or recovery event occurs. These series will NOT appear in Grafana until at least one event fires. To verify custom counter wiring on first deploy, induce a change stream failure (see staging smoke test above), then look for:
syrf_changestream_retry_failures_total— custom counter (retryable failures)syrf_changestream_terminal_errors_total— custom counter (non-retryable or max-retries-exceeded)syrf_changestream_recoveries_total— custom counter (document-gated recovery events)
Metric name translation: The OTel SDK emits syrf.changestream.retry_failures, syrf.changestream.terminal_errors, and syrf.changestream.recoveries (dot notation). Grafana Cloud's Prometheus backend translates these to syrf_changestream_retry_failures_total, syrf_changestream_terminal_errors_total, and syrf_changestream_recoveries_total (underscores + _total suffix for counters). If using a different backend (e.g., OTLP-native), metric names may appear in their original dot notation. Use flexible matching when building dashboards.
Label validation (first deploy): Before finalising alert rules, verify actual label keys in Grafana Cloud's Explore view. The PromQL queries assume stream_key (from counter tag stream.key), terminal_reason (from counter tag terminal.reason, values produced by TerminalErrorReason.ToMetricTag(), on terminal_errors only), service_name (from resource attribute service.name), deployment_environment (from resource attribute deployment.environment), and pod (from resource attribute k8s.pod.name, populated via the POD_NAME downward API env var added in step 4e). All five labels are populated by committed implementation steps — no runtime detection or fallback is needed. Grafana Cloud's Prometheus backend translates OTel attribute dots to underscores and lowercases — but other backends may differ. After the first staging deploy, run {__name__=~"syrf_changestream.*"} in Explore and inspect the actual label names on the returned series. Update alert PromQL if labels differ from the assumed names.
Future Enhancements¶
Once the metrics pipeline is established, additional metrics can be added with minimal effort:
- PM and Quartz OTel config: Wire up
AddOpenTelemetryConfig()for PM and Quartz services, expand env-mappingserviceslist - Health endpoint enrichment: Update
WriteHealthResponseWithVersionto include change-stream per-stream data in the/health/liveHTTP response. Requires error message redaction before HTTP exposure - Change stream document throughput: Counter per collection per event type
- MassTransit message processing: Consumer rate and latency
- MongoDB query duration: Histogram per collection per operation
- Business metrics: Screening rate, annotation rate, export throughput
- Cursor-level recovery metric: The
recoveriescounter is document-gated (deferred toRecordSuccess) to avoid inflating counts during reconnect-fail loops. This means quiet collections that reconnect without document flow don't emit recovery metrics until data arrives. If cursor-level recovery visibility is needed for quiet collections, add a separatesyrf.changestream.cursor_recoveriescounter emitted directly byMarkActive— this provides a complete picture at the cost of noisy counts during reconnect-fail loops. The two metrics together distinguish confirmed recoveries (document-gated) from cursor reopens (cursor-gated)