/*
			  CARNEGIE MELLON UNIVERSITY
			    NON-EXCLUSIVE END-USER
			  SOFTWARE LICENSE AGREEMENT

    
				  RETSINA(tm)
    Reusable Environment for Task Structured Intelligent Network Agents(tm)

IMPORTANT: PLEASE READ THIS  SOFTWARE LICENSE AGREEMENT ("AGREEMENT") CAREFULLY.

          LICENSE

The CMU Software, together with  any fonts accompanying this Agreement, whether
on  disk,  in read  only  memory  or  any other  media  or  in any  other  form
(collectively,  the  "CMU Software")  is  never  sold.   It is  non-exclusively
licensed  by Carnegie  Mellon University  ("CMU") to  you solely  for  your own
internal, non-commercial research purposes on the terms of this Agreement.  CMU
retains the ownership of the CMU  Software and any subsequent copies of the CMU
Software. The CMU Software and any copies made under this Agreement are subject
to this Agreement.

          YOU MAY:

          1. LOAD and USE the CMU Software as long as the  CMU Software is only
          used on one (1) computer by one (1) user at a time. This license does
          not allow the CMU Software to exist on more than one (1)  computer at
          a time or on a computer  network,  including  without  limitation  an
          intranet network or a local area network.

          2. USE the CMU Software solely for your own internal,  non-commercial
          research purposes. 

          3. COPY the CMU Software for back-up purposes only. You may make  one
          (1) copy of the CMU Software in  machine-readable  form  for  back-up
	  purposes.  The  back-up  copy  must  contain  all  copyright  notices
	  contained in the original CMU Software.

          4. TERMINATE this Agreement by destroying the original and all copies
          of the CMU Software in whatever form.

          YOU MAY NOT:

          1. Assign, delegate or  otherwise  transfer  the  CMU  Software,  the
	  license (including this Agreement),  or  any  rights  or  obligations
	  hereunder or thereunder, to another person or entity.  Any  purported
	  assignment, delegation or transfer in  violation  of  this  provision
	  shall be void.

          2. Loan, distribute,  rent,  lease,  give,  sublicense  or  otherwise
          transfer the CMU Software (or any copy of the CMU Software), in whole
          or in part, to any other person or entity.

          3. Copy, alter, translate, decompile, disassemble,  reverse  engineer
          or create derivative works from the CMU Software, including  but  not
          limited to, modifying the CMU Software to make  it  operate  on  non-
	  compatible hardware.

          4. Remove, alter or cause not to be displayed, any copyright  notices
          or startup messages contained in the CMU Software.

          5. Export the CMU Software or the product components in violation  of
	  any United States export laws.

Title to the CMU Software,  including the ownership of all copyrights, patents,
trademarks  and  all  other  intellectual  property rights  subsisting  in  the
foregoing, and all  adaptations to and modifications of  the foregoing shall at
all times remain with CMU. CMU  retains all rights not expressly licensed under
this Agreement. The CMU  Software, including any images, graphics, photographs,
animation, video, audio, music and text incorporated therein is owned by CMU or
its  suppliers   and  is  protected   by  United  States  copyright   laws  and
international  treaty provisions.   Except as  otherwise expressly  provided in
this  Agreement,  the copying,  reproduction,  distribution  or preparation  of
derivative works  of the CMU Software  is strictly prohibited by  such laws and
treaty provisions.   Nothing in  this Agreement constitutes  a waiver  of CMU's
rights under United States copyright law.

This Agreement and your rights are  governed by the laws of the Commonwealth of
Pennsylvania.  If  for any reason a  court of competent  jurisdiction finds any
provision  of this  Agreement, or  portion  thereof, to  be unenforceable,  the
remainder of this Agreement shall continue in full force and effect.

THIS LICENSE SHALL TERMINATE AUTOMATICALLY if you fail to comply with the terms
of this Agreement.

          DISCLAIMER OF WARRANTY ON CMU SOFTWARE

You expressly  acknowledge and agree  that your use  of the CMU Software  is at
your sole risk.

THE CMU SOFTWARE IS PROVIDED "AS IS"  AND WITHOUT WARRANTY OF ANY KIND, AND CMU
EXPRESSLY  DISCLAIMS ALL  WARRANTIES, EXPRESS  OR IMPLIED,  INCLUDING,  BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NON-INFRINGEMENT OF THIRD PARTY  RIGHTS.  THE ENTIRE RISK AS TO THE
QUALITY AND PERFORMANCE  OF THE CMU SOFTWARE IS BORNE  BY YOU.  THIS DISCLAIMER
OF WARRANTIES, REMEDIES AND LIABILITY  ARE FUNDAMENTAL ELEMENTS OF THE BASIS OF
THE AGREEMENT BETWEEN  CMU AND YOU.  CMU  WOULD NOT BE ABLE TO  PROVIDE THE CMU
SOFTWARE WITHOUT SUCH LIMITATIONS.

          LIMITATION OF LIABILITY

THE  CMU  SOFTWARE  IS  BEING  PROVIDED  TO  YOU  FREE  OF  CHARGE.   UNDER  NO
CIRCUMSTANCES, INCLUDING  NEGLIGENCE, SHALL CMU  BE LIABLE UNDER ANY  THEORY OR
FOR  ANY  DAMAGES INCLUDING,  WITHOUT  LIMITATION,  DIRECT, INDIRECT,  GENERAL,
SPECIAL, CONSEQUENTIAL, INCIDENTAL, EXEMPLARY  OR OTHER DAMAGES) ARISING OUT OF
THE USE OF OR  INABILITY TO USE THE CMU SOFTWARE OR  OTHERWISE RELATING TO THIS
AGREEMENT (INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF BUSINESS PROFITS,
BUSINESS  INTERRUPTION, LOSS  OF BUSINESS  INFORMATION OR  ANY  OTHER PECUNIARY
LOSS), EVEN  IF CMU HAS BEEN ADVISED  OF THE POSSIBILITY OF  SUCH DAMAGES. SOME
JURISDICTIONS  DO  NOT ALLOW  THE  EXCLUSION  OR  LIMITATION OF  INCIDENTAL  OR
CONSEQUENTIAL DAMAGES SO THIS LIMITATION MAY NOT APPLY TO YOU.

          ADDITIONAL PROVISIONS YOU SHOULD BE AWARE OF

This Agreement constitutes  the entire agreement between you  and CMU regarding
the CMU  Software and supersedes any prior  representations, understandings and
agreements, either  oral or  written. No amendment  to or modification  of this
Agreement will be binding unless in writing and signed by CMU.

          U.S. GOVERNMENT RESTRICTED RIGHTS

If the CMU Software or any accompanying documentation is used or acquired by or
on behalf of any unit, division or agency of the United States Government, this
provision  applies.  The  CMU Software  and any  accompanying  documentation is
provided with RESTRICTED RIGHTS.  The use, modification, reproduction, release,
display,  duplication  or disclosure  thereof  by or  on  behalf  of any  unit,
division or agency  of the Government is subject to  the restrictions set forth
in  subdivision (c)(1)  of the  Commercial Computer  Software-Restricted Rights
clause at  48 CFR  52.227-19 and the  restrictions set  forth in the  Rights in
Technical   Data-Non-Commercial   Items   clause    set   forth   in   48   CFR
252.227-7013. The contractor/manufacturer of  the CMU Software and accompanying
documentation is  Carnegie Mellon  University, 5000 Forbes  Avenue, Pittsburgh,
Pennsylvania 15213, U.S.A.
*/
/* Copyright (c) 1998, Dirk Kalp, Katia Sycara, RETSINA Project,
                       Carnegie Mellon University
   All rights reserved
*/

