Summary
Communicating between threads is easy once you have a class that uses the techniques discussed in the first part of this series. Here you'll see firsthand how to write such a data channel class, and then how to create a simple example application that puts the class to work. (3,700 words)
By Chuck McManis
In part one of this series of columns, I showed you how two or more threads in the Java runtime can synchronize with the wait() and notify() methods in class Object. In this installment we will write a class that uses these techniques to create a communication channel that operates between threads.
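The communication channel performs four tasks. It opens a named channel, writes data into the channel, reads data from the channel, and releases (closes) the channel. This model is pretty typical of such designs.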
The first of these, opening a named channel, requires that the class be able to share information (the channel registry) between threads. In Java, all the threads of a program run in a shared address space, and static fields of a class are shared across all instances of that class. You will recall that static variables (sometimes called class variables) do not require that an object instance be created to access them; instead, they may be accessed using the Classname.variable_name syntax. For the purposes of our example, however, the more salient property is that, because only one copy of a static field exists, it is visible to all threads.
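A minimal stand-alone sketch of the property this paragraph relies on (the class StaticSharingDemo is hypothetical and not part of util.comm): two threads update one static field, and because only one copy exists, both see the same value. The static synchronized methods lock the class object, just as the static synchronized getChannel() factory in DataChannel does.

```java
// Hypothetical demo class; not part of the article's util.comm package.
class StaticSharingDemo {
    // Exactly one copy of this field exists, shared by every thread in the JVM.
    private static int hits = 0;

    // static synchronized methods lock StaticSharingDemo.class, so increments don't race.
    static synchronized void hit() { hits++; }
    static synchronized int hits() { return hits; }
    static synchronized void reset() { hits = 0; }

    // Runs two threads that each call hit() perThread times; returns the shared total.
    static int countWithTwoThreads(int perThread) throws InterruptedException {
        reset();
        Runnable worker = () -> {
            for (int i = 0; i < perThread; i++) hit();
        };
        Thread t1 = new Thread(worker);
        Thread t2 = new Thread(worker);
        t1.start();
        t2.start();
        t1.join();   // wait for both threads before reading the total
        t2.join();
        return hits();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithTwoThreads(1000)); // prints 2000
    }
}
```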
Let the games begin
To create our DataChannel class we start out with this code:
1 package util.comm;
2
3 import java.util.Hashtable;
4 import java.util.Enumeration;
5
6 public class DataChannel {
7
8 private Hashtable TIDs;
9 private String myName; // the channel's registered name, set by getChannel()
10 private DataChannel() {
11 TIDs = new Hashtable();
12 }
13
14 private static Hashtable registry = null;
15
16 /** getChannel (writer version) */
17 public static synchronized DataChannel getChannel(String name) {
18 if (registry == null)
19 registry = new Hashtable();
20 DataChannel it = (DataChannel) registry.get(name);
21 if (it == null) {
22 it = new DataChannel();
23 it.myName = name;
24 registry.put(name, it);
25 }
26 return it;
27 }
Line 1 puts this code in the util.comm package. There are two advantages to doing this: the classes for the data channels don't clutter up our main classes directory, and we can create a "helper" class named DataItem that is private to this package. Note that the package statement must be the first non-comment line in the file.
Line 8 declares TIDs, a hash table that maps each reader thread to its data item, and lines 10 through 12 are the constructor for a new DataChannel object. As you can see, the constructor is declared private. This declaration makes it impossible for any code outside the DataChannel class to construct a new instance of a DataChannel. This is a common technique to restrict the ability to construct new objects to a static method like the one in line 17.
Line 14 declares the registry hash table. Because this declaration is static, there is exactly one registry, shared by every thread in the program; because it is private, it is accessible only to the methods in DataChannel.
Finally, in lines 17 through 27, comes the channel-writer version of the getChannel() method. As a static method, it is accessed using the syntax:
DataChannel dc = DataChannel.getChannel("name");
Because it is a method in DataChannel, it is allowed to manipulate the private registry hash table and to call the private constructor defined in the class. Static methods such as this one, which return an object of their own class type, are called factory methods. Unlike a constructor, which always allocates a new object from the heap, a factory method decides how to supply one: here, getChannel() returns the channel already registered under the name and manufactures a new one only if none exists.
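Because getChannel is declared static synchronized, the Java runtime guarantees that only one thread at a time can be executing it (the lock is held on the DataChannel class object), so two threads cannot both create a channel with the same name. As you can see, if the named channel doesn't exist, it is created.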
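Stripped of the data-queue details, the registry pattern above (private constructor, static registry, synchronized factory) fits in a few lines. This is a sketch under assumptions: NamedChannel is a hypothetical class, and it uses java.util.HashMap with eager initialization in place of the article's lazily created Hashtable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DataChannel's registry pattern (names are illustrative).
final class NamedChannel {
    // One registry for the whole class, shared by all threads; guarded by NamedChannel.class.
    private static final Map<String, NamedChannel> registry = new HashMap<>();

    private final String name;

    // Private: only the factory below can create instances.
    private NamedChannel(String name) { this.name = name; }

    // Factory method: returns the existing channel for a name, or creates and registers one.
    static synchronized NamedChannel getChannel(String name) {
        NamedChannel it = registry.get(name);
        if (it == null) {
            it = new NamedChannel(name);
            registry.put(name, it);
        }
        return it;
    }

    String name() { return name; }
}
```

Because getChannel() is static synchronized, two threads that ask for the same new name at the same moment cannot both create a channel; the second caller blocks until the first has registered its instance, and then finds it.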
Reading is fundamental
The second version of the getChannel method creates a reader DataChannel. Here are the bits:
28 /** getChannel (reader version) */
29 public static DataChannel getChannel(String name, Thread myTID, int qSize) {
30 DataChannel it = getChannel(name); // find or create the named channel
31
32 if (qSize == 0) // no queue requested: don't register as a reader
33 return it;
34 it.TIDs.put(myTID, new DataItem(myTID, qSize));
35 return it;
36 }
This version takes two additional parameters: a thread identifier and a queue size. It starts by obtaining the globally named DataChannel from the other getChannel method. When that channel is returned, it attaches to it an instance of a DataItem object, which provides the actual channel for the thread. Note that every data channel has one data item for every thread that is interested in receiving data on the channel.
As you may now see, our data channels can have many readers, and they can also have many writers. Further, there can be several data channels active at one time. To prevent all of them from attempting to synchronize on one object (the data channel), we'll create the DataItem class, which gives each reader a distinct object of its own to wait on and gives writers a separate object to deliver to for each reader.
The DataItem class is defined below; we'll describe it in sections. This version of DataItem is easy to understand, but it is not as flexible as it could be.
1 package util.comm;
2
3 class DataItem {
4 private Thread TID; // The consumer thread
5 private Object q[]; // Queued values for thread
6 private int nQueue; // Number of items in the Queue
7 private int dataStatus; // Status of the queue
8 private long timeout = 0; // fetch() wait limit in ms; 0 waits indefinitely
9 // legal Queue status values
10 final static int NONE = 0; // No data available
11 final static int NEW = 1; // New data available
12 final static int OVERRUN = 2; // Data overrun (queue overflow)
13 final static int SHUTDOWN = 3; // Channel is being shut down
14 DataItem(Thread t, int Q) {
15 q = new Object[Q];
16 nQueue = 0;
17 TID = t;
18 dataStatus = NONE;
19 }
20
21 /** shorter version for a queue size of two (2) */
22 DataItem(Thread t) {
23 this(t, 2);
24 }
Like DataChannel, this class is part of the util.comm package, as declared in line 1.
Lines 3 through 13 declare the instance variables that hold the state of this data item, along with the constants for the legal queue-status values used within the methods.
Lines 14 through 24 are the constructors for DataItem. Neither is public, since they are used only by DataChannel, which is also defined in the util.comm package. The first constructor allocates a data item for the referenced thread with the queue size specified in Q. The second always allocates a queue size of two, which is a reasonable default.
25
26 /** insert is the "write" method */
27 synchronized void insert(Object x) {
28 if (nQueue == q.length) {
29 dataStatus = OVERRUN;
30 q[nQueue-1] = x;
31 } else {
32 q[nQueue] = x;
33 dataStatus = NEW;
34 nQueue++;
35 }
36 notify();
37 }
The method insert() puts new data into the queue. It is synchronized, so a writer must acquire the object's monitor before entering. This ensures that multiple writers can't be in the method at the same time and corrupt the state of the queue.
Lines 28 through 30 check for data overruns. An overrun occurs when a writer tries to insert an object into a full queue. This version of the method implements a policy in which the newest value already in the queue is discarded and the incoming value takes its place at the end of the queue.
Lines 32 through 34 insert the item into the queue, change dataStatus, and then update the number of items queued. Finally, in line 36, any thread reading this data is notified that there is data available to be consumed.
The data is read by fetch(), shown below. It is only a bit more complicated than insert().
38
39 /** fetch is the "read" method */
40 synchronized Object fetch() throws DataChannelOverrun, DataChannelShutdown, DataChannelTimeout {
41 Object r = null;
42
43 /* If nothing is waiting, we sleep. */
44 if (dataStatus == NONE) {
45 try {
46 long was = System.currentTimeMillis();
47 wait(timeout);
48 if (timeout > 0 && dataStatus == NONE && (System.currentTimeMillis() - was) >= timeout)
49 throw new DataChannelTimeout();
50 } catch (InterruptedException e) { dataStatus = SHUTDOWN; }
51 }
52
53 /* Data (or status) has arrived so dispatch it. */
54 switch (dataStatus) {
55 case NONE:
56 break;
57 case NEW:
58 r = q[0];
59 System.arraycopy(q, 1, q, 0, q.length - 1);
60 nQueue--;
61 break;
62 case OVERRUN:
63 dataStatus = NEW; // Next call will get the data
64 throw new DataChannelOverrun();
65 case SHUTDOWN:
66 throw new DataChannelShutdown();
67 }
68 if (nQueue == 0)
69 dataStatus = NONE;
70 return r;
71 }
In line 44 the code checks the value of dataStatus and, if there is no data available, calls wait() to go to sleep. This is similar to the code in the earlier PingPong class described in Part 1 of this series ("Synchronizing threads in Java," April 1996 JavaWorld).
When the thread is notified, it wakes up and checks how much time has passed, in case the wait ended because the timeout expired; if it did, the thread throws a DataChannelTimeout exception. If the wait was interrupted (wait() threw an InterruptedException), the code immediately puts the data channel into "shutdown" mode, and the switch statement that follows throws the DataChannelShutdown exception a bit later.
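fetch() calls wait() once and infers a timeout from the elapsed time. A common, more defensive idiom (swapped in here, not the article's code) re-checks the condition in a loop and recomputes the remaining time on each pass, because wait() can return before either data or the deadline arrives. TimedMailbox is a hypothetical class, and it throws java.util.concurrent.TimeoutException where DataItem would throw DataChannelTimeout:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;

// Hypothetical single-consumer mailbox illustrating a timed wait() loop.
class TimedMailbox {
    private final Deque<Object> items = new ArrayDeque<>(); // rejects null items

    // Producer side: enqueue and wake any waiting reader.
    synchronized void put(Object x) {
        items.addLast(x);
        notifyAll();
    }

    // Consumer side: wait up to timeoutMs for an item (timeoutMs <= 0 means don't wait).
    synchronized Object take(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        // Loop, because wait() may return early (spurious wakeup) with nothing queued.
        while (items.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("no data within " + timeoutMs + " ms");
            }
            wait(remaining);
        }
        return items.removeFirst();
    }
}
```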
If the state is NEW, the code returns the next object in the array and then shuffles the array to make room for more data. In this example, we use the static method arraycopy in the System class. Alternatively, we could use a read index and a write index into the queue.
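The read-index/write-index alternative avoids shifting the whole array on every read. A minimal sketch, assuming the same overrun policy as insert() (overwrite the newest slot) and leaving out the wait()/notify() logic; RingQueue and its method names are hypothetical:

```java
// Hypothetical fixed-size circular queue: O(1) insert and remove, no System.arraycopy.
class RingQueue {
    private final Object[] q;
    private int head = 0;            // read index: slot of the oldest item
    private int count = 0;           // number of items currently queued
    private boolean overrun = false;

    RingQueue(int size) { q = new Object[size]; }

    // The write index is derived from head + count, wrapping with the modulus.
    synchronized void insert(Object x) {
        if (count == q.length) {
            // Same policy as DataItem.insert(): replace the most recently queued value.
            q[(head + count - 1) % q.length] = x;
            overrun = true;
        } else {
            q[(head + count) % q.length] = x;
            count++;
        }
    }

    // Returns the oldest item, or null if the queue is empty.
    synchronized Object remove() {
        if (count == 0) return null;
        Object r = q[head];
        q[head] = null;              // drop the reference so it can be garbage-collected
        head = (head + 1) % q.length;
        count--;
        return r;
    }

    synchronized boolean overrunOccurred() { return overrun; }
    synchronized int size() { return count; }
}
```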
If the state is OVERRUN, the code throws the DataChannelOverrun exception. This exception has to be thrown here, rather than at write time, because the overrun is detected in the writer's thread, and Java provides no safe way to post an asynchronous exception to another thread. The only tricky thing here is that the data status changes from OVERRUN to NEW, so that the next time this method is called, the values in the queue will be read.
All of the exceptions are subclasses of DataChannelException. This is done so that clients can either catch individual exceptions, if they are interested in those specific conditions, or simply catch DataChannelException if they wish to catch every exception the channel can throw.
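The exception classes themselves are not shown in this installment. A minimal sketch, assuming they are checked exceptions derived from Exception (the four class names come from the text; the constructors, messages, and the ExceptionDemo.describe() helper are assumptions), shows both catching styles:

```java
// Assumed hierarchy: the names are from the text, the bodies are a sketch.
class DataChannelException extends Exception {
    DataChannelException(String msg) { super(msg); }
}

class DataChannelOverrun extends DataChannelException {
    DataChannelOverrun() { super("data overrun: queue overflowed"); }
}

class DataChannelShutdown extends DataChannelException {
    DataChannelShutdown() { super("data channel shut down"); }
}

class DataChannelTimeout extends DataChannelException {
    DataChannelTimeout() { super("timed out waiting for data"); }
}

class ExceptionDemo {
    // Hypothetical helper: handles one condition specifically and the rest via the base class.
    static String describe(DataChannelException toThrow) {
        try {
            throw toThrow;
        } catch (DataChannelOverrun e) {        // more specific catch must come first
            return "overrun (handled specifically)";
        } catch (DataChannelException e) {      // catches shutdown, timeout, and any others
            return "other: " + e.getMessage();
        }
    }
}
```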
Finally, there are a few convenience methods that we use to monitor the state of the object. These are pretty self-explanatory.
72 /** available items to be read */
73 int queueSize() { return nQueue; }
74
75 /** data is waiting check */
76 boolean hasData() { return (nQueue > 0); }
77
78 /** last value posted (doesn't block) */
79 Object lastValue() { return q[0]; }
80 /** mark the item shut down and wake a waiting reader */
81 synchronized void delete() { dataStatus = SHUTDOWN; notify(); }
82
83 }
The last of these, delete(), is used by the releaseChannel method of the DataChannel class. The trick is that the thread reading the channel needs to be notified that the channel is being shut down, but that thread is asleep, waiting for data that will never come. So releaseChannel calls the delete() method of the DataItem object associated with the passed thread identifier. This sets the state to SHUTDOWN and wakes up the thread. The next thing the thread will see is a DataChannelShutdown exception, which it must be prepared to handle.
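The wake-up works because delete() changes the state and calls notify() while holding the same monitor the reader is waiting on. A self-contained sketch of the pattern (ShutdownSlot is hypothetical; it loops on its condition and uses notifyAll() rather than the article's single if and notify(), and IllegalStateException stands in for DataChannelShutdown):

```java
// Hypothetical illustration of waking a blocked reader to tell it to shut down.
class ShutdownSlot {
    private Object value;              // pending item, or null if none
    private boolean shutdown = false;

    synchronized void put(Object x) { value = x; notifyAll(); }

    // Counterpart of DataItem.delete(): set the state, then wake any sleeping reader.
    synchronized void delete() { shutdown = true; notifyAll(); }

    // Blocks until a value arrives or delete() is called.
    synchronized Object fetch() throws InterruptedException {
        while (value == null && !shutdown) wait();
        if (shutdown) throw new IllegalStateException("channel shut down");
        Object r = value;
        value = null;
        return r;
    }
}
```

The order of delete() and fetch() does not matter: if delete() runs first, the reader sees the shutdown flag before it ever calls wait().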
Straight is the gate
Using the DataItem class is reasonably straightforward. In the DataChannel class you will notice there are two methods: one for writing data and one for reading it. The first is for writing and is shown below.
37 public synchronized void putValue(Object x) {
38 for (Enumeration e = TIDs.elements(); e.hasMoreElements(); ) {
39 ((DataItem)(e.nextElement())).insert(x);
40 }
41 }
This method, putValue(), puts the object reference passed in x into the queues of all the data items associated with this channel. Remember that there is one data-channel object but many data-item objects (one for each thread that registered interest in this channel). This method uses an Enumeration to step through each element in the TIDs hash table. As insert() is called on each data item, the corresponding thread is sent a notify and may begin running as soon as insert() returns and releases that data item's monitor. Further, if no threads have yet registered an interest in this data channel, the loop body never executes and the method simply returns.
Reading values is straightforward as well. That is handled by the getValue() method shown here.
42 public Object getValue() throws DataChannelOverrun, DataChannelShutdown, DataChannelTimeout {
43 DataItem di = (DataItem) TIDs.get(Thread.currentThread());
44 return ((di != null) ? di.fetch() : null);
45 }
This method gets the data item associated with the current thread and returns the next value from it; if the data item has no information in its queue, the thread will block on the call to fetch(). Otherwise, it will return immediately with data from the data item. A thread that never registered as a reader on this channel simply gets null.
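Together, putValue() and getValue() form a broadcast: one write lands in every registered reader's queue, and each reader finds its own queue by looking up Thread.currentThread(). A non-blocking sketch of just that bookkeeping (Broadcast is a hypothetical class; its queues are unbounded, and it omits the blocking and overrun handling of DataItem):

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, non-blocking sketch of DataChannel's per-reader bookkeeping.
class Broadcast {
    // One queue per registered reader thread, like the TIDs table in DataChannel.
    private final Map<Thread, ArrayDeque<Object>> readers = new ConcurrentHashMap<>();

    void register(Thread reader) { readers.put(reader, new ArrayDeque<>()); }

    // Writer side: append the same reference to every registered reader's queue.
    void putValue(Object x) {
        for (ArrayDeque<Object> q : readers.values()) {
            synchronized (q) { q.addLast(x); }   // each queue is its own monitor, like a DataItem
        }
    }

    // Reader side: find the caller's own queue; null if unregistered or empty.
    Object getValue() {
        ArrayDeque<Object> q = readers.get(Thread.currentThread());
        if (q == null) return null;
        synchronized (q) { return q.pollFirst(); }
    }
}
```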
Quadrophenia: building a simple application
Let's take a moment to put together a simple application of these classes. The example application is completely contrived, of course, but it does demonstrate how the DataChannel class might be used. We'll implement a parallel quadratic equation computation engine. This engine takes an equation of the form $AX^2 + BX + C$ and produces outputs for the values of X from 0 through 4.
The helper class in this application is called Computer. This class takes two values coming in on two data channels, applies a mathematical operation to those values, and writes the result out on a third (result) channel. Here it is.
import util.comm.*;

public class Computer implements Runnable {

    public final static int ADD = 1;
    public final static int SUB = 2;
    public final static int MUL = 3;
    public final static int DIV = 4;

    private volatile int op;    // volatile: stop() changes it from another thread
    private Thread tid;

    DataChannel aChannel;
    DataChannel bChannel;
    DataChannel outChannel;

    public static Computer create(String inA, String inB,
                                  String out, int doOp) {

        if ((doOp < 1) || (doOp > 4))
            return null;        // unknown operation

        Computer result = new Computer();
        result.tid = new Thread(result);
        result.tid.setName("Computer (" + out + ")");
        // Writer on the output channel; reader (queue size 4) on both inputs.
        result.outChannel = DataChannel.getChannel(out);
        result.aChannel = DataChannel.getChannel(inA, result.tid, 4);
        result.bChannel = DataChannel.getChannel(inB, result.tid, 4);
        result.op = doOp;
        return result;
    }
The factory method create() creates a new Computer object. It also creates a thread for this object to run in.
    public void start() { tid.start(); }

    public void stop() {
        op = 0;
        /* Make sure our thread wakes up: releasing the channels delivers
           DataChannelShutdown to a thread blocked in getValue(). */
        aChannel.releaseChannel(tid);
        bChannel.releaseChannel(tid);
    }
Then there are two methods, start() and stop(), to turn the computer on and off.
And the computation is done in the computer's run() method.
    public void run() {
        int a, b;

        while (true) {
            try {
                Object va = aChannel.getValue();
                Object vb = bChannel.getValue();
                if ((va == null) || (vb == null))
                    break;                      // no longer registered as a reader
                a = ((Integer) va).intValue();
                b = ((Integer) vb).intValue();
            } catch (DataChannelOverrun e) {
                System.out.println("OVERRUN on " + tid);
                break;
            } catch (DataChannelShutdown e) {
                break;                          // stop() released our channels
            } catch (DataChannelException e) {
                System.out.println(e + " on " + tid);   // e.g., a timeout
                break;
            }
            System.out.print(tid.getName() + " : ");
            switch (op) {
                case 0:
                    break;
                case ADD:
                    System.out.println("ADD " + a + " + " + b + " -> " + (a + b));
                    outChannel.putValue(Integer.valueOf(a + b));
                    break;
                // ... same for SUB, MUL, and DIV (check for divide by 0!) ...
            }
        }
    }
}
For each operation, the class prints out a message telling you what operation it is performing. This is useful for watching the threads run on the console when you are running the application.
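For comparison, a stage like Computer can be written today with java.util.concurrent.BlockingQueue, which provides the blocking, bounded queue that DataItem builds by hand. This is a swapped-in technique, not the article's DataChannel: Stage is a hypothetical class, each queue has a single consumer (so fan-out to several readers would need one queue per reader), and interrupting the thread plays the role of stop().

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.IntBinaryOperator;

// Hypothetical pipeline stage: take one value from each input, apply op, emit the result.
class Stage implements Runnable {
    private final BlockingQueue<Integer> a, b, out;
    private final IntBinaryOperator op;

    Stage(BlockingQueue<Integer> a, BlockingQueue<Integer> b,
          BlockingQueue<Integer> out, IntBinaryOperator op) {
        this.a = a;
        this.b = b;
        this.out = out;
        this.op = op;
    }

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                int x = a.take();               // blocks until a value arrives
                int y = b.take();
                out.put(op.applyAsInt(x, y));   // blocks if the output queue is full
            }
        } catch (InterruptedException e) {
            // Interrupt is the shutdown signal; restore the flag and let the thread end.
            Thread.currentThread().interrupt();
        }
    }
}
```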
The primary application class is called Quadratic. It takes advantage of the fact that the standard-form quadratic equation can be written in postfix notation, in which each operator becomes one step of the computation.
Given:
$$AX^2 + BX + C$$
Equation 1
We can write it in postfix notation as:
((((X X *) A *) (B X *) +) C +)
Equation 2
And diagramming the intermediate results gives us the following parse tree:
( X X * )      ( B X * )
    |              |
    V              |
( R1 A * )         |
    |              |
    V              V
    ( R2  R3  + )
          |
          V
    ( R4  C  + )
          |
          V
        Answer
Figure 1
As you can see, there are five intermediate results, including the answer. Thus, we'll need a total of five Computer objects to compute the answer to the equation.
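With the grouping parentheses dropped, Equation 2 reads X X * A * B X * + C +, and it can be evaluated sequentially with an operand stack: push each operand; on an operator, pop two values, apply it, and push the result. Each operator application corresponds to one Computer, so this is a handy way to predict what the five computers should produce. A sketch; Postfix.eval() and its map of variable values are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

class Postfix {
    // Evaluates space-separated postfix tokens; names are looked up in vars.
    static int eval(String expr, Map<String, Integer> vars) {
        Deque<Integer> stack = new ArrayDeque<>();
        for (String tok : expr.trim().split("\\s+")) {
            switch (tok) {
                case "+": case "-": case "*": case "/": {
                    int right = stack.pop();   // the second operand was pushed last
                    int left = stack.pop();
                    stack.push(apply(tok, left, right));
                    break;
                }
                default: {
                    Integer v = vars.get(tok);
                    if (v == null) throw new IllegalArgumentException("unknown name: " + tok);
                    stack.push(v);
                }
            }
        }
        if (stack.size() != 1) throw new IllegalArgumentException("malformed expression");
        return stack.pop();
    }

    private static int apply(String op, int l, int r) {
        switch (op) {
            case "+": return l + r;
            case "-": return l - r;
            case "*": return l * r;
            default:  return l / r;   // "/": throws ArithmeticException on divide by zero
        }
    }
}
```

For X = 2, A = 3, B = 5, and C = 10 the standard form gives 32. Replacing the first + with - (the operation the program's fourth computer performs) gives 12, the value for X = 2 in the sample output.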
Here is the Quadratic class.
import util.comm.*;

public class Quadratic {
    public static void main(String args[]) {
First we allocate data channels for the input values and register the main thread, with a queue size of 2, as a reader of the answer channel, res5. (The channels for the intermediate results, res1 through res4, are created by the computers themselves.)
        DataChannel X = DataChannel.getChannel("X");
        DataChannel X2 = DataChannel.getChannel("X2");
        DataChannel A = DataChannel.getChannel("a");
        DataChannel B = DataChannel.getChannel("b");
        DataChannel C = DataChannel.getChannel("c");
        DataChannel answer = DataChannel.getChannel("res5",
                                 Thread.currentThread(), 2);
Next we allocate five computers for the computation:
        Computer res[] = new Computer[5];

        res[0] = Computer.create("X", "X2", "res1", Computer.MUL);
        res[1] = Computer.create("X", "b", "res2", Computer.MUL);
        res[2] = Computer.create("res1", "a", "res3", Computer.MUL);
        res[3] = Computer.create("res3", "res2", "res4", Computer.SUB);
        res[4] = Computer.create("res4", "c", "res5", Computer.ADD);
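Note that we could not pass "X" as both inputs to the first computer. A data channel keeps only one data item per reader thread (its TIDs table is keyed by thread), so a second registration by the same thread on the same channel would simply replace the first; that is why X is also posted on a separate channel, X2. Note also that res[3] subtracts rather than adds, so the program actually evaluates $AX^2 - BX + C$; the check value computed in the loop uses the same sign.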
All five computers are allocated, so start them up.
        /* Now start them up. */
        for (int i = 0; i < res.length; i++) res[i].start();
And now, for five iterations, feed the computers values in reverse order (the constants first, X and X2 last). This maximizes the delay between starting threads and thus increases the chance that we will see two running at the same time in the output.
        /* Now write in the values. */
        for (int i = 0; i < 5; i++) {
            C.putValue(Integer.valueOf(10));
            B.putValue(Integer.valueOf(5));
            A.putValue(Integer.valueOf(3));
            X.putValue(Integer.valueOf(i));
            X2.putValue(Integer.valueOf(i));
Compute the same value to check our results.
            int check = ((i * i) * 3) - (5 * i) + 10;
And then wait for the result to appear in the answer data channel.
            try {
                System.out.println("X = " + i + ", result = " +
                                   ((Integer) answer.getValue()) +
                                   " [" + check + "]");
            } catch (DataChannelOverrun e) {
                System.out.println("Overrun on the answer!");
            } catch (DataChannelException e) {
                System.out.println("Error on the answer channel: " + e);
            }
        }
Finally, shut down the computer threads we have allocated by telling them to stop, and give them a moment to exit.
        for (int i = 0; i < res.length; i++) res[i].stop();
        try {
            Thread.sleep(100);      // let the computer threads finish
        } catch (InterruptedException e) { }
    }
}
And exit the main() method.
When this runs you will see an output that is similar to this:
Computer (res2) : MUL 0 * 5 -> 0
Computer (res1) : MUL 0 * 0 -> 0
Computer (res3) : MUL 0 * 3 -> 0
Computer (res4) : SUB 0 - 0 -> 0
Computer (res5) : ADD 0 + 10 -> 10
X = 0, result = 10 [10]
Computer (res2) : MUL 1 * 5 -> 5
Computer (res1) : MUL 1 * 1 -> 1
Computer (res3) : MUL 1 * 3 -> 3
Computer (res4) : SUB 3 - 5 -> -2
Computer (res5) : ADD -2 + 10 -> 8
X = 1, result = 8 [8]
Computer (res2) : MUL 2 * 5 -> 10
Computer (res1) : MUL 2 * 2 -> 4
Computer (res3) : MUL 4 * 3 -> 12
Computer (res4) : SUB 12 - 10 -> 2
Computer (res5) : ADD 2 + 10 -> 12
X = 2, result = 12 [12]
Computer (res2) : MUL 3 * 5 -> 15
Computer (res1) : MUL 3 * 3 -> 9
Computer (res3) : MUL 9 * 3 -> 27
Computer (res4) : SUB 27 - 15 -> 12
Computer (res5) : ADD 12 + 10 -> 22
X = 3, result = 22 [22]
Computer (res2) : MUL 4 * 5 -> 20
Computer (res1) : MUL 4 * 4 -> 16
Computer (res3) : MUL 16 * 3 -> 48
Computer (res4) : SUB 48 - 20 -> 28
Computer (res5) : ADD 28 + 10 -> 38
X = 4, result = 38 [38]
As you can see, our parallel computer got the same answer that the code got the old-fashioned way.
Again, I stress that this is a contrived example, of some illustrative value but limited practical use. The real power of DataChannel comes to light when it is combined with applets on a Web page, and that is the subject of the next part of this series.
About the author
Chuck McManis is currently the Director of Technology at
GolfWeb Inc. (http://www.golfweb.com), a
Web magazine devoted to the game of golf, where he develops technologies that
make the presentation of the magazine interactive, compelling, and enjoyable.
Before joining GolfWeb, he was a member of the Java group. He joined the Java
group just after the formation of FirstPerson Inc. and was a member of the
portable OS group (the group responsible for the OS portion of Java). Later,
when FirstPerson was dissolved, he stayed with the group through the development
of the alpha and beta versions of the software. He was responsible for creating
the Java version of the Sun home page in May 1995. He also developed a
cryptographic library for Java and versions of the Java class loader that could
screen classes based on Digital Signatures. Before joining FirstPerson, Chuck
worked in the Operating Systems area of SunSoft developing networking
applications, where he did the initial design of NIS+.