\input epsf
%\input{psadobe}
\makeatletter
\long\def\unmarkedfootnote#1{{\long\def\@makefntext##1{##1}\footnotetext{#1}}}
\makeatother
\documentstyle[fleqn,epsf]{art-nocolon}
%\include{psfig}
%\include{caption}


\textheight =  9.0in
\textwidth = 7.0in              % assume output has this much centered

\topmargin = -.5 in 
\oddsidemargin = -.25 in
\evensidemargin = -.25 in

\renewcommand{\floatpagefraction}{1}
\renewcommand{\topfraction}{1}
\renewcommand{\bottomfraction}{1}
\renewcommand{\textfraction}{0}
\renewcommand{\baselinestretch}{1}
\renewcommand{\arraystretch}{1.5}


\newtheorem{definition}{Definition}
\newtheorem{theorem}{Theorem}
\newtheorem{lemma}{Lemma}
\newcommand{\ignore}[1]{}

\renewcommand{\to}{\rightarrow}
\newcommand{\A}{{\cal A}}
\begin{document}
\bibliographystyle{acm}

%\load{\footnotesize}{\sc}
\title{ 
%\begin{flushleft}
%\vspace{-1.0in}
%\centerline{\scriptsize\em (Extended abstract submitted for PLDI 94)}
%\normalsize { }
%\end{flushleft}
%\vspace{0.4in}
Language and Run--time Support for Network Parallel Computing\\
}
\author{ Peter A. Dinda \and David R. O'Hallaron \and
         Jaspal Subhlok \and Jon A. Webb \and Bwolen Yang}
\date{
   {email:\tt \{pdinda, droh, jass, webb+, bwolen\}@cs.cmu.edu}
      \vspace*{.2in} \\ School of Computer Science \\ Carnegie Mellon
      University \\ Pittsburgh PA 15213}

\maketitle
\thispagestyle{empty}
\abstract{
%
% This still needs some help
%
Network parallel computing is the use of diverse computing resources
interconnected by general purpose networks to run parallel
applications.  This paper describes NetFx, an extension of the Fx
compiler system which uses the Fx model of task parallelism to
distribute and manage computations across the sequential and parallel
machines of a network.  The central problem in compilation for network
parallel computing is that the compiler is presented with a
heterogeneous and dynamic target.  Our approach is based on a novel
run--time system that presents a simple communication interface to the
compiler, yet uses compiler knowledge to customize communication
between tasks executing over the network.  The run--time system is
designed to support complete applications developed with different
compilers and parallel program generators.  It presents a standard
communication interface for point--to--point transfer of distributed
data sets between tasks.  This allows the compiler to be portable,
and enables communication generation without knowledge of exactly how
the tasks will be mapped at run--time and which low--level
communication primitives will be used. The compiler also generates a
set of custom routines, called address computation functions, to
translate between different data distributions.  The run--time system
performs the actual communication using a mix of generic and custom
address computation functions depending on run--time parameters like
the type and number of nodes assigned to the communicating tasks and
the data distributions of the variables being communicated.  This
mechanism enables the run--time system to exploit compile--time
optimizations, and enables the compiler to manage foreign tasks that
use non--standard data distributions. We outline several important
applications of network parallel computing and describe the NetFx
programming model and run--time system.

}

%\voffset = -.5 in
\normalsize

\begin{center}
\begin{minipage}[t]{5.5in}
\footnotesize
This research was sponsored in part by the Advanced Research Projects
Agency/CSTO monitored by SPAWAR under contract N00039-93-C-0152, in
part by the National Science Foundation under Grant ASC-9318163, and
in part by a grant from the Intel Corporation.
\end{minipage}
\end{center}

\setcounter{page}{1}
\section{Introduction}\label{intro}

%
% I changed this to include building apps using different languages
% and emphasized the rts -PAD
%
%

Network parallel computing is the use of diverse computing resources
interconnected by general purpose networks to run parallel
applications.  The domain of effective network parallel applications
is rapidly expanding because of considerable improvements in the
latency and bandwidth characteristics of networks such as HiPPI and
ATM.  It is becoming practical to run applications such as scientific
simulations, real--time processing, and real--world modeling on network
parallel computing environments.  However, standard parallel languages
like High Performance Fortran (HPF)~\cite{HPFF93} offer little support
for network parallel computing.  Further, multiple languages and
programming paradigms may be necessary to fully realize the potential of
the network parallel computing environment --- yet there is little
support for interoperability in today's parallel languages and
paradigms.  Without such support, the construction of an efficient,
scalable, and environment--independent network parallel application is
a difficult and error--prone task in which the applications programmer
must wear many hats, including that of the network specialist.

The goal of our research is to make it easy to build network parallel
applications that achieve high performance in many different
environments. This paper presents a programming model for network
parallel applications and the design of a compiler and a run--time
system that enables efficient generation and execution of network
parallel applications.

%Our approach is to decompose the application into a set
%of parallel tasks, perhaps written in different languages.  These
%tasks communicate via an extensible, modular run--time system which
%handles the details of moving distributed data sets between tasks.
%The run--time system can be extended by a compiler or programmer to
%improve performance.  Because the run--time system is modular, its
%components can be implemented by specialists to take advantage of
%network and machine characteristics.

