je_web_runner.api.infra

Façade: driver pin / k8s runner / pipeline / synthetic monitoring / lock / coverage map / impact / diff shard.

class je_web_runner.api.infra.CoverageMap(routes_by_file: 'Dict[str, Set[str]]'=<factory>, files_by_route: 'Dict[str, Set[str]]'=<factory>)

Bases: object

all_routes() List[str]
files_by_route: Dict[str, Set[str]]
files_for(route: str) List[str]
routes_by_file: Dict[str, Set[str]]
routes_for(file_path: str) List[str]
uncovered(declared_routes: Iterable[str]) List[str]
exception je_web_runner.api.infra.CoverageMapError

Bases: WebRunnerException

Raised on invalid input or unreadable action JSON.

exception je_web_runner.api.infra.DiffShardError

Bases: WebRunnerException

Raised when git is unavailable or returns invalid output.

exception je_web_runner.api.infra.DriverPinError

Bases: WebRunnerException

Raised when a pin file is invalid or download verification fails.

exception je_web_runner.api.infra.FanOutError

Bases: WebRunnerException

Raised when arguments are invalid or all tasks fail under fail_fast=True.

class je_web_runner.api.infra.FanOutResult(outcomes: 'List[_TaskOutcome]' = <factory>)

Bases: object

property failures: List[_TaskOutcome]
outcomes: List[_TaskOutcome]
raise_for_failures() None
property succeeded: bool
to_dict() Dict[str, Any]
exception je_web_runner.api.infra.ImpactAnalysisError

Bases: WebRunnerException

Raised when an action JSON file is malformed.

class je_web_runner.api.infra.ImpactIndex(by_locator: Dict[str, ~typing.Set[str]]=<factory>, by_url: Dict[str, ~typing.Set[str]]=<factory>, by_template: Dict[str, ~typing.Set[str]]=<factory>, by_command: Dict[str, ~typing.Set[str]]=<factory>)

Bases: object

Reverse index {kind: {token: {file_paths}}}.

by_command: Dict[str, Set[str]]
by_locator: Dict[str, Set[str]]
by_template: Dict[str, Set[str]]
by_url: Dict[str, Set[str]]
files_for_command(command: str) List[str]
files_for_locator(name: str) List[str]
files_for_template(name: str) List[str]
files_for_url(fragment: str) List[str]
exception je_web_runner.api.infra.K8sRunnerError

Bases: WebRunnerException

Raised when a config produces an invalid manifest.

class je_web_runner.api.infra.LockEntry(name: str, version: str, kind: str, extras: Dict[str, ~typing.Any]=<factory>)

Bases: object

A pinned dependency layer.

extras: Dict[str, Any]
kind: str
name: str
version: str
class je_web_runner.api.infra.NameFilter(include: List[Pattern] = <factory>, exclude: List[Pattern] = <factory>)

Bases: object

Compiled include / exclude regex sets.

exclude: List[Pattern]
include: List[Pattern]
matches(path: str | Path) bool
exception je_web_runner.api.infra.NameFilterError

Bases: WebRunnerException

Raised when a regex pattern can’t be compiled.

class je_web_runner.api.infra.PinnedDriver(name: 'str', version: 'str', url: 'str', archive_format: 'str', binary_inside: 'str', platforms: 'List[str]' = <factory>, cache_subdir: 'Optional[str]' = None)

Bases: object

archive_format: str
binary_inside: str
cache_subdir: str | None = None
matches_current_platform() bool
name: str
platforms: List[str]
url: str
version: str
class je_web_runner.api.infra.Pipeline(stages: 'List[PipelineStage]' = <factory>)

Bases: object

stages: List[PipelineStage]
to_dict() Dict[str, Any]
exception je_web_runner.api.infra.PipelineError

Bases: WebRunnerException

Raised when pipeline definition or run input is invalid.

class je_web_runner.api.infra.PipelineResult(stage_name: 'str', status: 'str', file_results: 'List[Dict[str, Any]]'=<factory>, error: 'Optional[str]' = None)

Bases: object

error: str | None = None
file_results: List[Dict[str, Any]]
stage_name: str
status: str
class je_web_runner.api.infra.PipelineStage(name: 'str', files: 'List[str]', required_status: 'List[str]' = <factory>, continue_on_failure: 'bool' = False)

Bases: object

continue_on_failure: bool = False
files: List[str]
name: str
required_status: List[str]
class je_web_runner.api.infra.ShardJobConfig(name_prefix: str, image: str, total_shards: int, actions_dir: str, namespace: str = 'default', command: Sequence[str] = <factory>, env: Dict[str, str]=<factory>, resources: Dict[str, ~typing.Dict[str, str]]=<factory>, backoff_limit: int = 1, parallelism: int = 1, completions: int = 1, labels: Dict[str, str]=<factory>, extra_args: Sequence[str] = <factory>)

