• Skip to content
  • Skip to link menu
KDE 4.5 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • Sitemap
  • Contact Us
 

akonadi

resourcescheduler.cpp

00001 /*
00002     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00003 
00004     This library is free software; you can redistribute it and/or modify it
00005     under the terms of the GNU Library General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or (at your
00007     option) any later version.
00008 
00009     This library is distributed in the hope that it will be useful, but WITHOUT
00010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00012     License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to the
00016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00017     02110-1301, USA.
00018 */
00019 
00020 #include "resourcescheduler_p.h"
00021 
00022 #include <kdebug.h>
00023 #include <klocale.h>
00024 
00025 #include <QtCore/QTimer>
00026 #include <QtDBus/QDBusInterface>
00027 #include <QtDBus/QDBusConnectionInterface>
00028 #include <boost/graph/graph_concepts.hpp>
00029 
00030 using namespace Akonadi;
00031 
00032 qint64 ResourceScheduler::Task::latestSerial = 0;
00033 static QDBusAbstractInterface *s_resourcetracker = 0;
00034 
00035 //@cond PRIVATE
00036 
00037 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00038     QObject( parent ),
00039     mOnline( false )
00040 {
00041 }
00042 
00043 void ResourceScheduler::scheduleFullSync()
00044 {
00045   Task t;
00046   t.type = SyncAll;
00047   TaskList& queue = queueForTaskType( t.type );
00048   if ( queue.contains( t ) || mCurrentTask == t )
00049     return;
00050   queue << t;
00051   signalTaskToTracker( t, "SyncAll" );
00052   scheduleNext();
00053 }
00054 
00055 void ResourceScheduler::scheduleCollectionTreeSync()
00056 {
00057   Task t;
00058   t.type = SyncCollectionTree;
00059   TaskList& queue = queueForTaskType( t.type );
00060   if ( queue.contains( t ) || mCurrentTask == t )
00061     return;
00062   queue << t;
00063   signalTaskToTracker( t, "SyncCollectionTree" );
00064   scheduleNext();
00065 }
00066 
00067 void ResourceScheduler::scheduleSync(const Collection & col)
00068 {
00069   Task t;
00070   t.type = SyncCollection;
00071   t.collection = col;
00072   TaskList& queue = queueForTaskType( t.type );
00073   if ( queue.contains( t ) || mCurrentTask == t )
00074     return;
00075   queue << t;
00076   signalTaskToTracker( t, "SyncCollection" );
00077   scheduleNext();
00078 }
00079 
00080 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00081 {
00082   Task t;
00083   t.type = FetchItem;
00084   t.item = item;
00085   t.itemParts = parts;
00086 
00087   // if the current task does already fetch the requested item, break here but
00088   // keep the dbus message, so we can send the reply later on
00089   if ( mCurrentTask == t ) {
00090     mCurrentTask.dbusMsgs << msg;
00091     return;
00092   }
00093 
00094   // If this task is already in the queue, merge with it.
00095   TaskList& queue = queueForTaskType( t.type );
00096   const int idx = queue.indexOf( t );
00097   if ( idx != -1 ) {
00098     queue[ idx ].dbusMsgs << msg;
00099     return;
00100   }
00101 
00102   t.dbusMsgs << msg;
00103   queue << t;
00104   signalTaskToTracker( t, "FetchItem" );
00105   scheduleNext();
00106 }
00107 
00108 void ResourceScheduler::scheduleResourceCollectionDeletion()
00109 {
00110   Task t;
00111   t.type = DeleteResourceCollection;
00112   TaskList& queue = queueForTaskType( t.type );
00113   if ( queue.contains( t ) || mCurrentTask == t )
00114     return;
00115   queue << t;
00116   signalTaskToTracker( t, "DeleteResourceCollection" );
00117   scheduleNext();
00118 }
00119 
00120 void ResourceScheduler::scheduleChangeReplay()
00121 {
00122   Task t;
00123   t.type = ChangeReplay;
00124   TaskList& queue = queueForTaskType( t.type );
00125   // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
00126   if ( queue.contains( t ) )
00127     return;
00128   queue << t;
00129   signalTaskToTracker( t, "ChangeReplay" );
00130   scheduleNext();
00131 }
00132 
00133 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00134 {
00135   Task t;
00136   t.type = SyncAllDone;
00137   TaskList& queue = queueForTaskType( t.type );
00138   // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
00139   queue << t;
00140   signalTaskToTracker( t, "SyncAllDone" );
00141   scheduleNext();
00142 }
00143 
00144 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00145 {
00146   Task t;
00147   t.type = Custom;
00148   t.receiver = receiver;
00149   t.methodName = methodName;
00150   t.argument = argument;
00151   QueueType queueType = GenericTaskQueue;
00152   if ( priority == ResourceBase::AfterChangeReplay )
00153     queueType = AfterChangeReplayQueue;
00154   TaskList& queue = mTaskList[ queueType ];
00155 
00156   if ( queue.contains( t ) )
00157     return;
00158 
00159   switch (priority) {
00160   case ResourceBase::Prepend:
00161     queue.prepend( t );
00162     break;
00163   default:
00164     queue.append(t);
00165     break;
00166   }
00167 
00168   signalTaskToTracker( t, "Custom-" + t.methodName );
00169   scheduleNext();
00170 }
00171 
00172 void ResourceScheduler::taskDone()
00173 {
00174   if ( isEmpty() )
00175     emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00176 
00177   if ( s_resourcetracker ) {
00178     QList<QVariant> argumentList;
00179     argumentList << QString::number( mCurrentTask.serial )
00180                  << QString();
00181     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00182   }
00183 
00184   mCurrentTask = Task();
00185   scheduleNext();
00186 }
00187 
00188 void ResourceScheduler::deferTask()
00189 {
00190   if ( s_resourcetracker ) {
00191     QList<QVariant> argumentList;
00192     argumentList << QString::number( mCurrentTask.serial )
00193                  << QString();
00194     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00195   }
00196 
00197   Task t = mCurrentTask;
00198   mCurrentTask = Task();
00199   mTaskList[GenericTaskQueue] << t;
00200   signalTaskToTracker( t, "DeferedTask" );
00201 
00202   scheduleNext();
00203 }
00204 
00205 bool ResourceScheduler::isEmpty()
00206 {
00207   for ( int i = 0; i < NQueueCount; ++i ) {
00208     if ( !mTaskList[i].isEmpty() )
00209       return false;
00210   }
00211   return true;
00212 }
00213 
00214 void ResourceScheduler::scheduleNext()
00215 {
00216   if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00217     return;
00218   QTimer::singleShot( 0, this, SLOT( executeNext() ) );
00219 }
00220 
00221 void ResourceScheduler::executeNext()
00222 {
00223   if ( mCurrentTask.type != Invalid || isEmpty() )
00224     return;
00225 
00226   for ( int i = 0; i < NQueueCount; ++i ) {
00227     if ( !mTaskList[ i ].isEmpty() ) {
00228       mCurrentTask = mTaskList[ i ].takeFirst();
00229       break;
00230     }
00231   }
00232 
00233   if ( s_resourcetracker ) {
00234     QList<QVariant> argumentList;
00235     argumentList << QString::number( mCurrentTask.serial );
00236     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00237   }
00238 
00239   switch ( mCurrentTask.type ) {
00240     case SyncAll:
00241       emit executeFullSync();
00242       break;
00243     case SyncCollectionTree:
00244       emit executeCollectionTreeSync();
00245       break;
00246     case SyncCollection:
00247       emit executeCollectionSync( mCurrentTask.collection );
00248       break;
00249     case FetchItem:
00250       emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00251       break;
00252     case DeleteResourceCollection:
00253       emit executeResourceCollectionDeletion();
00254       break;
00255     case ChangeReplay:
00256       emit executeChangeReplay();
00257       break;
00258     case SyncAllDone:
00259       emit fullSyncComplete();
00260       break;
00261     case Custom:
00262     {
00263       bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00264       if ( !success )
00265         success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00266 
00267       if ( !success )
00268         kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00269       break;
00270     }
00271     default: {
00272       kError() << "Unhandled task type" << mCurrentTask.type;
00273       dump();
00274       Q_ASSERT( false );
00275     }
00276   }
00277 }
00278 
00279 ResourceScheduler::Task ResourceScheduler::currentTask() const
00280 {
00281   return mCurrentTask;
00282 }
00283 
00284 void ResourceScheduler::setOnline(bool state)
00285 {
00286   if ( mOnline == state )
00287     return;
00288   mOnline = state;
00289   if ( mOnline ) {
00290     scheduleNext();
00291   } else {
00292     if ( mCurrentTask.type != Invalid ) {
00293       // abort running task
00294       queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00295       mCurrentTask = Task();
00296     }
00297     // abort pending synchronous tasks, might take longer until the resource goes online again
00298     TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00299     for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00300       if ( (*it).type == FetchItem ) {
00301         (*it).sendDBusReplies( false );
00302         it = itemFetchQueue.erase( it );
00303         if ( s_resourcetracker ) {
00304           QList<QVariant> argumentList;
00305           argumentList << QString::number( mCurrentTask.serial )
00306                        << QLatin1String( "Job canceled." );
00307           s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00308         }
00309       } else {
00310         ++it;
00311       }
00312     }
00313   }
00314 }
00315 
00316 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00317 {
00318   // if there's a job tracer running, tell it about the new job
00319   if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00320     s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00321                                        QLatin1String( "/resourcesJobtracker" ),
00322                                        QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00323                                        QDBusConnection::sessionBus(), 0 );
00324   }
00325 
00326   if ( s_resourcetracker ) {
00327     QList<QVariant> argumentList;
00328     argumentList << static_cast<AgentBase*>(  parent() )->identifier()
00329                  << QString::number( task.serial )
00330                  << QString()
00331                  << QString::fromLatin1( taskType );
00332     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00333   }
00334 }
00335 
00336 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00337 {
00338   if ( !collection.isValid() ) // should not happen, but you never know...
00339     return;
00340   TaskList& queue = queueForTaskType( SyncCollection );
00341   for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00342     if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00343       it = queue.erase( it );
00344       kDebug() << " erasing";
00345     } else
00346       ++it;
00347   }
00348 }
00349 
00350 void ResourceScheduler::Task::sendDBusReplies( bool success )
00351 {
00352   Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00353     QDBusMessage reply( msg );
00354     reply << success;
00355     QDBusConnection::sessionBus().send( reply );
00356   }
00357 }
00358 
00359 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00360 {
00361   switch( type ) {
00362   case ChangeReplay:
00363     return ChangeReplayQueue;
00364   case FetchItem:
00365     return ItemFetchQueue;
00366   default:
00367     return GenericTaskQueue;
00368   }
00369 }
00370 
00371 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00372 {
00373   const QueueType qt = queueTypeForTaskType( type );
00374   return mTaskList[ qt ];
00375 }
00376 
00377 void ResourceScheduler::dump()
00378 {
00379   kDebug() << "ResourceScheduler: Online:" << mOnline;
00380   kDebug() << " current task:" << mCurrentTask;
00381   for ( int i = 0; i < NQueueCount; ++i ) {
00382     const TaskList& queue = mTaskList[i];
00383     kDebug() << " queue" << i << queue.size() << "tasks:";
00384     for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00385       kDebug() << "  " << (*it);
00386     }
00387   }
00388 }
00389 
00390 void ResourceScheduler::clear()
00391 {
00392   kDebug() << "Clearing ResourceScheduler queues:";
00393   for ( int i = 0; i < NQueueCount; ++i ) {
00394     TaskList& queue = mTaskList[i];
00395     queue.clear();
00396   }
00397   mCurrentTask = Task();
00398 }
00399 
00400 static const char s_taskTypes[][25] = {
00401       "Invalid",
00402       "SyncAll",
00403       "SyncCollectionTree",
00404       "SyncCollection",
00405       "FetchItem",
00406       "ChangeReplay",
00407       "DeleteResourceCollection",
00408       "SyncAllDone",
00409       "Custom"
00410 };
00411 
00412 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00413 {
00414   d << task.serial << s_taskTypes[task.type];
00415   if ( task.type != ResourceScheduler::Invalid ) {
00416     if ( task.collection.id() != -1 )
00417       d << "collection" << task.collection.id();
00418     if ( task.item.id() != -1 )
00419       d << "item" << task.item.id();
00420     if ( !task.methodName.isEmpty() )
00421       d << task.methodName << task.argument;
00422   }
00423   return d;
00424 }
00425 
00426 //@endcond
00427 
00428 #include "resourcescheduler_p.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.1
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal