resourcebase.cpp
00001 /* 00002 Copyright (c) 2006 Till Adam <adam@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "resourcebase.h" 00022 #include "agentbase_p.h" 00023 00024 #include "resourceadaptor.h" 00025 #include "collectiondeletejob.h" 00026 #include "collectionsync_p.h" 00027 #include "dbusconnectionpool.h" 00028 #include "itemsync.h" 00029 #include "kdepimlibs-version.h" 00030 #include "resourcescheduler_p.h" 00031 #include "tracerinterface.h" 00032 #include "xdgbasedirs_p.h" 00033 00034 #include "changerecorder.h" 00035 #include "collectionfetchjob.h" 00036 #include "collectionfetchscope.h" 00037 #include "collectionmodifyjob.h" 00038 #include "invalidatecachejob_p.h" 00039 #include "itemfetchjob.h" 00040 #include "itemfetchscope.h" 00041 #include "itemmodifyjob.h" 00042 #include "itemmodifyjob_p.h" 00043 #include "session.h" 00044 #include "resourceselectjob_p.h" 00045 #include "monitor_p.h" 00046 #include "servermanager_p.h" 00047 00048 #include <kaboutdata.h> 00049 #include <kcmdlineargs.h> 00050 #include <kdebug.h> 00051 #include <klocale.h> 00052 00053 #include <QtCore/QDebug> 00054 #include <QtCore/QDir> 00055 #include <QtCore/QHash> 00056 #include <QtCore/QSettings> 00057 #include <QtCore/QTimer> 00058 #include <QtGui/QApplication> 00059 #include <QtDBus/QtDBus> 00060 00061 using namespace Akonadi; 00062 00063 class Akonadi::ResourceBasePrivate : public AgentBasePrivate 00064 { 00065 Q_OBJECT 00066 Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" ) 00067 00068 public: 00069 ResourceBasePrivate( ResourceBase *parent ) 00070 : AgentBasePrivate( parent ), 00071 scheduler( 0 ), 00072 mItemSyncer( 0 ), 00073 mItemSyncFetchScope( 0 ), 00074 mItemTransactionMode( ItemSync::SingleTransaction ), 00075 mCollectionSyncer( 0 ), 00076 mHierarchicalRid( false ), 00077 mUnemittedProgress( 0 ), 00078 mAutomaticProgressReporting( true ) 00079 { 00080 Internal::setClientType( Internal::Resource ); 00081 mStatusMessage = defaultReadyMessage(); 00082 mProgressEmissionCompressor.setInterval( 1000 ); 00083 mProgressEmissionCompressor.setSingleShot( true ); 00084 } 00085 00086 ~ResourceBasePrivate() 00087 { 00088 delete mItemSyncFetchScope; 00089 } 00090 00091 Q_DECLARE_PUBLIC( ResourceBase ) 00092 00093 void delayedInit() 00094 { 00095 if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) { 00096 QString reason = DBusConnectionPool::threadConnection().lastError().message(); 00097 if ( reason.isEmpty() ) { 00098 reason = QString::fromLatin1( "this service is probably running already." ); 00099 } 00100 kError() << "Unable to register service at D-Bus: " << reason; 00101 00102 if ( QThread::currentThread() == QCoreApplication::instance()->thread() ) 00103 QCoreApplication::instance()->exit(1); 00104 00105 } else { 00106 AgentBasePrivate::delayedInit(); 00107 } 00108 } 00109 00110 virtual void changeProcessed() 00111 { 00112 mChangeRecorder->changeProcessed(); 00113 if ( !mChangeRecorder->isEmpty() ) 00114 scheduler->scheduleChangeReplay(); 00115 scheduler->taskDone(); 00116 } 00117 00118 void slotAbortRequested(); 00119 00120 void slotDeliveryDone( KJob* job ); 00121 void slotCollectionSyncDone( KJob *job ); 00122 void slotLocalListDone( KJob *job ); 00123 void slotSynchronizeCollection( const Collection &col ); 00124 void slotCollectionListDone( KJob *job ); 00125 void slotSynchronizeCollectionAttributes( const Collection &col ); 00126 void slotCollectionListForAttributesDone( KJob *job ); 00127 void slotCollectionAttributesSyncDone( KJob *job ); 00128 00129 void slotItemSyncDone( KJob *job ); 00130 00131 void slotPercent( KJob* job, unsigned long percent ); 00132 void slotDelayedEmitProgress(); 00133 void slotDeleteResourceCollection(); 00134 void slotDeleteResourceCollectionDone( KJob *job ); 00135 void slotCollectionDeletionDone( KJob *job ); 00136 00137 void slotInvalidateCache( const Akonadi::Collection &collection ); 00138 00139 void slotPrepareItemRetrieval( const Akonadi::Item &item ); 00140 void slotPrepareItemRetrievalResult( KJob* job ); 00141 00142 void changeCommittedResult( KJob* job ); 00143 00144 void slotSessionReconnected() 00145 { 00146 Q_Q( ResourceBase ); 00147 00148 new ResourceSelectJob( q->identifier() ); 00149 } 00150 00151 void createItemSyncInstanceIfMissing() 00152 { 00153 Q_Q( ResourceBase ); 00154 if ( scheduler->currentTask().type == ResourceScheduler::SyncCollection ) { 00155 if ( !mItemSyncer ) { 00156 mItemSyncer = new ItemSync( q->currentCollection() ); 00157 mItemSyncer->setTransactionMode( mItemTransactionMode ); 00158 if ( mItemSyncFetchScope ) { 00159 mItemSyncer->setFetchScope( *mItemSyncFetchScope ); 00160 } 00161 mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) ); 00162 connect( mItemSyncer, SIGNAL(percent(KJob*,ulong)), q, SLOT(slotPercent(KJob*,ulong)) ); 00163 connect( mItemSyncer, SIGNAL(result(KJob*)), q, SLOT(slotItemSyncDone(KJob*)) ); 00164 } 00165 Q_ASSERT( mItemSyncer ); 00166 } else { 00167 kWarning() << "Calling items retrieval methods although no item retrieval is in progress"; 00168 } 00169 } 00170 00171 public Q_SLOTS: 00172 Q_SCRIPTABLE QString dumpToString() 00173 { 00174 return scheduler->dumpToString(); 00175 } 00176 00177 Q_SCRIPTABLE void dump() 00178 { 00179 scheduler->dump(); 00180 } 00181 00182 Q_SCRIPTABLE void clear() 00183 { 00184 scheduler->clear(); 00185 } 00186 00187 protected Q_SLOTS: 00188 // reimplementations from AgentbBasePrivate, containing sanity checks that only apply to resources 00189 // such as making sure that RIDs are present as well as translations of cross-resource moves 00190 // TODO: we could possibly add recovery code for no-RID notifications by re-enquing those to the change recorder 00191 // as the corresponding Add notifications, although that contains a risk of endless fail/retry loops 00192 00193 void itemAdded(const Akonadi::Item& item, const Akonadi::Collection& collection) 00194 { 00195 if ( collection.remoteId().isEmpty() ) { 00196 changeProcessed(); 00197 return; 00198 } 00199 AgentBasePrivate::itemAdded( item, collection ); 00200 } 00201 00202 void itemChanged(const Akonadi::Item& item, const QSet< QByteArray >& partIdentifiers) 00203 { 00204 if ( item.remoteId().isEmpty() ) { 00205 changeProcessed(); 00206 return; 00207 } 00208 AgentBasePrivate::itemChanged( item, partIdentifiers ); 00209 } 00210 00211 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00212 void itemMoved(const Akonadi::Item &item, const Akonadi::Collection &source, const Akonadi::Collection &destination) 00213 { 00214 if ( item.remoteId().isEmpty() || destination.remoteId().isEmpty() || destination == source ) { 00215 changeProcessed(); 00216 return; 00217 } 00218 AgentBasePrivate::itemMoved( item, source, destination ); 00219 } 00220 00221 void itemRemoved(const Akonadi::Item& item) 00222 { 00223 if ( item.remoteId().isEmpty() ) { 00224 changeProcessed(); 00225 return; 00226 } 00227 AgentBasePrivate::itemRemoved( item ); 00228 } 00229 00230 void collectionAdded(const Akonadi::Collection& collection, const Akonadi::Collection& parent) 00231 { 00232 if ( parent.remoteId().isEmpty() ) { 00233 changeProcessed(); 00234 return; 00235 } 00236 AgentBasePrivate::collectionAdded( collection, parent ); 00237 } 00238 00239 void collectionChanged(const Akonadi::Collection& collection) 00240 { 00241 if ( collection.remoteId().isEmpty() ) { 00242 changeProcessed(); 00243 return; 00244 } 00245 AgentBasePrivate::collectionChanged( collection ); 00246 } 00247 00248 void collectionChanged(const Akonadi::Collection& collection, const QSet< QByteArray >& partIdentifiers) 00249 { 00250 if ( collection.remoteId().isEmpty() ) { 00251 changeProcessed(); 00252 return; 00253 } 00254 AgentBasePrivate::collectionChanged( collection, partIdentifiers ); 00255 } 00256 00257 // TODO move the move translation code from AgebtBasePrivate here, it's wrong for agents 00258 void collectionMoved(const Akonadi::Collection& collection, const Akonadi::Collection& source, const Akonadi::Collection& destination) 00259 { 00260 if ( collection.remoteId().isEmpty() || destination.remoteId().isEmpty() || source == destination ) { 00261 changeProcessed(); 00262 return; 00263 } 00264 AgentBasePrivate::collectionMoved( collection, source, destination ); 00265 } 00266 00267 void collectionRemoved(const Akonadi::Collection& collection) 00268 { 00269 if ( collection.remoteId().isEmpty() ) { 00270 changeProcessed(); 00271 return; 00272 } 00273 AgentBasePrivate::collectionRemoved( collection ); 00274 } 00275 00276 public: 00277 // synchronize states 00278 Collection currentCollection; 00279 00280 ResourceScheduler *scheduler; 00281 ItemSync *mItemSyncer; 00282 ItemFetchScope *mItemSyncFetchScope; 00283 ItemSync::TransactionMode mItemTransactionMode; 00284 CollectionSync *mCollectionSyncer; 00285 bool mHierarchicalRid; 00286 QTimer mProgressEmissionCompressor; 00287 int mUnemittedProgress; 00288 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus; 00289 bool mAutomaticProgressReporting; 00290 }; 00291 00292 ResourceBase::ResourceBase( const QString & id ) 00293 : AgentBase( new ResourceBasePrivate( this ), id ) 00294 { 00295 Q_D( ResourceBase ); 00296 00297 new Akonadi__ResourceAdaptor( this ); 00298 00299 d->scheduler = new ResourceScheduler( this ); 00300 00301 d->mChangeRecorder->setChangeRecordingEnabled( true ); 00302 connect( d->mChangeRecorder, SIGNAL(changesAdded()), 00303 d->scheduler, SLOT(scheduleChangeReplay()) ); 00304 00305 d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() ); 00306 d->mChangeRecorder->fetchCollection( true ); 00307 00308 connect( d->scheduler, SIGNAL(executeFullSync()), 00309 SLOT(retrieveCollections()) ); 00310 connect( d->scheduler, SIGNAL(executeCollectionTreeSync()), 00311 SLOT(retrieveCollections()) ); 00312 connect( d->scheduler, SIGNAL(executeCollectionSync(Akonadi::Collection)), 00313 SLOT(slotSynchronizeCollection(Akonadi::Collection)) ); 00314 connect( d->scheduler, SIGNAL(executeCollectionAttributesSync(Akonadi::Collection)), 00315 SLOT(slotSynchronizeCollectionAttributes(Akonadi::Collection)) ); 00316 connect( d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)), 00317 SLOT(slotPrepareItemRetrieval(Akonadi::Item)) ); 00318 connect( d->scheduler, SIGNAL(executeResourceCollectionDeletion()), 00319 SLOT(slotDeleteResourceCollection()) ); 00320 connect ( d->scheduler, SIGNAL(executeCacheInvalidation(Akonadi::Collection)), 00321 SLOT(slotInvalidateCache(Akonadi::Collection)) ); 00322 connect( d->scheduler, SIGNAL(status(int,QString)), 00323 SIGNAL(status(int,QString)) ); 00324 connect( d->scheduler, SIGNAL(executeChangeReplay()), 00325 d->mChangeRecorder, SLOT(replayNext()) ); 00326 connect( d->scheduler, SIGNAL(fullSyncComplete()), SIGNAL(synchronized()) ); 00327 connect( d->scheduler, SIGNAL(collectionTreeSyncComplete()), SIGNAL(collectionTreeSynchronized()) ); 00328 connect( d->mChangeRecorder, SIGNAL(nothingToReplay()), d->scheduler, SLOT(taskDone()) ); 00329 connect( d->mChangeRecorder, SIGNAL(collectionRemoved(Akonadi::Collection)), 00330 d->scheduler, SLOT(collectionRemoved(Akonadi::Collection)) ); 00331 connect( this, SIGNAL(abortRequested()), this, SLOT(slotAbortRequested()) ); 00332 connect( this, SIGNAL(synchronized()), d->scheduler, SLOT(taskDone()) ); 00333 connect( this, SIGNAL(collectionTreeSynchronized()), d->scheduler, SLOT(taskDone()) ); 00334 connect( this, SIGNAL(agentNameChanged(QString)), 00335 this, SIGNAL(nameChanged(QString)) ); 00336 00337 connect( &d->mProgressEmissionCompressor, SIGNAL(timeout()), 00338 this, SLOT(slotDelayedEmitProgress()) ); 00339 00340 d->scheduler->setOnline( d->mOnline ); 00341 if ( !d->mChangeRecorder->isEmpty() ) 00342 d->scheduler->scheduleChangeReplay(); 00343 00344 DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots ); 00345 00346 new ResourceSelectJob( identifier() ); 00347 00348 connect( d->mChangeRecorder->session(), SIGNAL(reconnected()), SLOT(slotSessionReconnected()) ); 00349 } 00350 00351 ResourceBase::~ResourceBase() 00352 { 00353 } 00354 00355 void ResourceBase::synchronize() 00356 { 00357 d_func()->scheduler->scheduleFullSync(); 00358 } 00359 00360 void ResourceBase::setName( const QString &name ) 00361 { 00362 AgentBase::setAgentName( name ); 00363 } 00364 00365 QString ResourceBase::name() const 00366 { 00367 return AgentBase::agentName(); 00368 } 00369 00370 QString ResourceBase::parseArguments( int argc, char **argv ) 00371 { 00372 QString identifier; 00373 if ( argc < 3 ) { 00374 kDebug() << "Not enough arguments passed..."; 00375 exit( 1 ); 00376 } 00377 00378 for ( int i = 1; i < argc - 1; ++i ) { 00379 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) 00380 identifier = QLatin1String( argv[ i + 1 ] ); 00381 } 00382 00383 if ( identifier.isEmpty() ) { 00384 kDebug() << "Identifier argument missing"; 00385 exit( 1 ); 00386 } 00387 00388 const QFileInfo fi( QString::fromLocal8Bit( argv[0] ) ); 00389 // strip off full path and possible .exe suffix 00390 const QByteArray catalog = fi.baseName().toLatin1(); 00391 00392 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog, 00393 ki18nc( "@title application name", "Akonadi Resource" ), KDEPIMLIBS_VERSION, 00394 ki18nc( "@title application description", "Akonadi Resource" ) ); 00395 00396 KCmdLineOptions options; 00397 options.add( "identifier <argument>", 00398 ki18nc( "@label commandline option", "Resource identifier" ) ); 00399 KCmdLineArgs::addCmdLineOptions( options ); 00400 00401 return identifier; 00402 } 00403 00404 int ResourceBase::init( ResourceBase *r ) 00405 { 00406 QApplication::setQuitOnLastWindowClosed( false ); 00407 KGlobal::locale()->insertCatalog( QLatin1String( "libakonadi" ) ); 00408 int rv = kapp->exec(); 00409 delete r; 00410 return rv; 00411 } 00412 00413 void ResourceBasePrivate::slotAbortRequested() 00414 { 00415 Q_Q( ResourceBase ); 00416 00417 scheduler->cancelQueues(); 00418 QMetaObject::invokeMethod( q, "abortActivity" ); 00419 } 00420 00421 void ResourceBase::itemRetrieved( const Item &item ) 00422 { 00423 Q_D( ResourceBase ); 00424 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00425 if ( !item.isValid() ) { 00426 d->scheduler->currentTask().sendDBusReplies( false ); 00427 d->scheduler->taskDone(); 00428 return; 00429 } 00430 00431 Item i( item ); 00432 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts; 00433 foreach ( const QByteArray &part, requestedParts ) { 00434 if ( !item.loadedPayloadParts().contains( part ) ) { 00435 kWarning() << "Item does not provide part" << part; 00436 } 00437 } 00438 00439 ItemModifyJob *job = new ItemModifyJob( i ); 00440 // FIXME: remove once the item with which we call retrieveItem() has a revision number 00441 job->disableRevisionCheck(); 00442 connect( job, SIGNAL(result(KJob*)), SLOT(slotDeliveryDone(KJob*)) ); 00443 } 00444 00445 void ResourceBasePrivate::slotDeliveryDone(KJob * job) 00446 { 00447 Q_Q( ResourceBase ); 00448 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); 00449 if ( job->error() ) { 00450 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); 00451 } 00452 scheduler->currentTask().sendDBusReplies( !job->error() ); 00453 scheduler->taskDone(); 00454 } 00455 00456 void ResourceBase::collectionAttributesRetrieved( const Collection &collection ) 00457 { 00458 Q_D( ResourceBase ); 00459 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00460 if ( !collection.isValid() ) { 00461 emit attributesSynchronized( d->scheduler->currentTask().collection.id() ); 00462 d->scheduler->taskDone(); 00463 return; 00464 } 00465 00466 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00467 connect( job, SIGNAL(result(KJob*)), SLOT(slotCollectionAttributesSyncDone(KJob*)) ); 00468 } 00469 00470 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job) 00471 { 00472 Q_Q( ResourceBase ); 00473 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes ); 00474 if ( job->error() ) { 00475 emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() ); 00476 } 00477 emit q->attributesSynchronized( scheduler->currentTask().collection.id() ); 00478 scheduler->taskDone(); 00479 } 00480 00481 void ResourceBasePrivate::slotDeleteResourceCollection() 00482 { 00483 Q_Q( ResourceBase ); 00484 00485 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel ); 00486 job->fetchScope().setResource( q->identifier() ); 00487 connect( job, SIGNAL(result(KJob*)), q, SLOT(slotDeleteResourceCollectionDone(KJob*)) ); 00488 } 00489 00490 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job ) 00491 { 00492 Q_Q( ResourceBase ); 00493 if ( job->error() ) { 00494 emit q->error( job->errorString() ); 00495 scheduler->taskDone(); 00496 } else { 00497 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job ); 00498 00499 if ( !fetchJob->collections().isEmpty() ) { 00500 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() ); 00501 connect( job, SIGNAL(result(KJob*)), q, SLOT(slotCollectionDeletionDone(KJob*)) ); 00502 } else { 00503 // there is no resource collection, so just ignore the request 00504 scheduler->taskDone(); 00505 } 00506 } 00507 } 00508 00509 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job ) 00510 { 00511 Q_Q( ResourceBase ); 00512 if ( job->error() ) { 00513 emit q->error( job->errorString() ); 00514 } 00515 00516 scheduler->taskDone(); 00517 } 00518 00519 void ResourceBasePrivate::slotInvalidateCache( const Akonadi::Collection &collection ) 00520 { 00521 Q_Q( ResourceBase ); 00522 InvalidateCacheJob *job = new InvalidateCacheJob( collection, q ); 00523 connect( job, SIGNAL(result(KJob*)), scheduler, SLOT(taskDone()) ); 00524 } 00525 00526 void ResourceBase::changeCommitted( const Item& item ) 00527 { 00528 Q_D( ResourceBase ); 00529 ItemModifyJob *job = new ItemModifyJob( item ); 00530 job->d_func()->setClean(); 00531 job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? 00532 job->setIgnorePayload( true ); // we only want to reset the dirty flag and update the remote id 00533 d->changeProcessed(); 00534 } 00535 00536 void ResourceBase::changeCommitted( const Collection &collection ) 00537 { 00538 CollectionModifyJob *job = new CollectionModifyJob( collection ); 00539 connect( job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)) ); 00540 } 00541 00542 void ResourceBasePrivate::changeCommittedResult( KJob *job ) 00543 { 00544 Q_Q( ResourceBase ); 00545 if ( job->error() ) 00546 emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) ); 00547 mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() ); 00548 changeProcessed(); 00549 } 00550 00551 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, 00552 const QString &mimeType, const QStringList &_parts ) 00553 { 00554 Q_D( ResourceBase ); 00555 if ( !isOnline() ) { 00556 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) ); 00557 return false; 00558 } 00559 00560 setDelayedReply( true ); 00561 // FIXME: we need at least the revision number too 00562 Item item( uid ); 00563 item.setMimeType( mimeType ); 00564 item.setRemoteId( remoteId ); 00565 00566 QSet<QByteArray> parts; 00567 Q_FOREACH( const QString &str, _parts ) 00568 parts.insert( str.toLatin1() ); 00569 00570 d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); 00571 00572 return true; 00573 } 00574 00575 void ResourceBase::collectionsRetrieved( const Collection::List & collections ) 00576 { 00577 Q_D( ResourceBase ); 00578 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00579 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00580 "ResourceBase::collectionsRetrieved()", 00581 "Calling collectionsRetrieved() although no collection retrieval is in progress" ); 00582 if ( !d->mCollectionSyncer ) { 00583 d->mCollectionSyncer = new CollectionSync( identifier() ); 00584 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00585 connect( d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)) ); 00586 connect( d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) ); 00587 } 00588 d->mCollectionSyncer->setRemoteCollections( collections ); 00589 } 00590 00591 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections, 00592 const Collection::List & removedCollections ) 00593 { 00594 Q_D( ResourceBase ); 00595 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00596 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00597 "ResourceBase::collectionsRetrievedIncremental()", 00598 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" ); 00599 if ( !d->mCollectionSyncer ) { 00600 d->mCollectionSyncer = new CollectionSync( identifier() ); 00601 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00602 connect( d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)) ); 00603 connect( d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) ); 00604 } 00605 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections ); 00606 } 00607 00608 void ResourceBase::setCollectionStreamingEnabled( bool enable ) 00609 { 00610 Q_D( ResourceBase ); 00611 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00612 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00613 "ResourceBase::setCollectionStreamingEnabled()", 00614 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" ); 00615 if ( !d->mCollectionSyncer ) { 00616 d->mCollectionSyncer = new CollectionSync( identifier() ); 00617 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); 00618 connect( d->mCollectionSyncer, SIGNAL(percent(KJob*,ulong)), SLOT(slotPercent(KJob*,ulong)) ); 00619 connect( d->mCollectionSyncer, SIGNAL(result(KJob*)), SLOT(slotCollectionSyncDone(KJob*)) ); 00620 } 00621 d->mCollectionSyncer->setStreamingEnabled( enable ); 00622 } 00623 00624 void ResourceBase::collectionsRetrievalDone() 00625 { 00626 Q_D( ResourceBase ); 00627 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || 00628 d->scheduler->currentTask().type == ResourceScheduler::SyncAll, 00629 "ResourceBase::collectionsRetrievalDone()", 00630 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" ); 00631 // streaming enabled, so finalize the sync 00632 if ( d->mCollectionSyncer ) { 00633 d->mCollectionSyncer->retrievalDone(); 00634 } 00635 // user did the sync himself, we are done now 00636 else { 00637 // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here! 00638 d->scheduler->taskDone(); 00639 } 00640 } 00641 00642 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job ) 00643 { 00644 Q_Q( ResourceBase ); 00645 mCollectionSyncer = 0; 00646 if ( job->error() ) { 00647 if ( job->error() != Job::UserCanceled ) 00648 emit q->error( job->errorString() ); 00649 } else { 00650 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { 00651 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); 00652 list->setFetchScope( q->changeRecorder()->collectionFetchScope() ); 00653 list->fetchScope().setResource( mId ); 00654 q->connect( list, SIGNAL(result(KJob*)), q, SLOT(slotLocalListDone(KJob*)) ); 00655 return; 00656 } else if ( scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ) { 00657 scheduler->scheduleCollectionTreeSyncCompletion(); 00658 } 00659 } 00660 scheduler->taskDone(); 00661 } 00662 00663 void ResourceBasePrivate::slotLocalListDone( KJob * job ) 00664 { 00665 Q_Q( ResourceBase ); 00666 if ( job->error() ) { 00667 emit q->error( job->errorString() ); 00668 } else { 00669 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections(); 00670 foreach ( const Collection &col, cols ) { 00671 scheduler->scheduleSync( col ); 00672 } 00673 scheduler->scheduleFullSyncCompletion(); 00674 } 00675 scheduler->taskDone(); 00676 } 00677 00678 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) 00679 { 00680 Q_Q( ResourceBase ); 00681 currentCollection = col; 00682 // check if this collection actually can contain anything 00683 QStringList contentTypes = currentCollection.contentMimeTypes(); 00684 contentTypes.removeAll( Collection::mimeType() ); 00685 if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) { // HACK to check for virtual collections 00686 if ( mAutomaticProgressReporting ) { 00687 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing folder '%1'", currentCollection.name() ) ); 00688 } 00689 q->retrieveItems( currentCollection ); 00690 return; 00691 } 00692 scheduler->taskDone(); 00693 } 00694 00695 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col ) 00696 { 00697 Q_Q( ResourceBase ); 00698 QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) ); 00699 } 00700 00701 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item ) 00702 { 00703 Q_Q( ResourceBase ); 00704 ItemFetchJob *fetch = new ItemFetchJob( item, this ); 00705 fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() ); 00706 fetch->fetchScope().setCacheOnly( true ); 00707 00708 // copy list of attributes to fetch 00709 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes(); 00710 foreach ( const QByteArray &attribute, attributes ) 00711 fetch->fetchScope().fetchAttribute( attribute ); 00712 00713 q->connect( fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemRetrievalResult(KJob*)) ); 00714 } 00715 00716 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job ) 00717 { 00718 Q_Q( ResourceBase ); 00719 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem, 00720 "ResourceBasePrivate::slotPrepareItemRetrievalResult()", 00721 "Preparing item retrieval although no item retrieval is in progress" ); 00722 if ( job->error() ) { 00723 q->cancelTask( job->errorText() ); 00724 return; 00725 } 00726 ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job ); 00727 if ( fetch->items().count() != 1 ) { 00728 q->cancelTask( i18n( "The requested item no longer exists" ) ); 00729 return; 00730 } 00731 const Item item = fetch->items().first(); 00732 const QSet<QByteArray> parts = scheduler->currentTask().itemParts; 00733 if ( !q->retrieveItem( item, parts ) ) 00734 q->cancelTask(); 00735 } 00736 00737 void ResourceBase::itemsRetrievalDone() 00738 { 00739 Q_D( ResourceBase ); 00740 // streaming enabled, so finalize the sync 00741 if ( d->mItemSyncer ) { 00742 d->mItemSyncer->deliveryDone(); 00743 } 00744 // user did the sync himself, we are done now 00745 else { 00746 d->scheduler->taskDone(); 00747 } 00748 } 00749 00750 void ResourceBase::clearCache() 00751 { 00752 Q_D( ResourceBase ); 00753 d->scheduler->scheduleResourceCollectionDeletion(); 00754 } 00755 00756 void ResourceBase::invalidateCache(const Collection& collection) 00757 { 00758 Q_D( ResourceBase ); 00759 d->scheduler->scheduleCacheInvalidation( collection ); 00760 } 00761 00762 Collection ResourceBase::currentCollection() const 00763 { 00764 Q_D( const ResourceBase ); 00765 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , 00766 "ResourceBase::currentCollection()", 00767 "Trying to access current collection although no item retrieval is in progress" ); 00768 return d->currentCollection; 00769 } 00770 00771 Item ResourceBase::currentItem() const 00772 { 00773 Q_D( const ResourceBase ); 00774 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , 00775 "ResourceBase::currentItem()", 00776 "Trying to access current item although no item retrieval is in progress" ); 00777 return d->scheduler->currentTask().item; 00778 } 00779 00780 void ResourceBase::synchronizeCollectionTree() 00781 { 00782 d_func()->scheduler->scheduleCollectionTreeSync(); 00783 } 00784 00785 void ResourceBase::cancelTask() 00786 { 00787 Q_D( ResourceBase ); 00788 switch ( d->scheduler->currentTask().type ) { 00789 case ResourceScheduler::FetchItem: 00790 itemRetrieved( Item() ); // sends the error reply and 00791 break; 00792 case ResourceScheduler::ChangeReplay: 00793 d->changeProcessed(); 00794 break; 00795 case ResourceScheduler::SyncCollectionTree: 00796 case ResourceScheduler::SyncAll: 00797 if ( d->mCollectionSyncer ) 00798 d->mCollectionSyncer->rollback(); 00799 else 00800 d->scheduler->taskDone(); 00801 break; 00802 case ResourceScheduler::SyncCollection: 00803 if ( d->mItemSyncer ) 00804 d->mItemSyncer->rollback(); 00805 else 00806 d->scheduler->taskDone(); 00807 break; 00808 default: 00809 d->scheduler->taskDone(); 00810 } 00811 } 00812 00813 void ResourceBase::cancelTask( const QString &msg ) 00814 { 00815 cancelTask(); 00816 00817 emit error( msg ); 00818 } 00819 00820 void ResourceBase::deferTask() 00821 { 00822 Q_D( ResourceBase ); 00823 d->scheduler->deferTask(); 00824 } 00825 00826 void ResourceBase::doSetOnline( bool state ) 00827 { 00828 d_func()->scheduler->setOnline( state ); 00829 } 00830 00831 void ResourceBase::synchronizeCollection( qint64 collectionId ) 00832 { 00833 synchronizeCollection( collectionId, false ); 00834 } 00835 00836 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive ) 00837 { 00838 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base ); 00839 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00840 job->fetchScope().setResource( identifier() ); 00841 job->setProperty( "recursive", recursive ); 00842 connect( job, SIGNAL(result(KJob*)), SLOT(slotCollectionListDone(KJob*)) ); 00843 } 00844 00845 void ResourceBasePrivate::slotCollectionListDone( KJob *job ) 00846 { 00847 if ( !job->error() ) { 00848 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00849 if ( !list.isEmpty() ) { 00850 if ( job->property( "recursive" ).toBool() ) { 00851 Q_FOREACH ( const Collection &collection, list ) { 00852 scheduler->scheduleSync( collection ); 00853 } 00854 } else { 00855 scheduler->scheduleSync( list.first() ); 00856 } 00857 } 00858 } 00859 // TODO: error handling 00860 } 00861 00862 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId ) 00863 { 00864 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base ); 00865 job->setFetchScope( changeRecorder()->collectionFetchScope() ); 00866 job->fetchScope().setResource( identifier() ); 00867 connect( job, SIGNAL(result(KJob*)), SLOT(slotCollectionListForAttributesDone(KJob*)) ); 00868 } 00869 00870 void ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job ) 00871 { 00872 if ( !job->error() ) { 00873 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections(); 00874 if ( !list.isEmpty() ) { 00875 Collection col = list.first(); 00876 scheduler->scheduleAttributesSync( col ); 00877 } 00878 } 00879 // TODO: error handling 00880 } 00881 00882 void ResourceBase::setTotalItems( int amount ) 00883 { 00884 kDebug() << amount; 00885 Q_D( ResourceBase ); 00886 setItemStreamingEnabled( true ); 00887 d->mItemSyncer->setTotalItems( amount ); 00888 } 00889 00890 void ResourceBase::setItemStreamingEnabled( bool enable ) 00891 { 00892 Q_D( ResourceBase ); 00893 d->createItemSyncInstanceIfMissing(); 00894 d->mItemSyncer->setStreamingEnabled( enable ); 00895 } 00896 00897 void ResourceBase::itemsRetrieved( const Item::List &items ) 00898 { 00899 Q_D( ResourceBase ); 00900 d->createItemSyncInstanceIfMissing(); 00901 d->mItemSyncer->setFullSyncItems( items ); 00902 } 00903 00904 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ) 00905 { 00906 Q_D( ResourceBase ); 00907 d->createItemSyncInstanceIfMissing(); 00908 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); 00909 } 00910 00911 void ResourceBasePrivate::slotItemSyncDone( KJob *job ) 00912 { 00913 mItemSyncer = 0; 00914 Q_Q( ResourceBase ); 00915 if ( job->error() && job->error() != Job::UserCanceled ) { 00916 emit q->error( job->errorString() ); 00917 } 00918 scheduler->taskDone(); 00919 } 00920 00921 00922 void ResourceBasePrivate::slotDelayedEmitProgress() 00923 { 00924 Q_Q( ResourceBase ); 00925 if ( mAutomaticProgressReporting ) { 00926 emit q->percent( mUnemittedProgress ); 00927 00928 Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) { 00929 emit q->advancedStatus( statusMap ); 00930 } 00931 } 00932 mUnemittedProgress = 0; 00933 mUnemittedAdvancedStatus.clear(); 00934 } 00935 00936 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) 00937 { 00938 mUnemittedProgress = percent; 00939 00940 const Collection collection = job->property( "collection" ).value<Collection>(); 00941 if ( collection.isValid() ) { 00942 QVariantMap statusMap; 00943 statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) ); 00944 statusMap.insert( QLatin1String( "collectionId" ), collection.id() ); 00945 statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) ); 00946 00947 mUnemittedAdvancedStatus[collection.id()] = statusMap; 00948 } 00949 // deliver completion right away, intermediate progress at 1s intervals 00950 if ( percent == 100 ) { 00951 mProgressEmissionCompressor.stop(); 00952 slotDelayedEmitProgress(); 00953 } else if ( !mProgressEmissionCompressor.isActive() ) { 00954 mProgressEmissionCompressor.start(); 00955 } 00956 } 00957 00958 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable ) 00959 { 00960 Q_D( ResourceBase ); 00961 d->mHierarchicalRid = enable; 00962 } 00963 00964 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority ) 00965 { 00966 Q_D( ResourceBase ); 00967 d->scheduler->scheduleCustomTask( receiver, method, argument, priority ); 00968 } 00969 00970 void ResourceBase::taskDone() 00971 { 00972 Q_D( ResourceBase ); 00973 d->scheduler->taskDone(); 00974 } 00975 00976 void ResourceBase::retrieveCollectionAttributes( const Collection &collection ) 00977 { 00978 collectionAttributesRetrieved( collection ); 00979 } 00980 00981 void Akonadi::ResourceBase::abortActivity() 00982 { 00983 00984 } 00985 00986 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode) 00987 { 00988 Q_D( ResourceBase ); 00989 d->mItemTransactionMode = mode; 00990 } 00991 00992 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope) 00993 { 00994 Q_D( ResourceBase ); 00995 if ( !d->mItemSyncFetchScope ) 00996 d->mItemSyncFetchScope = new ItemFetchScope; 00997 *(d->mItemSyncFetchScope) = fetchScope; 00998 } 00999 01000 void ResourceBase::setAutomaticProgressReporting( bool enabled ) 01001 { 01002 Q_D( ResourceBase ); 01003 d->mAutomaticProgressReporting = enabled; 01004 } 01005 01006 #include "resourcebase.moc" 01007 #include "moc_resourcebase.cpp"