/*----------------------------------------------------------------------------
  HISTORY:
    Spring/Summer 1998 Dirk Kalp (kalp@cs.cmu.edu)
    Created the module.

    Dec 03 1998 Dirk Kalp (kalp@cs.cmu.edu)
    Added this HISTORY log. 

    Dec 10 1998 Dirk Kalp (kalp@cs.cmu.edu)
    Fixed bug in constructor used to get connection to a server agent.
    The targetLocation returned by ANS lookup needed to be checked for
    null.
----------------------------------------------------------------------------*/


package EDU.cmu.softagents.retsina.Communicator;

import  EDU.cmu.softagents.misc.ANS.ANSClient.*;
import  EDU.cmu.softagents.util.*;
import  java.io.*;
import  java.net.*;
import  java.util.*;


/** A Connection supports communication between the agent and a target
    entity (usually some other agent). The role of the connection
    depends on whether the agent is acting as a client requesting
    service from the target or is acting as a server providing service
    to the client target. A secondary distinction is made for Connections
    used when the agent is acting as client. The Connection can be an
    exclusive one dedicated to a single request (or specific set of
    requests) to the server, or the Connection can be a shared one in
    which any arbitrary number of requests to the server can be sent
    over it. A shared Connection has a reference count indicating the
    number of active users of the Connection. Since our agents are
    multithreaded and more than 1 top-level task can be executing
    concurrently, several Actions may be simultaneously using the same
    server and thus the same Connection.

    Connections are never handed out beyond the Communicator. But
    Connections to servers have associated descriptors which may be
    handed out. These ConnectionDescriptors are needed by the agent
    when acting as a client so he can properly route msgs being sent
    out to the correct server. The agent in this case is the active
    entity initiating a request to a server and managing the followup
    conversation with the server on behalf of the request. When the
    agent is in the server role, it is the passive entity responding
    to a request received and sending replies back over the same
    Connection that was automatically created in the Communicator in
    response to the incoming client request msg. The identity of this
    Connection is held in the InternalMsg of the request (it is just
    the [clientName, serverName, localport] triple) and is passed
    back into the Communicator in the reply msg to be sent back to the
    client. Thus ConnectionDescriptors are not needed by the agent in
    this passive role. 
*/
public class Connection extends Thread {

  /** Checked exception thrown by the Connection constructors when a
      Connection cannot be set up (for example a failed ANS lookup or
      a failure while opening the socket streams). Declared as a static
      nested class so it needs no enclosing Connection instance.
  */
  public static class ConnectionException extends Exception {
    /** @param detail  human readable description of what went wrong */
    ConnectionException(String detail) {
      super(detail);
    }
  }
  

  // When true, methods of this class print trace output to System.out.
  // Public and not final, so other classes can change it.
  public static /*final*/ boolean DEBUG = true;
  // NOTE(review): not used in the part of the file visible here;
  // presumably a sleep interval for the receive loop - confirm the unit.
  public static /*final*/ int SLEEPTICKS = 100;

  // The connectionType will be set to indicate the role the Connection
  // plays for the agent.
  private static final String CONNECTION_TO_CLIENT =
                                "connects agent to a client";
  private static final String CONNECTION_TO_SERVER =
                                "connects agent to a server";

  // I used these old designations before. They were confusing so I've
  // replaced them with the above ones.
  //private static final String SERVER_CONNECTION = "agent acting as server";
  //private static final String CLIENT_CONNECTION = "agent acting as client";


  // Build the connectionName from the triple:
  //   [clientName, serverName, localport]
  // as
  //   "clientName**serverName**localport"
  // or, for ACCEPTED_CONNECTIONs, from the vector:
  //   [clientName, serverName, localport, acceptCount]
  // as
  //   "clientName**serverName**localport**acceptCount"
  // (see the two buildConnectionName() overloads).
  private String connectionName;
  private String clientName;
  private String serverName;
  private int    localport;
  private long   acceptCount;