Our programming model allows the programmer to express task
parallelism among parallel tasks.  Tasks are invocations of parallel
procedures, called task procedures.  The programmer specifies the
interface of each task procedure, but may implement it in any language.
Tasks are mapped onto the computing environment either by the
compiler or the programmer.  The compiler generates communication
between tasks to satisfy programmer--specified data dependencies.

%
% This is not really right
%
%The programming model we use combines task and data parallelism.
%Procedures in an application are data parallel and each procedure is
%mapped to a homogeneous set of nodes, which may be different nodes of
%a large parallel machine or a cluster of workstations. Different
%procedures are placed on different sets of nodes on the network and
%the compiler manages the communication between them.

The most challenging part of compilation for a network is generating
efficient communication.  Network parallel computing requires support
for multiple communication mechanisms.  For example, crossing a HiPPI
network requires
different mechanisms than communicating between the nodes of an MPP.
More important, the network environment is expected to be
dynamic. Run--time load balancing is important even for applications
with a static computation structure since the external load on
different machines can vary.  This means the compiler has limited
information about the run--time environment during code generation.

For generating efficient and portable communication in a dynamic
run--time environment, it is important to distinguish between the
aspects of the communication that are constant for the duration of the
execution and those that are varying and may be known only at
run--time. The communication steps between tasks are a property of the
program and are determined at compile time, while the actual
low--level communication calls may depend on the run--time
environment.

Our solution to generating efficient communication involves defining
an inter--task communication interface. This interface provides
mechanisms for transferring data between tasks that are mapped on
groups of nodes. The compiler generates code for this communication
interface and the run--time system ensures that appropriate
communication calls are generated. The compiled program is portable
across the machines and network protocols supported by the run--time
system. 

The key new idea is that the interface decouples compilation from the
generation of low--level communication, which is essential for a
network environment, yet provides a mechanism for the compiler to
customize or extend the behavior of the run--time communication
system. This mechanism allows the run--time system to exploit
compile--time communication optimizations, and allows the run--time
system to be painlessly extended to support new kinds of data
distributions.  The run--time system itself is not tied to the
compiler, but may be used by other clients.

We are extending the Fx compiler system~\cite{GrOS94} to support the
programming model and the run--time system.  Fx already extends a data
parallel language based on HPF with task parallelism.  The extended
version of Fx is called NetFx.  We begin by motivating the need for
network parallel computing with a set of example applications and then
describe the NetFx language extensions, compiler strategy and the
extensible run--time system.

\section{Motivation for network parallel computing}

The components of a large application typically have different
requirements for resources like I/O devices, sensors, processing
power, communication bandwidth, memory, and software tools.  Further,
each component may be best expressed in a different language or
paradigm.  Attempting to satisfy all of these resource requirements in
a single computing platform utilizing a single language can be
impractical or even impossible.  An attractive alternative is to build
each component of the application using the appropriate language or
programming paradigm and run it on the appropriate platform using high
speed networks to communicate with other components. We refer to this
type of computation as {\em network parallel computation}.

This section illustrates the motivation for network parallel computing
with four applications: (1) a {\em real--world modeling} application
where inputs are acquired from a camera system, processed on a large
parallel system, and displayed remotely, (2) an {\em air quality
modeling} application where different steps of the computation have
different processing and communication requirements, (3) an {\em
earthquake ground motion modeling} application where the results are
computed on a large parallel system and visualized on a graphics
workstation, and (4) a {\em real--time embedded system} constructed
from commercially--available rack--mounted array processor boards.

\subsection{Real--world modeling}

High performance computers and their programming tools have
traditionally been applied to problems in scientific modeling,
including simulation analysis. A group of researchers at Carnegie
Mellon is applying these same systems and tools to problems in {\em
real--world modeling}, which means three--dimensional shape modeling
of objects and environments in the real world by direct observations
using computer vision techniques. Success of this approach could lead
to, for example, replacement of complex and specialized sensory
systems based on laser range--finding technology by much simpler, more
flexible and scalable, but computationally more expensive, systems
like stereo vision.  In real--world modeling applications the sensor
system plays a key role. In some situations (e.g., imaging moving
objects) it is necessary to capture imagery at video rate (30 Hz) or
faster.  Capturing data at this high rate is complicated by the need
to synchronously access several cameras\footnote{In the recent past, the
most significant advances in stereo vision accuracy and reliability
have come from the introduction of multiple cameras.}.  In other
situations (e.g., imaging building interiors) speed of image capture
is not so important but the imaging system must be portable.

These conflicting requirements can be met through the
application of modern networking technology. The system we envision is
illustrated in Figure \ref{fig:netargos}. 
\begin{figure}[htb]
\centerline{\epsfxsize=3in \epsfbox{netargos.eps}}
\caption{Real-world modeling system}
\label{fig:netargos}
\end{figure}
The camera system is a detachable unit, and can be attached either to
a large number of workstations, giving a high frame capture rate, or
to a smaller number of portable workstations or PCs, giving
portability at the expense of speed. Once the images are captured,
they are transformed into stereo vision data through the application
of parallel stereo vision programmed in Fx, which can then be executed
on the workstations themselves or on powerful tightly--coupled high
performance computers.  The resulting range imagery is then
transformed into a three--dimensional model, which is another Fx
program, and the resulting model can then be stored to disk or
displayed remotely. A NetFx implementation will allow us to program
the entire framework, including communication across the network, in a
uniform, portable manner.

