00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "session.h"
00021 #include "session_p.h"
00022
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "servermanager.h"
00027 #include "servermanager_p.h"
00028 #include "xdgbasedirs_p.h"
00029
00030 #include <kdebug.h>
00031 #include <klocale.h>
00032
00033 #include <QCoreApplication>
00034 #include <QtCore/QDir>
00035 #include <QtCore/QQueue>
00036 #include <QtCore/QThreadStorage>
00037 #include <QtCore/QTimer>
00038 #include <QSettings>
00039
00040 #include <QtNetwork/QLocalSocket>
00041 #include <QtNetwork/QTcpSocket>
00042
00043
00044
00045
00046 #define PIPELINE_LENGTH 0
00047
00048
00049 using namespace Akonadi;
00050
00051
00052
00053
00054 void SessionPrivate::startNext()
00055 {
00056 QTimer::singleShot( 0, mParent, SLOT( doStartNext() ) );
00057 }
00058
00059 void SessionPrivate::reconnect()
00060 {
00061 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
00062 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
00063 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
00064
00065 return;
00066 }
00067
00068 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
00069 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
00070 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
00071
00072 return;
00073 }
00074
00075
00076 QString serverAddress;
00077 quint16 port = 0;
00078 bool useTcp = false;
00079
00080
00081 const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
00082 if ( !serverAddressEnvVar.isEmpty() ) {
00083 const int pos = serverAddressEnvVar.indexOf( ':' );
00084 const QByteArray protocol = serverAddressEnvVar.left( pos );
00085 QMap<QString, QString> options;
00086 foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
00087 const QStringList pair = entry.split( QLatin1Char('=') );
00088 if ( pair.size() != 2 )
00089 continue;
00090 options.insert( pair.first(), pair.last() );
00091 }
00092 kDebug() << protocol << options;
00093
00094 if ( protocol == "tcp" ) {
00095 serverAddress = options.value( QLatin1String( "host" ) );
00096 port = options.value( QLatin1String( "port" ) ).toUInt();
00097 useTcp = true;
00098 } else if ( protocol == "unix" ) {
00099 serverAddress = options.value( QLatin1String( "path" ) );
00100 } else if ( protocol == "pipe" ) {
00101 serverAddress = options.value( QLatin1String( "name" ) );
00102 }
00103 }
00104
00105
00106 if ( serverAddress.isEmpty() ) {
00107 const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00108 const QFileInfo fileInfo( connectionConfigFile );
00109 if ( !fileInfo.exists() ) {
00110 kDebug() << "Akonadi Client Session: connection config file '"
00111 "akonadi/akonadiconnectionrc' can not be found in"
00112 << XdgBaseDirs::homePath( "config" ) << "nor in any of"
00113 << XdgBaseDirs::systemPathList( "config" );
00114 }
00115 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
00116
00117 #ifdef Q_OS_WIN //krazy:exclude=cpp
00118 serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00119 #else
00120 const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00121 serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString();
00122 #endif
00123 }
00124
00125
00126
00127 if ( !socket ) {
00128 if ( !useTcp ) {
00129 socket = localSocket = new QLocalSocket( mParent );
00130 mParent->connect( localSocket, SIGNAL( error( QLocalSocket::LocalSocketError ) ), SLOT( socketError( QLocalSocket::LocalSocketError ) ) );
00131 } else {
00132 socket = tcpSocket = new QTcpSocket( mParent );
00133 mParent->connect( tcpSocket, SIGNAL( error( QAbstractSocket::SocketError ) ), SLOT( socketError( QAbstractSocket::SocketError ) ) );
00134 }
00135 mParent->connect( socket, SIGNAL( disconnected() ), SLOT( socketDisconnected() ) );
00136 mParent->connect( socket, SIGNAL( readyRead() ), SLOT( dataReceived() ) );
00137 }
00138
00139
00140 kDebug() << "connectToServer" << serverAddress;
00141 if ( !useTcp ) {
00142 localSocket->connectToServer( serverAddress );
00143 } else {
00144 tcpSocket->connectToHost( serverAddress, port );
00145 }
00146 }
00147
00148 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
00149 {
00150 Q_ASSERT( mParent->sender() == socket );
00151 kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
00152 socketDisconnected();
00153 }
00154
00155 void SessionPrivate::socketError( QAbstractSocket::SocketError )
00156 {
00157 Q_ASSERT( mParent->sender() == socket );
00158 kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
00159 socketDisconnected();
00160 }
00161
00162 void SessionPrivate::socketDisconnected()
00163 {
00164 if ( currentJob )
00165 currentJob->d_ptr->lostConnection();
00166 connected = false;
00167 QTimer::singleShot( 30000, mParent, SLOT( reconnect() ) );
00168 }
00169
00170 void SessionPrivate::dataReceived()
00171 {
00172 while ( socket->bytesAvailable() > 0 ) {
00173 if ( parser->continuationSize() > 1 ) {
00174 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00175 parser->parseBlock( data );
00176 } else if ( socket->canReadLine() ) {
00177 if ( !parser->parseNextLine( socket->readLine() ) )
00178 continue;
00179
00180
00181 if ( parser->tag() == QByteArray( "0" ) ) {
00182 if ( parser->data().startsWith( "OK" ) ) {
00183 connected = true;
00184 startNext();
00185 } else {
00186 kWarning() << "Unable to login to Akonadi server:" << parser->data();
00187 socket->close();
00188 QTimer::singleShot( 1000, mParent, SLOT( reconnect() ) );
00189 }
00190 }
00191
00192
00193 if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00194 const int pos = parser->data().indexOf( "[PROTOCOL" );
00195 if ( pos > 0 ) {
00196 qint64 tmp = 0;
00197 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00198 protocolVersion = tmp;
00199 Internal::setServerProtocolVersion( tmp );
00200 }
00201 kDebug() << "Server protocol version is:" << protocolVersion;
00202
00203 writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
00204
00205
00206 } else {
00207 if ( currentJob )
00208 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00209 }
00210
00211
00212 parser->reset();
00213 } else {
00214 break;
00215 }
00216 }
00217 }
00218
00219 bool SessionPrivate::canPipelineNext()
00220 {
00221 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00222 return false;
00223 if ( pipeline.isEmpty() && currentJob )
00224 return currentJob->d_ptr->mWriteFinished;
00225 if ( !pipeline.isEmpty() )
00226 return pipeline.last()->d_ptr->mWriteFinished;
00227 return false;
00228 }
00229
00230 void SessionPrivate::doStartNext()
00231 {
00232 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00233 return;
00234 if ( canPipelineNext() ) {
00235 Akonadi::Job *nextJob = queue.dequeue();
00236 pipeline.enqueue( nextJob );
00237 startJob( nextJob );
00238 }
00239 if ( jobRunning )
00240 return;
00241 jobRunning = true;
00242 if ( !pipeline.isEmpty() ) {
00243 currentJob = pipeline.dequeue();
00244 } else {
00245 currentJob = queue.dequeue();
00246 startJob( currentJob );
00247 }
00248 }
00249
00250 void SessionPrivate::startJob( Job *job )
00251 {
00252 if ( protocolVersion < minimumProtocolVersion() ) {
00253 job->setError( Job::ProtocolVersionMismatch );
00254 job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
00255 job->emitResult();
00256 } else {
00257 job->d_ptr->startQueued();
00258 }
00259 }
00260
00261 void SessionPrivate::endJob( Job *job )
00262 {
00263 job->emitResult();
00264 }
00265
00266 void SessionPrivate::jobDone(KJob * job)
00267 {
00268
00269
00270 if ( job == currentJob ) {
00271 if ( pipeline.isEmpty() ) {
00272 jobRunning = false;
00273 currentJob = 0;
00274 } else {
00275 currentJob = pipeline.dequeue();
00276 }
00277 startNext();
00278 } else {
00279
00280 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
00281
00282 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
00283 }
00284 }
00285
00286 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00287 {
00288 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00289 Q_UNUSED( job );
00290
00291 startNext();
00292 }
00293
00294 void SessionPrivate::jobDestroyed(QObject * job)
00295 {
00296
00297 jobDone( static_cast<KJob*>( job ) );
00298 }
00299
00300 void SessionPrivate::addJob(Job * job)
00301 {
00302 queue.append( job );
00303 QObject::connect( job, SIGNAL( result( KJob* ) ), mParent, SLOT( jobDone( KJob* ) ) );
00304 QObject::connect( job, SIGNAL( writeFinished( Akonadi::Job* ) ), mParent, SLOT( jobWriteFinished( Akonadi::Job* ) ) );
00305 QObject::connect( job, SIGNAL( destroyed( QObject* ) ), mParent, SLOT( jobDestroyed( QObject* ) ) );
00306 startNext();
00307 }
00308
00309 int SessionPrivate::nextTag()
00310 {
00311 return theNextTag++;
00312 }
00313
00314 void SessionPrivate::writeData(const QByteArray & data)
00315 {
00316 if ( socket )
00317 socket->write( data );
00318 else
00319 kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
00320 }
00321
00322 void SessionPrivate::serverStateChanged( ServerManager::State state )
00323 {
00324 if ( state == ServerManager::Running && !connected )
00325 reconnect();
00326 }
00327
00328
00329
00330
00331 SessionPrivate::SessionPrivate( Session *parent )
00332 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
00333 {
00334 }
00335
00336 void SessionPrivate::init( const QByteArray &id )
00337 {
00338 kDebug() << id;
00339 parser = new ImapParser();
00340
00341 if ( !id.isEmpty() ) {
00342 sessionId = id;
00343 } else {
00344 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
00345 + '-' + QByteArray::number( qrand() );
00346 }
00347
00348 connected = false;
00349 theNextTag = 1;
00350 jobRunning = false;
00351
00352 if ( ServerManager::state() == ServerManager::NotRunning )
00353 ServerManager::start();
00354 mParent->connect( ServerManager::self(), SIGNAL( stateChanged( Akonadi::ServerManager::State ) ),
00355 SLOT( serverStateChanged( Akonadi::ServerManager::State ) ) );
00356
00357 reconnect();
00358 }
00359
00360 Session::Session(const QByteArray & sessionId, QObject * parent) :
00361 QObject( parent ),
00362 d( new SessionPrivate( this ) )
00363 {
00364 d->init( sessionId );
00365 }
00366
00367 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
00368 : QObject( parent ),
00369 d( dd )
00370 {
00371 d->init( sessionId );
00372 }
00373
00374 Session::~Session()
00375 {
00376 clear();
00377 delete d;
00378 }
00379
00380 QByteArray Session::sessionId() const
00381 {
00382 return d->sessionId;
00383 }
00384
00385 QThreadStorage<Session*> instances;
00386
00387 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00388 {
00389 Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00390 "You tried to create a default session with empty session id!" );
00391 Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00392 "You tried to create a default session twice!" );
00393
00394 instances.setLocalData( new Session( sessionId ) );
00395 }
00396
00397 Session* Session::defaultSession()
00398 {
00399 if ( !instances.hasLocalData() )
00400 instances.setLocalData( new Session() );
00401 return instances.localData();
00402 }
00403
00404 void Session::clear()
00405 {
00406 foreach ( Job* job, d->queue )
00407 job->kill( KJob::EmitResult );
00408 d->queue.clear();
00409 foreach ( Job* job, d->pipeline )
00410 job->kill( KJob::EmitResult );
00411 d->pipeline.clear();
00412 if ( d->currentJob )
00413 d->currentJob->kill( KJob::EmitResult );
00414 d->jobRunning = false;
00415 d->connected = false;
00416 if ( d->socket )
00417 d->socket->disconnect( this );
00418 delete d->socket;
00419 d->socket = 0;
00420 QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection );
00421 }
00422
00423 #include "session.moc"