  private String connectionType;  // CONNECTION_TO_{CLIENT,SERVER}
  private boolean connectionOpen; // Set true by the constructors, false by
                                  // close_final().
  private int referenceCount;     // Only for CONNECTION_TO_SERVER - counts
                                  // number of different conversations active
                                  // on the connection, i.e., this agent, as a
                                  // client, may have a number of different
                                  // requests active to the same server.
  // The initial reference count on the connection should be 1. But,
  // for debugging purposes, we might like to make it 2 so that
  // CONNECTION_TO_SERVERS are never deleted. After the system has been
  // debugged, we can initialize properly with 1.
  // NOTE: Made this a public static and not final for now so that the
  //       BasicAgent might easily be able to set it to 2 for debugging.
  public static /*final*/ int INITIAL_REF_COUNT = 1;
  // Reference count is not used on a CONNECTION_TO_CLIENT.
  private static final int UNUSED_REF_COUNT = -1;

  // These are used to delay a close on a CONNECTION_TO_SERVER
  // until all msgs pending in the dispatchQueue are actually
  // sent. We can't have the socket get closed before the actual
  // msgs get sent.
  private int numMsgsInDispatchQueue = 0;
  private boolean closeWhenMsgsDrainFromDispatchQueue = false;

  private String agentName;    // Sometimes I'm a client, sometimes a server.
  private Socket agentSocket;  // The socket all traffic of this Connection
                               // goes over.

  private String targetName;   // Sometimes he's a client, sometimes a server.
  private String targetHost;
  private int    targetPort;
  private Location targetLocation;  // ANS lookup spits out this.

  // Streams wrapped around agentSocket by the constructors.
  private BufferedInputStream socketInput;
  private PrintWriter socketOutput;

  // These constants are used to set the usage field.
  private static final int UNDEFINED_CONNECTION = 0;
  private static final int SHARED_CONNECTION    = 1;
  private static final int EXCLUSIVE_CONNECTION = 2;
  private static final int ACCEPTED_CONNECTION  = 3;  // needed?

  private int usage = UNDEFINED_CONNECTION;

  // An EXCLUSIVE_CONNECTION or ACCEPTED_CONNECTION will have a
  // single descriptor associated with it.
  private ConnectionDescriptor connectionDescriptor = null;

  // A SHARED_CONNECTION will have a list of
  // descriptors associated with it.
  private Vector connectionDescriptorList = null;


  // Set this false when Connection dies or has an error.
  private boolean isAliveAndWell;

  private GenericQueue incomingQueue;  // All msgs received are put here.
  private ANSClientInterface ansComm;  // Obtained from
                                       // Communicator.getANScomm().
  //private ConnectionTable connectionTable; should not be needed anymore
  //private ConnectionTable connectionTable; should not be needed anymore


  /** Builds the Connection used when this agent acts as a server, i.e.
      a Connection from the agent to one of its clients. It is meant to
      be used only by PollIncoming, right after a connection request
      from a client has been accepted.

      The name of the client is not known yet: it is read out of the
      first msg that arrives on the socket. Until then connectionName
      and connectionDescriptor stay null and the Connection is not
      entered into the ConnectionTable.

      The Connection thread is started before the constructor returns.

      @param agentName      name of this agent; becomes the serverName
      @param agentSocket    the accepted socket to the client
      @param acceptCount    accept count supplied by the caller; later
                            becomes part of the connection name
      @param incomingQueue  queue into which received msgs are put
      @exception ConnectionException if the socket streams cannot be
                                     opened (the socket is closed first)
  */
  protected Connection(String agentName,
                       Socket agentSocket,
                       long acceptCount,
                       GenericQueue incomingQueue)
    throws ConnectionException
  {
    // Role of this Connection. No reference counting for this kind.
    this.connectionType = CONNECTION_TO_CLIENT;
    this.referenceCount = UNUSED_REF_COUNT;

    // What the caller handed us.
    this.agentName = agentName;
    this.agentSocket = agentSocket;
    this.acceptCount = acceptCount;

    // We are the server; the client introduces itself with its first msg.
    this.serverName = agentName;
    this.clientName = null;
    this.targetName = null;
    this.connectionName = null;
    this.connectionDescriptor = null;

    // Everything we can already learn from the socket itself.
    this.localport = agentSocket.getLocalPort();
    this.targetHost = agentSocket.getInetAddress().getHostName();
    this.targetPort = agentSocket.getPort();

    this.incomingQueue = incomingQueue;
    this.ansComm = Communicator.getANScomm();

    try {
      this.socketInput =
        new BufferedInputStream(agentSocket.getInputStream());
      this.socketOutput =
        new PrintWriter(agentSocket.getOutputStream(), true);
    }
    catch (IOException streamFailure) {
      // Best effort cleanup; a failure of the close itself is ignored.
      try {
        agentSocket.close();
      }
      catch (IOException ignored) {
      }
      String reason = "Connection.constructor: IOException " +
                      "while opening socket streams: " +
                      streamFailure.getMessage();
      throw new ConnectionException(reason);
    }

    this.isAliveAndWell = true;
    this.connectionOpen = true;

    this.start();
  }