Bases: object

Inputs for a single batch of shard Jobs.

actions_dir: str
backoff_limit: int = 1
command: Sequence[str]
completions: int = 1
env: Dict[str, str]
extra_args: Sequence[str]
image: str
labels: Dict[str, str]
name_prefix: str
namespace: str = 'default'
parallelism: int = 1
resources: Dict[str, Dict[str, str]]
total_shards: int
class je_web_runner.api.infra.SyntheticMonitor(alert_sink: ~typing.Callable[[~typing.Dict[str, ~typing.Any]], None], clock: ~typing.Callable[[], float] = <built-in function monotonic>)

Bases: object

Run a curated set of checks repeatedly, emitting alerts on transitions.

register(name: str, check: Callable[[], Any], failure_threshold: int = 1, recovery_threshold: int = 1) None
run_for(iterations: int, interval_seconds: float = 60.0, sleep: ~typing.Callable[[float], None] = <built-in function sleep>) List[SyntheticMonitorResult]

Run iterations ticks separated by interval_seconds.

tick_once() List[SyntheticMonitorResult]

Run every registered check exactly once and return the outcomes.

exception je_web_runner.api.infra.SyntheticMonitorError

Bases: WebRunnerException

Raised on bad config or sink misuse.

class je_web_runner.api.infra.SyntheticMonitorResult(name: str, status: str, duration_seconds: float, transitioned: bool, error: str | None = None, timestamp: float = <factory>)

Bases: object

Per-iteration outcome for a single check.

duration_seconds: float
error: str | None = None
name: str
status: str
timestamp: float
to_dict() Dict[str, Any]
transitioned: bool
exception je_web_runner.api.infra.WatchModeError

Bases: WebRunnerException

Raised on bad inputs to the watcher.

class je_web_runner.api.infra.WatchSnapshot(entries: Dict[str, ~typing.Tuple[float, int]]=<factory>)

Bases: object

{path: (mtime, size)} snapshot of a directory tree.

entries: Dict[str, Tuple[float, int]]
class je_web_runner.api.infra.WorkspaceLock(python_version: 'str', generated_at: 'str', entries: 'List[LockEntry]' = <factory>)

Bases: object

by_kind(kind: str) List[LockEntry]
entries: List[LockEntry]
generated_at: str
python_version: str
to_dict() Dict[str, Any]
exception je_web_runner.api.infra.WorkspaceLockError

Bases: WebRunnerException

Raised on invalid lock content or missing target.

je_web_runner.api.infra.affected_action_files(index: ImpactIndex, locators: Iterable[str] | None = None, urls: Iterable[str] | None = None, templates: Iterable[str] | None = None, commands: Iterable[str] | None = None) List[str]

Given changed locator/URL/template/command names, return every action JSON file that touches at least one of them.

je_web_runner.api.infra.assert_all_passed(results: Iterable[PipelineResult]) None

Raise if any stage status is not passed.

je_web_runner.api.infra.build_coverage_map(directory: str | Path, glob: str = '**/*.json', normalise_params: bool = True) CoverageMap

Walk directory for action JSON files and build the coverage map.

je_web_runner.api.infra.build_filter(include: Sequence[str] | None = None, exclude: Sequence[str] | None = None) NameFilter

Compile include / exclude regex lists.

je_web_runner.api.infra.build_index(directory: str | Path, glob: str = '**/*.json') ImpactIndex

走訪 directory 下所有 action JSON 檔,建立反查表 Walk directory for *.json files and project each one’s locators, URLs, templates, and command names into the returned index.

je_web_runner.api.infra.build_lock(drivers: Iterable[Dict[str, Any]] | None = None, playwright_versions: Dict[str, str] | None = None, allow_distributions: Iterable[str] | None = None, now: datetime | None = None) WorkspaceLock

Build a WorkspaceLock from the active interpreter + caller-supplied driver / Playwright versions.

je_web_runner.api.infra.changed_paths(base_ref: str = 'main', git_runner: Callable[[Sequence[str]], str] | None = None) List[str]

Return the list of paths changed between base_ref and HEAD.

je_web_runner.api.infra.coverage_for_routes(coverage: CoverageMap, declared_routes: Sequence[str]) Dict[str, List[str]]

Return {route: [files]} for each declared route (empty list if missing).

je_web_runner.api.infra.current_platform_marker() str

Return win / mac-arm64 / mac-x64 / linux / linux-arm64.

je_web_runner.api.infra.diff_locks(before: WorkspaceLock, after: WorkspaceLock) Dict[str, List[Dict[str, Any]]]

Compare two locks and return {added, removed, version_changed} lists.