\subsection{Air quality modeling}

As part of a Grand Challenge grant on heterogeneous computing for
large scale environmental modeling, researchers at Carnegie Mellon
have adapted the airshed environmental modeling application~\cite{McRH92}
to run on parallel systems.  The application consists of repeated
application of three data parallel steps.  The first step, reading
input data from storage and preprocessing it, is I/O intensive.  The
second step, computing the interaction of different chemical species
in the atmosphere, exhibits massive parallelism and little
communication.  The third step, computing the wind--blown transport of
chemical species, has significant parallelism and communication.  Each
of these steps is suited to a different class of machine.  For
example, an Fx implementation of the chemistry step runs as fast on
four Ethernet--connected DEC Alpha workstations as on 32 nodes of an
Intel Paragon since the relatively slow communication is not an
important factor, but the transport step is expected to be more
sensitive to the parameters of the communication system.  Further, the
input step is best run concurrently with the other stages.  A NetFx
implementation will allow us to use a single high--level program for
the entire application, with the ability to use different mappings for
execution.


\subsection{Earthquake ground motion modeling}

As part of a Grand Challenge grant on earthquake modeling, researchers
at Carnegie Mellon are developing techniques for predicting the motion
of the ground during strong earthquakes. The computation consists of
two distinct steps: simulation and visualization.  The simulation step
uses a large, sparse, and irregular finite element model to generate a
sequence of displacement vectors, one vector per time step. The
visualization step manipulates the displacement vectors and produces a
graphical display.  The simulation step requires access to large
amounts of memory, computational power, and low--latency
high--bandwidth communications, while the visualization step requires
access to a bit--mapped display and, for the more complex 3D models, a
graphics accelerator. The natural partitioning of this application is
to run the simulation step on a large tightly--coupled parallel
system, and to run the visualization step on a graphics workstation.

\subsection{Real--time embedded systems}

Economic realities have created a pressing need for embedded systems
that are built with commercial off--the--shelf components.  The basic
building block is typically a commercial array processor board that
consists of a fixed number of processors connected by a fast onboard
switch. For larger systems, multiple boards are mounted in a rack and
connected by a bus or ring structure.  For still larger systems,
multiple racks are connected by a commercial network like FDDI or ATM.

These systems are used for real--time applications like sonar and
radar, where the processing typically consists of reading from one or
more sensors, and then manipulating the inputs in a sequence of
stages. Each stage must be assigned enough processing resources to
match the throughput and latency requirements of the overall system.
An application that needs multiple racks to meet its real--time
requirements will of necessity be a network parallel computation.


\section{Our approach to network parallel computing}
%
% Note - Network Parallel Computing Enivronment means the computation
%        resources available, NOT the development environment
%

%
% Give an overview of our approach
%
\begin{figure}
\epsfxsize = 5.0 in
\centerline{\epsfbox{Overview.eps}}
\caption{Compilers and Run--time system for Network Parallel Computing}
\label{fig:overview}
\end{figure}

This section describes our approach to network parallel computing.
Our ultimate goal is a compiler and run--time system that makes it
possible to use multiple parallel languages and paradigms to
effectively utilize the heterogeneous network parallel computing
environment.  The model of a network parallel application is a set of
communicating tasks.  Tasks are invocations of parallel task
procedures, which may be written in any language.  The data
communicated between tasks is in the form of distributed data sets.

Figure~\ref{fig:overview} shows how a network parallel application is
built.  First, appropriate compilers and programming tools are used to
generate task procedures and interoperability information. Each of
these tools can also generate routines for a custom distribution
management library, which will be discussed later. In addition to the
task procedures, the programmer writes a tasking program, which
describes how the tasks interact.  A special tool, the network task parallel
compiler, combines the tasking program, task interoperability
information, and information about the computing environment to
produce a task management program, which is then linked to the task
procedures, the custom distribution management library, a standard
library, and the run--time system.  The task management program
orchestrates the different tasks of the application, and performs
inter--task communication on their behalf by calling the run--time
system.

Because both the distribution of data within a task and the mapping of
tasks to nodes can be dynamic, much of the work involved in
communicating distributed data sets between tasks is done by the
run--time system, which presents the abstraction of a task--to--task
communication of distributed data sets to the task management program.
A major part of communicating distributed data sets is mapping from
one distribution to another.  While the standard library provides many
methods for doing this, the run--time system can also exploit
compile--time knowledge in the form of the custom distribution
management library.  This mechanism also allows the run--time system
to be extended to support new kinds of data distributions.  The
run--time system is also responsible for global load balancing and
miscellaneous utilities.  The extensible run--time system for network
parallel computing is the main contribution of this paper.

The run--time system is currently being used to support the NetFx
compiler system, as shown in Figure~\ref{fig:NetFx}.  The NetFx
compiler can map data parallel Fx tasks onto different sequential and
parallel machines over a network.  In the future, we plan to extend our
programming model and demonstrate that the run--time system can be
used to develop applications using different parallel program
generators.

\begin{figure}
\epsfxsize=3.0in
\centerline{\epsfbox{NetFx.eps}}
\caption{Structure of NetFx}
\label{fig:NetFx}
\end{figure}


