001/* 002 * Copyright 2006 - 2013 003 * Stefan Balev <stefan.balev@graphstream-project.org> 004 * Julien Baudry <julien.baudry@graphstream-project.org> 005 * Antoine Dutot <antoine.dutot@graphstream-project.org> 006 * Yoann Pigné <yoann.pigne@graphstream-project.org> 007 * Guilhelm Savin <guilhelm.savin@graphstream-project.org> 008 * 009 * This file is part of GraphStream <http://graphstream-project.org>. 010 * 011 * GraphStream is a library whose purpose is to handle static or dynamic 012 * graph, create them from scratch, file or any source and display them. 013 * 014 * This program is free software distributed under the terms of two licenses, the 015 * CeCILL-C license that fits European law, and the GNU Lesser General Public 016 * License. You can use, modify and/ or redistribute the software under the terms 017 * of the CeCILL-C license as circulated by CEA, CNRS and INRIA at the following 018 * URL <http://www.cecill.info> or under the terms of the GNU LGPL as published by 019 * the Free Software Foundation, either version 3 of the License, or (at your 020 * option) any later version. 021 * 022 * This program is distributed in the hope that it will be useful, but WITHOUT ANY 023 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A 024 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 025 * 026 * You should have received a copy of the GNU Lesser General Public License 027 * along with this program. If not, see <http://www.gnu.org/licenses/>. 028 * 029 * The fact that you are presently reading this means that you have had 030 * knowledge of the CeCILL-C and LGPL licenses and that you accept their terms. 031 */ 032package org.graphstream.stream.thread; 033 034import org.graphstream.graph.Edge; 035import org.graphstream.graph.Graph; 036import org.graphstream.graph.Node; 037import org.graphstream.stream.ProxyPipe; 038import org.graphstream.stream.Sink; 039import org.graphstream.stream.Source; 040import org.graphstream.stream.SourceBase; 041import org.miv.mbox.CannotPostException; 042import org.miv.mbox.MBox; 043import org.miv.mbox.MBoxListener; 044import org.miv.mbox.MBoxStandalone; 045 046/** 047 * Filter that allows to pass graph events between two threads without explicit 048 * synchronization. 049 * 050 * <p> 051 * This filter allows to register it as an output for some source of events in a 052 * source thread (hereafter called the input thread) and to register listening 053 * outputs in a destination thread (hereafter called the sink thread). 054 * </p> 055 * 056 * <pre> 057 * | 058 * Source ---> ThreadProxyFilter ----> Sink 059 * Thread 1 | Thread 2 060 * | 061 * </pre> 062 * 063 * <p> 064 * In other words, this class allows to listen in a sink thread graph events 065 * that are produced in another source thread without any explicit 066 * synchronization on the source of events. 067 * </p> 068 * 069 * <p> 070 * The only restriction is that the sink thread must regularly call the 071 * {@link #pump()} method to dispatch events coming from the source to all sinks 072 * registered (see the explanation in {@link org.graphstream.stream.ProxyPipe}). 073 * </p> 074 * 075 * <p> 076 * You can register any kind of input as source of event, but if the input is a 077 * graph, then you can choose to "replay" all the content of the graph so that 078 * at the other end of the filter, all outputs receive the complete content of 079 * the graph. This is the default behavior if this filter is constructed with a 080 * graph as input. 081 * </p> 082 * 083 * @deprecated This is the old version of 084 * {@link org.graphstream.stream.thread.ThreadProxyPipe}. 085 */ 086@Deprecated 087public class ThreadProxyPipeOld extends SourceBase implements ProxyPipe, 088 MBoxListener { 089 090 /** 091 * Proxy id. 092 */ 093 protected String id; 094 095 /** 096 * The event sender name, usually the graph name. 097 */ 098 protected String from; 099 100 /** 101 * The message box used to exchange messages between the two threads. 102 */ 103 protected MBox events; 104 105 /** 106 * Used only to remove the listener. We ensure this is done in the source 107 * thread. 108 */ 109 protected Source input; 110 111 /** 112 * Signals that this proxy must be removed from the source input. 113 */ 114 protected boolean unregisterWhenPossible = false; 115 116 /** 117 * New thread proxy pipe with no input. 118 */ 119 public ThreadProxyPipeOld() { 120 this((Source) null); 121 } 122 123 /** 124 * Listen at an input in a given thread and redirect all events to 125 * GraphListeners that may be in another thread. 126 * 127 * @param input 128 * The source of graph events we listen at. 129 */ 130 public ThreadProxyPipeOld(Source input) { 131 this(input, new MBoxStandalone()); 132 } 133 134 /** 135 * Like {@link #ThreadProxyPipe(Source)}, but allow to share the message box 136 * with another message processor. This can be needed to share the same 137 * message stack, when message order is important. 138 * 139 * @param input 140 * The source of events we listen at. 141 * @param sharedMBox 142 * The message box used to send and receive graph messages across 143 * the thread boundary. 144 */ 145 public ThreadProxyPipeOld(Source input, MBox sharedMBox) { 146 this.events = sharedMBox; 147 this.from = "<in>"; 148 this.input = input; 149 150 if (input != null) 151 input.addSink(this); 152 153 ((MBoxStandalone) this.events).addListener(this); 154 } 155 156 /** 157 * Listen at an input graph in a given thread and redirect all events to 158 * GraphListeners that may be in another thread. By default, if the graph 159 * already contains some elements, they are "replayed". This means that 160 * events are sent to mimic the fact they just appeared. 161 * 162 * @param inputGraph 163 * The graph we listen at. 164 */ 165 public ThreadProxyPipeOld(Graph inputGraph) { 166 this(inputGraph, true); 167 } 168 169 /** 170 * Like {@link #ThreadProxyPipe(Graph)} but allow to avoid replaying the 171 * graph. 172 * 173 * @param inputGraph 174 * The graph we listen at. 175 * @param replayGraph 176 * If false, and if the input graph already contains element they 177 * are not replayed. 178 */ 179 public ThreadProxyPipeOld(Graph inputGraph, boolean replayGraph) { 180 this(inputGraph, null, replayGraph); 181 } 182 183 /** 184 * Like {@link #ThreadProxyPipe(Graph,boolean)} but allows to pass an 185 * initial listener, therefore specifying the input and output at once. 186 * 187 * @param inputGraph 188 * The graph we listen at. 189 * @param firstListener 190 * The initial listener to register. 191 * @param replayGraph 192 * If false, and if the input graph already contains element they 193 * are not replayed. 194 */ 195 public ThreadProxyPipeOld(Graph inputGraph, Sink firstListener, 196 boolean replayGraph) { 197 this(inputGraph, firstListener, replayGraph, new MBoxStandalone()); 198 } 199 200 /** 201 * Like {@link #ThreadProxyPipe(Graph,Sink,boolean)}, but allows to share 202 * the message box with another message processor. This can be needed to 203 * share the same message stack, when message order is important. 204 * 205 * @param inputGraph 206 * The graph we listen at. 207 * @param replayGraph 208 * If false, and if the input graph already contains element they 209 * are not replayed. 210 * @param sharedMBox 211 * The message box used to send and receive graph messages across 212 * the thread boundary. 213 */ 214 public ThreadProxyPipeOld(Graph inputGraph, Sink firstListener, 215 boolean replayGraph, MBox sharedMBox) { 216 this.events = sharedMBox; 217 this.from = inputGraph.getId(); 218 this.input = inputGraph; 219 220 if (firstListener != null) 221 addSink(firstListener); 222 223 if (replayGraph) 224 replayGraph(inputGraph); 225 226 input.addSink(this); 227 ((MBoxStandalone) this.events).addListener(this); 228 } 229 230 @Override 231 public String toString() { 232 String dest = "nil"; 233 234 if (attrSinks.size() > 0) 235 dest = attrSinks.get(0).toString(); 236 237 return String.format("thread-proxy(from %s to %s)", from, dest); 238 } 239 240 /** 241 * Ask the proxy to unregister from the event input source (stop receive 242 * events) as soon as possible (when the next event will occur in the 243 * graph). 244 */ 245 public void unregisterFromSource() { 246 unregisterWhenPossible = true; 247 } 248 249 /** 250 * This method must be called regularly in the output thread to check if the 251 * input source sent events. If some event occurred, the listeners will be 252 * called. 253 */ 254 public void pump() { 255 ((MBoxStandalone) events).processMessages(); 256 } 257 258 /* 259 * (non-Javadoc) 260 * 261 * @see org.graphstream.stream.ProxyPipe#blockingPump() 262 */ 263 public void blockingPump() throws InterruptedException { 264 throw new UnsupportedOperationException(); 265 } 266 267 /* 268 * (non-Javadoc) 269 * 270 * @see org.graphstream.stream.ProxyPipe#blockingPump(long) 271 */ 272 public void blockingPump(long timeout) throws InterruptedException { 273 throw new UnsupportedOperationException(); 274 } 275 276 /** 277 * Set of events sent via the message box. 278 */ 279 protected static enum GraphEvents { 280 ADD_NODE, DEL_NODE, ADD_EDGE, DEL_EDGE, STEP, CLEARED, ADD_GRAPH_ATTR, CHG_GRAPH_ATTR, DEL_GRAPH_ATTR, ADD_NODE_ATTR, CHG_NODE_ATTR, DEL_NODE_ATTR, ADD_EDGE_ATTR, CHG_EDGE_ATTR, DEL_EDGE_ATTR 281 }; 282 283 protected void replayGraph(Graph graph) { 284 try { 285 String graphId = "@replay"; 286 287 // Replay all graph attributes. 288 289 if (graph.getAttributeKeySet() != null) 290 for (String key : graph.getAttributeKeySet()) 291 events.post(from, GraphEvents.ADD_GRAPH_ATTR, graphId, 292 sourceTime.newEvent(), key, graph.getAttribute(key)); 293 294 Thread.yield(); 295 296 // Replay all nodes and their attributes. 297 298 for (Node node : graph) { 299 events.post(from, GraphEvents.ADD_NODE, graphId, 300 sourceTime.newEvent(), node.getId()); 301 302 if (node.getAttributeKeySet() != null) 303 for (String key : node.getAttributeKeySet()) 304 events.post(from, GraphEvents.ADD_NODE_ATTR, graphId, 305 sourceTime.newEvent(), node.getId(), key, 306 node.getAttribute(key)); 307 Thread.yield(); 308 } 309 310 // Replay all edges and their attributes. 311 312 for (Edge edge : graph.getEachEdge()) { 313 events.post(from, GraphEvents.ADD_EDGE, graphId, sourceTime 314 .newEvent(), edge.getId(), 315 edge.getSourceNode().getId(), edge.getTargetNode() 316 .getId(), edge.isDirected()); 317 318 if (edge.getAttributeKeySet() != null) 319 for (String key : edge.getAttributeKeySet()) 320 events.post(from, GraphEvents.ADD_EDGE_ATTR, graphId, 321 sourceTime.newEvent(), edge.getId(), key, 322 edge.getAttribute(key)); 323 Thread.yield(); 324 } 325 } catch (CannotPostException e) { 326 System.err 327 .printf("GraphRendererRunner: cannot post message to listeners: %s%n", 328 e.getMessage()); 329 } 330 } 331 332 protected boolean maybeUnregister() { 333 if (unregisterWhenPossible) { 334 if (input != null) 335 input.removeSink(this); 336 return true; 337 } 338 339 return false; 340 } 341 342 public void edgeAttributeAdded(String graphId, long timeId, String edgeId, 343 String attribute, Object value) { 344 if (maybeUnregister()) 345 return; 346 347 try { 348 events.post(from, GraphEvents.ADD_EDGE_ATTR, graphId, timeId, 349 edgeId, attribute, value); 350 } catch (CannotPostException e) { 351 e.printStackTrace(); 352 } 353 } 354 355 public void edgeAttributeChanged(String graphId, long timeId, 356 String edgeId, String attribute, Object oldValue, Object newValue) { 357 if (maybeUnregister()) 358 return; 359 360 try { 361 events.post(from, GraphEvents.CHG_EDGE_ATTR, graphId, timeId, 362 edgeId, attribute, oldValue, newValue); 363 } catch (CannotPostException e) { 364 e.printStackTrace(); 365 } 366 } 367 368 public void edgeAttributeRemoved(String graphId, long timeId, 369 String edgeId, String attribute) { 370 if (maybeUnregister()) 371 return; 372 373 try { 374 events.post(from, GraphEvents.DEL_EDGE_ATTR, graphId, timeId, 375 edgeId, attribute); 376 } catch (CannotPostException e) { 377 e.printStackTrace(); 378 } 379 } 380 381 public void graphAttributeAdded(String graphId, long timeId, 382 String attribute, Object value) { 383 if (maybeUnregister()) 384 return; 385 386 try { 387 events.post(from, GraphEvents.ADD_GRAPH_ATTR, graphId, timeId, 388 attribute, value); 389 } catch (CannotPostException e) { 390 e.printStackTrace(); 391 } 392 } 393 394 public void graphAttributeChanged(String graphId, long timeId, 395 String attribute, Object oldValue, Object newValue) { 396 if (maybeUnregister()) 397 return; 398 399 try { 400 events.post(from, GraphEvents.CHG_GRAPH_ATTR, graphId, timeId, 401 attribute, oldValue, newValue); 402 } catch (CannotPostException e) { 403 e.printStackTrace(); 404 } 405 } 406 407 public void graphAttributeRemoved(String graphId, long timeId, 408 String attribute) { 409 if (maybeUnregister()) 410 return; 411 412 try { 413 events.post(from, GraphEvents.DEL_GRAPH_ATTR, graphId, timeId, 414 attribute); 415 } catch (CannotPostException e) { 416 e.printStackTrace(); 417 } 418 } 419 420 public void nodeAttributeAdded(String graphId, long timeId, String nodeId, 421 String attribute, Object value) { 422 if (maybeUnregister()) 423 return; 424 425 try { 426 events.post(from, GraphEvents.ADD_NODE_ATTR, graphId, timeId, 427 nodeId, attribute, value); 428 } catch (CannotPostException e) { 429 e.printStackTrace(); 430 } 431 } 432 433 public void nodeAttributeChanged(String graphId, long timeId, 434 String nodeId, String attribute, Object oldValue, Object newValue) { 435 if (maybeUnregister()) 436 return; 437 438 try { 439 events.post(from, GraphEvents.CHG_NODE_ATTR, graphId, timeId, 440 nodeId, attribute, oldValue, newValue); 441 } catch (CannotPostException e) { 442 e.printStackTrace(); 443 } 444 } 445 446 public void nodeAttributeRemoved(String graphId, long timeId, 447 String nodeId, String attribute) { 448 if (maybeUnregister()) 449 return; 450 451 try { 452 events.post(from, GraphEvents.DEL_NODE_ATTR, graphId, timeId, 453 nodeId, attribute); 454 } catch (CannotPostException e) { 455 e.printStackTrace(); 456 } 457 } 458 459 public void edgeAdded(String graphId, long timeId, String edgeId, 460 String fromNodeId, String toNodeId, boolean directed) { 461 if (maybeUnregister()) 462 return; 463 464 try { 465 events.post(from, GraphEvents.ADD_EDGE, graphId, timeId, edgeId, 466 fromNodeId, toNodeId, directed); 467 } catch (CannotPostException e) { 468 e.printStackTrace(); 469 } 470 } 471 472 public void edgeRemoved(String graphId, long timeId, String edgeId) { 473 if (maybeUnregister()) 474 return; 475 476 try { 477 events.post(from, GraphEvents.DEL_EDGE, graphId, timeId, edgeId); 478 } catch (CannotPostException e) { 479 e.printStackTrace(); 480 } 481 } 482 483 public void graphCleared(String graphId, long timeId) { 484 if (maybeUnregister()) 485 return; 486 487 try { 488 events.post(from, GraphEvents.CLEARED, graphId, timeId); 489 } catch (CannotPostException e) { 490 e.printStackTrace(); 491 } 492 } 493 494 public void nodeAdded(String graphId, long timeId, String nodeId) { 495 if (maybeUnregister()) 496 return; 497 498 try { 499 events.post(from, GraphEvents.ADD_NODE, graphId, timeId, nodeId); 500 } catch (CannotPostException e) { 501 e.printStackTrace(); 502 } 503 } 504 505 public void nodeRemoved(String graphId, long timeId, String nodeId) { 506 if (maybeUnregister()) 507 return; 508 509 try { 510 events.post(from, GraphEvents.DEL_NODE, graphId, timeId, nodeId); 511 } catch (CannotPostException e) { 512 e.printStackTrace(); 513 } 514 } 515 516 public void stepBegins(String graphId, long timeId, double step) { 517 if (maybeUnregister()) 518 return; 519 520 try { 521 events.post(from, GraphEvents.STEP, graphId, timeId, step); 522 } catch (CannotPostException e) { 523 e.printStackTrace(); 524 } 525 } 526 527 // MBoxListener 528 529 public void processMessage(String from, Object[] data) { 530 // System.err.printf( " %s.msg(%s, %s, %s, %s)%n", from, data[1], 531 // data[2], data[0], data[3] ); 532 if (data[0].equals(GraphEvents.ADD_NODE)) { 533 String graphId = (String) data[1]; 534 Long timeId = (Long) data[2]; 535 String nodeId = (String) data[3]; 536 537 sendNodeAdded(graphId, timeId, nodeId); 538 } else if (data[0].equals(GraphEvents.DEL_NODE)) { 539 String graphId = (String) data[1]; 540 Long timeId = (Long) data[2]; 541 String nodeId = (String) data[3]; 542 543 sendNodeRemoved(graphId, timeId, nodeId); 544 } else if (data[0].equals(GraphEvents.ADD_EDGE)) { 545 String graphId = (String) data[1]; 546 Long timeId = (Long) data[2]; 547 String edgeId = (String) data[3]; 548 String fromId = (String) data[4]; 549 String toId = (String) data[5]; 550 boolean directed = (Boolean) data[6]; 551 552 sendEdgeAdded(graphId, timeId, edgeId, fromId, toId, directed); 553 } else if (data[0].equals(GraphEvents.DEL_EDGE)) { 554 String graphId = (String) data[1]; 555 Long timeId = (Long) data[2]; 556 String edgeId = (String) data[3]; 557 558 sendEdgeRemoved(graphId, timeId, edgeId); 559 } else if (data[0].equals(GraphEvents.STEP)) { 560 String graphId = (String) data[1]; 561 Long timeId = (Long) data[2]; 562 double step = (Double) data[3]; 563 564 sendStepBegins(graphId, timeId, step); 565 } else if (data[0].equals(GraphEvents.ADD_GRAPH_ATTR)) { 566 String graphId = (String) data[1]; 567 Long timeId = (Long) data[2]; 568 String attribute = (String) data[3]; 569 Object value = data[4]; 570 571 sendGraphAttributeAdded(graphId, timeId, attribute, value); 572 } else if (data[0].equals(GraphEvents.CHG_GRAPH_ATTR)) { 573 String graphId = (String) data[1]; 574 Long timeId = (Long) data[2]; 575 String attribute = (String) data[3]; 576 Object oldValue = data[4]; 577 Object newValue = data[5]; 578 579 sendGraphAttributeChanged(graphId, timeId, attribute, oldValue, 580 newValue); 581 } else if (data[0].equals(GraphEvents.DEL_GRAPH_ATTR)) { 582 String graphId = (String) data[1]; 583 Long timeId = (Long) data[2]; 584 String attribute = (String) data[3]; 585 586 sendGraphAttributeRemoved(graphId, timeId, attribute); 587 } else if (data[0].equals(GraphEvents.ADD_EDGE_ATTR)) { 588 String graphId = (String) data[1]; 589 Long timeId = (Long) data[2]; 590 String edgeId = (String) data[3]; 591 String attribute = (String) data[4]; 592 Object value = data[5]; 593 594 sendEdgeAttributeAdded(graphId, timeId, edgeId, attribute, value); 595 } else if (data[0].equals(GraphEvents.CHG_EDGE_ATTR)) { 596 String graphId = (String) data[1]; 597 Long timeId = (Long) data[2]; 598 String edgeId = (String) data[3]; 599 String attribute = (String) data[4]; 600 Object oldValue = data[5]; 601 Object newValue = data[6]; 602 603 sendEdgeAttributeChanged(graphId, timeId, edgeId, attribute, 604 oldValue, newValue); 605 } else if (data[0].equals(GraphEvents.DEL_EDGE_ATTR)) { 606 String graphId = (String) data[1]; 607 Long timeId = (Long) data[2]; 608 String edgeId = (String) data[3]; 609 String attribute = (String) data[4]; 610 611 sendEdgeAttributeRemoved(graphId, timeId, edgeId, attribute); 612 } else if (data[0].equals(GraphEvents.ADD_NODE_ATTR)) { 613 String graphId = (String) data[1]; 614 Long timeId = (Long) data[2]; 615 String nodeId = (String) data[3]; 616 String attribute = (String) data[4]; 617 Object value = data[5]; 618 619 sendNodeAttributeAdded(graphId, timeId, nodeId, attribute, value); 620 } else if (data[0].equals(GraphEvents.CHG_NODE_ATTR)) { 621 String graphId = (String) data[1]; 622 Long timeId = (Long) data[2]; 623 String nodeId = (String) data[3]; 624 String attribute = (String) data[4]; 625 Object oldValue = data[5]; 626 Object newValue = data[6]; 627 628 sendNodeAttributeChanged(graphId, timeId, nodeId, attribute, 629 oldValue, newValue); 630 } else if (data[0].equals(GraphEvents.DEL_NODE_ATTR)) { 631 String graphId = (String) data[1]; 632 Long timeId = (Long) data[2]; 633 String nodeId = (String) data[3]; 634 String attribute = (String) data[4]; 635 636 sendNodeAttributeRemoved(graphId, timeId, nodeId, attribute); 637 } else if (data[0].equals(GraphEvents.CLEARED)) { 638 String graphId = (String) data[1]; 639 Long timeId = (Long) data[2]; 640 641 sendGraphCleared(graphId, timeId); 642 } else { 643 System.err.printf("ThreadProxyFilter : Unknown message %s !!%n", 644 data[0]); 645 } 646 } 647}