  /** Use this constructor when agent is acting as a client and thus
      establishing a Connection to a server agent. This
      constructor would be called when agent initiates a service
      request asking for a ConnectionDescriptor for a Connection
      to a server. The desired Connection could be either shared or
      exclusive and is typically made by calling the static methods,
      newSharedConnectionToServer() or newExclusiveConnectionToServer(),
      respectively to open a new Connection. This constructor is usually
      called directly by only those 2 methods.

      The target is looked up in the ANS, a socket is opened to the
      location found, the Connection is entered into the Communicator's
      connectionTable (and, when shared, into the
      sharedConnectionToServerTable) and the Connection thread is started.

      @param agentName      name of this agent; becomes the clientName
      @param targetName     name of the server agent to connect to
      @param usage          SHARED_CONNECTION or EXCLUSIVE_CONNECTION
      @param incomingQueue  queue into which received msgs are put
      @exception ConnectionException if the ANS cannot be found, the
                 lookup of the target fails, the socket cannot be
                 created or its streams cannot be opened
  */
  protected Connection(String agentName,
                       String targetName,
                       int usage,
                       GenericQueue incomingQueue) throws
                                                     ConnectionException
  {
    connectionType = CONNECTION_TO_SERVER;
    referenceCount = INITIAL_REF_COUNT;

    this.agentName = agentName;

    clientName = agentName;
    serverName = targetName;
    this.targetName = targetName;

    this.usage = usage;
    // this.connectionTable = connectionTable; no longer needed
    this.incomingQueue = incomingQueue;
    this.ansComm = Communicator.getANScomm();

    // Look up the target in the ANS and create a socket.
    try {
      // Assign the field. Declaring a local of the same name here would
      // shadow it and leave the field null for the Connection's lifetime.
      this.targetLocation = ansComm.lookUp(targetName);
      if ((this.targetLocation != null) && (this.targetLocation.isValid())) {
        targetHost = this.targetLocation.getHostName();
        targetPort = this.targetLocation.getPort();
        agentSocket = new Socket(targetHost, targetPort);
        if (DEBUG) {
          System.out.println("Connection.constructor: Connecting to target " +
                             targetName + " on host " + targetHost +
                             " at port " + targetPort);
        }
      } else {
        throw new ConnectionException("Connection.constructor: ANS lookup " +
                                      "of target " + targetName + " failed.");
      }
    }
    catch (NoANSfoundException e1) {
      throw new ConnectionException("Connection.constructor: ANS not found. " +
                                    "NoANSfoundException: " + e1.getMessage());
    }
    // The connectionName is not built until the socket exists, so the
    // two messages below identify the connection by its target name.
    catch (UnknownHostException e2) {
      throw new ConnectionException("Connection.constructor: Host " +
                                    targetHost + " unknown when creating " +
                                    "socket for connection to " +
                                    targetName + ". " +
                                    "UnknownHostException: " +e2.getMessage());
    }
    catch (IOException e3) {
      throw new ConnectionException("Connection.constructor: Could not " +
                                    "create socket to host " +
                                    targetHost + " for connection to " +
                                    targetName + ". " +
                                    "IOException: " +e3.getMessage());
    }

    try {
      socketInput = new BufferedInputStream(agentSocket.getInputStream());
      socketOutput = new PrintWriter(agentSocket.getOutputStream(), true);
    }
    catch (IOException e) {
      // Best effort cleanup; a failure of the close itself is ignored.
      try {agentSocket.close();} catch (IOException e1) {}
      throw new ConnectionException("Connection.constructor: IOException " +
                                    "while opening socket streams: " +
                                    e.getMessage());
    }

    // The local port is only known once the socket exists, so the
    // connection name can be built only now.
    localport = agentSocket.getLocalPort();
    connectionName =
      buildConnectionName(clientName, serverName, Integer.toString(localport));
    Communicator.connectionTable.addConnection(this);
    if (usage == SHARED_CONNECTION) {
      Communicator.sharedConnectionToServerTable.addConnection(this);
      connectionDescriptorList = new Vector();  // Caller will add a cd.
    } else {
      connectionDescriptor = null;  // Caller will fill in.
    }

    isAliveAndWell = true;
    connectionOpen = true;

    numMsgsInDispatchQueue = 0;
    closeWhenMsgsDrainFromDispatchQueue = false;

    this.start();
  }


  /** Composes the connection name for the triple
      [clientName, serverName, localport] associated with a Connection.
      The three parts are joined with "**". The resulting name is the
      key used to enter and look up the Connection in the Communicator's
      connectionTable.

      @param clientName  name of the agent in the client role
      @param serverName  name of the agent in the server role
      @param localport   local port of the socket, as a string
      @return "clientName**serverName**localport"
  */
  protected static String buildConnectionName(String clientName,
                                              String serverName,
                                              String localport)
  {
    final String separator = "**";
    StringBuffer name = new StringBuffer();
    name.append(clientName).append(separator);
    name.append(serverName).append(separator);
    name.append(localport);
    return name.toString();
  }


  /** Composes the connection name for the vector
      [clientName, serverName, localport, acceptCount] associated with
      a Connection. The four parts are joined with "**". The resulting
      name is the key used to enter and look up the Connection in the
      Communicator's connectionTable.

      @param clientName   name of the agent in the client role
      @param serverName   name of the agent in the server role
      @param localport    local port of the socket, as a string
      @param acceptCount  accept count of the Connection, as a string
      @return "clientName**serverName**localport**acceptCount"
  */
  protected static String buildConnectionName(String clientName,
                                              String serverName,
                                              String localport,
                                              String acceptCount)
  {
    final String separator = "**";
    StringBuffer name = new StringBuffer();
    name.append(clientName).append(separator);
    name.append(serverName).append(separator);
    name.append(localport).append(separator);
    name.append(acceptCount);
    return name.toString();
  }


  /** This static method can be used to create a new Connection to a server
      that may be shared. The shared Connection will support a reference
      count to determine how many distinct requests to a server over
      the Connection are open and active.

      @param serverName  name of the server agent to connect to
      @return the new shared Connection, or null if it could not be
              created (the reason is printed to System.out)
  */
  protected static Connection newSharedConnectionToServer(String serverName)
  {
    try {
      Connection conn = new Connection(Communicator.getAgentName(),
                                       serverName,
                                       SHARED_CONNECTION,
                                       Communicator.getIncomingQueue());
      return conn;
    }
    catch (ConnectionException e) {
      // Failure is reported by returning null rather than by rethrowing.
      System.out.println("Connection.newSharedConnectionToServer: " +
                         "Could not get a shared Connection: " +
                         e.getMessage());
      return null;
    }
  }


