\input epsf
%\input{psadobe}
\makeatletter
\long\def\unmarkedfootnote#1{{\long\def\@makefntext##1{##1}\footnotetext{#1}}}
\makeatother
\documentstyle[fleqn,epsf]{art-nocolon}
%\include{psfig}
%\include{caption}


\textheight =  9.0in
\textwidth = 7.0in              % assume output has this much centered

\topmargin = -.5 in 
\oddsidemargin = -.25 in
\evensidemargin = -.25 in

\renewcommand{\floatpagefraction}{1}
\renewcommand{\topfraction}{1}
\renewcommand{\bottomfraction}{1}
\renewcommand{\textfraction}{0}
\renewcommand{\baselinestretch}{1}
\renewcommand{\arraystretch}{1.5}


\newtheorem{definition}{Definition}
\newtheorem{theorem}{Theorem}
\newtheorem{lemma}{Lemma}
\newcommand{\ignore}[1]{}

\renewcommand{\to}{\rightarrow}
\newcommand{\A}{{\cal A}}
\begin{document}
\bibliographystyle{acm}

%\load{\footnotesize}{\sc}
\title{ 
%\begin{flushleft}
%\vspace{-1.0in}
%\centerline{\scriptsize\em (Extended abstract submitted for PLDI 94)}
%\normalsize { }
%\end{flushleft}
%\vspace{0.4in}
Language and Run--time Support for Network Parallel Computing\\
}
\author{ Peter A. Dinda \and David R. O'Hallaron \and
         Jaspal Subhlok\thanks{Corresponding author. 
                               ({\tt jass@cs.cmu.edu}, (412) 268 7893) }
 \and Jon A. Webb \and Bwolen Yang}
\date{
%   {email:\tt \{\}@cs.cmu.edu} \vspace*{.2in} \\
      School of Computer Science \\
         Carnegie Mellon University \\
         Pittsburgh PA 15213}

\maketitle
\thispagestyle{empty}
\abstract{
%
% This still needs some help
%
Network parallel computing is the use of diverse computing resources 
interconnected by general purpose networks to run parallel applications.  
This paper describes NetFx, an extension of the Fx compiler system which 
uses the Fx model of task parallelism to distribute and manage computations 
across the sequential and parallel machines of a network.  The central 
problem in compilation for network parallel computing is that the compiler 
is presented with a heterogeneous and dynamic target.  Our approach is 
based on a novel run--time system that presents a simple communication 
interface to the compiler, but uses compiler knowledge to customize 
communication between tasks on heterogeneous machines over complex 
networks.  The run--time system interface is sufficiently general to 
support different client compilers.  The compiler generates code for data 
parallel modules with point--to--point transfer of distributed data sets 
between modules.  This allows the compiler to be portable and enables 
communication generation without knowledge of exactly how the modules will 
be mapped at run--time or what low level communication primitives will be 
used.  The compiler also generates a set of custom routines, called address 
computation functions, to translate between different data distributions.  
The run--time system performs the actual communication using a mix of 
generic and custom address computation functions depending on run--time 
parameters like the type and number of nodes assigned to the communicating 
modules and the data distributions of the variables being communicated.  
This mechanism enables the run--time system to exploit compile--time 
optimizations, and enables the compiler to manage foreign modules that use 
non--standard data distributions.  We describe the NetFx programming model, 
the communication interface, how NetFx programs are compiled to the 
interface, and the issues in implementing the interface.

}

%\voffset = -.5 in
\normalsize

\setcounter{page}{1}
\section{Introduction}\label{intro}

%
% I changed this to include building apps using different languages
% and emphasized the rts -PAD
%
%

Network parallel computing is the use of diverse computing resources
interconnected by general purpose networks to run parallel
applications.  The domain of effective network parallel applications
is rapidly expanding because of considerable improvements in the
latency and bandwidth characteristics of networks such as HiPPI and
ATM.  It is becoming practical to run applications such as scientific
simulations, real-time applications and real-world modeling on network
parallel computing environments.  However, standard parallel languages
like High Performance Fortran (HPF)~\cite{HPFF93} offer little support
for network parallel computing.  Further, multiple languages and
programming paradigms may be necessary to fully realize the potential of
the network parallel computing environment --- yet there is little
support for interoperability in today's parallel languages and
paradigms.  Without such support, the construction of an efficient,
scalable, and environment--independent network parallel application is
a difficult and error--prone task in which the applications programmer
must wear many hats, including that of the network specialist.

The goal of our research is to make it easy to build network parallel
applications that achieve high performance in many different
environments. This paper presents a programming model for network
parallel applications and the design of a compiler and a run--time
system that enables efficient generation and execution of network
parallel applications.

%Our approach is to decompose the application into a set
%of parallel tasks, perhaps written in different languages.  These
%tasks communicate via an extensible, modular run--time system which
%handles the details of moving distributed data sets between tasks.
%The run--time system can be extended by a compiler or programmer to
%improve performance.  Because the run--time system is modular, its
%components can be implemented by specialists to take advantage of
%network and machine characteristics.

Our programming model allows the programmer to express task parallelism 
among parallel tasks.  Tasks are instantiated as procedure calls.  The 
programmer specifies the interface of the task, but may implement the task 
in any language.  Tasks are grouped together into modules which are then 
mapped to the computing environment, either by the compiler or the 
programmer.  The compiler generates communication between tasks to satisfy 
programmer--specified data dependencies between modules.

%
% This is not really right
%
%The programming model we use combines task and data parallelism.
%Procedures in an application are data parallel and each procedure is
%mapped to a homogeneous set of nodes, which may be different nodes of
%a large parallel machine or a cluster of workstations. Different
%procedures are placed on different sets of nodes on the network and
%the compiler manages the communication between them.

The most challenging part of compilation for a network is generating
efficient communication.  In network parallel computing, multiple
communication mechanisms must be supported for efficiency.  For
example, crossing a network requires different
mechanisms than communicating between the nodes of an MPP.  More
important, the network environment is expected to be
dynamic. Run--time load balancing is important even for applications
with a static computation structure since the external load on
different machines can vary.  This means the compiler has limited
information about the run--time environment during code generation.

For generating efficient and portable communication in a dynamic
run--time environment, it is important to distinguish between the
aspects of the communication that are constant for the duration of the
execution and those that are varying and may be known only at
run--time. The communication steps between tasks are a property of the
program and are determined at compile time, while the actual
low--level communication calls may depend on the run--time
environment.

Our solution to generating efficient communication involves defining
an inter--module communication interface. This interface provides
mechanisms for transferring data between modules that are mapped on
groups of nodes. The compiler generates code for this communication
interface and the run--time system ensures that appropriate
communication calls are generated. The compiled program is portable
across the machines and network protocols supported by the run--time
system. 

The key new idea is that the interface decouples compilation from the
generation of low--level communication, which is essential in a network
environment, yet provides a mechanism for the compiler to customize or
extend the behavior of the run--time communication system. This
mechanism allows the run--time system to exploit compile--time
communication optimizations, and allows the run--time system to be
painlessly extended to support new data distributions.  The run--time
system itself is not tied to the compiler, but may be used by other
clients.
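To make the shape of such an interface concrete, the following C sketch shows one way the data descriptor and transfer calls might look. All names here ({\tt dist\_desc}, {\tt nx\_send}, {\tt nx\_recv}) are hypothetical illustrations, not the actual NetFx interface, and the in--process mailbox merely stands in for whatever transport (socket, MPP fabric, shared memory) the run--time system would select at run--time.

```c
#include <string.h>

/* Hypothetical descriptor for a distributed data set.  The real
 * run--time system's types are not specified in the paper; these
 * names are illustrative only. */
typedef enum { DIST_BLOCK, DIST_CYCLIC } dist_kind;

typedef struct {
    int       global_len;   /* total number of elements       */
    int       elem_size;    /* size of one element in bytes   */
    dist_kind kind;         /* how elements map onto nodes    */
    int       nnodes;       /* nodes assigned to the module   */
} dist_desc;

/* A minimal, in-process stand-in for the transfer calls the
 * compiler would emit: send deposits the data, recv picks it up.
 * A real implementation would choose a low-level mechanism at
 * run time based on the two descriptors. */
static char mailbox[1024];
static int  mailbox_len = 0;

int nx_send(const dist_desc *src, const void *data, int nbytes)
{
    if (nbytes > (int)sizeof(mailbox)) return -1;
    memcpy(mailbox, data, nbytes);
    mailbox_len = nbytes;
    (void)src;                 /* descriptor would drive routing */
    return 0;
}

int nx_recv(const dist_desc *dst, void *data, int nbytes)
{
    if (nbytes != mailbox_len) return -1;
    memcpy(data, mailbox, nbytes);
    (void)dst;
    return 0;
}
```

Because the compiler emits only these calls, the same compiled program can run over any transport the run--time system supports.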

We are extending the Fx compiler system~\cite{GrOS94} to support the
programming model and the run--time system.  Fx already extends a data
parallel language based on HPF with task parallelism.  The extended
version of Fx is called NetFx.  We begin by motivating the need for
network parallel computing with a set of example applications and then
describe the NetFx language extensions, compiler strategy and the
extensible run--time system.

\section{Motivation for network parallel computing}

The different components of a large application typically have
different requirements for resources like I/O devices, sensors,
processing power, communication bandwidth, memory, and software tools.
Further, each component may be best expressed in a different language
or paradigm.  Attempting to satisfy all of these resource requirements
in a single computing platform utilizing a single language can be
impractical or even impossible.  An attractive alternative is to build
each component of the application using the appropriate language or
programming paradigm and run it on the appropriate platform using high
speed networks to communicate with other components. We refer to this
type of computation as {\em network parallel computation}.

This section illustrates the motivation for network parallel computing
with four applications: (1) a {\em real--world modeling} application
where inputs are acquired from a camera system, processed on a large
parallel system, and displayed remotely, (2) an {\em air quality
modeling} application where different steps of the computation have
different processing and communication requirements, (3) an {\em
earthquake ground motion modeling} application where the results are
computed on a large parallel system and visualized on a graphics
workstation, and (4) a {\em real--time embedded system} constructed
from commercially--available rack--mounted array processor boards.

\subsection{Real--world modeling}

High performance computers and their programming tools have
traditionally been applied to problems in scientific modeling,
including simulation analysis. A group of researchers at Carnegie
Mellon is applying these same systems and tools to problems in {\em
real--world modeling}, which means three--dimensional shape modeling
of objects and environments in the real world by direct observations
using computer vision techniques. Success of this approach could lead
to, for example, replacement of complex and specialized sensory
systems based on laser range--finding technology by much simpler, more
flexible and scalable, but computationally more expensive, systems
like stereo vision.  In real--world modeling applications the sensor
system plays a key role. In some situations (e.g., imaging moving
objects) it is necessary to capture imagery at video rate (30 Hz) or
faster.  Capturing data at this high rate is complicated by the need
to synchronously access several cameras\footnote{In the recent past, the
most significant advances in stereo vision accuracy and reliability
have come from the introduction of multiple cameras.}.  In other
situations (e.g., imaging building interiors) speed of image capture
is not so important but the imaging system must be portable.

These conflicting requirements can be met through the
application of modern networking technology. The system we envision is
illustrated in Figure \ref{fig:netargos}. 
\begin{figure}[htb]
\centerline{\epsfxsize=3in \epsfbox{netargos.eps}}
\caption{Real-world modeling system}
\label{fig:netargos}
\end{figure}
The camera system is a detachable unit, and can be attached either to
a large number of workstations, giving a high frame capture rate, or
to a smaller number of portable workstations or PCs, giving
portability at the expense of speed. Once the images are captured,
they are transformed into stereo vision data through the application
of parallel stereo vision programmed in Fx, which can then be executed
on the workstations themselves or on powerful tightly--coupled high
performance computers.  The resulting range imagery is then
transformed into a three--dimensional model, which is another Fx
program, and the resulting model can then be stored to disk or
displayed remotely. A NetFx implementation will allow us to program the
entire framework, including communication across the network, in a
uniform, portable manner.

\subsection{Air quality modeling}

As part of a Grand Challenge grant on heterogeneous computing for
large scale environmental modeling, researchers at Carnegie Mellon
have adapted the airshed environmental model application~\cite{McRH92}
to run on parallel systems.  The application consists of repeated
application of three data parallel steps.  The first step, reading
input data from storage and preprocessing it, is I/O intensive.  The
second step, computing the interaction of different chemical species
in the atmosphere, exhibits massive parallelism and little
communication.  The third step, computing the wind--blown transport of
chemical species, has significant parallelism and communication.  Each
of these steps is suited to a different class of machine.  For
example, an Fx implementation of the chemistry step runs as fast on
four Ethernet--connected DEC Alpha workstations as on 32 nodes of an
Intel Paragon since the relatively slow communication is not an
important factor, but the transport step is expected to be more
sensitive to the parameters of the communication system.  Further, the
input step is best run concurrently with the other stages.  A NetFx
implementation will allow us to use a single high level program for
the entire application and to use different mappings for
execution.


\subsection{Earthquake ground motion modeling}

As part of a Grand Challenge grant on earthquake modeling, researchers
at Carnegie Mellon are developing techniques for predicting the motion
of the ground during strong earthquakes. The computation consists of
two distinct steps: simulation and visualization.  The simulation step
uses a large, sparse, and irregular finite element model to generate a
sequence of displacement vectors, one vector per time step. The
visualization step manipulates the displacement vectors and produces a
graphical display.  The simulation step requires access to large
amounts of memory, computational power, and low--latency
high--bandwidth communications, while the visualization step requires
access to a bit--mapped display and, for the more complex 3D models, a
graphics accelerator. The natural partitioning of this application is
to run the simulation step on a large tightly--coupled parallel
system, and to run the visualization step on a graphics workstation.

\subsection{Real--time embedded systems}

Economic realities have created a pressing need for embedded systems
that are built with commercial off--the--shelf components.  The basic
building block is typically a commercial array processor board that
consists of a fixed number of processors connected by a fast onboard
switch. For larger systems, multiple boards are mounted in a rack and
connected by a bus or ring structure.  For still larger systems,
multiple racks are connected by a commercial network like FDDI or ATM.

These systems are used for real--time applications like sonar and
radar, where the processing typically consists of reading from one or
more sensors, and then manipulating the inputs in a sequence of
stages. Each stage must be assigned enough processing resources to
match the throughput and latency requirements of the overall system.
An application that needs multiple racks to meet its real--time
requirements will of necessity be a network parallel computation.


\section{Our approach}
%
% Give an overview of our approach
%
\begin{figure}
\epsfxsize = 5.0 in
\centerline{\epsfbox{Overview.eps}}
\caption{Overview of our approach}
\label{fig:overview}
\end{figure}
\begin{figure}
\epsfxsize = 3.0 in
\centerline{\epsfbox{NetFx.eps}}
\caption{Overview of the NetFx approach}
\label{fig:NetFx}
\end{figure}
Our programming model for network parallel computing environments
exploits functional parallelism among parallel tasks.  Because the
machines in the environment are heterogeneous, tasks can be programmed
using any language or programming paradigm in order to best exploit
the hardware.  The flow of data between tasks as well as the mapping
of each task to the environment is described by a tasking program
written in a specialized tasking language.  The language is currently
being defined.  The tasking compiler combines the tasking program with
information about the tasks, the environment, and standard support
libraries to build an executable for the environment.  The tasking
compiler emits calls to a high performance run--time system customized
for the environment in order to perform data transfers between
tasks. The run--time system handles the low-level details of
communication in a heterogeneous machine and network environment and
can be extended by the tasking compiler, a language compiler or the
programmer in order to specialize certain data transfers or to support
data transfers that are not supported by the standard library.  A task is
free to perform local load balancing, moving data and computations
between the machines allotted to it, but the run--time system is
responsible for global load balancing by changing the machines
allotted to a task.  Figure~\ref{fig:overview} demonstrates how the
system works.

\subsection{NetFx approach}
The NetFx programming model and compiler, described below, implement
a limited subset of our approach.  The programming model limits tasks
to be Fx subroutines and restricts how they can be interconnected.
However, heterogeneity is supported.  The tasking language used is an
extension of Fx parallel sections, and support for the language is
embedded in the Fx compiler.  NetFx uses the run--time system
described above, but no global (or local) load balancing is performed.
Figure~\ref{fig:NetFx} demonstrates how the system works.



\section{Programming model for NetFx}
%
% I changed this to stress that it's an initial implementation and to 
% mention the language restrictions.  There's a real problem in that none
% of us, I think, are really sure what the programming model should look
% for a heterogenous system and with multiple language support.
%
%
The initial implementation of the programming model is an extension of
the Fx~\cite{GrOS94,SSOG93} compiler system called NetFx.  NetFx
provides support for task and data parallelism but restricts language
support to Fx.  Support for data parallelism is similar to that
provided in High Performance Fortran (HPF) and is described
in~\cite{StOG94,YWSO93}.  Task parallelism is used to map data
parallel subroutines onto different, possibly heterogeneous, groups of
nodes.  We outline how task parallelism is expressed in NetFx and
illustrate how it is used to exploit coarse grain parallelism.

\subsection{Task parallelism}
Task parallelism is expressed solely using compiler directives and no
new language extensions are introduced.  To simplify the
implementation, the current version of Fx relies on the user to
identify the side effects of the task--subroutines and to specify
them. Directives are also used to guide the compiler in mapping the
program, which is important for performance, and also to ensure that
tasks execute on computers for which the compiled versions are
available.  We describe the directives that are used to express task
parallelism and illustrate their use. We use the FFT--Hist kernel as
an example program. This program takes a sequence of arrays as input,
and for each input array, it performs a 2D FFT by performing a set of
1D FFTs, first on the columns and then on the rows, followed by histogram
analysis.

\subsubsection*{Parallel sections}
Calls to task--subroutines are permitted only in special code regions
called {\em parallel sections}, denoted by a {\tt begin parallel/end
parallel} pair. For example, the parallel section for the FFT--Hist
example has the following form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call rowffts(A)\\
c\$\>\>  {\em input/output and mapping directives}\\ \vspace{3mm}
\>\>     call hist(A)\\
c\$\>\>  {\em input/output and mapping directives}\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm
The code inside a parallel section can only contain loops and
subroutine calls.  These restrictions are necessary to make it
possible to manage shared data and shared resources (including
nodes) efficiently. Every task--subroutine call inside a parallel
section corresponds to a (possibly data parallel)
{\em task} that can execute in parallel with other tasks
subject to dependence constraints.

A parallel section introduces a data parallel thread for each
task--subroutine inside it.  Outside the parallel section, the program
executes as a single data parallel thread.  The effect of executing a 
parallel section is the same as if its contents were executed sequentially.

\subsubsection*{Input/output directives}

The user includes {\tt input} and {\tt output} directives to
define the side--effects of a task--subroutine, i.e., the data space
that the subroutine accesses and modifies.  Every variable whose value
at the call site may potentially be used by the called subroutine must
be added to the input parameter list of the task--subroutine.
Similarly, every variable whose value may be modified by the called
subroutine must be included in the output parameter list. The variables
in the input and output parameter lists can be  distributed or replicated
arrays, or scalars.

For example, the input and output directives for the call to {\tt
rowffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that the subroutine {\tt rowffts} can
potentially use values of, and write to the parameter array {\tt A}.
As another example, the input and output directives for the call
to {\tt colffts} have the form:
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
c\$\>\>  {\em mapping directives}\\
\end{tabbing}
\rm
This tells the compiler that subroutine {\tt colffts} does not use the
value of any parameter that is passed but can potentially write to
array {\tt A}.

\subsubsection*{Mapping directives}
%
% This is sorely lacking in detail - again, because we don't know what we 
% need.
%
%
Labeling task--subroutine calls and supplying input/output information
is sufficient for the compiler to generate a correct parallel program.
However, the performance of the program largely depends on how the
task--subroutines are mapped onto the available computation resources
on the network. The possible mappings are constrained by the
availability of data parallel implementations of different
task--subroutines for different architectures, the memory requirements of
the task--subroutines, and the physical locations of devices like
frame buffers.  Ideally, NetFx should be able to automatically map the
program for correct execution and best possible performance, but that
is a hard problem and NetFx is not able to do it except in some simple
situations~\cite{SuVo95}. In general, the programmer has to supply the
mapping information.

The mapping information specifies where each of the task--subroutines
should execute. The details of the mapping directives are somewhat
cumbersome and machine specific and we will not describe them here.
For example, when mapping a task--subroutine to a network of
workstations, it may be necessary to specify the names of the
workstations to be used as nodes, but when mapping to a part of a
massively parallel machine like the Intel Paragon, it may be
sufficient to specify the number of nodes to be used.

\subsection{Programming coarse grain parallelism}
NetFx allows the programmer to use fine grain data parallelism as well
as coarse grain task parallelism. We illustrate how the task
parallelism directives described above are used to exploit common
forms of coarse grain parallelism.

\subsubsection*{Task pipelines}
In image and signal processing programs, the main computation often consists
of repeatedly executing an acyclic task graph. A simple example of such
a program is the  FFT--Hist example program, which we now present
with directives:
%
% I patched it so it looks like it does something....
%
\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call colffts(A)\\
c\$\>\>  {\bf output} (A)\\
\>\>     call rowffts(A)\\
c\$\>\>  {\bf input} (A),  {\bf output} (A)\\
\>\>     call hist(A)\\
c\$\>\>  {\bf input} (A)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

In FFT--Hist, all data dependences in the program are forward and loop 
independent.  Task--subroutine {\em colffts} reads a data set from some 
input device, computes, and sends the output data set to {\em rowffts}, 
which receives the data set, computes, and sends the output data set to 
{\em hist}, which computes the final output data set.  This allows an 
earlier stage, say {\em colffts}, to process a new data set while {\em 
rowffts} is processing the previous data set, allowing coarse grain 
pipeline parallelism as shown in Figure~\ref{fig:pipe}.
\begin{figure}
\epsfxsize = 4.0 in
\centerline{\epsfbox{pipe.eps}}
\caption{Dependences and coarse grain parallelism in FFT-Hist}
\label{fig:pipe}
\end{figure}
In related work,
we have argued that such parallelism can be exploited efficiently and
profitably for many applications including narrowband tracking radar
and multibaseline stereo~\cite{SOGD94}.

\subsubsection*{Communicating tasks}
In many scientific applications, different subroutines can execute in
parallel, but need to transfer or exchange data between invocations.
For example, two routines {\em fproca} and {\em fprocb} may work on
different parts of a data structure and  exchange boundary
elements between successive invocations. A task parallel program that
expresses this computation is as follows:

\tt
\begin{tabbing}
tttttttt\=ttt\=ttt\= ttt\=ttt\= \kill
c\$ \>{\bf begin parallel} \\
\>     do i = 1,m \\
\>\>     call fproca(A, boundA, boundBx)\\
c\$\>\>  {\bf output} (boundA), {\bf input} (boundBx)\\
\>\>     call fprocb(B, boundB, boundAx)\\
c\$\>\>  {\bf output} (boundB),  {\bf input} (boundAx)\\
\>\>     call ftransfer(boundA, boundB, boundAx, boundBx)\\
c\$\>\>  {\bf input} (boundA, boundB) {\bf output} (boundAx, boundBx)\\
\>     enddo\\
c\$\>   {\bf end parallel}
\end{tabbing}
\rm

We show the input/output directives only for the boundary
elements for the purpose of illustration.
The task dependence graph for this computation is shown in
Figure~\ref{fig:par}.
There is no direct dependence between task--subroutines
{\em fproca} and {\em fprocb}, but they have a loop carried dependence
on {\em ftransfer} due to variables {\em boundAx} and {\em boundBx}
respectively. Task--subroutine {\em ftransfer} has a loop independent
dependence on {\em fproca} and {\em fprocb}, due to variables {\em
boundA} and {\em boundB}. Thus, the iterations of {\em fproca} and
{\em fprocb} execute in parallel and effectively exchange boundary
elements using task--subroutine
{\em ftransfer}. The computation schedule is illustrated in
Figure~\ref{fig:par}. 
 Note that the results obtained by task
parallel execution are consistent with sequential execution.
\begin{figure}[htb]
\epsfxsize = 4.0 in
\centerline{\epsfbox{par.eps}}
\caption{Dependences and coarse grain parallelism for a program
         with  data exchange between parallel iterations}
\label{fig:par}
\end{figure}

\subsection{Compilation}
The compiler uses a data flow algorithm to analyze the dependences
caused by input/output variables and precisely determines the data
movement between tasks. The basic paradigm is that the results
obtained must always be consistent with sequential execution.  It then
uses the mapping information to group task--subroutines into {\em
modules}. All task--subroutines inside a module are mapped to the same
set of nodes. A module repeatedly receives input, computes,
and generates output.
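The grouping step can be pictured with a small C sketch (the names are hypothetical; the real compiler works on its internal representation): task--subroutines that carry identical mapping information are assigned to the same module.

```c
#include <string.h>

/* Hypothetical: each task--subroutine carries the node set it was
 * mapped to, here encoded as a string for simplicity. */
typedef struct {
    const char *name;    /* task--subroutine name      */
    const char *nodes;   /* node set from the mapping  */
} task;

/* Assigns a module id to each task: tasks sharing a node set get
 * the same id.  Returns the number of modules formed. */
int group_into_modules(const task *tasks, int ntasks, int *module_of)
{
    int nmod = 0;
    for (int i = 0; i < ntasks; i++) {
        int j;
        for (j = 0; j < i; j++)
            if (strcmp(tasks[i].nodes, tasks[j].nodes) == 0) {
                module_of[i] = module_of[j];   /* reuse earlier module */
                break;
            }
        if (j == i)
            module_of[i] = nmod++;             /* new node set, new module */
    }
    return nmod;
}
```

For the FFT--Hist program, if {\tt colffts} and {\tt hist} were mapped to one node set and {\tt rowffts} to another, this grouping would yield two modules.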

The compiler generates calls to the run--time system for communication
between modules, and may also generate address computation functions that the
run--time system uses to translate between data distributions.
Inter--module communication management is done jointly by the compiler
and the run--time system and the details are discussed in the next
section.  The involvement of the run--time system is necessary since
the compiler may not know the mapping of the modules to the available
nodes, the current distribution of data on a remote module, or
the status of the (possibly general purpose) network at run-time.
This information may change unpredictably for a variety of reasons,
including load balancing, the use of non-NetFx task-subroutines, or
other traffic on the network.
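As a concrete illustration of an address computation function, the C sketch below translates the location of a global element under a block distribution on the source module into its location under a cyclic distribution on the destination module. The block and cyclic formulas are the standard HPF--style ones, but the function and type names are assumptions for illustration, not the code NetFx actually generates.

```c
/* One communication "address": which node holds global element g on
 * each side of the transfer, and at which local offset. */
typedef struct { int src_node, src_off, dst_node, dst_off; } xfer_addr;

/* Block distribution of N elements over P nodes: node p holds
 * elements [p*b, (p+1)*b) with b = ceil(N/P). */
static int block_node(int g, int N, int P) { int b = (N + P - 1) / P; return g / b; }
static int block_off (int g, int N, int P) { int b = (N + P - 1) / P; return g % b; }

/* Cyclic distribution: element g lives on node g mod P at offset g / P. */
static int cyclic_node(int g, int P) { return g % P; }
static int cyclic_off (int g, int P) { return g / P; }

/* Address computation for redistributing element g (of N total) from
 * a block distribution over P_src nodes to a cyclic distribution over
 * P_dst nodes.  The run--time system would invoke such a function to
 * build the actual message schedule once the mappings are known. */
xfer_addr block_to_cyclic(int g, int N, int P_src, int P_dst)
{
    xfer_addr a;
    a.src_node = block_node(g, N, P_src);
    a.src_off  = block_off(g, N, P_src);
    a.dst_node = cyclic_node(g, P_dst);
    a.dst_off  = cyclic_off(g, P_dst);
    return a;
}
```

Because the node counts {\tt P\_src} and {\tt P\_dst} are parameters, the same function works for whatever node assignment the run--time system chooses.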


%The involvement of the run--time system is necessary since
%the compiler is aware of the communication pattern between modules,
%but may not be aware of the mapping of the modules. This is
%particularly important if the mappings can change dynamically, or if
%some of the modules contain routines not generated by the NetFx
%compiler.

\section{Run--time support for task parallelism}

The main purpose of the run--time system is the efficient transfer of
distributed data sets between modules with few constraints on
processors, networks, languages, modules, or data.  In this section,
we first describe the design challenges of the run--time system and
our approach to meeting them.  Next, we explain the details of
inter--module communication, and the programming interface exported to
the compiler.  This is followed by a discussion of the internal
structure of the run--time system.  Finally, we give an example of how
an intermodule communication is accomplished using the run--time
system.

\subsection{Design Challenges}
\label{sec:deschal}

As noted above, the main purpose of the run--time system is the
efficient transfer of distributed data sets between modules.  To meet
this goal, several
design challenges must be overcome.  These include multiple languages
and programming paradigms, dynamic program and data mappings, and
processor and network heterogeneity.  Each of these challenges is
discussed below.

\subsubsection*{Multiple languages and programming paradigms}

Each task in the program may be written in a different parallel
language or paradigm. There are two primary motivations for this.
First, the heterogeneity of the network parallel computing environment
means that no one language or paradigm is ``best'' for programming it.
For example, the environment may include both shared memory and
distributed memory computers.  The second motivation is
multidisciplinary applications.  Each component of a multidisciplinary
application may be best expressed in a particular language or
paradigm.  For example, one component may require sparse matrix
support such as Archimedes'~\cite{ARCHIMEDES}, while another requires
fast dense matrix support such as Fx's, and a third may be best
expressed with nested data parallelism as in NESL~\cite{NESL}.
%
% Maybe a more concrete example of a multidisc. app would be good.
% Quake?
%

Regardless of which language or paradigm is used to implement the
tasks, it should always be possible to transfer data between two tasks
as long as the {\em abstract} data types are compatible.  For example,
it should be possible to send a vector from an HPF task to a
DOME~\cite{DOME} task.  The challenge is that the implementation of
vectors in HPF is very different from that in DOME.  In general, if
there are $n$ implementations of an abstract data type, there are
$n^2$ combinations of implementations that must be supported.
Furthermore, it should be easy to extend the range of implementations.


\subsubsection*{Dynamic program and data mappings}

In many languages and paradigms, the distribution of data can change
dynamically. For example, DOME load balances vectors by shifting
elements between processors.  At another level, the assignment of
processors to modules may change due to global load balancing or to
support a run--time automatic mapping tool.  The run--time system
should not restrict data transfer under these conditions.


\subsubsection*{Processor and network heterogeneity}

Processor and network heterogeneity make efficient communication
difficult.  There are several reasons for this.  First, processor
heterogeneity may require that data representation be changed.
Communication systems such as PVM~\cite{PVMBOOK} deal with this by
converting to and from the XDR format, but this usually
means additional copy steps.  The second reason is that routing is
complicated by an essentially arbitrary topology and many different
link types and access mechanisms.  A common solution is to rely on
internet routing~\cite{IPROUTE}, but this mechanism cannot make
use of the parallel nature of inter--module communications because
there is no notion of a parallel connection in TCP/IP.  Furthermore,
the TCP/IP stack for a high speed network adaptor may be significantly
less efficient than the native stack.  Finally, MPPs typically do not
support TCP/IP internally, resulting in a store--and--forward stop at
an interface node, which increases latency.


\subsection{Meeting the design challenges}

Our approach to the design challenges described in
section~\ref{sec:deschal} has three components: a standard run--time
system interface, run--time system extensibility, and run--time system
modularity.

\subsubsection*{Standard run--time system interface}

The run--time system exports a standard interface to the program
generator.  The main abstraction is that of communicating distributed
data sets between modules.


\subsubsection*{Run--time system extensibility}

To support multiple languages and paradigms, we provide a standard
method for a compiler or programmer to extend the run--time system to
support new combinations of data distributions.  This is similar to
how a generic sort routine allows an application to pass in
\verb.compare.  and \verb.swap. functions for a particular data type, but
on a larger scale.  This mechanism also provides a way for compile--time
knowledge to be used by the run--time system for higher performance.
 

\subsubsection*{Run--time system modularity}

The run--time system is composed of three subsystems with clearly
defined interfaces between them.  This allows each subsystem to be
developed and optimized independently of the others.  The idea is that
the communication subsystem, which actually performs (parallel) data
transfers, is supplied with client subsystems that handle the details
of mapping between data distributions and dynamic program and data
mappings.  This segregation of functionality opens the development of
communication subsystems to the networking community.  Conversely, the
development of distribution mapping methods is simplified because 
communication is not part of the picture.  

\subsection{Details of Inter--module communication}

The goal of inter--module communication is to transport a distributed
data set from a source module to a destination module.  The elements
of the data set are partitioned across the processes of a module
according to some rule.  This rule is identified by the data set's
distribution type and distribution.  A {\em distribution type}
identifies a class of similar rules, while a {\em distribution}
identifies a specific member of the class.  For example, the
distribution type $HPF\_ARRAY$ includes the distribution
$(*,BLOCK)$.  The run--time system assumes that the distribution type
of the data set on each module is constant throughout the execution of
the program; however, its distribution may change.  If the distribution
can change, we say that its {\em binding} is {\em dynamic}; otherwise
it is {\em static}.  The initial values of both kinds of bindings are
supplied at compile time.

To send the data set from a source module to a destination module, it
is necessary to be able to translate between the distributions at the
two end--points.  It is only possible to translate between
distributions whose global structures are compatible.  For example, if
the source global structure is an array, then the destination global
structure must be an array of the same size and shape.  The two
end--point distribution types identify a method for translating
between the two member distributions, a process called {\em address
computation}.  Given two distributions of the appropriate type and a
process pair, the method, or {\em address computation function},
returns an {\em offset relation} between offsets into the chunk of
elements owned by each process.  This architecture--independent
relation can be trivially converted to an architecture--dependent {\em
address relation}, which maps memory addresses on one machine to
memory addresses on the other.
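As a minimal sketch of address computation (illustrative Python, not
the NetFx library; the helper names and the 1--D $BLOCK$ rule are
assumptions for the example), the offset relation for a pair of
processes can be computed by intersecting the global index sets the
two processes own under their respective distributions:

```python
def block_owned(rank, nprocs, n):
    """Global indices owned by `rank` under a 1-D BLOCK distribution
    of n elements over nprocs consecutively numbered processes."""
    b = (n + nprocs - 1) // nprocs          # block size, rounded up
    return range(rank * b, min((rank + 1) * b, n))

def offset_relation(src_rank, src_procs, dst_rank, dst_procs, n):
    """Offset pairs (src_offset, dst_offset) for elements that
    src_rank must send to dst_rank.  Offsets index each process's
    local chunk, so the relation is architecture--independent; scaling
    by element size and adding base addresses would turn it into an
    architecture--dependent address relation."""
    src = list(block_owned(src_rank, src_procs, n))
    dst = set(block_owned(dst_rank, dst_procs, n))
    return [(i, g - min(dst)) for i, g in enumerate(src) if g in dst]
```

For example, redistributing 8 elements from a 2--process $BLOCK$
layout to a 4--process $BLOCK$ layout, source process 0 (owning global
indices 0--3) overlaps destination process 1 (owning 2--3) at source
offsets 2--3 and destination offsets 0--1.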

The address computation function operates under the abstraction that a
data set's elements are distributed over a consecutively numbered
group of processes of the module.  Once an address relation has been
computed for each pair of processes, the abstract process numbers must
be mapped to the actual nodes of the machines on the network.  The
properties of these module bindings are similar to those of the
distribution bindings discussed above.  Specifically, the bindings may
be static or dynamic.  Once the mappings are known, actual data
transfer can begin.


\subsection{Run--time System Interface}
This section describes the programming interface between the program
generator and the run--time system.  It is a two--way interface: the
program generator emits calls to the run--time system, and the
run--time system may itself call procedures produced by the program
generator. The interface has three components as shown in Figure
\ref{fig:interface}.  The first two of these are exported by the run--time
system, while the last is exported by the program itself.

\subsubsection*{Inter--module communication interface}
This interface consists of simple, high--level, non--blocking
collective send and receive calls for transporting whole distributed
data sets between modules.  It is required that if more than one data
set is transferred between a pair of modules, the order of the sends
and receives on the modules be the same.  The arguments of the send
and receive calls are
\begin{itemize}
\item Target (send) or source (receive) module 
\item Address of first element local to the calling process
\item Data type of each element
\item Sender distribution type and distribution
\item Receiver distribution type and distribution
\item Hint (implementation specific)
\end{itemize}
There is a single call to wait for all outstanding receives to
complete, which allows a module to finish its input communication
before executing.
%I though this was implicit...
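The argument list above suggests call signatures along the following
lines (a hedged Python sketch; all names are invented for
illustration, and the paper does not specify the actual NetFx entry
points):

```python
# Sketch of the non-blocking collective calls implied by the argument
# list above.  Bodies are stubs: real versions would start transfers.
pending_receives = []

def im_send(target_module, local_base, elem_type,
            send_dtype, send_dist, recv_dtype, recv_dist, hint=None):
    """Non-blocking collective send of a whole distributed data set."""
    return {"op": "send", "module": target_module, "type": elem_type}

def im_recv(source_module, local_base, elem_type,
            send_dtype, send_dist, recv_dtype, recv_dist, hint=None):
    """Non-blocking collective receive; returns a handle."""
    handle = {"op": "recv", "module": source_module, "done": False}
    pending_receives.append(handle)
    return handle

def im_wait_all():
    """The single wait call: completes all outstanding receives so a
    module can finish input communication before executing."""
    for h in pending_receives:
        h["done"] = True
    pending_receives.clear()
```

Because sends and receives between a module pair must occur in the
same order on both sides, matching is by order rather than by tags.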

\subsubsection*{Registry interface}

Before performing sends and receives, the program must describe the
characteristics of relevant modules, distribution types, and custom
address computation functions to the run--time system using the
registry interface.  The program explicitly registers the initial
module mapping and whether it is static or dynamic.  Registration of
a distribution type is required if the distribution type is not
library--supported, or if its distribution is dynamic.
%  To use a predefined distribution type that is 
%dynamic, the program must register a dynamic instance of it. 
Dynamic bindings are updated implicitly by examining send and receive
calls.  There are several reasons why a program may want to register
its own distribution types.  One reason is extensibility --- the
distribution type may not be supported by the library.  Another is
efficiency --- the compiler may have been able to generate a
specialized address computation function for a particular
communication.  In either case, the program must also register custom
address computation functions for any communication that involves the
new distribution type.  For example, suppose module 1 communicates
from a new distribution type $DTX$ to predefined types $DT2$ on module
2 and $DT3$ on module 3.  All three modules must register distribution
type $DTX$.  Module 1 must register new address computation functions
$\lambda (DTX,DT2)$ and $\lambda (DTX,DT3)$, module 2 must register
$\lambda (DTX,DT2)$, and module 3 must register $\lambda (DTX,DT3)$.
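The registry bookkeeping in the $DTX$ example above can be sketched as
follows (illustrative Python; the `Registry` class and method names
are assumptions, not the NetFx registry interface):

```python
# Sketch of per-module registry state for the DTX example: all three
# modules register the new distribution type, but each module only
# registers the address computation functions for pairings it uses.
class Registry:
    def __init__(self):
        self.dtypes = set()       # registered distribution types
        self.adx_funcs = {}       # (src_dtype, dst_dtype) -> function

    def register_dtype(self, name):
        self.dtypes.add(name)

    def register_adx(self, src_dtype, dst_dtype, fn):
        self.adx_funcs[(src_dtype, dst_dtype)] = fn

dummy = lambda *args: []          # stand-in address computation function

m1, m2, m3 = Registry(), Registry(), Registry()
for m in (m1, m2, m3):
    m.register_dtype("DTX")               # all three register DTX
m1.register_adx("DTX", "DT2", dummy)      # M1 communicates with both
m1.register_adx("DTX", "DT3", dummy)      # M2 and M3
m2.register_adx("DTX", "DT2", dummy)      # M2 registers only its pair
m3.register_adx("DTX", "DT3", dummy)      # M3 registers only its pair
```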

\subsubsection*{Standard address computation function interface}
All address computation functions must conform to this interface.
Each address computation function must be able both to return offset
relations and to assemble/disassemble message buffers; the run--time
system decides which functionality is used.

\subsection{Internal structure}

\begin{figure}
\centerline{\epsfxsize=4in \epsfbox{Int1.eps}}
\caption{The run--time system: subsystems, interfaces, and
relation to the NetFx Executable}
\label{fig:interface}
\end{figure}

%\caption[]{he  run--time system: subsystems, interfaces, and 
%relation to the  NetFx Executable.  Each arrow represents an interface 
%and points from the caller of the interface to the callee.  
%Note that some callees have several callers.}

The run--time system presents the abstraction of
``module--to--module'' communication of a distributed data set between
uniquely labeled modules.  This requires address computation, binding
maintenance, and efficient communication over heterogeneous networks.
These tasks are segregated into three subsystems (distribution,
binding, and communication) with well defined interfaces between them.
More than just good design practice, the ability to independently
develop and enhance each subsystem is vital for extensibility and good
performance.  Clearly, the number of distribution types will grow over
time as more languages and programming paradigms are developed and
supported.  Further, compile--time optimization of address computation
is important for performance \cite{StOG94}.  There are a variety of
mechanisms that could be used to support dynamic bindings and it is
unclear that any one is best for all environments.  Finally, to
achieve high performance, it will be necessary to specialize the
communication system for different networks.  By clearly defining the
interfaces between subsystems, we make this specialization possible
and easy.  Indeed, all the subsystems are ``plug--and--play.''  The
internals of the run--time system are shown in Figure
\ref{fig:interface}.  

\subsubsection*{Communication subsystem}
The communication subsystem is the heart of the run--time system and
exports the previously discussed inter--module communication interface
to the program.  The subsystem drives the run--time system by making
use of the binding subsystem to resolve module and distribution
bindings and the distribution subsystem to compute address relations.
In return, it supplies a simple {\em binding communication interface}
to the binding subsystem.  The communication subsystem performs all
actual data transfer.

\subsubsection*{Binding subsystem}
The binding subsystem resolves module, distribution, and address
computation function bindings for the communication subsystem and the
distribution subsystem through the {\em binding interface.}  It uses
the communication subsystem's binding communication interface if it
needs to perform an inter--module communication step to determine a
binding (for example, to bind the distribution of an irregular data
structure).  The binding subsystem supplies the registry interface to
the executable.

\subsubsection*{Distribution subsystem}
The distribution subsystem translates between sender and receiver
distributions.  It supplies the {\em address computation interface} to
the communication subsystem for this purpose.  The binding subsystem is
used to bind two distribution types and distributions to the
appropriate address computation function.  This function may be one of
the {\em library address computation functions} linked to every NetFx
Executable, or a compiler--generated {\em custom address computation
function}.  In every case, the address computation function must
conform to the standard address computation function interface,
discussed earlier.  After finding the appropriate address computation
function, the distribution subsystem invokes it, conforms the resulting
offset relation to the format requested by the communication subsystem,
and converts it to an address relation.  (The communication subsystem
can request that a message be assembled or disassembled instead.)

\subsection{Example}

\begin{figure}
        \centerline{\epsfxsize=4in \epsfbox{useint.eps}}
	\caption{Example of the steps followed for an inter--module send using a 
	custom address computation function.}
	\protect\label{fig:steps}
\end{figure}

We illustrate how an inter--module send using a custom address computation 
function is performed.  Figure \ref{fig:steps} shows the steps graphically.  
On the right are the actual calls performed by the node program and those 
performed internally for the inter--module send.  The calls are numbered to 
correspond to the interfaces used and to the steps discussed in this 
section.  There is a dependence from module $M1$ to module $M2$ and the 
compiler has generated a send from $M1$ to $M2$ (and a matching receive in 
$M2$, which is not discussed here.) The compiler also produced a custom
address computation function $ADXFUNC$ for this communication.  At run
time, $M1$'s first step is to use the registry interface to register the
initial values of all relevant bindings.  The module bindings for $M1$ and
$M2$ are registered as static, as are the distribution types $DT1$ and
$DT2$.  Finally, $M1$ registers $ADXFUNC$ as the address computation
function for $DT1$ and $DT2$.

The second step (which may be repeated many times) is to invoke a send of 
the distributed data set $A$ to $M2$.  The inter--module send call includes 
the target module number, $M2$, the first element of $A$ that is local to 
the process, the data type of $A$'s elements (REAL), the sender and 
receiver distribution types, $DT1$ and $DT2$, and the current sender and 
receiver distributions, $DISTA1$ and $DISTA2$.  The third step is to bind
the two modules to their current physical mappings, and the
distributions to their current values.  Both of these
steps are simple in this case because the bindings are static --- they 
amount to local lookups.  In the fourth step, the communication subsystem 
calls the distribution subsystem to build an address relation between each 
pair of sender and receiver processes $P1$ and $P2$.  The distribution 
subsystem accomplishes this by binding the two distribution types, $DT1$ 
and $DT2$, to their corresponding address computation function, $ADXFUNC$ (step 
5) and calling the function for each pair (step 6).  The offset relations 
returned are conformed to the format the communication subsystem requested 
and converted to address relations.  Finally, the relations are used to 
perform the actual communication in an environment--dependent manner.  The 
communication subsystem may cache the address relations to avoid repeating 
steps 4--6 the next time the inter--module send occurs.
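The sequence of steps above can be sketched end to end as follows
(illustrative Python with hypothetical names throughout; the actual
data movement is elided since only the control flow is being shown):

```python
# End-to-end sketch of an inter-module send with a custom address
# computation function, including the cache that amortizes steps 4-6.
registry = {"modules": {}, "adx": {}}
relation_cache = {}

def register(module_bindings, adx_functions):
    """Step 1: register initial values of all relevant bindings."""
    registry["modules"].update(module_bindings)
    registry["adx"].update(adx_functions)

def send(data, target, dtype_pair, dists):
    """Steps 2-6 for one send; `data` transfer itself is elided."""
    mapping = registry["modules"][target]        # step 3: bind module
    key = (target, dtype_pair)
    if key not in relation_cache:
        adxfunc = registry["adx"][dtype_pair]    # step 5: bind ADXFUNC
        relation_cache[key] = adxfunc(*dists)    # step 6: compute relation
    rel = relation_cache[key]                    # cached on later sends
    return [(mapping, src, dst) for src, dst in rel]

# Static bindings for the example: M2's mapping and a trivial ADXFUNC.
register({"M2": "node7"},
         {("DT1", "DT2"): lambda d1, d2: [(0, 0), (1, 1)]})
```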


\section{Status and implementation}

The Fx compiler currently supports task parallelism on the Intel iWarp, 
Intel Paragon, and on DEC Alpha workstations running PVM.  Compiler 
analysis of parallel sections (although not mapping directive support or 
code emission) is the same for each target.  The run--time system used on 
the Paragon and the Alphas was initially developed for the Alphas with an 
eye towards portability and serves as a prototype for the system discussed 
above.  The prototype essentially has three components: a component that 
computes offset relations for any two (static) HPF distributions, a 
component that conforms these relations to a particular format and caches 
them, and a component that does communication using the relations.  Porting 
this run--time system to the Paragon (and achieving good performance) 
required rewriting only the communication component.  Further details of 
the library are available in~\cite{DiOh95}.  

The main target network is Gigabit Nectar~\cite{ABCK89}, developed
at Carnegie Mellon.  A variety of communication interfaces have been
developed for Nectar, and we are incorporating them into the run--time
system.  Currently we are able to execute homogeneous Fx programs across
Alpha workstations using Nectar, as well as simple heterogeneous Fx
programs on Alphas and Suns.  The full heterogeneous run--time system
described above is currently
under development.


\section{Related work}

High level run--time support for communicating data parallel tasks is a 
fairly new research area.  We compare our approach to two other run--time 
libraries that support communicating data parallel tasks.  
Opus~\cite{HHMV95} supports communication between tasks using {\em shared 
data abstractions} (SDAs).  An SDA is essentially an independent object 
class whose private data is distributed over some set of threads and whose 
methods are protected with monitor semantics.  For example, one could 
define a queue SDA whose elements are block--distributed vectors.  To send 
a vector to another task, it is first sent to an SDA queue object (which 
requires an address computation) using the enqueue method.  The 
receiving task would invoke the dequeue method, which would cause the SDA 
queue object to send a data set (again requiring address computation).  
SDAs can support many more tasking paradigms than our scheme, but appear 
to significantly increase the communication volume and result in 
unnecessary synchronization between tasks due to monitor semantics.

Closer to our scheme is the library described in~\cite{ACFK95}.  This 
library supports communication between data parallel tasks with collective 
sends and receives.  Sends (and receives) are split between an 
initialization step which performs address computation and builds 
communication schedules, and an execution step that actually performs the 
send.  Initializations synchronize sending and receiving tasks, but can be 
amortized over many executions.  Only static module bindings and static HPF 
distributions are supported.  All address computation is done with general 
purpose library routines.

\section{Summary and conclusions}
We address programming support for high performance networks, which are 
emerging as an increasingly important platform for parallel computing.  We have 
presented NetFx, a compiler system that supports network parallel 
computing.  The NetFx programming model is based on integrated support for 
task and data parallelism originally developed for the Fx compiler system.  
We also presented a run--time system for network parallel computing which 
presents a simple abstraction for module-to-module transfer of distributed 
data sets so that the compiler does not have to deal with the dynamic 
nature of the network computing environment.  The run--time system allows 
the compiler and other tools that generate foreign data parallel modules to 
customize execution behavior.  A clean interface between the compiler and 
the run--time system also allows independent development and retargeting 
of both.  We believe that NetFx is the first tool to provide practical 
programming support for networks and will make network computing easier for 
non-specialists.


\bibliography{/afs/cs/project/iwarp/member/jass/bib/all,/afs/cs/project/iwarp/member/fx/bib/texbib}
\end{document}
