home *** CD-ROM | disk | FTP | other *** search
/ Java 1.2 How-To / JavaHowTo.iso / 3rdParty / jbuilder / unsupported / JDK1.2beta3 / SOURCE / SRC.ZIP / java / io / PipedInputStream.java < prev    next >
Encoding:
Java Source  |  1998-03-20  |  8.4 KB  |  285 lines

  1. /*
  2.  * @(#)PipedInputStream.java    1.21 98/03/18
  3.  *
  4.  * Copyright 1995-1997 by Sun Microsystems, Inc.,
  5.  * 901 San Antonio Road, Palo Alto, California, 94303, U.S.A.
  6.  * All rights reserved.
  7.  *
  8.  * This software is the confidential and proprietary information
  9.  * of Sun Microsystems, Inc. ("Confidential Information").  You
  10.  * shall not disclose such Confidential Information and shall use
  11.  * it only in accordance with the terms of the license agreement
  12.  * you entered into with Sun.
  13.  */
  14.  
  15. package java.io;
  16.  
  17. /**
  18.  * A piped input stream can be connected to a piped output stream 
  19.  * to create a communications pipe. The piped input stream is the 
  20.  * receiving end of the pipe. Two threads can communicate by having 
  21.  * one thread send data to a piped output stream and having the other 
  22.  * thread read the data from the connected piped input stream.
  23.  *
  24.  * @author  James Gosling
  25.  * @version 1.21, 03/18/98
  26.  * @see     java.io.PipedOutputStream
  27.  * @since   JDK1.0
  28.  */
  29. public
  30. class PipedInputStream extends InputStream {
  31.     boolean closed = true;
  32.     boolean closedByReader = false;
  33.     boolean connected = false;
  34.  
  35.     /* REMIND: identification of the read and write sides needs to be
  36.        more sophisticated.  Either using thread groups (but what about
  37.        pipes within a thread?) or using finalization (but it may be a
  38.        long time until the next GC). */
  39.     Thread readSide;
  40.     Thread writeSide;
  41.  
  42.     /**
  43.      * The size of the pipe's circular input buffer.
  44.      * @since   JDK1.1
  45.      */
  46.     protected static final int PIPE_SIZE = 1024;
  47.  
  48.     /**
  49.      * The circular buffer into which incoming data is placed.
  50.      * @since   JDK1.1
  51.      */
  52.     protected byte buffer[] = new byte[PIPE_SIZE];
  53.  
  54.     /**
  55.      * The index of the position in the circular buffer at which the 
  56.      * next byte of data will be stored when received from the connected 
  57.      * piped output stream. <code>in<0</code> implies the buffer is empty, 
  58.      * <code>in==out</code> implies the buffer is full
  59.      * @since   JDK1.1
  60.      */
  61.     protected int in = -1;
  62.  
  63.     /**
  64.      * The index of the position in the circular buffer at which the next 
  65.      * byte of data will be read by this piped input stream.
  66.      * @since   JDK1.1
  67.      */
  68.     protected int out = 0;
  69.  
  70.     /**
  71.      * Creates a piped input stream connected to the specified piped 
  72.      * output stream. 
  73.      *
  74.      * @param      src   the stream to connect to.
  75.      * @exception  IOException  if an I/O error occurs.
  76.      */
  77.     public PipedInputStream(PipedOutputStream src) throws IOException {
  78.     connect(src);
  79.     }
  80.  
  81.     /**
  82.      * Creates a piped input stream that is not yet connected to a piped 
  83.      * output stream. It must be connected to a piped output stream, 
  84.      * either by the receiver or the sender, before being used. 
  85.      *
  86.      * @see     java.io.PipedInputStream#connect(java.io.PipedOutputStream)
  87.      * @see     java.io.PipedOutputStream#connect(java.io.PipedInputStream)
  88.      */
  89.     public PipedInputStream() {
  90.     }
  91.  
  92.     /**
  93.      * Connects this piped input stream to a sender. 
  94.      *
  95.      * @param      src   The piped output stream to connect to.
  96.      * @exception  IOException  if an I/O error occurs.
  97.      */
  98.     public void connect(PipedOutputStream src) throws IOException {
  99.     if (connected) {
  100.         throw new IOException("Pipe already connected");
  101.     }
  102.     src.connect(this);
  103.     connected = true;
  104.     }
  105.     
  106.     /**
  107.      * Receives a byte of data.  This method will block if no input is
  108.      * available.
  109.      * @param b the byte being received
  110.      * @exception IOException If the pipe is broken.
  111.      * @since     JDK1.1
  112.      */
  113.     protected synchronized void receive(int b) throws IOException {
  114.     writeSide = Thread.currentThread();
  115.     while (in == out) {
  116.         if ((readSide != null) && !readSide.isAlive()) {
  117.         throw new IOException("Pipe broken");
  118.         }
  119.         /* full: kick any waiting readers */
  120.         notifyAll();    
  121.         try {
  122.             wait(1000);
  123.         } catch (InterruptedException ex) {
  124.         throw new java.io.InterruptedIOException();
  125.         }
  126.     }
  127.     if (in < 0) {
  128.         in = 0;
  129.         out = 0;
  130.     }
  131.     buffer[in++] = (byte)(b & 0xFF);
  132.     if (in >= buffer.length) {
  133.         in = 0;
  134.     }
  135.     }
  136.  
  137.     /**
  138.      * Receives data into an array of bytes.  This method will
  139.      * block until some input is available. 
  140.      * @param b the buffer into which the data is received
  141.      * @param off the start offset of the data
  142.      * @param len the maximum number of bytes received
  143.      * @return the actual number of bytes received, -1 is
  144.      *          returned when the end of the stream is reached. 
  145.      * @exception IOException If an I/O error has occurred. 
  146.      */
  147.     synchronized void receive(byte b[], int off, int len)  throws IOException {
  148.     while (--len >= 0) {
  149.         receive(b[off++]);
  150.     }
  151.     }
  152.  
  153.     /**
  154.      * Notifies all waiting threads that the last byte of data has been
  155.      * received.
  156.      */
  157.     synchronized void receivedLast() {
  158.     closed = true;
  159.     notifyAll();
  160.     }
  161.  
  162.     /**
  163.      * Reads the next byte of data from this piped input stream. The 
  164.      * value byte is returned as an <code>int</code> in the range 
  165.      * <code>0</code> to <code>255</code>. If no byte is available 
  166.      * because the end of the stream has been reached, the value 
  167.      * <code>-1</code> is returned. This method blocks until input data 
  168.      * is available, the end of the stream is detected, or an exception 
  169.      * is thrown. 
  170.      *
  171.      * @return     the next byte of data, or <code>-1</code> if the end of the
  172.      *             stream is reached.
  173.      * @exception  IOException  if the pipe is broken.
  174.      */
  175.     public synchronized int read()  throws IOException {
  176.     if (closedByReader) {
  177.         throw new IOException("InputStream closed");
  178.     }
  179.     int trials = 2;
  180.     while (in < 0) {
  181.         readSide = Thread.currentThread();
  182.         if (closed) { 
  183.         /* closed by writer, return EOF */
  184.         return -1;
  185.         }
  186.         if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
  187.         throw new IOException("Pipe broken");
  188.         }
  189.             /* might be a writer waiting */
  190.         notifyAll();
  191.         try {
  192.             wait(1000);
  193.         } catch (InterruptedException ex) {
  194.         throw new java.io.InterruptedIOException();
  195.         }
  196.      }
  197.     int ret = buffer[out++] & 0xFF;
  198.     if (out >= buffer.length) {
  199.         out = 0;
  200.     }
  201.     if (in == out) {
  202.             /* now empty */
  203.         in = -1;        
  204.     }
  205.     return ret;
  206.     }
  207.  
  208.     /**
  209.      * Reads up to <code>len</code> bytes of data from this piped input 
  210.      * stream into an array of bytes. Less than <code>len</code> bytes 
  211.      * will be read if the end of the data stream is reached. This method 
  212.      * blocks until at least one byte of input is available. 
  213.      *
  214.      * @param      b     the buffer into which the data is read.
  215.      * @param      off   the start offset of the data.
  216.      * @param      len   the maximum number of bytes read.
  217.      * @return     the total number of bytes read into the buffer, or
  218.      *             <code>-1</code> if there is no more data because the end of
  219.      *             the stream has been reached.
  220.      * @exception  IOException  if an I/O error occurs.
  221.      */
  222.     public synchronized int read(byte b[], int off, int len)  throws IOException {
  223.  
  224.     if (b == null) {
  225.         throw new NullPointerException();
  226.     } else if (off < 0 || len < 0 || off + len > b.length) {
  227.         throw new ArrayIndexOutOfBoundsException();
  228.     } else if (len == 0) {
  229.         return 0;
  230.     }
  231.  
  232.         /* possibly wait on the first character */
  233.     int c = read();        
  234.     if (c < 0) {
  235.         return -1;
  236.     }
  237.     b[off] = (byte) c;
  238.     int rlen = 1;
  239.     while ((in >= 0) && (--len > 0)) {
  240.         b[off + rlen] = buffer[out++];
  241.         rlen++;
  242.         if (out >= buffer.length) {
  243.         out = 0;
  244.         }
  245.         if (in == out) {
  246.                 /* now empty */
  247.         in = -1;    
  248.         }
  249.     }
  250.     return rlen;
  251.     }
  252.  
  253.     /**
  254.      * Returns the number of bytes that can be read from this input 
  255.      * stream without blocking. This method overrides the <code>available</code>
  256.      * method of the parent class.
  257.      *
  258.      * @return     the number of bytes that can be read from this input stream
  259.      *             without blocking.
  260.      * @exception  IOException  if an I/O error occurs.
  261.      * @since   JDK1.0.2
  262.      */
  263.   public synchronized int available() throws IOException {
  264.     if(in < 0)
  265.       return 0;
  266.     else if(in == out)
  267.       return buffer.length;
  268.     else if (in > out)
  269.       return in - out;
  270.     else
  271.       return in + buffer.length - out;
  272.   }
  273.     
  274.     /**
  275.      * Closes this piped input stream and releases any system resources 
  276.      * associated with the stream. 
  277.      *
  278.      * @exception  IOException  if an I/O error occurs.
  279.      */
  280.     public void close()  throws IOException {
  281.     in = -1;
  282.     closedByReader = true;
  283.     }
  284. }
  285.