  /** Opens a new, non-shared Connection from this agent to the named
      server. An exclusive Connection is dedicated to a single request
      (or set of requests) instead of carrying any number of arbitrary
      requests to the server.

      @param serverName  name of the server agent to connect to
      @return the new exclusive Connection, or null if it could not be
              created (the reason is printed to System.out)
  */
  protected static Connection newExclusiveConnectionToServer(String serverName)
  {
    Connection exclusive = null;
    try {
      exclusive = new Connection(Communicator.getAgentName(),
                                 serverName,
                                 EXCLUSIVE_CONNECTION,
                                 Communicator.getIncomingQueue());
    }
    catch (ConnectionException failure) {
      // Failure is reported by returning null rather than by rethrowing.
      System.out.println("Connection.newExclusiveConnectionToServer: " +
                         "Could not get an exclusive Connection: " +
                         failure.getMessage());
    }
    return exclusive;
  }


  /** Attaches a descriptor to this Connection. A shared Connection
      appends it to its list of descriptors; any other Connection keeps
      it as its single descriptor, replacing whatever was stored before.

      @param cd  the descriptor to associate with this Connection
  */
  protected void associateDescriptor(ConnectionDescriptor cd) {
    if (usage != SHARED_CONNECTION) {
      connectionDescriptor = cd;
      return;
    }
    connectionDescriptorList.addElement(cd);
  }


  /** @return the single descriptor of this Connection; may be null */
  protected ConnectionDescriptor getDescriptor()
  {
    return this.connectionDescriptor;
  }

  /** @return the descriptor list of this Connection; may be null */
  protected Vector getDescriptorList()
  {
    return this.connectionDescriptorList;
  }

  /** @return the name of this Connection; may be null */
  protected String getConnectionName()
  {
    return this.connectionName;
  }

  /** @return the name of the agent in the client role; may be null */
  protected String getClientName()
  {
    return this.clientName;
  }

  /** @return the name of the agent in the server role */
  protected String getServerName()
  {
    return this.serverName;
  }

  /** @return the local port recorded for this Connection's socket */
  protected int getLocalport()
  {
    return this.localport;
  }

  /** @return true if this Connection goes from the agent to a server */
  protected boolean isConnectionToServer()
  {
    final boolean towardServer = connectionType.equals(CONNECTION_TO_SERVER);
    return towardServer;
  }

  /** @return true if this Connection goes from the agent to a client */
  protected boolean isConnectionToClient()
  {
    final boolean towardClient = connectionType.equals(CONNECTION_TO_CLIENT);
    return towardClient;
  }

  /** @return true only if this is a Connection to a server AND its
              usage is SHARED_CONNECTION
  */
  protected boolean isSharedConnectionToServer()
  {
    return connectionType.equals(CONNECTION_TO_SERVER)
           && (usage == SHARED_CONNECTION);
  }

  /** @return the current value of the connectionOpen flag */
  protected synchronized boolean isConnectionOpen()
  {
    return this.connectionOpen;
  }

  /** @return false once the Connection has died or had an error */
  protected synchronized boolean isAliveAndWell()
  {
    return this.isAliveAndWell;
  }

  /** @param state  new value for the isAliveAndWell flag */
  protected synchronized void setIsAliveAndWell(boolean state)
  {
    this.isAliveAndWell = state;
  }


  // Tell whether inString ends with one of the entries of
  // Communicator.endOfTransmissionChars, the set of designated
  // EOT chars. Returns false when none of them matches.
  //
  private boolean endsInEOT(String inString) {
    int idx = 0;
    while (idx < Communicator.endOfTransmissionChars.length) {
      String eot = String.valueOf(Communicator.endOfTransmissionChars[idx]);
      if (inString.endsWith(eot)) {
        return true;
      }
      idx++;
    }
    return false;
  }


  // Finish the setup of a CONNECTION_TO_CLIENT. This cannot happen
  // before the first request msg has come in from the client, because
  // the client name is read out of the SENDER field of that msg.
  //
  // If the msg carries no usable sender name, nothing is set up: the
  // Connection keeps a null connectionName and is not entered into the
  // connectionTable. Otherwise the connection name is built, the
  // Connection is added to the connectionTable and it gets its
  // ACCEPTED_CONNECTION descriptor.
  //
  private void completeConnectionToClient(ExternalMsg inputMsg)
  {
    // Describes where the msg came from; only used for debug output.
    final String origin =
      "Incoming msg on my port " + agentSocket.getLocalPort() +
      " from client at " +
      agentSocket.getInetAddress().getHostName() +
      " and port " + agentSocket.getPort();

    clientName = inputMsg.getField(ExternalMsg.SENDER);

    final boolean senderMissing =
      (clientName == null) || !hasNonWhiteSpace(clientName);
    if (senderMissing) {
      if (DEBUG) {
        System.out.println("Connection.completeConnectionToClient: " +
                           "Missing sender name. " + origin + " is: " +
                           inputMsg.toString());
      }
      return;
    }

    // The name must be in place before the Connection goes into the table.
    connectionName = buildConnectionName(clientName,
                                         serverName,
                                         Integer.toString(localport),
                                         Long.toString(acceptCount));
    targetName = clientName;
    Communicator.connectionTable.addConnection(this);

    connectionDescriptor = new ConnectionDescriptor(this, ACCEPTED_CONNECTION);

    if (DEBUG) {
      System.out.println("Connection.completeConnectionToClient: " +
                         origin + " is: " + inputMsg.toString());
    }
  }


  /** Reuses an already existing CONNECTION_TO_SERVER (this agent as
      client, the target as server) for one more request to that server.
      All that happens is that the reference count, which tracks how
      many outstanding requests use the Connection, goes up by one.
      For any other kind of Connection this is a no-op.
  */
  protected synchronized void open_again() {
    if (!connectionType.equals(CONNECTION_TO_SERVER)) {
      return;
    }
    referenceCount += 1;
  }


