-
Notifications
You must be signed in to change notification settings - Fork 728
feat: dockerhub-sync worker for repo_docker pull counts (CM-1213) #4163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b23c298
d200d43
09d2a00
6c0d8be
eab2645
12ed78f
cbf3f6c
a4528db
cf32149
793f406
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| -- dockerhub-sync (CM-1213) | ||
| -- | ||
| -- Adds discovery/refresh bookkeeping for the dockerhub-sync worker | ||
| -- (services/apps/packages_worker/src/dockerhub) and a daily snapshot table | ||
| -- for Docker Hub lifetime pull counts. | ||
|
|
||
| -- Last time dockerhub-sync probed this repo for a published Docker image | ||
| -- (Dockerfile detection + Hub candidate lookup). NULL = never checked. | ||
| -- Separate from repos.last_synced_at because discovery cadence (weeks) | ||
| -- differs from light-metadata refresh cadence (daily). | ||
| ALTER TABLE repos | ||
| ADD COLUMN IF NOT EXISTS docker_checked_at timestamptz; | ||
|
|
||
| -- Partial index for the discovery backlog query: pages repos that have never | ||
| -- been probed for a Docker image. Once docker_checked_at is set the row drops | ||
| -- out of the index, so this stays small even as the repos table grows. | ||
| CREATE INDEX IF NOT EXISTS repos_docker_pending_idx ON repos (id) | ||
| WHERE | ||
| host = 'github' AND docker_checked_at IS NULL; | ||
|
|
||
| -- Supports the refresh query (WHERE last_synced_at < NOW() - interval). | ||
| CREATE INDEX IF NOT EXISTS repo_docker_stale_idx ON repo_docker (last_synced_at); | ||
|
|
||
| -- ============================================================ | ||
| -- REPO DOCKER PULLS DAILY | ||
| -- One row per image per day storing the *lifetime* pull_count as returned | ||
| -- by hub.docker.com/v2/repositories/<image>. Docker Hub does not expose | ||
| -- per-day download counts, so daily deltas are derived at query time: | ||
| -- pulls_total - LAG(pulls_total) OVER (PARTITION BY image_name ORDER BY date) | ||
| -- Keyed by image_name (matches repo_docker UNIQUE) so rows survive a | ||
| -- repo_docker re-discovery without an FK cascade. | ||
| -- | ||
| -- Partitioned monthly via pg_partman (extension + schema already created in | ||
| -- V1780231200__npm_worker.sql). | ||
| -- ============================================================ | ||
| CREATE TABLE IF NOT EXISTS repo_docker_pulls_daily ( | ||
| image_name text NOT NULL, | ||
| date date NOT NULL, | ||
| pulls_total bigint NOT NULL, | ||
| PRIMARY KEY (image_name, date) | ||
| ) | ||
| PARTITION BY RANGE (date); | ||
|
|
||
| -- Guard so this migration is idempotent against environments where the | ||
| -- table was already registered manually (e.g. local dev that applied the | ||
| -- earlier in-place schema edit). | ||
| DO $$ | ||
| BEGIN | ||
| IF NOT EXISTS ( | ||
| SELECT 1 FROM partman.part_config | ||
| WHERE parent_table = 'public.repo_docker_pulls_daily' | ||
| ) THEN | ||
| PERFORM partman.create_parent( | ||
| p_parent_table => 'public.repo_docker_pulls_daily', | ||
| p_control => 'date', | ||
| p_interval => '1 month', | ||
| p_premake => 3 | ||
| ); | ||
| END IF; | ||
| END | ||
| $$; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| version: '3.1' | ||
|
|
||
| x-env-args: &env-args | ||
| DOCKER_BUILDKIT: 1 | ||
| NODE_ENV: docker | ||
| SERVICE: dockerhub-sync | ||
| SHELL: /bin/sh | ||
| SUPPRESS_NO_CONFIG_WARNING: 'true' | ||
| DOCKERHUB_API_BASE_URL: 'https://hub.docker.com/v2' | ||
| DOCKERHUB_BATCH_SIZE: '100' | ||
| DOCKERHUB_REFRESH_INTERVAL_HOURS: '24' | ||
| DOCKERHUB_DISCOVERY_INTERVAL_DAYS: '14' | ||
| DOCKERHUB_IDLE_SLEEP_SEC: '60' | ||
|
|
||
| services: | ||
| dockerhub-sync: | ||
| build: | ||
| context: ../../ | ||
| dockerfile: ./scripts/services/docker/Dockerfile.packages-worker | ||
| command: 'pnpm run start:dockerhub-sync' | ||
| working_dir: /usr/crowd/app/services/apps/packages_worker | ||
| env_file: | ||
| - ../../backend/.env.dist.local | ||
| - ../../backend/.env.dist.composed | ||
| - ../../backend/.env.override.local | ||
| - ../../backend/.env.override.composed | ||
| environment: | ||
| <<: *env-args | ||
| restart: always | ||
| networks: | ||
| - crowd-bridge | ||
|
|
||
| dockerhub-sync-dev: | ||
| build: | ||
| context: ../../ | ||
| dockerfile: ./scripts/services/docker/Dockerfile.packages-worker | ||
| command: 'pnpm run dev:dockerhub-sync' | ||
| working_dir: /usr/crowd/app/services/apps/packages_worker | ||
| # user: '${USER_ID}:${GROUP_ID}' | ||
| env_file: | ||
| - ../../backend/.env.dist.local | ||
| - ../../backend/.env.dist.composed | ||
| - ../../backend/.env.override.local | ||
| - ../../backend/.env.override.composed | ||
| environment: | ||
| <<: *env-args | ||
| hostname: dockerhub-sync | ||
| networks: | ||
| - crowd-bridge | ||
| volumes: | ||
| - ../../services/libs/audit-logs/src:/usr/crowd/app/services/libs/audit-logs/src | ||
| - ../../services/libs/common/src:/usr/crowd/app/services/libs/common/src | ||
| - ../../services/libs/common_services/src:/usr/crowd/app/services/libs/common_services/src | ||
| - ../../services/libs/data-access-layer/src:/usr/crowd/app/services/libs/data-access-layer/src | ||
| - ../../services/libs/database/src:/usr/crowd/app/services/libs/database/src | ||
| - ../../services/libs/integrations/src:/usr/crowd/app/services/libs/integrations/src | ||
| - ../../services/libs/logging/src:/usr/crowd/app/services/libs/logging/src | ||
| - ../../services/libs/nango/src:/usr/crowd/app/services/libs/nango/src | ||
| - ../../services/libs/opensearch/src:/usr/crowd/app/services/libs/opensearch/src | ||
| - ../../services/libs/queue/src:/usr/crowd/app/services/libs/queue/src | ||
| - ../../services/libs/redis/src:/usr/crowd/app/services/libs/redis/src | ||
| - ../../services/libs/snowflake/src:/usr/crowd/app/services/libs/snowflake/src | ||
| - ../../services/libs/telemetry/src:/usr/crowd/app/services/libs/telemetry/src | ||
| - ../../services/libs/temporal/src:/usr/crowd/app/services/libs/temporal/src | ||
| - ../../services/libs/types/src:/usr/crowd/app/services/libs/types/src | ||
| - ../../services/apps/packages_worker/src:/usr/crowd/app/services/apps/packages_worker/src | ||
|
|
||
| networks: | ||
| crowd-bridge: | ||
| external: true |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import { getServiceLogger } from '@crowd/logging' | ||
|
|
||
| import { getDockerhubConfig, getGithubAppConfig } from '../config' | ||
| import { getPackagesDb } from '../db' | ||
| import { runDockerhubLoop } from '../dockerhub' | ||
| import { fetchRateLimitDiagnostics, resolveInstallations } from '../enricher/githubAppAuth' | ||
|
|
||
| const log = getServiceLogger() | ||
|
|
||
| let shuttingDown = false | ||
|
|
||
| const shutdown = async () => { | ||
| if (shuttingDown) return | ||
| shuttingDown = true | ||
| log.info('Shutting down dockerhub-sync...') | ||
| } | ||
|
|
||
| process.on('SIGINT', shutdown) | ||
| process.on('SIGTERM', shutdown) | ||
|
|
||
| const main = async () => { | ||
| log.info('dockerhub-sync starting...') | ||
|
|
||
| const config = getDockerhubConfig() | ||
| const appConfig = getGithubAppConfig() | ||
|
|
||
| const installationIds = await resolveInstallations(appConfig) | ||
|
|
||
| if (installationIds.length === 0) { | ||
| log.error('No GitHub App installations found — cannot build token pool') | ||
| process.exit(1) | ||
| } | ||
|
|
||
| await fetchRateLimitDiagnostics(appConfig.appId, appConfig.privateKeyPem, installationIds) | ||
|
|
||
| const qx = await getPackagesDb() | ||
| await qx.selectOne('SELECT 1') | ||
| log.info('Connected to packages-db.') | ||
|
|
||
| log.info( | ||
| { | ||
| installations: installationIds.length, | ||
| batchSize: config.batchSize, | ||
| hubBaseUrl: config.hubBaseUrl, | ||
| }, | ||
| 'Starting dockerhub loop', | ||
| ) | ||
|
|
||
| await runDockerhubLoop(qx, installationIds, appConfig, config, () => shuttingDown) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignored probe installation filterHigh Severity Startup calls Reviewed by Cursor Bugbot for commit 793f406. Configure here. |
||
|
|
||
| log.info('dockerhub-sync stopped.') | ||
| process.exit(0) | ||
| } | ||
|
|
||
| main().catch((err) => { | ||
| log.error({ err }, 'dockerhub-sync fatal error') | ||
| process.exit(1) | ||
| }) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import { describe, expect, it } from 'vitest' | ||
|
|
||
| import { buildCandidates } from '../candidates' | ||
|
|
||
| describe('buildCandidates', () => { | ||
| it('lowercases owner and repo into a single <owner>/<repo> candidate', () => { | ||
| expect(buildCandidates('Grafana', 'Grafana')).toEqual(['grafana/grafana']) | ||
| }) | ||
|
|
||
| it('passes through already-valid lowercase slugs', () => { | ||
| expect(buildCandidates('prometheus', 'node_exporter')).toEqual(['prometheus/node_exporter']) | ||
| }) | ||
|
|
||
| it('rejects components with characters Docker Hub does not accept', () => { | ||
| // GitHub allows '+' in org names via renames; Hub would 400. | ||
| expect(buildCandidates('foo+bar', 'baz')).toEqual([]) | ||
| }) | ||
|
|
||
| it('rejects components that start or end with a separator', () => { | ||
| expect(buildCandidates('-leading', 'repo')).toEqual([]) | ||
| expect(buildCandidates('owner', 'trailing.')).toEqual([]) | ||
| }) | ||
|
|
||
| it('does not emit a library/<repo> candidate', () => { | ||
| // Guard against accidental reintroduction — see comment in candidates.ts. | ||
| expect(buildCandidates('nodejs', 'node')).toEqual(['nodejs/node']) | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| import { afterEach, describe, expect, it, vi } from 'vitest' | ||
|
|
||
| import { fetchDockerhub } from '../fetchDockerhub' | ||
| import { FetchError } from '../types' | ||
|
|
||
| const BASE = 'https://hub.docker.com/v2' | ||
|
|
||
| function mockFetch(status: number, body: unknown, headers: Record<string, string> = {}) { | ||
| return vi.spyOn(globalThis, 'fetch').mockResolvedValue( | ||
| new Response(typeof body === 'string' ? body : JSON.stringify(body), { | ||
| status, | ||
| headers: { 'Content-Type': 'application/json', ...headers }, | ||
| }), | ||
| ) | ||
| } | ||
|
|
||
| afterEach(() => { | ||
| vi.restoreAllMocks() | ||
| }) | ||
|
|
||
| describe('fetchDockerhub', () => { | ||
| it('returns pull/star counts on 200', async () => { | ||
| mockFetch(200, { | ||
| name: 'grafana', | ||
| namespace: 'grafana', | ||
| pull_count: 12345, | ||
| star_count: 678, | ||
| last_updated: '2026-05-01T00:00:00Z', | ||
| }) | ||
|
|
||
| const r = await fetchDockerhub(BASE, 'grafana/grafana') | ||
| expect(r).toEqual({ | ||
| imageName: 'grafana/grafana', | ||
| pulls: 12345, | ||
| stars: 678, | ||
| lastUpdated: '2026-05-01T00:00:00Z', | ||
| }) | ||
| }) | ||
|
|
||
| it('appends trailing slash to the request URL', async () => { | ||
| const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) | ||
| await fetchDockerhub(BASE, 'a/b') | ||
| expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) | ||
| }) | ||
|
|
||
| it('normalizes a trailing slash on the configured base URL', async () => { | ||
| const spy = mockFetch(200, { pull_count: 1, star_count: 0 }) | ||
| await fetchDockerhub(`${BASE}/`, 'a/b') | ||
| expect(spy).toHaveBeenCalledWith(`${BASE}/repositories/a/b/`, expect.anything()) | ||
| }) | ||
|
|
||
| it('classifies 404 as NOT_FOUND', async () => { | ||
| mockFetch(404, { message: 'object not found' }) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) | ||
| }) | ||
|
|
||
| it('classifies 400 as NOT_FOUND (Hub 400s on malformed slugs)', async () => { | ||
| mockFetch(400, { message: 'bad request' }) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'NOT_FOUND' }) | ||
| }) | ||
|
|
||
| it('classifies 429 as RATE_LIMIT with resetAt from header', async () => { | ||
| const resetSec = Math.floor(Date.now() / 1000) + 120 | ||
| mockFetch(429, { message: 'too many' }, { 'x-ratelimit-reset': String(resetSec) }) | ||
|
|
||
| expect.assertions(3) | ||
| try { | ||
| await fetchDockerhub(BASE, 'a/b') | ||
| } catch (err) { | ||
| expect(err).toBeInstanceOf(FetchError) | ||
| const fe = err as FetchError | ||
| expect(fe.kind).toBe('RATE_LIMIT') | ||
| expect(fe.resetAt).toBeGreaterThan(Date.now()) | ||
| } | ||
| }) | ||
|
|
||
| it('does NOT discard a 200 response when x-ratelimit-remaining is 0', async () => { | ||
| // remaining=0 means this request consumed the last slot; the response itself | ||
| // is valid. The next call will 429 and park then. | ||
| mockFetch(200, { pull_count: 1, star_count: 0 }, { 'x-ratelimit-remaining': '0' }) | ||
| const r = await fetchDockerhub(BASE, 'a/b') | ||
| expect(r.pulls).toBe(1) | ||
| }) | ||
|
|
||
| it('classifies 401/403 as AUTH so misconfig surfaces instead of looking like a miss', async () => { | ||
| mockFetch(401, { message: 'unauthorized' }) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) | ||
| mockFetch(403, { message: 'forbidden' }) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'AUTH' }) | ||
| }) | ||
|
|
||
| it('classifies 5xx as TRANSIENT', async () => { | ||
| mockFetch(503, 'Service Unavailable') | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) | ||
| }) | ||
|
|
||
| it('classifies network failure as TRANSIENT', async () => { | ||
| vi.spyOn(globalThis, 'fetch').mockRejectedValue(new Error('ECONNRESET')) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'TRANSIENT' }) | ||
| }) | ||
|
|
||
| it('classifies non-JSON 200 body as MALFORMED', async () => { | ||
| mockFetch(200, '<html>not json</html>') | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) | ||
| }) | ||
|
|
||
| it('classifies missing pull_count as MALFORMED', async () => { | ||
| mockFetch(200, { name: 'x', star_count: 0 }) | ||
| await expect(fetchDockerhub(BASE, 'a/b')).rejects.toMatchObject({ kind: 'MALFORMED' }) | ||
| }) | ||
| }) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| // Docker Hub repository slugs are lowercase and limited to [a-z0-9._-]. | ||
| // GitHub allows uppercase and a few characters Hub rejects, so we lowercase | ||
| // and validate before probing — anything that fails the regex would 400 on | ||
| // Hub anyway and isn't worth an HTTP round-trip. | ||
| const HUB_COMPONENT = /^[a-z0-9](?:[a-z0-9._-]*[a-z0-9])?$/ | ||
|
|
||
| export function buildCandidates(owner: string, name: string): string[] { | ||
| const ns = owner.toLowerCase() | ||
| const repo = name.toLowerCase() | ||
|
|
||
| if (!HUB_COMPONENT.test(ns) || !HUB_COMPONENT.test(repo)) { | ||
| return [] | ||
| } | ||
|
|
||
| // v1 deliberately omits `library/<repo>`: a random github.com/foo/node with a | ||
| // dev Dockerfile would false-positive onto the official library/node image. | ||
| // Official images (~150) to be seeded via allowlist in a follow-up. | ||
| return [`${ns}/${repo}`] | ||
| } |


Uh oh!
There was an error while loading. Please reload this page.