Add job to keep user notifications in reasonable volume

Change-Id: I4d4fa4c987a1732e5e29536a7669e28c34d4ab18
This commit is contained in:
bsitu 2014-09-09 15:11:53 -07:00 committed by EBernhardson
parent c679929e03
commit 55ef794d74
7 changed files with 247 additions and 6 deletions

View file

@ -103,11 +103,15 @@ $wgAutoloadClasses['EchoNotificationController'] = $dir . 'controller/Notificati
$wgAutoloadClasses['EchoDiscussionParser'] = $dir . 'includes/DiscussionParser.php';
$wgAutoloadClasses['EchoDiffParser'] = $dir . 'includes/DiffParser.php';
// Job queue
// Job to process the sending of Echo notifications
$wgAutoloadClasses['EchoNotificationJob'] = $dir . 'jobs/NotificationJob.php';
$wgJobClasses['EchoNotificationJob'] = 'EchoNotificationJob';
// Job to process notification email bundling
$wgAutoloadClasses['MWEchoNotificationEmailBundleJob'] = $dir . 'jobs/NotificationEmailBundleJob.php';
$wgJobClasses['MWEchoNotificationEmailBundleJob'] = 'MWEchoNotificationEmailBundleJob';
// Job to delete older notifications
$wgAutoloadClasses['EchoNotificationDeleteJob'] = $dir . 'jobs/NotificationDeleteJob.php';
$wgJobClasses['EchoNotificationDeleteJob'] = 'EchoNotificationDeleteJob';
// Deferred execution
$wgAutoloadClasses['EchoDeferredMarkAsReadUpdate'] = $dir . '/includes/DeferredMarkAsReadUpdate.php';
@ -239,10 +243,10 @@ $wgEchoCluster = false;
// The max number showed in bundled message, eg, <user> and 99+ others <action>
$wgEchoMaxNotificationCount = 99;
// The max number allowed to be updated on a web request, when we mark all notification
// as read, it's a bad idea to update on a web request if the number is incredibly
// huge, to prevent this, we just fetch 2000 thousand records and mark them as read.
// This would cover most of the use cases.
// The max number of notifications allowed for a user to do a live update,
// this is also the number of max notifications allowed for a user to have
// @FIXME - the name is not intuitive, probably change it when the deleteJob patch
// is deployed to both deployment branches
$wgEchoMaxUpdateCount = 2000;
// The time interval between each bundle email in seconds

View file

@ -74,7 +74,10 @@ class EchoNotificationController {
$type = $event->getType();
$notifyTypes = self::getEventNotifyTypes( $type );
$userIds = array();
$userIdsCount = 0;
foreach ( self::getUsersToNotifyForEvent( $event ) as $user ) {
$userIds[$user->getId()] = $user->getId();
$userNotifyTypes = $notifyTypes;
wfRunHooks( 'EchoGetNotificationTypes', array( $user, $event, &$userNotifyTypes ) );
@ -82,9 +85,43 @@ class EchoNotificationController {
foreach ( $userNotifyTypes as $type ) {
self::doNotification( $event, $user, $type );
}
$userIdsCount++;
// Process 1000 users per NotificationDeleteJob
if ( $userIdsCount > 1000 ) {
self::enqueueDeleteJob( $userIds, $event );
$userIds = array();
$userIdsCount = 0;
}
}
// process the userIds left in the array
if ( $userIds ) {
self::enqueueDeleteJob( $userIds, $event );
}
}
/**
* Schedule a job to check and delete older notifications
*
* @param int $userIds
* @param EchoEvent $event
*/
public static function enqueueDeleteJob( array $userIds, EchoEvent $event ) {
// Do nothing if there is no user
if ( !$userIds ) {
return;
}
$job = new EchoNotificationDeleteJob(
$event->getTitle() ?: Title::newMainPage(),
array(
'userIds' => $userIds
)
);
JobQueueGroup::singleton()->push( $job );
}
/**
* @param string $type Event type
* @return string[] List of notification types to send for

View file

@ -206,4 +206,57 @@ class EchoNotificationMapper extends EchoAbstractMapper {
}
}
/**
* Fetch a notification by user in the specified offset. The caller should
* know that passing a big number for offset is NOT going to work
* @param User $user
* @param int $offset
* @return EchoNotification|bool
*/
public function fetchByUserOffset( User $user, $offset ) {
$dbr = $this->dbFactory->getEchoDb( DB_SLAVE );
$row = $dbr->selectRow(
array( 'echo_notification', 'echo_event' ),
array( '*' ),
array(
'notification_user' => $user->getId(),
'notification_bundle_base' => 1
),
__METHOD__,
array(
'ORDER BY' => 'notification_timestamp DESC, notification_event DESC',
'OFFSET' => $offset,
'LIMIT' => 1
),
array(
'echo_event' => array( 'LEFT JOIN', 'notification_event=event_id' ),
)
);
if ( $row ) {
return EchoNotification::newFromRow( $row );
} else {
return false;
}
}
/**
* Batch delete notifications by user and eventId offset
* @param User $user
* @param int $eventId
* @return boolean
*/
public function deleteByUserEventOffset( User $user, $eventId ) {
$dbw = $this->dbFactory->getEchoDb( DB_MASTER );
$res = $dbw->delete(
'echo_notification',
array(
'notification_user' => $user->getId(),
'notification_event < ' . (int)$eventId
),
__METHOD__
);
return $res;
}
}

