/*
 * Copyright 2006 - 2013 Stefan Balev <stefan.balev@graphstream-project.org>
 * Julien Baudry <julien.baudry@graphstream-project.org> Antoine Dutot
 * <antoine.dutot@graphstream-project.org> Yoann Pigné
 * <yoann.pigne@graphstream-project.org> Guilhelm Savin
 * <guilhelm.savin@graphstream-project.org>
 *
 * This file is part of GraphStream <http://graphstream-project.org>.
 *
 * GraphStream is a library whose purpose is to handle static or dynamic graph,
 * create them from scratch, file or any source and display them.
 *
 * This program is free software distributed under the terms of two licenses,
 * the CeCILL-C license that fits European law, and the GNU Lesser General
 * Public License. You can use, modify and/ or redistribute the software under
 * the terms of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the
 * following URL <http://www.cecill.info> or under the terms of the GNU LGPL as
 * published by the Free Software Foundation, either version 3 of the License,
 * or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * The fact that you are presently reading this means that you have had
 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms.
 */
package org.graphstream.stream.netstream;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import org.graphstream.stream.netstream.packing.NetStreamUnpacker;
import org.graphstream.stream.thread.ThreadProxyPipe;
import org.miv.mbox.net.PositionableByteArrayInputStream;

/**
 * <p>
 * This class implements a receiver according to the specifications of the
 * NetStream protocol.
 * </p>
 *
 * <p>
 * See {@link NetStreamConstants} for a full description of the protocol, the
 * sender and the receiver.
 * </p>
 *
 * <p>
 * The receiver is a thread that is started by its constructors. It accepts
 * connections on a non-blocking server socket, buffers what each connection
 * sends, cuts the byte stream into size-prefixed messages and hands every
 * complete message to a {@link NetStreamDecoder}. When the thread terminates it
 * closes the server socket, every channel still registered in the selector and
 * the selector itself.
 * </p>
 *
 * <p>
 * Copyright (c) 2010 University of Luxembourg
 * </p>
 *
 * <p>
 * NetStreamReceiver.java
 * </p>
 *
 * @see NetStreamConstants
 * @see NetStreamSender
 *
 * @since Aug 13, 2011
 *
 * @author Yoann Pigné
 */
public class NetStreamReceiver extends Thread implements NetStreamDecoder {

    /**
     * The host name this receiver is listening at.
     */
    private final String hostname;

    /**
     * The port listened to.
     */
    private final int port;

    /**
     * Receiver socket.
     */
    protected ServerSocketChannel server;

    /**
     * Multiplexer.
     */
    protected Selector selector;

    /**
     * Key of the server socket inside the selector.
     */
    protected SelectionKey key;

    /**
     * While true, the receiver is running. Volatile because it is cleared by
     * {@link #quit()} from foreign threads and by the receiver thread itself
     * (without holding a lock) when an error occurs.
     */
    protected volatile boolean loop = true;

    /**
     * Show debugging messages.
     */
    protected boolean debug = true;

    /**
     * Last encountered error. Not written by any code of this class.
     */
    protected String lastError = null;

    /**
     * The current pipe commands are being written to. Not used by any code of
     * this class.
     */
    protected ThreadProxyPipe currentStream;

    /**
     * Utility class that decodes messages according to the NetStream Protocol.
     */
    protected NetStreamDecoder decoder;

    /**
     * Current active incoming connections. Mutations are done while holding
     * the monitor of this receiver so that {@link #hasActiveConnections()}
     * observes a consistent state.
     */
    protected HashMap<SelectionKey, IncomingBuffer> incoming = new HashMap<SelectionKey, IncomingBuffer>();

    /**
     * Unpacker used when no specific one is set: the message is used as is
     * and its size is a plain 4-byte integer read with
     * {@link ByteBuffer#getInt()}.
     */
    class DefaultUnpacker extends NetStreamUnpacker {

        @Override
        public ByteBuffer unpackMessage(ByteBuffer buffer, int startIndex,
                int endIndex) {
            return buffer;
        }

        @Override
        public int unpackMessageSize(ByteBuffer buffer) {
            return buffer.getInt();
        }