  /** To be called whenever a msg is put on the dispatchQueue for sending
      over a Connection to a server. The msgs queued for this kind of
      Connection are counted so that the Connection is not closed before
      they have really been sent: a close() while msgs are still queued
      only sets a flag, and the close is completed later through
      signalMsgDispatched(), which DispatchOutputMsgs calls each time a
      queued msg for this Connection has been sent.
      For a Connection to a client nothing is counted.
  */
  protected synchronized void signalMsgQueuedToDispatch() {
    if (!connectionType.equals(CONNECTION_TO_SERVER)) {
      return;
    }
    numMsgsInDispatchQueue += 1;
  }


  /** To be called by DispatchOutputMsgs each time it has sent a queued
      msg on this Connection. The count of queued msgs goes down by one;
      if a close is pending on the queue being drained and the count has
      reached 0, the close is completed here.
      For a Connection to a client nothing happens.
  */
  protected synchronized void signalMsgDispatched() {
    if (!connectionType.equals(CONNECTION_TO_SERVER)) {
      return;
    }
    numMsgsInDispatchQueue -= 1;
    final boolean drained = (numMsgsInDispatchQueue == 0);
    if (closeWhenMsgsDrainFromDispatchQueue && drained) {
      close_final();
    }
  }



  
  /** Closes the Connection on behalf of one user.

      A Connection to a client is closed for good right away. A
      Connection to a server first gives up one reference and stays
      alive while the reference count is still above 0. Once no
      references are left, it is closed for good, unless msgs are still
      waiting in the dispatchQueue; in that case only a flag is set and
      signalMsgDispatched() completes the close when the last queued
      msg has been sent.
  */
  protected synchronized void close() {
    if (!connectionType.equals(CONNECTION_TO_SERVER)) {
      close_final();
      return;
    }

    referenceCount--;
    if (referenceCount > 0) {
      return;   // Somebody else is still using the Connection.
    }

    if (numMsgsInDispatchQueue > 0) {
      closeWhenMsgsDrainFromDispatchQueue = true;
    } else {
      close_final();
    }
  }


  /** Marks the Connection as broken and shuts it down as far as its
      kind allows.

      A Connection to a client is closed for good. A Connection to a
      server gets its socket closed and, if shared, is taken out of the
      shared table; it stays in the regular connectionTable until the
      owners of its descriptors call close().
  */
  protected synchronized void error_close() {
    // OPEN QUESTION: What happens to msg sends that fail for a
    //       Connection error for callers using the ReplyWithObject
    //       mechanism, those using sync sends and connected Provisions?
    //       Are we getting descriptors for those? What happens when the
    //       msgs come in? We need to find a Connection in the
    //       connectionTable. Or do we really need to have a Connection
    //       existing when they come in? How will the waiting thread (on
    //       sync sends) get awakened and the ReplyWithObjects get
    //       released?

    // Record the error on the Connection.
    isAliveAndWell = false;

    if (!connectionType.equals(CONNECTION_TO_SERVER)) {
      // CONNECTION_TO_CLIENT: just do the hard close. There are no
      // descriptors handed out for these Connections.
      //
      // NOTE: Queued msgs are not an issue for this type of Connection.
      //       Such a Connection never gets closed unless there is an
      //       error on conn.sendMsg(). This raises the issue again that
      //       we have no way of knowing when to reclaim and remove a
      //       Connection to a client. We will need a background thread
      //       that examines each Connection's socket to see if the
      //       client on the other end has destroyed his socket. Only
      //       then can we be sure that the client, who initiated the
      //       connection to us, is done with us. Implementing such a
      //       thread will take some investigation as there is no obvious
      //       way or socket method to determine if the socket on the
      //       other end is still there. We may have to resort to some
      //       PING protocol if a way cannot be found. This will all have
      //       to wait. For now, we will just let these Connections
      //       remain with us. -dirk 4/5/98
      close_final();
      return;
    }

    // CONNECTION_TO_SERVER: close the socket right here instead of
    // waiting for close_final(), so that no further IO is attempted,
    // esp. msg receives if we try to clean up ReplyWithObject uses.
    // A failure of the close itself is ignored.
    try {
      agentSocket.close();
    }
    catch (IOException ignored) {
    }

    if (usage == SHARED_CONNECTION) {
      // Take it out of the shared table so that new calls on
      // Communicator to send a msg on this Connection will not succeed.
      // Instead the Communicator call will fail and the caller will
      // close his ConnectionDescriptor and may try to get a new one to
      // the same server, establishing a new Connection.
      Communicator.sharedConnectionToServerTable.deleteConnection(this);
    }

    // Shared or exclusive, the Connection is left in the regular
    // connectionTable. (Perhaps it could even be removed from there -
    // that depends on how DispatchOutputMsgs deals with msgs already
    // queued for this Connection.) The owners of the descriptors are
    // expected to perform the close() eventually, which removes the
    // Connection from connectionTable.
    //
    // Concern: an Action may send a msg and, expecting no reply, close
    // his descriptor while the msg is still queued. The close might
    // delete the Connection from the connectionTable before
    // DispatchOutputMsgs gets to look it up, so the msg would never be
    // sent and the sender would never know. Besides referenceCount a
    // count of msgs queued for the Connection is needed, so that the
    // Connection is held in the table until both are 0.
    //
    // NOTE: The above concern was addressed by adding the methods
    //       signalMsgDispatched() and signalMsgQueuedToDispatch()
    //       along with some flags they need. -dirk 4/5/98
    // NOTE: A flush() method that waits for the queued msgs to drain
    //       from the dispatchQueue is another interesting idea to
    //       implement. The hard case is the shared Connection where
    //       multiple descriptors are associated. This might be handled
    //       by keeping a reference list in the Connection of the
    //       descriptors with msgs queued on the Connection and
    //       augmenting the dispatchQueue object to have the descriptor
    //       reference with the msg. But this will all have to wait till
    //       later also. -dirk 4/5/98
  }


