For purposes of the discussion below, a quick reference to the public methods of the Communicator API is provided below. These are not all of the methods, just the ones that are relevant to the discussion here that attempts to explain some of the rationale behind the Communicator design and also to give some insight on how to use the API.
public ConnectionDescriptor openConnection(String agentName); public ConnectionDescriptor openExclusiveConnection(String agentName); public void closeConnection(ConnectionDescriptor cd); public ConnectionDescriptor lookupConnection(InputMsgObject inputMsgObj);public boolean sendMsg(ConnectionDescriptor cd, InternalMsg msg); public boolean queueMsg(ConnectionDescriptor cd, InternalMsg msg); public InternalMsg sendMsgAndGetReply(ConnectionDescriptor cd, InternalMsg msg);
public InternalMsg getInputMsg(ConnectionDescriptor cd); public InternalMsg waitOnGetInputMsg(ConnectionDescriptor cd); public InputMsgObject getInputMsg(); public InputMsgObject waitOnGetInputMsg(); public InputMsgObject getNextInputMsg(); public InputMsgObject waitOnGetNextInputMsg();
public void refuseNewConnections(); public void flushMsgsAndStop(); public void unRegisterWithANS(); <*/PRE> The Communicator provides an abstraction of communication between agents based on the names of agents. The form of the communication is ASCII messages transmitted on TCP/IP sockets. The mapping of an agent name to its host and port address is done by the Communicator through our AgentNameServer (ANS). The ANS provides for a distributed agent name registration and lookup facility. The Communicator will register the agent's name, host and port with the ANS when the Communicator is instantiated. To establish communication with another agent, the Communicator queries the ANS with that other agent's name in order to obtain its host and port. The Communicator then uses the host and port to establish a connection to the external agent. The endpoints of a connection are sockets, one in the agent and one in the external agent. Creation and management of the agent's connections is handled transparently by the Communicator.
The agent never deals directly with the sockets or connections. Instead, through the Communicator API, the agent can obtain a ConnectionDescriptor object that is a reference to a connection with another agent. The agent uses the "openConnection()" or "openExclusiveConnection()" method of the API to obtain a ConnectionDescriptor with the only argument required being the name of the agent to which a connection is desired. Calling either of these methods causes the Communicator to do a lookup to the ANS for the host and port of the external agent. The Communicator then uses this to create a socket and connect it to the socket at the advertised host and port of the external agent.
It is possible for the agent to have multiple connections to the same external agent. When this is desired, the agent should use "openExclusiveConnection()" to get a descriptor to a new connection. There might be any number of reasons why an agent might want to have multiple connections to the same external agent. Perhaps the agent wants to communicate several service requests to some external server agent and manage the dialogue for each request over a separate connection, perhaps assigning a separate thread to send and receive the message dialogue related to each separate service request. In some situations the external server agent may impose the requirement that an agent make each service request by opening a new connection to the server.
When an exclusive connection is not needed by the agent, it can use "openConnection()". This still assigns a new descriptor but it may point to an existing connection to the particular external agent, a connection that was created previously by the agent's prior call to "openConnection()" to get a connection to that external agent. The Communicator will keep a reference count on such connections that have multiple descriptors in order to assure that the connection is not destroyed until each of the descriptors is disposed of.
The agent disposes of descriptors using the "closeConnection()" method, giving it a ConnectionDesriptor as its argument. The concept of a shared connection is useful when the agent is multi-threaded and does not wish to incur the overhead of connection setup each time one of the agent's threads sends a message to the same external agent. Of course, the agent could keep track itself of which of its descriptors refer to which external agents, providing a data structure shared among its threads holding this information. So the "openConnection()" method may be viewed as more of a convenience than a necessity.
With a descriptor in hand, the agent sends a message to the external agent by supplying a descriptor and a message object as args to the "sendMsg()" or "queueMsg()" method of the API. The latter method causes the message to be deposited in an internal message queue of the Communicator where a dedicated Communicator thread handles sending each message over the associated connection. The call to "sendMsg()", however, results in the message being sent directly on the connection with the calling thread absorbing all the overhead down to the actual socket IO. The use of "sendMsg()" may be preferred when the message to be sent has an urgent priority and/or when the agent needs to know immediately if there is some error in the IO. With "queueMsg()", the agent does not learn of IO errors until the next attempt to send a message on the connection. The only error returned by "queueMsg()" would be related to some problem in the message object arg or an invalid descriptor arg. However, in many agent environments, "queueMsg()" is adequate in view of the expectations of uncertainty in an open world model which agents are often expected to anticipate using timeouts and retry/recovery strategies.
The "sendMsg()" and "queueMsg()" methods provide an asynchronous interface for the message sending half of agent communication. The receiving side, getting messages from external agents, of asynchronous IO has a set of methods that will be discussed shortly. But the API also provides a method that performs synchronous IO. The "sendMsgAndGetReply()" method does, as its name suggests, send a message to the external agent and then block the calling thread waiting for the reply message from the external agent. The method's args are also the descriptor of the connection to the external agent and the message object to be sent. Synchronous IO is usually only appropriate when the calling thread can do nothing further that is useful until the external agent replies to the sent message. Synchronous IO may also be acceptable if the external agent is both reliable and prompt with its replies. One useful addition to our API might be to overload the synchronous method to include one that takes a timeout arg for waiting in the Communicator for the reply message.
The most typical use of synchronous IO is in an agent that is performing in the client role, sending a message to an external agent that is a server or is acting in a server role for that particular communication. In a pure server agent, there may be little or no use for synchronous IO as the communication model for servers is to wait for service request messages from clients that arrive in the server asynchronously. This raises the question of how the connection gets created in the server agent. The server agent does not know a priori what clients may want to talk to it and thus cannot just call "openConnection()" to prearrange a connection for every possible client.
Instead, there is a special "listen" operation, provided in any socket-based implementation for communication, that the Communicator calls to listen for external agents (the client in this case) trying to connect to the agent. The listen operation is assigned to a dedicated thread in the Communicator to perform since it blocks listening until a connect request arrives. The result of such a client connect request arriving is that a new socket is created in the server agent to provide the socket endpoint in the server for the connection to the client. The other endpoint is just the socket created in the client and used in opening the connection to the server.
The client and server roles are just duals of each other. A peer to peer communication model is one in which the agent performs both of these roles and in accordance with the agents needs. An agent will usually be willing to provide some service and take on the server role in listening for and accepting new connections initiated by the external clients. An agent will also usually need to make requests itself for the services of other agents, taking on the client role of initiating a connection to a server or other external agent.
In the client role, initiating a connection is done by calling one of the two Communicator methods for opening a connection and results in a descriptor being returned that the agent uses when sending its request message. In the server role, the agent needs to discover when an external agent operating in the client role initiates a connection. The Communicator handles this connection creation passively in response to the external agent's connection initiation. Thus, in the server role, the agent does not learn about the connection until it calls the Communicator API to get an input message. As we will see, there are methods in the API specifically intended to deliver messages arriving on such externally initiated connections. These methods return an object that holds both the message and a descriptor for the connection passively created by the Communicator in response to the external agent's connection initiation.
The distinction between connections that the agent initiates (i.e., the client role) versus connections initiated by external agents and accepted by the agent (i.e., the server role) is one in which the Communicator adopts a notion of connection ownership in which the "owner" of the connection is the agent that initiated it. The Communicator views its willingness to accept connections initiated by external agents as having a corresponding commitment to maintain that connection until the external agent breaks the connection by closing the socket on its endpoint of the connection. Thus the agent's call of "closeConnection()" on a connection "owned" by an external agent is ignored and treated as a no-op by the Communicator. In summary, an agent's owned connections are those explicitly created by the agent in the client role. The agent's unowned connections are those that are passively created by the agent in the server role.
Keeping the notion of connection ownership in mind helps to distinguish the three distinct ways in which the agent obtains messages that arrive in the Communicator on these two sets of connections. The Communicator receives messages sent by external agents that arrive on any of its connections. It then deposits the messages into a central queue. Each msg in the queue is then tagged with the descriptor of its associated connection. The agent can choose how to extract messages from the queue according to the role, client or server, that the agent wishes to perform at any given time. It is more likely that the agent may be multi-threaded, perhaps with one group of threads handling the owned connections, the client role, and with another set of threads handling the unowned connections, the server role.
In the client role, the agent knows all the connections it has created and explicitly provides the descriptor it has for a connection as an arg to the "getInputMsg()" method of the API. This form of "getInputMsg()" returns a message if there was one available on the given connection and null otherwise. The second form of "getInputMsg()" takes no arg and is useful in the server role to obtain the next input message that arrived on an unowned connection. This corresponds to the behavior that a server would perform in getting the next client request. This overloaded form of "getInputMsg()" returns an InputMsgObject from which both the message and the descriptor can be obtained. The third mode of input is to simply get the next msg in the queue regardless of what type of connection it is. This might correspond to a more integrated form of the peer to peer role in which the dichotomy between client and server roles is obscured from the peer to peer abstraction. The "getNextInputMsg()" method of the API takes no args and also delivers an InputMsgObject or null when no message is present in the queue. The descriptor is obtained by calling the "lookupConnection()" method of the API with the InputMsgObject as its arg. The message itself can be extracted by calling the "getMsg()" method in the InputMsgObject.
In all three of the above methods for getting an input message, the method call returns immediately either with the message (or InputMsgObject) or else null. Thus it may not be wise to place the method call in a tight loop unless a sleep or yield is placed inside the loop to allow other threads in the agent to have the opportunity to get the cpu and run. A better solution might be to have the thread wait on the arrival of a message rather than loop. Thus the API provides a dual set of methods to the above set of three input message methods where the names of the methods in this second set are distinguished by having "waitOn" prepended to the names of the methods in the first set. The args are the same as in the first set of methods. The second set, however, since they block waiting for a message, never return null.
When the agent decides to terminate itself, it should call the method "refuseNewConnections()" to prevent any new client connections from being accepted by the agent. It then should call "flushMsgsAndStop()" in the API to flush any output messages queued and stop all Communicator threads. Following this, "unRegisterWithANS()" should be called in the Communicator to remove the registration of the agent with the ANS.
As you may have noticed, all message arguments in the Communicator are specified as type InternalMsg. InternalMsg is a Java interface that defines an abstraction for the message objects to be passed between the agent and its Communicator through the API. By specifying an interface here, the Communicator can be incorporated and used in any agent regardless of the agent message language (KQML or FIPA for example) it may use. All that is required is that the agent be able to define a message object class that implements the InternalMsg interface and interacts with the Communicator on the basis of these objects. We have defined such a message object class based on KQML, InternalKQMLmessage, for our own agents. When the Communicator constructor is called it must be supplied with an instance of the agent's message object class. A similar arrangement is provided on the network side of the Communicator. The ExternalMsg interface provides the abstraction with the agent providing an implementation of ExternalMSg in the desired message language format. We have provided a KQML-based implementation, ExternalKQMLmessage, for our agent community. With these abstractions, we could install one of our agents in a FIPA community by supplying a FIPA-based implementation of ExternalMsg, eg., ExternalFIPAmessage. See the AgentMsg, InternalMsg and ExternalMsg interface modules in the Communicator along with our KQML-based implementations of InternalMsg and External Msg in java.EDU.cmu.softagents.misc.KQMLParser.InternalKQMLmessage and java.EDU.cmu.softagents.misc.KQMLParser.ExternalKQMLmessage for more details. */ public class Communicator implements CommunicationInterface { // To get debugging msgs. public static /*final*/ boolean DEBUG = true; // Do we need the hack or not? Make it a public so we can set it // from the agent init code. public static /*final*/ boolean NEED_HACK_FOR_FROZEN_FEB_DEMO = true; // This file tells the agent which Agent Name Servers it may // communicate with. The format of the file is: //
public static String DEFAULT_ANS_LIST_FILE = "ANSlistFile"; // Communication to the ANS for agent name lookups // happens through this object. protected static ANSClientInterface ansComm; // String consisting of end of transmission characters. public static String endOfTransmission = "\u0004"; // Character array representation of end of transmission characters. public static char[] endOfTransmissionChars = endOfTransmission.toCharArray(); // Once instantiated, this will give us access to the // translateToInternalMsg() method of the specific class // that knows how to convert a user application msg into // an InternalMsg. // protected static InternalMsg internalMsgInstance = null; public static void setInternalMsgInstance(InternalMsg internalMsgInstance) { Communicator.internalMsgInstance = internalMsgInstance; } public static InternalMsg getInternalMsgInstance() { return internalMsgInstance; } // Once instantiated, this will give us access to the // translateToExternalMsg() method of the specific class // that knows how to convert an incoming msg from another // agent into an ExternalMsg. // protected static ExternalMsg externalMsgInstance = null; public static void setExternalMsgInstance(ExternalMsg externalMsgInstance) { Communicator.externalMsgInstance = externalMsgInstance; } public static ExternalMsg getExternalMsgInstance() { return externalMsgInstance; } // All open Connections are stored in this table. All communication // goes through a Connection. The lookup name will be composed from // the triple associated with the // Connection. // NOTE: Now that we've modified the design to include descriptors // for accepted Connections initiated by external agents, the // connection-name will be formed for these type of Conections // from the vector . protected static ConnectionTable connectionTable = null; // A Connection to a server that is shared for more than 1 request // will also be stored in this table so that, when a shared Connection // is desired, it can be easily looked up. The lookup name will be // the serverName associated with the Connection. protected static ConnectionTable sharedConnectionToServerTable = null; // Hashtable needed by the ReplyWithObjects. protected Hashtable replyWithHash; // A queue to hold msgs that come in from other agents. These msgs // are translated to ExternalMsgs when they come in, are validated, // transformed to InternalMsgs, and then put in incomingQueue. protected static GenericQueue incomingQueue; // A queue to hold all msgs from the incomingQueue remaining after // the special msgs (those routed by ReplyWithObjects) are extracted. // In a Retsina agent, the msgs deposited to the inputMsgQueue should all // be client requests, i.e., objectives. protected InputMsgQueue inputMsgQueue; // This is the central queue where msgs to be sent to other agents can // be deposited, i.e., the caller does not block waiting on the actual // IO that does the msg send. protected GenericQueue dispatchQueue; // Info about this agent. protected static String agentName = null; protected static String agentHost = null; protected static int agentPort = 0; protected String agentInfo = null; protected ServerSocket listen_socket = null; // The acceptConnections thread will loop accepting new Connections // from external agents. The created Connections will receive // msgs from the external agents and deposit them to the // incomingQueue. protected AcceptConnections acceptConnections; // The routeInputMsgs thread will take the msgs from the incomingQueue // and recognize msgs with special replyWith IDs, routing them via // ReplyWithObjects. Msgs not specially routed are deposited to the // inputMsgQueue. protected RouteInputMsgs routeInputMsgs; // The dispatchOutputMsgs thread will take msgs deposited to the // dispatchQueue and send them out on the correct Connection to // the associated external agent. protected DispatchOutputMsgs dispatchOutputMsgs; // The Communicator will log msgs sent to or received from other // agents. This logging may be done by an arbitrary number of msg // logging facilities. Logging facilities that also have an // associated external logging agent, are kept in a separate hash // so that the Communicator can avoid an infinite cycle of logging // the logging msgs. protected MsgLogInterface logFacilityList[]; protected Hashtable logFacilityHash; /** Use this constructor when there are no msg logging facilities, such as a demodisplay, being used, and there is an ANSlistFile. */ public Communicator(String agentName, String agentHost, int agentPort, InternalMsg internalMsgInstance, ExternalMsg externalMsgInstance, String agentInfo, boolean isRetsinaAgent) throws CommunicatorException { this(agentName, agentHost, agentPort, internalMsgInstance, externalMsgInstance, agentInfo, isRetsinaAgent, null, null, null); } /** Use this constructor when there are no msg logging facilities, such as a demodisplay, being used, and ANS information is passed in as arguments. */ public Communicator(String agentName, String agentHost, int agentPort, InternalMsg internalMsgInstance, ExternalMsg externalMsgInstance, String agentInfo, boolean isRetsinaAgent, String ANShost, Integer ANSport) throws CommunicatorException { this(agentName, agentHost, agentPort, internalMsgInstance, externalMsgInstance, agentInfo, isRetsinaAgent, null, ANShost, ANSport); } /** Use this constructor when there are msg logging facilities, such as a demodisplay, being used, and there is an ANSlistFile. */ public Communicator(String agentName, String agentHost, int agentPort, InternalMsg internalMsgInstance, ExternalMsg externalMsgInstance, String agentInfo, boolean isRetsinaAgent, MsgLogInterface logFacilityList[]) throws CommunicatorException { this(agentName, agentHost, agentPort, internalMsgInstance, externalMsgInstance, agentInfo, isRetsinaAgent, logFacilityList, null, null); } /** Use this constructor when there are msg logging facilities, such as a demodisplay, being used, and ANS information being passed in as arguments. */ public Communicator(String agentName, String agentHost, int agentPort, InternalMsg internalMsgInstance, ExternalMsg externalMsgInstance, String agentInfo, boolean isRetsinaAgent, MsgLogInterface logFacilityList[], String ANShost, Integer ANSport) throws CommunicatorException { this.agentName = agentName; this.agentHost = agentHost; this.agentPort = agentPort; this.internalMsgInstance = internalMsgInstance; this.externalMsgInstance = externalMsgInstance; this.agentInfo = agentInfo; this.logFacilityList = logFacilityList; try { listen_socket = new ServerSocket(agentPort); } catch (IOException e) { System.out.println("Communicator.constructor: IOException while " + "creating ServerSocket: " + e.getMessage()); throw new CommunicatorException("Communicator.constructor: IOException" + " while creating ServerSocket: " + e.getMessage()); } if (agentPort == 0) { agentPort = listen_socket.getLocalPort(); this.agentPort = agentPort; if (DEBUG) // 02Oct98-garof: added if (DEBUG) System.out.println("Communicator.constructor: Anonymous port " + "selected for connection requests is port " + agentPort); } if( (ANShost == null) || (ANSport == null)){ registerWithANS(); } else { registerWithANS(ANShost, ANSport.intValue()); } // Create connectionTable where all Connections will be stored. // Lookup name in the table will be the connectionName. if (connectionTable == null) connectionTable = new ConnectionTable(agentName, false); // Create sharedConnectionToServerTable which will hold any // Connection to a server agent that is shared. This is a subset // of the Connections in connectionTable. Lookup name in the table //will be the serverName. if (sharedConnectionToServerTable == null) sharedConnectionToServerTable = new ConnectionTable(agentName, true); // Hashtable for ReplyWithObjects that do special routing on // incoming msgs that match their associated reply-with ID. replyWithHash = new Hashtable(); // Hashtable for logging agents associated to their log facilities. logFacilityHash = new Hashtable(); // Create all the queues. incomingQueue = new GenericQueue(); inputMsgQueue = new InputMsgQueue(); dispatchQueue = new GenericQueue(); acceptConnections = new AcceptConnections(incomingQueue, agentName, agentPort, listen_socket); if (!isRetsinaAgent) routeInputMsgs = new RouteInputMsgs(incomingQueue, inputMsgQueue, replyWithHash, true, logFacilityList, logFacilityHash); dispatchOutputMsgs = new DispatchOutputMsgs(agentName, dispatchQueue, logFacilityList, logFacilityHash); // Start the logging facilities only once the Communicator // is completely set up. So if we are bringing up a // RetsinaCommunicator, we'll call startMsgLogFacilities() // at the end of its constructor. if (!isRetsinaAgent) startMsgLogFacilities(logFacilityList); } // end of constructor /** This method starts up the msg logging facility(s) and builds a hash of those that have associated logging agents. */ protected void startMsgLogFacilities(MsgLogInterface logFacilityList[]) throws CommunicatorException { if (logFacilityList == null) return; for (int i = 0; i < logFacilityList.length; i++) { if (logFacilityList[i] == null) { System.out.println("Communicator.startMsgLogFacilities: " + "The list of msg logging facilities had a " + "null entry at list element " + i); throw new CommunicatorException("Communicator.startMsgLogFacilities: " + "The list of msg logging facilities had " + "a null entry at list element " + i); } // Initialize the logging facility. boolean success = logFacilityList[i].logInit(agentName, agentInfo, this); if (!success) { System.out.println("Communicator.startMsgLogFacilities: " + "Failed to init the logging facility, " + logFacilityList[i].getLogFacilityName()); throw new CommunicatorException("Communicator.startMsgLogFacilities: " + "Failed to init the logging facility, " + logFacilityList[i].getLogFacilityName()); } // If the logging facility has an associated logging agent, put // the logging agent's name in a hash for easy lookup so that the // Communicator does not try to log msgs to/from a logging agent. String loggingAgentName = logFacilityList[i].getLoggingAgentName(); if (loggingAgentName != null) logFacilityHash.put(loggingAgentName, logFacilityList[i]); // Log the agent's creation. logFacilityList[i].logAgentBirth(); } // endfor } /** This method provides access to the msg logging facility(s) for other components in the agent or application using the Communicator to log transitions in their states. */ public void logAgentState(String component, String state) // throws CommunicatorException { if (logFacilityList == null) return; for (int i = 0; i < logFacilityList.length; i++) { logFacilityList[i].logAgentState(component, state); } // endfor } // Modified 14 sept 1999 by Matt Easterday /** This method is kept hidden in the Communicator which, for now, will assume the agent always will be registering wth the ANS. */ protected void registerWithANS() throws CommunicatorException { // Register with the ANS, but first get // an object which implements CommunicationInterface try { ansComm = new DirectCommunicationWithANS(DEFAULT_ANS_LIST_FILE); } catch (NoANSfoundException e) { throw new CommunicatorException("Communicator.registerWithANS: " + "ANS not found: " + e.getMessage()); } registerWithANSHelper(ansComm); } // Modified 14 sept 1999 by Matt Easterday /** This method is kept hidden in the Communicator which, for now, will assume the agent always will be registering wth the ANS. Like registerWithANS() but ANS information is specified explicitly rather than read in from an ANSlistFile */ protected void registerWithANS(String ANShost, int ANSport) throws CommunicatorException { // Register with the ANS, but first get // an object which implements CommunicationInterface try{ System.out.println("Communicator.registerWithANS: " + agentName + "creating ANSlistFile " + "information internally."); ansComm = new DirectCommunicationWithANS(ANShost, ANSport); } catch (NoANSfoundException e) { throw new CommunicatorException("Communicator.registerWithANS: " + "ANS not found: " + e.getMessage()); } registerWithANSHelper(ansComm); } // Modified 14 sept 1999 by Matt Easterday /** Helper function for registerWithANS() and registerWithANS(StringANShost, int ANSport). */ protected void registerWithANSHelper(ANSClientInterface ansComm) throws CommunicatorException { try { Status return_val = ansComm.register(agentName, agentHost, agentPort); if (!return_val.get().equals("success")) { if (return_val.get().equals("sorry")) { Status temp_return_val = ansComm.unRegister(agentName); temp_return_val = ansComm.register(agentName, agentHost, agentPort); if (!temp_return_val.get().equals("success")) { throw new CommunicatorException("Communicator.registerWithANS: " + "Tried twice and failed: "); } } else { throw new CommunicatorException("Communicator.registerWithANS: " + "Cannot register with ANS: "); } } } catch (NoANSfoundException e) { throw new CommunicatorException("Communicator.registerWithANS: " + "ANS not found: " + e.getMessage()); } } /** This method is made available to allow for agent death to happen gracefully by unregistering from the ANS. */ public void unRegisterWithANS() { try { Status return_val = ansComm.unRegister(agentName); if (!return_val.get().equals("success")) { System.out.println("Communicator.unregisterWithANS: " + "Cannot unregister " + agentName + " from the ANS"); } } catch (NoANSfoundException e) { System.out.println("Communicator.unRegisterWithANS: ANS not found."); } } //------------------------------------------------------------------------- // Deleted the code for advertising. Will do in BasicAgent. //------------------------------------------------------------------------- //------------------------------------------------------------------------- // END deleted code for advertising. Will do in BasicAgent. //------------------------------------------------------------------------- public static String getAgentName() {return agentName;} public static String getAgentHost() {return agentHost;} // ??? Why is this now public instead of protected and should the // above 2 methods be public? -dirk 7/17/98 public static int getAgentPort() {return agentPort;} protected static GenericQueue getIncomingQueue() {return incomingQueue;} protected static ANSClientInterface getANScomm() {return ansComm;} //----------------------------------------------------------------------- // Public methods that implement the CommunicationInterface follow. //----------------------------------------------------------------------- //------------------------------------------------------------- // These next methods are used to deal with agent death by // preventing new Connections to the agent and flushing the // output queue of any msgs waiting to go out. //------------------------------------------------------------- /** This method is used when an agent terminates to prevent any new client connections from being accepted while the agent performs termination activities. For Retsina agents, this may involve such things as sending SORRY msgs to clients with outstanding top-level tasks and objectives. */ /* NOTE: This mechanism should be made more precise to make sure msgs in the input pipeline reach internal destinations, specifically the ObjectiveDB in a Retsina agent, so that all clients are assured of getting Sorry msgs. Do this after the demo. -dirk 2/20/98 */ public synchronized void refuseNewConnections() { if (acceptConnections == null) return; acceptConnections.stop(); acceptConnections.closeServerSocket(); } /** This method is used when an agent terminates to flush any output msgs it has queued. For Retsina agents, some of these msgs will be SORRY msgs sent to clients with outstanding top-level tasks and objectives. Once the msgs are all sent, the Communicator threads are stopped. */ /* NOTE: Should we make "unRegisterWithANS()" a protected method and insert a call to it here or in "refuseNewConnections()"? -dirk 7/17/98 */ public synchronized void flushMsgsAndStop() { // Wait for msgs to get sent. try { while (!dispatchQueue.isEmpty()) { Thread.yield(); Thread.sleep(1000); // Sleep for 1000 msec } } catch (InterruptedException e) {} if (logFacilityList != null) { // Log the agent's death with each logging facility. for (int i = 0; i < logFacilityList.length; i++) { logFacilityList[i].logAgentDeath(); } } // Wait again for any msgs to logging agents to get sent. try { while (!dispatchQueue.isEmpty()) { Thread.yield(); Thread.sleep(1000); // Sleep for 1000 msec } } catch (InterruptedException e) {} dispatchOutputMsgs.stop(); routeInputMsgs.stop(); } //------------------------------------------------------------- // These next methods handle opening, closing, and looking // up Connections. //------------------------------------------------------------- // Does this need to be synchronized? /** Open a shared connection to the given agent. If a shared connection is already opened, a connection does not need to be created but a new descriptor for it is returned. (The new descriptor bumps the reference count on the connection.) Returns null if the connection cannot be made. */ public ConnectionDescriptor openConnection(String agentName) { return ConnectionDescriptor.openConnectionToServer(agentName); } // Does this need to be synchronized? /** Open an exclusive connection to the given agent. A new connection to the agent is created and a descriptor returned for it. Returns null if the connection cannot be made. */ public ConnectionDescriptor openExclusiveConnection(String agentName) { return ConnectionDescriptor.openExclusiveConnectionToServer(agentName); } // Does this need to be synchronized? /** Close the connection associated with the descriptor. (For a shared connection, the reference count is decremented.) The descriptor is marked as invalid so that it cannot be used again. For a connection that was created by the Communicator in response to a connection request from an external agent (via Communicator's ServerSocket), close will have no effect. */ public void closeConnection(ConnectionDescriptor cd) { cd.close(); } // Does this need to be synchronized? /** Find the connection on which the msg arrived and get the associated descriptor for the connection. If the connection is a shared connection, create a new descriptor for it. Return the descriptor on success. If the connection has since been closed, return null. NOTE: Caller should use the matchMyDescriptor() method of the InputMsgObject to first determine if he already has a descriptor for the connection on which the msg arrived. */ public ConnectionDescriptor lookupConnection(InputMsgObject inputMsgObj) { Connection conn = inputMsgObj.getConnection(); if (conn.isSharedConnectionToServer()) { return openConnection(conn.getServerName()); } else { return conn.getDescriptor(); } } //------------------------------------------------------------- // These next methods all deal with getting input msgs that // have arrived. Each method has a dual listed below it that // does the same thing but which blocks if necessary. Note that // there will be competition among any threads waiting for a msg // on the same descriptor, among any threads waiting for a msg // on set of descriptors generated via a ServerSocket, and among // any threads waiting just for the next msg of any type. There // will also be a cross competition between the group of threads // waiting for the next input msg of any type and the other 2 // thread groups. Thus synchronization in the Communicator for // getting input msgs will be important. And callers of these // methods will have to keep in mind the competition between // their threads when they mix and match the use of competing // method calls. // // NOTE: If multiple threads in the application are reading the // input queue, the use of show and get methods should be // made through an application object that synchronizes // access. Otherwise the msg one thread sees from a show // may be taken by another thread using get before the // first thread has a chance to get it. // // NOTE: For methods that return an InputMsgObject, the msg can be // obtained through a method call in the InputMsgObject. The // InputMsgObject can also be used to see if the caller // has a ConnectionDescriptor for the Connection on which the msg // arrived. A descriptor would be needed for the caller to send a // reply msg. If the caller does not have a descriptor, one can be // obtained through the Communicator's lookupConnection() method. //------------------------------------------------------------- /** Get the next input msg that has arrived on the given connection. If none are currently available, return null. */ public InternalMsg getInputMsg(ConnectionDescriptor cd) { if (cd == null) return null; InputMsgObject imo = inputMsgQueue.getInputMsg(cd.getConnection()); if (imo != null) return imo.getMsg(); else return null; } /** Get the next input msg that has arrived on the given connection. If none are currently available, block waiting for one. */ public InternalMsg waitOnGetInputMsg(ConnectionDescriptor cd) { if (cd == null) return null; InputMsgObject imo = inputMsgQueue.waitOnGetInputMsg(cd.getConnection()); return imo.getMsg(); } /** Show the next input msg that has arrived on the given connection. If none are currently available, return null. The msg is not removed from the input queue. If multiple threads in the application are reading the input queue, the use of show and get should be made through an application object that synchronizes access. Otherwise the msg one thread sees from a show may be taken by another thread using get before the first thread has a chance to get it. */ public InternalMsg showInputMsg(ConnectionDescriptor cd) { if (cd == null) return null; InputMsgObject imo = inputMsgQueue.showInputMsg(cd.getConnection()); if (imo != null) return imo.getMsg(); else return null; } /** Show the next input msg that has arrived on the given connection. If none are currently available, block waiting for one. */ public InternalMsg waitOnShowInputMsg(ConnectionDescriptor cd) { if (cd == null) return null; InputMsgObject imo = inputMsgQueue.waitOnShowInputMsg(cd.getConnection()); return imo.getMsg(); } /** Get the next input msg that has arrived on a connection that was created by the Communicator (via its ServerSocket) responding to an external connection request. If no msg is currently available, return null. Returns an InputMsgObject from which the msg can be obtained. The InputMsgObject can also be used to see if the caller has a ConnectionDescriptor for the Connection on which the msg arrived. A descriptor would be needed for the caller to send a reply msg. If the caller does not have a descriptor, one can be obtained through the Communicator's lookupConnection() method. NOTE: An InputMsgObject will have methods that allow one to obtain the msg and determine if the caller has a descriptor available for the connection on which the msg arrived. The descriptor then allows the caller to send any reply msg back via that connection. Thus a special sendReplyMsg() interface is not needed for msgs that arrive on these type of connections. The only distinction in these connections is that, having been initiated by an external agent, they cannot be destroyed by calling closeConnection(). */ public InputMsgObject getInputMsg() { return inputMsgQueue.getInputMsg(); } /** Get the next input msg that has arrived on a connection that was created by the Communicator (via its ServerSocket) responding to an external connection request. If no msg is currently available, block until one arrives. */ public InputMsgObject waitOnGetInputMsg() { return inputMsgQueue.waitOnGetInputMsg(); } /** Show the next input msg that has arrived on a connection that was created by the Communicator (via its ServerSocket) responding to an external connection request. If no msg is currently available, return null. */ public InputMsgObject showInputMsg() { return inputMsgQueue.showInputMsg(); } /** Show the next input msg that has arrived on a connection that was created by the Communicator (via its ServerSocket) responding to an external connection request. If no msg is currently available, block until one arrives. */ public InputMsgObject waitOnShowInputMsg() { return inputMsgQueue.waitOnShowInputMsg(); } /** This method will remove the next msg from the input queue. These msgs are all the ones received from external agents that have not been specially routed by the Communicator according to distinguished reply-with IDs. In an Retsina agent, the msgs in the input queue should all correspond to client requests containing objectives for the agent to service. */ /** Get the next input msg that has arrived on any connection. If no msg is currently available, return null. */ public InputMsgObject getNextInputMsg() { return inputMsgQueue.getNextInputMsg(); } /** Get the next input msg that has arrived on any connection. If no msg is currently available, block until one arrives. */ public InputMsgObject waitOnGetNextInputMsg() { return inputMsgQueue.waitOnGetNextInputMsg(); } /** Show the next input msg that has arrived on any connection. If no msg is currently available, return null. */ public InputMsgObject showNextInputMsg() { return inputMsgQueue.showNextInputMsg(); } /** Show the next input msg that has arrived on any connection. If no msg is currently available, block until one arrives. */ public InputMsgObject waitOnShowNextInputMsg() { return inputMsgQueue.waitOnShowNextInputMsg(); } /** This method provides a means to route distinguished input msgs to a msg queue belonging to the caller. The string returned by routeInputMsgs() is a unique reply-with ID. Input msgs that have this reply-with ID in their in-reply-to field will be routed to the msgQueue and the caller's thread will process them from the msgQueue. The objects deposited in the msgQueue will be InputMsgObjects, thus allowing the caller to obtain the ConnectionDescriptor and distinguish among multiple senders, when present, and respond to a sender over the same connection upon which the msg arrived. The idea here is that the caller first gets the Communicator to establish a special routing path to the caller's msg queue for any incoming msgs having their in-reply-to field set to the special reply-with ID assigned here. When the caller then sends msgs to any external agent(s) with the reply-with field of the msg set to the special reply-with ID, the reply msgs of the external agent(s) will have their in-reply-to field set to this special reply-with ID. The Communicator recognizes the special ID in the incoming msgs and then routes them to the caller's msg queue instead of the Communicator's internal msg queue. NOTE: Should we provide an unrouting method? public boolean unRouteInputMsgs(GenericQueue msgQueue); or public boolean unRouteInputMsgs(String replyWithID); */ public String routeInputMsgs(GenericQueue msgQueue) { ReplyWithObject replyWith = new ReplyWithObject(msgQueue); String replyWithID = replyWith.getValue(); // First put our ReplyWithObject in a hash keyed on our unique // replyWithID so it can be looked up when the msg comes in. replyWithHash.put(replyWithID, replyWith); if (DEBUG) { System.out.println("Communicator.routeInputMsgs: " + "Setting up routing to a queue for msgs with " + "replyWithID: " + replyWithID); } return replyWithID; } //------------------------------------------------------------- // This last set of methods are for sending msgs. Each method // has a dual listed below it that avoids any blocking or other // delays and overhead associated with the actual socket IO by // queuing the outgoing msg to a central dispatch queue in the // Communicator. // // There will no longer be a sendReplyMsg() method. There is // already a method, buildReplyMsg(), in the InternalMsg interface // that allows one to build a reply msg from a received msg. Thus // to reply to a msg, one would do: // InternalKQMLmessage replyMsg = // (InternalKQMLmessage) msgToReplyTo.buildReplyMsg(performative, // content, // reply-with); // boolean success = sendMsg(cd, replyMsg); // // NOTE: You can of course add more fields to the msg before sending it. // For example: // replyMsg.setField("info", "(special info I want to add)"); //------------------------------------------------------------- /** This method provides a means for the caller to get an instance of an InternalMsg without having to know what the particular msg language format or protocol (eg., KQML) is that the agent has instantiated the Communicator with. This allows the caller's code to be independent (when that is feasible) from any specific msg language when generating a msg to send. This makes the caller's code more portable. */ public InternalMsg createInternalMsg() { return internalMsgInstance.createInternalMsg(); } /** Sends a msg on a Connection. The msgToSend supplied by the caller has been built by the agent with all the necessary information. The infomation about the Connection is obtained through the ConnectionDescriptor and installed in the msgToSend. The caller will have obtained a valid descriptor for the Connection. If msgToSend is invalid or the Connection is dead, return false. The caller can check through the descriptor if the Connection is dead and remove the Connection by closing the descriptor. This method will block on the IO and returns true if msg was successfully sent. */ public boolean sendMsg(ConnectionDescriptor cd, InternalMsg msgToSend) { // Make sure we have a msg and a cd. if ((msgToSend == null) || (cd == null)) return false; // Stuff the msg with the Connection info. stuffMsgWithConnectionInfo(msgToSend, cd); // Reject invalid msg. if (!msgToSend.isValidMsg()) { if (DEBUG) { System.out.println("Communicator.sendMsg: Invalid " + "msg, msgToSend: " + msgToSend.toString()); } return false; } Connection conn = cd.getConnection(); if (conn.isConnectionToClient() && NEED_HACK_FOR_FROZEN_FEB_DEMO) { // HACK: Handle our :reply-with-special field if it is present. // See queueMsgToClientAndConnectProvision(). String specialReplyWithID = msgToSend.getField("reply-with-special"); if (specialReplyWithID != null) msgToSend.setField(InternalMsg.REPLY_WITH, specialReplyWithID); } // Make sure Connection is alive and well. if (!cd.isConnectionAliveAndWell()) { System.out.println("Communicator.sendMsg: " + "Found dead Connection " + conn.getConnectionName() + " trying to send msg: " + msgToSend.toString()); // The caller should close his descriptor and thus his Connection. // For an accepted Connection (one created by an external client), // whoever (should be error_close() from conn.sendMsg() or conn.run() // for a Connection to a client) sets isAliveAndWell false will have // done the close. // conn.close_final(); // Remove the bad Connection. return false; } if (DEBUG) { System.out.println("Communicator.sendMsg: " + "Directly sending msg: " + msgToSend.toString()); } // Do the actual IO to send the msg. if (dispatchOutputMsgs.directSendMsg(conn, msgToSend)) return true; else return false; } /** Sends a msg on a Connection. The msgToSend supplied by the caller has been built by the agent with all the necessary information. The infomation about the Connection is obtained through the ConnectionDescriptor and installed in the msgToSend. The caller will have obtained a valid descriptor for the Connection. If msgToSend is invalid or the Connection is dead, return false. The caller can check through the descriptor if the Connection is dead and remove the Connection by closing the descriptor. This method does not block on the IO, instead depositing the msgToSend in a central dispatch queue where a separate Communicator thread will handle sending it. Returns true if msg was successfully deposited to the outgoing queue. NOTE: Any error sending this msg will not be discovered by the caller until he attempts to send another msg on this same Connection. */ public boolean queueMsg(ConnectionDescriptor cd, InternalMsg msgToSend) { // Make sure we have a msg and a cd. if ((msgToSend == null) || (cd == null)) return false; // Stuff the msg with the Connection info. stuffMsgWithConnectionInfo(msgToSend, cd); // Reject invalid msg. if (!msgToSend.isValidMsg()) { if (DEBUG) { System.out.println("Communicator.queueMsg: Invalid " + "msg, msgToSend: " + msgToSend.toString()); } return false; } Connection conn = cd.getConnection(); if (conn.isConnectionToClient() && NEED_HACK_FOR_FROZEN_FEB_DEMO) { // HACK: Handle our :reply-with-special field if it is present. // See queueMsgToClientAndConnectProvision(). String specialReplyWithID = msgToSend.getField("reply-with-special"); if (specialReplyWithID != null) msgToSend.setField(InternalMsg.REPLY_WITH, specialReplyWithID); } // Make sure Connection is alive and well. if (!cd.isConnectionAliveAndWell()) { System.out.println("Communicator.queueMsg: " + "Found dead Connection " + conn.getConnectionName() + " trying to send msg: " + msgToSend.toString()); // The caller should close his descriptor and thus his Connection. // For an accepted Connection (one created by an external client), // whoever (should be error_close() from conn.sendMsg() or conn.run() // for a Connection to a client) sets isAliveAndWell false will have // done the close. // conn.close_final(); // Remove the bad Connection. return false; } InternalMsg msgToSendCopy = internalMsgInstance.createInternalMsg(msgToSend); if (DEBUG) { System.out.println("Communicator.queueMsg: " + "Queuing to dispatchQueue msg: " + msgToSendCopy.toString()); } // DispatchOutputMsgs thread will handle the actual msg send. dispatchOutputMsgs.addMsgToDispatchQueue(conn, msgToSendCopy); return true; // Return without waiting for the msg to get sent. } /** This method is used to send a msg to and receive a reply from an outside target agent. The communication supported by this method is a single send-reply exchange between this agent and the target agent. The method is synchronous, sending the agent's msg and blocking waiting for the target's reply msg. The msgToSend supplied by the caller has been built by the agent with all the necessary information for the target. The infomation about the Connection is obtained through the ConnectionDescriptor and installed in the msgToSend. The caller will have obtained a valid descriptor for the Connection. If msgToClient is invalid or the Connection is dead, or other IO error occurs, a null msg is returned, otherwise the reply msg is returned. The caller can check through the descriptor if the Connection is dead and remove the Connection by closing the descriptor. */ public InternalMsg sendMsgAndGetReply(ConnectionDescriptor cd, InternalMsg msgToSend) { // Make sure we have a msg and a cd. if ((msgToSend == null) || (cd == null)) return null; // Stuff the msg with the Connection info. stuffMsgWithConnectionInfo(msgToSend, cd); // Now generate a new "reply-with" ID that target agent should use // when responding to the msg we're now sending out to him. ReplyWithObject replyWith = new ReplyWithObject(); String replyWithID = replyWith.getValue(); // Stuff new replyWithID in the msg. msgToSend.setField(InternalMsg.REPLY_WITH, replyWithID); // Reject invalid msg. if (!msgToSend.isValidMsg()) { if (DEBUG) { System.out.println("Communicator.sendMsgAndGetReply: Invalid " + "msg, msgToSend: " + msgToSend.toString()); } return null; } Connection conn = cd.getConnection(); if (conn.isConnectionToClient() && NEED_HACK_FOR_FROZEN_FEB_DEMO) { // I don't think the HACK is relevant here. } // Make sure Connection is alive and well. if (!cd.isConnectionAliveAndWell()) { System.out.println("Communicator.sendMsgAndGetReply: " + "Found dead Connection " + conn.getConnectionName() + " trying to send msg: " + msgToSend.toString()); // The caller should close his descriptor and thus his Connection. // For an accepted Connection (one created by an external client), // whoever (should be error_close() from conn.sendMsg() or conn.run() // for a Connection to a client) sets isAliveAndWell false will have // done the close. // conn.close_final(); // Remove the bad Connection. return null; } // It's a synchronous send, i.e., we will wait here for the target agent // reply to come in. We wait on our ReplyWithObject. When the // target's reply msg comes in, the replyWithID in the msg will be // recognized and result in the reply msg being intercepted and // attached to our ReplyWithObject. A "notify()" on the ReplyWithObject // will wake us up from our wait. // First put our ReplyWithObject in a hash keyed on our unique // replyWithID so it can be looked up when the msg comes in. replyWithHash.put(replyWithID, replyWith); if (DEBUG) { System.out.println("Communicator.sendMsgAndGetReply: " + "Directly sending msg: " + msgToSend.toString()); } // Do the actual IO to send the msg. boolean success = dispatchOutputMsgs.directSendMsg(conn, msgToSend); if (success) { if (DEBUG) { System.out.println("Communicator.sendMsgAndGetReply: " + "Waiting on reply...."); } // Wait here till target's reply msg comes in. (Must use a // synchronized block on the object we will wait on. -dirk 12/8/98) synchronized (replyWith) { while (replyWith.getInternalMsg() == null) { try { replyWith.wait(); } catch (Exception e) {} } } // Remove ourselves from the replyWith hash. replyWithHash.remove(replyWithID); InternalMsg replyMsg = replyWith.getInternalMsg(); if (DEBUG) { System.out.println("Communicator.sendMsgAndGetReply: " + "Got reply: " + replyMsg.toString()); } return replyMsg; } else { if (DEBUG) { System.out.println("Communicator.sendMsgAndGetReply: " + "Error on msg send, aborting."); } // Remove ourselves from the replyWith hash. replyWithHash.remove(replyWithID); return null; } } // This method is used to stuff the Connection info into an InternalMsg, // msgToSend, that will be sent from this agent to another. // protected void stuffMsgWithConnectionInfo(InternalMsg msgToSend, ConnectionDescriptor cd) { msgToSend.setField(InternalMsg.CLIENT_AGENT, cd.getClientName()); msgToSend.setField(InternalMsg.SERVER_AGENT, cd.getServerName()); msgToSend.setField(InternalMsg.LOCALPORT, Integer.toString(cd.getLocalport())); msgToSend.setField(InternalMsg.CONNECTION_NAME, cd.getConnectionName()); } //-------------------------------------------------------------------- // These remaining methods are not being used. They are from a previous // design step in creating the new Communicator when we were taking a // more explicit and restrictive view of the Communicator as providing // for a client/server model of communication. We still acknowledge // that model in as much as connections are created with some implicit // assumptions. In particular, the agent initiating the connection // is treated as the "owner" of the connection. That agent has the // "right" to close the connection while the accepting agent has a // "committment" to support the connection until the "owner" closes // it. The "owner" agent is filling the client role and the agent at // at the other end acts as a server with respect to that connection. // These conventions are observed in the underlying Communicator // implementation that manages connections regardless of whether or // not the actual communication by the agents over the connection is // a client/server type exchange. This is necessary in order to fulfill // committments expected by agents (in particular our Retsina agents) // that need the client/server model to be provided. Thus efforts to // abstract too far away from this in providing a more peer to peer // model are avoided in this design. In particular, a connection that // dies is permitted to be re-established only by the agent that // initiated the connection. The Communicator for the agent at the other // end will not attempt to automatically and seamlessly re-establish // such a connection even if it still has data yet unsent over that // connection. Such restart of the connection will be left to higher // level protocols between the 2 agents and above the level of the // Communicator to re-establish their connection and restart or resume // their unfinished communication. //-------------------------------------------------------------------- //-------------------------------------------------------------------- // The remaaining methods can be deleted at some point. Keep for now // for reference. //-------------------------------------------------------------------- //-------------------------------------------------------------------- // I finally chopped out the old stuff and put in a file, methods.old, // for reference. -dirk 7/17/98 //-------------------------------------------------------------------- } // End of class Communicator