Rewrite persistRevisionThreadItems

* Fix estimate when using --current
* Add waitForReplication() after batches
* Add an option to restart from a specific point
* Remove TableCleanup, which doesn't work well for these use cases
* Use index on page_id when processing current revisions only

Bug: T315510
Change-Id: Idf9759743e67b3e116d6e20234b603bd76d4a41f
This commit is contained in:
Bartosz Dziewoński 2022-09-07 03:33:37 +02:00
parent 86d47438d3
commit 3fa61ab6cd

View file

@ -2,26 +2,29 @@
namespace MediaWiki\Extension\DiscussionTools\Maintenance;
use IDatabase;
use Maintenance;
use MediaWiki\Extension\DiscussionTools\Hooks\HookUtils;
use MediaWiki\Extension\DiscussionTools\ThreadItemStore;
use MediaWiki\MediaWikiServices;
use MediaWiki\Revision\RevisionStore;
use MediaWiki\Shell\Shell;
use MWExceptionRenderer;
use stdClass;
use TableCleanup;
use Throwable;
use Title;
use Wikimedia\Rdbms\SelectQueryBuilder;
$IP = getenv( 'MW_INSTALL_PATH' );
if ( $IP === false ) {
$IP = __DIR__ . '/../../..';
}
require_once "$IP/maintenance/Maintenance.php";
// Autoloader isn't set up yet until we do `require_once RUN_MAINTENANCE_IF_MAIN`…
// but our class needs to exist at that point D:
require_once "$IP/maintenance/TableCleanup.php";
class PersistRevisionThreadItems extends TableCleanup {
class PersistRevisionThreadItems extends Maintenance {
/** @var IDatabase */
private $dbr;
/** @var ThreadItemStore */
private $itemStore;
@ -35,18 +38,25 @@ class PersistRevisionThreadItems extends TableCleanup {
$this->addDescription( 'Persist thread item information for the given pages/revisions' );
$this->addOption( 'rev', 'Revision ID to process', false, true, false, true );
$this->addOption( 'page', 'Page title to process', false, true, false, true );
$this->addOption( 'all', 'Process the whole wiki', false, false, false, false );
$this->addOption( 'current', 'Process current revisions only', false, false, false, false );
$this->addOption( 'all', 'Process the whole wiki' );
$this->addOption( 'current', 'Process current revisions only' );
$this->addOption( 'start', 'Restart from this position (as printed by the script)', false, true );
$this->setBatchSize( 100 );
}
public function execute() {
$services = MediaWikiServices::getInstance();
$this->dbr = $this->getDB( DB_REPLICA );
$this->itemStore = $services->getService( 'DiscussionTools.ThreadItemStore' );
$this->revStore = $services->getRevisionStore();
$qb = $this->dbr->newSelectQueryBuilder();
$qb->queryInfo( $this->revStore->getQueryInfo( [ 'page' ] ) );
if ( $this->getOption( 'all' ) ) {
$conds = [];
// Do nothing
} elseif ( $this->getOption( 'page' ) ) {
$linkBatch = $services->getLinkBatchFactory()->newLinkBatch();
@ -57,10 +67,10 @@ class PersistRevisionThreadItems extends TableCleanup {
return $page->getId();
}, $linkBatch->getPageIdentities() );
$conds = [ 'rev_page' => $pageIds ];
$qb->where( [ 'rev_page' => $pageIds ] );
} elseif ( $this->getOption( 'rev' ) ) {
$conds = [ 'rev_id' => $this->getOption( 'rev' ) ];
$qb->where( [ 'rev_id' => $this->getOption( 'rev' ) ] );
} else {
$this->error( "One of 'all', 'page', or 'rev' required" );
$this->maybeHelp( true );
@ -68,31 +78,91 @@ class PersistRevisionThreadItems extends TableCleanup {
}
if ( $this->getOption( 'current' ) ) {
// runTable() doesn't provide a way to do a JOIN. This is equivalent, but it might have
// different performance characteristics. It should be good enough for a maintenance script.
$conds[] = 'rev_id IN ( SELECT page_latest FROM page )';
$qb->where( 'rev_id = page_latest' );
$index = [ 'page_id' ];
} else {
// Process in order by page and time to avoid confusing results while the script is running
$index = [ 'rev_page', 'rev_timestamp', 'rev_id' ];
}
$this->runTable( [
'table' => 'revision',
'conds' => $conds,
'index' => [ 'rev_page', 'rev_timestamp', 'rev_id' ],
'callback' => 'processRow',
] );
$this->process( $qb, $index );
}
/**
* @param SelectQueryBuilder $qb
* @param array $index
*/
private function process( SelectQueryBuilder $qb, array $index ): void {
$qb->caller( __METHOD__ );
// estimateRowCount() refuses to work when fields are set, so we can't just call it on $qb
$countQueryInfo = $qb->getQueryInfo();
$count = $qb->newSubquery()
->rawTables( $countQueryInfo['tables'] )
->where( $countQueryInfo['conds'] )
->options( $countQueryInfo['options'] )
->joinConds( $countQueryInfo['join_conds'] )
->caller( __METHOD__ )
->estimateRowCount();
$this->output( "Processing... (estimated $count rows)\n" );
$processed = 0;
$updated = 0;
$qb->orderBy( $index );
$batchSize = $this->getBatchSize();
$qb->limit( $batchSize );
$batchStart = null;
if ( $this->getOption( 'start' ) ) {
$batchStart = json_decode( $this->getOption( 'start' ) );
if ( !$batchStart ) {
$this->error( "Invalid 'start'" );
}
}
while ( true ) {
$qbForBatch = clone $qb;
if ( $batchStart ) {
$batchStartCond = $this->dbr->buildComparison( '>', array_combine( $index, $batchStart ) );
$qbForBatch->where( $batchStartCond );
$batchStartOutput = Shell::escape( json_encode( $batchStart ) );
$this->output( "--start $batchStartOutput\n" );
}
$res = $qbForBatch->fetchResultSet();
foreach ( $res as $row ) {
$updated += (int)$this->processRow( $row );
}
$processed += $res->numRows();
$this->output( "Processed $processed (updated $updated) of $count rows\n" );
$this->waitForReplication();
if ( $res->numRows() < $batchSize || !isset( $row ) ) {
// Done
break;
}
// Update the conditions to select the next batch.
$batchStart = [];
foreach ( $index as $field ) {
$batchStart[] = $row->$field;
}
}
$this->output( "Finished!\n" );
}
/**
* @param stdClass $row Database table row
* @return bool
*/
protected function processRow( stdClass $row ) {
private function processRow( stdClass $row ): bool {
$changed = false;
try {
// HACK (because we don't query the table this data ordinarily comes from,
// and we don't care about edit summaries here)
$row->rev_comment_text = '';
$row->rev_comment_data = null;
$row->rev_comment_cid = null;
$rev = $this->revStore->newRevisionFromRow( $row );
$title = Title::newFromLinkTarget(
$rev->getPageAsLinkTarget()
@ -100,16 +170,14 @@ class PersistRevisionThreadItems extends TableCleanup {
if ( HookUtils::isAvailableForTitle( $title ) ) {
$threadItemSet = HookUtils::parseRevisionParsoidHtml( $rev );
if ( !$this->dryrun ) {
// Store permalink data
$changed = $this->itemStore->insertThreadItems( $rev, $threadItemSet );
}
// Store permalink data
$changed = $this->itemStore->insertThreadItems( $rev, $threadItemSet );
}
} catch ( Throwable $e ) {
$this->output( "Error while processing revid=$row->rev_id, pageid=$row->rev_page\n" );
MWExceptionRenderer::output( $e, MWExceptionRenderer::AS_RAW );
}
$this->progress( (int)$changed );
return $changed;
}
}