Feature: Change Stream OTel Metrics & Health Tracker Improvements

Summary

Fix the ChangeStreamHealthTracker to preserve diagnostic information after recovery, add per-stream detail to the health check data, and add OpenTelemetry metrics for time-series observability. Wire up the dormant OTel metrics pipeline and connect it to Grafana Cloud free tier.

Problem Statement

Health Tracker Wipes Diagnostic Info

RecordSuccess creates a brand-new StreamHealthState, wiping LastFailure, LastError, and any context about what just happened. RecordFailure similarly wipes LastSuccess. After recovery, there's zero visibility into the preceding failure.

Root cause: Both methods use new StreamHealthState(...) instead of with expressions.
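The difference is easy to demonstrate in isolation. A minimal, self-contained sketch (the record here is a simplified stand-in for the private StreamHealthState; field names mirror it, values are illustrative):

```csharp
using System;

// A failing stream's state, as RecordFailure would have left it.
var failing = new StreamState(
    LastFailure: DateTime.UnixEpoch,
    ConsecutiveFailures: 3,
    LastError: "cursor killed");

// Current RecordSuccess: constructs a fresh record — every unmentioned field resets to default.
var wiped = new StreamState(LastSuccess: DateTime.UnixEpoch);

// Fixed RecordSuccess: `with` copies the record, changing only the named members.
var preserved = failing with { LastSuccess = DateTime.UnixEpoch, ConsecutiveFailures = 0 };

Console.WriteLine(wiped.LastError ?? "(lost)");  // (lost)
Console.WriteLine(preserved.LastError);          // cursor killed

// Simplified stand-in for the private StreamHealthState record.
sealed record StreamState(
    DateTime? LastSuccess = null,
    DateTime? LastFailure = null,
    int ConsecutiveFailures = 0,
    string? LastError = null);
```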

Additionally, GetHealthStatus() only returns stream names in its data dictionary — not per-stream details like timestamps or error messages. Even after fixing the with issue, the preserved state isn't surfaced anywhere operationally. The /health/live response writer only extracts data from the "version" health check entry, so change-stream data would need a separate endpoint or response writer update to be visible via HTTP.

No Time-Series Observability

The health check only answers "is it alive now?" (binary healthy/degraded/unhealthy for Kubernetes). There's no way to see:

  • How many times a stream has failed and recovered over hours/days
  • Whether a stream is flapping (repeated fail-recover cycles)
  • Trends in failure rates across deployments
  • Gradual degradation before it becomes an outage

Partially Dormant OTel Infrastructure

The project has all OTel metrics instrumentation packages installed in WebHostConfig.Common.csproj but only tracing is wired up:

| Package | Status | What it provides |
|---|---|---|
| OpenTelemetry.Instrumentation.Runtime | Installed, not wired | GC pressure, memory, thread pool, exceptions |
| OpenTelemetry.Instrumentation.AspNetCore | Installed, wired for tracing only | Request rate, latency histograms, error rates |
| OpenTelemetry.Instrumentation.Http | Installed, wired for tracing only | Outbound HTTP call metrics (Atlas, Auth0, AWS) |
| OpenTelemetry.Exporter.OpenTelemetryProtocol | Installed, not wired | OTLP export to any compatible backend |

The active AddOpenTelemetryConfig() method in SyrfConfigureServices.cs only calls .WithTracing() — there is no .WithMetrics(). A separate ConfigureOpenTelemetry() method in HostExtensions.cs does include .WithMetrics() but it is unused by any service — it was added as part of an Aspire-style pattern that was never activated.
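Closing the gap is mostly additive. A hedged sketch of what the missing branch in AddOpenTelemetryConfig() could look like — the extension methods come from the already-installed packages, but the exact builder shape and the custom meter name are assumptions, not the current code:

```csharp
services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddAspNetCoreInstrumentation()       // existing wiring, unchanged
        .AddHttpClientInstrumentation())
    .WithMetrics(metrics => metrics           // the missing branch
        .AddRuntimeInstrumentation()          // GC, memory, thread pool
        .AddAspNetCoreInstrumentation()       // request rate, latency histograms
        .AddHttpClientInstrumentation()       // outbound Atlas/Auth0/AWS calls
        .AddMeter("SyRF.ChangeStreams")       // assumption: custom meter name from Part 2
        .AddOtlpExporter());                  // OTLP export, endpoint from env vars
```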

Service coverage: Only the API service calls AddOpenTelemetryConfig(). The PM and Quartz services do not — this is a pre-existing gap.


Solution Overview

| Part | Description | Scope |
|---|---|---|
| 1 | Fix health tracker with expressions + enrich health data | ChangeStreamHealthTracker.cs |
| 2 | Add OTel metrics instruments | New ChangeStreamMetrics.cs |
| 3 | Wire up OTel metrics pipeline (API service) | SyrfConfigureServices.cs |
| 4 | Connect Grafana Cloud free tier | Helm chart + cluster-gitops |

Design Decisions

Why both health tracker fix AND OTel metrics (not one or the other)?

They serve different purposes:

| Concern | Health tracker | OTel metrics |
|---|---|---|
| Kubernetes liveness probes | Yes — drives pod restarts | No |
| Preserve failure context after recovery | Yes — the with fix | No (counters don't store "last error") |
| Time-series trends and alerting | No | Yes |
| Survive pod restarts | No (in-memory) | Yes (exported to backend) |
| Detect flapping over hours/days | No | Yes |

Why NOT add TotalFailures/TotalRecoveries to the health state?

OTel counters handle this better — they persist in the metrics backend, survive pod restarts, and support proper time-series queries and alerting. Adding lifetime counters to the in-memory health state would be redundant.
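As a concrete illustration, here is a self-contained sketch of counter-based tracking with System.Diagnostics.Metrics. The meter, instrument, and tag names follow the alert rules later in this document (the OTLP pipeline appends the Prometheus _total suffix), but the real definitions belong to Part 2's ChangeStreamMetrics.cs, and the stream key shown is hypothetical. A MeterListener stands in for the OTLP exporter so the sketch is observable end-to-end:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Counters live in the metrics backend, so failure/recovery totals survive pod restarts.
var meter = new Meter("SyRF.ChangeStreams");
var terminalErrors = meter.CreateCounter<long>("syrf.changestream.terminal_errors");
var recoveries = meter.CreateCounter<long>("syrf.changestream.recoveries");

// In-process listener: records each measurement so we can see what an exporter would ship.
var observed = new List<string>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "SyRF.ChangeStreams")
        l.EnableMeasurementEvents(instrument);
};
void OnMeasurement(Instrument instrument, long value,
    ReadOnlySpan<KeyValuePair<string, object?>> tags, object? state) =>
    observed.Add($"{instrument.Name}={value}");
listener.SetMeasurementEventCallback<long>(OnMeasurement);
listener.Start();

// "max_retries" is the string TerminalErrorReason.MaxRetries.ToMetricTag() would emit;
// "SyRF.Domain.Project" is a hypothetical stream key (fully-qualified type name).
terminalErrors.Add(1,
    new KeyValuePair<string, object?>("stream_key", "SyRF.Domain.Project"),
    new KeyValuePair<string, object?>("terminal_reason", "max_retries"));
recoveries.Add(1, new KeyValuePair<string, object?>("stream_key", "SyRF.Domain.Project"));

Console.WriteLine(observed.Count);  // 2
```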

Why Grafana Cloud free tier?

  • 10K active series, 14-day retention — well within SyRF's scale
  • Zero cost, no credit card required
  • Native OTLP support
  • Built-in dashboards and alerting
  • Sentry (current APM) does NOT support OTel metrics
  • Elastic APM (installed but disabled) is an alternative if subscription is still active

v1 scope: API service only

Only the API service calls AddOpenTelemetryConfig(). PM and Quartz do not — wiring them up is a follow-up task. The OTLP env var mapping targets API only for v1 to stay consistent.


Part 1: Fix RecordSuccess/RecordFailure + Enrich Health Data

File: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs

RecordSuccess (line 29)

Before (wipes LastFailure, LastError):

(_, _) => new StreamHealthState(LastSuccess: DateTime.UtcNow)

After (preserves failure history, uses _timeProvider):

var now = _timeProvider.GetUtcNow().UtcDateTime;
// ...
(_, existing) => existing with
{
    LastSuccess = now,
    ConsecutiveFailures = 0,
    Phase = StreamPhase.Active,
    Termination = null,
    OutageStartedAt = null
}

RecordFailure (line 46)

Before (wipes LastSuccess):

(_, existing) => new StreamHealthState(
    ConsecutiveFailures: existing.ConsecutiveFailures + 1,
    LastFailure: DateTime.UtcNow,
    LastError: error?.Message)

After (preserves success history, uses _timeProvider):

var now = _timeProvider.GetUtcNow().UtcDateTime;
// ...
(_, existing) => existing with
{
    ConsecutiveFailures = existing.ConsecutiveFailures + 1,
    LastFailure = now,
    LastError = error?.Message,
    Phase = StreamPhase.Active,
    Termination = null,
    OutageStartedAt = existing.ConsecutiveFailures == 0 ? now : existing.OutageStartedAt
}

Enrich GetHealthStatus with per-stream details

Add a streams entry to HealthCheckResult.Data for all three return paths (Healthy, Degraded, Unhealthy):

// Only include streams that participate in health classification
// Excluded: clean disconnects (0 failures) and unsubscribed streams (nobody using them)
var evaluatedStreams = _streamStates
    .Where(kvp => !(kvp.Value.Phase == StreamPhase.Terminated
                  && (kvp.Value.ConsecutiveFailures == 0
                      || kvp.Value.Termination == TerminationReason.Unsubscribed)));

data["streams"] = evaluatedStreams.ToDictionary(
    kvp => kvp.Key,
    kvp => (object)new Dictionary<string, object?>
    {
        ["consecutiveFailures"] = kvp.Value.ConsecutiveFailures,
        ["lastSuccess"] = kvp.Value.LastSuccess,
        ["lastFailure"] = kvp.Value.LastFailure,
        ["lastError"] = kvp.Value.LastError,
        ["phase"] = kvp.Value.Phase.ToString(),
        ["termination"] = kvp.Value.Termination?.ToString(),
        ["outageStartedAt"] = kvp.Value.OutageStartedAt,
        ["healthReason"] = ComputeHealthReason(kvp.Value, now)
    });

Each key is the stream key (fully-qualified type name); each value is a Dictionary<string, object?> with eight fields:

  • phase ("Active" or "Terminated") identifies stream lifecycle state.
  • termination is null for Active-phase streams and "Error" or "Unsubscribed" for Terminated-phase streams (though Unsubscribed streams are filtered out — see below).
  • outageStartedAt is the timestamp when the current outage began (first failure after recovery) — null when the stream is healthy.
  • healthReason provides a diagnostic classification (e.g., "active_failing", "terminated_retrying", "healthy") covering the 7 non-filtered classification table rows — this gives programmatic consumers richer context than the standard HealthStatus enum while keeping probes simple. Six possible values: healthy, active_failing, active_recovering, active_stale, terminated_retrying, terminated_stale. (active_failing covers both below-threshold and at/above-threshold-within-grace-period cases.)

Terminated streams with zero failures (clean disconnect) and unsubscribed streams (all clients left via RefCount() disposal) are filtered out — they have no diagnostic value. Using an explicit dictionary (rather than anonymous objects) ensures tests can assert against individual fields without reflection or dynamic. StreamHealthState is already accessible within GetHealthStatus (a private nested record in the same class).

Note: The current /health/live response writer (SyrfHealthCheckExtensions.WriteHealthResponseWithVersion) only extracts data from the "version" entry. Per-stream details will be present in HealthCheckResult.Data for programmatic consumers but won't appear in the HTTP response until the response writer is updated. That's a separate follow-up task.

Security consideration: lastError contains raw Exception.Message text which may include connection strings or internal hostnames. For the HealthCheckResult.Data dictionary (programmatic consumers only, not HTTP-exposed), this is acceptable for v1. When the /health/live response writer is updated to surface per-stream data (see Future Enhancements), error messages must be sanitised — e.g., truncate to type name + first 100 characters, or expose only Exception.GetType().Name.
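A hedged sketch of the truncation option — the helper name and the exact 100-character cap are illustrative, not part of the current codebase:

```csharp
using System;

// Keep only the exception type name plus the first 100 characters of the message,
// so connection strings or hostnames deep in a long message are cut off.
static string SanitiseError(Exception error)
{
    const int maxLength = 100;
    var message = error.Message.Length <= maxLength
        ? error.Message
        : error.Message[..maxLength] + "…";
    return $"{error.GetType().Name}: {message}";
}

var longError = new TimeoutException("server timed out. " + new string('x', 200));
Console.WriteLine(SanitiseError(longError).StartsWith("TimeoutException: server timed out."));  // True
```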

Stale-failure grace period (two-threshold model)

To prevent unnecessary pod restarts from document-gated recovery (see Part 2 caveat) while still detecting genuinely dead streams, GetHealthStatus() applies two time thresholds against LastFailure:

Timeline after last failure:
[0 – 5 min)         → Unhealthy  (actively failing or just stopped)
[5 min – 30 min)    → Degraded   (maybe recovered quietly, give it time)
[30 min+)           → Unhealthy  (no activity at all — stream likely dead)

If a document arrives at any point, RecordSuccess resets ConsecutiveFailures to 0, dropping the stream out of both thresholds entirely.

// Constructor — TimeSpan defaults are not compile-time constants,
// so use nullable + null-coalescing pattern.
// TimeProvider is the single clock source for all timestamps
// (RecordSuccess, RecordFailure, GetHealthStatus). Default
// TimeProvider.System for production; inject FakeTimeProvider in tests.
public ChangeStreamHealthTracker(
    int maxConsecutiveFailures = 5,
    TimeSpan? staleFailureGracePeriod = null,
    TimeSpan? maxStalenessPeriod = null,
    TimeProvider? timeProvider = null)
{
    _maxConsecutiveFailures = maxConsecutiveFailures;
    _staleFailureGracePeriod = staleFailureGracePeriod ?? TimeSpan.FromMinutes(5);
    _maxStalenessPeriod = maxStalenessPeriod ?? TimeSpan.FromMinutes(30);
    _timeProvider = timeProvider ?? TimeProvider.System;
}

// In GetHealthStatus(), when classifying streams:
var now = _timeProvider.GetUtcNow().UtcDateTime;

foreach (var kvp in _streamStates)
{
    // Terminated streams
    if (kvp.Value.Phase == StreamPhase.Terminated)
    {
        // Unsubscribed streams: skip liveness entirely — nobody is using this stream,
        // so pod restart has no benefit. Stale failures from the old session are irrelevant.
        if (kvp.Value.Termination == TerminationReason.Unsubscribed)
            continue;

        // Error-terminated streams: classify by outage duration (not LastFailure,
        // which refreshes on every outer retry cycle's inner failures)
        if (kvp.Value.ConsecutiveFailures > 0)
        {
            var timeSinceOutageStart = kvp.Value.OutageStartedAt.HasValue
                ? now - kvp.Value.OutageStartedAt.Value
                : TimeSpan.Zero;
            if (timeSinceOutageStart >= _maxStalenessPeriod)
                unhealthyStreams.Add(kvp.Key);   // outage has persisted too long
            else
                degradedStreams.Add(kvp.Key);    // outer retry may still recover
        }
        // else: error-terminated with 0 failures (clean disconnect) — skip
        continue;  // Skip Active classification for all Terminated streams
    }

    // Active streams: two-threshold classification
    var timeSinceLastFailure = kvp.Value.LastFailure.HasValue
        ? now - kvp.Value.LastFailure.Value
        : TimeSpan.Zero;

    if (kvp.Value.ConsecutiveFailures >= _maxConsecutiveFailures)
    {
        if (timeSinceLastFailure < _staleFailureGracePeriod
            || timeSinceLastFailure >= _maxStalenessPeriod)
            unhealthyStreams.Add(kvp.Key);   // actively failing OR stream likely dead
        else
            degradedStreams.Add(kvp.Key);    // grace window — maybe recovered quietly
    }
    else if (kvp.Value.ConsecutiveFailures > 0)
        degradedStreams.Add(kvp.Key);
}
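The Active-stream branch reduces to a pure function over (failure count, time since last failure), which makes the threshold boundaries easy to unit-test without a real clock. A sketch with hypothetical names — the real logic lives inline in GetHealthStatus():

```csharp
using System;

// Classification for Active-phase streams, mirroring the two-threshold logic above.
static string ClassifyActive(
    int consecutiveFailures,
    TimeSpan timeSinceLastFailure,
    int maxConsecutiveFailures,
    TimeSpan gracePeriod,
    TimeSpan maxStaleness)
{
    if (consecutiveFailures >= maxConsecutiveFailures)
        return timeSinceLastFailure < gracePeriod || timeSinceLastFailure >= maxStaleness
            ? "Unhealthy"   // actively failing OR stream likely dead
            : "Degraded";   // grace window: maybe recovered quietly
    return consecutiveFailures > 0 ? "Degraded" : "Healthy";
}

var grace = TimeSpan.FromMinutes(5);
var stale = TimeSpan.FromMinutes(30);

Console.WriteLine(ClassifyActive(5, TimeSpan.FromMinutes(2), 5, grace, stale));   // Unhealthy
Console.WriteLine(ClassifyActive(5, TimeSpan.FromMinutes(10), 5, grace, stale));  // Degraded
Console.WriteLine(ClassifyActive(5, TimeSpan.FromMinutes(45), 5, grace, stale));  // Unhealthy
Console.WriteLine(ClassifyActive(2, TimeSpan.FromMinutes(2), 5, grace, stale));   // Degraded
Console.WriteLine(ClassifyActive(0, TimeSpan.Zero, 5, grace, stale));             // Healthy
```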

Design note — low-count Active failures: Active streams with ConsecutiveFailures > 0 but below threshold remain Degraded until MarkActive (cursor re-opened) or RecordSuccess (document received) resets ConsecutiveFailures to 0. This is intentional: MarkActive confirms the cursor is open (the stream IS functional), low failures indicate minor hiccups rather than a dead stream, and Degraded doesn't trigger pod restarts. Escalating low-count failures would cause unnecessary restarts for streams that recovered but serve quiet collections.

Why two thresholds are needed: The health check is registered with failureStatus: HealthStatus.Unhealthy and tags: ["live"] (Program.cs:80-83). The liveness probe (values.yaml:194-200) polls /health/live every 10 seconds with no explicit failureThreshold (Kubernetes default: 3).

  • Without any grace period: When WatchAsync itself keeps failing (cursor never opens, MarkActive never fires), RecordFailure accumulates ConsecutiveFailures >= 5 → Unhealthy → pod restart after 30s. This is correct for genuinely broken streams. Transient failures are protected by two mechanisms: (a) the failure threshold (5) — a few failures stay below threshold → Degraded (row 4), and (b) MarkActive resets failures to 0 when the cursor reconnects, dropping the stream out of both thresholds entirely.
  • With only a grace period (no max staleness): The inner retry (MongoContext.cs:312-315) caps at 15 retries with 180s max delay. When exhausted, Observable.Throw terminates the inner sequence (MongoContext.cs:427-428). The API subscription layer has an outer retry (24h max, 10min max interval — see "Retry architecture" below) that resubscribes, but during its backoff the stream is Terminated and only visible as Degraded. Without max-staleness, the Terminated stream could stay Degraded indefinitely even when the outage is genuinely permanent.
  • With both thresholds: For Active streams with >= threshold failures, LastFailure stays fresh during active retries (< 5 min between attempts with 180s max delay), so the stream remains Unhealthy (row 1) — this is correct, the stream IS non-functional. The 5-30 min Degraded window (row 2) is a theoretical fallback: it would only apply if retries paused without transitioning to Terminated (unlikely in practice). The 30+ min Unhealthy window (row 3) catches truly stale Active entries with no activity at all. For Terminated streams, 30+ minutes since OutageStartedAt (first failure in the current outage) means the persistent outage has lasted too long → Unhealthy (inner retries triggered by outer resubscriptions refresh LastFailure but not OutageStartedAt).

Stream lifecycle awareness

The two-threshold model assumes every key in _streamStates represents an active stream. But _streamStates (ConcurrentDictionary) never removes entries — once a stream key is added via RecordFailure or RecordSuccess, it persists for the lifetime of the process. Streams are cached via Publish().RefCount() (MongoContext.cs:321-322): when the last subscriber disconnects, RefCount disposes the underlying cursor, but the health tracker state persists.

Additionally, the API subscription layer wraps every change stream with an outer retry (AggregateRootEntitySubscriptionManager.cs:51, MyUtils.cs:596-599: maxTotalTimeToRetry: 24h, maxRetryInterval: 10 min). When inner retries exhaust, the error propagates to this outer retry, which waits up to 10 minutes before resubscribing. The outer retry does not call RecordFailure/RecordSuccess — only the inner retry does.

Simply removing the key on stream termination (as originally proposed) would create false-healthy windows: during the outer retry's backoff (up to 10 min), the health tracker has no entry → reports Healthy, while the stream is actively broken.

Fix: Replace key removal with a phase transition. Add a StreamPhase enum to StreamHealthState:

private enum StreamPhase { Active, Terminated }
public enum TerminationReason { Error, Unsubscribed }
public enum TerminalErrorReason { MaxRetries, NonRetryable }

// Single mapping point — enum → metric tag value:
public static class TerminalErrorReasonExtensions
{
    public static string ToMetricTag(this TerminalErrorReason reason) => reason switch
    {
        TerminalErrorReason.MaxRetries => "max_retries",
        TerminalErrorReason.NonRetryable => "non_retryable",
        _ => throw new ArgumentOutOfRangeException(nameof(reason), reason,
            $"Unmapped TerminalErrorReason '{reason}' — add a case to ToMetricTag() and a corresponding PromQL alert rule before using new enum values")
    };
}

// StreamHealthState gains Phase and Termination fields:
private sealed record StreamHealthState(
    DateTime? LastSuccess = null,
    DateTime? LastFailure = null,
    DateTime? OutageStartedAt = null,
    int ConsecutiveFailures = 0,
    string? LastError = null,
    StreamPhase Phase = StreamPhase.Active,
    TerminationReason? Termination = null);

Visibility: TerminationReason must be public because it appears in the public void MarkTerminated(string, TerminationReason) signature and is referenced by MongoContext (a different class). TerminalErrorReason must be public because it appears in the public void RecordTerminalError(string, Exception?, TerminalErrorReason) signature and is used in RetryWithExponentialBackoff's onTerminalError callback. TerminalErrorReasonExtensions.ToMetricTag() is the single mapping point from enum values to metric tag strings — all metric emission goes through this method, preventing typo-induced label proliferation. StreamPhase stays private — it's only used within ChangeStreamHealthTracker's own AddOrUpdate delegates and GetHealthStatus(). StreamHealthState stays private — it's the internal storage format, never exposed directly.

Interface: Extract IChangeStreamHealthTracker to enable mocking in MongoContext wiring tests. MongoContext depends on the interface; ChangeStreamHealthTracker implements it:

public interface IChangeStreamHealthTracker
{
    void MarkActive(string streamKey);
    void MarkTerminated(string streamKey, TerminationReason reason);
    void RecordSuccess(string streamKey);
    void RecordFailure(string streamKey, Exception? error = null);
    void RecordTerminalError(string streamKey, Exception? error = null, TerminalErrorReason reason = TerminalErrorReason.MaxRetries);
    HealthCheckResult GetHealthStatus();
}

ChangeStreamHealthCheck also depends on IChangeStreamHealthTracker (for GetHealthStatus()). The interface lives in the same file as the enum and tracker class (ChangeStreamHealthTracker.cs).

DI registration (in SyrfConfigureServices.cs): Use the forwarding pattern to ensure a single tracker instance serves both interface and concrete resolvers:

// Register concrete as singleton
services.AddSingleton<ChangeStreamHealthTracker>();
// Forward interface resolution to the same instance
services.AddSingleton<IChangeStreamHealthTracker>(sp =>
    sp.GetRequiredService<ChangeStreamHealthTracker>());
// Retry options — default values match current hardcoded behaviour
services.AddSingleton(new ChangeStreamRetryOptions());

This replaces the current services.AddSingleton<ChangeStreamHealthTracker>() at line 69. Without the forwarding registration, IServiceProvider.GetService<IChangeStreamHealthTracker>() would return null (or a separate instance if registered independently), splitting health state between MongoContext and the health check.

Lamar scan ordering: MongoLamarRegistry.cs:16-23 scans SyRF assemblies with WithDefaultConventions(), which auto-registers concrete types matching I{TypeName} interfaces. If Lamar's scan pairs ChangeStreamHealthTracker with IChangeStreamHealthTracker, it would create a separate singleton registration that competes with the explicit forwarding pattern above.

Implementation rule: In AddSyrfMongoServices (SyrfConfigureServices.cs:60), move the tracker forwarding registration after services.IncludeRegistry<MongoLamarRegistry>() (currently line 70). IServiceCollection uses last-wins semantics, so the explicit forwarding registration will override any scan-generated registration. The current concrete registration at line 69 must move to after line 70 and be expanded to the full forwarding pattern:

public static ServiceRegistry AddSyrfMongoServices(this ServiceRegistry services, IConfiguration configuration)
{
    // ... existing setup (MongoConnectionSettings, EnsureLegacyGuidSerializer, etc.) ...
    services.IncludeRegistry<MongoLamarRegistry>();  // Lamar scan first
    // Forwarding registration AFTER scan — last-wins overrides any scan-generated registration
    services.AddSingleton<ChangeStreamHealthTracker>();
    services.AddSingleton<IChangeStreamHealthTracker>(sp =>
        sp.GetRequiredService<ChangeStreamHealthTracker>());
    services.AddSingleton(new ChangeStreamRetryOptions());
    return services;
}

The HealthTrackerRegistration_InterfaceAndConcreteResolveSameInstance regression test (see Testing section) catches ordering regressions if someone reorders these lines.

// In ChangeStreamHealthTracker (implements IChangeStreamHealthTracker):
public void MarkActive(string streamKey)
{
    // Read state BEFORE update — avoids side effects inside AddOrUpdate delegate
    // (CAS loop may invoke the update factory multiple times under contention).
    _streamStates.TryGetValue(streamKey, out var previous);

    _streamStates.AddOrUpdate(
        streamKey,
        _ => new StreamHealthState(Phase: StreamPhase.Active),
        (_, existing) => existing with
        {
            Phase = StreamPhase.Active,
            ConsecutiveFailures = 0,
            Termination = null,
            OutageStartedAt = null
        });

    // Flag pending recovery for deferred emission by RecordSuccess (first document).
    // Cursor-open alone is insufficient — in reconnect-fail loops, cursors open
    // and immediately fail each cycle, inflating the recovery count. Deferring to
    // RecordSuccess ensures the metric only fires when the stream is truly working.
    // TryGetValue + post-update flag is a benign TOCTOU: worst case under
    // contention is a rare extra/missed flag, acceptable for counters.
    if (previous is { ConsecutiveFailures: > 0, Termination: not TerminationReason.Unsubscribed })
        _pendingRecoveryMetrics[streamKey] = true;
}

public void MarkTerminated(string streamKey, TerminationReason reason)
{
    _streamStates.AddOrUpdate(
        streamKey,
        _ => new StreamHealthState(Phase: StreamPhase.Terminated, Termination: reason),
        (_, existing) => existing with
        {
            Phase = StreamPhase.Terminated,
            Termination = reason
        });
    _pendingRecoveryMetrics.TryRemove(streamKey, out _);  // recovery failed or unsubscribed
}

Phase transitions:

  • MarkActive is the primary reactivation signal — called when WatchAsync succeeds (cursor is open), regardless of document flow. Resets ConsecutiveFailures = 0, Termination = null, and OutageStartedAt = null because a new cursor is a fresh start (the previous cursor's failure history is stale). Uses TryGetValue to read pre-update state (avoiding side effects inside the AddOrUpdate delegate — see Part 2 critical note) and flags a pending recovery metric (via _pendingRecoveryMetrics[streamKey] = true) when transitioning from a failing state, suppressed when prior termination was Unsubscribed (clean disconnect with stale failures is not a genuine outage). The metric itself is deferred to RecordSuccess (document-gated) to avoid inflating the count during reconnect-fail loops where cursors open but immediately fail. Fires on every Observable.Defer re-execution (initial connection AND reconnections). Without this reset, a reconnected stream would retain old failures → stay Unhealthy → pod restarts in ~30s (liveness probe: 10s period, failureThreshold 3) before the 5-minute grace period kicks in.
  • RecordFailure/RecordSuccess/RecordTerminalError also set Phase = Active, Termination = null as belt-and-suspenders (they prove the inner retry is running, so the stream must be active and any prior termination state is stale). RecordFailure and RecordTerminalError set OutageStartedAt on the first failure (ConsecutiveFailures transitions from 0 to 1) and preserve it on subsequent failures. RecordSuccess and MarkActive clear OutageStartedAt (outage is over). RecordSuccess also emits the deferred recovery metric when MarkActive flagged a pending recovery (see "Recovery metric — document-gated" in Part 2). RecordTerminalError and MarkTerminated clear the pending recovery flag (recovery failed or stream unsubscribed).
  • MarkTerminated(key, reason) is called by .Finally() when the inner sequence ends, with a TerminationReason distinguishing error-termination from clean disposal:
  • TerminationReason.Error — inner retries exhausted (outer retry will attempt recovery)
  • TerminationReason.Unsubscribed — all subscribers disconnected via RefCount() disposal (nobody is using this stream)
// In MongoContext.GetCachedCollectionChangeStream:

// 1. After WatchAsync succeeds (line 264), before cursor polling loop:
var cursor = await GetCollection<TAggregateRoot, TId>()
    .WatchAsync(pipeline, opts, ct);
_healthTracker.MarkActive(key);  // Cursor created — stream is active

// 2. After RetryWithExponentialBackoff, before Publish().RefCount():
//    Do(onError:) captures whether the source errored before .Finally() fires.
//    - Inner retries exhaust → error propagates → Do(onError) sets flag →
//      .Finally() fires with terminatedByError = true → MarkTerminated(key, Error)
//    - All subscribers leave → RefCount() disposes upstream → Publish() disposes →
//      .Finally() fires with terminatedByError = false → MarkTerminated(key, Unsubscribed)
//
//    IMPORTANT: terminatedByError MUST be scoped inside Observable.Defer so each
//    re-execution gets its own flag. Scoping it outside would share the mutable flag
//    across Publish().RefCount() subscription cycles on the cached observable.
Observable.Defer(() =>
{
    var terminatedByError = false;
    return Observable.Create(async (observer, ct) => { /* ... cursor polling ... */ })
        .RetryWithExponentialBackoff(...)
        .Do(onError: _ => terminatedByError = true)
        .Finally(() =>
        {
            _healthTracker.MarkTerminated(key,
                terminatedByError ? TerminationReason.Error : TerminationReason.Unsubscribed);
        });
})
.Publish()
.RefCount()

GetHealthStatus() classifies streams by phase and termination reason. The healthReason field (included in Data["streams"]) provides diagnostic detail while standard HealthStatus drives probes:

| Phase | Termination | ConsecutiveFailures | Time condition ² | Classification | healthReason |
|---|---|---|---|---|---|
| Active | — | >= threshold | LastFailure < 5 min ago | Unhealthy | active_failing |
| Active | — | >= threshold | LastFailure 5–30 min ago | Degraded | active_recovering |
| Active | — | >= threshold | LastFailure >= 30 min ago | Unhealthy | active_stale |
| Active | — | > 0, < threshold | any | Degraded | active_failing |
| Active | — | 0 | any | Healthy | healthy |
| Terminated | Error | > 0 | OutageStartedAt < 30 min ago | Degraded | terminated_retrying |
| Terminated | Error | > 0 | OutageStartedAt >= 30 min ago | Unhealthy | terminated_stale |
| Terminated | Error | 0 | any | Skipped ¹ | — |
| Terminated | Unsubscribed | any | any | Skipped ¹ | — |

¹ Skipped streams are filtered from Data["streams"] — no healthReason is emitted. The classification is internal only.

² Active rows measure time since LastFailure (recency of failure activity). Terminated rows measure time since OutageStartedAt (when the first failure in the current outage occurred) — this prevents inner retries triggered by outer resubscriptions from continuously resetting the staleness clock via RecordFailure.

This resolves:

  • False-healthy window: Error-terminated streams with failures report Degraded, not invisible
  • Stale disconnected subscribers: Terminated streams with 0 failures (clean disconnect) are skipped
  • Unnecessary restarts from unsubscribed streams: Streams terminated by RefCount() disposal (all clients left) are skipped regardless of failure count — nobody is using the stream, so pod restart has no benefit
  • Prolonged outage escalation: Error-terminated streams escalate to Unhealthy when OutageStartedAt exceeds 30 min (staleness threshold). Using OutageStartedAt (set on first failure) instead of LastFailure (refreshed on every retry) ensures the staleness clock is not reset by the inner retry's RecordFailure calls within a single Terminated phase. However, if WatchAsync keeps briefly succeeding (cursor opens → MarkActive clears OutageStartedAt → cursor fails → retries exhaust → Terminated), each cycle resets the clock and 30-min escalation is unreachable — Degraded is the steady state (see "Edge case" below). Escalation IS reachable when WatchAsync always fails (cursor never opens, MarkActive never fires) or when the outer retry gives up (24h timeout)

Edge case — WatchAsync succeeds but cursor immediately fails: If WatchAsync opens a cursor but MoveNextAsync throws immediately on each retry, MarkActive resets both ConsecutiveFailures to 0 and OutageStartedAt to null each cycle. The failure count never exceeds 1, so the Active two-threshold logic (threshold = 5) is never reached. The Terminated path classifies as Degraded each cycle, but OutageStartedAt also resets because MarkActive clears it on cursor creation — so the 30-min staleness escalation is unreachable during active reconnect-fail loops.

Why Degraded is correct here: The stream IS partially working (cursor opens each cycle), so pod restart is unlikely to help — the underlying issue is probably server-side. The terminal_errors OTel counter accumulates across all cycles (one increment per exhaustion), enabling Grafana alerting on persistent reconnect-fail patterns even though the liveness probe stays at Degraded. The 30-min escalation remains reachable when (a) WatchAsync always fails (cursor never opens, MarkActive never fires, OutageStartedAt persists) or (b) the outer retry eventually gives up (24h total timeout).

Recommended alert rules (Grafana Cloud, PromQL):

One inner retry exhaustion takes ~28 min with default settings (15 retries, 2s initial, 180s max: delays = 2+4+8+16+32+64+128+180×8 = 1694s before jitter). Jitter is ±10% (jitterFactor = 0.1 at MongoContext.cs:389): at max delay 180s, each attempt varies by ±18s. With 8 attempts at max delay, worst-case jitter adds ~2.4 min → ~30.4 min per exhaustion. The outer retry (MyUtils.cs:625-627) uses Math.Pow(2, retryCount) seconds capped at 10 min — so early outer backoffs are short (1s, 2s, 4s, 8s...) and only reach 10 min after ~10 cycles. Full cycle times: ~28-30 min (early, including jitter), growing toward ~40 min after many exhaustions. Alerts must account for worst-case jitter and growing backoffs.
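The 1694s figure can be sanity-checked directly (assuming pure doubling from the 2s initial delay, capped at 180s, before jitter):

```csharp
using System;
using System.Linq;

// 15 inner retries: delays double from 2s and cap at 180s (the MongoContext defaults cited above).
var delays = Enumerable.Range(1, 15)
    .Select(attempt => Math.Min(Math.Pow(2, attempt), 180.0))
    .ToArray();

Console.WriteLine(delays.Sum());                      // 1694
Console.WriteLine(Math.Round(delays.Sum() / 60, 1));  // 28.2 (minutes)
Console.WriteLine(delays.Count(d => d == 180.0));     // 8 attempts at max delay
```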

  1. Sustained retry exhaustion (max_retries):
expr: >-
  sum by (stream_key, pod) (
    increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="max_retries"}[3h])
  ) >= 3

Three retry exhaustions per stream per pod in three hours means the inner retry has exhausted its budget three times on a single replica. Each exhaustion takes ~28-40 min with jitter, so 3 × 40 min = 120 min fits within the 180 min window. No for: duration needed — the 3h increase window already smooths transients. Response: investigate MongoDB connectivity/replica set health, check Atlas alerts.

  2. Non-retryable error (non_retryable):
expr: >-
  sum by (stream_key, pod) (
    increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="non_retryable"}[3h])
  ) >= 1

Any non-retryable error is immediately actionable — these bypass the retry logic entirely (classified by ChangeStreamRetryClassifier.IsRetryable) and fire on the first occurrence. This covers MongoConfigurationException (connection string errors), MongoAuthenticationException (credential failures), and other non-transient MongoException subtypes excluded in the classifier. Threshold is >= 1 (not >= 3) because non-retryable errors don't self-heal — one is enough to investigate. Response: check application logs for the specific exception type and lastError in health check data.

  3. Flapping stream:
expr: >-
  sum by (stream_key, pod) (
    increase(syrf_changestream_recoveries_total{service_name="SyRF API", deployment_environment="production"}[3h])
  ) >= 3

Three recoveries per stream per pod in three hours indicates persistent die-recover-work-die cycling — the stream genuinely processes documents between outages. Because the metric is document-gated (deferred from MarkActive to RecordSuccess), reconnect-fail loops (cursor opens but fails before documents) do NOT inflate this count — those are caught by the terminal_errors alert above. Response: check lastError in structured logs for the stream key, investigate cursor-killing operations (e.g., server-side timeouts, collection drops).

Why no for: duration: With ~28-40 min cycles (including jitter), increase(...[3h]) >= 3 is true for the duration that all 3 events remain within the sliding window. A for: clause risks the condition oscillating as events age in/out of the window — the increase range already provides sufficient smoothing.
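A toy evaluation makes this concrete (hypothetical event times, in minutes rather than PromQL, but the same sliding-window logic):

```python
# Sketch: model `increase(...[3h]) >= 3` as "number of events in the last
# 180 minutes >= 3", evaluated once per minute. Event times are hypothetical.

WINDOW = 180  # minutes

def firing(events, now, threshold=3):
    return sum(1 for t in events if now - WINDOW < t <= now) >= threshold

events = [10, 45, 80]  # three retry exhaustions, ~35 min apart
firing_minutes = [m for m in range(0, 300) if firing(events, m)]
print(firing_minutes[0], firing_minutes[-1])  # 80 189: fires continuously
```

The condition turns true at the third event and stays true, without oscillating, until the first event ages out of the window — which is why a for: clause adds nothing here.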

Alert overlap: In reconnect-fail loops, only the max_retries alert (1) fires (recovery metric never emits because no documents flow). When alerts 1 and 3 fire simultaneously, it indicates genuine flapping — the stream works briefly between outages (documents flow → RecordSuccess emits recovery) then fails again. The non_retryable alert (2) fires independently — it indicates a fundamentally different problem (bad configuration, not connectivity). Alert 4 (multi-pod) co-fires with alert 1 when multiple pods independently hit their per-pod thresholds — this is expected; alert 4 adds the signal that the outage is infrastructure-wide, not pod-local.

  4. Cross-pod persistent failure (multi-pod companion):
expr: >-
  count by (stream_key) (
    sum by (stream_key, pod) (
      increase(syrf_changestream_terminal_errors_total{service_name="SyRF API", deployment_environment="production", terminal_reason="max_retries"}[3h])
    ) >= 2
  ) >= 2

Fires when at least 2 pods each independently accumulate >= 2 retry exhaustions for the same stream in 3h. The inner sum by (stream_key, pod) ... >= 2 produces one row per pod that has failed at least twice; the outer count by (stream_key) ... >= 2 requires at least 2 such pods. This cannot be triggered by a single noisy pod — it structurally requires independent failures on multiple replicas, indicating an infrastructure-level issue (e.g., MongoDB primary failover, network partition) rather than a pod-local problem. Response: same as alert 1 (connectivity/replica set health), but the multi-pod pattern points to cluster-wide or MongoDB-side causes.

Alerts 1-3 page on {service_name, deployment_environment} and group by (stream_key, pod) — per-pod granularity prevents 2-3 pods hitting thresholds faster than intended from the same underlying outage (replica-sensitivity). Alert 4 uses a nested count by (stream_key) over sum by (stream_key, pod) to structurally require failures on multiple pods — it cannot be triggered by a single noisy replica.

  • The deployment_environment="production" label matcher is mandatory on all rules — omitting it allows staging metrics to trigger production alerts. Update service_name when rolling out to PM or Quartz services.
  • For dashboard views, sum by (stream_key) (without the count wrapper) provides a service-wide perspective.
  • Severity: alerts 1, 3, and 4 should fire at warning; alert 2 (non_retryable) should fire at critical, since non-transient errors require immediate investigation.
  • These are starting thresholds — tune after observing baseline rates in staging. Alert configuration is a Grafana Cloud concern (not application code) and can be adjusted without redeployment.

Reachability of Active + >= threshold: With MarkActive resetting ConsecutiveFailures = 0 on cursor creation, Active-phase streams can only accumulate >= threshold failures when WatchAsync itself keeps failing (cursor never opens, MarkActive never fires). In this case, the stream IS non-functional and escalation to Unhealthy is correct. Quiet-but-open streams (cursor opens, no documents) always have 0 failures after MarkActive → classified as Healthy.

Note: _changeStreamCache retains the Publish().RefCount() wrapper, so re-subscription triggers Observable.Defer → fresh cursor + fresh attempt = 0. The phase transition only affects health state, not the cached observable pipeline.

Retry architecture

The full retry chain has two tiers. Only the inner tier interacts with the health tracker:

SignalR Subscription Manager (outer)
  └── RetryWithExponentialBackoff (MyUtils.cs:596-599)
      maxTotalTimeToRetry: 24 hours
      maxRetryInterval: 10 minutes
      Does NOT call RecordFailure/RecordSuccess
      └── MongoContext change stream (inner)
          └── RetryWithExponentialBackoff (MongoContext.cs:312-319)
              maxRetries: 15
              maxDelay: 180 seconds
              Calls RecordFailure (onRetry) / RecordTerminalError (onTerminalError) / RecordSuccess (onSuccess)

When inner retries exhaust → Observable.Throw terminates the inner sequence → Do(onError:) sets error flag → .Finally() calls MarkTerminated(key, Error) [clears pending recovery flag] → error propagates to the outer retry → outer retry waits (up to 10 min) → resubscribes → Observable.Defer re-executes → WatchAsync succeeds → MarkActive(key) [Phase = Active, ConsecutiveFailures = 0, Termination = null, flags pending recovery] → cursor polling begins → first document → RecordSuccess [updates LastSuccess, emits deferred Recoveries metric].

When all subscribers disconnect → RefCount() disposes upstream → Publish() disposes → .Finally() calls MarkTerminated(key, Unsubscribed) → stream is skipped in liveness classification (nobody is using it).

The outer retry is invisible to the health tracker by design — it doesn't know about ChangeStreamHealthTracker. The Terminated + Error phase (Degraded classification) bridges this gap: the stream is visibly impaired during the outer retry's backoff without triggering pod restarts. Terminated + Unsubscribed streams are skipped entirely — they represent idle cached observables with no active consumers.

Injectable retry configuration

Retry parameters for the inner RetryWithExponentialBackoff in MongoContext.GetCachedCollectionChangeStream are currently hardcoded (maxRetries: 15, initialDelay: 2s, maxDelay: 180s at line 312-315). Extract into an injectable options class so tests can use fast values without waiting through real delays:

// New file: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamRetryOptions.cs
public class ChangeStreamRetryOptions
{
    public int MaxRetries { get; init; } = 15;
    public TimeSpan InitialDelay { get; init; } = TimeSpan.FromSeconds(2);
    public TimeSpan MaxDelay { get; init; } = TimeSpan.FromSeconds(180);

    /// <summary>Fail-fast validation — called by MongoContext constructors.</summary>
    public void Validate()
    {
        ArgumentOutOfRangeException.ThrowIfNegativeOrZero(MaxRetries);
        ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(InitialDelay, TimeSpan.Zero);
        ArgumentOutOfRangeException.ThrowIfLessThan(MaxDelay, InitialDelay);
    }
}

Fail-fast validation: Validate() is called by both MongoContext constructors, immediately after _retryOptions = retryOptions ?? new ChangeStreamRetryOptions();. This catches invalid values at service startup rather than at first change stream subscription, where RetryWithExponentialBackoff would otherwise throw the same ArgumentOutOfRangeException (MongoContext.cs:399). The MaxDelay >= InitialDelay guard mirrors the existing retry extension guard. Default-constructed instances always pass validation.

Intentional tightening: Validate() rejects InitialDelay = TimeSpan.Zero (ThrowIfLessThanOrEqual), while the existing retry extension guard at MongoContext.cs:398 only rejects negative values (ThrowIfLessThan(initialDelay, TimeSpan.Zero)). This is deliberate — a zero initial delay makes the exponential formula produce Math.Pow(2, n) * 0 = 0 for all attempts, collapsing all retries to instant (no backoff). The extension allows it for flexibility; ChangeStreamRetryOptions forbids it because instant retries are never appropriate for change stream reconnection.
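A quick sketch of the collapse (delay formula assumed as initial × 2^(n−1), consistent with the timing arithmetic earlier in this document):

```python
# Sketch: a zero initial delay zeroes every computed backoff, turning
# "retries" into an instant tight loop. Formula assumed:
# delay_n = initial * 2**(n-1), capped at max_delay.

def delays(initial, max_retries=15, max_delay=180):
    return [min(initial * 2 ** (n - 1), max_delay)
            for n in range(1, max_retries + 1)]

assert all(d == 0 for d in delays(0))   # every "backoff" is instant
assert delays(2)[:4] == [2, 4, 8, 16]   # normal case with production defaults
```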

Why InitialDelay must be configurable: RetryWithExponentialBackoff enforces maxDelay >= initialDelay (MongoContext.cs:399). Setting only MaxDelay to a small test value while InitialDelay remains at 2s would throw ArgumentOutOfRangeException.

// In MongoContext — both constructors gain the parameter:

// Protected testing constructor (line 27):
protected MongoContext(ILogger<MongoContext> logger, IResumePointRepository resumeRepo,
    IChangeStreamHealthTracker healthTracker,
    ChangeStreamRetryOptions? retryOptions = null)
{
    ...
    _retryOptions = retryOptions ?? new ChangeStreamRetryOptions();
    _retryOptions.Validate();
}

// Public constructor (line 38):
public MongoContext(
    MongoConnectionSettings mongoConnectionSettings,
    ILogger<MongoContext> logger, IResumePointRepository resumeRepo,
    IChangeStreamHealthTracker healthTracker,
    ChangeStreamRetryOptions? retryOptions = null)
{
    ...
    _retryOptions = retryOptions ?? new ChangeStreamRetryOptions();
    _retryOptions.Validate();
}

// In GetCachedCollectionChangeStream (line 312-315):
.RetryWithExponentialBackoff(
    maxRetries: _retryOptions.MaxRetries,
    initialDelay: _retryOptions.InitialDelay,
    maxDelay: _retryOptions.MaxDelay,
    ...)

Both constructors default retryOptions to null (falling back to new ChangeStreamRetryOptions() with production defaults), keeping the parameter optional for existing callers and test subclasses. The protected constructor is used by test fixtures (e.g., MongoDbTestFixture) that bypass MongoClient creation — they can pass fast retry options directly.

DI registration: services.AddSingleton(new ChangeStreamRetryOptions()); (default values match current hardcoded behaviour — zero runtime change). Tests inject options directly via the constructor:

new ChangeStreamRetryOptions { MaxRetries = 1, InitialDelay = TimeSpan.FromMilliseconds(1), MaxDelay = TimeSpan.FromMilliseconds(1) }

Terminal error recording gap: RetryWithExponentialBackoff only calls onRetry (→ RecordFailure) for retryable exceptions (MongoContext.cs:437). Non-retryable exceptions (!shouldRetry(ex) at line 418) and max-retries-exceeded (line 425-428) throw immediately without recording. If a non-retryable error hits on the first attempt, ConsecutiveFailures stays 0, and .Finally() → Terminated + 0 → Skipped (completely invisible). Note: with the current broad shouldRetry predicate (ex is MongoException or TimeoutException or ChangeStreamClosedException), all MongoException subtypes are retried — including non-transient ones like MongoConfigurationException and MongoAuthenticationException. This makes the non_retryable path nearly unreachable. See "Retry predicate narrowing" below for the fix.

Fix: Add an onTerminalError callback to RetryWithExponentialBackoff that fires for both non-retryable exceptions AND max-retries-exceeded, before the error propagates:

// In RetryWithExponentialBackoff (.RetryWhen handler):
if (!shouldRetry(ex))
{
    onTerminalError?.Invoke(ex, TerminalErrorReason.NonRetryable);  // Record before propagating
    return Observable.Throw<long>(ex);
}

attempt++;
if (attempt > maxRetries)
{
    onTerminalError?.Invoke(ex, TerminalErrorReason.MaxRetries);  // Record before propagating
    return Observable.Throw<long>(ex);
}
// In MongoContext wiring (line 312-319):
.RetryWithExponentialBackoff(
    ...,
    onRetry: (ex, attempt, delay) => _healthTracker.RecordFailure(key, ex),
    onSuccess: () => _healthTracker.RecordSuccess(key),
    onTerminalError: (ex, reason) => _healthTracker.RecordTerminalError(key, ex, reason))

Parameter ordering: onTerminalError should be the final optional parameter in the RetryWithExponentialBackoff extension signature (after onRetry and onSuccess), typed as Action<Exception, TerminalErrorReason>?. Appending it at the end minimises positional-call break risk — existing callers that pass onRetry/onSuccess positionally won't be broken, and callers using named arguments are unaffected regardless of ordering. Using the TerminalErrorReason enum (not raw strings) prevents typo-induced label proliferation — the compiler rejects invalid values.

This ensures ConsecutiveFailures > 0 when the stream terminates from any error, preventing Terminated + 0 → Skipped for genuinely failed streams. RecordTerminalError emits terminal_errors with a terminal.reason tag mapped from the TerminalErrorReason enum (not retry_failures), keeping the two metrics semantically distinct and enabling reason-specific alerting.
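A Python stand-in for the handler's decision flow (names mirror the callback parameters described above; the real implementation is the Rx RetryWhen handler, not this loop) shows exactly where each callback fires:

```python
# Sketch: callback placement in the retry decision flow. on_retry fires once
# per failed retryable attempt; on_terminal fires exactly once, just before
# the error propagates, for both terminal paths.

def run_with_retry(op, should_retry, max_retries, on_retry, on_terminal):
    attempt = 0
    while True:
        try:
            return op()
        except Exception as ex:
            if not should_retry(ex):
                on_terminal(ex, "non_retryable")  # record before propagating
                raise
            attempt += 1
            if attempt > max_retries:
                on_terminal(ex, "max_retries")    # record before propagating
                raise
            on_retry(ex, attempt)                 # one increment per failure

calls = []
try:
    run_with_retry(lambda: 1 / 0, lambda ex: True, 2,
                   on_retry=lambda ex, a: calls.append(("retry", a)),
                   on_terminal=lambda ex, r: calls.append(("terminal", r)))
except ZeroDivisionError:
    pass
print(calls)  # [('retry', 1), ('retry', 2), ('terminal', 'max_retries')]
```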

Retry predicate narrowing

The current shouldRetry predicate (ex is MongoException or TimeoutException or ChangeStreamClosedException at MongoContext.cs:316) retries all MongoException subtypes, including non-transient ones like MongoConfigurationException and MongoAuthenticationException (both inherit from MongoException). This means non_retryable only fires for exceptions outside the Mongo driver hierarchy (e.g., InvalidOperationException, NullReferenceException) — making the terminal.reason tag and the non-retryable alert (alert 2) nearly useless in practice.

Fix: Centralize the retry decision in a single static classifier method. This is the single source of truth for retry eligibility — MongoContext's shouldRetry lambda delegates to it, and tests verify each excluded type individually:

// In ChangeStreamRetryOptions.cs (alongside the options class):
public static class ChangeStreamRetryClassifier
{
    /// <summary>
    /// Returns true if the exception is transient and should be retried.
    /// Non-transient MongoException subtypes are excluded — retrying them
    /// wastes retry budget (~28 min per exhaustion cycle) and delays
    /// the terminal error alert.
    /// </summary>
    public static bool IsRetryable(Exception ex) => ex is (MongoException
            and not MongoConfigurationException
            and not MongoAuthenticationException)
        or TimeoutException
        or ChangeStreamClosedException;
}
// In MongoContext (line 316) — delegates to classifier:
shouldRetry: ChangeStreamRetryClassifier.IsRetryable,

This retries connection failures (MongoConnectionException), replica set failovers (MongoNotPrimaryException, MongoNodeIsRecoveringException), cursor expiry, command errors, and other transient subtypes — while routing configuration and authentication errors to the non_retryable path for immediate alerting. The exclude-list pattern (broad retry, narrow exclusions) is safer than an include-list: newly added transient MongoException subtypes in driver updates are retried by default rather than silently becoming non-retryable.

MongoInternalException decision: MongoInternalException exists in the driver and indicates internal driver bugs or invariant violations (e.g., unexpected state in connection pooling). Decision: keep retryable (do NOT exclude). Rationale: internal exceptions are often transient — a connection pool race or a corrupted wire message may succeed on the next attempt with a fresh connection. Excluding it would route rare driver bugs to the non_retryable path, generating noisy alerts for conditions that typically self-heal on retry. If a MongoInternalException is truly persistent, the max_retries path catches it after exhausting the retry budget. This decision is locked by IsRetryable_ReturnsTrue_ForMongoInternalException (see Testing section).

Implementation note: The exclude list (MongoConfigurationException, MongoAuthenticationException) covers the known non-transient subtypes where retrying cannot succeed. If future driver versions introduce new non-transient subtypes, add them to the exclude list, add a corresponding test, and update the alert 2 description. Each excluded type must have a dedicated unit test (see Testing section) to lock the decision.
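The exclude-list shape can be sketched with stand-in exception classes (hypothetical names standing in for the MongoDB driver hierarchy; the real classifier is the C# pattern above):

```python
# Sketch: broad retry with narrow exclusions. A new transient subclass of
# MongoError is retryable by default; only named non-transient subtypes are
# routed to the non_retryable path. All class names here are stand-ins.

class MongoError(Exception): pass
class MongoConfigurationError(MongoError): pass   # non-transient: excluded
class MongoAuthenticationError(MongoError): pass  # non-transient: excluded
class MongoConnectionError(MongoError): pass      # transient: retried
class FutureDriverError(MongoError): pass         # future subtype: retried by default

NON_TRANSIENT = (MongoConfigurationError, MongoAuthenticationError)

def is_retryable(ex):
    if isinstance(ex, MongoError):
        return not isinstance(ex, NON_TRANSIENT)
    return isinstance(ex, TimeoutError)

assert is_retryable(MongoConnectionError())
assert is_retryable(FutureDriverError())          # exclude-list safety property
assert not is_retryable(MongoConfigurationError())
```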


Part 2: OTel Metrics Instruments

New file: src/libs/mongo/SyRF.Mongo.Common/ChangeStreamMetrics.cs

Static Meter + three Counter<long> instruments. Follows the existing pattern in DiagnosticsActivityEventSubscriber.cs (static instruments from assembly metadata). No new NuGet packages — System.Diagnostics.Metrics is built into .NET 10.

public static class ChangeStreamMetrics
{
    private static readonly AssemblyName AssemblyName =
        typeof(ChangeStreamMetrics).Assembly.GetName();

    public static readonly Meter Meter = new(
        AssemblyName.Name!,
        AssemblyName.Version?.ToString());

    public static readonly Counter<long> RetryFailures = Meter.CreateCounter<long>(
        "syrf.changestream.retry_failures",
        description: "Number of individual change stream retry attempts that failed");

    public static readonly Counter<long> TerminalErrors = Meter.CreateCounter<long>(
        "syrf.changestream.terminal_errors",
        description: "Number of change stream terminal failures (non-retryable or max retries exceeded)");

    public static readonly Counter<long> Recoveries = Meter.CreateCounter<long>(
        "syrf.changestream.recoveries",
        description: "Number of change stream recovery events (first document after recovery)");
}

Metric semantics

  • retry_failures counts each individual retry attempt that fails (called from onRetry callback in RetryWithExponentialBackoff). If a stream fails 5 times before recovering, that's 5 increments. This measures retry pressure. Does NOT include terminal errors (non-retryable or max-retries-exceeded) — those have their own counter.
  • terminal_errors counts each terminal failure — either a non-retryable exception or max retries exceeded (called from onTerminalError callback). Tagged with terminal.reason via TerminalErrorReason.ToMetricTag() — the enum produces "non_retryable" or "max_retries" through a single mapping point, preventing typo-induced label proliferation. One terminal error per outage cycle. This measures outage severity — a high terminal_errors count with max_retries indicates streams are exhausting their retry budgets (connectivity/failover issues); non_retryable indicates non-transient errors excluded by the narrowed shouldRetry predicate (e.g., MongoConfigurationException, MongoAuthenticationException) that bypass retry logic entirely.
  • recoveries counts each confirmed recovery — emitted by RecordSuccess (first document) after MarkActive flags a pending recovery (cursor created after prior failures, suppressed when prior termination was Unsubscribed). If a stream fails 5 times then reconnects and processes a document, that's 1 increment. This measures confirmed outage resolutions and is the primary flapping indicator. The metric is document-gated (deferred from cursor creation to first document) to avoid inflating counts during reconnect-fail loops where cursors open but immediately fail.

Recovery metric — document-gated via deferred emission: MarkActive detects recovery conditions (ConsecutiveFailures > 0, prior termination not Unsubscribed) and sets _pendingRecoveryMetrics[streamKey] = true. RecordSuccess (first document) checks _pendingRecoveryMetrics.TryRemove(streamKey, out _) and emits ChangeStreamMetrics.Recoveries only when the flag is present. This two-step pattern ensures recovery is confirmed by document flow, not just cursor creation. In reconnect-fail loops (cursor opens → MarkActive sets flag → cursor fails before documents → RecordTerminalError/MarkTerminated clears flag), the metric never fires — correctly reflecting that the stream never truly recovered. _pendingRecoveryMetrics is a ConcurrentDictionary<string, bool> alongside _streamStates, keeping metric-emission state separate from health-classification state. The TryRemove call in RecordSuccess is atomic, preventing double-emission under concurrency.

Quiet-collection trade-off: Collections that reconnect but produce no documents will not emit a recovery metric until the first document arrives. This is acceptable because: (a) health state is already correct — MarkActive resets ConsecutiveFailures immediately, so the stream classifies as Healthy; (b) terminal_errors still tracks the prior exhaustion event; (c) for truly quiet collections, the absence of a recovery metric has no alerting impact since there's no flapping to detect.
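The flag lifecycle can be sketched in a few lines (Python stand-ins: `pending` plays the role of _pendingRecoveryMetrics, and the functions mirror the tracker methods named above):

```python
# Sketch: document-gated recovery. mark_active flags a candidate recovery,
# record_success pops the flag and emits, terminal/unsubscribe paths clear
# the flag without emitting.

recoveries = 0
pending = {}

def mark_active(key, prior_failures, prior_unsubscribed=False):
    if prior_failures > 0 and not prior_unsubscribed:
        pending[key] = True          # recovery candidate, not yet confirmed

def record_success(key):
    global recoveries
    if pending.pop(key, None):       # atomic TryRemove in the C# version
        recoveries += 1              # first document confirms the recovery

def mark_terminated(key):
    pending.pop(key, None)           # cursor failed before documents: no emit

# Genuine recovery: cursor opens after failures, then a document flows.
mark_active("Project", prior_failures=5); record_success("Project")
# Reconnect-fail loop: cursor opens, fails before any document arrives.
mark_active("Project", prior_failures=3); mark_terminated("Project"); record_success("Project")
print(recoveries)  # 1: only the confirmed recovery is counted
```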

When inner retries exhaust, .Finally() marks the stream Terminated with TerminationReason.Error → Degraded (visible but not triggering restarts) while the outer retry (see "Retry architecture" in Part 1) attempts to resubscribe. On resubscription, MarkActive transitions the stream back to Active with 0 failures before any documents flow.

To detect flapping: high recoveries rate = stream is repeatedly failing and recovering. To assess severity: high retry_failures / recoveries ratio = many retries per outage (slow recovery).

Emit metrics from ChangeStreamHealthTracker

Critical: emit metrics outside the AddOrUpdate delegates — never inside them. ConcurrentDictionary.AddOrUpdate may invoke the update factory multiple times under contention (the CAS loop retries until it wins). Calling Counter.Add() inside the delegate would overcount. Similarly, avoid capturing mutable external state from within delegates (e.g., wasFailing = existing.ConsecutiveFailures > 0). Instead, use TryGetValue before AddOrUpdate to snapshot the pre-update state, then use the snapshot to decide metric emission after the update completes (see MarkActive for the flag-setting pattern and RecordSuccess for the deferred emission pattern). The TryGetValue + post-update flag/emission introduces a benign TOCTOU gap: under contention, the worst case is a rare double/missed recovery count. These counters are SLI-level indicators (trend detection, alerting on sustained patterns) — not exact audit counts. A rare ±1 under high contention does not affect dashboard accuracy or alert thresholds.
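A toy illustration of the hazard (the "CAS loop" rerunning the factory is simulated here; real contention behaviour belongs to ConcurrentDictionary, and all names are illustrative):

```python
# Sketch: why Counter.Add must not sit inside the AddOrUpdate delegate. The
# toy CAS loop reruns the update factory, as ConcurrentDictionary may under
# contention; an in-delegate increment double-counts, emit-after does not.

store, counts = {}, {"inside": 0, "outside": 0}

def add_or_update_with_contention(key, update_factory, factory_runs=2):
    for _ in range(factory_runs):    # factory may rerun before the CAS wins
        candidate = update_factory(store.get(key, 0))
    store[key] = candidate           # only the winning attempt is stored

def update(value):
    counts["inside"] += 1            # WRONG: metric emission inside delegate
    return value + 1

add_or_update_with_contention("stream", update)
counts["outside"] += 1               # RIGHT: emit once, after update completes

print(store["stream"], counts)  # 1 {'inside': 2, 'outside': 1}
```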

Deferred recovery emission: The Recoveries metric uses a two-step deferred pattern via _pendingRecoveryMetrics (ConcurrentDictionary<string, bool>). MarkActive sets the flag when recovery conditions are met; RecordSuccess clears it and emits the counter. RecordTerminalError and MarkTerminated clear the flag without emitting (recovery failed or stream was unsubscribed). This keeps metric-emission state separate from health-classification state (_streamStates).

  • RecordFailure: call AddOrUpdate first, then unconditionally increment ChangeStreamMetrics.RetryFailures with stream.key tag. One call to RecordFailure = exactly one metric increment.
  • RecordTerminalError: increments ConsecutiveFailures (ensures Terminated + 0 → Skipped doesn't hide genuinely failed streams) and emits ChangeStreamMetrics.TerminalErrors with a terminal.reason tag mapped from the TerminalErrorReason enum via reason.ToMetricTag() (produces "non_retryable" or "max_retries"). Separate from RecordFailure to avoid conflating retryable and terminal failures in the same metric:
public void RecordTerminalError(string streamKey, Exception? error = null, TerminalErrorReason reason = TerminalErrorReason.MaxRetries)
{
    var now = _timeProvider.GetUtcNow().UtcDateTime;
    _streamStates.AddOrUpdate(
        streamKey,
        _ => new StreamHealthState(
            ConsecutiveFailures: 1,
            LastFailure: now,
            OutageStartedAt: now,
            LastError: error?.Message,
            Phase: StreamPhase.Active),
        (_, existing) => existing with
        {
            ConsecutiveFailures = existing.ConsecutiveFailures + 1,
            LastFailure = now,
            LastError = error?.Message,
            Phase = StreamPhase.Active,
            Termination = null,
            OutageStartedAt = existing.ConsecutiveFailures == 0 ? now : existing.OutageStartedAt
        });

    ChangeStreamMetrics.TerminalErrors.Add(1,
        new KeyValuePair<string, object?>("stream.key", streamKey),
        new KeyValuePair<string, object?>("terminal.reason", reason.ToMetricTag()));
    _pendingRecoveryMetrics.TryRemove(streamKey, out _);  // recovery failed
}
  • RecordSuccess: update LastSuccess timestamp and reset failures (belt-and-suspenders — MarkActive already cleared them). Emits the deferred recovery metric when MarkActive previously flagged a recovery (first document confirms the stream is truly working, not just a cursor that opened and immediately failed in a reconnect-fail loop):
public void RecordSuccess(string streamKey)
{
    var now = _timeProvider.GetUtcNow().UtcDateTime;
    _streamStates.AddOrUpdate(
        streamKey,
        _ => new StreamHealthState(LastSuccess: now, Phase: StreamPhase.Active),
        (_, existing) => existing with
        {
            LastSuccess = now,
            ConsecutiveFailures = 0,
            Phase = StreamPhase.Active,
            Termination = null,
            OutageStartedAt = null
        });

    // Emit deferred recovery metric — MarkActive flagged a recovery when the
    // cursor opened after prior failures. The first document confirms the stream
    // is genuinely working. TryRemove is atomic: exactly one thread emits.
    if (_pendingRecoveryMetrics.TryRemove(streamKey, out _))
        ChangeStreamMetrics.Recoveries.Add(1,
            new KeyValuePair<string, object?>("stream.key", streamKey));
}

Tag conventions: stream.key = the aggregate root's fully-qualified type name (type.FullName ?? type.Name from MongoContext.cs:221, e.g., SyRF.ProjectManagement.Core.Model.ProjectAggregate.Project). Low cardinality (~5-10 values). terminal.reason (on terminal_errors only) = "non_retryable" or "max_retries", mapped from TerminalErrorReason enum via ToMetricTag(). Low cardinality (2 values — ToMetricTag() throws ArgumentOutOfRangeException for unmapped enum members, so adding a new reason requires updating both the mapping and the alert rules).


Part 3: Wire Up OTel Metrics Pipeline

File: src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs

Add .WithMetrics() to AddOpenTelemetryConfig() (line 111) alongside existing .WithTracing():

var environmentName = configuration["RuntimeEnvironment"] ?? "unknown";
var podName = Environment.GetEnvironmentVariable("POD_NAME") ?? "unknown";
services.AddOpenTelemetry()
    .ConfigureResource(resource => resource
        .AddService("SyRF API")
        .AddAttributes(new[]
        {
            new KeyValuePair<string, object>("deployment.environment", environmentName),
            new KeyValuePair<string, object>("k8s.pod.name", podName)
        }))
    .WithMetrics(metricsBuilder =>
    {
        metricsBuilder
            .AddMeter(ChangeStreamMetrics.Meter.Name)  // custom change stream metrics
            .AddAspNetCoreInstrumentation()       // request rate, latency, errors
            .AddHttpClientInstrumentation()       // outbound HTTP metrics
            .AddRuntimeInstrumentation();         // GC, thread pool, memory

        if (!string.IsNullOrWhiteSpace(configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]))
        {
            metricsBuilder.AddOtlpExporter();
        }
    })
    .WithTracing(tracerProviderBuilder =>
    {
        // ... existing tracing config — REMOVE .ConfigureResource() from here ...
    });

Resource identity: .ConfigureResource() is moved from the .WithTracing() block (where it currently lives at line 119) to the top-level AddOpenTelemetry() call. This ensures traces and metrics share the same service.name, deployment.environment, and k8s.pod.name attributes, which are required for cross-signal correlation, environment-scoped alerting, and per-pod alert grouping in Grafana. The deployment.environment attribute is sourced from configuration["RuntimeEnvironment"] — set by the SYRF__RuntimeEnvironment env var (env-mapping runtime section, env-mapping.yaml:60), which resolves to the environment name from values.yaml (e.g., "staging", "production"). The k8s.pod.name attribute is sourced from the POD_NAME env var, populated via the Kubernetes downward API in the Helm chart (see step 4d below). These are the labels referenced as deployment_environment and pod in the PromQL alert rules (Grafana Cloud's Prometheus backend translates OTel resource attribute dots to underscores).

Required imports: Add using OpenTelemetry.Metrics; to SyrfConfigureServices.cs. The AddMeter(), AddAspNetCoreInstrumentation() (metrics overload), AddHttpClientInstrumentation() (metrics overload), AddRuntimeInstrumentation(), and AddOtlpExporter() (metrics overload) extension methods live in this namespace. using SyRF.Mongo.Common; is already present (line 16) for ChangeStreamMetrics.Meter.Name.

This activates ~50+ built-in metrics from the already-installed packages, plus the 3 custom change stream counters. ChangeStreamMetrics.Meter.Name resolves to the assembly name (SyRF.Mongo.Common) — using the constant avoids a magic string that could silently break if the assembly is renamed. The OTLP exporter only activates when OTEL_EXPORTER_OTLP_ENDPOINT is set.

Scope: Only the API service calls AddOpenTelemetryConfig(). PM and Quartz services are a follow-up task (see Future Enhancements).

Service name footgun: .AddService("SyRF API") is hardcoded for v1 (API-only). When rolling out to PM or Quartz, this must be parameterised (e.g., configuration["ServiceName"] or per-service constant) to avoid all services reporting as "SyRF API". Mislabelled service_name would break per-service alert routing and make dashboards unreliable. The PM/Quartz rollout task should include service name parameterisation as a prerequisite.


Part 4: Grafana Cloud Free Tier

4a. Sign up

Manual step: create free account at grafana.com. Navigate to the OpenTelemetry configuration page in your Grafana Cloud stack settings to generate:

  • OTLP endpoint URL (e.g., https://otlp-gateway-prod-eu-west-2.grafana.net/otlp)
  • Instance ID (numeric)
  • API key (for OTLP authentication)

Reference: Grafana Cloud OTLP setup docs

4b. Store credentials in GCP Secret Manager

Create a secret grafana-cloud-otlp in project camarades-net with keys:

  • endpoint: the OTLP gateway URL (e.g., https://otlp-gateway-prod-eu-west-2.grafana.net/otlp)
  • protocol: http/protobuf
  • headers: Authorization=Basic <base64(instanceId:apiKey)>

Important: The headers value must use key=value format (not just the token). This is what OTEL_EXPORTER_OTLP_HEADERS expects. The base64 payload is instanceId:apiKey encoded per RFC 7617.
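A sketch of the encoding (placeholder credentials; Basic credential built per RFC 7617):

```python
# Sketch: construct the OTEL_EXPORTER_OTLP_HEADERS value. Instance ID and
# API key are placeholders; the credential is base64("instanceId:apiKey").

import base64

def otlp_headers(instance_id: str, api_key: str) -> str:
    token = base64.b64encode(f"{instance_id}:{api_key}".encode()).decode()
    return f"Authorization=Basic {token}"  # key=value form, not the bare token

print(otlp_headers("123456", "glc_example_key"))
```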

4c. Add ExternalSecret (cluster-gitops)

Create an ExternalSecret that syncs grafana-cloud-otlp from GCP Secret Manager to a Kubernetes secret. Follow the existing pattern used for sentry and elastic-apm secrets.

4d. Add OTLP env vars to Helm chart

File: src/charts/syrf-common/env-mapping.yaml

Add a new section following the existing patterns:

otlp:
  services: [api]
  displayName: "OpenTelemetry OTLP"
  condition: otlp.enabled
  envVars:
    - name: OTEL_EXPORTER_OTLP_ENDPOINT
      secretNamePath: otlp.authSecretName
      secretNameDefault: grafana-cloud-otlp
      secretKey: endpoint
    - name: OTEL_EXPORTER_OTLP_PROTOCOL
      secretNamePath: otlp.authSecretName
      secretNameDefault: grafana-cloud-otlp
      secretKey: protocol
    - name: OTEL_EXPORTER_OTLP_HEADERS
      secretNamePath: otlp.authSecretName
      secretNameDefault: grafana-cloud-otlp
      secretKey: headers

After modifying env-mapping.yaml, regenerate the Helm template and update service values:

cd src/charts/syrf-common && npm run generate:env-blocks -- --update-values

This does two things: (1) regenerates src/charts/syrf-common/templates/_env-blocks.tpl from the mapping file, and (2) populates otlp.authSecretName: grafana-cloud-otlp into service chart values.yaml files. The --update-values flag is mandatory — omitting it leaves the Helm value undefined, causing an empty secret name at deploy time.

Cross-service scope: The generator iterates all service charts (api, project-management, quartz, web), but each env-mapping section has a services filter (generate-env-blocks.ts:721-725). The OTLP section above specifies services: [api], so OTLP defaults are written only to the API chart's values.yaml. However, sections without a services field (or with services: [api, project-management, quartz]) affect all .NET service charts. If you add other sections in the same generator run, those may insert defaults into charts beyond API. Run a dry-run first to preview:

cd src/charts/syrf-common && npm run generate:env-blocks -- --update-values --dry-run

Review the output, then re-run without --dry-run to apply. Verify diffs are limited to the expected chart(s).

Note: secretNameDefault is a generation-time default, not a runtime fallback. The --update-values flag writes it into the matching service chart's values.yaml, which is the value Helm reads at deploy time. Environment-specific overrides in cluster-gitops (e.g., staging.values.yaml) take precedence via Helm's values merge order.

4e. Add pod name resource attribute (Helm chart)

File: src/charts/syrf-common/templates/_helpers.tpl (or the service deployment template)

Add a POD_NAME env var sourced from the Kubernetes downward API. This is not a secret — it uses fieldRef to expose the pod's own name:

env:
  - name: POD_NAME
    valueFrom:
      fieldRef:
        fieldPath: metadata.name

This populates the k8s.pod.name OTel resource attribute used in ConfigureResource() (see Part 3). The attribute surfaces as the pod label in Grafana Cloud (the Prometheus backend relabels k8s_pod_name → pod via standard relabeling). Alert rules 1-3 group by (stream_key, pod) and alert 4 requires per-pod counts — both depend on this label being present.

Implementation: Add to the shared _env-blocks.tpl helper or directly in each service's deployment template. Since this is a fieldRef (not a secret), it doesn't go through the env-mapping generator — add it as a static env block in the common chart template, gated on otlp.enabled (same condition as the OTLP env vars).
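A sketch of that static block, assuming the chart exposes otlp.enabled at the top level of values (indentation must match the surrounding env list in the template):

```yaml
{{- if .Values.otlp.enabled }}
- name: POD_NAME
  valueFrom:
    fieldRef:
      fieldPath: metadata.name
{{- end }}
```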

4f. Enable per environment (cluster-gitops)

# syrf/environments/staging/staging.values.yaml
otlp:
  enabled: true
  authSecretName: grafana-cloud-otlp  # must match ExternalSecret name from step 4c

4g. Verification — Helm render

# Verify template renders correctly with otlp enabled
helm template test src/services/api/.chart --set otlp.enabled=true | grep OTEL

# Verify template is clean with otlp disabled (fails if any OTEL var found)
! helm template test src/services/api/.chart --set otlp.enabled=false | grep -q OTEL

Cost Analysis

| Component | Cost | Notes |
|---|---|---|
| Code changes (metrics instruments) | $0 | Built into .NET 10, no packages |
| In-app overhead | Negligible | Counter.Add() is nanoseconds, batched export every 60s |
| Grafana Cloud free tier | $0 | 10K active series, 14-day retention |
| OTLP export bandwidth | Negligible | A few KB every 60 seconds |

Cardinality: the stream.key tag has ~5-10 unique values (one per watched collection). The terminal.reason tag (on terminal_errors only) has 2 values (ToMetricTag() throws for unmapped enum members — new values require updating both the mapping and the alert rules). The total number of active series is well within free-tier limits.


Files to Modify

| File | Change |
|---|---|
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthTracker.cs | Extract IChangeStreamHealthTracker interface, with expressions, emit metrics, enrich GetHealthStatus data with healthReason, two-threshold grace period, MarkActive() (flags pending recovery) + RecordSuccess() (emits deferred recovery metric) + MarkTerminated(reason) + RecordTerminalError(TerminalErrorReason) (emits terminal_errors with terminal.reason tag via ToMetricTag()) + StreamPhase, TerminationReason, and TerminalErrorReason enums + TerminalErrorReasonExtensions.ToMetricTag(), _pendingRecoveryMetrics dictionary, TimeProvider injection |
| src/libs/mongo/SyRF.Mongo.Common/MongoContext.cs | Depend on IChangeStreamHealthTracker (constructor injection), inject ChangeStreamRetryOptions (see below), add MarkActive(key) after WatchAsync, Do(onError:) + .Finally() with TerminationReason before Publish().RefCount(), narrow shouldRetry via the centralized IsRetryable classifier (see below), onTerminalError: Action<Exception, TerminalErrorReason> → RecordTerminalError with enum forwarding in RetryWithExponentialBackoff |
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamRetryOptions.cs | NEW — injectable retry configuration class + ChangeStreamRetryClassifier.IsRetryable() static classifier method (single source of truth for retry eligibility) |
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamHealthCheck.cs | Change constructor to depend on IChangeStreamHealthTracker instead of concrete ChangeStreamHealthTracker |
| src/libs/mongo/SyRF.Mongo.Common/ChangeStreamMetrics.cs | NEW — Meter + 3 Counter instruments (RetryFailures, TerminalErrors, Recoveries) |
| src/libs/webhostconfig/SyRF.WebHostConfig.Common/Extensions/SyrfConfigureServices.cs | Add .WithMetrics() to OTel config, update DI registration for IChangeStreamHealthTracker (see registration snippet below) |
| src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthTrackerTests.cs | ~20 new tests (including TerminationReason, healthReason, and TerminalErrorReason enum mapping tests) |
| src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamRetryClassifierTests.cs | NEW — 9 tests for the IsRetryable() classifier (one per excluded/included exception type, including MongoNodeIsRecoveringException and MongoInternalException) |
| src/libs/mongo/SyRF.Mongo.Common.Tests/RetryWithExponentialBackoffTests.cs | 2 new tests for the onTerminalError callback (non-retryable and max-retries-exceeded, using the TerminalErrorReason enum) |
| src/libs/mongo/SyRF.Mongo.Common.Tests/ChangeStreamHealthCheckTests.cs | No code change needed — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
| src/libs/mongo/SyRF.Mongo.Common.Tests/GetCachedCollectionChangeStreamTests.cs | New tests use Mock<IChangeStreamHealthTracker>; existing tests keep the concrete type (implicit cast) |
| src/libs/mongo/SyRF.Mongo.Common.Tests/MongoUnitOfWorkBaseTests.cs | No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
| src/libs/testing/SyRF.Testing.Common/Fixtures/MongoDbTestFixture.cs | No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
| src/libs/project-management/SyRF.ProjectManagement.Core.Tests/BsonClassMapTests.cs | No code change — new ChangeStreamHealthTracker() implicitly casts to IChangeStreamHealthTracker |
| src/libs/mongo/SyRF.Mongo.Common.Tests/SyRF.Mongo.Common.Tests.csproj | Add Microsoft.Extensions.TimeProvider.Testing test dependency |
| src/libs/webhostconfig/SyRF.WebHostConfig.Common.Tests/SyrfConfigureServicesTests.cs | OTel wiring smoke test |
| src/libs/webhostconfig/SyRF.WebHostConfig.Common.Tests/SyRF.WebHostConfig.Common.Tests.csproj | Add OpenTelemetry.Exporter.InMemory test dependency |
| src/charts/syrf-common/env-mapping.yaml | OTLP env var mapping |
| src/charts/syrf-common/templates/_env-blocks.tpl | GENERATED — via npm run generate:env-blocks -- --update-values |
| src/charts/syrf-common/templates/_env-blocks.tpl or deployment template | Add POD_NAME env var via Kubernetes downward API fieldRef: metadata.name, gated on otlp.enabled (see step 4e) |
| src/services/api/.chart/values.yaml | UPDATED by --update-values — populates otlp.authSecretName (see Helm step in Part 3) |
| CLAUDE.md | Add OTel metrics pipeline, change stream health tracking architecture, ChangeStreamRetryOptions, and deployment.environment resource attribute to project context |

Testing

Unit tests

dotnet test src/libs/mongo/SyRF.Mongo.Common.Tests/

New test cases:

  • History preservation (verify via GetHealthStatus().Data["streams"]):
  • RecordSuccess_PreservesLastFailureAndLastError
  • RecordFailure_PreservesLastSuccess
  • Per-stream health data:
  • GetHealthStatus_IncludesPerStreamDetails_ForAllStatuses
  • Two-threshold grace period (inject FakeTimeProvider via constructor to control clock — requires Microsoft.Extensions.TimeProvider.Testing package in SyRF.Mongo.Common.Tests.csproj, see Files to Modify):
  • GetHealthStatus_DowngradesToDegraded_WhenFailureIsStale — record enough failures to exceed _maxConsecutiveFailures, advance time past _staleFailureGracePeriod but within _maxStalenessPeriod, verify Degraded.
  • GetHealthStatus_ReturnsUnhealthy_WhenFailureExceedsMaxStaleness — same setup, advance time past _maxStalenessPeriod, verify Unhealthy (stream presumed dead).
  • Stream lifecycle and termination reason (inject FakeTimeProvider):
  • MarkTerminated_Error_SetsPhaseAndReason — record failures, call MarkTerminated(key, Error), verify via GetHealthStatus().Data["streams"] that phase is "Terminated" and termination is "Error".
  • MarkTerminated_Unsubscribed_ExcludedFromStreamData — record failures, call MarkTerminated(key, Unsubscribed), verify stream is excluded from Data["streams"] (Unsubscribed streams are filtered regardless of failure count).
  • MarkActive_ClearsTerminationReason — MarkTerminated(key, Error) then MarkActive(key), verify via Data["streams"] that termination is null and phase is "Active" (fresh cursor).
  • GetHealthStatus_ClassifiesErrorTerminatedWithFailuresAsDegraded — record failures on a stream, call MarkTerminated(key, Error), verify overall status is Degraded (not Unhealthy, not Healthy).
  • GetHealthStatus_SkipsErrorTerminatedWithZeroFailures — record success only, call MarkTerminated(key, Error), verify stream is excluded from health data.
  • GetHealthStatus_SkipsUnsubscribedStreams — record failures, call MarkTerminated(key, Unsubscribed), verify stream is excluded from health data regardless of failure count.
  • RecordFailure_ResetsPhaseToActive — MarkTerminated(key, Error) then RecordFailure, verify phase returns to Active and Termination = null (outer retry resubscribed).
  • MarkActive_SetsPhaseToActiveFromTerminated — MarkTerminated(key, Error) then MarkActive, verify phase is Active (cursor created on resubscription).
  • GetHealthStatus_EscalatesErrorTerminatedToUnhealthy_WhenStale — record failures, MarkTerminated(key, Error), advance time past _maxStalenessPeriod from OutageStartedAt, verify Unhealthy (outage has persisted too long).
  • OutageStartedAt tracking (inject FakeTimeProvider):
  • RecordFailure_SetsOutageStartedAt_OnFirstFailure — call RecordFailure on a fresh stream, verify via GetHealthStatus().Data["streams"] that outageStartedAt equals the current FakeTimeProvider time.
  • RecordFailure_PreservesOutageStartedAt_OnSubsequentFailures — call RecordFailure twice with time advancing between calls, verify via Data["streams"] that outageStartedAt remains the time of the first failure (not updated by the second).
  • MarkActive_ClearsOutageStartedAt — record failures (sets outageStartedAt), then MarkActive, verify via Data["streams"] that outageStartedAt is null.
  • GetHealthStatus_UsesOutageStartedAt_ForTerminatedStaleness — record failures at T=0, advance time 20 min, record more failures (refreshes LastFailure to T+20m), MarkTerminated(key, Error), advance time to T+31m. LastFailure is only 11 min old but OutageStartedAt is 31 min old — verify Unhealthy (proves OutageStartedAt is used, not LastFailure).
  • Deferred recovery metric emission:
  • RecordSuccess_EmitsDeferredRecoveryMetric — record failures, MarkActive (sets pending flag), then RecordSuccess, verify Recoveries increments exactly once. Verify a second RecordSuccess does NOT increment again (flag cleared by first).
  • RecordSuccess_DoesNotEmitRecoveryMetric_WithoutPendingFlag — RecordSuccess on a fresh stream (no prior MarkActive with failures), verify no Recoveries increment.
  • MarkActive_DoesNotEmitRecoveryMetric_AfterUnsubscribedTermination — record failures, MarkTerminated(key, Unsubscribed), then MarkActive, then RecordSuccess, verify no Recoveries metric increment (stale failures from a clean disconnect are not a genuine outage — pending flag suppressed by Unsubscribed condition).
  • RecordTerminalError_ClearsPendingRecoveryFlag — MarkActive (sets pending flag), then RecordTerminalError, then RecordSuccess, verify no Recoveries increment (flag cleared by terminal error).
  • MarkTerminated_ClearsPendingRecoveryFlag — MarkActive (sets pending flag), then MarkTerminated(key, Error), then new MarkActive + RecordSuccess, verify the second cycle emits recovery (flag from first cycle was cleared, second cycle sets new flag).
  • healthReason field:
  • GetHealthStatus_IncludesHealthReasonInStreamData — verify healthReason field is present in each stream's data dictionary with correct values for Active/Healthy ("healthy"), Active/Degraded ("active_failing"), and Terminated/Error/Degraded ("terminated_retrying").
  • Metric emission (using MeterListener):
  • RecordFailure_EmitsRetryFailureMetric
  • RecordTerminalError_EmitsTerminalErrorMetric_WithReason — call RecordTerminalError(key, ex, TerminalErrorReason.NonRetryable), verify TerminalErrors increments with terminal.reason = "non_retryable" tag (mapped via ToMetricTag(), not RetryFailures). Repeat with TerminalErrorReason.MaxRetries to verify both reason values propagate through the mapping
  • ReconnectFailLoop_DoesNotEmitRecoveryMetric — record failures, MarkActive (sets pending flag), RecordTerminalError (clears flag), verify no Recoveries increment (simulates reconnect-fail loop where cursor opens but fails before documents)

Test isolation: Each metric test should use a unique stream.key value (e.g., $"TestStream_{testName}") and filter in the MeterListener callback by matching the stream.key tag. This prevents cross-test interference from the global static counters. Assert only on measurements with the expected tag value.
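A sketch of that isolation pattern, assuming xUnit and a `RecordFailure(key, exception)` tracker signature (the tracker API details here are illustrative, not confirmed against the codebase):

```csharp
using System;
using System.Diagnostics.Metrics;
using System.Threading;
using Xunit;

public class ChangeStreamMetricsTests
{
    [Fact]
    public void RecordFailure_EmitsRetryFailureMetric()
    {
        // Unique key per test so parallel tests never observe each other's measurements.
        var streamKey = $"TestStream_{nameof(RecordFailure_EmitsRetryFailureMetric)}";
        long observed = 0;

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Name == "syrf.changestream.retry_failures")
                l.EnableMeasurementEvents(instrument);
        };
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
        {
            // Count only measurements carrying this test's stream.key tag.
            foreach (var tag in tags)
                if (tag.Key == "stream.key" && Equals(tag.Value, streamKey))
                    Interlocked.Add(ref observed, value);
        });
        listener.Start();

        var tracker = new ChangeStreamHealthTracker();
        tracker.RecordFailure(streamKey, new TimeoutException("simulated cursor failure"));

        Assert.Equal(1, Interlocked.Read(ref observed));
    }
}
```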

OTel wiring test

Add a smoke test in SyrfConfigureServicesTests.cs to verify AddOpenTelemetryConfig() registers the metrics pipeline. Add OpenTelemetry.Exporter.InMemory Version="1.9.0" as a test-only dependency to SyRF.WebHostConfig.Common.Tests.csproj (must match existing OTel package versions in SyRF.WebHostConfig.Common.csproj:25-31).

  • AddOpenTelemetryConfig_RegistersAndCollectsChangeStreamMetrics — call services.AddOpenTelemetryConfig(configuration) with a test IConfiguration containing CustomSentryConfig, then call services.AddOpenTelemetry().WithMetrics(m => m.AddInMemoryExporter(exportedMetrics)) separately to attach the test exporter. This works because AddOpenTelemetry() is additive — both WithMetrics blocks configure the same underlying MeterProvider. The method returns void, so chaining is not possible. Build the service provider, increment ChangeStreamMetrics.RetryFailures, force a collect via MeterProvider.ForceFlush(), then assert:
  • exportedMetrics contains a metric named syrf.changestream.retry_failures (proves .AddMeter() subscription).
  • The exported metric's Resource contains service.name = "SyRF API" (proves .ConfigureResource() is applied at the top-level AddOpenTelemetry() scope, not just tracing).

This tests the full pipeline: .WithMetrics().AddMeter() subscription → instrument collection → resource identity → export. Unlike MeterListener (which subscribes directly to System.Diagnostics.Metrics.Meter and would pass even without .AddMeter() registration), the in-memory exporter only receives metrics from meters the OTel SDK is subscribed to. The service.name check catches drift if someone moves .ConfigureResource() back into .WithTracing(). Full end-to-end OTLP integration testing is out of scope.
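The test body could look roughly like this; AddOpenTelemetryConfig and ChangeStreamMetrics come from the codebase, BuildTestConfiguration is a hypothetical helper for the IConfiguration described above, and the rest is standard OTel SDK API:

```csharp
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using Xunit;

[Fact]
public void AddOpenTelemetryConfig_RegistersAndCollectsChangeStreamMetrics()
{
    var exportedMetrics = new List<Metric>();
    var services = new ServiceCollection();
    var configuration = BuildTestConfiguration(); // hypothetical helper: IConfiguration with CustomSentryConfig

    services.AddOpenTelemetryConfig(configuration);  // production wiring (returns void)

    // Additive call: attaches the in-memory exporter to the same MeterProvider.
    services.AddOpenTelemetry()
        .WithMetrics(m => m.AddInMemoryExporter(exportedMetrics));

    using var sp = services.BuildServiceProvider();
    var meterProvider = sp.GetRequiredService<MeterProvider>();

    ChangeStreamMetrics.RetryFailures.Add(1,
        new KeyValuePair<string, object?>("stream.key", "SmokeTestStream"));
    meterProvider.ForceFlush();

    Assert.Contains(exportedMetrics, m => m.Name == "syrf.changestream.retry_failures");
}
```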

DI singleton forwarding test

Add a regression test in SyrfConfigureServicesTests.cs to verify the forwarding registration resolves to a single instance:

  • HealthTrackerRegistration_InterfaceAndConcreteResolveSameInstance — register services via AddSyrfMongoServices(configuration) (the method containing the ChangeStreamHealthTracker forwarding registration — see DI registration section above), build the service provider, then assert Assert.Same(sp.GetRequiredService<ChangeStreamHealthTracker>(), sp.GetRequiredService<IChangeStreamHealthTracker>()). This proves the forwarding pattern resolves to a single instance and prevents accidental drift where someone registers IChangeStreamHealthTracker independently (creating a second singleton), splitting health state between MongoContext and the health check.
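The forwarding pattern under test looks roughly like this (a sketch; the actual registration lives in AddSyrfMongoServices):

```csharp
// One singleton instance, reachable through both the concrete type and the interface.
services.AddSingleton<ChangeStreamHealthTracker>();
services.AddSingleton<IChangeStreamHealthTracker>(
    sp => sp.GetRequiredService<ChangeStreamHealthTracker>());
```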

RetryWithExponentialBackoff extension tests

The onTerminalError callback is added to RetryWithExponentialBackoff (see Part 1). Add direct unit tests for the extension method itself (in existing RetryWithExponentialBackoffTests or a new test class alongside):

  • NonRetryableException_InvokesOnTerminalError_WithNonRetryableReason — configure shouldRetry to return false, subscribe, push an error, verify onTerminalError is called exactly once with (exception, TerminalErrorReason.NonRetryable), and onRetry is NOT called.
  • MaxRetriesExceeded_InvokesOnTerminalError_WithMaxRetriesReason — set maxRetries: 2, push 3 errors that pass shouldRetry, verify onRetry is called twice (attempts 1-2), then onTerminalError is called once with (exception, TerminalErrorReason.MaxRetries) on the 3rd error. Verify onRetry is NOT called for the 3rd error.

These test the extension in isolation, ensuring the callback contract is correct before MongoContext wiring tests depend on it.

Retry classifier tests

Unit tests for ChangeStreamRetryClassifier.IsRetryable() — each excluded type must have a dedicated test to lock the decision. Add to ChangeStreamRetryOptionsTests or a new ChangeStreamRetryClassifierTests class:

  • IsRetryable_ReturnsFalse_ForMongoConfigurationException — verify MongoConfigurationException is non-retryable
  • IsRetryable_ReturnsFalse_ForMongoAuthenticationException — verify MongoAuthenticationException is non-retryable
  • IsRetryable_ReturnsTrue_ForMongoConnectionException — verify transient connection errors are retried
  • IsRetryable_ReturnsTrue_ForMongoNotPrimaryException — verify replica set failovers (primary stepdown) are retried
  • IsRetryable_ReturnsTrue_ForMongoNodeIsRecoveringException — verify replica set failovers (node in recovery state) are retried
  • IsRetryable_ReturnsTrue_ForMongoInternalException — verify internal driver errors are retried (decision: transient, self-heals on retry with fresh connection — see "MongoInternalException decision" in Part 1)
  • IsRetryable_ReturnsTrue_ForTimeoutException — verify timeouts are retried
  • IsRetryable_ReturnsTrue_ForChangeStreamClosedException — verify cursor expiry is retried
  • IsRetryable_ReturnsFalse_ForNonMongoException — verify InvalidOperationException (outside Mongo hierarchy) is non-retryable

These tests serve as a decision lock — adding a new exclusion requires adding a failing test first, making the exclude-list change deliberate rather than accidental.
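A possible shape for the classifier these tests lock down, assuming the driver exception types named above exist as written; pattern order matters because in the MongoDB .NET driver MongoAuthenticationException derives from MongoConnectionException, so the exclusion must appear first:

```csharp
public static class ChangeStreamRetryClassifier
{
    // Single source of truth for retry eligibility (sketch, not the committed code).
    public static bool IsRetryable(Exception ex) => ex switch
    {
        MongoConfigurationException => false,   // bad config never self-heals
        MongoAuthenticationException => false,  // bad credentials never self-heal
        MongoConnectionException => true,       // transient network faults
        MongoNotPrimaryException => true,       // replica set primary stepdown
        MongoNodeIsRecoveringException => true, // node in recovery state
        MongoInternalException => true,         // decision: transient (see Part 1)
        TimeoutException => true,
        ChangeStreamClosedException => true,    // cursor expiry
        _ => false,                             // everything else is terminal
    };
}
```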

TerminalErrorReason mapping tests

  • ToMetricTag_MapsMaxRetries — TerminalErrorReason.MaxRetries.ToMetricTag() returns "max_retries"
  • ToMetricTag_MapsNonRetryable — TerminalErrorReason.NonRetryable.ToMetricTag() returns "non_retryable"

These lock the enum-to-metric-tag mapping — a mismatch would silently break PromQL label filters in alert rules.

Helm template validation

cd src/charts/syrf-common && npm run validate:env-blocks

MongoContext lifecycle integration

The lifecycle hooks (MarkActive after WatchAsync, MarkTerminated(reason) via Do(onError:) + .Finally(), RecordFailure via onRetry, RecordTerminalError via onTerminalError, RecordSuccess via onSuccess) are wired in MongoContext's change stream pipeline. These can be tested with a mock IChangeStreamCursor<T> and a Mock<IChangeStreamHealthTracker>, avoiding the need for a real MongoDB connection.

Unit tests (in GetCachedCollectionChangeStreamTests or similar):

  • MarkActive_CalledAfterWatchAsyncSuccess — mock cursor that succeeds, verify mockTracker.Verify(t => t.MarkActive(key)) is called before any documents are emitted
  • MarkTerminated_CalledWithError_WhenInnerRetriesExhaust — mock cursor that always throws, inject new ChangeStreamRetryOptions { MaxRetries = 1, InitialDelay = TimeSpan.FromMilliseconds(1), MaxDelay = TimeSpan.FromMilliseconds(1) }, verify mockTracker.Verify(t => t.MarkTerminated(key, TerminationReason.Error))
  • MarkTerminated_CalledWithUnsubscribed_WhenRefCountDropsToZero — subscribe then dispose, verify mockTracker.Verify(t => t.MarkTerminated(key, TerminationReason.Unsubscribed))
  • RecordTerminalError_CalledOnNonRetryableException — mock cursor that throws MongoConfigurationException (excluded by ChangeStreamRetryClassifier.IsRetryable), verify mockTracker.Verify(t => t.RecordTerminalError(key, It.IsAny<Exception>(), TerminalErrorReason.NonRetryable))
  • UnsubscribedStreams_NotClassifiedForLiveness — end-to-end with real tracker: cursor with failures → unsubscribe → verify health check skips the stream
  • Constructor_ThrowsOnInvalidRetryOptions — pass new ChangeStreamRetryOptions { MaxRetries = 0 } (or InitialDelay > MaxDelay), verify ArgumentOutOfRangeException is thrown at construction time (fail-fast validation)

The mock cursor approach: create a Mock<IAsyncCursor<ChangeStreamDocument<T>>> that can be programmed to emit documents (MoveNextAsync returns true, Current returns batch), throw exceptions, or complete. MongoContext depends on IChangeStreamHealthTracker and ChangeStreamRetryOptions (both injected via constructor), so the mock tracker and fast retry options (MaxRetries = 1, InitialDelay = 1ms, MaxDelay = 1ms) are injected directly via the protected testing constructor — no factory override needed.

Complementary verification (non-unit):

  1. Code review: Confirm Do(onError:) is positioned between RetryWithExponentialBackoff and .Finally(), and terminatedByError is scoped inside Observable.Defer (one flag per re-execution, not shared across RefCount cycles)
  2. Staging smoke test: Deploy to staging, trigger a change stream reconnection, then observe metrics in Grafana. Check /health/live for overall status transition (Healthy → Degraded/Unhealthy → Healthy) — per-stream detail is in HealthCheckResult.Data but not surfaced in the HTTP response until the response writer is updated (see Future Enhancements). To inspect per-stream data during staging, use HealthCheckService programmatic access or structured logging of GetHealthStatus() results. Trigger options (in order of preference):
  • Atlas Test Failover (managed Atlas): In the Atlas UI, navigate to the cluster → click "..." → "Test Failover". This triggers a primary stepdown safely within Atlas's managed environment and is available on M10+ tiers. Alternatively, use the Atlas Admin API: POST /api/atlas/v2/groups/{groupId}/clusters/{clusterName}/restartPrimaries
  • replSetStepDown (self-managed or Atlas with admin access): db.adminCommand({replSetStepDown: 60}). This may be unavailable on managed Atlas clusters where admin commands are restricted
  • Application-level cursor invalidation: Drop and recreate a watched collection in a staging-only database (requires staging DB isolation — see MongoDB Testing Strategy). This invalidates the cursor without requiring replica set admin access

Integration verification

After deploying to staging with OTLP enabled, built-in metrics should appear in Grafana Cloud's Explore view within ~60 seconds:

  • http_server_request_duration_seconds — built-in ASP.NET Core
  • process_runtime_dotnet_gc_collections_count — built-in runtime

Custom counters are event-driven — they emit only when a change stream failure, terminal error, or recovery event occurs. These series will NOT appear in Grafana until at least one event fires. To verify custom counter wiring on first deploy, induce a change stream failure (see staging smoke test above), then look for:

  • syrf_changestream_retry_failures_total — custom counter (retryable failures)
  • syrf_changestream_terminal_errors_total — custom counter (non-retryable or max-retries-exceeded)
  • syrf_changestream_recoveries_total — custom counter (document-gated recovery events)

Metric name translation: The OTel SDK emits syrf.changestream.retry_failures, syrf.changestream.terminal_errors, and syrf.changestream.recoveries (dot notation). Grafana Cloud's Prometheus backend translates these to syrf_changestream_retry_failures_total, syrf_changestream_terminal_errors_total, and syrf_changestream_recoveries_total (underscores + _total suffix for counters). If using a different backend (e.g., OTLP-native), metric names may appear in their original dot notation. Use flexible matching when building dashboards.

Label validation (first deploy): Before finalising alert rules, verify actual label keys in Grafana Cloud's Explore view. The PromQL queries assume stream_key (from counter tag stream.key), terminal_reason (from counter tag terminal.reason, values produced by TerminalErrorReason.ToMetricTag(), on terminal_errors only), service_name (from resource attribute service.name), deployment_environment (from resource attribute deployment.environment), and pod (from resource attribute k8s.pod.name, populated via the POD_NAME downward API env var added in step 4e). All five labels are populated by committed implementation steps — no runtime detection or fallback is needed. Grafana Cloud's Prometheus backend translates OTel attribute dots to underscores and lowercases — but other backends may differ. After the first staging deploy, run {__name__=~"syrf_changestream.*"} in Explore and inspect the actual label names on the returned series. Update alert PromQL if labels differ from the assumed names.


Future Enhancements

Once the metrics pipeline is established, additional metrics can be added with minimal effort:

  • PM and Quartz OTel config: Wire up AddOpenTelemetryConfig() for PM and Quartz services, expand env-mapping services list
  • Health endpoint enrichment: Update WriteHealthResponseWithVersion to include change-stream per-stream data in the /health/live HTTP response. Requires error message redaction before HTTP exposure
  • Change stream document throughput: Counter per collection per event type
  • MassTransit message processing: Consumer rate and latency
  • MongoDB query duration: Histogram per collection per operation
  • Business metrics: Screening rate, annotation rate, export throughput
  • Cursor-level recovery metric: The recoveries counter is document-gated (deferred to RecordSuccess) to avoid inflating counts during reconnect-fail loops. This means quiet collections that reconnect without document flow don't emit recovery metrics until data arrives. If cursor-level recovery visibility is needed for quiet collections, add a separate syrf.changestream.cursor_recoveries counter emitted directly by MarkActive — this provides a complete picture at the cost of noisy counts during reconnect-fail loops. The two metrics together distinguish confirmed recoveries (document-gated) from cursor reopens (cursor-gated)