feat(tia): continues to work on poc

This commit is contained in:
nuno maduro
2026-04-20 10:05:55 -07:00
parent 9c8033d60c
commit 47f1fc2d94
2 changed files with 173 additions and 10 deletions

View File

@ -86,6 +86,8 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
private const string WORKER_PREFIX = 'tia-worker-';
private const string WORKER_RESULTS_PREFIX = 'tia-worker-results-';
/**
* Global flag toggled by the parent process so workers know to record.
*/
@ -174,6 +176,16 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
return self::tempDir().DIRECTORY_SEPARATOR.self::WORKER_PREFIX.'*.json';
}
private static function workerResultsPath(string $token): string
{
return self::tempDir().DIRECTORY_SEPARATOR.self::WORKER_RESULTS_PREFIX.$token.'.json';
}
private static function workerResultsGlob(): string
{
return self::tempDir().DIRECTORY_SEPARATOR.self::WORKER_RESULTS_PREFIX.'*.json';
}
public function __construct(
private readonly OutputInterface $output,
private readonly Recorder $recorder,
@ -263,6 +275,14 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
return;
}
// Worker in replay mode: flush the ResultCollector + replay counter
// into a partial so the parent can merge them into the graph after
// paratest returns. Parent's own ResultCollector is empty in parallel
// runs because workers — not the parent — execute the tests.
if (Parallel::isWorker() && $this->replayGraph !== null) {
$this->flushWorkerReplay();
}
$recorder = $this->recorder;
if (! $recorder->isActive()) {
@ -329,6 +349,14 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
// twice in a row would re-execute the same affected tests both
// times even though nothing new changed.
if ($this->replayRan) {
// In parallel runs the workers executed the tests, so their
// ResultCollector + replay counter live in other processes. Pull
// those partials in before both the summary and the graph
// snapshot so the parent state reflects the whole run.
if (Parallel::isEnabled()) {
$this->mergeWorkerReplayPartials();
}
$this->bumpRecordedSha();
$this->emitReplaySummary();
}
@ -578,6 +606,10 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
return $arguments;
}
// Clear stale partials from a previous interrupted run so the merge
// pass doesn't pick up results from an unrelated invocation.
$this->purgeWorkerPartials($projectRoot);
Parallel::setGlobal(self::REPLAYING_GLOBAL, '1');
return $arguments;
@ -670,16 +702,7 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
*/
private function flushWorkerPartial(string $projectRoot, array $perTest): void
{
$token = $_SERVER['TEST_TOKEN'] ?? $_ENV['TEST_TOKEN'] ?? getmypid();
// Defensive: token might arrive as int or string depending on paratest
// version. Cast + filter to keep filenames sane.
$token = preg_replace('/[^A-Za-z0-9_-]/', '', (string) $token);
if ($token === '') {
$token = (string) getmypid();
}
$path = self::workerPath($token);
$path = self::workerPath($this->workerToken());
$dir = dirname($path);
if (! is_dir($dir) && ! @mkdir($dir, 0755, true) && ! is_dir($dir)) {
@ -719,6 +742,132 @@ final class Tia implements AddsOutput, HandlesArguments, Terminable
foreach ($this->collectWorkerPartials($projectRoot) as $path) {
@unlink($path);
}
foreach ($this->collectWorkerReplayPartials() as $path) {
@unlink($path);
}
}
/**
* Worker-side flush of replay state (collected results + cache-hit
* counter) into a per-worker partial file. Parent merges them in
* `addOutput` so the graph snapshot + summary reflect the full run.
*/
private function flushWorkerReplay(): void
{
/** @var ResultCollector $collector */
$collector = Container::getInstance()->get(ResultCollector::class);
$results = $collector->all();
if ($results === [] && $this->replayedCount === 0) {
return;
}
$token = $this->workerToken();
$path = self::workerResultsPath($token);
$dir = dirname($path);
if (! is_dir($dir) && ! @mkdir($dir, 0755, true) && ! is_dir($dir)) {
return;
}
$json = json_encode([
'results' => $results,
'replayed' => $this->replayedCount,
], JSON_UNESCAPED_SLASHES);
if ($json === false) {
return;
}
$tmp = $path.'.'.bin2hex(random_bytes(4)).'.tmp';
if (@file_put_contents($tmp, $json) === false) {
return;
}
if (! @rename($tmp, $path)) {
@unlink($tmp);
}
}
/**
* @return array<int, string>
*/
private function collectWorkerReplayPartials(): array
{
$matches = glob(self::workerResultsGlob());
return $matches === false ? [] : $matches;
}
/**
* Parent-side merge of per-worker replay partials. Feeds the results into
* the parent's `ResultCollector` so the existing snapshot pass persists
* them, and rolls up the cache-hit counts so the summary is accurate.
*/
private function mergeWorkerReplayPartials(): void
{
/** @var ResultCollector $collector */
$collector = Container::getInstance()->get(ResultCollector::class);
foreach ($this->collectWorkerReplayPartials() as $path) {
$raw = @file_get_contents($path);
if ($raw === false) {
@unlink($path);
continue;
}
$decoded = json_decode($raw, true);
@unlink($path);
if (! is_array($decoded)) {
continue;
}
if (isset($decoded['replayed']) && is_int($decoded['replayed'])) {
$this->replayedCount += $decoded['replayed'];
}
if (isset($decoded['results']) && is_array($decoded['results'])) {
$normalised = [];
/** @var mixed $result */
foreach ($decoded['results'] as $testId => $result) {
if (! is_string($testId) || ! is_array($result)) {
continue;
}
$normalised[$testId] = [
'status' => is_int($result['status'] ?? null) ? $result['status'] : 0,
'message' => is_string($result['message'] ?? null) ? $result['message'] : '',
'time' => is_float($result['time'] ?? null) || is_int($result['time'] ?? null) ? (float) $result['time'] : 0.0,
'assertions' => is_int($result['assertions'] ?? null) ? $result['assertions'] : 0,
];
}
if ($normalised !== []) {
$collector->merge($normalised);
}
}
}
}
private function workerToken(): string
{
$raw = $_SERVER['TEST_TOKEN'] ?? $_ENV['TEST_TOKEN'] ?? null;
$token = is_scalar($raw) ? (string) $raw : (string) getmypid();
$token = preg_replace('/[^A-Za-z0-9_-]/', '', $token);
if ($token === null || $token === '') {
return (string) getmypid();
}
return $token;
}
/**

View File

@ -97,6 +97,20 @@ final class ResultCollector
}
}
/**
* Injects externally-collected results (e.g. partials flushed by parallel
* workers) into this collector so the parent can persist them in the same
* snapshot pass as non-parallel runs.
*
* @param array<string, array{status: int, message: string, time: float, assertions: int}> $results
*/
public function merge(array $results): void
{
foreach ($results as $testId => $result) {
$this->results[$testId] = $result;
}
}
public function reset(): void
{
$this->results = [];