View file

@ -140,6 +140,27 @@ class EchoTargetPageMapper extends EchoAbstractMapper {
return $res;
}
/**
* Delete multiple EchoTargetPage records by user & event_id offset
*
* @param User $user
* @param int $eventId
* @return boolean
*/
public function deleteByUserEventOffset( User $user, $eventId ) {
$dbw = $this->dbFactory->getEchoDb( DB_MASTER );
$res = $dbw->delete(
'echo_target_page',
array(
'etp_user' => $user->getId(),
'etp_event < ' . (int)$eventId
),
__METHOD__
);
return $res;
}
/**
* Delete multiple EchoTargetPage records by user
*

View file

@ -0,0 +1,74 @@
<?php
/**
* This job is created when sending notifications to the target users. The purpose
* of this job is to delete older notifications when the number of notifications a
* user has is more than $wgEchoMaxUpdateCount, it does not make sense to have tons
* of notifications in the history while users wouldn't bother to click 'load more'
* like 100 times to see them. What we gain from this is we could run expensive
* queries otherwise that would requires adding index and data denormalization.
*/
class EchoNotificationDeleteJob extends Job {
/**
* UserIds to be processed
* @var int[]
*/
protected $userIds = array();
/**
* @var MWEchoDbFactory
*/
protected $dbFactory;
/**
* @param Title
* @param array
*/
function __construct( $title, $params ) {
parent::__construct( __CLASS__, $title, $params );
$this->userIds = $params['userIds'];
$this->dbFactory = MWEchoDbFactory::newFromDefault();
}
/**
* Run the job of finding & deleting older notifications
*/
function run() {
global $wgEchoMaxUpdateCount;
$updateCount = 0;
$dbw = $this->dbFactory->getEchoDb( DB_MASTER );
$notifMapper = new EchoNotificationMapper();
$targetMapper = new EchoTargetPageMapper();
foreach ( $this->userIds as $userId ) {
$user = User::newFromId( $userId );
$notif = $notifMapper->fetchByUserOffset( $user, $wgEchoMaxUpdateCount );
if ( $notif ) {
$dbw->startAtomic( __METHOD__ );
$res = $notifMapper->deleteByUserEventOffset(
$user, $notif->getEvent()->getId()
);
if ( $res ) {
$res = $targetMapper->deleteByUserEventOffset(
$user, $notif->getEvent()->getId()
);
}
$dbw->endAtomic( __METHOD__ );
if ( $res ) {
$updateCount++;
$notifUser = MWEchoNotifUser::newFromUser( $user );
$notifUser->resetNotificationCount( DB_MASTER );
}
// Wait for slave if we are doing a lot of updates
if ( $updateCount > 10 ) {
$this->dbFactory->waitForSlaves();
$updateCount = 0;
}
}
}
return true;
}
}

