Orbital library

orbital.util
Class StreamMethod

java.lang.Object
  extended by java.lang.Thread
      extended by orbital.util.StreamMethod
All Implemented Interfaces:
java.lang.Runnable, Callback

public abstract class StreamMethod
extends java.lang.Thread
implements Callback

Base class for stream method coroutines that are concurrent and streamed architectural connectors. Stream methods allow "continuation reentrance" for abstract communication connectors that return an iterated stream and might be running concurrent or synchronous. Coroutines also provide an easy way of implementing non-deterministic functions with a multitude of return-values instead of a single distinct return-value. The caller will receive these multiple return-values in the given order.

You can easily use stream method coroutines like with this preorder traversal implementation

 Iterator traversal = new StreamMethod(true) {
     public void runStream() {
         visit(getRoot());
     }
     protected final void visit(Node node) {
         resumedReturn(node);
         for (Iterator i = node.edges(); i.hasNext(); )
             visit((Node) i.next());
     }
 }.apply();
 

Note: This class is not necessarily thread-safe. You should not use an instance of StreamMethod from several threads without explicit synchronization.

Also make sure that the Thread using the iterator of this StreamMethod coroutine has either the same priority as the creator thread of it and therefore the stream method. Or at least provide that the StreamMethod has the higher one. Otherwise in a system with many threads but without priority inheritance or priority ceiling, the StreamMethod might not get a chance to produce new data at all.


The implementation follows a modified producer/consumer pattern. It uses safe suspend and resume techniques, but nevertheless be cautious when obtaining a lock on the monitor of critical system resources within runStream() to prevent deadlocks.

Author:
André Platzer
See Also:
concurrent implementation, UML, Consumer Producer

Nested Class Summary
 
Nested classes/interfaces inherited from class java.lang.Thread
java.lang.Thread.State, java.lang.Thread.UncaughtExceptionHandler
 
Field Summary
 
Fields inherited from class java.lang.Thread
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
 
Constructor Summary
protected StreamMethod()
          Construct this StreamMethod as a synchronous coroutine stream connector.
protected StreamMethod(boolean synchronousConnector)
          Construct this StreamMethod as a coroutine stream connector.
 
Method Summary
 java.util.Iterator apply()
          Call to apply this stream method coroutine.
protected  boolean isSuspended()
          Whether this thread is safely suspended
 void request()
          Request next data forcing resume and wait if necessary.
protected  void resumedReturn(java.lang.Object ret)
          Call this method to return a value from the stream coroutine.
 void run()
          Do not call, directly.
protected abstract  void runStream()
          StreamMethod implementation method.
protected  void safeResume()
          Safely resumes this thread.
protected  void safeSuspend()
          Safely suspends this thread.
 
Methods inherited from class java.lang.Thread
activeCount, checkAccess, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Constructor Detail

StreamMethod

protected StreamMethod()
Construct this StreamMethod as a synchronous coroutine stream connector.


StreamMethod

protected StreamMethod(boolean synchronousConnector)
Construct this StreamMethod as a coroutine stream connector.

Parameters:
synchronousConnector - true if this StreamMethod should run as a synchronous connector with synchronous method calls running on demand. If false, it is running as an asynchronous connector with an asynchronous thread running in background to collect results.

While asynchronous connectors usually have an advantage in speed, they may need a big buffer for the results, whether they will ever be used or not.

Method Detail

apply

public java.util.Iterator apply()
Call to apply this stream method coroutine.

Note: This method is not necessarily thread-safe. You should not use an instance of StreamMethod from several threads without explicit synchronization.

Returns:
a stream iterator containing the return values of this coroutine.

resumedReturn

protected void resumedReturn(java.lang.Object ret)
Call this method to return a value from the stream coroutine. If not aborted, execution will resume afteron, although some time may have passed in between.

This method can be used to return a multitude of values to the caller.

Parameters:
ret - the return-value to pass to the caller

request

public void request()
Request next data forcing resume and wait if necessary.


runStream

protected abstract void runStream()
StreamMethod implementation method. Can use resumedReturn(Object) to return a value and resume execution, and return to end continuation reentrant execution.


run

public final void run()
Do not call, directly.

Specified by:
run in interface java.lang.Runnable
Overrides:
run in class java.lang.Thread

safeSuspend

protected final void safeSuspend()
Safely suspends this thread. Will apply the next time, checkSuspend() is called.

See Also:
checkSuspend(), Thread.suspend()

safeResume

protected final void safeResume()
Safely resumes this thread.

See Also:
Thread.resume()

isSuspended

protected final boolean isSuspended()
Whether this thread is safely suspended


Orbital library
1.3.0: 11 Apr 2009

Copyright © 1996-2009 André Platzer
All Rights Reserved.