        /*
         * (non-Javadoc)
         *
         * @see
         * org.graphstream.stream.netstream.packing.NetStreamUnpacker#sizeOfInt
         * ()
         */
        @Override
        public int sizeOfInt() {
            return 4;
        }
    }

    /**
     * The unpacker applied to every message header and body. Never null.
     */
    private volatile NetStreamUnpacker unpacker;

    // Constructors

    /**
     * New NetStream Receiver, awaiting in its own thread at the given host name
     * and port, for new graph events.
     *
     * @param hostname
     *            The host name to listen at messages.
     * @param port
     *            The port to listen at messages.
     * @throws IOException
     *             if the server socket cannot be opened or bound.
     * @throws UnknownHostException
     *             if the host name cannot be resolved.
     */
    public NetStreamReceiver(String hostname, int port) throws IOException,
            UnknownHostException {
        this(hostname, port, false);
    }

    /**
     * New NetStream Receiver, awaiting in its own thread at "localhost" on the
     * given port, for new graph events.
     *
     * @param port
     *            The port to listen at messages.
     * @throws IOException
     *             if the server socket cannot be opened or bound.
     * @throws UnknownHostException
     *             if "localhost" cannot be resolved.
     */
    public NetStreamReceiver(int port) throws IOException, UnknownHostException {
        this("localhost", port, false);
    }

    /**
     * New NetStream Receiver, awaiting in its own thread at the given host name
     * and port, for new graph events. The server socket is bound and the
     * thread is started before the constructor returns.
     *
     * @param hostname
     *            The host name to listen at messages.
     * @param port
     *            The port to listen at messages.
     * @param debug
     *            If true informations are output for each message received.
     * @throws IOException
     *             if the server socket cannot be opened or bound.
     * @throws UnknownHostException
     *             if the host name cannot be resolved.
     */
    public NetStreamReceiver(String hostname, int port, boolean debug)
            throws IOException, UnknownHostException {
        this.hostname = hostname;
        this.port = port;
        this.unpacker = new DefaultUnpacker();
        this.decoder = new DefaultNetStreamDecoder();
        setDebugOn(debug);
        init();
        start();
    }

    // Access

    /**
     * False as soon as the receiver terminates.
     *
     * @return true while the receiving loop is still allowed to run.
     */
    public synchronized boolean isRunning() {
        return loop;
    }

    // Commands

