akonadi
session.cpp
00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "session.h"
00021 #include "session_p.h"
00022
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "xdgbasedirs_p.h"
00027
00028 #include <kdebug.h>
00029 #include <klocale.h>
00030
00031 #include <QtCore/QDir>
00032 #include <QtCore/QQueue>
00033 #include <QtCore/QThreadStorage>
00034 #include <QtCore/QTimer>
00035
00036 #include <QtNetwork/QLocalSocket>
00037
00038 #define PIPELINE_LENGTH 2
00039
00040 using namespace Akonadi;
00041
00042
00043
00044
00045 static const int minimumProtocolVersion = 2;
00046
00047 void SessionPrivate::startNext()
00048 {
00049 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
00050 }
00051
00052 void SessionPrivate::reconnect()
00053 {
00054
00055 if ( socket->state() != QLocalSocket::ConnectedState &&
00056 socket->state() != QLocalSocket::ConnectingState ) {
00057 #ifdef Q_OS_WIN //krazy:exclude=cpp
00058 const QString namedPipe = mConnectionSettings->value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00059 socket->connectToServer( namedPipe );
00060 #else
00061 const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00062 const QString path = mConnectionSettings->value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString();
00063 socket->connectToServer( path );
00064 #endif
00065 }
00066 }
00067
00068 void SessionPrivate::socketError()
00069 {
00070 if ( currentJob )
00071 currentJob->d_ptr->lostConnection();
00072 connected = false;
00073 QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
00074 }
00075
00076 void SessionPrivate::dataReceived()
00077 {
00078 while ( socket->bytesAvailable() > 0 ) {
00079 if ( parser->continuationSize() > 1 ) {
00080 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00081 parser->parseBlock( data );
00082 } else if ( socket->canReadLine() ) {
00083 if ( !parser->parseNextLine( socket->readLine() ) )
00084 continue;
00085
00086
00087 if ( parser->tag() == QByteArray("0") ) {
00088 if ( parser->data().startsWith( "OK" ) ) {
00089 connected = true;
00090 startNext();
00091 } else {
00092 kWarning( 5250 ) << "Unable to login to Akonadi server:" << parser->data();
00093 socket->close();
00094 QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
00095 }
00096 }
00097
00098
00099 if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00100 const int pos = parser->data().indexOf( "[PROTOCOL" );
00101 if ( pos > 0 ) {
00102 qint64 tmp = 0;
00103 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00104 protocolVersion = tmp;
00105 }
00106 kDebug( 5250 ) << "Server protocol version is:" << protocolVersion;
00107
00108 writeData( "0 LOGIN " + sessionId + '\n' );
00109
00110
00111 } else {
00112 if ( currentJob )
00113 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00114 }
00115
00116
00117 parser->reset();
00118 } else {
00119 break;
00120 }
00121 }
00122 }
00123
00124 bool SessionPrivate::canPipelineNext()
00125 {
00126 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00127 return false;
00128 if ( pipeline.isEmpty() && currentJob )
00129 return currentJob->d_ptr->mWriteFinished;
00130 if ( !pipeline.isEmpty() )
00131 return pipeline.last()->d_ptr->mWriteFinished;
00132 return false;
00133 }
00134
00135 void SessionPrivate::doStartNext()
00136 {
00137 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00138 return;
00139 if ( canPipelineNext() ) {
00140 Akonadi::Job *nextJob = queue.dequeue();
00141 pipeline.enqueue( nextJob );
00142 startJob( nextJob );
00143 }
00144 if ( jobRunning )
00145 return;
00146 jobRunning = true;
00147 if ( !pipeline.isEmpty() ) {
00148 currentJob = pipeline.dequeue();
00149 } else {
00150 currentJob = queue.dequeue();
00151 startJob( currentJob );
00152 }
00153 }
00154
00155 void SessionPrivate::startJob( Job *job )
00156 {
00157 if ( protocolVersion < minimumProtocolVersion ) {
00158 job->setError( Job::ProtocolVersionMismatch );
00159 job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion ) );
00160 job->emitResult();
00161 } else {
00162 job->d_ptr->startQueued();
00163 }
00164 }
00165
00166 void SessionPrivate::jobDone(KJob * job)
00167 {
00168 if( job == currentJob ) {
00169 if ( pipeline.isEmpty() ) {
00170 jobRunning = false;
00171 currentJob = 0;
00172 } else {
00173 currentJob = pipeline.dequeue();
00174 }
00175 startNext();
00176 }
00177
00178 else {
00179 kDebug( 5250 ) << job << "Non-current job finished.";
00180 }
00181 }
00182
00183 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00184 {
00185 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00186 startNext();
00187 }
00188
00189 void SessionPrivate::addJob(Job * job)
00190 {
00191 queue.append( job );
00192 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
00193 QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
00194 startNext();
00195 }
00196
00197 int SessionPrivate::nextTag()
00198 {
00199 return theNextTag++;
00200 }
00201
00202 void SessionPrivate::writeData(const QByteArray & data)
00203 {
00204 socket->write( data );
00205 }
00206
00207
00208
00209
00210 Session::Session(const QByteArray & sessionId, QObject * parent) :
00211 QObject( parent ),
00212 d( new SessionPrivate( this ) )
00213 {
00214 if ( !sessionId.isEmpty() )
00215 d->sessionId = sessionId;
00216 else
00217 d->sessionId = QByteArray::number( qrand() );
00218
00219 d->connected = false;
00220 d->theNextTag = 1;
00221 d->currentJob = 0;
00222 d->jobRunning = false;
00223
00224 const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00225
00226 QFileInfo fileInfo( connectionConfigFile );
00227 if ( !fileInfo.exists() ) {
00228 kWarning( 5250 ) << "Akonadi Client Session: connection config file '"
00229 << "akonadi/akonadiconnectionrc can not be found in '"
00230 << XdgBaseDirs::homePath( "config" ) << "' nor in any of "
00231 << XdgBaseDirs::systemPathList( "config" );
00232 }
00233
00234 d->mConnectionSettings = new QSettings( connectionConfigFile, QSettings::IniFormat );
00235
00236
00237 d->socket = new QLocalSocket( this );
00238
00239 connect( d->socket, SIGNAL(disconnected()), SLOT(socketError()) );
00240 connect( d->socket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError()) );
00241 connect( d->socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
00242 d->reconnect();
00243 }
00244
00245 Session::~Session()
00246 {
00247 clear();
00248 delete d;
00249 }
00250
00251 QByteArray Session::sessionId() const
00252 {
00253 return d->sessionId;
00254 }
00255
00256 QThreadStorage<Session*> instances;
00257
00258 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00259 {
00260 Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00261 "You tried to create a default session with empty session id!" );
00262 Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00263 "You tried to create a default session twice!" );
00264
00265 instances.setLocalData( new Session( sessionId ) );
00266 }
00267
00268 Session* Session::defaultSession()
00269 {
00270 if ( !instances.hasLocalData() )
00271 instances.setLocalData( new Session() );
00272 return instances.localData();
00273 }
00274
00275 void Session::clear()
00276 {
00277 foreach ( Job* job, d->queue )
00278 job->kill( KJob::EmitResult );
00279 d->queue.clear();
00280 if ( d->currentJob )
00281 d->currentJob->kill( KJob::EmitResult );
00282 }
00283
00284 #include "session.moc"