\section{Programming model for NetFx}
The current NetFx compiler uses the basic Fx programming model of
integrated task and data parallelism~\cite{GrOS94,SSOG93}.  The main
extension is that a data parallel task can be mapped to any sequential
and parallel machines on a network.  Support for data parallelism is
similar to that provided in High Performance Fortran (HPF) and is
described in~\cite{StOG94,YWSO93}.  Task parallelism is used to map
data parallel tasks onto different, possibly heterogeneous, groups of
nodes.  We briefly outline how task parallelism is expressed in NetFx
and illustrate how it is used to exploit coarse grain parallelism.

\subsection{Task parallelism}
Task parallelism is expressed solely through compiler directives; no
new language constructs are introduced.  To simplify the
implementation, the current version of Fx relies on the user to
identify the side effects of the task procedures and to specify
them. Directives are also used to guide the compiler in mapping the
tasks, which is important for performance and also to ensure that
tasks execute on computers for which the compiled versions are
available.  We describe the directives that are used to express task
parallelism and illustrate their use.

\subsubsection*{Parallel sections}
Calls to task procedures are permitted only in special code regions
called {\em parallel sections}, denoted by a {\tt begin parallel/end
parallel} pair. For example, the parallel section for a 2D FFT program
has the following form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call rowffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm
The code inside a parallel section can only contain loops and
task procedure calls.  Every call to a task procedure inside a parallel
section corresponds to a data parallel {\em task} that can execute in
parallel with other tasks subject to dependence constraints. Outside
the parallel section, the program executes as a single data parallel
thread.  The effect of executing a parallel section is the same as if
its contents were executed sequentially.

\subsubsection*{Input/output directives}

The user includes {\tt input} and {\tt output} directives to define
the side--effects of executing a task procedure, i.e., the data space
that the procedure accesses and modifies.

