\input epsf
%\input{psadobe}
\makeatletter
\long\def\unmarkedfootnote#1{{\long\def\@makefntext##1{##1}\footnotetext{#1}}}
\makeatother
\documentstyle[fleqn]{art-nocolon}
\include{psfig}
%\include{caption}


\textheight =  9.0in
\textwidth = 7.0in              % assume output has this much centered

\topmargin = -.5 in 
\oddsidemargin = -.25 in
\evensidemargin = -.25 in

\renewcommand{\floatpagefraction}{1}
\renewcommand{\topfraction}{1}
\renewcommand{\bottomfraction}{1}
\renewcommand{\textfraction}{0}
\renewcommand{\baselinestretch}{1}
\renewcommand{\arraystretch}{1.5}


\newtheorem{definition}{Definition}
\newtheorem{theorem}{Theorem}
\newtheorem{lemma}{Lemma}
\newcommand{\ignore}[1]{}

\renewcommand{\to}{\rightarrow}
\newcommand{\A}{{\cal A}}
\begin{document}
\bibliographystyle{acm}

%\load{\footnotesize}{\sc}
\title{ 
%\begin{flushleft}
%\vspace{-1.0in}
%\centerline{\scriptsize\em (Extended abstract submitted for PLDI 94)}
%\normalsize { }
%\end{flushleft}
%\vspace{0.4in}
Language and Run--Time Support for Network Parallel Computing\\
}
\author{}
\date{   {email:\tt \{\}@cs.cmu.edu} \vspace*{.2in} \\
      School of Computer Science \\
         Carnegie Mellon University \\
         Pittsburgh PA 15213 \vspace*{.2in}}
%        Corresponding Author: Jaspal Subhlok \\
%        Phone: (412) 268 7893}


\maketitle
\thispagestyle{empty}
\abstract{

This paper describes how the Fx compiler system is being extended to
support network parallel computing. The new system, which is called
NetFx, uses the Fx model of task parallelism to distribute and manage
computations across the sequential and parallel machines of a network.
The central idea is a communication interface that presents a simple
abstraction for point--to--point transfer of distributed data sets,
and provides a novel mechanism for customizing the behavior of the
run--time system. This mechanism enables the run--time system to
exploit compile--time communication optimizations, and enables the
compiler to manage foreign modules that use non--standard data
distributions.  We describe the NetFx programming model, the
communication interface, how NetFx programs are compiled to the
interface, and the issues in implementing the interface.

}

%\voffset = -.5 in
\normalsize

\setcounter{page}{1}
\section{Introduction}\label{intro}

New networking technologies like HiPPI and ATM have made it possible
to implement efficient network parallel applications.  However,
standard parallel languages like High Performance Fortran (HPF) offer
little support for network parallel computing. The low--level details of
the network protocols are usually exposed, making the construction of
an efficient network parallel application a difficult and error--prone
task in which the applications programmer must become a network
specialist.

The goal of our research is to make it possible to program network
parallel applications in an easy and efficient way. Compiling for a
variety of machines across a network presents several new challenges.
This paper shows how parallelism inside a single machine and across a
set of machines can be expressed in a uniform framework. We also
present the design of a run--time system that enables efficient
compilation of programs for network parallel computing.

The programming model we use combines task and data parallelism.
Procedures in an application are data parallel and each procedure is
mapped to a homogeneous set of nodes, which may be different nodes of
a large parallel machine or a cluster of workstations. Different
procedures are placed on different sets of nodes on the network and
the compiler manages the communication between them.

The most challenging part of compilation for a network is generating
efficient communication. The most efficient communication interface
across a network is typically different from the communication
interface between the nodes of a parallel machine; hence the compiler
has to manage multiple communication paradigms.  More important, the
network environment is expected to be dynamic. Run--time load balancing
is important even for applications with a static computation structure
since the external load on different machines can vary. Hence the
compiler has limited information about the run--time environment during
code generation.

For generating efficient communication in a dynamic run--time
environment, it is important to distinguish between the aspects of the
communication that are constant for the duration of the execution and
those that are varying and are known only at run--time. In our model,
the program is partitioned into modules and each module is mapped to a
set of processors. The communication steps between modules are
determined at compile time, while the actual low--level communication
calls may depend on the run--time environment.

Our solution to generating efficient communication involves defining
an inter--module communication interface. This interface provides
mechanisms for transferring data between modules that are mapped on
groups of nodes. The compiler generates code for this communication
interface and the underlying implementation of the interface ensures
that appropriate run--time communication calls are generated.

The key new idea is that the interface provides a mechanism for 
the compiler to customize the behavior of the run--time communication
system by registering arbitrary distribution functions. This mechanism
allows the run--time system to exploit compile--time communication
optimizations, and also allows the compiler to manage foreign
modules that use non--standard data distributions.

We are developing our approach with a compiler system called NetFx,
which is based on the existing Fx system, which in turn is based on
HPF. We begin by motivating the need for network parallel computing
with a few example applications, followed by a description of the
NetFx system.

\section{Motivation for network parallel computing}

Different parts of a large application typically have different
requirements for resources like I/O devices, sensors, processing
power, communication bandwidth, memory, and software tools.
Attempting to satisfy all of these resource requirements in a single
computing platform can be impractical or even impossible. An
attractive alternative is to run the different parts of the
application on the appropriate platforms and use a high--speed network
to transfer results back and forth between the different parts. We
refer to this type of computation as {\em network parallel
computation}.

This section illustrates the motivation for network parallel computing
with four applications: (1) a {\em real--world modeling} application
where inputs are acquired from a camera system, processed on a large
parallel system, and displayed remotely, (2) an {\em air quality
modeling} application where different steps of the computation have
different processing and communication requirements, (3) an {\em
earthquake ground motion modeling} application where the results are
computed on a large parallel system and visualized on a graphics
workstation, and (4) a {\em real--time embedded system} constructed
from commercially--available rack--mounted array processor boards.

\subsection{Real--world modeling}

High performance computers and their programming tools have
traditionally been applied to problems in scientific modeling:
simulation, analysis, and the like. We are applying these same systems
and tools to problems in {\em real--world modeling}, by which we mean
the three--dimensional shape modeling of objects and environments in
the real world by direct observations using computer vision
techniques. In so doing we hope to replace complex and specialized
sensory systems based on technology like laser range--finding by much
simpler, more flexible and scalable, but computationally more
expensive, systems like stereo vision.

In real--world modeling applications the sensor system plays a key
role. In some situations (e.g., imaging moving objects) it is
necessary to capture imagery at video rate (30 Hz) or faster.
Capturing data at this high rate is complicated by the need to
synchronously access several cameras (recently the most significant
advances in stereo vision accuracy and reliability have come from the
introduction of multiple cameras).  In other situations (e.g., imaging
building interiors) speed of image capture is not so important but the
imaging system must be portable.

These conflicting requirements can be met through the
application of modern networking technology. The system we envision is
illustrated in Figure \ref{fig:netargos}. 
\begin{figure}[htbp]
\centerline{\epsfxsize=4in \epsfbox{netargos.eps}}
\caption{The envisioned network parallel system for real--world modeling.}
\label{fig:netargos}
\end{figure}
The camera system is a detachable unit, and can be attached either to
a large number of workstations, giving a high frame capture rate, or
to a smaller number of portable workstations or PCs, giving
portability at the expense of speed. Once the images are captured (and
the portable computer system is reattached to the network, if
necessary), they are transformed into stereo vision data through the
application of parallel stereo vision techniques we have invented and
coded in Fx. Because of Fx's architecture independence we can run
these programs on the workstations themselves or on powerful
tightly--coupled high performance computers.  The resulting range
imagery is then transformed into a three--dimensional model, again in
parallel using Fx, and the resulting model can then be stored to disk
or displayed remotely, enabling sophisticated three--dimensional
videoconferencing. 

\subsection{Air quality modeling}

As part of a Grand Challenge grant on heterogeneous computing for
large--scale environmental modeling, researchers at Carnegie Mellon
have adapted an existing air quality modeling application to run on
parallel systems.  The application consists of three data parallel
steps, repeated indefinitely.  The first step, reading input data from
storage and preprocessing it, is I/O intensive.  The second step,
computing the interaction of different chemical species in the
atmosphere, exhibits massive parallelism and little communication.
The third step, computing the wind--blown movement of chemical
species, has significant parallelism and communication.  Each of these
steps has different resource requirements and is suited to a
particular class of machine.  For example, an Fx implementation of the
chemistry step runs as fast on four Ethernet--connected DEC Alpha
workstations as on 32 nodes of an Intel Paragon.

\subsection{Earthquake ground motion modeling}

As part of a Grand Challenge grant on earthquake modeling, researchers
at Carnegie Mellon are developing techniques for predicting the motion
of the ground during strong earthquakes. The computation consists of
two distinct steps: simulation and visualization.  The simulation step
uses a large, sparse, and irregular finite element model to generate a
sequence of displacement vectors, one vector per time step. The
visualization step manipulates the displacement vectors and produces a
graphical display.  The simulation step requires access to large
amounts of memory, computational power, and low--latency
high--bandwidth communications, while the visualization step requires
access to a bit--mapped display and, for the more complex 3D models, a
graphics accelerator. The natural partitioning of this application is
to run the simulation step on a large tightly--coupled parallel system,
and to run the visualization step on a graphics workstation.

\subsection{Real--time embedded systems}

Economic realities have created a pressing need for embedded systems
that are built with commercial off--the--shelf components.  The basic
building block is typically a commercial array processor board that
consists of a fixed number of processors connected by a fast onboard
switch. For larger systems, multiple boards are mounted in a rack and
connected by a bus or ring structure.  For still larger systems,
multiple racks are connected by a commercial network like FDDI or ATM.

These systems are used for real--time applications like sonar and
radar, where the processing typically consists of reading from one or
more sensors, and then manipulating the inputs in a sequence of
stages. Each stage must be assigned enough processing resources to
match the throughput and latency requirements of the overall system.
An application that needs multiple racks to meet its real--time
requirements  will of necessity be a network parallel computation.

\section{Programming model for NetFx}

The basic programming model provides support for task and data
parallelism and is not changed from Fx~\cite{SSOG93,GrOS94}.  Support
for data parallelism is similar to that provided in High Performance
Fortran (HPF) and is described in~\cite{StOG94,YWSO93}.  Task
parallelism is used to map data parallel subroutines onto different,
possibly heterogeneous, groups of processors. We outline how task
parallelism is expressed in NetFx and illustrate how it is used to
exploit coarse grain parallelism.

\subsection{Task parallelism}
Task parallelism is expressed solely using compiler directives and no
new language extensions are introduced.  To simplify the
implementation, the current version of Fx relies on the user to
identify the side effects of the task--subroutines and to specify
them. Directives are also used to guide the compiler in mapping the
program, which is important for performance and also ensures that tasks
execute on computers for which compiled versions are available.  We
describe the directives that are used to express task
parallelism and illustrate their use. We use the FFT--Hist kernel as
an example program. This program takes a sequence of arrays as input,
and for each input array, it performs a 2D FFT as a set of 1D FFTs,
first on the columns and then on the rows, followed by histogram
analysis.
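The column--then--row decomposition used by FFT--Hist can be illustrated
with a short NumPy sketch (illustrative Python, not the Fx Fortran
code; the function name {\tt fft\_hist} and the bin count are our own
choices):

```python
import numpy as np

def fft_hist(A, bins=16):
    """2D FFT of A via 1D column FFTs then 1D row FFTs, then a histogram."""
    cols = np.fft.fft(A, axis=0)          # 1D FFTs down each column
    rows = np.fft.fft(cols, axis=1)       # 1D FFTs across each row
    counts, _ = np.histogram(np.abs(rows), bins=bins)
    return rows, counts

A = np.random.rand(8, 8)
F, counts = fft_hist(A)
# The column-then-row decomposition equals a direct 2D FFT.
assert np.allclose(F, np.fft.fft2(A))
```

Because the 2D FFT is separable, the column and row passes are each a
set of independent 1D FFTs, which is what makes the stages data parallel.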

\subsubsection*{Parallel sections}
Calls to task--subroutines are permitted only in special code regions
called {\em parallel sections}, denoted by a {\tt begin parallel/end
parallel} pair. For example, the parallel section for the FFT--Hist
example has the following form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call rowffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call hist(A)\\
c\$\>\>  {\em input/output and mapping directives}\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm
The code inside a parallel section can only contain loops and
subroutine calls.  These restrictions are necessary to make it
possible to manage shared data and shared resources (including
nodes) efficiently. Every task--subroutine call inside a parallel
section corresponds to a (possibly data parallel)
{\em task} that can execute in parallel with other tasks
subject to dependence constraints.

A parallel section introduces a data parallel thread for each
task--subroutine inside it.  Outside the parallel section, the program
executes as a single data parallel thread.

\subsubsection*{Input/output directives}

The user includes {\tt input} and {\tt output} directives to
define the side--effects of a task--subroutine, i.e., the data space
that the subroutine accesses and modifies.  Every variable whose value
at the call site may potentially be used by the called subroutine must
be added to the input parameter list of the task--subroutine.
Similarly, every variable whose value may be modified by the called
subroutine must be included in the output parameter list. The variables
in the input and output parameter lists can be  distributed or replicated
arrays, or scalars.

For example, the input and output directives for the call to {\tt
rowffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that the subroutine {\tt rowffts} can
potentially use values of, and write to the parameter array {\tt A}.
As another example, the input and output directives for the call
to {\tt colffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that subroutine {\tt colffts} does not use the
value of any parameter that is passed but can potentially write to
array {\tt A}.

\subsubsection*{Mapping directives}
Labeling task--subroutine calls and supplying input/output information
is sufficient for the compiler to generate a correct parallel program.
However, the performance of the program largely depends on how the
task--subroutines are mapped onto the available computation resources
on the network. The possible mappings are constrained by the
availability of data parallel implementations of different
task--subroutines for different architectures, memory requirements of
the task--subroutines, and physical location of devices like
frame buffers.  Ideally NetFx should be able to automatically map the
program for correct execution and best possible performance, but that
is a hard problem and NetFx is not able to do it except in some simple
situations~\cite{SuVo95}. In general, the programmer has to supply the
mapping information.

The mapping information specifies where each of the task--subroutines
should execute. The details of the mapping directives are somewhat
cumbersome and machine specific and we will not describe them here.
For example, when mapping a task--subroutine to a network of
workstations, it may be necessary to specify the names of the
workstations to be used as a node, but when mapping to a part of a
massively parallel machine like the Intel Paragon, it may be
sufficient to specify the number of nodes to be used.

\subsection{Programming coarse grain parallelism}
NetFx allows the programmer to use fine grain data parallelism as well
as coarse grain task parallelism. We illustrate how the task
parallelism directives described above are used to exploit common
forms of coarse grain parallelism.

\subsubsection*{Task pipelines}
In image and signal processing programs, the main computation often consists
of repeatedly executing an acyclic task graph. A simple example of such
a program is the  FFT--Hist example program, which we now present
with directives:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
\>\>     call hist(A)\\
c\$\>\>  {\bf input} (A)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

In FFT--Hist, all data dependences in the program are forward and loop
independent. Task--subroutine {\em colffts} reads input, computes, and
sends the output data set to {\em rowffts}, which receives the data set,
computes, and sends the output data set to {\em hist}, which computes
the final output. This allows an earlier stage, say {\em colffts}, to
process a new data set while {\em rowffts} is processing the previous
data set, allowing coarse grain pipeline parallelism. In related work,
we have argued that such parallelism can be exploited efficiently and
profitably for many applications including narrowband tracking radar
and multibaseline stereo~\cite{SOGD94}.
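The benefit of such pipelining is easy to quantify with a back--of--the
envelope sketch (illustrative Python; unit--time stages are an assumption
made purely for the arithmetic): with $s$ pipelined stages, $m$ data
sets complete in $m + s - 1$ stage--times rather than $m \cdot s$.

```python
def pipeline_steps(m, s):
    """Stage-times for m data sets through s unit-time pipelined stages."""
    return m + s - 1

def sequential_steps(m, s):
    """Stage-times if each data set runs all s stages before the next starts."""
    return m * s

# Hypothetical numbers: 100 input arrays, 3 stages (colffts, rowffts, hist).
m, s = 100, 3
assert pipeline_steps(m, s) == 102
assert sequential_steps(m, s) == 300
```

Once the pipeline fills, every stage works on a different data set, so
throughput approaches one data set per stage--time.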

\subsubsection*{Communicating tasks}
In many scientific applications, different subroutines can execute in
parallel, but need to transfer or exchange data between invocations.
For example, two routines {\em fproca} and {\em fprocb} may work on
different parts of a data structure and need to exchange boundary
elements between successive invocations. A task parallel program that
expresses this computation is as follows:

\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call fproca(A, boundA, boundBx)\\
c\$\>\>  {\bf output} (boundA), {\bf input} (boundBx)\\
\>\>     call fprocb(B, boundB, boundAx)\\
c\$\>\>  {\bf output} (boundB),  {\bf input} (boundAx)\\
\>\>     call ftransfer(boundA, boundB, boundAx, boundBx)\\
c\$\>\>  {\bf input} (boundA, boundB) {\bf output} (boundAx, boundBx)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

The task dependence graph for this computation is shown in Figure
(TOBEMADE).  There is no direct dependence between task--subroutines
{\em fproca} and {\em fprocb}, but they have a loop carried dependence
on {\em ftransfer} due to variables {\em boundAx} and {\em boundBx}
respectively. Task--subroutine {\em ftransfer} has a loop independent
dependence on {\em fproca} and {\em fprocb}, due to variables {\em
boundA} and {\em boundB}. Thus, the iterations of {\em fproca} and
{\em fprocb} execute in parallel and effectively exchange boundary
elements using {\em ftransfer}. Note that the results obtained by task
parallel execution are consistent with sequential execution.

\subsection{Compilation}
The compiler uses a data flow algorithm to analyze the dependences
caused by input/output variables and precisely determines the data
movement between tasks. The basic paradigm is that the results
obtained must always be consistent with sequential execution.  It then
uses the mapping information to group task--subroutines into {\em
modules}. All task--subroutines inside a module are mapped to the same
set of processor nodes.
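The grouping step can be sketched as follows (illustrative Python, not
the compiler's actual algorithm; the mapping shown is hypothetical):
task--subroutines mapped to the same processor set land in the same
module.

```python
def group_into_modules(mapping):
    """Group task-subroutines by the processor set they are mapped to."""
    modules = {}
    for task, procs in mapping.items():
        modules.setdefault(frozenset(procs), []).append(task)
    return modules

# Hypothetical mapping: the two FFT stages share nodes 0-1, hist uses 2-3.
mapping = {"colffts": {0, 1}, "rowffts": {0, 1}, "hist": {2, 3}}
mods = group_into_modules(mapping)
assert mods[frozenset({0, 1})] == ["colffts", "rowffts"]
assert mods[frozenset({2, 3})] == ["hist"]
```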

The compiler generates calls to the run--time system for communication
between modules, and may also generate distribution mapping routines
that the run--time system uses for communication. Inter--module
communication management is done jointly by the compiler and the
run--time system and the details are discussed in the next section. The
involvement of the run--time system is necessary since the compiler is
aware of the communication pattern between modules, but may not be
aware of the mapping of the modules. This is particularly important if
the mappings can change dynamically, or if some of the modules contain
routines not generated by the NetFx compiler.

\section{Run--time support for task parallelism}

The NetFx run--time system is responsible for efficiently transporting data 
sets between modules.  This task is complicated by a number of 
requirements, including dynamic distribution and module bindings, 
asynchrony, the desire for compile--time customizability, and node and 
network heterogeneity.  The interface exported by the run--time system 
presents the abstraction of high--level module--to--module communication of 
distributed data sets via collective non--blocking send and receive 
operations.  Compile--time customizability is achieved by a facility for 
registering compiler--generated distribution functions with the 
run--time system.  This mechanism also provides a path for extending the 
range of supported distribution types.  The run--time system itself consists 
of three subsystems with well defined interfaces between them.  More than
just good design practice, this allows us to independently specialize subsystems
for particular environments in order to deal with heterogeneity.

In this section, we first define the terms and abstractions we use.  
Next, we describe the interface exported by the run--time system.  This is 
followed by an explanation of the run--time system's internal structure 
and interfaces.  Finally, we show how the interfaces are traversed during 
the execution of a simple send operation.


\subsection{Terms and abstractions}

A {\em task} is a data parallel subroutine.  One or more tasks are
grouped into a {\em module}, where they execute consecutively.  A
module runs on a set of processes mapped to a {\em processor group},
which is a set of processors that share some common attributes.  There
is a {\em dependence} between two modules if a task in one produces a
data set for a task in the other.  The compiler resolves this
dependence by generating a send call for the producer module and a
receive call for the consumer module.  Every module essentially
repeats three phases.  First, it receives data sets from each
module it is dependent on.  Next, it executes its tasks.  Finally,
it sends data sets to each module that depends on it.

The elements of a data set are partitioned across the processes of a module 
according to some rule.  This rule is identified by a distribution type and 
a distribution.  A {\em distribution type} identifies a class of similar 
rules, while a {\em distribution} identifies a specific member of the 
class.  For example, the distribution type of $HPF_ARRAY$ includes the 
distribution $(*,BLOCK)$.  Two distribution types identify a method for 
mapping between their member distributions, a process called {\em 
distribution computation}.  Given two distributions of the appropriate type 
and a process pair, the method, or {\em distribution function}, returns an 
{\em offset relation} between offsets into the chunk of elements owned by 
each process.  Given the address of the initial element owned by each 
process and the machine architectures they run on, the offset relation can 
be trivially converted into an {\em address relation}, which maps memory 
addresses on one machine to memory addresses on the other.  Alternatively, 
the distribution function can directly assemble and disassemble message 
buffers.
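The offset relation can be made concrete with a small sketch
(illustrative Python under assumed BLOCK and CYCLIC owner rules, not the
NetFx distribution subsystem): given both distributions of the same
$n$--element set and a (sender, receiver) process pair, the distribution
function returns the pairs of offsets into the chunks each process owns.

```python
def block_owner(i, n, p):
    """Global index -> (process, local offset) under a BLOCK distribution."""
    b = (n + p - 1) // p                  # block size
    return i // b, i % b

def cyclic_owner(i, n, p):
    """Global index -> (process, local offset) under a CYCLIC distribution."""
    return i % p, i // p

def offset_relation(n, p, sender, receiver):
    """Pairs (sender offset, receiver offset) for elements held by both."""
    rel = []
    for i in range(n):
        sp, so = block_owner(i, n, p)
        rp, ro = cyclic_owner(i, n, p)
        if sp == sender and rp == receiver:
            rel.append((so, ro))
    return rel

# n=8, p=2: BLOCK process 0 owns indices 0-3; CYCLIC process 1 owns odd indices.
assert offset_relation(8, 2, 0, 1) == [(1, 0), (3, 1)]
```

Note that the relation mentions only local offsets, never addresses,
which is what keeps the distribution computation architecture--independent.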

The distribution of a data set may be {\em static} or {\em dynamic}.
A static distribution never changes while a dynamic one may change.  A
data set's distribution type never changes.  For example, a load
balanced distribution type that involves data movement will certainly
result in a dynamic distribution.  The mapping of a module's processes
to a processor group may also be static or dynamic.  The process of
finding the current values of distributions or module mappings is
referred to as {\em binding} and the results are {\em bindings}.
Bindings are static or dynamic depending on the distribution or module
mappings they represent.

The run--time system presents the abstraction of ``module--to--module'' 
communication of a distributed data set between uniquely numbered modules.  
This is a natural extension of point--to--point message passing where the 
endpoints are groups of processes and the data sets are distributed.  
Within the run--time system, the next lower level of abstraction is a 
mapping of each element of a data set to an element offset from an initial 
address on each process of a set of consecutively numbered processes.  This 
is the abstraction used during distribution computation.  Notice that this 
abstraction makes distribution computation and the resulting offset 
relation architecture--independent.   The abstractions play a substantial 
role in making the components of the run--time system interchangeable.

\subsection{Interface}

The interface between the NetFx executable and the run--time system has 
three components as shown in Figure \ref{fig:interface}.  The first is the 
{\em intermodule communication interface}, which consists of simple, 
high--level, non--blocking collective send and receive calls for transporting 
whole distributed data sets between modules.  It is required that if more 
than one data set is transferred between a pair of modules, the order of the 
sends and receives on the modules be the same.  Of course, the module must 
take care that all receives complete before executing its tasks.  The 
arguments of the send and receive calls are
\begin{itemize}
\item Target (send) or source (receive) module 
\item Address of the first element local to the calling process
\item Data type of each element
\item Sender distribution type and distribution
\item Receiver distribution type and distribution
\item Hint (implementation specific)
\end{itemize}
There is a single call to wait for all outstanding receives to complete.  
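The calling discipline can be illustrated with a toy in--process mock
(the class and method names here are our assumptions, not the actual
NetFx API): sends and receives are non--blocking, transfers between a
module pair match in order, and a single wait call completes all
outstanding receives.

```python
from collections import deque

class Runtime:
    def __init__(self):
        self.channels = {}   # (src_module, dst_module) -> FIFO of data sets
        self.pending = []    # outstanding receives

    def send(self, src, dst, data, send_dist=None, recv_dist=None, hint=None):
        # Non-blocking: enqueue the data set on the (src, dst) channel.
        self.channels.setdefault((src, dst), deque()).append(list(data))

    def receive(self, src, dst, out, send_dist=None, recv_dist=None, hint=None):
        # Non-blocking: record the receive; it completes in wait_all.
        self.pending.append((src, dst, out))

    def wait_all(self):
        # The single wait call: complete all outstanding receives, in order.
        for src, dst, out in self.pending:
            out.extend(self.channels[(src, dst)].popleft())
        self.pending.clear()

rt = Runtime()
rt.send(1, 2, [10, 20, 30])   # module 1 -> module 2
buf = []
rt.receive(1, 2, buf)
rt.wait_all()
assert buf == [10, 20, 30]
```

The mock elides distributions entirely; its point is only the ordering
contract between matching sends, receives, and the wait call.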

Before performing sends and receives, the NetFx program must describe the 
characteristics of relevant modules, distribution types, and custom 
distribution functions to the run--time system.  This is done through the 
{\em registry interface}.  The NetFx program explicitly registers module 
mappings or distribution types as dynamic or static.  All dynamic mappings 
or types must be registered.  In the simplest case, where module mappings 
are static, and only static, library--supported distribution types are used, 
only the initial module mappings are registered and predefined constants are 
used for distribution types.  To use a predefined distribution type that is 
dynamic, the program must register a dynamic instance of it.  Dynamic 
bindings are updated implicitly by examining send and receive calls.

For several reasons, a program may want to register its own distribution 
types.  One reason is extensibility --- the distribution type may not be 
supported by the library.  Another is efficiency --- the compiler may have 
been able to generate a specialized distribution function for a particular 
communication.  In either case, the program must also register custom 
distribution functions for any communication that involves the new 
distribution type.  For example, suppose module 1 communicates from a new 
distribution type $DTX$ to predefined types $DT2$ on module 2 and $DT3$ 
on module 3.  All three modules must register distribution type $DTX$.
Module 1 must register new distribution functions $\lambda (DTX,DT2)$ and 
$\lambda (DTX,DT3)$, module 2 must register $\lambda (DTX,DT2)$, and module 
3 must register $\lambda (DTX,DT3)$.  All distribution functions must 
conform to a common calling convention, the {\em standard distribution
function interface}.  Each distribution function must be able both to
return offset relations and to assemble/disassemble message buffers.  The
run--time system decides which functionality will be used.
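The $DTX$ example can be sketched as follows (illustrative Python with
hypothetical names; the real registry interface is not specified here):
each module registers the new type, and each module registers a
distribution function for every type pair it communicates between.

```python
class DistRegistry:
    def __init__(self):
        self.types = {}      # type name -> dynamic flag
        self.funcs = {}      # (src_type, dst_type) -> distribution function

    def register_type(self, dtype, dynamic=False):
        self.types[dtype] = dynamic

    def register_function(self, src_type, dst_type, fn):
        # fn must follow the common calling convention (the standard
        # distribution function interface).
        self.funcs[(src_type, dst_type)] = fn

    def lookup(self, src_type, dst_type):
        return self.funcs[(src_type, dst_type)]

# Module 1 communicates from new type DTX to predefined DT2 and DT3,
# so it registers the type and both distribution functions.
reg = DistRegistry()
reg.register_type("DTX")
reg.register_function("DTX", "DT2", lambda sdist, rdist, pair: [])
reg.register_function("DTX", "DT3", lambda sdist, rdist, pair: [])
assert reg.lookup("DTX", "DT2")(None, None, (0, 1)) == []
```

Modules 2 and 3 would each build the same structure, registering only
the single function they need.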

\subsection{Internal structure}

\begin{figure}
\centerline{\epsfxsize=5in \epsfbox{Int1.eps}}
\caption{The NetFx run--time system: subsystems, interfaces, and
relation to the NetFx executable.  Each arrow represents an interface
and points from the caller of the interface to the callee.}
\label{fig:interface}
\end{figure}


As described earlier, the run--time system presents the abstraction of 
``module--to--module'' communication of a distributed data set between 
uniquely numbered modules.  
This requires distribution computation, binding maintenance, and efficient 
communication over heterogeneous networks.  These tasks are segregated into 
three subsystems (distribution, binding, and communication) with well 
defined interfaces between them.  More than just good design practice, the 
ability to independently develop and enhance each subsystem is vital for 
extensibility and good performance.  Clearly, the number of distribution 
types will grow as NetFx is used to coordinate non--Fx modules.  Further, 
compile--time optimization of distribution computation is important for 
performance \cite{stichnot94}.  There are a variety of mechanisms that 
could be used to support dynamic bindings, and it is unclear whether any one is
best for all environments.  Finally, to achieve high performance, it will 
be necessary to specialize the communication system for different networks.  
By clearly defining the interfaces between subsystems, we make this 
specialization possible and easy.  Indeed, all the subsystems are 
``plug--and--play.''

The internals of the run--time system are shown in Figure
\ref{fig:interface}.  The {\em communication subsystem} is the heart of the 
run--time system and exports the previously discussed intermodule 
communication interface to the NetFx program.  It makes use of the {\em 
binding subsystem} to determine current module and distribution bindings.  
In return it supplies the binding subsystem with a simple {\em binding 
communication interface} in case intermodule communication is necessary.  
If the communication subsystem determines that distribution computation 
must be done (for example, a dynamic distribution may have changed), it 
calls on the {\em distribution subsystem} via the {\em distribution 
interface}.  This subsystem serves two purposes.  First, it dispatches the 
appropriate distribution function for the sender and receiver distribution 
types.  Second, it conforms the offset relation returned by that function 
to the format desired by the communication system and converts it to an 
address relation.  (The communication system can request that a message be 
assembled or disassembled instead.) The subsystem uses the binding 
interface to bind the two distribution types to the distribution function.  
This function may be one of the {\em library distribution functions} linked 
to every NetFx executable, or a compiler--generated {\em custom 
distribution function}.  In either case, the function must conform to the 
{\em standard distribution function interface}.
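To make concrete what a distribution function computes, the following sketch
(in Python for brevity; the function name and the restriction to
one--dimensional block distributions are our own assumptions, not part of
the NetFx interface) produces the offset relation between one sender process
and one receiver process when an $n$--element vector moves from a block
distribution over $p$ processes to a block distribution over $q$ processes.

```python
import math

def block_block_offsets(s, r, n, p, q):
    """Offset relation between sender process s (one of p, block
    distribution) and receiver process r (one of q, block distribution)
    for an n-element vector: a list of (sender local offset, receiver
    local offset) pairs, one per element they must exchange."""
    bs = math.ceil(n / p)            # sender block size
    br = math.ceil(n / q)            # receiver block size
    lo = max(s * bs, r * br)         # overlap of the two owned index ranges
    hi = min((s + 1) * bs, (r + 1) * br, n)
    return [(g - s * bs, g - r * br) for g in range(lo, hi)]
```

For example, with $n=8$ elements moving from $p=2$ to $q=4$ processes,
sender 0 (owning global elements 0--3) and receiver 1 (owning global
elements 2--3) exchange two elements, at sender offsets 2 and 3 and
receiver offsets 0 and 1.  The distribution subsystem would then conform
such relations and convert them to address relations.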

Once address relations have been computed (or messages assembled), the 
communication system can use the module bindings to identify the 
processors to communicate with and perform the actual data transfer in a 
network-- and architecture--dependent manner.  On the receiving module, 
essentially the same steps are followed, with the obvious changes.

\subsection{Example}

\begin{figure}
        \centerline{\epsfxsize=5in \epsfbox{useint.eps}}
	\caption{Example of the steps followed for an intertask send using a 
	custom distribution function.}
	\protect\label{fig:steps}
\end{figure}

For clarity, we show how an intertask send using a custom distribution 
function is performed.  Figure \ref{fig:steps} shows the steps graphically.  
On the right are the actual calls performed by the node program and 
internally for the intertask send.  Notice that the calls are numbered to 
correspond with the interfaces used and with the steps discussed in this 
section.  There is a dependence from module $M1$ to module $M2$, which the 
compiler has satisfied by generating a send from $M1$ to $M2$ (and a 
matching receive in $M2$, which is not discussed here).  The compiler also 
produced a custom distribution function $DISTFUNC$ for this communication.  
At run time, $M1$'s first step is to use the registry interface to register 
the initial values of all relevant bindings.  The module bindings for $M1$ 
and $M2$ are registered as static, so their values will never change.  Also 
registered as static are the distribution types $DT1$ and $DT2$.  Finally, 
$M1$ registers $DISTFUNC$ as the distribution function for $DT1$ and $DT2$.
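This registration step might look as follows in a Python sketch.  The
class, method names, and argument forms are our assumptions; only the
division into static module, distribution type, and distribution function
registrations is taken from the text.

```python
class Registry:
    """Illustrative registry; names are assumptions, not the NetFx API."""
    STATIC = "static"

    def __init__(self):
        self.modules = {}     # module number -> (kind, physical mapping)
        self.dist_types = {}  # distribution type -> (kind, distribution)
        self.dist_funcs = {}  # (sender type, receiver type) -> function

    def register_module(self, mod, mapping, kind=STATIC):
        self.modules[mod] = (kind, mapping)

    def register_dist_type(self, dt, dist, kind=STATIC):
        self.dist_types[dt] = (kind, dist)

    def register_dist_func(self, dt1, dt2, func):
        self.dist_funcs[(dt1, dt2)] = func

# M1's first step at run time, following the text:
reg = Registry()
reg.register_module("M1", [0, 1])         # static: value never changes
reg.register_module("M2", [2, 3])
reg.register_dist_type("DT1", "DISTA1")
reg.register_dist_type("DT2", "DISTA2")
reg.register_dist_func("DT1", "DT2", lambda *args: [])  # stands in for DISTFUNC
```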

The second step (which may be repeated many times) is to invoke a send of 
the distributed data set $A$ to $M2$.  The intermodule send call includes 
the target module number, $M2$, the first element of $A$ that is local to 
the process, the data type of $A$'s elements (REAL), the sender and 
receiver distribution types, $DT1$ and $DT2$, and the current sender and 
receiver distributions, $DISTA1$ and $DISTA2$, respectively.  Now, in the 
context of the intermodule send call, the third step is to bind the two 
modules to their current physical mappings, and the distribution types to 
their current distributions.  Both bindings are 
simple in this case because they are static --- they amount to 
local lookups.  In the fourth step, the communication subsystem calls the 
distribution subsystem to build an address relation between each pair of 
sender and receiver processes $P1$ and $P2$.  The distribution subsystem 
accomplishes this by binding the two distribution types, $DT1$ and $DT2$, 
to their corresponding distribution function, $DISTFUNC$ (step 5), and 
calling the function for each pair (step 6).  The offset relations returned 
are conformed to the format requested by the communication subsystem and 
converted to address relations.  Finally, the relations are used to perform 
the actual communication in an environment--dependent manner.  The 
communication subsystem may cache the address relations to 
avoid steps 4--6 the next time the intertask send occurs.  Indeed, because 
the distributions are static, the relations could be kept indefinitely.
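Steps 4--6 for one (sender, receiver) process pair, together with the
caching just mentioned, can be sketched in Python as follows (the function
name, argument forms, and cache layout are our assumptions):

```python
def address_relation(dist_func, p1, p2, dist1, dist2, base1, base2, cache):
    """For sender process p1 and receiver process p2: obtain the offset
    relation from the bound distribution function (steps 5 and 6),
    convert it to an address relation, and cache it so that later sends
    can skip the distribution subsystem entirely."""
    key = (p1, p2)
    if key not in cache:
        offsets = dist_func(p1, p2, dist1, dist2)
        # Conform and convert: offsets become addresses relative to each
        # process's local buffer base address.
        cache[key] = [(base1 + so, base2 + do) for so, do in offsets]
    return cache[key]
```

Because the distributions in this example are static, entries placed in
the cache remain valid indefinitely, so the second and later sends never
reach the distribution function.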


\section{Status and implementation}

The Fx compiler currently supports task parallelism on the Intel iWarp, 
Intel Paragon, and on DEC Alpha workstations running PVM.  Compiler 
analysis of parallel sections (although not mapping directive support or 
code emission) is the same for each target.  The run--time system used on 
the Paragon and the Alphas was initially developed for the Alphas with an 
eye towards portability and serves as a prototype for the system discussed 
above.  The prototype essentially has three components: a component that 
computes offset relations for any two (static) HPF distributions, a 
component that conforms these relations to a particular format and caches 
them, and a component that does communication using the relations.  Porting 
this run--time system to the Paragon (and achieving good performance) 
required rewriting only the communication component.  Further details of 
the library are available in \cite{Dinda95}.  
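As an indication of what such offset relation computation involves, the
following Python sketch (our own illustration, one--dimensional case only,
not code from the prototype) maps a global index to its owner and local
offset under an HPF CYCLIC($k$) distribution; BLOCK and CYCLIC are the
special cases $k = \lceil n/p \rceil$ and $k = 1$.

```python
def owner_and_offset(g, k, p):
    """For a 1-D HPF CYCLIC(k) distribution over p processes: return
    (owning process, local offset) of global index g.  Size-k blocks
    are dealt round-robin to the processes."""
    blk = g // k             # which size-k block g falls in
    proc = blk % p           # round-robin owner of that block
    local_blk = blk // p     # this owner's preceding block count
    return proc, local_blk * k + g % k
```

A general--purpose routine like the prototype's pairs such ownership
computations for the sender and receiver distributions to enumerate the
offset relation between any two static HPF distributions.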

\section{Related work}

Run--time support for communicating data parallel tasks is a
fairly new research area.  There is general agreement that high--level
communication support is the most important component.  Opus
\cite{Hains95} supports communication between tasks using
intermediaries called shared data abstractions (SDAs).  An SDA is
essentially an independent object class whose private data is
distributed over some set of threads and whose methods are protected
with monitor semantics.  For example, one could define a queue SDA
whose elements are block--distributed vectors.  To send a vector to
another task, the sender would first send it to an SDA queue object
(which requires a distribution computation) using the enqueue method.
The receiving task would then invoke the dequeue method, causing the
SDA queue object to send it the data set (again requiring a distribution
computation).  SDAs can support many more tasking paradigms than our
scheme, but they can double the communication volume and cause
unnecessary synchronization between tasks due to monitor semantics.

Closer to our scheme is the library described in \cite{Avalini95}.  This 
library supports communication between data parallel tasks with collective 
sends and receives.  Sends (and receives) are split into an 
initialization step, which performs distribution computation and builds 
communication schedules, and an execution step, which actually performs the 
send.  Initializations synchronize the sending and receiving tasks, but can be 
amortized over many executions.  Only static module bindings and static HPF 
distributions are supported.  All distribution computation is done with 
general--purpose library routines.

\section{Summary and conclusions}
We have presented an extension of the Fx compiler system, called
NetFx, that supports network parallel computing. The NetFx programming
model is based on the model of task and data parallelism originally
developed for the Fx compiler system.  The key part of the NetFx
design is a communication interface that (1) presents a simple
abstraction for point--to--point transfer of dynamically distributed
data sets, and (2) provides a novel mechanism for registering
arbitrary distribution functions with the run--time system. This
mechanism enables the compiler to customize the behavior of the
run--time system with compile--time communication optimizations, and
to manage foreign modules that use non--HPF data distributions.

\bibliography{/afs/cs/project/iwarp/member/jass/bib/all}
\end{document}