View file

@ -114,6 +114,43 @@ class EchoNotificationMapperTest extends MediaWikiTestCase {
$this->assertInstanceOf( 'EchoNotification', $row );
}
public function testFetchByUserOffset() {
// Unsuccessful select
$notifMapper = new EchoNotificationMapper( $this->mockMWEchoDbFactory( array ( 'selectRow' => false ) ) );
$res = $notifMapper->fetchByUserOffset( User::newFromId( 1 ), 500 );
$this->assertFalse( $res );
// Successful select
$dbResult = (object)array (
'event_id' => 1,
'event_type' => 'test',
'event_variant' => '',
'event_extra' => '',
'event_page_id' => '',
'event_agent_id' => '',
'event_agent_ip' => '',
'notification_user' => 1,
'notification_timestamp' => '20140615101010',
'notification_read_timestamp' => '20140616101010',
'notification_bundle_base' => 1,
'notification_bundle_hash' => 'testhash',
'notification_bundle_display_hash' => 'testdisplayhash'
);
$notifMapper = new EchoNotificationMapper( $this->mockMWEchoDbFactory( array ( 'selectRow' => $dbResult ) ) );
$row = $notifMapper->fetchNewestByUserBundleHash( User::newFromId( 1 ), 500 );
$this->assertInstanceOf( 'EchoNotification', $row );
}
public function testDeleteByUserEventOffset() {
$dbResult = array( 'delete' => true );
$notifMapper = new EchoNotificationMapper( $this->mockMWEchoDbFactory( $dbResult ) );
$this->assertTrue( $notifMapper->deleteByUserEventOffset( User::newFromId( 1 ), 500 ) );
$dbResult = array( 'delete' => false );
$notifMapper = new EchoNotificationMapper( $this->mockMWEchoDbFactory( $dbResult ) );
$this->assertFalse( $notifMapper->deleteByUserEventOffset( User::newFromId( 1 ), 500 ) );
}
/**
* Mock object of User
*/
@ -166,8 +203,10 @@ class EchoNotificationMapperTest extends MediaWikiTestCase {
$dbResult += array(
'insert' => '',
'select' => '',
'selectRow' => ''
'selectRow' => '',
'delete' => ''
);
$db = $this->getMockBuilder( 'DatabaseMysql' )
->disableOriginalConstructor()
->getMock();
@ -177,6 +216,9 @@ class EchoNotificationMapperTest extends MediaWikiTestCase {
$db->expects( $this->any() )
->method( 'select' )
->will( $this->returnValue( $dbResult['select'] ) );
$db->expects( $this->any() )
->method( 'delete' )
->will( $this->returnValue( $dbResult['delete'] ) );
$db->expects( $this->any() )
->method( 'selectRow' )
->will( $this->returnValue( $dbResult['selectRow'] ) );

View file

@ -121,6 +121,16 @@ class EchoTargetPageMapperTest extends MediaWikiTestCase {
$this->assertSame( $targetMapper->deleteByUser( User::newFromId( 1 ) ), false );
}
public function testDeleteByUserEventOffset() {
$dbResult = array( 'delete' => true );
$targetMapper = new EchoTargetPageMapper( $this->mockMWEchoDbFactory( $dbResult ) );
$this->assertSame( $targetMapper->deleteByUserEventOffset( User::newFromId( 1 ), 500 ), true );
$dbResult = array( 'delete' => false );
$targetMapper = new EchoTargetPageMapper( $this->mockMWEchoDbFactory( $dbResult ) );
$this->assertSame( $targetMapper->deleteByUserEventOffset( User::newFromId( 1 ), 500 ), false );
}
/**
* Mock object of EchoTargetPage
*/