itemsync.cpp
00001 /* 00002 Copyright (c) 2007 Tobias Koenig <tokoe@kde.org> 00003 Copyright (c) 2007 Volker Krause <vkrause@kde.org> 00004 00005 This library is free software; you can redistribute it and/or modify it 00006 under the terms of the GNU Library General Public License as published by 00007 the Free Software Foundation; either version 2 of the License, or (at your 00008 option) any later version. 00009 00010 This library is distributed in the hope that it will be useful, but WITHOUT 00011 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00012 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00013 License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to the 00017 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 00018 02110-1301, USA. 00019 */ 00020 00021 #include "itemsync.h" 00022 00023 #include "collection.h" 00024 #include "item.h" 00025 #include "item_p.h" 00026 #include "itemcreatejob.h" 00027 #include "itemdeletejob.h" 00028 #include "itemfetchjob.h" 00029 #include "itemmodifyjob.h" 00030 #include "transactionsequence.h" 00031 #include "itemfetchscope.h" 00032 00033 #include <kdebug.h> 00034 00035 #include <QtCore/QStringList> 00036 00037 using namespace Akonadi; 00038 00042 class ItemSync::Private 00043 { 00044 public: 00045 Private( ItemSync *parent ) : 00046 q( parent ), 00047 mTransactionMode( SingleTransaction ), 00048 mCurrentTransaction( 0 ), 00049 mTransactionJobs( 0 ), 00050 mPendingJobs( 0 ), 00051 mProgress( 0 ), 00052 mTotalItems( -1 ), 00053 mTotalItemsProcessed( 0 ), 00054 mStreaming( false ), 00055 mIncremental( false ), 00056 mLocalListDone( false ), 00057 mDeliveryDone( false ), 00058 mFinished( false ) 00059 { 00060 // we want to fetch all data by default 00061 mFetchScope.fetchFullPayload(); 00062 mFetchScope.fetchAllAttributes(); 00063 } 00064 00065 void createLocalItem( const Item &item ); 00066 void checkDone(); 00067 void slotLocalListDone( KJob* ); 00068 void slotLocalDeleteDone( KJob* ); 00069 void slotLocalChangeDone( KJob* ); 00070 void execute(); 00071 void processItems(); 00072 void deleteItems( const Item::List &items ); 00073 void slotTransactionResult( KJob *job ); 00074 Job* subjobParent() const; 00075 00076 ItemSync *q; 00077 Collection mSyncCollection; 00078 QHash<Item::Id, Akonadi::Item> mLocalItemsById; 00079 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId; 00080 QSet<Akonadi::Item> mUnprocessedLocalItems; 00081 00082 ItemSync::TransactionMode mTransactionMode; 00083 TransactionSequence *mCurrentTransaction; 00084 int mTransactionJobs; 00085 00086 // fetch scope for initial item listing 00087 ItemFetchScope mFetchScope; 00088 00089 // remote items 00090 Akonadi::Item::List mRemoteItems; 00091 00092 // removed remote items 00093 Item::List mRemovedRemoteItems; 00094 00095 // create counter 00096 int mPendingJobs; 00097 int mProgress; 00098 int mTotalItems; 00099 int mTotalItemsProcessed; 00100 00101 bool mStreaming; 00102 bool mIncremental; 00103 bool mLocalListDone; 00104 bool mDeliveryDone; 00105 bool mFinished; 00106 }; 00107 00108 void ItemSync::Private::createLocalItem( const Item & item ) 00109 { 00110 // don't try to do anything in error state 00111 if ( q->error() ) 00112 return; 00113 mPendingJobs++; 00114 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() ); 00115 q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) ); 00116 } 00117 00118 void ItemSync::Private::checkDone() 00119 { 00120 q->setProcessedAmount( KJob::Bytes, mProgress ); 00121 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 ) 00122 return; 00123 00124 if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place 00125 mFinished = true; 00126 q->emitResult(); 00127 } 00128 } 00129 00130 ItemSync::ItemSync( const Collection &collection, QObject *parent ) : 00131 Job( parent ), 00132 d( new Private( this ) ) 00133 { 00134 d->mSyncCollection = collection; 00135 } 00136 00137 ItemSync::~ItemSync() 00138 { 00139 delete d; 00140 } 00141 00142 void ItemSync::setFullSyncItems( const Item::List &items ) 00143 { 00144 Q_ASSERT( !d->mIncremental ); 00145 if ( !d->mStreaming ) 00146 d->mDeliveryDone = true; 00147 d->mRemoteItems += items; 00148 d->mTotalItemsProcessed += items.count(); 00149 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems; 00150 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed ); 00151 if ( d->mTotalItemsProcessed == d->mTotalItems ) 00152 d->mDeliveryDone = true; 00153 d->execute(); 00154 } 00155 00156 void ItemSync::setTotalItems( int amount ) 00157 { 00158 Q_ASSERT( !d->mIncremental ); 00159 Q_ASSERT( amount >= 0 ); 00160 setStreamingEnabled( true ); 00161 kDebug() << amount; 00162 d->mTotalItems = amount; 00163 setTotalAmount( KJob::Bytes, amount ); 00164 if ( d->mTotalItems == 0 ) { 00165 d->mDeliveryDone = true; 00166 d->execute(); 00167 } 00168 } 00169 00170 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems ) 00171 { 00172 d->mIncremental = true; 00173 if ( !d->mStreaming ) 00174 d->mDeliveryDone = true; 00175 d->mRemoteItems += changedItems; 00176 d->mRemovedRemoteItems += removedItems; 00177 d->mTotalItemsProcessed += changedItems.count() + removedItems.count(); 00178 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed ); 00179 if ( d->mTotalItemsProcessed == d->mTotalItems ) 00180 d->mDeliveryDone = true; 00181 d->execute(); 00182 } 00183 00184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope ) 00185 { 00186 d->mFetchScope = fetchScope; 00187 } 00188 00189 ItemFetchScope &ItemSync::fetchScope() 00190 { 00191 return d->mFetchScope; 00192 } 00193 00194 void ItemSync::doStart() 00195 { 00196 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this ); 00197 job->setFetchScope( d->mFetchScope ); 00198 00199 // we only can fetch parts already in the cache, otherwise this will deadlock 00200 job->fetchScope().setCacheOnly( true ); 00201 00202 connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) ); 00203 } 00204 00205 bool ItemSync::updateItem( const Item &storedItem, Item &newItem ) 00206 { 00207 // we are in error state, better not change anything at all anymore 00208 if ( error() ) 00209 return false; 00210 00211 /* 00212 * We know that this item has changed (as it is part of the 00213 * incremental changed list), so we just put it into the 00214 * storage. 00215 */ 00216 if ( d->mIncremental ) 00217 return true; 00218 00219 if ( newItem.d_func()->mClearPayload ) 00220 return true; 00221 00222 // Check whether the remote revisions differ 00223 if ( storedItem.remoteRevision() != newItem.remoteRevision() ) 00224 return true; 00225 00226 // Check whether the flags differ 00227 if ( storedItem.flags() != newItem.flags() ) { 00228 kDebug() << "Stored flags " << storedItem.flags() 00229 << "new flags " << newItem.flags(); 00230 return true; 00231 } 00232 00233 // Check whether the new item contains unknown parts 00234 QSet<QByteArray> missingParts = newItem.loadedPayloadParts(); 00235 missingParts.subtract( storedItem.loadedPayloadParts() ); 00236 if ( !missingParts.isEmpty() ) 00237 return true; 00238 00239 // ### FIXME SLOW!!! 00240 // If the available part identifiers don't differ, check 00241 // whether the content of the payload differs 00242 if ( newItem.hasPayload() 00243 && storedItem.payloadData() != newItem.payloadData() ) 00244 return true; 00245 00246 // check if remote attributes have been changed 00247 foreach ( Attribute* attr, newItem.attributes() ) { 00248 if ( !storedItem.hasAttribute( attr->type() ) ) 00249 return true; 00250 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() ) 00251 return true; 00252 } 00253 00254 return false; 00255 } 00256 00257 void ItemSync::Private::slotLocalListDone( KJob * job ) 00258 { 00259 if ( !job->error() ) { 00260 const Item::List list = static_cast<ItemFetchJob*>( job )->items(); 00261 foreach ( const Item &item, list ) { 00262 if ( item.remoteId().isEmpty() ) 00263 continue; 00264 mLocalItemsById.insert( item.id(), item ); 00265 mLocalItemsByRemoteId.insert( item.remoteId(), item ); 00266 mUnprocessedLocalItems.insert( item ); 00267 } 00268 } 00269 00270 mLocalListDone = true; 00271 execute(); 00272 } 00273 00274 void ItemSync::Private::execute() 00275 { 00276 if ( !mLocalListDone ) 00277 return; 00278 00279 // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode 00280 // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed 00281 if ( !mDeliveryDone && mRemoteItems.isEmpty() ) 00282 return; 00283 00284 if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) { 00285 ++mTransactionJobs; 00286 mCurrentTransaction = new TransactionSequence( q ); 00287 mCurrentTransaction->setAutomaticCommittingEnabled( false ); 00288 connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) ); 00289 } 00290 00291 processItems(); 00292 if ( !mDeliveryDone ) { 00293 if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) { 00294 mCurrentTransaction->commit(); 00295 mCurrentTransaction = 0; 00296 } 00297 return; 00298 } 00299 00300 // removed 00301 if ( !mIncremental ) { 00302 mRemovedRemoteItems = mUnprocessedLocalItems.toList(); 00303 mUnprocessedLocalItems.clear(); 00304 } 00305 00306 deleteItems( mRemovedRemoteItems ); 00307 mLocalItemsById.clear(); 00308 mLocalItemsByRemoteId.clear(); 00309 mRemovedRemoteItems.clear(); 00310 00311 if ( mCurrentTransaction ) { 00312 mCurrentTransaction->commit(); 00313 mCurrentTransaction = 0; 00314 } 00315 00316 checkDone(); 00317 } 00318 00319 void ItemSync::Private::processItems() 00320 { 00321 // added / updated 00322 foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here 00323 #ifndef NDEBUG 00324 if ( remoteItem.remoteId().isEmpty() ) { 00325 kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier"; 00326 } 00327 #endif 00328 00329 Item localItem = mLocalItemsById.value( remoteItem.id() ); 00330 if ( !localItem.isValid() ) 00331 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() ); 00332 mUnprocessedLocalItems.remove( localItem ); 00333 // missing locally 00334 if ( !localItem.isValid() ) { 00335 createLocalItem( remoteItem ); 00336 continue; 00337 } 00338 00339 if ( q->updateItem( localItem, remoteItem ) ) { 00340 mPendingJobs++; 00341 00342 remoteItem.setId( localItem.id() ); 00343 remoteItem.setRevision( localItem.revision() ); 00344 remoteItem.setSize( localItem.size() ); 00345 remoteItem.setRemoteId( localItem.remoteId() ); // in case someone clears remoteId by accident 00346 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() ); 00347 mod->disableRevisionCheck(); 00348 q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) ); 00349 } else { 00350 mProgress++; 00351 } 00352 } 00353 mRemoteItems.clear(); 00354 } 00355 00356 void ItemSync::Private::deleteItems( const Item::List &items ) 00357 { 00358 // if in error state, better not change anything anymore 00359 if ( q->error() ) 00360 return; 00361 00362 Item::List itemsToDelete; 00363 foreach ( const Item &item, items ) { 00364 Item delItem( item ); 00365 if ( !item.isValid() ) { 00366 delItem = mLocalItemsByRemoteId.value( item.remoteId() ); 00367 } 00368 00369 if ( !delItem.isValid() ) { 00370 #ifndef NDEBUG 00371 kWarning() << "Delete item (remoteeId=" << item.remoteId() 00372 << "mimeType=" << item.mimeType() 00373 << ") does not have a valid UID and no item with that remote ID exists either"; 00374 #endif 00375 continue; 00376 } 00377 00378 if ( delItem.remoteId().isEmpty() ) { 00379 // don't attempt to remove items that never were written to the backend 00380 continue; 00381 } 00382 00383 itemsToDelete.append ( delItem ); 00384 } 00385 00386 if ( !itemsToDelete.isEmpty() ) { 00387 mPendingJobs++; 00388 ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() ); 00389 q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) ); 00390 00391 // It can happen that the groupware servers report us deleted items 00392 // twice, in this case this item delete job will fail on the second try. 00393 // To avoid a rollback of the complete transaction we gracefully allow the job 00394 // to fail :) 00395 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() ); 00396 if ( transaction ) 00397 transaction->setIgnoreJobFailure( job ); 00398 } 00399 } 00400 00401 void ItemSync::Private::slotLocalDeleteDone( KJob* ) 00402 { 00403 mPendingJobs--; 00404 mProgress++; 00405 00406 checkDone(); 00407 } 00408 00409 void ItemSync::Private::slotLocalChangeDone( KJob * job ) 00410 { 00411 Q_UNUSED( job ); 00412 mPendingJobs--; 00413 mProgress++; 00414 00415 checkDone(); 00416 } 00417 00418 void ItemSync::Private::slotTransactionResult( KJob *job ) 00419 { 00420 --mTransactionJobs; 00421 if ( mCurrentTransaction == job ) 00422 mCurrentTransaction = 0; 00423 00424 checkDone(); 00425 } 00426 00427 Job * ItemSync::Private::subjobParent() const 00428 { 00429 if ( mCurrentTransaction && mTransactionMode != NoTransaction ) 00430 return mCurrentTransaction; 00431 return q; 00432 } 00433 00434 void ItemSync::setStreamingEnabled(bool enable) 00435 { 00436 d->mStreaming = enable; 00437 } 00438 00439 void ItemSync::deliveryDone() 00440 { 00441 Q_ASSERT( d->mStreaming ); 00442 d->mDeliveryDone = true; 00443 d->execute(); 00444 } 00445 00446 void ItemSync::slotResult(KJob* job) 00447 { 00448 if ( job->error() ) { 00449 // pretent there were no errors 00450 Akonadi::Job::removeSubjob( job ); 00451 // propagate the first error we got but continue, we might still be fed with stuff from a resource 00452 if ( !error() ) { 00453 setError( job->error() ); 00454 setErrorText( job->errorText() ); 00455 } 00456 } else { 00457 Akonadi::Job::slotResult( job ); 00458 } 00459 } 00460 00461 void ItemSync::rollback() 00462 { 00463 setError( UserCanceled ); 00464 if ( d->mCurrentTransaction ) 00465 d->mCurrentTransaction->rollback(); 00466 d->mDeliveryDone = true; // user wont deliver more data 00467 d->execute(); // end this in an ordered way, since we have an error set no real change will be done 00468 } 00469 00470 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode) 00471 { 00472 d->mTransactionMode = mode; 00473 } 00474 00475 00476 #include "itemsync.moc"