je_web_runner.api.observability

Façade over the observability helpers: timeline, failure bundle, memory-leak detection, trace recording, CDP tap, event bus, and OTLP export.

exception je_web_runner.api.observability.BidiBackendError

Bases: WebRunnerException

Raised when subscription / unsubscription fails or backend is unsupported.

class je_web_runner.api.observability.BidiBridge

Bases: object

Backend-detecting bridge for BiDi-style event subscription.

active_subscriptions() List[BidiSubscription]
detect_backend(target: Any) str
register_translator(backend: str, event: str, translator: Callable[[Any, Callable[[BidiEvent], None]], Callable[[], None]]) None
subscribe(target: Any, event: str, callback: Callable[[BidiEvent], None], backend: str | None = None) BidiSubscription
unsubscribe(subscription: BidiSubscription) None
unsubscribe_all() None
class je_web_runner.api.observability.BidiEvent(name: str, payload: Dict[str, Any])

Bases: object

Backend-agnostic event payload.

name: str
payload: Dict[str, Any]
class je_web_runner.api.observability.BidiSubscription(subscription_id: int, event: str, backend: str, detach: Callable[[], None])

Bases: object

Handle returned by BidiBridge.subscribe().

backend: str
detach: Callable[[], None]
event: str
subscription_id: int
unsubscribe() None
class je_web_runner.api.observability.CdpRecord(timestamp: float, method: str, params: Dict[str, Any], return_value: Any = None, error: str | None = None)

Bases: object

error: str | None = None
method: str
params: Dict[str, Any]
return_value: Any = None
timestamp: float
to_dict() Dict[str, Any]
class je_web_runner.api.observability.CdpRecorder(output_path: str | Path)

Bases: object

Wrap a driver’s execute_cdp_cmd and persist every call.

attach(driver: Any) Callable[[str, Dict[str, Any]], Any]

Replace driver.execute_cdp_cmd with a recording wrapper. Returns the original method so the caller can detach later.

detach(driver: Any, original: Callable[[str, Dict[str, Any]], Any]) None
flush() Path
output_path: str | Path
records() List[CdpRecord]
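
The attach / detach contract described above (swap in a wrapper, hand back the original) can be illustrated with a small standalone sketch. It does not import je_web_runner; FakeDriver, attach_recorder and detach_recorder are hypothetical names, and the logged dictionary only mirrors the CdpRecord fields. The real CdpRecorder may differ in detail.

```python
import time
from typing import Any, Callable, Dict, List

CdpCall = Callable[[str, Dict[str, Any]], Any]


class FakeDriver:
    """Hypothetical stand-in for a driver exposing execute_cdp_cmd."""

    def execute_cdp_cmd(self, method: str, params: Dict[str, Any]) -> Any:
        return {"echo": method}


def attach_recorder(driver: Any, log: List[Dict[str, Any]]) -> CdpCall:
    """Replace driver.execute_cdp_cmd with a recording wrapper.

    Returns the original method so the caller can restore it later.
    """
    original = driver.execute_cdp_cmd

    def recording(method: str, params: Dict[str, Any]) -> Any:
        # Entry keys mirror the CdpRecord fields (assumption for this sketch).
        entry: Dict[str, Any] = {
            "timestamp": time.time(),
            "method": method,
            "params": params,
            "return_value": None,
            "error": None,
        }
        try:
            entry["return_value"] = original(method, params)
            return entry["return_value"]
        except Exception as exc:
            entry["error"] = repr(exc)
            raise
        finally:
            # Log the call whether it succeeded or failed.
            log.append(entry)

    driver.execute_cdp_cmd = recording
    return original


def detach_recorder(driver: Any, original: CdpCall) -> None:
    """Put the original execute_cdp_cmd back on the driver."""
    driver.execute_cdp_cmd = original


driver = FakeDriver()
calls: List[Dict[str, Any]] = []
original = attach_recorder(driver, calls)
result = driver.execute_cdp_cmd("Network.enable", {})
detach_recorder(driver, original)
driver.execute_cdp_cmd("Page.enable", {})  # no longer recorded
```

Returning the original callable keeps the wrapper stateless with respect to the driver, so the caller decides when recording ends.
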
class je_web_runner.api.observability.CdpReplayer(records: List[CdpRecord])

Bases: object

Match incoming execute_cdp_cmd calls against a recording.

execute_cdp_cmd(method: str, _params: Dict[str, Any] | None = None) Any
records: List[CdpRecord]
remaining() int
reset() None
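
One plausible reading of "match incoming calls against a recording" is strict in-order matching on the method name. The sketch below shows that reading only; it is an assumption, not the documented behaviour of CdpReplayer. SequentialReplayer is a hypothetical class, and records are plain dictionaries rather than CdpRecord instances.

```python
from typing import Any, Dict, List, Optional


class SequentialReplayer:
    """Serve recorded return values in the order they were captured."""

    def __init__(self, records: List[Dict[str, Any]]) -> None:
        self.records = list(records)
        self._cursor = 0

    def execute_cdp_cmd(self, method: str, _params: Optional[Dict[str, Any]] = None) -> Any:
        if self._cursor >= len(self.records):
            raise LookupError(f"no recorded call left for {method!r}")
        record = self.records[self._cursor]
        if record["method"] != method:
            raise LookupError(
                f"expected {record['method']!r}, got {method!r}"
            )
        self._cursor += 1
        if record.get("error"):
            # Re-raise recorded failures so replay behaves like the live run.
            raise RuntimeError(record["error"])
        return record.get("return_value")

    def remaining(self) -> int:
        return len(self.records) - self._cursor

    def reset(self) -> None:
        self._cursor = 0


replayer = SequentialReplayer([
    {"method": "Network.enable", "return_value": {}},
    {"method": "Page.navigate", "return_value": {"frameId": "F1"}},
])
first = replayer.execute_cdp_cmd("Network.enable")
second = replayer.execute_cdp_cmd("Page.navigate", {"url": "about:blank"})
left_after_two = replayer.remaining()
replayer.reset()
left_after_reset = replayer.remaining()
```
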
exception je_web_runner.api.observability.CdpTapError

Bases: WebRunnerException

Raised when recording or replay can’t proceed.

class je_web_runner.api.observability.EventBus(log_path: str | Path, sender: str | None = None)

Bases: object

File-backed publish/subscribe primitive.

current_offset() int
log_path: str | Path
poll(offset: int = 0, topics: Iterable[str] | None = None) List[EventEnvelope]
publish(topic: str, payload: Dict[str, Any] | None = None) EventEnvelope
sender: str | None = None
wait_for(topic: str, offset: int = 0, predicate: Callable[[EventEnvelope], bool] | None = None, timeout: float = 30.0, poll_interval: float = 0.1, sleep: Callable[[float], None] = time.sleep) EventEnvelope

Block until an event matching topic (and predicate) appears.
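
The poll-until-match loop implied by the wait_for signature can be sketched without the library. In this standalone illustration, events are plain dictionaries, the offset is treated as a list index, and a timeout raises TimeoutError. All three choices are assumptions, as is the poll callable; the real EventBus may count offsets and signal timeouts differently.

```python
import time
from typing import Any, Callable, Dict, List, Optional

Event = Dict[str, Any]


def wait_for(
    poll: Callable[[int], List[Event]],
    topic: str,
    offset: int = 0,
    predicate: Optional[Callable[[Event], bool]] = None,
    timeout: float = 30.0,
    poll_interval: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> Event:
    """Poll until an event with the given topic (and predicate) shows up."""
    deadline = time.monotonic() + timeout
    while True:
        events = poll(offset)
        for event in events:
            if event["topic"] == topic and (predicate is None or predicate(event)):
                return event
        # Skip what was already inspected on the next poll.
        offset += len(events)
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no {topic!r} event within {timeout}s")
        sleep(poll_interval)


log: List[Event] = [{"topic": "build.started", "payload": {}}]


def poll(offset: int) -> List[Event]:
    return log[offset:]


def fake_sleep(_seconds: float) -> None:
    # Instead of sleeping, simulate another process publishing an event.
    log.append({"topic": "build.done", "payload": {"ok": True}})


event = wait_for(poll, "build.done", sleep=fake_sleep)
```

Accepting sleep as a parameter, as the documented signature does, lets tests drive the loop deterministically instead of waiting on the wall clock.
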

exception je_web_runner.api.observability.EventBusError

Bases: WebRunnerException

Raised on invalid bus configuration or corrupted log lines.

class je_web_runner.api.observability.EventEnvelope(event_id: str, topic: str, payload: Dict[str, Any], timestamp_ms: int = <factory>, sender: str | None = None)

Bases: object

event_id: str
static from_dict(data: Dict[str, Any]) EventEnvelope
payload: Dict[str, Any]
sender: str | None = None
timestamp_ms: int
to_json_line() str
topic: str
class je_web_runner.api.observability.FailureBundle(test_name: str, error_repr: str, captured_at: str = <factory>, metadata: Dict[str, Any] = <factory>, _entries: Dict[str, bytes] = <factory>)

Bases: object

Builder for one failure bundle.

add_console(messages: List[Dict[str, Any]]) None
add_dom(html: str, name: str = 'dom.html') None
add_file(source: str | Path, inside_name: str | None = None) None
add_network(responses: List[Dict[str, Any]]) None
add_screenshot(png_bytes: bytes, name: str = 'screenshot.png') None
add_text(name: str, text: str) None
add_trace(trace_path: str | Path) None
captured_at: str
error_repr: str
metadata: Dict[str, Any]
test_name: str
write(output_path: str | Path) Path
exception je_web_runner.api.observability.FailureBundleError

Bases: WebRunnerException

Raised when bundle building or reading fails.

exception je_web_runner.api.observability.MemoryLeakError

Bases: WebRunnerException

Raised when the driver cannot report heap stats or growth exceeds budget.

class je_web_runner.api.observability.MemorySample(iteration: int, used_heap_bytes: int)

Bases: object

One heap-size sample after a round.

iteration: int
used_heap_bytes: int
class je_web_runner.api.observability.OtlpExportConfig(endpoint: str, protocol: str = 'grpc', headers: Dict[str, str] | None = None, timeout: float = 10.0, insecure: bool = False, service_name: str = 'webrunner')

Bases: object

Caller-supplied OTLP wiring.

endpoint: str
headers: Dict[str, str] | None = None
insecure: bool = False
protocol: str = 'grpc'
service_name: str = 'webrunner'
timeout: float = 10.0
exception je_web_runner.api.observability.OtlpExporterError

Bases: WebRunnerException

Raised when configuration is invalid or the SDK is missing.

exception je_web_runner.api.observability.TimelineError

Bases: WebRunnerException

Raised when an event lacks a usable timestamp.

class je_web_runner.api.observability.TimelineEvent(timestamp_ms: float, kind: str, label: str, payload: Dict[str, Any])

Bases: object

A single point on the merged timeline.

kind: str
label: str
payload: Dict[str, Any]
timestamp_ms: float
class je_web_runner.api.observability.TraceRecorder(output_dir: str = 'trace-out', screenshots: bool = True, snapshots: bool = True, sources: bool = True)

Bases: object

Capture Playwright tracing into per-name zip files.

output_dir: str = 'trace-out'
screenshots: bool = True
snapshots: bool = True
sources: bool = True
start(context: Any, name: str) None
stop(context: Any) str
written() List[str]
exception je_web_runner.api.observability.TraceRecorderError

Bases: WebRunnerException

Raised when tracing API isn’t available or stop() runs without start().

je_web_runner.api.observability.build(spans: Iterable[Dict[str, Any]] | None = None, console: Iterable[Dict[str, Any]] | None = None, responses: Iterable[Dict[str, Any]] | None = None) List[Dict[str, Any]]

Top-level helper: take three optional sources, return one ordered list.

je_web_runner.api.observability.build_exporter(config: OtlpExportConfig) Any

Construct an OTLPSpanExporter matching the requested protocol.

je_web_runner.api.observability.configure_otlp_export(tracer_provider: Any, config: OtlpExportConfig, processor_factory: Any | None = None, exporter_factory: Any | None = None) Any

Build the exporter + BatchSpanProcessor and register it with the supplied TracerProvider. Returns the registered processor so the caller can call shutdown() cleanly.

processor_factory / exporter_factory let unit tests inject stubs without importing the OTel SDK.

je_web_runner.api.observability.detect_growth(driver: Any, action: Callable[[], None], iterations: int = 5, warmup: int = 1, growth_bytes_per_iter_budget: int | None = None, sampler: Callable[[Any], int] | None = None) dict

Run action iterations times (after warmup discarded rounds), sample the heap each round, and return the per-round heap samples together with the linear-fit slope so callers can decide whether memory is growing.

Parameters:

growth_bytes_per_iter_budget – when set, raise if slope exceeds it.
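
The linear-fit slope can be read as an ordinary least-squares fit of heap size against iteration index. The standalone sketch below illustrates that idea under assumptions: linear_slope and detect_growth_sketch are hypothetical helpers, the returned keys are invented for the example, and RuntimeError stands in for MemoryLeakError.

```python
from typing import Any, Callable, Dict, List, Optional


def linear_slope(samples: List[int]) -> float:
    """Least-squares slope of samples against their index (bytes per iteration)."""
    n = len(samples)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(samples) / n
    numerator = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(samples))
    denominator = sum((i - mean_x) ** 2 for i in range(n))
    return numerator / denominator


def detect_growth_sketch(
    action: Callable[[], None],
    sampler: Callable[[], int],
    iterations: int = 5,
    warmup: int = 1,
    budget: Optional[int] = None,
) -> Dict[str, Any]:
    # Warmup rounds run the action but their heap size is not sampled.
    for _ in range(warmup):
        action()
    samples: List[int] = []
    for _ in range(iterations):
        action()
        samples.append(sampler())
    slope = linear_slope(samples)
    if budget is not None and slope > budget:
        raise RuntimeError(f"heap grows {slope:.0f} bytes/iter, budget {budget}")
    return {"samples": samples, "slope": slope}


heap = {"bytes": 1000}


def leaky_action() -> None:
    heap["bytes"] += 100  # simulate a 100-byte leak per round


report = detect_growth_sketch(leaky_action, lambda: heap["bytes"])
```

A slope is less sensitive to a single noisy sample than comparing only the first and last measurement, which is presumably why a fit is used.
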

je_web_runner.api.observability.extract_bundle(zip_path: str | Path) Dict[str, Any]

Read a bundle and return {manifest, files: {name: bytes}}.
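
The returned shape can be pictured with the standard zipfile module. This is a sketch under assumptions: the member name manifest.json and the helper read_bundle are hypothetical, and the real bundle layout is not documented here.

```python
import io
import json
import zipfile
from typing import Any, BinaryIO, Dict


def read_bundle(source: BinaryIO) -> Dict[str, Any]:
    """Return {"manifest": ..., "files": {name: bytes}} from a zip archive."""
    with zipfile.ZipFile(source) as archive:
        files = {name: archive.read(name) for name in archive.namelist()}
    # Assumption: the manifest is stored as JSON under "manifest.json".
    manifest = json.loads(files["manifest.json"])
    return {"manifest": manifest, "files": files}


# Build a tiny in-memory archive to read back.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("manifest.json", json.dumps({"test_name": "test_login"}))
    archive.writestr("dom.html", "<html></html>")
buffer.seek(0)

bundle = read_bundle(buffer)
```
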

je_web_runner.api.observability.from_console(messages: Iterable[Dict[str, Any]]) List[TimelineEvent]
je_web_runner.api.observability.from_responses(responses: Iterable[Dict[str, Any]]) List[TimelineEvent]
je_web_runner.api.observability.from_spans(spans: Iterable[Dict[str, Any]]) List[TimelineEvent]

Convert OTel-shaped spans ({name, start_ms, end_ms, attrs}).

je_web_runner.api.observability.load_recording(path: str | Path) List[CdpRecord]
je_web_runner.api.observability.merge(*event_lists: Iterable[TimelineEvent]) List[TimelineEvent]

Concatenate event lists and sort by timestamp ascending (stable).
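
Because Python's sorted is stable, concatenating and then sorting by timestamp keeps events with equal timestamps in the order they were passed in. The standalone sketch below shows this; Event is a hypothetical stand-in that mirrors the TimelineEvent fields, and merge_events is not the library function.

```python
from dataclasses import dataclass, field
from itertools import chain
from typing import Any, Dict, Iterable, List


@dataclass
class Event:
    """Stand-in with the same fields as TimelineEvent."""

    timestamp_ms: float
    kind: str
    label: str
    payload: Dict[str, Any] = field(default_factory=dict)


def merge_events(*event_lists: Iterable[Event]) -> List[Event]:
    # sorted() is stable: ties keep their concatenation order.
    return sorted(chain.from_iterable(event_lists), key=lambda e: e.timestamp_ms)


spans = [Event(10.0, "span", "load"), Event(30.0, "span", "click")]
console = [Event(10.0, "console", "warn"), Event(20.0, "console", "log")]

merged = merge_events(spans, console)
labels = [event.label for event in merged]
```

Here "load" precedes "warn" although both carry timestamp 10.0, because the span list was passed first.
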

je_web_runner.api.observability.sample_used_heap(driver: Any) int

Read performance.memory.usedJSHeapSize. Selenium / Playwright friendly heap-size probe. Returns bytes.

je_web_runner.api.observability.to_dicts(events: Iterable[TimelineEvent]) List[Dict[str, Any]]