1919use OCP \App \IAppManager ;
2020use OCP \AppFramework \Db \DoesNotExistException ;
2121use OCP \AppFramework \Db \MultipleObjectsReturnedException ;
22+ use OCP \AppFramework \Utility \ITimeFactory ;
2223use OCP \BackgroundJob \IJobList ;
2324use OCP \DB \Exception ;
2425use OCP \EventDispatcher \IEventDispatcher ;
5960use OCP \TaskProcessing \IInternalTaskType ;
6061use OCP \TaskProcessing \IManager ;
6162use OCP \TaskProcessing \IProvider ;
62- use OCP \TaskProcessing \ISynchronousOptionsProvider ;
63+ use OCP \TaskProcessing \ISynchronousOptionsAwareProvider ;
6364use OCP \TaskProcessing \ISynchronousProvider ;
6465use OCP \TaskProcessing \ISynchronousWatermarkingProvider ;
6566use OCP \TaskProcessing \ITaskType ;
@@ -158,6 +159,7 @@ public function __construct(
158159 private IUserSession $ userSession ,
159160 ICacheFactory $ cacheFactory ,
160161 private IFactory $ l10nFactory ,
162+ private ITimeFactory $ timeFactory ,
161163 ) {
162164 $ this ->appData = $ appDataFactory ->get ('core ' );
163165 $ this ->distributedCache = $ cacheFactory ->createDistributed ('task_processing:: ' );
@@ -1134,7 +1136,7 @@ public function processTask(Task $task, ISynchronousProvider $provider): bool {
11341136 $ this ->setTaskStatus ($ task , Task::STATUS_RUNNING );
11351137 if ($ provider instanceof ISynchronousWatermarkingProvider) {
11361138 $ output = $ provider ->process ($ task ->getUserId (), $ input , fn (float $ progress ) => $ this ->setTaskProgress ($ task ->getId (), $ progress ), $ task ->getIncludeWatermark ());
1137- } elseif ($ provider instanceof ISynchronousOptionsProvider ) {
1139+ } elseif ($ provider instanceof ISynchronousOptionsAwareProvider ) {
11381140 $ options = new SynchronousProviderOptions (
11391141 $ task ->getIncludeWatermark (),
11401142 $ task ->getPreferStreaming (),
@@ -1238,7 +1240,7 @@ public function setTaskIntermediateOutput(int $id, array $output): bool {
12381240 if ($ userId !== null
12391241 && $ userId !== ''
12401242 && $ this ->appManager ->isEnabledForAnyone ('notify_push ' )
1241- && class_exists ('\OCA\NotifyPush\Queue\IQueue ' )
1243+ && interface_exists ('\OCA\NotifyPush\Queue\IQueue ' )
12421244 ) {
12431245 try {
12441246 /** @psalm-suppress UndefinedClass */
@@ -1249,14 +1251,17 @@ public function setTaskIntermediateOutput(int $id, array $output): bool {
12491251 'message ' => 'task_ ' . $ task ->getId (),
12501252 'body ' => $ output ,
12511253 ]);
1252- // we don't update the DB if something was sent via notify_push
1253- // so if the push messages are not received for some reason, the polling will still not see any intermediate output
1254- // but will receive the final output
1255- return true ;
12561254 } catch (ContainerExceptionInterface |NotFoundExceptionInterface $ e ) {
12571255 $ this ->logger ->debug ('OCA\NotifyPush\IQueue not found, not sending to queue ' );
12581256 }
12591257 }
1258+
1259+ // throttle DB update
1260+ $ now = $ this ->timeFactory ->now ()->getTimestamp ();
1261+ if ($ now - $ task ->getLastUpdated () < 2 ) {
1262+ return true ;
1263+ }
1264+
12601265 // no output shape validation for now
12611266 $ task ->setOutput ($ output );
12621267 $ taskEntity = \OC \TaskProcessing \Db \Task::fromPublicTask ($ task );
0 commit comments