12.9 Kommunikation zwischen Threads mit Pipes
Die Kommunikation zwischen Programmen kann auf vielfältige Weise geschehen. Eine Möglichkeit, die wir bei den Threads kennen gelernt haben, sind statische Variablen oder gemeinsame Datenstrukturen. Bei getrennten Programmen lässt sich die Kommunikation über Dateien realisieren. Auch Datenströme können von einem Teil geschrieben und vom anderen gelesen werden. Wenn wir aber mit Threads arbeiten, wäre eine Kommunikation über Dateien zwar denkbar, aber zu aufwändig. Ein anderes Stromkonzept ist praktisch.
12.9.1 PipedOutputStream und PipedInputStream
Einfacher ist der Austausch der Daten über die speziellen Stream-Klassen PipedOutputStream und PipedInputStream, die eine so genannte Pipe bilden. Damit können zwei Threads über Byteströme Informationen austauschen. PipedOuputStream ist eine Unterklasse von OutputStream und kann daher zu allen Ausgabeströmen wie DataOutputStream ausgebaut werden. Das Gleiche gilt für PipedInputStream. Eine Pipe zwischen zwei Threads wird durch die Verbindung eines PipedOutputStream mit einem PipedInputStream eingerichtet und umgekehrt. Hierzu gibt es mehrere Varianten. Beide Konstruktoren gibt es in doppelter Ausführung: entweder als Standard-Konstruktor oder als Konstruktor, der den jeweils anderen Stream aufnimmt.
Beispiel Verbinde den Eingabe-Stream pis mit dem Ausgabe-Stream pos
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream( pis );
|
Werden jetzt Daten produziert und in den pos gepackt, dann werden sie nach pis geschickt. Da wir den PipedOutputStream mit einem Eingabestrom verbunden haben, ist ein einseitiger Verbindungskanal aufgebaut. An pis lassen sich mit read() die Daten entnehmen.
Hier klicken, um das Bild zu Vergrößern
Bei einer bidirektionalen Verbindung müssen wir natürlich beide Seiten miteinander verbinden. Die Klassen PipedOutputStream und PipedInputStream bieten eine Methode connect() an, mit der nachträglich das zweite Paar gebildet werden kann.
Beispiel Ein PipedOutputStream soll mit einem PipedInputStream doppelseitig verbunden werden.
|
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();
po.connect( pis );
pi.connect( pos );
Interna
Die Daten, die vom PipedOutputStream mittels write() geschrieben werden, gelangen direkt ohne Pufferung zum Eingabestrom. Werfen wir einen kurzen Blick auf die relevanten Teile der Implementierung:
class PipedOutputStream extends OutputStream
{
private PipedInputStream sink;
public PipedOutputStream( PipedInputStream snk )
throws IOException
{
// Fehlerbehandlung
sink = snk;
snk.in = -1;
snk.out = 0;
snk.connected = true;
}
public void write(int b) throws IOException
{
if (sink == null)
throw new IOException("Pipe not connected");
sink.receive(b);
}
}
Der Eingabestrom ist etwas anders konstruiert, denn er nutzt intern einen Puffer von 1.024 Zeichen (512 bei Kaffe). Das bedeutet, der Schreibende kann bis zu 1.024 Bytes produzieren, bis die Kommunikation stoppen muss. Denn mit dieser Größe ist der Puffer voll, und der Lesende muss den Puffer leeren, damit der Konsument wieder etwas produzieren kann. Erst wenn der Puffer entleert wurde, kann es weitergehen. Umgekehrt heißt das, dass der lesende Thread bei ungenügend vielen Zeichen warten muss, bis der Schreiber die nötige Anzahl hinterlegt hat. Dafür wird intern mittels Thread-Synchronisation gearbeitet.
12.9.2 PipedWriter und PipedReader
Die Klassen PipedWriter und PipedReader sind die char-Varianten für die sonst Byte-orientierten Klassen PipedOutputStream und PipedInputStream. Diese sollen uns für ein Beispiel dienen. Zwei Threads arbeiten miteinander und tauschen Daten aus. Der eine Thread produziert Zufallszahlen, die ein anderer auf dem Bildschirm darstellt.
Listing 12.25 PipeDemo.java
import java.io.*;
class RandomWriter extends Thread
{
private PipedWriter out;
RandomWriter()
{
out = new PipedWriter();
}
PipedWriter getPipedWriter()
{
return out;
}
public void run()
{
PrintWriter pw = new PrintWriter( out );
for ( int i=0; i<10; i++ )
pw.println( Math.random() );
}
}
class RandomReader extends Thread
{
private PipedReader in;
RandomReader( PipedWriter out ) throws IOException
{
in = new PipedReader( out );
}
public void run()
{
BufferedReader br = new BufferedReader( in );
while ( true )
try
{
System.out.println( br.readLine() );
} catch ( IOException e ) { }
}
}
public class PipeDemo
{
public static void main( String args[] ) throws Exception
{
RandomWriter rw = new RandomWriter();
RandomReader rr = new RandomReader( rw.getPipedWriter() );
rr.start();
rw.start();
}
}
|