For example, the input and output directives for the call to {\tt
rowffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that the task procedure {\tt rowffts} can
potentially use the values of, and write to, the parameter array {\tt A}.
As another example, the directives for the call
to {\tt colffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that the task procedure {\tt colffts} does not
use the value of any parameter that is passed but can potentially
write to array {\tt A}.

\subsubsection*{Mapping directives}
%
% This is sorely lacking in detail - again, because we don't know what we 
% need.
%
%
Labeling task procedure calls and supplying input/output information
is sufficient for the compiler to generate a correct parallel program.
However, the performance of the program largely depends on how the
tasks are mapped onto the available computation resources on the
network. The possible mappings are constrained by the availability of
data parallel implementations of different task procedures for
different architectures, the memory requirements of the task
procedures, and the physical locations of devices like frame buffers.
Ideally, NetFx would map the program automatically for correct
execution and the best possible performance, but this is a hard
problem that NetFx currently solves only in relatively
simple situations~\cite{SuVo95}. In general, the programmer has to
supply the mapping information.

The mapping information specifies where each task executes. Mapping
information is given for each call statement in a parallel section.
Thus all executions of a particular task procedure call statement
(i.e., each task associated with the statement) are mapped in the same
way.  Currently, the mapping information is static and all mapping is
done at compile--time.  However, the run--time system is designed to
support dynamic task mapping as well.  The details of the mapping
directives are somewhat cumbersome and machine specific and we will
not describe them here.  For example, when mapping a task to a network
of workstations, it may be necessary to specify the names of the
workstations to be used as a node, but when mapping to a part of a
massively parallel machine like the Intel Paragon, it may be
sufficient to specify the number of nodes to be used. For the purpose
of this paper, we will assume that a mechanism for mapping any task to
any group of available processors exists.

\subsection{Programming coarse grain parallelism}
NetFx allows the programmer to use fine grain data parallelism as well
as coarse grain task parallelism. We illustrate how the task
parallelism directives described above are used to exploit common
forms of coarse grain parallelism.

\subsubsection*{Task pipelines}
In image and signal processing programs, the main computation often consists
of repeatedly executing an acyclic task graph. A simple example of such
a program is the FFT--Hist example program, which consists of three
pipelined task procedures.  The task parallel section of this example
is as follows:
%
% I patched it so it looks like it does something....
%
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
\>\>     call hist(A)\\
c\$\>\>  {\bf input} (A)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

In FFT--Hist, all data dependences in the program are forward and loop 
independent.  The task procedure {\em colffts} reads a data set from an
input device, computes, and sends the output data set to {\em rowffts}, 
which receives the data set, computes, and sends the output data set to 
{\em hist}, which computes the final output data set.  This allows an 
earlier stage, say {\em colffts}, to process a new data set while {\em
rowffts} is processing the previous data set, allowing coarse grain 
pipeline parallelism as shown in Figure~\ref{fig:pipe}.
\begin{figure}
\epsfxsize = 3.5 in
\centerline{\epsfbox{pipe.eps}}
\caption{Dependences and coarse grain parallelism in FFT-Hist}
\label{fig:pipe}
\end{figure}
In related work, we have argued that such parallelism can be exploited
efficiently and profitably for many applications including narrowband
tracking radar and multibaseline stereo~\cite{SOGD94}.

\subsubsection*{Communicating tasks}
In many scientific applications, different procedures can execute in
parallel, but need to transfer or exchange data between invocations.
For example, two routines {\em fproca} and {\em fprocb} may work on
different parts of a data structure and exchange boundary elements
between successive invocations. A task parallel program that expresses
this computation is as follows:

\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call fproca(A, boundA, boundBx)\\
c\$\>\>  {\bf input} (boundBx), {\bf output} (boundA)\\
\>\>     call fprocb(B, boundB, boundAx)\\
c\$\>\>  {\bf input} (boundAx),  {\bf output} (boundB)\\
\>\>     call ftransfer(boundA, boundB, boundAx, boundBx)\\
c\$\>\>  {\bf input} (boundA, boundB) {\bf output} (boundAx, boundBx)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

We show the input/output directives only for the boundary elements for
the purpose of illustration.  The task dependence graph for this
computation is shown in Figure~\ref{fig:par}.  There is no direct
dependence between task procedures {\em fproca} and {\em fprocb}, but
they have a loop carried dependence on {\em ftransfer} due to
variables {\em boundAx} and {\em boundBx}, respectively. Task
procedure {\em ftransfer} has loop independent dependences on {\em
fproca} and {\em fprocb} due to variables {\em boundA} and {\em
boundB}. The iterations of {\em fproca} and {\em fprocb} execute in
parallel and effectively exchange boundary elements using
task procedure {\em ftransfer}. The computation schedule is
illustrated in Figure~\ref{fig:par}.  Note that the results obtained
by task parallel execution are consistent with sequential execution.
\begin{figure}[htb]
\epsfxsize = 3.5 in
\centerline{\epsfbox{par.eps}}
\caption{Dependences and coarse grain parallelism for a program
         with  data exchange between parallel iterations}
\label{fig:par}
\end{figure}

\subsection{Compilation}
The compiler uses a data flow algorithm to analyze the dependences
caused by input/output variables and precisely determines the data
movement between tasks. The basic paradigm is that the results
obtained must always be consistent with sequential execution.  It then
uses the mapping information to map each task procedure call
statement to a set of nodes.  At run--time, executing the call
statement amounts to receiving input data sets, executing the task
procedure (i.e., invoking a task), and sending output data sets.
These steps are identical whether compiling for a single massively
parallel machine (as in Fx) or for a heterogeneous network (as in
NetFx).

Currently, NetFx operates on the same assumptions as Fx: the placement
of all tasks and the distribution of data inside them is fixed and
known at compile time, the characteristics of the communication
network are fixed and known at compile time, as the network is not
shared with other applications.  Ultimately, NetFx will deal with the
realities of network parallel computing: the network is inherently a
shared resource whose usage pattern changes dynamically, the mapping
of the tasks can change at run--time, and data distributions may not
be known at compile time.  This dynamism means the NetFx compiler
does not have all the information to generate optimal inter--task
communication.  Instead, communication is managed jointly by the
compiler and the run--time system.  The compiler generates calls to
the run--time system for communication between tasks, and may also
generate address computation functions that the run--time system uses
to translate between data distributions.  The details of the
communication are managed by the run--time system, and are discussed
in the next section.  


\section{Run--time support for task parallelism}

The main function of the run--time system is the efficient transfer of
distributed data sets between tasks with few constraints on
processors, networks, languages, tasks, or data.  In this section, we
first describe the design challenges of the run--time system and our
approach to meeting them.  Next, we explain the details of inter--task
communication, and the programming interface exported to the compiler.
This is followed by a discussion of the internal structure of the
run--time system.  Finally, we give an example of how an inter--task
communication step is accomplished using the run--time system.

\subsection{Design challenges}
\label{sec:deschal}

The run--time system is designed for very general network computing,
although it is presently being used mainly to manage data parallel
NetFx tasks on a network. To meet this goal, several design
challenges must be overcome.  These include multiple languages and
programming paradigms, dynamic program and data mappings, and
processor and network heterogeneity.  Each of these challenges is
discussed separately.

\subsubsection*{Multiple languages and programming paradigms}

Each task procedure in the program may be written in a different
parallel language or paradigm. There are two major motivations for
this.  First, the heterogeneity of the network parallel computing
environment means that no one language or paradigm is ``best.''  For
example, the environment may include both shared memory and
distributed memory computers.  The second motivation is
multidisciplinary applications.  Each component of a multidisciplinary
application may be best expressed using a particular language or
paradigm.  For example, one component may require the sparse matrix
support available in Archimedes~\cite{ARCHIMEDES}, while another may
require the fast dense matrix support available in Fx, and a third may
be best expressed with nested data parallelism in NESL~\cite{NESL}.
%
% Maybe a more concrete example of a multidisc. app would be good.
% Quake?
% Yes, Quake will be good here
% 

Regardless of what language or paradigm is used to implement the
tasks, it should always be possible to transfer data between two tasks
as long as the {\em abstract} data types are compatible.  For example,
it should be possible to send a vector from an Fx task, where it is
implemented as a set of local Fortran subvectors distributed over the
processors, to a DOME~\cite{DOME} task, where it is implemented as a
distributed C++ object.  The challenge is to translate between the two
implementations.  In general, if there are $n$ implementations of an
abstract data type, there are $n^2$ combinations of implementations
that must be supported.  Furthermore, it should be easy to extend the
range of implementations.


\subsubsection*{Dynamic program and data mappings}

In many languages and paradigms, the distribution of data can change
dynamically. For example, DOME load balances vectors by shifting
elements between processors.  At another level, the assignment of
processors to tasks may change due to global load balancing or to
support a run--time automatic mapping tool.  The run--time system must
support data transfer under these conditions.


\subsubsection*{Processor and network heterogeneity}

Processor and network heterogeneity make efficient communication
difficult.  There are several reasons for this.  First, processor
heterogeneity may require that the data representation be changed.
Communication systems such as PVM~\cite{PVMBOOK} deal with this by
converting to and from the XDR format, but this usually means
additional copy steps.  The second reason is that routing is
complicated by an essentially arbitrary topology and many different
link types and access mechanisms.  A common solution is to rely on
internet routing~\cite{IPROUTE}, but this mechanism cannot make use of
the parallel nature of inter--task communications because there is
no notion of a parallel connection in TCP/IP.  Furthermore, the TCP/IP
stack for a high speed network adaptor may be significantly less
efficient than the native stack.  Finally, MPPs typically do not
support TCP/IP internally, resulting in a store--and--forward stop at
an interface node, which increases latency.


\subsection{Meeting the design challenges}

Our approach to these design challenges has three components: a
standard run--time system interface, run--time system extensibility,
and run--time system modularity.

\subsubsection*{Standard run--time system interface}

The run--time system exports a standard interface to the program
generator.  The main abstraction is that of communicating distributed
data sets between tasks mapped to groups of nodes.


\subsubsection*{Run--time system extensibility}

To support multiple languages and paradigms, we provide a standard
method for a compiler or programmer to extend the run--time system to
support new combinations of data distributions.  This is similar to
how a generic sort routine allows an application to pass in
\verb.compare.  and \verb.swap. functions for a particular data type, but
on a larger scale.  This mechanism also provides a way for compile--time
knowledge to be used by the run--time system for better performance.
 

\subsubsection*{Run--time system modularity}

The run--time system is composed of three subsystems with clearly
defined interfaces between them. This allows each subsystem to be
developed and optimized independent of the others, and reduces overall
complexity.  For example, the communication subsystem, which actually
performs parallel data transfers, can be developed independently by
the network specialists since it is related to the other subsystems
only through a well defined interface.  Similarly, the development of
distribution mapping methods is simplified because it is separated
from actual data transfer.




\subsection{Inter--task communication details}

The goal of inter--task communication is to transport a distributed
data set from a source task to a destination task.  The elements of
the data set are partitioned across the processes of a task according
to some rule.  This rule is identified by the data set's distribution
type and distribution.  A {\em distribution type} identifies a class
of similar rules, while a {\em distribution} identifies a specific
member of the class.  For example, the distribution type
$HPF\_ARRAY$ includes the distribution $(*,BLOCK)$.  The run--time
system assumes that the distribution type of the data set on each task
is constant throughout the execution of the program, but its
distribution may change.  If the distribution can change, we say that
its {\em binding} is {\em dynamic}, otherwise it is {\em static}.
% The initial value of both kinds of bindings are
%supplied at compile time.

To send a data set from a source task to a destination task, it is
necessary to be able to translate between the distributions at the two
end--points.  It is only possible to translate between distributions
whose abstract data types are compatible.  For example, if the source
abstract data type is an array, then the destination abstract data
type must be an array of the same size and shape.  The two end--point
distribution types define a method for translating between the two
member distributions. This method is called an {\em address
computation function} and the process of executing it is {\em address
computation}.  Given two distributions of the appropriate type and a
process pair, the address computation function returns an {\em offset
relation} between offsets into the chunk of elements owned by each
process, for corresponding global array elements.  This
architecture--independent relation can be trivially converted to an
architecture--dependent {\em address relation}, which maps memory
addresses on one machine to memory addresses on the other.

The address computation function operates under the abstraction that a
data set's elements are distributed over a consecutively numbered
group of processes of the task.  Once an address relation has been
computed for each pair of processes, the abstract process numbers must
be mapped to the actual nodes of the machines on the network.  The
properties of these task bindings are similar to those of the
distribution bindings discussed above.  Specifically, the bindings may
be static or dynamic.  Once the mappings are known, actual data
transfer can begin.


\subsection{Run--time system interface}
This section describes the programming interface between a program
generator and the run--time system.  It is a two way interface: the
program generator emits calls to the run--time system, and the
run--time system may itself call procedures produced by the program
generator. The interface has three components as shown in Figure
\ref{fig:interface}.  The first two of these are exported by the run--time
system, while the third is exported by the program itself.

\subsubsection*{Inter--task communication interface}
This interface consists of simple, high--level, non--blocking
collective send and receive calls for transporting whole distributed
data sets between tasks.  It is required that if more than one data
set is transferred between a pair of tasks, the order of the sends
and receives on the tasks be the same.  The arguments of the send
and receive calls are
\begin{itemize}
\item Target (send) or source (receive) task 
\item Address of first data set element local to the calling process
\item Data type of each element
\item Sender distribution type and distribution
\item Receiver distribution type and distribution
\item Hint (implementation specific)
\end{itemize}
There is a single call to wait for all outstanding receives to
complete, which allows a task to complete input communication before
executing.
%I though this was implicit...

\subsubsection*{Registry interface}

Before performing sends and receives, the program must describe the
characteristics of relevant tasks, distribution types, and custom
address computation functions to the run--time system using the
registry interface.  The program explicitly registers the initial
task mapping and whether it is static or dynamic.  Registration of a
distribution type is required if the distribution type is not
library-supported, or if the distribution is dynamic.  Dynamic
bindings are updated implicitly by examining send and receive calls.
There are several reasons why a program may want to register its own
distribution types.  One reason is extensibility --- the distribution
type may not be supported by the library.  Another is efficiency ---
the compiler may be able to generate a specialized address computation
function for a particular communication.  In either case, the program
must also register custom address computation functions for any
communication that involves the new distribution type.  For example,
suppose task 1 communicates from a new distribution type $DTX$ to
predefined types $DT2$ on task 2 and $DT3$ on task 3.  All three
tasks must register distribution type $DTX$.  Task 1 must register
new address computation functions $\lambda (DTX,DT2)$ and $\lambda
(DTX,DT3)$, task 2 must register $\lambda (DTX,DT2)$, and task 3
must register $\lambda (DTX,DT3)$.

\subsubsection*{Standard address computation function interface}
All address computation functions must conform to this interface.
Each address computation function must provide two capabilities.
First, it must be able to generate an offset relation between two
processes.  Second, it should be able to directly assemble and
disassemble messages between two processes.

\subsection{Internal structure}

\begin{figure}
\centerline{\epsfxsize=4in \epsfbox{Int1.eps}}
\caption{The run--time system: subsystems, interfaces, and
relation to the NetFx Executable}
\label{fig:interface}
\end{figure}

%\caption[]{he  run--time system: subsystems, interfaces, and 
%relation to the  NetFx Executable.  Each arrow represents an interface 
%and points from the caller of the interface to the callee.  
%Note that some callees have several callers.}

The run--time system presents the abstraction of inter--task
communication of a distributed data set between uniquely labeled
tasks.  Such communication requires task and data distribution
binding, to establish the current state of the computation, address
computation, to determine what data to send and where to send it, and
efficient low level communication over heterogeneous networks.  These
tasks are segregated into three subsystems (binding, distribution, and
communication) with well defined interfaces between them.  The
intention is to decouple the implementations of the subsystems so that
each can be specialized for particular network computing environments.
For example, we would like to encourage the development of
communication subsystems by members of the networking community.  By
encapsulating address computation and binding for them, we greatly
simplify their task.  The relationships of the subsystems are shown in
Figure \ref{fig:interface} and discussed below.


\subsubsection*{Communication subsystem}
The communication subsystem is the heart of the run--time system and
exports the previously discussed inter--task communication interface
to the program.  The subsystem drives the run--time system by making
use of the binding subsystem to resolve task and distribution
bindings and the distribution subsystem to compute address relations.
In return, it supplies a simple {\em binding communication interface}
to the binding subsystem.  The communication subsystem performs all
actual data transfer.

\subsubsection*{Binding subsystem}
The binding subsystem resolves task, distribution, and address
computation function bindings and supplies them to the communication
subsystem and the distribution subsystem through the {\em binding
interface.}  It uses the communication subsystem's binding
communication interface if it needs to perform an inter--task
communication step to determine a binding (for example, to bind the
distribution of an irregular data structure).  The binding subsystem
supplies the registry interface to the executable.

\subsubsection*{Distribution subsystem}
The distribution subsystem translates between sender and receiver
distributions.  It supplies the {\em address computation interface} to
the communication system for this purpose.  The binding subsystem is
used to bind two distribution types and distributions to the
appropriate address computation function.  This function may be one of
the {\em library address computation functions} linked to every NetFx
Executable, or a compiler--generated {\em custom address computation
function}.  In every case, the address computation function must
conform to the standard address computation function interface
discussed earlier.  After finding the appropriate address computation
function, the distribution subsystem invokes it, conforms the
resulting offset relation to the format requested by the communication
subsystem, and converts it to an address relation.  (The communication
subsystem can request that a
message be assembled or disassembled instead.)

\subsection{Example}

\begin{figure}
        \centerline{\epsfxsize=4in \epsfbox{useint.eps}}
	\caption{Example of the steps followed for an inter--task send using a 
	custom address computation function.}
	\protect\label{fig:steps}
\end{figure}

We illustrate how an inter--task send using a custom address
computation function is performed.  Figure \ref{fig:steps} shows the
steps graphically.  On the right are the actual calls performed by
the node program and those performed internally for the inter--task
send.  The calls are numbered to correspond to the interfaces used and
to the steps discussed in this section.  There is a dependence from
task $T1$ to task $T2$ and the compiler has generated a send from $T1$
to $T2$ (and a matching receive in $T2$, which is not discussed here).
The compiler also produced a custom address computation function
$ADXFUNC$ for this communication.  At run--time, $T1$'s first step is
to use the registry interface to register the initial values of all
relevant bindings.  The task bindings for $T1$ and $T2$ are registered
as static, as are the distribution types $DT1$ and $DT2$.  Finally,
$T1$ registers $ADXFUNC$ as the address computation function for $DT1$
and $DT2$.

The second step (which may be repeated many times) is to invoke a send
of the distributed data set $A$ to $T2$.  The inter--task send call
includes the target task number, $T2$, the first element of $A$ that
is local to the process, the data type of $A$'s elements (REAL), the
sender and receiver distribution types, $DT1$ and $DT2$, and the
current sender and receiver distributions, $DISTA1$ and $DISTA2$.  The
third step is to bind the two tasks to their current physical
mappings, and the distribution types and distributions to their
current distributions.  Both of these steps are simple in this case
because the bindings are static --- they amount to local lookups.  In
the fourth step, the communication subsystem calls the distribution
subsystem to build an address relation between each pair of sender and
receiver processes $P1$ and $P2$.  The distribution subsystem
accomplishes this by binding the two distribution types, $DT1$ and
$DT2$, to their corresponding address computation function, $ADXFUNC$
(step 5) and calling the function for each pair (step 6).  The offset
relations returned are conformed to the format the communication
subsystem wanted, and converted to address relations.  Finally, the
relations are used to perform the actual communication in an
environment--dependent manner.  The communication subsystem may decide
to cache the address relations to avoid steps 4--6 the next time that
inter--task send occurs.


\section{Status and implementation}

The Fx compiler currently supports task parallelism on the Intel iWarp, 
the Intel Paragon, and DEC Alpha workstations running PVM.  Compiler 
analysis of parallel sections (although not mapping directive support or 
code emission) is the same for each target.  The run--time system used on 
the Paragon and the Alphas was initially developed for the Alphas with an 
eye towards portability and serves as a prototype for the system discussed 
above.  The prototype essentially has three components: a component that 
computes offset relations for any pair of (static) HPF distributions, a 
component that conforms these relations to a particular format and caches 
them, and a component that does communication using the relations.  Porting 
this run--time system to the Paragon (and achieving good performance) 
required rewriting only the communication component.  Further details of 
the library are available in~\cite{DiOh95}.  

The main target network for NetFx is Gigabit Nectar~\cite{ABCK89}
developed at Carnegie Mellon. A variety of communication interfaces
have been developed for Nectar and we are incorporating them in the
run--time system. Currently we are able to execute homogeneous Fx
programs across Alpha workstations using Nectar as well as simple
heterogeneous Fx programs on Alphas and Suns.  The full heterogeneous
run--time system described in the paper is currently under
development.


\section{Related work}

High level run--time support for communicating data parallel tasks is
a fairly new research area.  Most comparable to our work is the
Schooner system~\cite{SCHOONER-JOURNAL}.  Schooner combines a type
specification language, a heterogeneous RPC mechanism, and a run--time
system to permit procedures written in arbitrary languages to interact
in a heterogeneous machine environment.  A parallel procedure must be
encapsulated in a sequential procedure, which acts as a proxy
for communication.  Because Schooner is oblivious to distributed data
sets, it cannot perform parallel data transfers like our system.
Another difference is that the Schooner run--time system is not
extensible and so cannot take advantage of compiler knowledge.

The goal of Converse~\cite{CONVERSE} is to allow tasks written in
different parallel languages to be linked together.  Because each
language is likely to have its own run--time system, conflicts are
likely to occur.  The Converse solution is to extract essential
aspects of run--time support into a common core.  Run--time systems
implemented on top of Converse can then coexist.  The core
functionality is based on message--driven execution: run--time systems
register handlers that are executed when properly tagged messages
arrive. A well defined machine interface layer makes Converse itself
highly portable.  Compared to our system, Converse has no notion of
data distribution and the opportunity for parallel data transfer that
it implies, nor does it support heterogeneous machine environments.
We feel that Converse and our system are orthogonal.  

Opus~\cite{HHMV95} supports communication between tasks using {\em
shared data abstractions} (SDAs).  An SDA is essentially an
independent object class whose private data is distributed over a set
of threads and whose methods are protected with monitor semantics.
For example, one could define a queue SDA whose elements are
block--distributed vectors.  To send a vector to another task, it is
first sent to an SDA queue object (which requires an address
computation) using the enqueue method.  The receiving task would
invoke the dequeue method which would cause the SDA queue object to
send a data set (again requiring address computation).
SDAs can support more general tasking paradigms than our scheme, but
appear to significantly increase the communication volume and result
in unnecessary synchronization between tasks due to monitor semantics.

The library described in~\cite{ACFK95} supports communication between
data parallel tasks with collective sends and receives.  Sends (and
receives) are split between an initialization step which performs
address computation and builds communication schedules, and an
execution step that actually performs the send. An initialization step
synchronizes sending and receiving tasks, and can be amortized over
many executions.  Only static task bindings and static HPF
distributions are supported.  All address computation is done with
general purpose library routines.

\section{Summary and conclusions}
We address programming support for high performance networks, which
are emerging as an increasingly important platform for parallel
computing.  We
have presented NetFx, a compiler system that supports network parallel
computing.  The NetFx programming model is based on integrated support
for task and data parallelism originally developed for the Fx compiler
system.  The NetFx run--time system presents a simple abstraction for
task-to-task transfer of distributed data sets so that the
compiler does not have to deal with the dynamic nature of the network
computing environment.  The run--time system allows the compiler and
other tools that generate data parallel tasks to customize execution
behavior.  A clean interface between the compiler and the run--time
system also allows independent development and retargeting of both.
We believe that NetFx is the first tool to provide practical
programming support for networks and will make network computing
easier for non-specialists.


\bibliography{/afs/cs/project/iwarp/member/jass/bib/all,/afs/cs/project/iwarp/member/fx/bib/texbib}
\end{document}