  // Close a connection without question: close the socket, mark the
  // Connection as dead and not open, and delete it from every
  // ConnectionTable it is in (the regular connectionTable and, for a
  // shared Connection to a server, the sharedConnectionToServerTable).
  //
  private synchronized void close_final() {
    // Best effort; a failure while closing the socket is ignored.
    try {agentSocket.close();} catch (IOException e) {}
    isAliveAndWell = false;
    connectionOpen = false;

    Communicator.connectionTable.deleteConnection(this);
    if (isSharedConnectionToServer())
      // Same deleteConnection() call that error_close() uses on this table.
      Communicator.sharedConnectionToServerTable.deleteConnection(this);
  }


  /** Write a msg to this Connection's socket output, followed by the
      Communicator's end-of-transmission marker. Returns true when the
      write completed without error. Returns false, after doing an
      error_close() on the Connection, when the Connection was not
      alive and well or when the write failed.
  */
  protected synchronized boolean /*was void*/ sendMsg(String msg) {
    // An unhealthy Connection cannot be written to. The error_close()
    // may be redundant here but it is harmless.
    if (!isAliveAndWell()) {
      error_close();
      return false;
    }

    final String framedMsg = msg + Communicator.endOfTransmission;
    socketOutput.print(framedMsg);

    if (DEBUG) {
      System.out.println("Connection.sendMsg: Writing socket output stream for connection "
                         + connectionName + ". Msg is: " + framedMsg);
    }

    // The output stream reports failures through its error flag.
    final boolean writeFailed = socketOutput.checkError();
    if (!writeFailed) {
      return true;
    }

    System.out.println("Connection.sendMsg: ERROR: Failed writing socket output stream for connection "
                       + connectionName + ". Closing Connection. Msg was: " + framedMsg);

    // No reopen-and-resend is attempted, neither for connections to
    // clients (that would need a protocol with the client) nor for
    // connections to servers (too hard to manage now that descriptors
    // are associated with Connections). Callers of the send methods in
    // Communicator are expected to obtain a new descriptor and reopen
    // the Connection themselves when an error arises.
    error_close();
    return false;
  }


  // This method is used by the run() method when it encounters
  // an improperly formatted or invalid msg from another agent. It
  // simply turns around an error msg on the socket without further
  // processing of the received msg.
  //
  private synchronized void sendErrorMsg(String inputMsg,
                                         ExternalMsg eMsg,
                                         String errmsg)
  {
    // The error text echoes the offending input back to the sender.
    final String content =
      "(Error: " + errmsg + " Msg sent was: " + inputMsg + ")";

    if (eMsg == null) {
      // No parsed msg is available to reuse, so compose a minimal
      // error msg by hand as best we can.
      sendMsg("(ERROR :" + ExternalMsg.SENDER + " " + agentName
              + " :" + ExternalMsg.CONTENT + " (" + content + "))");
      return;
    }

    // Reuse the received msg as the reply, replacing only its content.
    eMsg.setField(ExternalMsg.CONTENT, content);
    sendMsg(eMsg.toString());
  }

      

  // Set internal fields in an InternalMsg which identify the
  // communicating agents and their role along with the Connection
  // name. These fields are carried with the msg so that they can
  // be used to recover the Connection on which the msg arrived.
  // This is especially important to Connections the agent has to
  // clients. These are passively created within the Communicator
  // (see PollIncoming) when it accepts connections from external
  // clients. Thus there are no descriptors associated with these
  // Connections and the resultant InternalMsgs received from such
  // clients are used to hold the information about the Connection.
  // The application side of the agent will use such an InternalMsg
  // containing the client request to build the reply msg from the
  // agent. The Communicator's msg send routines can then extract the
  // identity of the Connection from the same internal fields of the
  // reply msg.
  //
  private void setInternalFields(InternalMsg iMsg) {
    // The local port is stored in its decimal string form.
    final String localPortText = Integer.toString(localport);

    // The two communicating agents ...
    iMsg.setField(InternalMsg.CLIENT_AGENT, clientName);
    iMsg.setField(InternalMsg.SERVER_AGENT, serverName);

    // ... and the identity of the Connection the msg belongs to.
    iMsg.setField(InternalMsg.LOCALPORT, localPortText);
    iMsg.setField(InternalMsg.CONNECTION_NAME, connectionName);
  }



  // This routine supplies a hack into a reply msg from the agent to
  // the PortfolioAgent for our frozen Feb demo. I've made the hacks
  // for that demo conditional on the flag:
  //     NEED_HACK_FOR_FROZEN_FEB_DEMO
  // The PortfolioAgent wants to get its original msg content in
  // SORRY replies, especially when the agent dies or is killed (say
  // by the QUIT button on the agent GUI).
  //
  // Comment below is from time of the Feb demo:
  // Insert an :info field in the msg and store as its value the
  // :content field of the msg. This is so we don't lose the
  // original :content field of a client request from the KQML input
  // msg. We still use the original KQML input msg as a template
  // for building the reply msg in some cases. Well, maybe we don't
  // but I'm not positive at this point that I've fixed all the
  // places in Actions, etc where this has been done. I've tried to
  // change things to clone the original KQML input msg and pass the
  // clone around as the new template. We want to keep the original
  // KQML input msg intact so that we can refer to its objective.
  // In particular, when the agent dies, we want to be able to
  // send SORRY msgs to all clients with pending requests in the
  // agent and it's helpful to the clients if we can give them back their
  // original request msg, esp the content field so they can recover
  // more easily. 
  //
  private void insertInfoField(InternalMsg iMsg) {
    final String originalContent = iMsg.getField(InternalMsg.CONTENT);

    // A msg without a content field still gets an info field; a single
    // blank is stored in that case.
    iMsg.setField("info", (originalContent != null) ? originalContent : " ");
  }


