001/*
002 * ====================================================================
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 * ====================================================================
020 *
021 * This software consists of voluntary contributions made by many
022 * individuals on behalf of the Apache Software Foundation.  For more
023 * information on the Apache Software Foundation, please see
024 * <http://www.apache.org/>.
025 *
026 */
027package org.apache.http.pool;
028
029import java.io.IOException;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.Map;
036import java.util.Set;
037import java.util.concurrent.ExecutionException;
038import java.util.concurrent.Future;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import java.util.concurrent.atomic.AtomicBoolean;
042import java.util.concurrent.atomic.AtomicReference;
043import java.util.concurrent.locks.Condition;
044import java.util.concurrent.locks.Lock;
045import java.util.concurrent.locks.ReentrantLock;
046
047import org.apache.http.annotation.Contract;
048import org.apache.http.annotation.ThreadingBehavior;
049import org.apache.http.concurrent.FutureCallback;
050import org.apache.http.util.Args;
051import org.apache.http.util.Asserts;
052
053/**
054 * Abstract synchronous (blocking) pool of connections.
055 * <p>
056 * Please note that this class does not maintain its own pool of execution {@link Thread}s.
057 * Therefore, one <b>must</b> call {@link Future#get()} or {@link Future#get(long, TimeUnit)}
058 * method on the {@link Future} object returned by the
059 * {@link #lease(Object, Object, FutureCallback)} method in order for the lease operation
060 * to complete.
061 *
062 * @param <T> the route type that represents the opposite endpoint of a pooled
063 *   connection.
064 * @param <C> the connection type.
065 * @param <E> the type of the pool entry containing a pooled connection.
066 * @since 4.2
067 */
068@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
069public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
070                                               implements ConnPool<T, E>, ConnPoolControl<T> {
071
072    private final Lock lock;
073    private final Condition condition;
074    private final ConnFactory<T, C> connFactory;
075    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
076    private final Set<E> leased;
077    private final LinkedList<E> available;
078    private final LinkedList<Future<E>> pending;
079    private final Map<T, Integer> maxPerRoute;
080
081    private volatile boolean isShutDown;
082    private volatile int defaultMaxPerRoute;
083    private volatile int maxTotal;
084    private volatile int validateAfterInactivity;
085
086    public AbstractConnPool(
087            final ConnFactory<T, C> connFactory,
088            final int defaultMaxPerRoute,
089            final int maxTotal) {
090        super();
091        this.connFactory = Args.notNull(connFactory, "Connection factory");
092        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
093        this.maxTotal = Args.positive(maxTotal, "Max total value");
094        this.lock = new ReentrantLock();
095        this.condition = this.lock.newCondition();
096        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
097        this.leased = new HashSet<E>();
098        this.available = new LinkedList<E>();
099        this.pending = new LinkedList<Future<E>>();
100        this.maxPerRoute = new HashMap<T, Integer>();
101    }
102
103    /**
104     * Creates a new entry for the given connection with the given route.
105     */
106    protected abstract E createEntry(T route, C conn);
107
108    /**
109     * @since 4.3
110     */
111    protected void onLease(final E entry) {
112    }
113
114    /**
115     * @since 4.3
116     */
117    protected void onRelease(final E entry) {
118    }
119
120    /**
121     * @since 4.4
122     */
123    protected void onReuse(final E entry) {
124    }
125
126    /**
127     * @since 4.4
128     */
129    protected boolean validate(final E entry) {
130        return true;
131    }
132
133    public boolean isShutdown() {
134        return this.isShutDown;
135    }
136
137    /**
138     * Shuts down the pool.
139     */
140    public void shutdown() throws IOException {
141        if (this.isShutDown) {
142            return ;
143        }
144        this.isShutDown = true;
145        this.lock.lock();
146        try {
147            for (final E entry: this.available) {
148                entry.close();
149            }
150            for (final E entry: this.leased) {
151                entry.close();
152            }
153            for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
154                pool.shutdown();
155            }
156            this.routeToPool.clear();
157            this.leased.clear();
158            this.available.clear();
159        } finally {
160            this.lock.unlock();
161        }
162    }
163
164    private RouteSpecificPool<T, C, E> getPool(final T route) {
165        RouteSpecificPool<T, C, E> pool = this.routeToPool.get(route);
166        if (pool == null) {
167            pool = new RouteSpecificPool<T, C, E>(route) {
168
169                @Override
170                protected E createEntry(final C conn) {
171                    return AbstractConnPool.this.createEntry(route, conn);
172                }
173
174            };
175            this.routeToPool.put(route, pool);
176        }
177        return pool;
178    }
179
180    /**
181     * {@inheritDoc}
182     * <p>
183     * Please note that this class does not maintain its own pool of execution
184     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
185     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
186     * returned by this method in order for the lease operation to complete.
187     */
188    @Override
189    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
190        Args.notNull(route, "Route");
191        Asserts.check(!this.isShutDown, "Connection pool shut down");
192
193        return new Future<E>() {
194
195            private final AtomicBoolean cancelled = new AtomicBoolean(false);
196            private final AtomicBoolean done = new AtomicBoolean(false);
197            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);
198
199            @Override
200            public boolean cancel(final boolean mayInterruptIfRunning) {
201                if (cancelled.compareAndSet(false, true)) {
202                    done.set(true);
203                    lock.lock();
204                    try {
205                        condition.signalAll();
206                    } finally {
207                        lock.unlock();
208                    }
209                    if (callback != null) {
210                        callback.cancelled();
211                    }
212                    return true;
213                } else {
214                    return false;
215                }
216            }
217
218            @Override
219            public boolean isCancelled() {
220                return cancelled.get();
221            }
222
223            @Override
224            public boolean isDone() {
225                return done.get();
226            }
227
228            @Override
229            public E get() throws InterruptedException, ExecutionException {
230                try {
231                    return get(0L, TimeUnit.MILLISECONDS);
232                } catch (final TimeoutException ex) {
233                    throw new ExecutionException(ex);
234                }
235            }
236
237            @Override
238            public E get(final long timeout, final TimeUnit tunit) throws InterruptedException, ExecutionException, TimeoutException {
239                final E entry = entryRef.get();
240                if (entry != null) {
241                    return entry;
242                }
243                synchronized (this) {
244                    try {
245                        for (;;) {
246                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, tunit, this);
247                            if (validateAfterInactivity > 0)  {
248                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
249                                    if (!validate(leasedEntry)) {
250                                        leasedEntry.close();
251                                        release(leasedEntry, false);
252                                        continue;
253                                    }
254                                }
255                            }
256                            entryRef.set(leasedEntry);
257                            done.set(true);
258                            onLease(leasedEntry);
259                            if (callback != null) {
260                                callback.completed(leasedEntry);
261                            }
262                            return leasedEntry;
263                        }
264                    } catch (final IOException ex) {
265                        done.set(true);
266                        if (callback != null) {
267                            callback.failed(ex);
268                        }
269                        throw new ExecutionException(ex);
270                    }
271                }
272            }
273
274        };
275    }
276
277    /**
278     * Attempts to lease a connection for the given route and with the given
279     * state from the pool.
280     * <p>
281     * Please note that this class does not maintain its own pool of execution
282     * {@link Thread}s. Therefore, one <b>must</b> call {@link Future#get()}
283     * or {@link Future#get(long, TimeUnit)} method on the {@link Future}
284     * returned by this method in order for the lease operation to complete.
285     *
286     * @param route route of the connection.
287     * @param state arbitrary object that represents a particular state
288     *  (usually a security principal or a unique token identifying
289     *  the user whose credentials have been used while establishing the connection).
290     *  May be {@code null}.
291     * @return future for a leased pool entry.
292     */
293    public Future<E> lease(final T route, final Object state) {
294        return lease(route, state, null);
295    }
296
297    private E getPoolEntryBlocking(
298            final T route, final Object state,
299            final long timeout, final TimeUnit tunit,
300            final Future<E> future) throws IOException, InterruptedException, TimeoutException {
301
302        Date deadline = null;
303        if (timeout > 0) {
304            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
305        }
306        this.lock.lock();
307        try {
308            final RouteSpecificPool<T, C, E> pool = getPool(route);
309            E entry;
310            for (;;) {
311                Asserts.check(!this.isShutDown, "Connection pool shut down");
312                for (;;) {
313                    entry = pool.getFree(state);
314                    if (entry == null) {
315                        break;
316                    }
317                    if (entry.isExpired(System.currentTimeMillis())) {
318                        entry.close();
319                    }
320                    if (entry.isClosed()) {
321                        this.available.remove(entry);
322                        pool.free(entry, false);
323                    } else {
324                        break;
325                    }
326                }
327                if (entry != null) {
328                    this.available.remove(entry);
329                    this.leased.add(entry);
330                    onReuse(entry);
331                    return entry;
332                }
333
334                // New connection is needed
335                final int maxPerRoute = getMax(route);
336                // Shrink the pool prior to allocating a new connection
337                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
338                if (excess > 0) {
339                    for (int i = 0; i < excess; i++) {
340                        final E lastUsed = pool.getLastUsed();
341                        if (lastUsed == null) {
342                            break;
343                        }
344                        lastUsed.close();
345                        this.available.remove(lastUsed);
346                        pool.remove(lastUsed);
347                    }
348                }
349
350                if (pool.getAllocatedCount() < maxPerRoute) {
351                    final int totalUsed = this.leased.size();
352                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
353                    if (freeCapacity > 0) {
354                        final int totalAvailable = this.available.size();
355                        if (totalAvailable > freeCapacity - 1) {
356                            if (!this.available.isEmpty()) {
357                                final E lastUsed = this.available.removeLast();
358                                lastUsed.close();
359                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
360                                otherpool.remove(lastUsed);
361                            }
362                        }
363                        final C conn = this.connFactory.create(route);
364                        entry = pool.add(conn);
365                        this.leased.add(entry);
366                        return entry;
367                    }
368                }
369
370                boolean success = false;
371                try {
372                    if (future.isCancelled()) {
373                        throw new InterruptedException("Operation interrupted");
374                    }
375                    pool.queue(future);
376                    this.pending.add(future);
377                    if (deadline != null) {
378                        success = this.condition.awaitUntil(deadline);
379                    } else {
380                        this.condition.await();
381                        success = true;
382                    }
383                    if (future.isCancelled()) {
384                        throw new InterruptedException("Operation interrupted");
385                    }
386                } finally {
387                    // In case of 'success', we were woken up by the
388                    // connection pool and should now have a connection
389                    // waiting for us, or else we're shutting down.
390                    // Just continue in the loop, both cases are checked.
391                    pool.unqueue(future);
392                    this.pending.remove(future);
393                }
394                // check for spurious wakeup vs. timeout
395                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
396                    break;
397                }
398            }
399            throw new TimeoutException("Timeout waiting for connection");
400        } finally {
401            this.lock.unlock();
402        }
403    }
404
405    @Override
406    public void release(final E entry, final boolean reusable) {
407        this.lock.lock();
408        try {
409            if (this.leased.remove(entry)) {
410                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
411                pool.free(entry, reusable);
412                if (reusable && !this.isShutDown) {
413                    this.available.addFirst(entry);
414                } else {
415                    entry.close();
416                }
417                onRelease(entry);
418                Future<E> future = pool.nextPending();
419                if (future != null) {
420                    this.pending.remove(future);
421                } else {
422                    future = this.pending.poll();
423                }
424                if (future != null) {
425                    this.condition.signalAll();
426                }
427            }
428        } finally {
429            this.lock.unlock();
430        }
431    }
432
433    private int getMax(final T route) {
434        final Integer v = this.maxPerRoute.get(route);
435        if (v != null) {
436            return v.intValue();
437        } else {
438            return this.defaultMaxPerRoute;
439        }
440    }
441
442    @Override
443    public void setMaxTotal(final int max) {
444        Args.positive(max, "Max value");
445        this.lock.lock();
446        try {
447            this.maxTotal = max;
448        } finally {
449            this.lock.unlock();
450        }
451    }
452
453    @Override
454    public int getMaxTotal() {
455        this.lock.lock();
456        try {
457            return this.maxTotal;
458        } finally {
459            this.lock.unlock();
460        }
461    }
462
463    @Override
464    public void setDefaultMaxPerRoute(final int max) {
465        Args.positive(max, "Max per route value");
466        this.lock.lock();
467        try {
468            this.defaultMaxPerRoute = max;
469        } finally {
470            this.lock.unlock();
471        }
472    }
473
474    @Override
475    public int getDefaultMaxPerRoute() {
476        this.lock.lock();
477        try {
478            return this.defaultMaxPerRoute;
479        } finally {
480            this.lock.unlock();
481        }
482    }
483
484    @Override
485    public void setMaxPerRoute(final T route, final int max) {
486        Args.notNull(route, "Route");
487        Args.positive(max, "Max per route value");
488        this.lock.lock();
489        try {
490            this.maxPerRoute.put(route, Integer.valueOf(max));
491        } finally {
492            this.lock.unlock();
493        }
494    }
495
496    @Override
497    public int getMaxPerRoute(final T route) {
498        Args.notNull(route, "Route");
499        this.lock.lock();
500        try {
501            return getMax(route);
502        } finally {
503            this.lock.unlock();
504        }
505    }
506
507    @Override
508    public PoolStats getTotalStats() {
509        this.lock.lock();
510        try {
511            return new PoolStats(
512                    this.leased.size(),
513                    this.pending.size(),
514                    this.available.size(),
515                    this.maxTotal);
516        } finally {
517            this.lock.unlock();
518        }
519    }
520
521    @Override
522    public PoolStats getStats(final T route) {
523        Args.notNull(route, "Route");
524        this.lock.lock();
525        try {
526            final RouteSpecificPool<T, C, E> pool = getPool(route);
527            return new PoolStats(
528                    pool.getLeasedCount(),
529                    pool.getPendingCount(),
530                    pool.getAvailableCount(),
531                    getMax(route));
532        } finally {
533            this.lock.unlock();
534        }
535    }
536
537    /**
538     * Returns snapshot of all knows routes
539     * @return the set of routes
540     *
541     * @since 4.4
542     */
543    public Set<T> getRoutes() {
544        this.lock.lock();
545        try {
546            return new HashSet<T>(routeToPool.keySet());
547        } finally {
548            this.lock.unlock();
549        }
550    }
551
552    /**
553     * Enumerates all available connections.
554     *
555     * @since 4.3
556     */
557    protected void enumAvailable(final PoolEntryCallback<T, C> callback) {
558        this.lock.lock();
559        try {
560            final Iterator<E> it = this.available.iterator();
561            while (it.hasNext()) {
562                final E entry = it.next();
563                callback.process(entry);
564                if (entry.isClosed()) {
565                    final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
566                    pool.remove(entry);
567                    it.remove();
568                }
569            }
570            purgePoolMap();
571        } finally {
572            this.lock.unlock();
573        }
574    }
575
576    /**
577     * Enumerates all leased connections.
578     *
579     * @since 4.3
580     */
581    protected void enumLeased(final PoolEntryCallback<T, C> callback) {
582        this.lock.lock();
583        try {
584            final Iterator<E> it = this.leased.iterator();
585            while (it.hasNext()) {
586                final E entry = it.next();
587                callback.process(entry);
588            }
589        } finally {
590            this.lock.unlock();
591        }
592    }
593
594    private void purgePoolMap() {
595        final Iterator<Map.Entry<T, RouteSpecificPool<T, C, E>>> it = this.routeToPool.entrySet().iterator();
596        while (it.hasNext()) {
597            final Map.Entry<T, RouteSpecificPool<T, C, E>> entry = it.next();
598            final RouteSpecificPool<T, C, E> pool = entry.getValue();
599            if (pool.getPendingCount() + pool.getAllocatedCount() == 0) {
600                it.remove();
601            }
602        }
603    }
604
605    /**
606     * Closes connections that have been idle longer than the given period
607     * of time and evicts them from the pool.
608     *
609     * @param idletime maximum idle time.
610     * @param tunit time unit.
611     */
612    public void closeIdle(final long idletime, final TimeUnit tunit) {
613        Args.notNull(tunit, "Time unit");
614        long time = tunit.toMillis(idletime);
615        if (time < 0) {
616            time = 0;
617        }
618        final long deadline = System.currentTimeMillis() - time;
619        enumAvailable(new PoolEntryCallback<T, C>() {
620
621            @Override
622            public void process(final PoolEntry<T, C> entry) {
623                if (entry.getUpdated() <= deadline) {
624                    entry.close();
625                }
626            }
627
628        });
629    }
630
631    /**
632     * Closes expired connections and evicts them from the pool.
633     */
634    public void closeExpired() {
635        final long now = System.currentTimeMillis();
636        enumAvailable(new PoolEntryCallback<T, C>() {
637
638            @Override
639            public void process(final PoolEntry<T, C> entry) {
640                if (entry.isExpired(now)) {
641                    entry.close();
642                }
643            }
644
645        });
646    }
647
648    /**
649     * @return the number of milliseconds
650     * @since 4.4
651     */
652    public int getValidateAfterInactivity() {
653        return this.validateAfterInactivity;
654    }
655
656    /**
657     * @param ms the number of milliseconds
658     * @since 4.4
659     */
660    public void setValidateAfterInactivity(final int ms) {
661        this.validateAfterInactivity = ms;
662    }
663
664    @Override
665    public String toString() {
666        final StringBuilder buffer = new StringBuilder();
667        buffer.append("[leased: ");
668        buffer.append(this.leased);
669        buffer.append("][available: ");
670        buffer.append(this.available);
671        buffer.append("][pending: ");
672        buffer.append(this.pending);
673        buffer.append("]");
674        return buffer.toString();
675    }
676
677}