Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/paperscout/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ class FailureCategory(str, enum.Enum):

class ConfigurationError(Exception):
"""Permanent misconfiguration (credentials, required integration settings)."""


class IndexRefreshError(Exception):
"""Transient index download failure (retry-eligible)."""

def __init__(self, category: FailureCategory, message: str = "") -> None:
super().__init__(message)
self.category = category
8 changes: 8 additions & 0 deletions src/paperscout/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ class CycleStatus(str, Enum):
FAILED = "failed"


@dataclass(frozen=True, slots=True)
class IndexRefreshResult:
"""Outcome of ``WG21Index.refresh()`` / ``fetch()``."""

papers: dict[str, Paper]
stale: bool = False


@dataclass(frozen=True, slots=True)
class CycleResult:
"""Discriminated probe cycle result (success vs empty vs failed)."""
Expand Down
59 changes: 39 additions & 20 deletions src/paperscout/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

from .concurrency import run_blocking_io
from .config import Settings, settings
from .errors import ConfigurationError, FailureCategory
from .models import CycleResult, CycleStatus, Paper, PerUserMatches, ProbeHit
from .errors import ConfigurationError, FailureCategory, IndexRefreshError
from .models import CycleResult, CycleStatus, IndexRefreshResult, Paper, PerUserMatches, ProbeHit
from .protocols import (
SOURCE_ISO_PROBE,
SOURCE_OPEN_STD,
Expand Down Expand Up @@ -187,6 +187,7 @@ def __init__(
self._last_cycle_status: CycleStatus | None = None
self._last_cycle_error: str | None = None
self._last_ops_alert: float | None = None
self._index_stale = False
self._health_lock = threading.Lock()
self._health_snapshot: SchedulerSnapshot | None = None

Expand Down Expand Up @@ -233,12 +234,24 @@ def _log_index_diff(self, diff: DiffResult) -> None:
async def _poll_sources(self, *, baseline: bool = False) -> tuple[DiffResult, list[ProbeHit]]:
diff = DiffResult(new_papers=[], updated_papers=[])
probe_hits: list[ProbeHit] = []
self._index_stale = False

for source in self.sources:
if not self._source_enabled(source.source_id):
continue

current = await source.fetch()
raw = await source.fetch()
if source.source_id == SOURCE_WG21_INDEX:
refresh_result = cast(IndexRefreshResult, raw)
current = refresh_result.papers
if refresh_result.stale:
log.warning(
"INDEX-STALE papers=%d (poll using persisted cache)",
len(current),
)
self._index_stale = True
else:
current = raw
if baseline:
self._snapshots[source.source_id] = current
if source.source_id == SOURCE_ISO_PROBE:
Expand All @@ -253,8 +266,7 @@ async def _poll_sources(self, *, baseline: bool = False) -> tuple[DiffResult, li

if source.source_id == SOURCE_WG21_INDEX:
diff = result
papers = cast(dict[str, Paper], current)
log.info("INDEX-LOAD papers=%d", len(papers))
log.info("INDEX-LOAD papers=%d", len(current))
self._log_index_diff(diff)
elif source.source_id == SOURCE_ISO_PROBE:
cycle = cast(CycleResult, current)
Expand Down Expand Up @@ -293,6 +305,15 @@ def _mark_poll_successful_if_probe_ok(self) -> None:
if self._last_cycle_status is not CycleStatus.FAILED:
self._last_successful_poll = time.time()

def _advance_staleness_clock_if_ok(self) -> None:
"""Advance staleness clock when index is fresh and probe cycle did not fail."""
if self._index_stale:
return
if self.cfg.enable_iso_probe:
self._mark_poll_successful_if_probe_ok()
else:
self._last_successful_poll = time.time()

def _publish_health_snapshot(self) -> None:
"""Publish immutable snapshot for cross-thread health reads (event loop only)."""
lsp = self._last_successful_poll
Expand Down Expand Up @@ -371,11 +392,8 @@ async def poll_once(self) -> PollResult:
if not self._seeded:
seed_result = await self.seed()
if not seed_result.had_prior_state:
if self.cfg.enable_iso_probe:
# Stats already recorded in seed() after run_cycle.
self._mark_poll_successful_if_probe_ok()
else:
self._last_successful_poll = time.time()
self._advance_staleness_clock_if_ok()
if not self.cfg.enable_iso_probe:
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return PollResult(
Expand Down Expand Up @@ -415,11 +433,8 @@ async def poll_once(self) -> PollResult:
)
if self.notify_callback:
self.notify_callback(result)
if self.cfg.enable_iso_probe:
# Stats already recorded in seed() after run_cycle.
self._mark_poll_successful_if_probe_ok()
else:
self._last_successful_poll = time.time()
self._advance_staleness_clock_if_ok()
if not self.cfg.enable_iso_probe:
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return result
Expand Down Expand Up @@ -512,11 +527,8 @@ async def poll_once(self) -> PollResult:
len(dp_transitions),
len(per_user_matches),
)
if self.cfg.enable_iso_probe:
# Stats already recorded above after run_cycle.
self._mark_poll_successful_if_probe_ok()
else:
self._last_successful_poll = time.time()
self._advance_staleness_clock_if_ok()
if not self.cfg.enable_iso_probe:
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return result
Expand Down Expand Up @@ -581,6 +593,13 @@ async def run_forever(self, shutdown_event: asyncio.Event | None = None) -> None
exc,
)
return
except IndexRefreshError as exc:
log.error(
"POLL-ERROR failure_category=%s poll=%d %s",
exc.category.value,
self._poll_count,
exc,
)
except httpx.TimeoutException as exc:
log.error(
"POLL-ERROR failure_category=%s poll=%d %s",
Expand Down
110 changes: 77 additions & 33 deletions src/paperscout/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import httpx

from .config import Settings, settings
from .errors import FailureCategory
from .models import CycleResult, CycleStatus, Paper, ProbeHit, Tier
from .errors import ConfigurationError, FailureCategory, IndexRefreshError
from .models import CycleResult, CycleStatus, IndexRefreshResult, Paper, ProbeHit, Tier
from .protocols import SOURCE_ISO_PROBE, SOURCE_OPEN_STD, SOURCE_WG21_INDEX
from .storage import PaperCache, ProbeState, UserWatchlist

Expand Down Expand Up @@ -54,34 +54,51 @@ def __init__(self, pool, cfg: Settings | None = None):
self._max_p: int = 0 # absolute highest P-number
self._sorted_p_nums: list[int] = [] # sorted unique P-numbers, for gap analysis

async def refresh(self) -> dict[str, Paper]:
async def refresh(self) -> IndexRefreshResult:
"""Load index from cache or network; populate ``self.papers``."""
cached = self._cache.read_if_fresh()
if cached is not None:
log.info("Loaded %d entries from cache", len(cached))
self.papers = self._parse_and_index(cached)
return self.papers

raw = await self._download()
if raw is not None:
self._cache.write(raw)
log.info("Downloaded and cached %d entries", len(raw))
self.papers = self._parse_and_index(raw)
return self.papers

stale = self._cache.read()
if stale is not None:
if cached:
papers, max_rev, max_p = self._parse_raw(cached)
if papers:
self._commit_index(papers, max_rev, max_p)
log.info("Loaded %d entries from cache", len(papers))
return IndexRefreshResult(self.papers, stale=False)

raw, category = await self._download()
if raw:
papers, max_rev, max_p = self._parse_raw(raw)
if papers:
self._cache.write(raw)
self._commit_index(papers, max_rev, max_p)
log.info("Downloaded and cached %d entries", len(papers))
return IndexRefreshResult(self.papers, stale=False)
log.warning(
"INDEX-STALE-FALLBACK entries=%d (download failed; using persisted cache)",
len(stale),
"INDEX-FETCH failure_category=%s url=%s payload produced zero parseable papers",
FailureCategory.CONFIGURATION.value,
WG21_INDEX_URL,
)
self.papers = self._parse_and_index(stale)
return self.papers
category = FailureCategory.CONFIGURATION

stale = self._cache.read()
if stale:
papers, max_rev, max_p = self._parse_raw(stale)
if papers:
self._commit_index(papers, max_rev, max_p)
log.warning(
"INDEX-STALE-FALLBACK entries=%d (download failed; using persisted cache)",
len(papers),
)
return IndexRefreshResult(self.papers, stale=True)

log.error("No index data available")
return self.papers
self._raise_on_download_failure(category or FailureCategory.CONFIGURATION)

async def _download(self) -> dict | None:
def _raise_on_download_failure(self, category: FailureCategory) -> None:
if category is FailureCategory.CONFIGURATION:
raise ConfigurationError("No index data available")
raise IndexRefreshError(category=category)

async def _download(self) -> tuple[dict | None, FailureCategory | None]:
timeout = httpx.Timeout(self._cfg.wg21_index_timeout_s)
try:
async with httpx.AsyncClient(
Expand All @@ -93,17 +110,28 @@ async def _download(self) -> dict | None:
resp.raise_for_status()
data = resp.json()
if isinstance(data, dict):
return data
log.warning("Index response is not a dict")
return None
if not data:
log.warning(
"INDEX-FETCH failure_category=%s url=%s empty index payload",
FailureCategory.CONFIGURATION.value,
WG21_INDEX_URL,
)
return None, FailureCategory.CONFIGURATION
return data, None
log.warning(
"INDEX-FETCH failure_category=%s url=%s response is not a dict",
FailureCategory.CONFIGURATION.value,
WG21_INDEX_URL,
)
return None, FailureCategory.CONFIGURATION
Comment thread
coderabbitai[bot] marked this conversation as resolved.
except httpx.TimeoutException as exc:
log.warning(
"INDEX-FETCH failure_category=%s url=%s %s",
FailureCategory.TIMEOUT.value,
WG21_INDEX_URL,
exc,
)
return None
return None, FailureCategory.TIMEOUT
except httpx.HTTPStatusError as exc:
cat = (
FailureCategory.RATE_LIMIT
Expand All @@ -116,17 +144,25 @@ async def _download(self) -> dict | None:
WG21_INDEX_URL,
exc.response.status_code,
)
return None
except (httpx.HTTPError, ValueError) as exc:
return None, cat
except httpx.HTTPError as exc:
log.error(
"INDEX-FETCH failure_category=%s url=%s %s",
FailureCategory.NETWORK.value,
WG21_INDEX_URL,
exc,
)
return None
return None, FailureCategory.NETWORK
except ValueError as exc:
log.error(
"INDEX-FETCH failure_category=%s url=%s %s",
FailureCategory.CONFIGURATION.value,
WG21_INDEX_URL,
exc,
)
return None, FailureCategory.CONFIGURATION

def _parse_and_index(self, raw: dict) -> dict[str, Paper]:
def _parse_raw(self, raw: dict) -> tuple[dict[str, Paper], dict[int, int], int]:
papers: dict[str, Paper] = {}
max_rev: dict[int, int] = {}
max_p = 0
Expand All @@ -141,9 +177,17 @@ def _parse_and_index(self, raw: dict) -> dict[str, Paper]:
prev = max_rev.get(paper.number, -1)
if paper.revision > prev:
max_rev[paper.number] = paper.revision
return papers, max_rev, max_p

def _commit_index(self, papers: dict[str, Paper], max_rev: dict[int, int], max_p: int) -> None:
self.papers = papers
self._max_rev = max_rev
self._max_p = max_p
self._sorted_p_nums = sorted(max_rev.keys())

def _parse_and_index(self, raw: dict) -> dict[str, Paper]:
papers, max_rev, max_p = self._parse_raw(raw)
self._commit_index(papers, max_rev, max_p)
return papers

def highest_p_number(self) -> int:
Expand Down Expand Up @@ -188,8 +232,8 @@ def get_papers_snapshot(self) -> Mapping[str, Paper]:
"""Read-only mapping copy of ``papers`` (not the live dict object)."""
return MappingProxyType(dict(self.papers))

async def fetch(self) -> dict[str, Paper]:
"""DataSource: load index and return the current paper map."""
async def fetch(self) -> IndexRefreshResult:
"""DataSource: load index and return papers plus stale signal."""
return await self.refresh()

def diff(
Expand Down
4 changes: 2 additions & 2 deletions tests/test_callback_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock

from paperscout.models import CycleResult, CycleStatus
from paperscout.models import CycleResult, CycleStatus, IndexRefreshResult
from paperscout.monitor import DiffResult, PollResult, Scheduler
from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX, NotifyCallback, OpsAlertFn
from paperscout.sources import ISOProber, WG21Index
Expand Down Expand Up @@ -45,7 +45,7 @@ def on_ops_alert(message: str) -> None:

wg21 = MagicMock(spec=WG21Index)
wg21.source_id = SOURCE_WG21_INDEX
wg21.fetch = AsyncMock(return_value={})
wg21.fetch = AsyncMock(return_value=IndexRefreshResult({}, stale=False))
wg21.diff = MagicMock(return_value=DiffResult([], []))

iso = MagicMock(spec=ISOProber)
Expand Down
4 changes: 2 additions & 2 deletions tests/test_datasource_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any
from unittest.mock import AsyncMock, MagicMock

from paperscout.models import CycleResult, CycleStatus, Paper
from paperscout.models import CycleResult, CycleStatus, IndexRefreshResult, Paper
from paperscout.monitor import DiffResult, Scheduler
from paperscout.protocols import SOURCE_ISO_PROBE, SOURCE_WG21_INDEX, DataSource
from paperscout.sources import ISOProber, WG21Index
Expand Down Expand Up @@ -67,7 +67,7 @@ async def test_scheduler_polls_custom_datasource(self, fake_pool):
)
wg21 = MagicMock(spec=WG21Index)
wg21.source_id = SOURCE_WG21_INDEX
wg21.fetch = AsyncMock(return_value={})
wg21.fetch = AsyncMock(return_value=IndexRefreshResult({}, stale=False))
wg21.diff = MagicMock(return_value=DiffResult([], []))

iso = MagicMock(spec=ISOProber)
Expand Down
Loading