    /**
     * Initialize the server socket: open the selector and a non-blocking
     * server channel, bind it to the configured host and port, and register
     * it for accept events.
     *
     * @throws IOException
     *             if the selector or the channel cannot be opened or bound.
     * @throws UnknownHostException
     *             if the host name cannot be resolved.
     */
    protected void init() throws IOException, UnknownHostException {
        selector = Selector.open();
        server = ServerSocketChannel.open();

        server.configureBlocking(false);

        InetAddress ia = InetAddress.getByName(hostname);
        InetSocketAddress isa = new InetSocketAddress(ia, port);

        server.socket().bind(isa);

        if (debug) {
            debug("bound to socket %s:%d", server.socket().getInetAddress(),
                    server.socket().getLocalPort());
        }

        // Register a first server socket inside the multiplexer.

        key = server.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Enable or disable debugging, for this receiver and for its decoder.
     *
     * @param on
     *            true to output debugging messages.
     */
    public void setDebugOn(boolean on) {
        debug = on;
        decoder.setDebugOn(on);
    }

    /**
     * Stop the receiver. The receiving thread is woken up if it is blocked in
     * a selection so that it can notice the request and terminate.
     */
    public synchronized void quit() {
        loop = false;

        // The receiver thread closes the selector when it terminates; do not
        // rely on wakeup() being harmless on a closed selector.
        Selector sel = key.selector();

        if (sel.isOpen()) {
            sel.wakeup();
        }

        if (debug) {
            debug("stopped");
        }
    }

    /**
     * Ask the receiver about its active connections.
     *
     * @return true if at least one connection has a buffer registered.
     */
    public synchronized boolean hasActiveConnections() {
        return !incoming.isEmpty();
    }

    /**
     * Sets an optional NetStreamUnpacker whose "unpack" method will be called
     * on each message.
     *
     * It allows to do extra decoding on the whole byte array message. You can
     * also decrypt things.
     *
     * @param unpaker
     *            the unpacker to use; null restores the default unpacker
     *            instead of failing later in the receiver thread.
     */
    public void setUnpacker(NetStreamUnpacker unpaker) {
        this.unpacker = (unpaker != null) ? unpaker : new DefaultUnpacker();
    }

    /**
     * Restore the default unpacker.
     */
    public void removeUnpacker() {
        unpacker = new DefaultUnpacker();
    }

    /**
     * Wait for connections, accept them, demultiplexes them and dispatch
     * messages to registered message boxes. Runs until {@link #quit()} is
     * called or an error stops the loop, then releases the sockets and the
     * selector.
     */
    @Override
    public void run() {
        while (loop) {
            poll();
        }

        releaseResources();

        if (debug) {
            debug("receiver //%s:%d finished", hostname, port);
        }
    }

    /**
     * Close the server socket, every channel still registered in the selector
     * and the selector itself. Failures are reported and do not prevent the
     * remaining resources from being closed.
     */
    private void releaseResources() {
        try {
            server.close();
        } catch (IOException e) {
            error(e, "cannot close the server socket: %s", e.getMessage());
        }

        if (!selector.isOpen()) {
            return;
        }

        // Work on a snapshot: closing a channel cancels its key.
        SelectionKey[] registered = selector.keys().toArray(
                new SelectionKey[0]);

        for (SelectionKey registeredKey : registered) {
            try {
                registeredKey.channel().close();
            } catch (IOException e) {
                error(e, "cannot close channel: %s", e.getMessage());
            }
        }

        synchronized (this) {
            incoming.clear();
        }

        try {
            selector.close();
        } catch (IOException e) {
            error(e, "cannot close the selector: %s", e.getMessage());
        }
    }

    /**
     * Wait until one or several chunks of message are acceptable. This method
     * should be called in a loop. It can be used to block a program until some
     * data is available.
     *
     * Any error stops the receiver by clearing the loop flag.
     */
    public void poll() {
        Selector sel = key.selector();

        if (!sel.isOpen()) {
            // The receiver already terminated and released its resources.
            loop = false;
            return;
        }

        try {
            // Wait for incoming messages in a loop.

            if (sel.select() > 0) {
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = readyKeys.iterator();

                while (i.hasNext()) {
                    SelectionKey akey = i.next();

                    i.remove();

                    if (akey.isAcceptable()) {
                        // If a new connection occurs, register the new socket
                        // in the multiplexer.

                        ServerSocketChannel ssocket = (ServerSocketChannel) akey
                                .channel();
                        SocketChannel socket = ssocket.accept();

                        if (socket == null) {
                            // Non-blocking accept: the pending connection is
                            // no longer available.
                            continue;
                        }

                        if (debug) {
                            debug("accepting socket %s:%d", socket.socket()
                                    .getInetAddress(), socket.socket()
                                    .getPort());
                        }

                        socket.configureBlocking(false);
                        socket.finishConnect();

                        socket.register(selector, SelectionKey.OP_READ);
                    } else if (akey.isReadable()) {
                        // If a message arrives, read it.

                        readDataChunk(akey);
                    } else if (akey.isWritable()) {
                        throw new RuntimeException("should not happen");
                    }
                }
            }
        } catch (IOException e) {
            error(e, "I/O error in receiver //%s:%d thread: aborting: %s",
                    hostname, port, e.getMessage());

            loop = false;
        } catch (Throwable e) {
            error(e, "Unknown error: %s", e.getMessage());

            loop = false;
        }
    }

    /**
     * When data is readable on a socket, send it to the appropriate buffer
     * (creating it if needed). An I/O error while reading is reported, drops
     * the buffer of the connection and stops the receiver.
     *
     * @param key
     *            the selection key of the readable socket channel.
     * @throws IOException
     *             declared for subclasses; I/O errors raised while reading
     *             the chunk are handled here.
     */
    protected void readDataChunk(SelectionKey key) throws IOException {
        IncomingBuffer buf = incoming.get(key);

        if (buf == null) {
            buf = new IncomingBuffer();

            synchronized (this) {
                incoming.put(key, buf);
            }

            SocketChannel socket = (SocketChannel) key.channel();

            if (debug) {
                debug("creating buffer for new connection from %s:%d", socket
                        .socket().getInetAddress(), socket.socket().getPort());
            }
        }

        try {
            buf.readDataChunk(key);
        } catch (IOException e) {
            synchronized (this) {
                incoming.remove(key);
            }

            // error() already prints the stack trace of the exception.
            error(e,
                    "receiver //%s:%d cannot read object socket channel (I/O error): %s",
                    hostname, port, e.getMessage());
            loop = false;
        }

        if (!buf.active) {
            int left;

            synchronized (this) {
                incoming.remove(key);
                left = incoming.size();
            }

            if (debug) {
                debug("removing buffer %s from incoming for geting inactive. %d left",
                        key.toString(), left);
            }
        }
    }

    // Utilities

    /**
     * Print an error message on the standard error stream.
     *
     * @param message
     *            a {@link java.util.Formatter} pattern.
     * @param data
     *            the arguments of the pattern.
     */
    protected void error(String message, Object... data) {
        error((Throwable) null, message, data);
    }

    // NOTE(review): these look like ANSI colour sequences that lost their
    // leading ESC character -- confirm against the upstream source. They are
    // only referenced by commented-out code in the original.
    protected static final String LIGHT_YELLOW = "[33;1m";
    protected static final String RESET = "[0m";

    /**
     * Print an error message between brackets on the standard error stream,
     * followed by the stack trace of the given throwable, if any.
     *
     * @param e
     *            the cause of the error, or null.
     * @param message
     *            a {@link java.util.Formatter} pattern.
     * @param data
     *            the arguments of the pattern.
     */
    protected void error(Throwable e, String message, Object... data) {
        System.err.print("[");
        System.err.printf(message, data);
        System.err.printf("]%n");

        if (e != null) {
            e.printStackTrace();
        }
    }

    /**
     * Print a debugging message on the standard error stream, prefixed by the
     * host name and port of this receiver.
     *
     * @param message
     *            a {@link java.util.Formatter} pattern.
     * @param data
     *            the arguments of the pattern.
     */
    protected void debug(String message, Object... data) {
        System.err.printf("[//%s:%d | ", hostname, port);
        System.err.printf(message, data);
        System.err.printf("]%n");
    }

    // Nested classes

    /**
     * The connection to a sender.
     *
     * The receiver maintains several incoming connections and demultiplexes
     * them. Each connection owns a fixed size buffer in which the bytes read
     * from the socket are accumulated until complete messages can be decoded.
     */
    protected class IncomingBuffer {
        // Attributes

        protected static final int BUFFER_INITIAL_SIZE = 8192; // 65535, 4096

        /**
         * Buffer for reading.
         */
        protected ByteBuffer buf = ByteBuffer.allocate(BUFFER_INITIAL_SIZE);

        /**
         * Index in the buffer past the last byte that forms the current
         * message. End can be out of the buffer or out of the data read
         * actually. Negative while no message header has been decoded.
         */
        protected int end = -1;

        /**
         * Index in the buffer of the first byte that forms the current
         * message. Beg does not count the bytes that give the size of the
         * message. While the header is being read, beg is the first byte of
         * the header.
         */
        protected int beg = 0;

        /**
         * Position inside beg and end past the last byte read. All bytes at
         * and after pos have unspecified contents. Pos always verifies
         * pos&gt;=beg and pos&lt;end. While the header is being read, pos is
         * past the last byte of the header that has been read.
         */
        protected int pos = 0;

        /**
         * Input stream mapped on the last message handed to the decoder.
         */
        PositionableByteArrayInputStream in;

        /**
         * Input stream filter on the buffer. Never assigned by this class;
         * only {@link #enlargeBuffer()} consults it.
         */
        PositionableByteArrayInputStream bin;

        /**
         * When false the socket is closed and this buffer must be removed from
         * the active connections.
         */
        protected boolean active = true;

        // Constructors

        public IncomingBuffer() {
        }

        // Commands

        /**
         * Read the available bytes and buffer them. Every complete message
         * found in the buffer is unpacked and given to the decoder; an
         * incomplete header or message is kept for the next invocation.
         *
         * @param key
         *            the selection key of the readable socket channel.
         * @throws IOException
         *             if reading the socket or decoding a message fails.
         */
        public void readDataChunk(SelectionKey key) throws IOException {
            int limit = 0; // Index past the last byte read during the current
                           // invocation.
            int nbytes = 0; // Number of bytes read.
            SocketChannel socket = (SocketChannel) key.channel();

            int sizeOfInt = unpacker.sizeOfInt();

            // Buffers the data.

            nbytes = bufferize(pos, socket);
            limit = pos + nbytes;

            if (nbytes <= 0) {
                return;
            }

            if (debug) {
                // The address is passed as an argument: its text may contain
                // '%' (scoped IPv6 addresses) and must not be part of the
                // format pattern.
                debug("<chunk (%d bytes) from %s:%d>", nbytes, socket.socket()
                        .getInetAddress(), socket.socket().getPort());

                int at = buf.position();

                for (int i = 0; i < nbytes; i++) {
                    System.err.printf("%d ", buf.get(at + i));
                }

                System.err.println();
                buf.position(at);
            }

            // Read the first header.

            if (end < 0) {
                if ((limit - beg) >= sizeOfInt) {
                    // If no data has been read yet in the buffer or if the
                    // buffer was emptied completely at previous call: prepare
                    // to read a new message by decoding its header.

                    buf.position(0);
                    int size = unpacker.unpackMessageSize(buf);
                    end = size + sizeOfInt;
                    beg = sizeOfInt;

                    if (debug) {
                        debug("start to bufferize a %d byte long messsage",
                                size);
                    }
                } else {
                    // The header is incomplete, wait next call to complete it.

                    pos = limit;
                }
            }

            // Read one or more messages or wait next call to buffers more.

            if (end > 0) {
                while (end < limit) {
                    // While the end of the message is in the limit of what was
                    // read, there are one or more complete messages. Decode
                    // them and read the header of the next message, until a
                    // message is incomplete or there are no more messages or a
                    // header is incomplete.

                    decodeCurrentMessage();
                    buf.position(end);

                    if (end + sizeOfInt <= limit) {
                        // There is a following message.

                        beg = end + sizeOfInt;
                        end = end + unpacker.unpackMessageSize(buf) + sizeOfInt;
                    } else {
                        // There is the beginning of a following message
                        // but the header is incomplete. Compact the buffer
                        // and stop here.
                        assert (beg >= sizeOfInt);

                        beg = end;

                        // Number of header bytes already received.
                        int p = limit - end;

                        compactBuffer();
                        pos = p;
                        beg = 0;
                        end = -1;
                        break;
                    }
                }

                if (end == limit) {
                    // If the end of the message coincides with the limit of
                    // what was read we have one last complete message. We
                    // decode it and clear the buffer for the next call.

                    decodeCurrentMessage();

                    buf.clear();
                    pos = 0;
                    beg = 0;
                    end = -1;
                } else if (end > limit) {
                    // If the end of the message is after what was read,
                    // prepare to read more at next call when we will have
                    // buffered more data. If we are at the end of the buffer
                    // compact it (else no more space will be available for
                    // buffering).

                    pos = limit;

                    if (end > buf.capacity()) {
                        compactBuffer();
                    }
                }
            }
        }

        /**
         * Unpack the message lying between <code>beg</code> and
         * <code>end</code> and give it to the decoder. When the unpacker
         * returns the reading buffer itself the message is decoded in place,
         * else the whole array of the buffer it returned is decoded.
         *
         * @throws IOException
         *             if the decoder fails to read the message.
         */
        private void decodeCurrentMessage() throws IOException {
            ByteBuffer unpackedBuffer = unpacker.unpackMessage(buf, beg, end);

            if (unpackedBuffer == buf) {
                in = new PositionableByteArrayInputStream(buf.array(), beg,
                        end - beg);
            } else {
                in = new PositionableByteArrayInputStream(
                        unpackedBuffer.array(), 0, unpackedBuffer.capacity());
            }

            decoder.decodeMessage(in);
        }

        /**
         * Read more data from the <code>socket</code> and put it in the buffer
         * at <code>at</code>. If the read returns -1 bytes (meaning the
         * connection ended), the socket is closed and this buffer will be made
         * inactive (and therefore removed from the active connections by the
         * Receiver that called it).
         *
         * @param at
         *            index in the buffer where the new bytes are stored.
         * @param socket
         *            the channel to read.
         * @return the number of bytes read, or a negative value when the
         *         connection ended.
         * @throws IOException
         *             if an I/O error occurs, in between the socket is closed
         *             and the connection is made inactive, then the exception
         *             is thrown.
         * @throws RuntimeException
         *             if zero bytes are read, which happens when the buffer
         *             has no room left (message larger than the buffer).
         */
        protected int bufferize(int at, SocketChannel socket)
                throws IOException {
            int nbytes = 0;

            try {
                buf.position(at);

                nbytes = socket.read(buf);

                if (nbytes < 0) {
                    active = false;

                    if (in != null) {
                        in.close();
                    }

                    socket.close();

                    if (debug) {
                        debug("socket from %s:%d closed", socket.socket()
                                .getInetAddress(), socket.socket().getPort());
                    }

                    return nbytes;
                } else if (nbytes == 0) {
                    // This means that there are no bytes remaining in the
                    // buffer... it is full.
                    throw new RuntimeException(
                            "should not happen: buffer to small, 0 bytes read: compact does not function? messages is larger than "
                                    + buf.capacity() + "?");
                }

                buf.position(at);

                return nbytes;
            } catch (IOException e) {
                if (debug) {
                    debug("socket from %s:%d I/O error: %s", socket.socket()
                            .getInetAddress(), socket.socket().getPort(),
                            e.getMessage());
                }

                active = false;

                if (in != null) {
                    in.close();
                }

                socket.close();
                throw e;
            }
        }

        /**
         * Compact the buffer by removing all read data before <code>beg</code>.
         * The <code>beg</code>, <code>end</code> and <code>pos</code> markers
         * are updated accordingly. Compact works only if beg is larger than
         * the size of a header.
         *
         * NOTE(review): when beg equals the header size nothing is moved;
         * callers that reset their markers afterwards seem to assume the data
         * was moved -- confirm this cannot happen (zero length message).
         *
         * @return the offset the data was moved by, 0 if nothing was done.
         */
        protected int compactBuffer() {
            if (beg > unpacker.sizeOfInt()) {
                int off = beg;

                buf.position(beg);
                buf.limit(buf.capacity());
                buf.compact();

                pos -= beg;
                end -= beg;
                beg = 0;

                return off;
            }

            return 0;
        }

        /**
         * Double the capacity of the buffer, keeping its contents and the
         * current <code>pos</code>.
         *
         * Not used in the current implementation, we assume that no message
         * will be larger than the size of the buffer.
         */
        protected void enlargeBuffer() {
            ByteBuffer tmp = ByteBuffer.allocate(buf.capacity() * 2);

            buf.position(0);
            buf.limit(buf.capacity());
            tmp.put(buf);
            tmp.position(pos);

            buf = tmp;

            if (bin != null) {
                bin.changeBuffer(buf.array());
            }
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.graphstream.stream.netstream.NetStreamDecoder#getStream(java.lang
     * .String)
     */
    public ThreadProxyPipe getStream(String name) {
        return decoder.getStream(name);
    }

    /*
     * (non-Javadoc)
     *
     * @see org.graphstream.stream.netstream.NetStreamDecoder#getDefaultStream()
     */
    public ThreadProxyPipe getDefaultStream() {
        return decoder.getDefaultStream();
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.graphstream.stream.netstream.NetStreamDecoder#register(java.lang.
     * String, org.graphstream.stream.thread.ThreadProxyPipe)
     */
    public void register(String name, ThreadProxyPipe stream) throws Exception {
        decoder.register(name, stream);
    }

    /*
     * (non-Javadoc)
     *
     * @see
     * org.graphstream.stream.netstream.NetStreamDecoder#decodeMessage(java.
     * io.InputStream)
     */
    public void decodeMessage(InputStream in) throws IOException {
        decoder.decodeMessage(in);
    }
}