View Javadoc

1   // ========================================================================
2   // Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.jetty.client;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  import java.util.concurrent.atomic.AtomicBoolean;
21  
22  import org.mortbay.io.Buffer;
23  import org.mortbay.io.Buffers;
24  import org.mortbay.io.ByteArrayBuffer;
25  import org.mortbay.io.Connection;
26  import org.mortbay.io.EndPoint;
27  import org.mortbay.io.View;
28  import org.mortbay.io.nio.SelectChannelEndPoint;
29  import org.mortbay.jetty.HttpGenerator;
30  import org.mortbay.jetty.HttpHeaderValues;
31  import org.mortbay.jetty.HttpHeaders;
32  import org.mortbay.jetty.HttpParser;
33  import org.mortbay.jetty.HttpSchemes;
34  import org.mortbay.jetty.HttpVersions;
35  import org.mortbay.jetty.client.security.Authorization;
36  import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37  import org.mortbay.log.Log;
38  import org.mortbay.thread.Timeout;
39  
40  /**
41   *
42   * @author Greg Wilkins
43   * @author Guillaume Nodet
44   */
45  public class HttpConnection implements Connection
46  {
47      HttpDestination _destination;
48      EndPoint _endp;
49      HttpGenerator _generator;
50      HttpParser _parser;
51      boolean _http11 = true;
52      Buffer _connectionHeader;
53      Buffer _requestContentChunk;
54      boolean _requestComplete;
55      public boolean _reserved;
56      // The current exchange waiting for a response
57      volatile HttpExchange _exchange;
58      HttpExchange _pipeline;
59      private final Timeout.Task _timeout = new TimeoutTask();
60      private AtomicBoolean _idle = new AtomicBoolean(false);
61  
62      public void dump() throws IOException
63      {
64          System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65          System.err.println("generator=" + _generator);
66          System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67          System.err.println("exchange=" + _exchange);
68          if (_endp instanceof SslHttpChannelEndPoint)
69              ((SslHttpChannelEndPoint)_endp).dump();
70      }
71  
72      /* ------------------------------------------------------------ */
73      HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74      {
75          _endp = endp;
76          _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77          _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78      }
79  
80      public void setReserved (boolean reserved)
81      {
82          _reserved = reserved;
83      }
84  
85      public boolean isReserved()
86      {
87          return _reserved;
88      }
89  
90      /* ------------------------------------------------------------ */
91      public HttpDestination getDestination()
92      {
93          return _destination;
94      }
95  
96      /* ------------------------------------------------------------ */
97      public void setDestination(HttpDestination destination)
98      {
99          _destination = destination;
100     }
101 
102     /* ------------------------------------------------------------ */
103     public boolean send(HttpExchange ex) throws IOException
104     {
105         // _message =
106         // Thread.currentThread().getName()+": Generator instance="+_generator
107         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
108         synchronized (this)
109         {
110             if (_exchange != null)
111             {
112                 if (_pipeline != null)
113                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
114                 _pipeline = ex;
115                 return true;
116             }
117 
118             if (!_endp.isOpen())
119                 return false;
120 
121             _exchange = ex;
122             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123 
124             if (_endp.isBlocking())
125             {
126                 this.notify();
127             }
128             else
129             {
130                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131                 scep.scheduleWrite();
132             }
133             _destination.getHttpClient().schedule(_timeout);
134 
135             return true;
136         }
137     }
138 
139     /* ------------------------------------------------------------ */
140     public void handle() throws IOException
141     {
142         int no_progress = 0;
143 
144         boolean failed = false;
145         while (_endp.isBufferingInput() || _endp.isOpen())
146         {
147             synchronized (this)
148             {
149                 while (_exchange == null)
150                 {
151                     if (_endp.isBlocking())
152                     {
153                         try
154                         {
155                             this.wait();
156                         }
157                         catch (InterruptedException e)
158                         {
159                             throw new InterruptedIOException();
160                         }
161                     }
162                     else
163                     {
164                         // Hopefully just space?
165                         _parser.fill();
166                         _parser.skipCRLF();
167                         if (_parser.isMoreInBuffer())
168                         {
169                             Log.warn("Unexpected data received but no request sent");
170                             close();
171                         }
172                         return;
173                     }
174                 }
175             }
176             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
177             {
178                 no_progress = 0;
179                 commitRequest();
180             }
181 
182             try
183             {
184                 long io = 0;
185                 _endp.flush();
186 
187                 if (_generator.isComplete())
188                 {
189                     if (!_requestComplete)
190                     {
191                         _requestComplete = true;
192                         _exchange.getEventListener().onRequestComplete();
193                     }
194                 }
195                 else
196                 {
197                     // Write as much of the request as possible
198                     synchronized (this)
199                     {
200                         if (_exchange == null)
201                             continue;
202                         long flushed = _generator.flush();
203                         io += flushed;
204                     }
205 
206                     if (!_generator.isComplete())
207                     {
208                         InputStream in = _exchange.getRequestContentSource();
209                         if (in != null)
210                         {
211                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
212                             {
213                                 _requestContentChunk = _exchange.getRequestContentChunk();
214                                 if (_requestContentChunk != null)
215                                     _generator.addContent(_requestContentChunk,false);
216                                 else
217                                     _generator.complete();
218                                 io += _generator.flush();
219                             }
220                         }
221                         else
222                             _generator.complete();
223                     }
224                 }
225 
226                 if (_generator.isComplete() && !_requestComplete)
227                 {
228                     _requestComplete = true;
229                     _exchange.getEventListener().onRequestComplete();
230                 }
231 
232                 // If we are not ended then parse available
233                 if (!_parser.isComplete() && _generator.isCommitted())
234                 {
235                     long filled = _parser.parseAvailable();
236                     io += filled;
237                 }
238 
239                 if (io > 0)
240                     no_progress = 0;
241                 else if (no_progress++ >= 2 && !_endp.isBlocking())
242                 {
243                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
244                     if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
245                     {
246                         if (_generator.flush()>0)
247                             continue;
248                     }
249                     return;
250                 }
251             }
252             catch (Throwable e)
253             {
254                 Log.debug("Failure on " + _exchange, e);
255 
256                 if (e instanceof ThreadDeath)
257                     throw (ThreadDeath)e;
258 
259                 synchronized (this)
260                 {
261                     if (_exchange != null)
262                     {
263                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
264                         _exchange.getEventListener().onException(e);
265                     }
266                 }
267 
268                 failed = true;
269                 if (e instanceof IOException)
270                     throw (IOException)e;
271 
272                 if (e instanceof Error)
273                     throw (Error)e;
274 
275                 if (e instanceof RuntimeException)
276                     throw (RuntimeException)e;
277 
278                throw new RuntimeException(e);
279             }
280             finally
281             {
282                 boolean complete = false;
283                 boolean close = failed; // always close the connection on error
284                 if (!failed)
285                 {
286                     // are we complete?
287                     if (_generator.isComplete())
288                     {
289                         if (!_requestComplete)
290                         {
291                             _requestComplete = true;
292                             _exchange.getEventListener().onRequestComplete();
293                         }
294 
295                         // we need to return the HttpConnection to a state that
296                         // it can be reused or closed out
297                         if (_parser.isComplete())
298                         {
299                             _destination.getHttpClient().cancel(_timeout);
300                             complete = true;
301                         }
302                     }
303                 }
304 
305                 if (complete || failed)
306                 {
307                     synchronized (this)
308                     {
309                         if (!close)
310                             close = shouldClose();
311 
312                         reset(true);
313 
314                         no_progress = 0;
315                         if (_exchange != null)
316                         {
317                             _exchange = null;
318 
319                             if (_pipeline == null)
320                             {
321                                 if (!isReserved())
322                                     _destination.returnConnection(this,close);
323                             }
324                             else
325                             {
326                                 if (close)
327                                 {
328                                     if (!isReserved())
329                                         _destination.returnConnection(this,close);
330 
331                                     HttpExchange exchange = _pipeline;
332                                     _pipeline = null;
333                                     _destination.send(exchange);
334                                 }
335                                 else
336                                 {
337                                     HttpExchange exchange = _pipeline;
338                                     _pipeline = null;
339                                     send(exchange);
340                                 }
341                             }
342                         }
343                     }
344                 }
345             }
346         }
347     }
348 
349     /* ------------------------------------------------------------ */
350     public boolean isIdle()
351     {
352         synchronized (this)
353         {
354             return _exchange == null;
355         }
356     }
357 
358     /* ------------------------------------------------------------ */
359     public EndPoint getEndPoint()
360     {
361         return _endp;
362     }
363 
364     /* ------------------------------------------------------------ */
365     private void commitRequest() throws IOException
366     {
367         synchronized (this)
368         {
369             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
370                 throw new IllegalStateException();
371 
372             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
373             _generator.setVersion(_exchange._version);
374 
375             String uri = _exchange._uri;
376             if (_destination.isProxied() && uri.startsWith("/"))
377             {
378                 // TODO suppress port 80 or 443
379                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
380                         + _destination.getAddress().getPort() + uri;
381                 Authorization auth = _destination.getProxyAuthentication();
382                 if (auth != null)
383                     auth.setCredentials(_exchange);
384             }
385 
386             _generator.setRequest(_exchange._method,uri);
387 
388             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
389             {
390                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
391                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
392             }
393 
394             if (_exchange._requestContent != null)
395             {
396                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
397                 _generator.completeHeader(_exchange._requestFields,false);
398                 _generator.addContent(new View(_exchange._requestContent),true);
399             }
400             else if (_exchange._requestContentSource != null)
401             {
402                 _generator.completeHeader(_exchange._requestFields,false);
403                 int available = _exchange._requestContentSource.available();
404                 if (available > 0)
405                 {
406                     // TODO deal with any known content length
407 
408                     // TODO reuse this buffer!
409                     byte[] buf = new byte[available];
410                     int length = _exchange._requestContentSource.read(buf);
411                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
412                 }
413             }
414             else
415             {
416                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
417                 _generator.completeHeader(_exchange._requestFields,true);
418             }
419 
420             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
421         }
422     }
423 
424     /* ------------------------------------------------------------ */
425     protected void reset(boolean returnBuffers) throws IOException
426     {
427         _requestComplete = false;
428         _connectionHeader = null;
429         _parser.reset(returnBuffers);
430         _generator.reset(returnBuffers);
431         _http11 = true;
432     }
433 
434     /* ------------------------------------------------------------ */
435     private boolean shouldClose()
436     {
437         if (_connectionHeader!=null)
438         {
439             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
440                 return true;
441             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
442                 return false;
443         }
444         return !_http11;
445     }
446 
447     /* ------------------------------------------------------------ */
448     private class Handler extends HttpParser.EventHandler
449     {
450         @Override
451         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
452         {
453             // System.out.println( method.toString() + "///" + url.toString() +
454             // "///" + version.toString() );
455             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
456             // out here
457             // throw new IllegalStateException();
458         }
459 
460         @Override
461         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
462         {
463             HttpExchange exchange = _exchange;
464             if (exchange!=null)
465             {
466                 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
467                 exchange.getEventListener().onResponseStatus(version,status,reason);
468                 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
469             }
470         }
471 
472         @Override
473         public void parsedHeader(Buffer name, Buffer value) throws IOException
474         {
475             HttpExchange exchange = _exchange;
476             if (exchange!=null)
477             {
478                 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
479                 {
480                     _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
481                 }
482                 exchange.getEventListener().onResponseHeader(name,value);
483             }
484         }
485 
486         @Override
487         public void headerComplete() throws IOException
488         {
489             HttpExchange exchange = _exchange;
490             if (exchange!=null)
491                 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
492         }
493 
494         @Override
495         public void content(Buffer ref) throws IOException
496         {
497             HttpExchange exchange = _exchange;
498             if (exchange!=null)
499                 exchange.getEventListener().onResponseContent(ref);
500         }
501 
502         @Override
503         public void messageComplete(long contextLength) throws IOException
504         {
505             HttpExchange exchange = _exchange;
506             if (exchange!=null)
507                 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
508         }
509     }
510 
511     /* ------------------------------------------------------------ */
512     public String toString()
513     {
514         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
515     }
516 
517     /* ------------------------------------------------------------ */
518     public String toDetailString()
519     {
520         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
521     }
522 
523     /* ------------------------------------------------------------ */
524     public void close() throws IOException
525     {
526         try
527         {
528             _endp.close();
529         }
530         finally
531         {
532             HttpExchange exchange=_exchange;
533             if (exchange!=null)
534             {
535                 int status = exchange.getStatus();
536                 if (status>HttpExchange.STATUS_START && status<HttpExchange.STATUS_COMPLETED)
537                     exchange.onException(new IOException("CLOSED"));
538             }
539         }
540     }
541 
542 
543     /* ------------------------------------------------------------ */
544     public void setIdleTimeout()
545     {
546         synchronized (this)
547         {
548             if (_idle.compareAndSet(false,true))
549                 _destination.getHttpClient().scheduleIdle(_timeout);
550             else
551                 throw new IllegalStateException();
552         }
553     }
554 
555     /* ------------------------------------------------------------ */
556     public boolean cancelIdleTimeout()
557     {
558         synchronized (this)
559         {
560             if (_idle.compareAndSet(true,false))
561             {
562                 _destination.getHttpClient().cancel(_timeout);
563                 return true;
564             }
565         }
566 
567         return false;
568     }
569 
570     /* ------------------------------------------------------------ */
571     /* ------------------------------------------------------------ */
572     /* ------------------------------------------------------------ */
573     private class TimeoutTask extends Timeout.Task
574     {
575         public void expired()
576         {
577             HttpExchange ex=null;
578             try
579             {
580                 synchronized (HttpConnection.this)
581                 {
582                     ex = _exchange;
583                     _exchange = null;
584                     if (ex != null)
585                     {
586                         _destination.returnConnection(HttpConnection.this,true);
587                     }
588                     else if (_idle.compareAndSet(true,false))
589                     {
590                         _destination.returnIdleConnection(HttpConnection.this);
591                     }
592                 }
593             }
594             catch (Exception e)
595             {
596                 Log.debug(e);
597             }
598             finally
599             {
600                 try
601                 {
602                     close();
603                 }
604                 catch (IOException e)
605                 {
606                     Log.ignore(e);
607                 }
608 
609                 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
610                 {
611                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
612                 }
613             }
614         }
615     }
616 
617 }