001/* 002 * ==================================================================== 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, 014 * software distributed under the License is distributed on an 015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 016 * KIND, either express or implied. See the License for the 017 * specific language governing permissions and limitations 018 * under the License. 019 * ==================================================================== 020 * 021 * This software consists of voluntary contributions made by many 022 * individuals on behalf of the Apache Software Foundation. For more 023 * information on the Apache Software Foundation, please see 024 * <http://www.apache.org/>. 025 * 026 */ 027 028package org.apache.http.impl.nio.reactor; 029 030import java.io.IOException; 031import java.io.InterruptedIOException; 032import java.nio.channels.CancelledKeyException; 033import java.nio.channels.ClosedChannelException; 034import java.nio.channels.ClosedSelectorException; 035import java.nio.channels.SelectionKey; 036import java.nio.channels.Selector; 037import java.nio.channels.SocketChannel; 038import java.util.Collections; 039import java.util.HashSet; 040import java.util.Queue; 041import java.util.Set; 042import java.util.concurrent.ConcurrentLinkedQueue; 043 044import org.apache.http.nio.reactor.IOReactor; 045import org.apache.http.nio.reactor.IOReactorException; 046import org.apache.http.nio.reactor.IOReactorStatus; 047import org.apache.http.nio.reactor.IOSession; 048import org.apache.http.util.Args; 049import org.apache.http.util.Asserts; 050 051/** 052 * Generic implementation of {@link IOReactor} that can used as a subclass 053 * for more specialized I/O reactors. It is based on a single {@link Selector} 054 * instance. 055 * 056 * @since 4.0 057 */ 058public abstract class AbstractIOReactor implements IOReactor { 059 060 private volatile IOReactorStatus status; 061 062 private final Object statusMutex; 063 private final long selectTimeout; 064 private final boolean interestOpsQueueing; 065 private final Selector selector; 066 private final Set<IOSession> sessions; 067 private final Queue<InterestOpEntry> interestOpsQueue; 068 private final Queue<IOSession> closedSessions; 069 private final Queue<ChannelEntry> newChannels; 070 071 /** 072 * Creates new AbstractIOReactor instance. 073 * 074 * @param selectTimeout the select timeout. 075 * @throws IOReactorException in case if a non-recoverable I/O error. 076 */ 077 public AbstractIOReactor(final long selectTimeout) throws IOReactorException { 078 this(selectTimeout, false); 079 } 080 081 /** 082 * Creates new AbstractIOReactor instance. 083 * 084 * @param selectTimeout the select timeout. 085 * @param interestOpsQueueing Ops queueing flag. 086 * 087 * @throws IOReactorException in case if a non-recoverable I/O error. 088 * 089 * @since 4.1 090 */ 091 public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException { 092 super(); 093 Args.positive(selectTimeout, "Select timeout"); 094 this.selectTimeout = selectTimeout; 095 this.interestOpsQueueing = interestOpsQueueing; 096 this.sessions = Collections.synchronizedSet(new HashSet<IOSession>()); 097 this.interestOpsQueue = new ConcurrentLinkedQueue<InterestOpEntry>(); 098 this.closedSessions = new ConcurrentLinkedQueue<IOSession>(); 099 this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>(); 100 try { 101 this.selector = Selector.open(); 102 } catch (final IOException ex) { 103 throw new IOReactorException("Failure opening selector", ex); 104 } 105 this.statusMutex = new Object(); 106 this.status = IOReactorStatus.INACTIVE; 107 } 108 109 /** 110 * Triggered when the key signals {@link SelectionKey#OP_ACCEPT} readiness. 111 * <p> 112 * Super-classes can implement this method to react to the event. 113 * 114 * @param key the selection key. 115 */ 116 protected abstract void acceptable(SelectionKey key); 117 118 /** 119 * Triggered when the key signals {@link SelectionKey#OP_CONNECT} readiness. 120 * <p> 121 * Super-classes can implement this method to react to the event. 122 * 123 * @param key the selection key. 124 */ 125 protected abstract void connectable(SelectionKey key); 126 127 /** 128 * Triggered when the key signals {@link SelectionKey#OP_READ} readiness. 129 * <p> 130 * Super-classes can implement this method to react to the event. 131 * 132 * @param key the selection key. 133 */ 134 protected abstract void readable(SelectionKey key); 135 136 /** 137 * Triggered when the key signals {@link SelectionKey#OP_WRITE} readiness. 138 * <p> 139 * Super-classes can implement this method to react to the event. 140 * 141 * @param key the selection key. 142 */ 143 protected abstract void writable(SelectionKey key); 144 145 /** 146 * Triggered to validate keys currently registered with the selector. This 147 * method is called after each I/O select loop. 148 * <p> 149 * Super-classes can implement this method to run validity checks on 150 * active sessions and include additional processing that needs to be 151 * executed after each I/O select loop. 152 * 153 * @param keys all selection keys registered with the selector. 154 */ 155 protected abstract void validate(Set<SelectionKey> keys); 156 157 /** 158 * Triggered when new session has been created. 159 * <p> 160 * Super-classes can implement this method to react to the event. 161 * 162 * @param key the selection key. 163 * @param session new I/O session. 164 */ 165 protected void sessionCreated(final SelectionKey key, final IOSession session) { 166 } 167 168 /** 169 * Triggered when a session has been closed. 170 * <p> 171 * Super-classes can implement this method to react to the event. 172 * 173 * @param session closed I/O session. 174 */ 175 protected void sessionClosed(final IOSession session) { 176 } 177 178 /** 179 * Triggered when a session has timed out. 180 * <p> 181 * Super-classes can implement this method to react to the event. 182 * 183 * @param session timed out I/O session. 184 */ 185 protected void sessionTimedOut(final IOSession session) { 186 } 187 188 /** 189 * Obtains {@link IOSession} instance associated with the given selection 190 * key. 191 * 192 * @param key the selection key. 193 * @return I/O session. 194 */ 195 protected IOSession getSession(final SelectionKey key) { 196 return (IOSession) key.attachment(); 197 } 198 199 @Override 200 public IOReactorStatus getStatus() { 201 return this.status; 202 } 203 204 /** 205 * Returns {@code true} if interest Ops queueing is enabled, {@code false} otherwise. 206 * 207 * @since 4.1 208 */ 209 public boolean getInterestOpsQueueing() { 210 return this.interestOpsQueueing; 211 } 212 213 /** 214 * Adds new channel entry. The channel will be asynchronously registered 215 * with the selector. 216 * 217 * @param channelEntry the channel entry. 218 */ 219 public void addChannel(final ChannelEntry channelEntry) { 220 Args.notNull(channelEntry, "Channel entry"); 221 this.newChannels.add(channelEntry); 222 this.selector.wakeup(); 223 } 224 225 /** 226 * Activates the I/O reactor. The I/O reactor will start reacting to 227 * I/O events and triggering notification methods. 228 * <p> 229 * This method will enter the infinite I/O select loop on 230 * the {@link Selector} instance associated with this I/O reactor. 231 * <p> 232 * The method will remain blocked unto the I/O reactor is shut down or the 233 * execution thread is interrupted. 234 * 235 * @see #acceptable(SelectionKey) 236 * @see #connectable(SelectionKey) 237 * @see #readable(SelectionKey) 238 * @see #writable(SelectionKey) 239 * @see #timeoutCheck(SelectionKey, long) 240 * @see #validate(Set) 241 * @see #sessionCreated(SelectionKey, IOSession) 242 * @see #sessionClosed(IOSession) 243 * 244 * @throws InterruptedIOException if the dispatch thread is interrupted. 245 * @throws IOReactorException in case if a non-recoverable I/O error. 246 */ 247 protected void execute() throws InterruptedIOException, IOReactorException { 248 this.status = IOReactorStatus.ACTIVE; 249 250 try { 251 for (;;) { 252 253 final int readyCount; 254 try { 255 readyCount = this.selector.select(this.selectTimeout); 256 } catch (final InterruptedIOException ex) { 257 throw ex; 258 } catch (final IOException ex) { 259 throw new IOReactorException("Unexpected selector failure", ex); 260 } 261 262 if (this.status == IOReactorStatus.SHUT_DOWN) { 263 // Hard shut down. Exit select loop immediately 264 break; 265 } 266 267 if (this.status == IOReactorStatus.SHUTTING_DOWN) { 268 // Graceful shutdown in process 269 // Try to close things out nicely 270 closeSessions(); 271 closeNewChannels(); 272 } 273 274 // Process selected I/O events 275 if (readyCount > 0) { 276 processEvents(this.selector.selectedKeys()); 277 } 278 279 // Validate active channels 280 validate(this.selector.keys()); 281 282 // Process closed sessions 283 processClosedSessions(); 284 285 // If active process new channels 286 if (this.status == IOReactorStatus.ACTIVE) { 287 processNewChannels(); 288 } 289 290 // Exit select loop if graceful shutdown has been completed 291 if (this.status.compareTo(IOReactorStatus.ACTIVE) > 0 292 && this.sessions.isEmpty()) { 293 break; 294 } 295 296 if (this.interestOpsQueueing) { 297 // process all pending interestOps() operations 298 processPendingInterestOps(); 299 } 300 301 } 302 303 } catch (final ClosedSelectorException ignore) { 304 } finally { 305 hardShutdown(); 306 synchronized (this.statusMutex) { 307 this.statusMutex.notifyAll(); 308 } 309 } 310 } 311 312 private void processEvents(final Set<SelectionKey> selectedKeys) { 313 for (final SelectionKey key : selectedKeys) { 314 315 processEvent(key); 316 317 } 318 selectedKeys.clear(); 319 } 320 321 /** 322 * Processes new event on the given selection key. 323 * 324 * @param key the selection key that triggered an event. 325 */ 326 protected void processEvent(final SelectionKey key) { 327 final IOSessionImpl session = (IOSessionImpl) key.attachment(); 328 try { 329 if (key.isAcceptable()) { 330 acceptable(key); 331 } 332 if (key.isConnectable()) { 333 connectable(key); 334 } 335 if (key.isReadable()) { 336 session.resetLastRead(); 337 readable(key); 338 } 339 if (key.isWritable()) { 340 session.resetLastWrite(); 341 writable(key); 342 } 343 } catch (final CancelledKeyException ex) { 344 queueClosedSession(session); 345 key.attach(null); 346 } 347 } 348 349 /** 350 * Queues the given I/O session to be processed asynchronously as closed. 351 * 352 * @param session the closed I/O session. 353 */ 354 protected void queueClosedSession(final IOSession session) { 355 if (session != null) { 356 this.closedSessions.add(session); 357 } 358 } 359 360 private void processNewChannels() throws IOReactorException { 361 ChannelEntry entry; 362 while ((entry = this.newChannels.poll()) != null) { 363 364 final SocketChannel channel; 365 final SelectionKey key; 366 try { 367 channel = entry.getChannel(); 368 channel.configureBlocking(false); 369 key = channel.register(this.selector, SelectionKey.OP_READ); 370 } catch (final ClosedChannelException ex) { 371 final SessionRequestImpl sessionRequest = entry.getSessionRequest(); 372 if (sessionRequest != null) { 373 sessionRequest.failed(ex); 374 } 375 return; 376 377 } catch (final IOException ex) { 378 throw new IOReactorException("Failure registering channel " + 379 "with the selector", ex); 380 } 381 382 final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() { 383 384 @Override 385 public void sessionClosed(final IOSession session) { 386 queueClosedSession(session); 387 } 388 389 }; 390 391 InterestOpsCallback interestOpsCallback = null; 392 if (this.interestOpsQueueing) { 393 interestOpsCallback = new InterestOpsCallback() { 394 395 @Override 396 public void addInterestOps(final InterestOpEntry entry) { 397 queueInterestOps(entry); 398 } 399 400 }; 401 } 402 403 final IOSession session; 404 try { 405 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback); 406 int timeout = 0; 407 try { 408 timeout = channel.socket().getSoTimeout(); 409 } catch (final IOException ex) { 410 // Very unlikely to happen and is not fatal 411 // as the protocol layer is expected to overwrite 412 // this value anyways 413 } 414 415 session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment()); 416 session.setSocketTimeout(timeout); 417 } catch (final CancelledKeyException ex) { 418 continue; 419 } 420 try { 421 this.sessions.add(session); 422 final SessionRequestImpl sessionRequest = entry.getSessionRequest(); 423 if (sessionRequest != null) { 424 sessionRequest.completed(session); 425 } 426 key.attach(session); 427 sessionCreated(key, session); 428 } catch (final CancelledKeyException ex) { 429 queueClosedSession(session); 430 key.attach(null); 431 } 432 } 433 } 434 435 private void processClosedSessions() { 436 IOSession session; 437 while ((session = this.closedSessions.poll()) != null) { 438 if (this.sessions.remove(session)) { 439 try { 440 sessionClosed(session); 441 } catch (final CancelledKeyException ex) { 442 // ignore and move on 443 } 444 } 445 } 446 } 447 448 private void processPendingInterestOps() { 449 // validity check 450 if (!this.interestOpsQueueing) { 451 return; 452 } 453 InterestOpEntry entry; 454 while ((entry = this.interestOpsQueue.poll()) != null) { 455 // obtain the operation's details 456 final SelectionKey key = entry.getSelectionKey(); 457 final int eventMask = entry.getEventMask(); 458 if (key.isValid()) { 459 key.interestOps(eventMask); 460 } 461 } 462 } 463 464 private boolean queueInterestOps(final InterestOpEntry entry) { 465 // validity checks 466 Asserts.check(this.interestOpsQueueing, "Interest ops queueing not enabled"); 467 if (entry == null) { 468 return false; 469 } 470 471 // add this operation to the interestOps() queue 472 this.interestOpsQueue.add(entry); 473 474 return true; 475 } 476 477 /** 478 * Triggered to verify whether the I/O session associated with the 479 * given selection key has not timed out. 480 * <p> 481 * Super-classes can implement this method to react to the event. 482 * 483 * @param key the selection key. 484 * @param now current time as long value. 485 */ 486 protected void timeoutCheck(final SelectionKey key, final long now) { 487 final IOSessionImpl session = (IOSessionImpl) key.attachment(); 488 if (session != null) { 489 final int timeout = session.getSocketTimeout(); 490 if (timeout > 0) { 491 if (session.getLastAccessTime() + timeout < now) { 492 sessionTimedOut(session); 493 } 494 } 495 } 496 } 497 498 /** 499 * Closes out all I/O sessions maintained by this I/O reactor. 500 */ 501 protected void closeSessions() { 502 synchronized (this.sessions) { 503 for (final IOSession session : this.sessions) { 504 session.close(); 505 } 506 } 507 } 508 509 /** 510 * Closes out all new channels pending registration with the selector of 511 * this I/O reactor. 512 * @throws IOReactorException - not thrown currently 513 */ 514 protected void closeNewChannels() throws IOReactorException { 515 ChannelEntry entry; 516 while ((entry = this.newChannels.poll()) != null) { 517 final SessionRequestImpl sessionRequest = entry.getSessionRequest(); 518 if (sessionRequest != null) { 519 sessionRequest.cancel(); 520 } 521 final SocketChannel channel = entry.getChannel(); 522 try { 523 channel.close(); 524 } catch (final IOException ignore) { 525 } 526 } 527 } 528 529 /** 530 * Closes out all active channels registered with the selector of 531 * this I/O reactor. 532 * @throws IOReactorException - not thrown currently 533 */ 534 protected void closeActiveChannels() throws IOReactorException { 535 try { 536 final Set<SelectionKey> keys = this.selector.keys(); 537 for (final SelectionKey key : keys) { 538 final IOSession session = getSession(key); 539 if (session != null) { 540 session.close(); 541 } 542 } 543 this.selector.close(); 544 } catch (final IOException ignore) { 545 } 546 } 547 548 /** 549 * Attempts graceful shutdown of this I/O reactor. 550 */ 551 public void gracefulShutdown() { 552 synchronized (this.statusMutex) { 553 if (this.status != IOReactorStatus.ACTIVE) { 554 // Already shutting down 555 return; 556 } 557 this.status = IOReactorStatus.SHUTTING_DOWN; 558 } 559 this.selector.wakeup(); 560 } 561 562 /** 563 * Attempts force-shutdown of this I/O reactor. 564 */ 565 public void hardShutdown() throws IOReactorException { 566 synchronized (this.statusMutex) { 567 if (this.status == IOReactorStatus.SHUT_DOWN) { 568 // Already shut down 569 return; 570 } 571 this.status = IOReactorStatus.SHUT_DOWN; 572 } 573 574 closeNewChannels(); 575 closeActiveChannels(); 576 processClosedSessions(); 577 } 578 579 /** 580 * Blocks for the given period of time in milliseconds awaiting 581 * the completion of the reactor shutdown. 582 * 583 * @param timeout the maximum wait time. 584 * @throws InterruptedException if interrupted. 585 */ 586 public void awaitShutdown(final long timeout) throws InterruptedException { 587 synchronized (this.statusMutex) { 588 final long deadline = System.currentTimeMillis() + timeout; 589 long remaining = timeout; 590 while (this.status != IOReactorStatus.SHUT_DOWN) { 591 this.statusMutex.wait(remaining); 592 if (timeout > 0) { 593 remaining = deadline - System.currentTimeMillis(); 594 if (remaining <= 0) { 595 break; 596 } 597 } 598 } 599 } 600 } 601 602 @Override 603 public void shutdown(final long gracePeriod) throws IOReactorException { 604 if (this.status != IOReactorStatus.INACTIVE) { 605 gracefulShutdown(); 606 try { 607 awaitShutdown(gracePeriod); 608 } catch (final InterruptedException ignore) { 609 } 610 } 611 if (this.status != IOReactorStatus.SHUT_DOWN) { 612 hardShutdown(); 613 } 614 } 615 616 @Override 617 public void shutdown() throws IOReactorException { 618 shutdown(1000); 619 } 620 621}