1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package org.mortbay.jetty.client;
16
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import org.mortbay.io.Buffer;
23 import org.mortbay.io.Buffers;
24 import org.mortbay.io.ByteArrayBuffer;
25 import org.mortbay.io.Connection;
26 import org.mortbay.io.EndPoint;
27 import org.mortbay.io.View;
28 import org.mortbay.io.nio.SelectChannelEndPoint;
29 import org.mortbay.jetty.HttpGenerator;
30 import org.mortbay.jetty.HttpHeaderValues;
31 import org.mortbay.jetty.HttpHeaders;
32 import org.mortbay.jetty.HttpParser;
33 import org.mortbay.jetty.HttpSchemes;
34 import org.mortbay.jetty.HttpVersions;
35 import org.mortbay.jetty.client.security.Authorization;
36 import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37 import org.mortbay.log.Log;
38 import org.mortbay.thread.Timeout;
39
40
41
42
43
44
45 public class HttpConnection implements Connection
46 {
47 HttpDestination _destination;
48 EndPoint _endp;
49 HttpGenerator _generator;
50 HttpParser _parser;
51 boolean _http11 = true;
52 Buffer _connectionHeader;
53 Buffer _requestContentChunk;
54 boolean _requestComplete;
55 public boolean _reserved;
56
57 volatile HttpExchange _exchange;
58 HttpExchange _pipeline;
59 private final Timeout.Task _timeout = new TimeoutTask();
60 private AtomicBoolean _idle = new AtomicBoolean(false);
61
62 public void dump() throws IOException
63 {
64 System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65 System.err.println("generator=" + _generator);
66 System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67 System.err.println("exchange=" + _exchange);
68 if (_endp instanceof SslHttpChannelEndPoint)
69 ((SslHttpChannelEndPoint)_endp).dump();
70 }
71
72
73 HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
74 {
75 _endp = endp;
76 _generator = new HttpGenerator(buffers,endp,hbs,cbs);
77 _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
78 }
79
80 public void setReserved (boolean reserved)
81 {
82 _reserved = reserved;
83 }
84
85 public boolean isReserved()
86 {
87 return _reserved;
88 }
89
90
91 public HttpDestination getDestination()
92 {
93 return _destination;
94 }
95
96
97 public void setDestination(HttpDestination destination)
98 {
99 _destination = destination;
100 }
101
102
103 public boolean send(HttpExchange ex) throws IOException
104 {
105
106
107
108 synchronized (this)
109 {
110 if (_exchange != null)
111 {
112 if (_pipeline != null)
113 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
114 _pipeline = ex;
115 return true;
116 }
117
118 if (!_endp.isOpen())
119 return false;
120
121 _exchange = ex;
122 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
123
124 if (_endp.isBlocking())
125 {
126 this.notify();
127 }
128 else
129 {
130 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
131 scep.scheduleWrite();
132 }
133 _destination.getHttpClient().schedule(_timeout);
134
135 return true;
136 }
137 }
138
139
140 public void handle() throws IOException
141 {
142 int no_progress = 0;
143
144 boolean failed = false;
145 while (_endp.isBufferingInput() || _endp.isOpen())
146 {
147 synchronized (this)
148 {
149 while (_exchange == null)
150 {
151 if (_endp.isBlocking())
152 {
153 try
154 {
155 this.wait();
156 }
157 catch (InterruptedException e)
158 {
159 throw new InterruptedIOException();
160 }
161 }
162 else
163 {
164
165 _parser.fill();
166 _parser.skipCRLF();
167 if (_parser.isMoreInBuffer())
168 {
169 Log.warn("Unexpected data received but no request sent");
170 close();
171 }
172 return;
173 }
174 }
175 }
176 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
177 {
178 no_progress = 0;
179 commitRequest();
180 }
181
182 try
183 {
184 long io = 0;
185 _endp.flush();
186
187 if (_generator.isComplete())
188 {
189 if (!_requestComplete)
190 {
191 _requestComplete = true;
192 _exchange.getEventListener().onRequestComplete();
193 }
194 }
195 else
196 {
197
198 synchronized (this)
199 {
200 if (_exchange == null)
201 continue;
202 long flushed = _generator.flush();
203 io += flushed;
204 }
205
206 if (!_generator.isComplete())
207 {
208 InputStream in = _exchange.getRequestContentSource();
209 if (in != null)
210 {
211 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
212 {
213 _requestContentChunk = _exchange.getRequestContentChunk();
214 if (_requestContentChunk != null)
215 _generator.addContent(_requestContentChunk,false);
216 else
217 _generator.complete();
218 io += _generator.flush();
219 }
220 }
221 else
222 _generator.complete();
223 }
224 }
225
226 if (_generator.isComplete() && !_requestComplete)
227 {
228 _requestComplete = true;
229 _exchange.getEventListener().onRequestComplete();
230 }
231
232
233 if (!_parser.isComplete() && _generator.isCommitted())
234 {
235 long filled = _parser.parseAvailable();
236 io += filled;
237 }
238
239 if (io > 0)
240 no_progress = 0;
241 else if (no_progress++ >= 2 && !_endp.isBlocking())
242 {
243
244 if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
245 {
246 if (_generator.flush()>0)
247 continue;
248 }
249 return;
250 }
251 }
252 catch (Throwable e)
253 {
254 Log.debug("Failure on " + _exchange, e);
255
256 if (e instanceof ThreadDeath)
257 throw (ThreadDeath)e;
258
259 synchronized (this)
260 {
261 if (_exchange != null)
262 {
263 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
264 _exchange.getEventListener().onException(e);
265 }
266 }
267
268 failed = true;
269 if (e instanceof IOException)
270 throw (IOException)e;
271
272 if (e instanceof Error)
273 throw (Error)e;
274
275 if (e instanceof RuntimeException)
276 throw (RuntimeException)e;
277
278 throw new RuntimeException(e);
279 }
280 finally
281 {
282 boolean complete = false;
283 boolean close = failed;
284 if (!failed)
285 {
286
287 if (_generator.isComplete())
288 {
289 if (!_requestComplete)
290 {
291 _requestComplete = true;
292 _exchange.getEventListener().onRequestComplete();
293 }
294
295
296
297 if (_parser.isComplete())
298 {
299 _destination.getHttpClient().cancel(_timeout);
300 complete = true;
301 }
302 }
303 }
304
305 if (complete || failed)
306 {
307 synchronized (this)
308 {
309 if (!close)
310 close = shouldClose();
311
312 reset(true);
313
314 no_progress = 0;
315 if (_exchange != null)
316 {
317 _exchange = null;
318
319 if (_pipeline == null)
320 {
321 if (!isReserved())
322 _destination.returnConnection(this,close);
323 }
324 else
325 {
326 if (close)
327 {
328 if (!isReserved())
329 _destination.returnConnection(this,close);
330
331 HttpExchange exchange = _pipeline;
332 _pipeline = null;
333 _destination.send(exchange);
334 }
335 else
336 {
337 HttpExchange exchange = _pipeline;
338 _pipeline = null;
339 send(exchange);
340 }
341 }
342 }
343 }
344 }
345 }
346 }
347 }
348
349
350 public boolean isIdle()
351 {
352 synchronized (this)
353 {
354 return _exchange == null;
355 }
356 }
357
358
359 public EndPoint getEndPoint()
360 {
361 return _endp;
362 }
363
364
365 private void commitRequest() throws IOException
366 {
367 synchronized (this)
368 {
369 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
370 throw new IllegalStateException();
371
372 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
373 _generator.setVersion(_exchange._version);
374
375 String uri = _exchange._uri;
376 if (_destination.isProxied() && uri.startsWith("/"))
377 {
378
379 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
380 + _destination.getAddress().getPort() + uri;
381 Authorization auth = _destination.getProxyAuthentication();
382 if (auth != null)
383 auth.setCredentials(_exchange);
384 }
385
386 _generator.setRequest(_exchange._method,uri);
387
388 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
389 {
390 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
391 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
392 }
393
394 if (_exchange._requestContent != null)
395 {
396 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
397 _generator.completeHeader(_exchange._requestFields,false);
398 _generator.addContent(new View(_exchange._requestContent),true);
399 }
400 else if (_exchange._requestContentSource != null)
401 {
402 _generator.completeHeader(_exchange._requestFields,false);
403 int available = _exchange._requestContentSource.available();
404 if (available > 0)
405 {
406
407
408
409 byte[] buf = new byte[available];
410 int length = _exchange._requestContentSource.read(buf);
411 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
412 }
413 }
414 else
415 {
416 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
417 _generator.completeHeader(_exchange._requestFields,true);
418 }
419
420 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
421 }
422 }
423
424
425 protected void reset(boolean returnBuffers) throws IOException
426 {
427 _requestComplete = false;
428 _connectionHeader = null;
429 _parser.reset(returnBuffers);
430 _generator.reset(returnBuffers);
431 _http11 = true;
432 }
433
434
435 private boolean shouldClose()
436 {
437 if (_connectionHeader!=null)
438 {
439 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
440 return true;
441 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
442 return false;
443 }
444 return !_http11;
445 }
446
447
448 private class Handler extends HttpParser.EventHandler
449 {
450 @Override
451 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
452 {
453
454
455
456
457
458 }
459
460 @Override
461 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
462 {
463 HttpExchange exchange = _exchange;
464 if (exchange!=null)
465 {
466 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
467 exchange.getEventListener().onResponseStatus(version,status,reason);
468 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
469 }
470 }
471
472 @Override
473 public void parsedHeader(Buffer name, Buffer value) throws IOException
474 {
475 HttpExchange exchange = _exchange;
476 if (exchange!=null)
477 {
478 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
479 {
480 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
481 }
482 exchange.getEventListener().onResponseHeader(name,value);
483 }
484 }
485
486 @Override
487 public void headerComplete() throws IOException
488 {
489 HttpExchange exchange = _exchange;
490 if (exchange!=null)
491 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
492 }
493
494 @Override
495 public void content(Buffer ref) throws IOException
496 {
497 HttpExchange exchange = _exchange;
498 if (exchange!=null)
499 exchange.getEventListener().onResponseContent(ref);
500 }
501
502 @Override
503 public void messageComplete(long contextLength) throws IOException
504 {
505 HttpExchange exchange = _exchange;
506 if (exchange!=null)
507 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
508 }
509 }
510
511
512 public String toString()
513 {
514 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
515 }
516
517
518 public String toDetailString()
519 {
520 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
521 }
522
523
524 public void close() throws IOException
525 {
526 try
527 {
528 _endp.close();
529 }
530 finally
531 {
532 HttpExchange exchange=_exchange;
533 if (exchange!=null)
534 {
535 int status = exchange.getStatus();
536 if (status>HttpExchange.STATUS_START && status<HttpExchange.STATUS_COMPLETED)
537 exchange.onException(new IOException("CLOSED"));
538 }
539 }
540 }
541
542
543
544 public void setIdleTimeout()
545 {
546 synchronized (this)
547 {
548 if (_idle.compareAndSet(false,true))
549 _destination.getHttpClient().scheduleIdle(_timeout);
550 else
551 throw new IllegalStateException();
552 }
553 }
554
555
556 public boolean cancelIdleTimeout()
557 {
558 synchronized (this)
559 {
560 if (_idle.compareAndSet(true,false))
561 {
562 _destination.getHttpClient().cancel(_timeout);
563 return true;
564 }
565 }
566
567 return false;
568 }
569
570
571
572
573 private class TimeoutTask extends Timeout.Task
574 {
575 public void expired()
576 {
577 HttpExchange ex=null;
578 try
579 {
580 synchronized (HttpConnection.this)
581 {
582 ex = _exchange;
583 _exchange = null;
584 if (ex != null)
585 {
586 _destination.returnConnection(HttpConnection.this,true);
587 }
588 else if (_idle.compareAndSet(true,false))
589 {
590 _destination.returnIdleConnection(HttpConnection.this);
591 }
592 }
593 }
594 catch (Exception e)
595 {
596 Log.debug(e);
597 }
598 finally
599 {
600 try
601 {
602 close();
603 }
604 catch (IOException e)
605 {
606 Log.ignore(e);
607 }
608
609 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
610 {
611 ex.setStatus(HttpExchange.STATUS_EXPIRED);
612 }
613 }
614 }
615 }
616
617 }