  /** Handle the input side of a Connection. Read a msg and put it
      in the incomingQueue. We translate the input string received
      on the socket to an ExternalMsg. If the ExternalMsg is valid,
      we convert it to an InternalMsg and augment it with additional
      internal fields that identify the Connection. Then it is deposited
      to the incomingQueue. An invalid or toherwise improperly formatted
      input msg does not make it to the incomingQueue. Instead an error
      msg is immediately sent out on the socket.
  */
  public void run() {
    // When we read the socket and parse the input, we might get a piece
    // of the next input msg from the buffer. So store that here so we
    // can append to it on the next socket read.
    String saveString = null;

    int m = 0;
    try {
	    
      for (;;) {
	int bytes_available;
	//Wait until there is something to read
	while ((bytes_available = socketInput.available()) == 0) {
	  yield(); sleep(SLEEPTICKS);
	}
		  
        m++; 
	byte buf[] = new byte[bytes_available];
	int bytes_read = socketInput.read(buf,0,bytes_available);
	String readString = new String(buf,0,bytes_available);
	if (saveString == null) {
	  saveString = readString;
	} else {
	  //concat the result to the saved string
	  saveString = saveString.concat(readString);
	}
        if (DEBUG) {
	  System.out.println("Connection.run." + m + ": readString=\"" +
			     readString + "\"");
	  System.out.println("Connection.run." + m + ": saveString=\"" +
			     saveString + "\"");
          System.out.println("Connection.run." + m + ": readString len = " +
			     readString.length());
          System.out.println("Connection.run." + m + ": saveString len = " +
			     saveString.length());
        }
	    

        ExternalMsg eMsg;
        InternalMsg iMsg;
	String token;
	StringTokenizer tokenizer = new StringTokenizer(saveString,
						        Communicator.endOfTransmission);
	int numTokens = tokenizer.countTokens();

	// Put all but last token in msg queue.
	for (int i = 0; i < (numTokens - 1); i++) {
	  token = tokenizer.nextToken();
          if (DEBUG)
            System.out.println("Connection.run." + m + ": token=\"" +
		 	       token + "\"");
	  eMsg =
	    Communicator.externalMsgInstance.createExternalMsg(token);
 	  if ((eMsg == null) || (!eMsg.isValidMsg())) {
            if (DEBUG)
              System.out.println("Connection.run." + m + ": Invalid input " +
				 "msg: " + token);
	    sendErrorMsg(token, eMsg, "Msg sent was invalid.");
	    continue;  // skip to next token.
	  }
	  // Complete connection setup for CONNECTION_TO_CLIENT on 1st msg.
          if (connectionType.equals(CONNECTION_TO_CLIENT) &&
	      (targetName == null)) {
	    completeConnectionToClient(eMsg);
	  }
	  iMsg = eMsg.convertToInternal();
	  setInternalFields(iMsg);
	  if (Communicator.NEED_HACK_FOR_FROZEN_FEB_DEMO)
	    insertInfoField(iMsg);
	  incomingQueue.addObject(iMsg);
	}

	// Get last token so we can handle it special.
	if (numTokens == 0) {
  	  saveString = null;
	  continue;
	} else {
	  token = tokenizer.nextToken();
	}

	// If last token was EOT terminated, it's a complete msg.
	// If not, we save the token and go wait for rest of it
	// to show up on the input stream.
	if (endsInEOT(saveString)) {
          if (DEBUG)
            System.out.println("Connection.run." + m + ": token=\"" +
			       token + "\"");
	  eMsg =
	    Communicator.externalMsgInstance.createExternalMsg(token);
 	  if ((eMsg == null) || (!eMsg.isValidMsg())) {
            if (DEBUG)
              System.out.println("Connection.run." + m + ": Invalid input " +
				 "msg: " + token);
	    sendErrorMsg(token, eMsg, "Msg sent was invalid.");
	    continue;  // ignore this token.
	  }
	  // Complete connection setup for CONNECTION_TO_CLIENT on 1st msg.
          if (connectionType.equals(CONNECTION_TO_CLIENT) &&
	      (targetName == null)) {
	    completeConnectionToClient(eMsg);
	  }
          if (DEBUG)
            System.out.println("Connection.run." + m + ": completed Conn.");
	  iMsg = eMsg.convertToInternal();
	  setInternalFields(iMsg);
	  if (Communicator.NEED_HACK_FOR_FROZEN_FEB_DEMO)
	    insertInfoField(iMsg);
          if (DEBUG)
            System.out.println("Connection.run." + m + ": add to incoming queue.");
	  incomingQueue.addObject(iMsg);
	  saveString = null;
	} else {
 	  saveString = token;  // I'm just a partial msg.
          if (DEBUG)
            System.out.println("Connection.run." + m +
			       ": final saveString=\"" +
			       saveString + "\"");
        }

      } // end of for


    } // end of try
    catch (IOException e) {
      if (DEBUG) {			// 02Oct98-garof: added if (DEBUG)
	System.out.println("Connection.run: IOException: " + e.getMessage());
        System.err.println("Connection.run: IOException: " + e.getMessage());
      }
      error_close();
    } 
    catch (InterruptedException e) {
      System.out.println("Connection.run: InterruptedException: " + e.getMessage());
      System.err.println("Connection.run: InterruptedException: " + e.getMessage());
      error_close();
    } 
    finally {
      //error_close();
      //isAliveAndWell = false;
      //close_final();
    }
  }


  // Checks if a string has any non-whitespace characters. Returns
  // true if it does, false if it does not or if string is null.
  //
  private static boolean hasNonWhiteSpace(String str) {
    if (str == null) return false;
    char[] strChars = str.toCharArray();
    for (int i=0; i