je_web_runner.api.infra.download_pinned(pinned: PinnedDriver, cache_dir: str | Path = '.webrunner/drivers', fetch: Any | None = None) Path

確認對應的 driver 已下載並解壓;回傳可執行檔路徑 Make sure the pinned driver archive has been fetched and extracted into cache_dir and return the on-disk path of the binary inside.

fetch lets tests inject a synthetic byte loader; when None the archive is fetched via urllib.request.urlopen() over a default SSL context.

je_web_runner.api.infra.filter_paths(paths: Iterable[str | Path], include: Sequence[str] | None = None, exclude: Sequence[str] | None = None) List[str]

Return only those paths whose name matches the include / exclude rules.

je_web_runner.api.infra.from_action_files(files: Iterable[str], runner: Callable[[str], None], *, failure_threshold: int = 2, recovery_threshold: int = 1, alert_sink: Callable[[Dict[str, Any]], None] | None = None) SyntheticMonitor

Build a monitor whose checks each run an action JSON file via runner.

je_web_runner.api.infra.install_for_browser(pin_file: str | Path, browser: str, cache_dir: str | Path = '.webrunner/drivers', fetch: Any | None = None) Path | None

High-level helper: load the pin file, find the entry for browser, download if needed, and return the on-disk binary path.

je_web_runner.api.infra.load_lock(path: str | Path) WorkspaceLock
je_web_runner.api.infra.load_pinfile(path: str | Path) List[PinnedDriver]
je_web_runner.api.infra.load_pipeline(source: str | Path | Dict[str, Any]) Pipeline

Load a pipeline definition from a path / JSON string / dict.

je_web_runner.api.infra.normalise_path(path: str, normalise_params: bool = True) str

Strip query / fragment and replace numeric or UUID segments with :id.

je_web_runner.api.infra.poll_changes(previous: WatchSnapshot, current: WatchSnapshot) WatchDiff
je_web_runner.api.infra.render_coverage_markdown(coverage: CoverageMap, declared_routes: Sequence[str] | None = None) str

Render a Markdown coverage report (table of route → file count).

je_web_runner.api.infra.render_job_manifests(config: ShardJobConfig) List[Dict[str, Any]]

Produce one batch/v1 Job dict per shard.

je_web_runner.api.infra.render_job_yaml(config: ShardJobConfig) str

End-to-end: render manifests then YAML-encode.

je_web_runner.api.infra.render_yaml_documents(manifests: List[Dict[str, Any]]) str

Render to a multi-doc YAML string (basic dumper, no PyYAML dependency).

je_web_runner.api.infra.run_fan_out(tasks: Sequence[Any], max_workers: int | None = None, timeout: float | None = None, fail_fast: bool = False) FanOutResult

平行跑多個 callable;每個 task 可以是 callable(name, callable) tuple。 Run every entry in tasks concurrently. Each entry must be either a zero-arg callable or a (name, callable) tuple. Returns a FanOutResult with per-task timing and outcomes.

je_web_runner.api.infra.run_pipeline(pipeline: Pipeline, runner: Callable[[str], Dict[str, Any]], file_resolver: Callable[[str], List[str]] | None = None) List[PipelineResult]

依宣告順序跑 pipeline。Stage 失敗時: - continue_on_failure=True → 收集失敗、進下一 stage - 否則 → 中斷後續 stage,標記為 skipped

je_web_runner.api.infra.save_pinfile(path: str | Path, drivers: List[PinnedDriver]) Path
je_web_runner.api.infra.select_action_files(candidate_paths: Iterable[str], changed: Iterable[str], additional_keep: Iterable[str] | None = None) List[str]

candidate_paths 中挑出 changed 中影響到的子集 Keep only the candidate paths that are also in changed. additional_keep forces inclusion regardless of diff (useful for “core” tests that should always run).

je_web_runner.api.infra.select_for_changed(candidate_paths: Iterable[str], base_ref: str = 'main', additional_keep: Iterable[str] | None = None, git_runner: Callable[[Sequence[str]], str] | None = None) List[str]

High-level shortcut: query git, then filter.

je_web_runner.api.infra.snapshot_dir(directory: str, glob: str = '**/*.json') WatchSnapshot
je_web_runner.api.infra.watch_loop(directory: str, on_change: ~typing.Callable[[~je_web_runner.utils.watch_mode.watcher.WatchDiff], None], glob: str = '**/*.json', interval: float = 0.5, max_iterations: int | None = None, sleep: ~typing.Callable[[float], None] = <built-in function sleep>) int

directory 上輪詢,每次有變動就 on_change(diff) Poll the directory at interval seconds. Calls on_change(diff) only when something changed; returns the iteration count.

je_web_runner.api.infra.write_lock(lock: WorkspaceLock, path: str | Path) Path