\input epsf
%\input{psadobe}
\makeatletter
\long\def\unmarkedfootnote#1{{\long\def\@makefntext##1{##1}\footnotetext{#1}}}
\makeatother
\documentstyle[fleqn]{art-nocolon}
\include{psfig}
%\include{caption}


\textheight =  9.0in
\textwidth = 7.0in              % assume output has this much centered

\topmargin = -.5 in 
\oddsidemargin = -.25 in
\evensidemargin = -.25 in

\renewcommand{\floatpagefraction}{1}
\renewcommand{\topfraction}{1}
\renewcommand{\bottomfraction}{1}
\renewcommand{\textfraction}{0}
\renewcommand{\baselinestretch}{1}
\renewcommand{\arraystretch}{1.5}


\newtheorem{definition}{Definition}
\newtheorem{theorem}{Theorem}
\newtheorem{lemma}{Lemma}
\newcommand{\ignore}[1]{}

\renewcommand{\to}{\rightarrow}
\newcommand{\A}{{\cal A}}
\begin{document}
\bibliographystyle{acm}

%\load{\footnotesize}{\sc}
\title{ 
%\begin{flushleft}
%\vspace{-1.0in}
%\centerline{\scriptsize\em (Extended abstract submitted for PLDI 94)}
%\normalsize { }
%\end{flushleft}
%\vspace{0.4in}
Language and Run--time Support for Network Parallel Computing\\
}
\author{ Peter A. Dinda \and David R. O'Hallaron \and
         Jaspal Subhlok\thanks{Corresponding author. 
                               ({\tt jass@cs.cmu.edu}, (412) 268 7893)}
 \and Jon A. Webb \and Bwolen Yang}
\date{
%   {email:\tt \{\}@cs.cmu.edu} \vspace*{.2in} \\
      School of Computer Science \\
         Carnegie Mellon University \\
         Pittsburgh PA 15213}

\maketitle
\thispagestyle{empty}
\abstract{
%
% This still needs some help
%
Network parallel computing is the use of diverse computing resources 
interconnected by general purpose networks to run parallel applications.  
This paper describes NetFx, an extension of the Fx compiler system which 
uses the Fx model of task parallelism to distribute and manage computations 
across the sequential and parallel machines of a network.  The central 
problem in compilation for network parallel computing is that the compiler 
is presented with a heterogeneous and dynamic target.  Our approach is 
based on a novel run--time system that presents a simple communication 
interface to the compiler, but uses compiler knowledge to customize 
communication between tasks on heterogeneous machines over complex 
networks.  The run--time system interface is sufficiently general to 
support different client compilers.  The compiler generates code for data 
parallel modules with point--to--point transfer of distributed data sets 
between modules.  This allows the compiler to be portable and enables 
communication generation without knowledge of exactly how the modules will 
be mapped at run--time or what low level communication primitives will be 
used.  The compiler also generates a set of custom routines, called address 
computation functions, to translate between different data distributions.  
The run--time system performs the actual communication using a mix of 
generic and custom address computation functions depending on run--time 
parameters like the type and number of nodes assigned to the communicating 
modules and the data distributions of the variables being communicated.  
This mechanism enables the run--time system to exploit compile--time 
optimizations, and enables the compiler to manage foreign modules that use 
non--standard data distributions.  We describe the NetFx programming model, 
the communication interface, how NetFx programs are compiled to the 
interface, and the issues in implementing the interface.

}

%\voffset = -.5 in
\normalsize

\setcounter{page}{1}
\section{Introduction}\label{intro}

%
% I changed this to include building apps using different languages
% and emphasized the rts -PAD
%
%

Network parallel computing is the use of diverse computing resources
interconnected by general purpose networks to run parallel
applications.  The domain of effective network parallel applications
is rapidly expanding because of considerable improvements in the
latency and bandwidth characteristics of networks such as HiPPI and
ATM.  It is becoming practical to run applications such as scientific
simulations, real--time systems, and real--world modeling in network
parallel computing environments.  However, standard parallel languages
like High Performance Fortran (HPF)~\cite{HPFF93} offer little support
for network parallel computing.  Further, multiple languages and
programming paradigms may be necessary to fully realize the potential of
the network parallel computing environment --- yet there is little
support for interoperability in today's parallel languages and
paradigms.  Without such support, the construction of an efficient,
scalable, and environment--independent network parallel application is
a difficult and error--prone task in which the applications programmer
must wear many hats, including that of the network specialist.

The goal of our research is to make it easy to build network parallel
applications that achieve high performance in many different
environments. This paper presents a programming model for network
parallel applications and the design of a compiler and a run--time
system that enable efficient generation and execution of network
parallel applications.

%Our approach is to decompose the application into a set
%of parallel tasks, perhaps written in different languages.  These
%tasks communicate via an extensible, modular run--time system which
%handles the details of moving distributed data sets between tasks.
%The run--time system can be extended by a compiler or programmer to
%improve performance.  Because the run--time system is modular, its
%components can be implemented by specialists to take advantage of
%network and machine characteristics.

Our programming model allows the programmer to express task parallelism 
among parallel tasks.  Tasks are instantiated as procedure calls.  The 
programmer specifies the interface of the task, but may implement the task 
in any language.  Tasks are grouped together into modules which are then 
mapped to the computing environment, either by the compiler or the 
programmer.  The compiler generates communication between tasks to satisfy 
programmer--specified data dependencies between modules.

%
% This is not really right
%
%The programming model we use combines task and data parallelism.
%Procedures in an application are data parallel and each procedure is
%mapped to a homogeneous set of nodes, which may be different nodes of
%a large parallel machine or a cluster of workstations. Different
%procedures are placed on different sets of nodes on the network and
%the compiler manages the communication between them.

The most challenging part of compilation for a network is generating
efficient communication.  In network parallel computing, multiple
communication mechanisms must be supported.  For example, crossing a
network requires different
mechanisms than communicating between the nodes of an MPP.  More
important, the network environment is expected to be
dynamic. Run--time load balancing is important even for applications
with a static computation structure since the external load on
different machines can vary.  This means the compiler has limited
information about the run--time environment during code generation.

For generating efficient and portable communication in a dynamic
run--time environment, it is important to distinguish between the
aspects of the communication that are constant for the duration of the
execution and those that are varying and may be known only at
run--time. The communication steps between tasks are a property of the
program and are determined at compile time, while the actual
low--level communication calls may depend on the run--time
environment.

Our solution to generating efficient communication involves defining
an inter--module communication interface. This interface provides
mechanisms for transferring data between modules that are mapped on
groups of nodes. The compiler generates code for this communication
interface and the run--time system ensures that appropriate
communication calls are generated. The compiled program is portable
across the machines and network protocols supported by the run--time
system. 

The key new idea is that the interface decouples compilation from the
generation of low level communication, which is essential in a network
environment, yet provides a mechanism for the compiler to customize or
extend the behavior of the run--time communication system. This
mechanism allows the run--time system to exploit compile--time
communication optimizations, and allows the run--time system to be
painlessly extended to support new data distributions.  The run--time
system itself is not tied to the compiler, but may be used by other
clients.
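The dispatch between generic and compiler--supplied routines can be
sketched as follows.  This is a minimal Python sketch under our own
assumptions; the names ({\tt RunTime}, {\tt register}, {\tt addr\_fn})
are hypothetical and do not denote the actual NetFx interface.

```python
class RunTime:
    """Sketch: a run-time system that lets a client compiler register
    custom address computation functions for pairs of distribution
    types, falling back to a slow but fully general generic routine."""

    def __init__(self, generic_fn):
        self.generic_fn = generic_fn     # handles any distribution pair
        self.custom_fns = {}             # (src_type, dst_type) -> function

    def register(self, src_type, dst_type, fn):
        # Called by the compiler (or a programmer) before execution
        # to install an optimized routine for one type pair.
        self.custom_fns[(src_type, dst_type)] = fn

    def addr_fn(self, src_type, dst_type):
        # Prefer a registered custom routine; otherwise fall back.
        return self.custom_fns.get((src_type, dst_type), self.generic_fn)
```

A send between two modules would look up the function for the two
end--point distribution types and apply it to the run--time parameters
(node counts, current distributions); foreign modules with
non--standard distributions are handled by the generic fallback.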

We are extending the Fx compiler system~\cite{GrOS94} to support the
programming model and the run--time system.  Fx already extends a data
parallel language based on HPF with task parallelism.  The extended
version of Fx is called NetFx.  We begin by motivating the need for
network parallel computing with a set of example applications and then
describe the NetFx language extensions, compiler strategy and the
extensible run--time system.

\section{Motivation for network parallel computing}

The different components of a large application typically have
different requirements for resources like I/O devices, sensors,
processing power, communication bandwidth, memory, and software tools.
Further, each component may be best expressed in a different language
or paradigm.  Attempting to satisfy all of these resource requirements
in a single computing platform utilizing a single language can be
impractical or even impossible.  An attractive alternative is to build
each component of the application using the appropriate language or
programming paradigm and run it on the appropriate platform using high
speed networks to communicate with other components. We refer to this
type of computation as {\em network parallel computation}.

This section illustrates the motivation for network parallel computing
with four applications: (1) a {\em real--world modeling} application
where inputs are acquired from a camera system, processed on a large
parallel system, and displayed remotely, (2) an {\em air quality
modeling} application where different steps of the computation have
different processing and communication requirements, (3) an {\em
earthquake ground motion modeling} application where the results are
computed on a large parallel system and visualized on a graphics
workstation, and (4) a {\em real--time embedded system} constructed
from commercially--available rack--mounted array processor boards.

\subsection{Real--world modeling}

High performance computers and their programming tools have
traditionally been applied to problems in scientific modeling,
including simulation analysis. A group of researchers at Carnegie
Mellon is applying these same systems and tools to problems in {\em
real--world modeling}, which means three--dimensional shape modeling
of objects and environments in the real world by direct observations
using computer vision techniques. Success of this approach could lead
to, for example, replacement of complex and specialized sensory
systems based on laser range--finding technology by much simpler, more
flexible and scalable, but computationally more expensive, systems
like stereo vision.  In real--world modeling applications the sensor
system plays a key role. In some situations (e.g., imaging moving
objects) it is necessary to capture imagery at video rate (30 Hz) or
faster.  Capturing data at this high rate is complicated by the need
to synchronously access several cameras\footnote{In the recent past, the
most significant advances in stereo vision accuracy and reliability
have come from the introduction of multiple cameras.}.  In other
situations (e.g., imaging building interiors) speed of image capture
is not so important but the imaging system must be portable.

These conflicting requirements can be met through the
application of modern networking technology. The system we envision is
illustrated in Figure~\ref{fig:netargos}. 
\begin{figure}[htb]
\centerline{\epsfxsize=3in \epsfbox{netargos.eps}}
\caption{Real-world modeling system}
\label{fig:netargos}
\end{figure}
The camera system is a detachable unit, and can be attached either to
a large number of workstations, giving a high frame capture rate, or
to a smaller number of portable workstations or PCs, giving
portability at the expense of speed. Once the images are captured,
they are transformed into stereo vision data through the application
of parallel stereo vision programmed in Fx, which can then be executed
on the workstations themselves or on powerful tightly--coupled high
performance computers.  The resulting range imagery is then
transformed into a three--dimensional model, which is another Fx
program, and the resulting model can then be stored to disk or
displayed remotely. A NetFx implementation will allow us to program the
entire framework, including communication across the network, in a
uniform, portable manner.

\subsection{Air quality modeling}

As part of a Grand Challenge grant on heterogeneous computing for
large scale environmental modeling, researchers at Carnegie Mellon
have adapted the airshed environmental model application~\cite{McRH92}
to run on parallel systems.  The application consists of repeated
application of three data parallel steps.  The first step, reading
input data from storage and preprocessing it, is I/O intensive.  The
second step, computing the interaction of different chemical species
in the atmosphere, exhibits massive parallelism and little
communication.  The third step, computing the wind--blown transport of
chemical species, has significant parallelism and communication.  Each
of these steps is suited to a different class of machine.  For
example, an Fx implementation of the chemistry step runs as fast on
four Ethernet--connected DEC Alpha workstations as on 32 nodes of an
Intel Paragon since the relatively slow communication is not an
important factor, but the transport step is expected to be more
sensitive to the parameters of the communication system.  Further, the
input step is best run concurrently with the other stages.  A NetFx
implementation will allow us to use a single high level program for
the entire application while retaining the ability to use different
mappings for execution.


\subsection{Earthquake ground motion modeling}

As part of a Grand Challenge grant on earthquake modeling, researchers
at Carnegie Mellon are developing techniques for predicting the motion
of the ground during strong earthquakes. The computation consists of
two distinct steps: simulation and visualization.  The simulation step
uses a large, sparse, and irregular finite element model to generate a
sequence of displacement vectors, one vector per time step. The
visualization step manipulates the displacement vectors and produces a
graphical display.  The simulation step requires access to large
amounts of memory, computational power, and low--latency
high--bandwidth communications, while the visualization step requires
access to a bit--mapped display and, for the more complex 3D models, a
graphics accelerator. The natural partitioning of this application is
to run the simulation step on a large tightly--coupled parallel
system, and to run the visualization step on a graphics workstation.

\subsection{Real--time embedded systems}

Economic realities have created a pressing need for embedded systems
that are built with commercial off--the--shelf components.  The basic
building block is typically a commercial array processor board that
consists of a fixed number of processors connected by a fast onboard
switch. For larger systems, multiple boards are mounted in a rack and
connected by a bus or ring structure.  For still larger systems,
multiple racks are connected by a commercial network like FDDI or ATM.

These systems are used for real--time applications like sonar and
radar, where the processing typically consists of reading from one or
more sensors, and then manipulating the inputs in a sequence of
stages. Each stage must be assigned enough processing resources to
match the throughput and latency requirements of the overall system.
An application that needs multiple racks to meet its real--time
requirements will of necessity be a network parallel computation.

\section{Programming model for NetFx}
%
% I changed this to stress that it's an initial implementation and to 
% mention the language restrictions.  There's a real problem in that none
% of us, I think, are really sure what the programming model should look
% for a heterogenous system and with multiple language support.
%
%
The initial implementation of the programming model is an extension of the 
Fx~\cite{GrOS94,SSOG93} compiler system called NetFx.  NetFx provides 
support for task and data parallelism but restricts language support to Fx.  
Support for data parallelism is similar to that provided in High 
Performance Fortran (HPF) and is described in~\cite{StOG94,YWSO93}.  Task 
parallelism is used to map data parallel subroutines onto different, 
possibly heterogeneous, groups of nodes.  We outline how task parallelism 
is expressed in NetFx and illustrate how it is used to exploit coarse grain 
parallelism.

\subsection{Task parallelism}
Task parallelism is expressed solely using compiler directives and no
new language extensions are introduced.  To simplify the
implementation, the current version of Fx relies on the user to
identify the side effects of the task--subroutines and to specify
them. Directives are also used to guide the compiler in mapping the
program, which is important for performance, and to ensure that
tasks execute on computers for which compiled versions are
available.  We describe the directives that are used to express task
parallelism and illustrate their use. We use the FFT--Hist kernel as
an example program. This program takes a sequence of arrays as input,
and for each input array, it computes a 2D FFT by performing a set of
1D FFTs, first on the columns and then on the rows, followed by a
histogram analysis.

\subsubsection*{Parallel sections}
Calls to task--subroutines are permitted only in special code regions
called {\em parallel sections}, denoted by a {\tt begin parallel/end
parallel} pair. For example, the parallel section for the FFT--Hist
example has the following form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call rowffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call hist(A)\\
c\$\>\>  {\em input/output and mapping directives}\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm
The code inside a parallel section can only contain loops and
subroutine calls.  These restrictions are necessary to make it
possible to manage shared data and shared resources (including
nodes) efficiently. Every task--subroutine call inside a parallel
section corresponds to a (possibly data parallel)
{\em task} that can execute in parallel with other tasks
subject to dependence constraints.

A parallel section introduces a data parallel thread for each
task--subroutine inside it.  Outside the parallel section, the program
executes as a single data parallel thread.  The effect of executing a 
parallel section is the same as if its contents were executed sequentially.

\subsubsection*{Input/output directives}

The user includes {\tt input} and {\tt output} directives to
define the side--effects of a task--subroutine, i.e., the data space
that the subroutine accesses and modifies.  Every variable whose value
at the call site may potentially be used by the called subroutine must
be added to the input parameter list of the task--subroutine.
Similarly, every variable whose value may be modified by the called
subroutine must be included in the output parameter list. The variables
in the input and output parameter lists can be  distributed or replicated
arrays, or scalars.

For example, the input and output directives for the call to {\tt
rowffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that the subroutine {\tt rowffts} can
potentially use the values of, and write to, the parameter array {\tt A}.
As another example, the input and output directives for the call
to {\tt colffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that subroutine {\tt colffts} does not use the
value of any parameter that is passed, but can potentially write to
array {\tt A}.

\subsubsection*{Mapping directives}
%
% This is sorely lacking in detail - again, because we don't know what we 
% need.
%
%
Labeling task--subroutine calls and supplying input/output information
is sufficient for the compiler to generate a correct parallel program.
However, the performance of the program largely depends on how the
task--subroutines are mapped onto the available computation resources
on the network. The possible mappings are constrained by the
availability of data parallel implementations of different
task--subroutines for different architectures, the memory requirements of
the task--subroutines, and the physical locations of devices like
frame buffers.  Ideally NetFx should be able to automatically map the
program for correct execution and best possible performance, but that
is a hard problem and NetFx is not able to do it except in some simple
situations~\cite{SuVo95}. In general, the programmer has to supply the
mapping information.

The mapping information specifies where each of the task--subroutines
should execute. The details of the mapping directives are somewhat
cumbersome and machine specific and we will not describe them here.
For example, when mapping a task--subroutine to a network of
workstations, it may be necessary to specify the names of the
workstations to be used as a node, but when mapping to a part of a
massively parallel machine like the Intel Paragon, it may be
sufficient to specify the number of nodes to be used.

\subsection{Programming coarse grain parallelism}
NetFx allows the programmer to use fine grain data parallelism as well
as coarse grain task parallelism. We illustrate how the task
parallelism directives described above are used to exploit common
forms of coarse grain parallelism.

\subsubsection*{Task pipelines}
In image and signal processing programs, the main computation often consists
of repeatedly executing an acyclic task graph. A simple example of such
a program is the FFT--Hist program, which we now present
with directives:
%
% I patched it so it looks like it does something....
%
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
\>\>     call hist(A)\\
c\$\>\>  {\bf input} (A)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

In FFT--Hist, all data dependences in the program are forward and loop 
independent.  Task--subroutine {\em colffts} reads a data set from some 
input device, computes, and sends the output data set to {\em rowffts}, 
which receives the data set, computes, and sends the output data set to 
{\em hist}, which computes the final output data set.  This allows an 
earlier stage, say {\em colffts}, to process a new data set while {\em 
rowffts} is processing the previous data set, allowing coarse grain 
pipeline parallelism as shown in Figure~\ref{fig:pipe}.
\begin{figure}
\epsfxsize = 4.0 in
\centerline{\epsfbox{pipe.eps}}
\caption{Dependences and coarse grain parallelism in FFT-Hist}
\label{fig:pipe}
\end{figure}
In related work,
we have argued that such parallelism can be exploited efficiently and
profitably for many applications including narrowband tracking radar
and multibaseline stereo~\cite{SOGD94}.

\subsubsection*{Communicating tasks}
In many scientific applications, different subroutines can execute in
parallel, but need to transfer or exchange data between invocations.
For example, two routines {\em fproca} and {\em fprocb} may work on
different parts of a data structure and  exchange boundary
elements between successive invocations. A task parallel program that
expresses this computation is as follows:

\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call fproca(A, boundA, boundBx)\\
c\$\>\>  {\bf output} (boundA), {\bf input} (boundBx)\\
\>\>     call fprocb(B, boundB, boundAx)\\
c\$\>\>  {\bf output} (boundB),  {\bf input} (boundAx)\\
\>\>     call ftransfer(boundA, boundB, boundAx, boundBx)\\
c\$\>\>  {\bf input} (boundA, boundB), {\bf output} (boundAx, boundBx)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

For purposes of illustration, we show the input/output directives
only for the boundary elements.
The task dependence graph for this computation is shown in
Figure~\ref{fig:par}.
There is no direct dependence between task--subroutines
{\em fproca} and {\em fprocb}, but they have a loop carried dependence
on {\em ftransfer} due to variables {\em boundAx} and {\em boundBx}
respectively. Task--subroutine {\em ftransfer} has a loop independent
dependence on {\em fproca} and {\em fprocb}, due to variables {\em
boundA} and {\em boundB}. Thus, the iterations of {\em fproca} and
{\em fprocb} execute in parallel and effectively exchange boundary
elements using task--subroutine
 {\em ftransfer}. The computation schedule is illustrated in
Figure~\ref{fig:par}. 
 Note that the results obtained by task
parallel execution are consistent with sequential execution.
\begin{figure}[htb]
\epsfxsize = 4.0 in
\centerline{\epsfbox{par.eps}}
\caption{Dependences and coarse grain parallelism for a program
         with  data exchange between parallel iterations}
\label{fig:par}
\end{figure}

\subsection{Compilation}
The compiler uses a data flow algorithm to analyze the dependences
caused by input/output variables and precisely determines the data
movement between tasks. The basic paradigm is that the results
obtained must always be consistent with sequential execution.  It then
uses the mapping information to group task--subroutines into {\em
modules}. All task--subroutines inside a module are mapped to the same
set of nodes. A module repeatedly receives input, computes,
and generates output.
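The core of this data flow analysis can be illustrated with a small
sketch (Python is used here purely for exposition; the triple
representation of calls is our own assumption, and the sketch captures
only the loop independent flow dependences within one iteration).  For
each variable we track the last task that wrote it; a later task that
reads the variable has a flow dependence on that writer:

```python
def flow_dependences(calls):
    """calls: (name, inputs, outputs) triples in sequential program order.
    Returns edges (producer, consumer): consumer reads a value that
    producer was the most recent task to write."""
    edges = []
    last_writer = {}
    for name, inputs, outputs in calls:
        for v in inputs:
            if v in last_writer:
                edges.append((last_writer[v], name))
        for v in outputs:
            last_writer[v] = name      # this task now defines v
    return edges

# The FFT--Hist parallel section from above:
fft_hist = [("colffts", [], ["A"]),
            ("rowffts", ["A"], ["A"]),
            ("hist",    ["A"], [])]
```

On the FFT--Hist triples this yields the edges {\em colffts} to
{\em rowffts} and {\em rowffts} to {\em hist}, matching the forward
dependences discussed earlier.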

The compiler generates calls to the run--time system for communication
between modules, and may also generate address computation functions that the
run--time system uses to translate between data distributions.
Inter--module communication management is done jointly by the compiler
and the run--time system and the details are discussed in the next
section.  The involvement of the run--time system is necessary since
the compiler may not know the mapping of the modules to the available
nodes, the current distribution of data on a remote module, or
the status of the (possibly general purpose) network at run-time.
This information may change unpredictably for a variety of reasons,
including load balancing, the use of non-NetFx task-subroutines, or
other traffic on the network.


%The involvement of the run--time system is necessary since
%the compiler is aware of the communication pattern between modules,
%but may not be aware of the mapping of the modules. This is
%particularly important if the mappings can change dynamically, or if
%some of the modules contain routines not generated by the NetFx
%compiler.

\section{Run--time support for task parallelism}

The run--time system is responsible for efficiently transporting
distributed data sets between modules.  This is a complicated task for
several reasons.  The support of multiple languages and programming 
paradigms means that many different kinds of data distributions must be 
supported.  Further, this support must be extensible, since an a priori 
limit on the kinds of data distributions would quickly lead to obsolescence.  
Some data distributions can change dynamically due to load balancing and 
other reasons.  The mapping of modules onto nodes can also change for
similar reasons.  At a lower level, the run--time system must be able to 
deal with machine and network heterogeneity to provide high performance
communication.  

To manage this complexity, the run--time system exports a 
programming interface that presents the abstraction of high--level 
module--to--module communication of distributed data sets.  Although the 
initial client of this interface is the NetFx compiler, the interface is 
not tied to NetFx.  When handling a call to the interface, the run--time 
system must generate and perform the low level communication as efficiently 
as possible.  In general, this is very complex since the data sets involved 
may have arbitrary kinds of distributions inside the modules.  The 
run--time system uses a distribution management library that is linked in, 
as well as custom distribution management functions generated by a compiler 
or programmer and registered with the run--time system.  This mechanism 
also provides a path for extending the range of supported kinds of 
distributions.

In this section, we first explain what is involved in inter--module 
communication, defining the terms we will use along the way.  Next, we 
describe the interface exported by the run--time system.  This is followed 
by an explanation of the run--time system's internal structure and 
interfaces.  Finally, we show how the interfaces are traversed during the 
execution of a simple send operation.


\subsection{Inter--module communication}

%A {\em task} is a data parallel subroutine.  One or more tasks are
%grouped into a {\em module}, where they execute consecutively.  A
%module runs on a set of processes mapped to a {\em processor group},
%which is a set of processors that share some common attributes.  There
%is a {\em dependence} between two modules if a task in one produces a
%data set for a task in the other.  The compiler resolves this
%dependence by generating a send call for the producer module and a
%receive call for the consumer module.  Every module essentially
%repeats three phases.  First, it receives data sets from each
%module it is dependent on.  Next, it executes its tasks.  Finally,
%it sends data sets to each module that depends on it.

The goal of inter--module communication is to transport a distributed data 
set from a source module to a destination module.  The elements of the data 
set are partitioned across the processes of a module according to some 
rule.  This rule is identified by the data set's distribution type and 
distribution.  A {\em distribution type} identifies a class of similar 
rules, while a {\em distribution} identifies a specific member of the 
class.  For example, the distribution type $HPF\_ARRAY$ includes the 
distribution $(*,BLOCK)$.  The run--time system assumes that the 
distribution type of the data set on each module is constant throughout the 
execution of the program; however, its distribution may change.  If the 
distribution can change, we say that its {\em binding} is {\em dynamic}; 
otherwise it is {\em static}.  The initial values of both kinds of bindings 
are supplied at compile time.
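As a concrete illustration of the rule that a distribution names,
consider the $(*,BLOCK)$ member of an HPF--style array distribution
type, sketched here as an ownership function.  The block size
(ceiling division) and the consecutive process numbering below are our
own assumptions for exposition, not the NetFx definitions:

```python
def owner_star_block(shape, nprocs, index):
    """Owner of element `index` of a 2-D array distributed (*, BLOCK):
    rows are undistributed; columns are assigned in contiguous blocks
    of ceil(ncols / nprocs) to consecutively numbered processes."""
    nrows, ncols = shape
    i, j = index
    block = -(-ncols // nprocs)    # ceiling division
    return j // block
```

Every member of the same distribution type is a rule of this general
shape; the distribution merely selects which rule (here, which axis is
blocked) applies.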

To send the data set from a source module to a destination module, it is 
necessary to be able to translate between the distributions at the two 
end--points.  It is only possible to translate between distributions whose 
global structures are compatible.  For example, if the source global 
structure is an array, then the destination global structure must be an 
array of the same size and shape.  The two end--point distribution types 
identify a method for translating between the two member distributions, a 
process called {\em address computation}.  Given two distributions of the 
appropriate type and a process pair, the method, or {\em address 
computation function}, returns an {\em offset relation} between offsets 
into the chunk of elements owned by each process.  This 
architecture--independent relation can be trivially converted to an 
architecture--dependent {\em address relation}, which maps memory addresses 
on one machine to memory addresses on the other.
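As an illustration of address computation, the following sketch derives 
the offset relation for a pair of one--dimensional $BLOCK$ distributions 
and converts it to an address relation.  This is a simplified stand--in 
for the general mechanism; the function names are ours and the block size 
$\lceil n/p \rceil$ is an assumption:

```python
# Hypothetical address computation function for 1-D BLOCK distributions.
# For a process pair (p, q) it returns the offset relation: pairs (i, j)
# such that local offset i on sender process p and local offset j on
# receiver process q refer to the same global element.
def block_bounds(n, nprocs, p):
    """Global [lo, hi) range of the block owned by process p."""
    b = -(-n // nprocs)          # ceil(n / nprocs), the block size
    return p * b, min((p + 1) * b, n)

def block_to_block(n, src_procs, dst_procs, p, q):
    slo, shi = block_bounds(n, src_procs, p)
    dlo, dhi = block_bounds(n, dst_procs, q)
    lo, hi = max(slo, dlo), min(shi, dhi)   # overlapping global elements
    return [(g - slo, g - dlo) for g in range(lo, hi)]

def to_address_relation(offsets, src_base, dst_base, elem_size):
    """The trivial, architecture-dependent step: offsets -> addresses."""
    return [(src_base + i * elem_size, dst_base + j * elem_size)
            for i, j in offsets]
```

For example, redistributing an 8--element array from 2 to 4 processes, 
sender 0 and receiver 1 share global elements 2 and 3, giving the offset 
relation $\{(2,0),(3,1)\}$.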

The address computation function operates under the abstraction that a data
set's elements are distributed over a consecutively numbered group of
processes of the module.  Once an address relation has been computed
for each pair of processes, the abstract process numbers must be
mapped to the actual nodes of the machines on the network.  The
properties of these module bindings are similar to those of the
distribution bindings discussed above.  Specifically, the bindings may
be static or dynamic.  Once the mappings are known, actual data
transfer can begin.



\subsection{Interface}
The client compiler generates calls to the run--time system, which in
turn may call custom  address computation functions generated by
the compiler.  The interface between the NetFx Executable and the
run--time system has three components as shown in Figure
\ref{fig:interface}.  The first two of these are exported by the run--time
system, while the last is exported by the program itself.  The
first component is the {\em inter--module communication interface},
which consists of simple, high--level, non--blocking collective send
and receive calls for transporting whole distributed data sets between
modules.  If more than one data set is transferred between a pair of 
modules, the sends and receives must be issued in the same order on both 
modules.  The arguments of the send and receive calls are 
\begin{itemize}
\item Target (send) or source (receive) module 
\item Address of first element local to the calling process
\item Data type of each element
\item Sender distribution type and distribution
\item Receiver distribution type and distribution
\item Hint (implementation specific)
\end{itemize}
There is a single call to wait for all outstanding receives to complete, 
which allows a module to finish its input communication before executing.
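A possible shape for these calls, with the argument lists enumerated above 
(the function names and the handle representation are our own sketch, not 
the actual NetFx interface):

```python
# Hypothetical sketch of the inter-module communication interface.
pending = []   # outstanding non-blocking operations on this process

def intermodule_send(target_module, local_base, elem_type,
                     send_dist_type, send_dist,
                     recv_dist_type, recv_dist, hint=None):
    """Non-blocking collective send of a whole distributed data set."""
    op = {"kind": "send", "peer": target_module, "done": False}
    pending.append(op)
    return op

def intermodule_recv(source_module, local_base, elem_type,
                     send_dist_type, send_dist,
                     recv_dist_type, recv_dist, hint=None):
    """Non-blocking collective receive; when several data sets move
    between the same module pair, sends and receives match in order."""
    op = {"kind": "recv", "peer": source_module, "done": False}
    pending.append(op)
    return op

def wait_all_recvs():
    """The single wait call: completes all outstanding receives so the
    module can finish its input phase before executing its tasks."""
    for op in pending:
        if op["kind"] == "recv":
            op["done"] = True
```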


Before performing sends and receives, the  program must describe the 
characteristics of relevant modules, distribution types, and custom 
address computation functions to the run--time system.  This is done through the 
{\em registry interface}.  The  program explicitly registers the 
initial module mapping and whether it is static or dynamic.  Registration 
of a distribution type is required if the distribution type is not 
library-supported, or if its distribution is dynamic.
Dynamic bindings are updated implicitly by examining send and receive
calls.  There are several reasons why a program may want to register
its own distribution types.  One reason is extensibility --- the
distribution type may not be supported by the library.  Another is
efficiency --- the compiler may have been able to generate a
specialized address computation function for a particular communication.  In
either case, the program must also register custom address computation 
functions for any communication that involves the new distribution
type.  For example, suppose module 1 communicates from a new
distribution type $DTX$ to predefined types $DT2$ on module 2 and
$DT3$ on module 3.  All three modules must register distribution type
$DTX$.  Module 1 must register new address computation functions $\lambda
(DTX,DT2)$ and $\lambda (DTX,DT3)$, module 2 must register $\lambda
(DTX,DT2)$, and module 3 must register $\lambda (DTX,DT3)$.
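The registration pattern for the $DTX$ example might look as follows 
(a hypothetical sketch; the registry layout and function names are ours):

```python
# Hypothetical sketch of the registry interface for the DTX example.
registry = {"modules": {}, "dist_types": set(), "adx_funcs": {}}

def register_module(module, mapping, static=True):
    registry["modules"][module] = {"mapping": mapping, "static": static}

def register_dist_type(name):
    registry["dist_types"].add(name)

def register_adx_func(src_type, dst_type, func):
    # lambda(src_type, dst_type): the address computation function used
    # whenever these two distribution types meet on a communication
    registry["adx_funcs"][(src_type, dst_type)] = func

# Module 1 communicates from new type DTX to predefined DT2 and DT3:
def lam_dtx_dt2(src_dist, dst_dist, p, q): return []   # stub
def lam_dtx_dt3(src_dist, dst_dist, p, q): return []   # stub

register_dist_type("DTX")                      # all three modules
register_adx_func("DTX", "DT2", lam_dtx_dt2)   # modules 1 and 2
register_adx_func("DTX", "DT3", lam_dtx_dt3)   # modules 1 and 3
```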

All address computation functions must conform to a common calling 
convention, the {\em standard address computation function interface}, 
which is the third component of the interface between the  program and 
the run-time system.  Each address computation function must be able to 
both return offset relations or assemble/disassemble message buffers.  The 
run--time system decides which functionality is used.

\subsection{Internal structure}

\begin{figure}
\centerline{\epsfxsize=4in \epsfbox{Int1.eps}}
\caption{The run--time system: subsystems, interfaces, and
relation to the NetFx Executable.  Each arrow represents an interface
and points from the caller of the interface to the callee.}
\label{fig:interface}
\end{figure}


The run--time system presents the abstraction of ``module--to--module'' 
communication of a distributed data set between uniquely labeled modules.  
This requires address computation, binding maintenance, and efficient 
communication over heterogeneous networks.  These tasks are segregated into 
three subsystems (distribution, binding, and communication) with well 
defined interfaces between them.  More than just good design practice, the 
ability to independently develop and enhance each subsystem is vital for 
extensibility and good performance.  Clearly, the number of distribution 
types will grow over time as more languages and programming paradigms are 
developed and supported.  Further, compile--time optimization of 
address computation is important for performance \cite{StOG94}.  There 
are a variety of mechanisms that could be used to support dynamic bindings 
and it is unclear that any one is best for all environments.  Finally, to 
achieve high performance, it will be necessary to specialize the 
communication system for different networks.  By clearly defining the 
interfaces between subsystems, we make this specialization possible and 
easy.  Indeed, all the subsystems are ``plug--and--play.''

The internals of the run--time system are shown in Figure
\ref{fig:interface}.  The {\em communication subsystem} is the heart of the 
run--time system and exports the previously discussed inter--module 
communication interface to the program.  It makes use of the {\em 
binding subsystem} to determine the current module and distribution 
bindings.  In return it supplies the binding subsystem with a simple {\em 
binding communication interface} in case the binding subsystem needs to 
perform an inter--module communication step to determine a binding (for 
example, the distribution of an irregular data structure).  To translate 
between the source and target distributions, the communication subsystem 
calls on the {\em distribution subsystem} via the {\em address computation 
interface}.  This subsystem serves two purposes.  First, it dispatches the 
appropriate address computation function for the sender and receiver 
distribution types.  Second, it conforms the offset relation returned by 
that function to the format desired by the communication system and 
converts it to an address relation.  (The communication system can request 
that a message be assembled or disassembled instead.) The subsystem uses 
the binding interface to bind the two distribution types to the appropriate 
address computation function.  This function may be one of the {\em library 
address computation functions} linked to every NetFx Executable, or a 
compiler--generated {\em custom address computation function}.

Once address relations have been computed (or messages assembled), the 
communication system can use the module bindings to identify the actual 
nodes to communicate with and perform the actual data transfer in a 
network-- and architecture--dependent manner. 

\subsection{Example}

\begin{figure}
        \centerline{\epsfxsize=4in \epsfbox{useint.eps}}
	\caption{Example of the steps followed for an inter--module send using a 
	custom address computation function.}
	\protect\label{fig:steps}
\end{figure}

We illustrate how an inter--module send using a custom address computation 
function is performed.  Figure \ref{fig:steps} shows the steps graphically.  
On the right are the actual calls performed by the node program and those 
performed internally for the inter--module send.  The calls are numbered to 
correspond to the interfaces used and to the steps discussed in this 
section.  There is a dependence from module $M1$ to module $M2$ and the 
compiler has generated a send from $M1$ to $M2$ (and a matching receive in 
$M2$, which is not discussed here).  The compiler also produced a custom 
address computation function $ADXFUNC$ for this communication.  At run 
time, $M1$'s first step is to use the registry interface to register the 
initial values of all relevant bindings.  The module bindings for $M1$ and 
$M2$ are registered as static, as are the distribution types $DT1$ and 
$DT2$.  Finally, $M1$ registers $ADXFUNC$ as the address computation 
function for $DT1$ and $DT2$.

The second step (which may be repeated many times) is to invoke a send of 
the distributed data set $A$ to $M2$.  The inter--module send call includes 
the target module number, $M2$, the first element of $A$ that is local to 
the process, the data type of $A$'s elements (REAL), the sender and 
receiver distribution types, $DT1$ and $DT2$, and the current sender and 
receiver distributions, $DISTA1$ and $DISTA2$.  The third step is to bind 
the two modules to their current physical mappings, and the distribution 
types and distributions to their current distributions.  Both of these 
steps are simple in this case because the bindings are static --- they 
amount to local lookups.  In the fourth step, the communication subsystem 
calls the distribution subsystem to build an address relation between each 
pair of sender and receiver processes $P1$ and $P2$.  The distribution 
subsystem accomplishes this by binding the two distribution types, $DT1$ 
and $DT2$, to their corresponding address computation function, $ADXFUNC$ (step 
5) and calling the function for each pair (step 6).  The offset relations 
returned are conformed to the format the communication subsystem requires 
and converted to address relations.  Finally, the relations are used to 
perform the actual communication in an environment--dependent manner.  The 
communication subsystem may cache the address relations to avoid repeating 
steps 4--6 the next time the inter--module send occurs.
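Steps 4--6 of this example can be sketched as follows, with $ADXFUNC$ 
specialized to a pair of one--dimensional block layouts (all names, sizes, 
and the dispatch table are our illustrative assumptions):

```python
# Hypothetical sketch of steps 4-6: the communication subsystem asks the
# distribution subsystem for an address relation for each sender/receiver
# process pair.
ELEM_SIZE = 4   # size of a REAL element, assumed

def adxfunc(dist_a1, dist_a2, p, q):
    """Stand-in custom address computation function for (DT1, DT2):
    both sides are 1-D blocks, so use a simple block-overlap rule."""
    n, sp = dist_a1                      # (global size, sender procs)
    _, dp = dist_a2                      # (global size, receiver procs)
    bs, bd = -(-n // sp), -(-n // dp)    # ceil block sizes
    lo = max(p * bs, q * bd)
    hi = min(min((p + 1) * bs, n), min((q + 1) * bd, n))
    return [(g - p * bs, g - q * bd) for g in range(lo, hi)]

adx_table = {("DT1", "DT2"): adxfunc}    # step 5: bind types to function

def build_address_relations(src_type, dst_type, dist1, dist2,
                            src_procs, dst_procs, bases):
    fn = adx_table[(src_type, dst_type)]
    relations = {}
    for p in range(src_procs):
        for q in range(dst_procs):       # step 6: call per process pair
            offs = fn(dist1, dist2, p, q)
            relations[(p, q)] = [(bases[0] + i * ELEM_SIZE,
                                  bases[1] + j * ELEM_SIZE)
                                 for i, j in offs]
    return relations

# 8-element array A, sent from 2 sender to 4 receiver processes:
rels = build_address_relations("DT1", "DT2", (8, 2), (8, 4), 2, 4, (0, 0))
```

The resulting table of address relations is exactly what the 
communication subsystem may cache to skip steps 4--6 on later sends.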


\section{Status and implementation}

The Fx compiler currently supports task parallelism on the Intel iWarp, 
Intel Paragon, and on DEC Alpha workstations running PVM.  Compiler 
analysis of parallel sections (although not mapping directive support or 
code emission) is the same for each target.  The run--time system used on 
the Paragon and the Alphas was initially developed for the Alphas with an 
eye towards portability and serves as a prototype for the system discussed 
above.  The prototype essentially has three components: a component that 
computes offset relations for any two (static) HPF distributions, a 
component that conforms these relations to a particular format and caches 
them, and a component that does communication using the relations.  Porting 
this run--time system to the Paragon (and achieving good performance) 
required rewriting only the communication component.  Further details of 
the library are available in~\cite{DiOh95}.  

The main target network is Gigabit Nectar~\cite{ABCK89}, developed
at Carnegie Mellon.  A variety of communication interfaces have been
developed for Nectar and we are incorporating them into the system.
Currently we are able to execute homogeneous Fx programs across Alpha
workstations using Nectar, as well as simple heterogeneous Fx programs
on Alphas and Suns.  The full heterogeneous run--time system described
above is currently
under development.


\section{Related work}

High level run--time support for communicating data parallel tasks is a 
fairly new research area.  We compare our approach to two other run--time 
libraries that support communicating data parallel tasks.  
Opus~\cite{HHMV95} supports communication between tasks using {\em shared 
data abstractions} (SDAs).  An SDA is essentially an independent object 
class whose private data is distributed over some set of threads and whose 
methods are protected with monitor semantics.  For example, one could 
define a queue SDA whose elements are block--distributed vectors.  To send 
a vector to another task, it is first sent to an SDA queue object (which 
requires an address computation) using the enqueue method.  The 
receiving task would invoke the dequeue method, which would cause the SDA 
queue object to send a data set (again requiring an address computation).  
SDAs can support many more tasking paradigms than our scheme, but they 
appear to significantly increase the communication volume and to introduce 
unnecessary synchronization between tasks due to monitor semantics.

Closer to our scheme is the library described in~\cite{ACFK95}.  This 
library supports communication between data parallel tasks with collective 
sends and receives.  Sends (and receives) are split between an 
initialization step which performs address computation and builds 
communication schedules, and an execution step that actually performs the 
send.  Initializations synchronize sending and receiving tasks, but can be 
amortized over many executions.  Only static module bindings and static HPF 
distributions are supported.  All address computation is done with general 
purpose library routines.

\section{Summary and conclusions}
We address programming support for networks of high performance computers, 
which are emerging as an increasingly important platform for parallel 
computing.  We have 
presented NetFx, a compiler system that supports network parallel 
computing.  The NetFx programming model is based on integrated support for 
task and data parallelism originally developed for the Fx compiler system.  
We also presented a run--time system for network parallel computing which 
presents a simple abstraction for module-to-module transfer of distributed 
data sets so that the compiler does not have to deal with the dynamic 
nature of the network computing environment.  The run--time system allows 
the compiler and other tools that generate foreign data parallel modules to 
customize execution behavior.  A clean interface between the compiler and 
the run--time system also allows independent development and retargeting 
of both.  We believe that NetFx is the first tool to provide practical 
programming support for networks and will make network computing easier for 
non-specialists.


\bibliography{/afs/cs/project/iwarp/member/jass/bib/all}
\end{document}
