Thursday, April 16, 2020

HPC Parallel Algorithm Models

Parallel Algorithm Models

Having discussed the techniques for decomposition, mapping, and minimizing interaction overheads, we now present some of the commonly used parallel algorithm models. An algorithm model is typically a way of structuring a parallel algorithm by selecting a decomposition and mapping technique and applying the appropriate strategy to minimize interactions.

3.6.1 The Data-Parallel Model
The data-parallel model is one of the simplest algorithm models. In this model, the tasks are statically or semi-statically mapped onto processes and each task performs similar operations on different data. This type of parallelism that is a result of identical operations being applied concurrently on different data items is called data parallelism. The work may be done in phases and the data operated upon in different phases may be different. Typically, data-parallel computation phases are interspersed with interactions to synchronize the tasks or to get fresh data to the tasks. Since all tasks perform similar computations, the decomposition of the problem into tasks is usually based on data partitioning because a uniform partitioning of data followed by a static mapping is sufficient to guarantee load balance.

Data-parallel algorithms can be implemented in both shared-address-space and message-passing paradigms. However, the partitioned address space in a message-passing paradigm may allow better control of placement, and thus may offer a better handle on locality. On the other hand, a shared address space can ease the programming effort, especially if the distribution of data is different in different phases of the algorithm. Interaction overheads in the data-parallel model can be minimized by choosing a locality-preserving decomposition and, if applicable, by overlapping computation and interaction and by using optimized collective interaction routines.

A key characteristic of data-parallel problems is that for most problems, the degree of data parallelism increases with the size of the problem, making it possible to use more processes to effectively solve larger problems. An example of a data-parallel algorithm is dense matrix multiplication described in Section 3.1.1. In the decomposition shown in Figure 3.10, all tasks are identical; they are applied to different data.
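As an illustration (not from the text), here is a minimal data-parallel sketch in C with OpenMP, assuming an OpenMP-capable compiler: every thread applies the same operation to its own statically assigned contiguous block of an array, mirroring the uniform partitioning plus static mapping described above.

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

int main(void)
{
    const int n = 1 << 20;
    double *a = malloc(n * sizeof(double));
    double *b = malloc(n * sizeof(double));

    for (int i = 0; i < n; i++) b[i] = (double)i;

    /* Identical operation applied concurrently to different data items;
       a static schedule gives each thread one uniform contiguous block. */
    #pragma omp parallel for schedule(static)
    for (int i = 0; i < n; i++)
        a[i] = 2.0 * b[i] + 1.0;

    printf("a[42] = %f\n", a[42]);
    free(a);
    free(b);
    return 0;
}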

3.6.2 The Task Graph Model
As discussed in Section 3.1, the computations in any parallel algorithm can be viewed as a task-dependency graph. The task-dependency graph may be either trivial, as in the case of matrix multiplication, or nontrivial (Problem 3.5). However, in certain parallel algorithms, the task-dependency graph is explicitly used in mapping. In the task graph model, the interrelationships among the tasks are utilized to promote locality or to reduce interaction costs. This model is typically employed to solve problems in which the amount of data associated with the tasks is large relative to the amount of computation associated with them. Usually, tasks are mapped statically to help optimize the cost of data movement among tasks. Sometimes a decentralized dynamic mapping may be used, but even then, the mapping uses the information about the task-dependency graph structure and the interaction pattern of tasks to minimize interaction overhead. Work is more easily shared in paradigms with globally addressable space, but mechanisms are available to share work in disjoint address spaces. Typical interaction-reducing techniques applicable to this model include reducing the volume and frequency of interaction by promoting locality while mapping the tasks based on the interaction pattern of tasks, and using asynchronous interaction methods to overlap the interaction with computation. Examples of algorithms based on the task graph model include parallel quicksort (Section 9.4.1), sparse matrix factorization, and many parallel algorithms derived via divide-and-conquer decomposition. This type of parallelism that is naturally expressed by independent tasks in a task-dependency graph is called task parallelism.
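A minimal sketch of task parallelism driven by a small task-dependency graph, using OpenMP task dependences (OpenMP 4.0 or later assumed; the three tasks and variable names are illustrative, not from the text): tasks t1 and t2 are independent, and t3 may start only after both of its predecessors complete.

#include <stdio.h>
#include <omp.h>

int main(void)
{
    int x = 0, y = 0, z = 0;

    #pragma omp parallel
    #pragma omp single
    {
        #pragma omp task depend(out: x)    /* task t1: produces x */
        x = 10;

        #pragma omp task depend(out: y)    /* task t2: produces y, independent of t1 */
        y = 20;

        #pragma omp task depend(in: x, y)  /* task t3: may run only after t1 and t2 */
        z = x + y;

        #pragma omp taskwait               /* wait for all three child tasks */
    }

    printf("z = %d\n", z);
    return 0;
}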

3.6.3 The Work Pool Model
The work pool or the task pool model is characterized by a dynamic mapping of tasks onto processes for load balancing in which any task may potentially be performed by any process. There is no desired premapping of tasks onto processes. The mapping may be centralized or decentralized. Pointers to the tasks may be stored in a physically shared list, priority queue, hash table, or tree, or they could be stored in a physically distributed data structure. The work may be statically available in the beginning, or could be dynamically generated; i.e., the processes may generate work and add it to the global (possibly distributed) work pool. If the work is generated dynamically and a decentralized mapping is used, then a termination detection algorithm (Section 11.4.4) would be required so that all processes can actually detect the completion of the entire program (i.e., exhaustion of all potential tasks) and stop looking for more work. In the message-passing paradigm, the work pool model is typically used when the amount of data associated with tasks is relatively small compared to the computation associated with the tasks. As a result, tasks can be readily moved around without causing too much data interaction overhead. The granularity of the tasks can be adjusted to attain the desired level of trade-off between load imbalance and the overhead of accessing the work pool for adding and extracting tasks.
Parallelization of loops by chunk scheduling (Section 3.4.2) or related methods is an example of the use of the work pool model with centralized mapping when the tasks are statically available. Parallel tree search where the work is represented by a centralized or distributed data structure is an example of the use of the work pool model where the tasks are generated dynamically. 
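The loop-scheduling case mentioned above can be sketched with OpenMP's dynamic schedule (OpenMP assumed), which acts as a centralized work pool of statically available iterations; the chunk size is the granularity knob discussed in the text, and the cost function below is an illustrative stand-in for non-uniform tasks.

#include <stdio.h>
#include <math.h>
#include <omp.h>

#define N     100000
#define CHUNK 64    /* pool granularity: iterations are handed out CHUNK at a time */

/* Illustrative stand-in for a task whose cost varies with i (non-uniform work). */
static double work(int i)
{
    double s = 0.0;
    for (int k = 0; k < i % 1000; k++)
        s += sin((double)k);
    return s;
}

int main(void)
{
    double total = 0.0;

    /* The runtime acts as a centralized work pool: an idle thread grabs the next
       chunk of iterations, trading load balance against pool-access overhead. */
    #pragma omp parallel for schedule(dynamic, CHUNK) reduction(+: total)
    for (int i = 0; i < N; i++)
        total += work(i);

    printf("total = %f\n", total);
    return 0;
}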

3.6.4 The Master-Slave Model
In the master-slave or the manager-worker model, one or more master processes generate work and allocate it to worker processes. The tasks may be allocated a priori if the manager can estimate the size of the tasks or if a random mapping can do an adequate job of load balancing. In another scenario, workers are assigned smaller pieces of work at different times. The latter scheme is preferred if it is time consuming for the master to generate work and hence it is not desirable to make all workers wait until the master has generated all work pieces. In some cases, work may need to be performed in phases, and work in each phase must finish before work in the next phases can be generated. In this case, the manager may cause all workers to synchronize after each phase. Usually, there is no desired premapping of work to processes, and any worker can do any job assigned to it. The manager-worker model can be generalized to the hierarchical or multi-level manager-worker model in which the top-level manager feeds large chunks of tasks to second-level managers, who further subdivide the tasks among their own workers and may perform part of the work themselves. This model is generally equally suitable to shared-address-space or message-passing paradigms since the interaction is naturally two-way; i.e., the manager knows that it needs to give out work and workers know that they need to get work from the manager. While using the master-slave model, care should be taken to ensure that the master does not become a bottleneck, which may happen if the tasks are too small (or the workers are relatively fast). The granularity of tasks should be chosen such that the cost of doing work dominates the cost of transferring work and the cost of synchronization. Asynchronous interaction may help overlap interaction and the computation associated with work generation by the master. It may also reduce waiting times if the nature of requests from workers is non-deterministic. 
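A minimal master-worker sketch in C with MPI (MPI assumed available; the task contents, tags, and function names are illustrative, not from the text). Rank 0 hands out one task index at a time, a worker returns its result to request more work, and a negative index serves as the stop signal.

#include <stdio.h>
#include <mpi.h>

#define NTASKS 100
#define TAG_WORK   1
#define TAG_RESULT 2

/* Illustrative stand-in for the work associated with one task. */
static double do_task(int i) { return (double)i * i; }

int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {                          /* master / manager */
        int next = 0, active = 0, stop = -1;
        double sum = 0.0, res;
        MPI_Status st;

        /* Seed every worker with one task (or a stop signal if none are left). */
        for (int w = 1; w < size; w++) {
            if (next < NTASKS) {
                MPI_Send(&next, 1, MPI_INT, w, TAG_WORK, MPI_COMM_WORLD);
                next++; active++;
            } else {
                MPI_Send(&stop, 1, MPI_INT, w, TAG_WORK, MPI_COMM_WORLD);
            }
        }

        /* Collect results and hand out the remaining work one task at a time. */
        while (active > 0) {
            MPI_Recv(&res, 1, MPI_DOUBLE, MPI_ANY_SOURCE, TAG_RESULT,
                     MPI_COMM_WORLD, &st);
            sum += res;
            if (next < NTASKS) {
                MPI_Send(&next, 1, MPI_INT, st.MPI_SOURCE, TAG_WORK,
                         MPI_COMM_WORLD);
                next++;
            } else {
                MPI_Send(&stop, 1, MPI_INT, st.MPI_SOURCE, TAG_WORK,
                         MPI_COMM_WORLD);
                active--;
            }
        }
        printf("sum of results = %f\n", sum);
    } else {                                   /* worker */
        int task;
        for (;;) {
            MPI_Recv(&task, 1, MPI_INT, 0, TAG_WORK, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
            if (task < 0) break;               /* stop signal */
            double res = do_task(task);
            MPI_Send(&res, 1, MPI_DOUBLE, 0, TAG_RESULT, MPI_COMM_WORLD);
        }
    }

    MPI_Finalize();
    return 0;
}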

3.6.5 The Pipeline or Producer-Consumer Model
In the pipeline model, a stream of data is passed on through a succession of processes, each of which performs some task on it. This simultaneous execution of different programs on a data stream is called stream parallelism. With the exception of the process initiating the pipeline, the arrival of new data triggers the execution of a new task by a process in the pipeline. The processes could form such pipelines in the shape of linear or multidimensional arrays, trees, or general graphs with or without cycles. A pipeline is a chain of producers and consumers. Each process in the pipeline can be viewed as a consumer of a sequence of data items for the process preceding it in the pipeline and as a producer of data for the process following it in the pipeline. The pipeline does not need to be a linear chain; it can be a directed graph. The pipeline model usually involves a static mapping of tasks onto processes. Load balancing is a function of task granularity. The larger the granularity, the longer it takes to fill up the pipeline, i.e., for the trigger produced by the first process in the chain to propagate to the last process, thereby keeping some of the processes waiting. However, too fine a granularity may increase interaction overheads because processes will need to interact to receive fresh data after smaller pieces of computation. The most common interaction-reduction technique applicable to this model is overlapping interaction with computation. An example of a two-dimensional pipeline is the parallel LU factorization algorithm, which is discussed in detail in Section 8.3.1.
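A sketch of a linear pipeline in C with MPI (MPI assumed): rank 0 produces a stream of items, each interior rank transforms the items it receives and forwards them, and the last rank consumes the results; the per-stage operation is illustrative.

#include <stdio.h>
#include <mpi.h>

#define NITEMS 8

int main(int argc, char **argv)
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    for (int i = 0; i < NITEMS; i++) {
        double item;

        if (rank == 0)
            item = (double)i;                             /* produce a new item */
        else
            MPI_Recv(&item, 1, MPI_DOUBLE, rank - 1, 0,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);  /* arrival triggers work */

        item = item * 2.0 + rank;                         /* this stage's (illustrative) task */

        if (rank < size - 1)
            MPI_Send(&item, 1, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
        else
            printf("consumed item %d: %f\n", i, item);    /* last stage consumes */
    }

    MPI_Finalize();
    return 0;
}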

3.6.6 Hybrid Models
In some cases, more than one model may be applicable to the problem at hand, resulting in a hybrid algorithm model. A hybrid model may be composed either of multiple models applied hierarchically or multiple models applied sequentially to different phases of a parallel algorithm. In some cases, an algorithm formulation may have characteristics of more than one algorithm model. For instance, data may flow in a pipelined manner in a pattern guided by a task dependency graph. In another scenario, the major computation may be described by a task dependency graph, but each node of the graph may represent a supertask comprising multiple subtasks that may be suitable for data-parallel or pipelined parallelism. Parallel quicksort (Sections 3.2.5 and 9.4.1) is one of the applications for which a hybrid model is ideally suited.

With Thanks to Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Reference
Content is taken from
Introduction to Parallel Computing, Second Edition
By Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Publisher: Addison Wesley
Pub Date: January 16, 2003
ISBN: 0-201-64865-2
Buy this book
https://books.google.co.in/books/about/Introduction_to_Parallel_Computing.html?id=B3jR2EhdZaMC

Thursday, April 9, 2020

HPC Methods for Containing Interaction Overheads

As noted earlier, reducing the interaction overhead among concurrent tasks is important for an efficient parallel program. The overhead that a parallel program incurs due to interaction among its processes depends on many factors, such as the volume of data exchanged during interactions, the frequency of interaction, the spatial and temporal pattern of interactions, etc. In this section, we will discuss some general techniques that can be used to reduce the interaction overheads incurred by parallel programs. These techniques manipulate one or more of the three factors above in order to reduce the interaction overhead. Some of these are applicable while devising the decomposition and mapping schemes for the algorithms and some are applicable while programming the algorithm in a given paradigm. All techniques may not be applicable in all parallel programming paradigms and some of them may require support from the underlying hardware.
3.5.1 Maximizing Data Locality
In most nontrivial parallel programs, the tasks executed by different processes require access to some common data. For example, in sparse matrix-vector multiplication y = Ab, in which tasks correspond to computing individual elements of vector y (Figure 3.6), all elements of the input vector b need to be accessed by multiple tasks. In addition to sharing the original input data, interaction may result if processes require data generated by other processes. The interaction overheads can be reduced by using techniques that promote the use of local data or data that have been recently fetched. Data locality enhancing techniques encompass a wide range of schemes that try to minimize the volume of nonlocal data that are accessed, maximize the reuse of recently accessed data, and minimize the frequency of accesses. In many cases, these schemes are similar in nature to the data reuse optimizations often performed in modern cache-based microprocessors.

Minimize Volume of Data-Exchange
A fundamental technique for reducing the interaction overhead is to minimize the overall volume of shared data that needs to be accessed by concurrent processes. This is akin to maximizing the temporal data locality, i.e., making as many consecutive references to the same data as possible. Clearly, performing as much of the computation as possible using locally available data obviates the need for bringing in more data into local memory or cache for a process to perform its tasks. As discussed previously, one way of achieving this is by using appropriate decomposition and mapping schemes. For example, in the case of matrix multiplication, we saw that by using a two-dimensional mapping of the computations to the processes we were able to reduce the amount of shared data (i.e., matrices A and B) that needs to be accessed by each task to 2n²/√p, as opposed to the n²/p + n² required by a one-dimensional mapping (Figure 3.26). In general, using a higher dimensional distribution often helps in reducing the volume of nonlocal data that needs to be accessed. Another way of decreasing the amount of shared data that are accessed by multiple processes is to use local data to store intermediate results, and to access the shared data only to place the final results of the computation. For example, consider computing the dot product of two vectors of length n in parallel such that each of the p tasks multiplies n/p pairs of elements. Rather than adding each individual product of a pair of numbers to the final result, each task can first create a partial dot product of its assigned portion of the vectors of length n/p in its own local location, and only access the final shared location once to add this partial result. This reduces the number of accesses to the shared location where the result is stored from n to p (see the code sketch at the end of this subsection).

Minimize Frequency of Interactions
Minimizing the frequency of interactions is important in reducing the interaction overheads in parallel programs because there is a relatively high startup cost associated with each interaction on many architectures. Interaction frequency can be reduced by restructuring the algorithm such that shared data are accessed and used in large pieces. Thus, by amortizing the startup cost over large accesses, we can reduce the overall interaction overhead, even if such restructuring does not necessarily reduce the overall volume of shared data that need to be accessed. This is akin to increasing the spatial locality of data access, i.e., ensuring the proximity of consecutively accessed data locations. On a shared-address-space architecture, each time a word is accessed, an entire cache line containing many words is fetched. If the program is structured to have spatial locality, then fewer cache lines are accessed. On a message-passing system, spatial locality leads to fewer message transfers over the network because each message can transfer larger amounts of useful data. The number of messages can sometimes be reduced further on a message-passing system by combining messages between the same source-destination pair into larger messages if the interaction pattern permits and if the data for multiple messages are available at the same time, albeit in separate data structures. Sparse matrix-vector multiplication is a problem whose parallel formulation can use this technique to reduce interaction overhead. In typical applications, repeated sparse matrix-vector multiplication is performed with matrices of the same nonzero pattern but different numerical nonzero values. While solving this problem in parallel, a process interacts with others to access elements of the input vector that it may need for its local computation. Through a one-time scanning of the nonzero pattern of the rows of the sparse matrix that a process is responsible for, it can determine exactly which elements of the input vector it needs and from which processes. Then, before starting each multiplication, a process can first collect all the nonlocal entries of the input vector that it requires, and then perform an interaction-free multiplication. This strategy is far superior to trying to access a nonlocal element of the input vector as and when it is required in the computation.
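The dot-product example above can be sketched as follows in C with OpenMP (OpenMP assumed): each thread accumulates a private partial sum over its n/p elements and touches the shared result exactly once, reducing the number of accesses to the shared location from n to p.

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

int main(void)
{
    const int n = 1 << 20;
    double *x = malloc(n * sizeof(double));
    double *y = malloc(n * sizeof(double));
    double dot = 0.0;                 /* shared final result */

    for (int i = 0; i < n; i++) { x[i] = 1.0; y[i] = 2.0; }

    #pragma omp parallel
    {
        double local = 0.0;           /* private partial dot product */

        #pragma omp for schedule(static)
        for (int i = 0; i < n; i++)
            local += x[i] * y[i];

        #pragma omp atomic            /* one access to the shared location per thread */
        dot += local;
    }

    printf("dot = %f\n", dot);
    free(x);
    free(y);
    return 0;
}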

3.5.2 Minimizing Contention and Hot Spots

Our discussion so far has been largely focused on reducing interaction overheads by directly or indirectly reducing the frequency and volume of data transfers. However, the data-access and inter-task interaction patterns can often lead to contention that can increase the overall interaction overhead. In general, contention occurs when multiple tasks try to access the same resources concurrently. Multiple simultaneous transmissions of data over the same interconnection link, multiple simultaneous accesses to the same memory block, or multiple processes sending messages to the same process at the same time, can all lead to contention. This is because only one of the multiple operations can proceed at a time and the others are queued and proceed sequentially. Consider the problem of multiplying two matrices C = AB, using the two-dimensional partitioning shown in Figure 3.26(b). Let p be the number of tasks with a one-to-one mapping of tasks onto processes. Let each task be responsible for computing a unique Ci,j, for 0 ≤ i, j < √p. The straightforward way of performing this computation is for Ci,j to be computed according to the following formula (written in matrix-block notation):

Equation 3.1:   Ci,j = sum over k = 0, 1, ..., √p - 1 of Ai,k Bk,j
Looking at the memory access patterns of the above equation, we see that at any one of the √p steps, √p tasks will be accessing the same block of A or B. In particular, all the tasks that work on the same row of C will be accessing the same block of A. For example, all √p processes computing C0,0, C0,1, ..., C0,√p-1 will attempt to read A0,0 at once. Similarly, all the tasks working on the same column of C will be accessing the same block of B. The need to concurrently access these blocks of matrices A and B will create contention on both NUMA shared-address-space and message-passing parallel architectures. One way of reducing contention is to redesign the parallel algorithm to access data in contention-free patterns. For the matrix multiplication algorithm, this contention can be eliminated by modifying the order in which the block multiplications are performed in Equation 3.1. A contention-free way of performing these block-multiplications is to compute Ci,j by using the formula

Ci,j = sum over k = 0, 1, ..., √p - 1 of Ai,(i+j+k) mod √p  B(i+j+k) mod √p,j
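The effect of the modified ordering can be seen in a small sketch (illustrative, not from the text) that prints which block of A and B each task reads at every step: with the rotated index (i + j + k) mod √p, no two tasks in the same row read the same block of A, and no two tasks in the same column read the same block of B, at any step.

#include <stdio.h>

#define SQRT_P 4   /* tasks form a SQRT_P x SQRT_P grid */

int main(void)
{
    for (int k = 0; k < SQRT_P; k++) {
        printf("step %d:\n", k);
        for (int i = 0; i < SQRT_P; i++)
            for (int j = 0; j < SQRT_P; j++) {
                int kk = (i + j + k) % SQRT_P;   /* rotated summation index */
                /* In this step, task (i, j) reads block A[i][kk] and block B[kk][j]. */
                printf("  task(%d,%d) reads A[%d][%d] and B[%d][%d]\n",
                       i, j, i, kk, kk, j);
            }
    }
    return 0;
}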
3.5.3 Overlapping Computations with Interactions
The amount of time that processes spend waiting for shared data to arrive or to receive additional work after an interaction has been initiated can be reduced, often substantially, by doing some useful computations during this waiting time. There are a number of techniques that can be used to overlap computations with interactions. A simple way of overlapping is to initiate an interaction early enough so that it is completed before it is needed for computation. To achieve this, we must be able to identify computations that can be performed before the interaction and do not depend on it. Then the parallel program must be structured to initiate the interaction at an earlier point in the execution than it is needed in the original algorithm. Typically, this is possible if the interaction pattern is spatially and temporally static (and therefore, predictable) or if multiple tasks that are ready for execution are available on the same process so that if one blocks to wait for an interaction to complete, the process can work on another task. The reader should note that by increasing the number of parallel tasks to promote computation-interaction overlap, we are reducing the granularity of the tasks, which in general tends to increase overheads. Therefore, this technique must be used judiciously. In certain dynamic mapping schemes, as soon as a process runs out of work, it requests and gets additional work from another process. It then waits for the request to be serviced. If the process can anticipate that it is going to run out of work and initiate a work transfer interaction in advance, then it may continue towards finishing the tasks at hand while the request for more work is being serviced. Depending on the problem, estimating the amount of remaining work may be easy or hard.
In most cases, overlapping computations with interaction requires support from the programming paradigm, the operating system, and the hardware. The programming paradigm must provide a mechanism to allow interactions and computations to proceed concurrently. This mechanism should be supported by the underlying hardware. Disjoint address-space paradigms and architectures usually provide this support via non-blocking message passing primitives. The programming paradigm provides functions for sending and receiving messages that return control to the user's program before they have actually completed. Thus, the program can use these primitives to initiate the interactions, and then proceed with the computations. If the hardware permits computation to proceed concurrently with message transfers, then the interaction overhead can be reduced significantly.
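A minimal sketch of this pattern with non-blocking MPI primitives (MPI assumed; the ring exchange is illustrative): the send and receive are initiated early, independent local work proceeds while the transfer may be in flight, and the process waits only when the remote data is actually needed.

#include <stdio.h>
#include <mpi.h>

#define N 10000

int main(int argc, char **argv)
{
    int rank, size;
    double local[N], remote[N], acc = 0.0;
    MPI_Request sreq, rreq;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    for (int i = 0; i < N; i++) local[i] = rank + 1.0;

    int right = (rank + 1) % size;
    int left  = (rank - 1 + size) % size;

    /* Initiate the interaction as early as possible... */
    MPI_Irecv(remote, N, MPI_DOUBLE, left,  0, MPI_COMM_WORLD, &rreq);
    MPI_Isend(local,  N, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &sreq);

    /* ...and perform computation that does not depend on the incoming data. */
    for (int i = 0; i < N; i++) acc += local[i] * local[i];

    /* Block only at the point where the remote data is actually required. */
    MPI_Wait(&rreq, MPI_STATUS_IGNORE);
    for (int i = 0; i < N; i++) acc += remote[i];
    MPI_Wait(&sreq, MPI_STATUS_IGNORE);

    printf("rank %d: acc = %f\n", rank, acc);
    MPI_Finalize();
    return 0;
}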
On a shared-address-space architecture, the overlapping of computations and interaction is often assisted by prefetching hardware. In this case, an access to shared data is nothing more than a regular load or store instruction. The prefetch hardware can anticipate the memory addresses that will need to be accessed in the immediate future, and can initiate the access in advance of when they are needed. In the absence of prefetching hardware, the same effect can be achieved by a compiler that detects the access pattern and places pseudo-references to certain key memory locations before these locations are actually utilized by the computation. The degree of success of this scheme is dependent upon the available structure in the program that can be inferred by the prefetch hardware and by the degree of independence with which the prefetch hardware can function while computation is in progress.
3.5.4 Replicating Data or Computations
Replication of data or computations is another technique that may be useful in reducing interaction overheads. In some parallel algorithms, multiple processes may require frequent read-only access to a shared data structure, such as a hash table, in an irregular pattern. Unless the additional memory requirements are prohibitive, it may be best in a situation like this to replicate a copy of the shared data structure on each process so that after the initial interaction during replication, all subsequent accesses to this data structure are free of any interaction overhead. In the shared-address-space paradigm, replication of frequently accessed read-only data is often effected by the caches without explicit programmer intervention. Explicit data replication is particularly suited for architectures and programming paradigms in which read-only access to shared data is significantly more expensive or harder to express than local data accesses. Therefore, the message-passing programming paradigm benefits the most from data replication, which may reduce interaction overhead and also significantly simplify the writing of the parallel program. Data replication, however, does not come without its own cost. Data replication increases the memory requirements of a parallel program. The aggregate amount of memory required to store the replicated data increases linearly with the number of concurrent processes. This may limit the size of the problem that can be solved on a given parallel computer. For this reason, data replication must be used selectively to replicate relatively small amounts of data. In addition to input data, the processes in a parallel program often share intermediate results.
In some situations, it may be more cost-effective for a process to compute these intermediate results than to get them from another process that generates them. In such situations, interaction overhead can be traded for replicated computation. For example, while performing the Fast Fourier Transform (see Section 13.2.3 for more details) on an N-point series, N distinct powers of w or "twiddle factors" are computed and used at various points in the computation. In a parallel implementation of FFT, different processes require overlapping subsets of these N twiddle factors. In a message-passing paradigm, it is best for each process to locally compute all the twiddle factors it needs. Although the parallel algorithm may perform many more twiddle factor computations than the serial algorithm, it may still be faster than sharing the twiddle factors.
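A sketch of this replicated computation in C (C99 complex arithmetic; the index range a process computes is illustrative): each process recomputes the subset of twiddle factors it needs locally instead of receiving them from another process.

#include <stdio.h>
#include <complex.h>

static const double PI = 3.14159265358979323846;

/* Compute the twiddle factors w^k = exp(-2*pi*i*k/N) for k in [first, last)
   and store them locally; no communication is needed. */
static void local_twiddles(int N, int first, int last, double complex *w)
{
    for (int k = first; k < last; k++)
        w[k - first] = cexp(-2.0 * PI * I * (double)k / (double)N);
}

int main(void)
{
    enum { N = 16 };
    double complex w[N / 4];

    local_twiddles(N, 0, N / 4, w);   /* this process's (illustrative) subset */
    printf("w[1] = %f + %fi\n", creal(w[1]), cimag(w[1]));
    return 0;
}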
3.5.5 Using Optimized Collective Interaction Operations
As discussed in Section 3.3.2, often the interaction patterns among concurrent activities are static and regular. A class of such static and regular interaction patterns are those that are performed by groups of tasks, and they are used to achieve regular data accesses or to perform certain types of computations on distributed data. A number of such key collective interaction operations have been identified that appear frequently in many parallel algorithms. Broadcasting some data to all the processes or adding up numbers, each belonging to a different process, are examples of such collective operations. The collective data-sharing operations can be classified into three categories. The first category contains operations that are used by the tasks to access data, the second category of operations are used to perform some communication-intensive computations, and finally, the third category is used for synchronization. Highly optimized implementations of these collective operations have been developed that minimize the overheads due to data transfer as well as contention. Chapter 4 describes algorithms for implementing some of the commonly used collective interaction operations. Optimized implementations of these operations are available in library form from the vendors of most parallel computers, e.g., MPI (Message Passing Interface). As a result, the algorithm designer does not need to think about how these operations are implemented and needs to focus only on the functionality achieved by these operations. However, as discussed in Section 3.5.6, sometimes the interaction pattern may make it worthwhile for the parallel programmer to implement one's own collective communication procedure.
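For example, a broadcast and a reduction written with MPI collectives (MPI assumed) replace what would otherwise be hand-written loops of point-to-point messages; the specific values being communicated are illustrative.

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size;
    double scale = 0.0, partial, total;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) scale = 2.5;      /* a value initially known only at the root */

    /* Broadcast some data to all the processes. */
    MPI_Bcast(&scale, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    /* Add up numbers, each belonging to a different process. */
    partial = scale * (rank + 1);
    MPI_Allreduce(&partial, &total, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);

    if (rank == 0) printf("total over %d processes = %f\n", size, total);
    MPI_Finalize();
    return 0;
}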
3.5.6 Overlapping Interactions with Other Interactions
If the data-transfer capacity of the underlying hardware permits, then overlapping interactions between multiple pairs of processes can reduce the effective volume of communication. As an example of overlapping interactions, consider the commonly used collective communication operation of one-to-all broadcast in a message-passing paradigm with four processes P0, P1, P2, and P3. A commonly used algorithm to broadcast some data from P0 to all other processes works as follows. In the first step, P0 sends the data to P2. In the second step, P0 sends the data to P1, and concurrently, P2 sends the same data that it had received from P0 to P3. The entire operation is thus complete in two steps because the two interactions of the second step require only one time step. This operation is illustrated in Figure 3.41(a). On the other hand, a naive broadcast algorithm would send the data from P0 to P1 to P2 to P3, thereby consuming three steps as illustrated in Figure 3.41(b).


Interestingly, however, there are situations when the naive broadcast algorithm shown in Figure 3.41(b) may be adapted to actually increase the amount of overlap. Assume that a parallel algorithm needs to broadcast four data structures one after the other. The entire interaction would require eight steps using the first two-step broadcast algorithm. However, using the naive algorithm accomplishes the interaction in only six steps as shown in Figure 3.41(c). In the first step, P0 sends the first message to P1. In the second step, P0 sends the second message to P1 while P1 simultaneously sends the first message to P2. In the third step, P0 sends the third message to P1, P1 sends the second message to P2, and P2 sends the first message to P3. Proceeding similarly in a pipelined fashion, the last of the four messages is sent out of P0 after four steps and reaches P3 in six. Since this method is rather expensive for a single broadcast operation, it is unlikely to be included in a collective communication library. However, the programmer must infer from the interaction pattern of the algorithm that in this scenario, it is better to make an exception to the suggestion of Section 3.5.5 and write one's own collective communication function.


With Thanks to Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Reference
Content is taken from
Introduction to Parallel Computing, Second Edition
By Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Publisher: Addison Wesley
Pub Date: January 16, 2003
ISBN: 0-201-64865-2
Buy this book
https://books.google.co.in/books/about/Introduction_to_Parallel_Computing.html?id=B3jR2EhdZaMC

Tuesday, April 7, 2020

HPC Mapping Techniques for Load Balancing

Once a computation has been decomposed into tasks, these tasks are mapped onto processes with the objective that all tasks complete in the shortest amount of elapsed time. In order to achieve a small execution time, the overheads of executing the tasks in parallel must be minimized. For a given decomposition, there are two key sources of overhead. The time spent in inter-process interaction is one source of overhead. Another important source of overhead is the time that some processes may spend being idle. Some processes can be idle even before the overall computation is finished for a variety of reasons. Uneven load distribution may cause some processes to finish earlier than others. At times, all the unfinished tasks mapped onto a process may be waiting for tasks mapped onto other processes to finish in order to satisfy the constraints imposed by the task-dependency graph. Both interaction and idling are often a function of mapping. Therefore, a good mapping of tasks onto processes must strive to achieve the twin objectives of (1) reducing the amount of time processes spend in interacting with each other, and (2) reducing the total amount of time some processes are idle while the others are engaged in performing some tasks.

These two objectives often conflict with each other. For example, the objective of minimizing the interactions can be easily achieved by assigning sets of tasks that need to interact with each other onto the same process. In most cases, such a mapping will result in a highly unbalanced workload among the processes. In fact, following this strategy to the limit will often map all tasks onto a single process. As a result, the processes with a lighter load will be idle when those with a heavier load are trying to finish their tasks. Similarly, to balance the load among processes, it may be necessary to assign tasks that interact heavily to different processes. Due to the conflicts between these objectives, finding a good mapping is a nontrivial problem. In this section, we will discuss various schemes for mapping tasks onto processes with the primary view of balancing the task workload of processes and minimizing their idle time. Reducing inter-process interaction is the topic of Section 3.5.

The reader should be aware that assigning a balanced aggregate load of tasks to each process is a necessary but not sufficient condition for reducing process idling. Recall that the tasks resulting from a decomposition are not all ready for execution at the same time. A task-dependency graph determines which tasks can execute in parallel and which must wait for some others to finish at a given stage in the execution of a parallel algorithm. Therefore, it is possible in a certain parallel formulation that although all processes perform the same aggregate amount of work, at different times, only a fraction of the processes are active while the remainder contain tasks that must wait for other tasks to finish. Similarly, poor synchronization among interacting tasks can lead to idling if one of the tasks has to wait to send or receive data from another task. A good mapping must ensure that the computations and interactions among processes at each stage of the execution of the parallel algorithm are well balanced. Figure 3.23 shows two mappings of a 12-task decomposition in which the last four tasks can be started only after the first eight are finished due to dependencies among tasks. As the figure shows, two mappings, each with an overall balanced workload, can result in different completion times.

Figure 3.23. Two mappings of a hypothetical decomposition with a
synchronization.



Mapping techniques used in parallel algorithms can be broadly classified into two categories: static and dynamic. The parallel programming paradigm and the characteristics of tasks and the interactions among them determine whether a static or a dynamic mapping is more suitable.

Static Mapping: Static mapping techniques distribute the tasks among processes prior to the execution of the algorithm. For statically generated tasks, either static or dynamic mapping can be used. The choice of a good mapping in this case depends on several factors, including the knowledge of task sizes, the size of data associated with tasks, the characteristics of inter-task interactions, and even the parallel programming paradigm. Even when task sizes are known, in general, the problem of obtaining an optimal mapping is an NP-complete problem for non-uniform tasks. However, for many practical cases, relatively inexpensive heuristics provide fairly acceptable approximate solutions to the optimal static mapping problem. Algorithms that make use of static mapping are in general easier to design and program.

Dynamic Mapping: Dynamic mapping techniques distribute the work among processes during the execution of the algorithm. If tasks are generated dynamically, then they must be mapped dynamically too. If task sizes are unknown, then a static mapping can potentially lead to serious load-imbalances and dynamic mappings are usually more effective. If the amount of data associated with tasks is large relative to the computation, then a dynamic mapping may entail moving this data among processes. The cost of this data movement may outweigh some other advantages of dynamic mapping and may render a static mapping more suitable. However, in a shared-address-space paradigm, dynamic mapping may work well even with large data associated with tasks if the interaction is read-only. The reader should be aware that the shared-address-space programming paradigm does not automatically provide immunity against data-movement costs. If the underlying hardware is NUMA (Section 2.3.2), then the data may physically move from a distant memory. Even in a cc-UMA architecture, the data may have to move from one cache to another. Algorithms that require dynamic mapping are usually more complicated, particularly in the message-passing programming paradigm. Having discussed the guidelines for choosing between static and dynamic mappings, we now describe various schemes of these two types of mappings in detail.

3.4.1 Schemes for Static Mapping
Static mapping is often, though not exclusively, used in conjunction with a decomposition based on data partitioning. Static mapping is also used for mapping certain problems that are expressed naturally by a static task-dependency graph. In the following subsections, we will discuss mapping schemes based on data partitioning and task partitioning.

Mappings Based on Data Partitioning
In this section, we will discuss mappings based on partitioning two of the most common ways of representing data in algorithms, namely, arrays and graphs. The data partitioning actually induces a decomposition, but the partitioning or the decomposition is selected with the final mapping in mind.

Array Distribution Schemes
In a decomposition based on partitioning data, the tasks are closely associated with portions of data by the owner-computes rule. Therefore, mapping the relevant data onto the processes is equivalent to mapping tasks onto processes. We now study some commonly used techniques of distributing arrays or matrices among processes.
Block Distributions 
Block distributions are some of the simplest ways to distribute an array and assign uniform contiguous portions of the array to different processes. In these distributions, a d-dimensional array is distributed among the processes such that each process receives a contiguous block of array entries along a specified subset of array dimensions. Block distributions of arrays are particularly suitable when there is a locality of interaction, i.e., computation of an element of an array requires other nearby elements in the array. For example, consider an n x n two-dimensional array A with n rows and n columns. We can now select one of these dimensions, e.g., the first dimension, and partition the array into p parts such that the kth part contains rows kn/p...(k + 1)n/p - 1, where 0 ≤ k < p. That is, each partition contains a block of n/p consecutive rows of A. Similarly, if we partition A along the second dimension, then each partition contains a block of n/p consecutive columns. These row- and column-wise array distributions are illustrated in Figure 3.24.
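A minimal sketch of the row-wise block distribution in C: process k owns rows kn/p through (k + 1)n/p - 1, as in the text; the integer-division form below also behaves sensibly when n is not divisible by p.

#include <stdio.h>

/* Row range [*lo, *hi) owned by process k under a 1-D block distribution. */
static void block_range(int n, int p, int k, int *lo, int *hi)
{
    *lo = (int)((long long)k * n / p);
    *hi = (int)((long long)(k + 1) * n / p);
}

int main(void)
{
    int n = 10, p = 4;
    for (int k = 0; k < p; k++) {
        int lo, hi;
        block_range(n, p, k, &lo, &hi);
        printf("process %d owns rows %d..%d\n", k, lo, hi - 1);
    }
    return 0;
}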


Similarly, a block distribution can partition the array along more than one dimension. For instance, in the case of array A we can select both dimensions and partition the matrix into blocks such that each block corresponds to an n/p1 x n/p2 section of the matrix, with p = p1 x p2 being the number of processes. Figure 3.25 illustrates two different two-dimensional distributions, on a 4 x 4 and a 2 x 8 process grid, respectively. In general, given a d-dimensional array, we can distribute it using up to a d-dimensional block distribution.


With Thanks to Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Reference 
Content is taken from 
Introduction to Parallel Computing, Second Edition
By Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Publisher: Addison Wesley
Pub Date: January 16, 2003
ISBN: 0-201-64865-2
Buy this book

Tuesday, March 31, 2020

HPC Characteristics of Tasks and Interactions

3.3 Characteristics of Tasks and Interactions
The various decomposition techniques described in the previous section allow us to identify the
concurrency that is available in a problem and decompose it into tasks that can be executed in
parallel. The next step in the process of designing a parallel algorithm is to take these tasks and
assign (i.e., map) them onto the available processes. While devising a mapping scheme to
construct a good parallel algorithm, we often take a cue from the decomposition. The nature of
the tasks and the interactions among them has a bearing on the mapping. In this section, we
shall discuss the various properties of tasks and inter-task interactions that affect the choice of
a good mapping.
3.3.1 Characteristics of Tasks
The following four characteristics of the tasks have a large influence on the suitability of a
mapping scheme.
Task Generation The tasks that constitute a parallel algorithm may be generated either
statically or dynamically. Static task generation refers to the scenario where all the tasks are
known before the algorithm starts execution. Data decomposition usually leads to static task
generation. Examples of data-decomposition leading to a static task generation include matrix
multiplication and LU factorization (Problem 3.5). Recursive decomposition can also lead to a
static task-dependency graph. Finding the minimum of a list of numbers (Figure 3.9) is an
example of a static recursive task-dependency graph.
Certain decompositions lead to a dynamic task generation during the execution of the
algorithm. In such decompositions, the actual tasks and the task-dependency graph are not
explicitly available a priori, although the high level rules or guidelines governing task generation
are known as a part of the algorithm. Recursive decomposition can lead to dynamic task
generation. For example, consider the recursive decomposition in quicksort (Figure 3.8). The
tasks are generated dynamically, and the size and shape of the task tree is determined by the
values in the input array to be sorted. An array of the same size can lead to task-dependency
graphs of different shapes and with a different total number of tasks.
Exploratory decomposition can be formulated to generate tasks either statically or dynamically.
For example, consider the 15-puzzle problem discussed in Section 3.2.3. One way to generate a
static task-dependency graph using exploratory decomposition is as follows. First, a
preprocessing task starts with the initial configuration and expands the search tree in a
breadth-first manner until a predefined number of configurations are generated. These
configurations now represent independent tasks, which can be mapped onto different processes
and run independently. A different decomposition that generates tasks dynamically would be
one in which a task takes a state as input, expands it through a predefined number of steps of
breadth-first search and spawns new tasks to perform the same computation on each of the
resulting states (unless it has found the solution, in which case the algorithm terminates).
Task Sizes The size of a task is the relative amount of time required to complete it. The
complexity of mapping schemes often depends on whether or not the tasks are uniform; i.e.,
whether or not they require roughly the same amount of time. If the amount of time required
by the tasks varies significantly, then they are said to be non-uniform. For example, the tasks
in the decompositions for matrix multiplication shown in Figures 3.10 and 3.11 would be
considered uniform. On the other hand, the tasks in quicksort in Figure 3.8 are non-uniform.
Knowledge of Task Sizes The third characteristic that influences the choice of mapping
scheme is knowledge of the task size. If the size of all the tasks is known, then this information
can often be used in mapping of tasks to processes. For example, in the various decompositions
for matrix multiplication discussed so far, the computation time for each task is known before
the parallel program starts. On the other hand, the size of a typical task in the 15-puzzle
problem is unknown. We do not know a priori how many moves will lead to the solution from a
given state.
Size of Data Associated with Tasks Another important characteristic of a task is the size of
data associated with it. The reason this is an important consideration for mapping is that the
data associated with a task must be available to the process performing that task, and the size
and the location of these data may determine the process that can perform the task without
incurring excessive data-movement overheads.
Different types of data associated with a task may have different sizes. For instance, the input
data may be small but the output may be large, or vice versa. For example, the input to a task
in the 15-puzzle problem may be just one state of the puzzle. This is a small input relative to
the amount of computation that may be required to find a sequence of moves from this state to
a solution state. In the problem of computing the minimum of a sequence, the size of the input
is proportional to the amount of computation, but the output is just one number. In the parallel
formulation of quicksort, the size of both the input and the output data is of the same order
as the sequential time needed to solve the task.
3.3.2 Characteristics of Inter-Task Interactions
In any nontrivial parallel algorithm, tasks need to interact with each other to share data, work,
or synchronization information. Different parallel algorithms require different types of
interactions among concurrent tasks. The nature of these interactions makes them more
suitable for certain programming paradigms and mapping schemes, and less suitable for others.
The types of inter-task interactions can be described along different dimensions, each
corresponding to a distinct characteristic of the underlying computations.
Static versus Dynamic One way of classifying the type of interactions that take place among
concurrent tasks is to consider whether or not these interactions have a static or dynamic
pattern. An interaction pattern is static if for each task, the interactions happen at
predetermined times, and if the set of tasks to interact with at these times is known prior to the
execution of the algorithm. In other words, in a static interaction pattern, not only is the task-interaction
graph known a priori, but the stage of the computation at which each interaction
occurs is also known. An interaction pattern is dynamic if the timing of interactions or the set of
tasks to interact with cannot be determined prior to the execution of the algorithm.
Static interactions can be programmed easily in the message-passing paradigm, but dynamic
interactions are harder to program. The reason is that interactions in message-passing require
active involvement of both interacting tasks – the sender and the receiver of information. The
unpredictable nature of dynamic interactions makes it hard for both the sender and the receiver to
participate in the interaction at the same time. Therefore, when implementing a parallel
algorithm with dynamic interactions in the message-passing paradigm, the tasks must be
assigned additional synchronization or polling responsibility. Shared-address space
programming can code both types of interactions equally easily.
The decompositions for parallel matrix multiplication presented earlier in this chapter exhibit
static inter-task interactions. For an example of dynamic interactions, consider solving the 15-
puzzle problem in which tasks are assigned different states to explore after an initial step that
generates the desirable number of states by applying breadth-first search on the initial state. It
is possible that a certain state leads to all dead ends and a task exhausts its search space
without reaching the goal state, while other tasks are still busy trying to find a solution. The
task that has exhausted its work can pick up an unexplored state from the queue of another
busy task and start exploring it. The interactions involved in such a transfer of work from one
task to another are dynamic.
Regular versus Irregular Another way of classifying the interactions is based upon their
spatial structure. An interaction pattern is considered to be regular if it has some structure that
can be exploited for efficient implementation. On the other hand, an interaction pattern is called
irregular if no such regular pattern exists. Irregular and dynamic communications are harder
to handle, particularly in the message-passing programming paradigm. An example of a
decomposition with a regular interaction pattern is the problem of image dithering.
Example 3.9 Image dithering
In image dithering, the color of each pixel in the image is determined as the weighted
average of its original value and the values of its neighboring pixels. We can easily
decompose this computation, by breaking the image into square regions and using a
different task to dither each one of these regions. Note that each task needs to access
the pixel values of the region assigned to it as well as the values of the image
surrounding its region. Thus, if we regard the tasks as nodes of a graph with an edge
linking a pair of interacting tasks, the resulting pattern is a two-dimensional mesh, as
shown in Figure 3.22.

Sparse matrix-vector multiplication discussed in Section 3.1.2 provides a good example of
irregular interaction, which is shown in Figure 3.6. In this decomposition, even though each
task, by virtue of the decomposition, knows a priori which rows of matrix A it needs to access,
without scanning the row(s) of A assigned to it, a task cannot know which entries of vector b it
requires. The reason is that the access pattern for b depends on the structure of the sparse
matrix A.
Read-only versus Read-Write We have already learned that sharing of data among tasks
leads to inter-task interaction. However, the type of sharing may impact the choice of the
mapping. Data sharing interactions can be categorized as either read-only or read-write
interactions. As the name suggests, in read-only interactions, tasks require only a read-access
to the data shared among many concurrent tasks. For example, in the decomposition for
parallel matrix multiplication shown in Figure 3.10, the tasks only need to read the shared input
matrices A and B. In read-write interactions, multiple tasks need to read and write on some
shared data. For example, consider the problem of solving the 15-puzzle. The parallel
formulation method proposed in Section 3.2.3 uses an exhaustive search to find a solution. In
this formulation, each state is considered an equally suitable candidate for further expansion.
The search can be made more efficient if the states that appear to be closer to the solution are
given a priority for further exploration. An alternative search technique known as heuristic
search implements such a strategy. In heuristic search, we use a heuristic to provide a relative
approximate indication of the distance of each state from the solution (i.e. the potential number
of moves required to reach the solution). In the case of the 15-puzzle, the number of tiles that
are out of place in a given state could serve as such a heuristic. The states that need to be
expanded further are stored in a priority queue based on the value of this heuristic. While
choosing the states to expand, we give preference to more promising states, i.e. the ones that
have fewer out-of-place tiles and hence, are more likely to lead to a quick solution. In this
situation, the priority queue constitutes shared data and tasks need both read and write access
to it; they need to put the states resulting from an expansion into the queue and they need to
pick up the next most promising state for the next expansion.
One-way versus Two-way In some interactions, the data or work needed by a task or a
subset of tasks is explicitly supplied by another task or subset of tasks. Such interactions are
called two-way interactions and usually involve predefined producer and consumer tasks. In
other interactions, only one of a pair of communicating tasks initiates the interaction and
completes it without interrupting the other one. Such an interaction is called a one-way
interaction. All read-only interactions can be formulated as one-way interactions. Read-write
interactions can be either one-way or two-way.
The shared-address-space programming paradigms can handle both one-way and two-way
interactions equally easily. However, one-way interactions cannot be directly programmed in
the message-passing paradigm because the source of the data to be transferred must explicitly
send the data to the recipient. In the message-passing paradigm, all one-way interactions must
be converted to two-way interactions via program restructuring. Static one-way interactions can
be easily converted to two-way communications. Since the time and the location in the program
of a static one-way interaction is known a priori, introducing a matching interaction operation in
the partner task is enough to convert a one-way static interaction to a two-way static
interaction. On the other hand, dynamic one-way interactions can require some nontrivial
program restructuring to be converted to two-way interactions. The most common such
restructuring involves polling. Each task checks for pending requests from other tasks after
regular intervals, and services such requests, if any.
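A sketch of this polling structure with MPI (MPI assumed; do_own_work and serve_request are illustrative stand-ins, not from the text): between chunks of its own work, a process uses MPI_Iprobe to check for pending requests and services any that have arrived.

#include <stdio.h>
#include <mpi.h>

#define TAG_REQUEST 7

static int  do_own_work(int step)   { return step < 100; }   /* pretend work */
static void serve_request(int from) { printf("servicing a request from rank %d\n", from); }

int main(int argc, char **argv)
{
    int rank, step = 0, more = 1;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    while (more) {
        more = do_own_work(step++);                 /* a chunk of local work */

        int flag;
        MPI_Status st;
        /* Poll: has any other task sent a request since the last check? */
        MPI_Iprobe(MPI_ANY_SOURCE, TAG_REQUEST, MPI_COMM_WORLD, &flag, &st);
        if (flag) {
            int dummy;
            MPI_Recv(&dummy, 1, MPI_INT, st.MPI_SOURCE, TAG_REQUEST,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            serve_request(st.MPI_SOURCE);
        }
    }

    printf("rank %d finished its own work\n", rank);
    MPI_Finalize();
    return 0;
}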

With Thanks to Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Reference 
Content is taken from 
Introduction to Parallel Computing, Second Edition
By Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Publisher: Addison Wesley
Pub Date: January 16, 2003
ISBN: 0-201-64865-2
Buy this book

Thursday, March 26, 2020

HPC Chapter No 4 Principles of Parallel Algorithm Design

3.2 Decomposition Techniques

As mentioned earlier, one of the fundamental steps that we need to undertake to solve a
problem in parallel is to split the computations to be performed into a set of tasks for
concurrent execution defined by the task-dependency graph. In this section, we describe some
commonly used decomposition techniques for achieving concurrency. This is not an exhaustive
set of possible decomposition techniques. Also, a given decomposition is not always guaranteed
to lead to the best parallel algorithm for a given problem. Despite these shortcomings, the
decomposition techniques described in this section often provide a good starting point for many
problems and one or a combination of these techniques can be used to obtain effective
decompositions for a large variety of problems.
These techniques are broadly classified as recursive decomposition, data-decomposition,
exploratory decomposition, and speculative decomposition. The recursive- and data-decomposition
techniques are relatively general purpose as they can be used to decompose a
wide variety of problems. On the other hand, speculative- and exploratory-decomposition
techniques are more of a special purpose nature because they apply to specific classes of
problems.


3.2.1 Recursive Decomposition

Recursive decomposition is a method for inducing concurrency in problems that can be solved
using the divide-and-conquer strategy. In this technique, a problem is solved by first dividing it
into a set of independent subproblems. Each one of these subproblems is solved by recursively
applying a similar division into smaller subproblems followed by a combination of their results.
The divide-and-conquer strategy results in natural concurrency, as different subproblems can be
solved concurrently.

Example 3.4 Quicksort
Consider the problem of sorting a sequence A of n elements using the commonly used
quicksort algorithm. Quicksort is a divide and conquer algorithm that starts by
selecting a pivot element x and then partitions the sequence A into two subsequences
A0 and A1 such that all the elements in A0 are smaller than x and all the elements in
A1 are greater than or equal to x. This partitioning step forms the divide step of the
algorithm. Each one of the subsequences A0 and A1 is sorted by recursively calling
quicksort. Each one of these recursive calls further partitions the sequences. This is
illustrated in Figure 3.8 for a sequence of 12 numbers. The recursion terminates when
each subsequence contains only a single element.

Figure 3.8. The quicksort task-dependency graph based on
recursive decomposition for sorting a sequence of 12 numbers.




In Figure 3.8, we define a task as the work of partitioning a given subsequence. Therefore,
Figure 3.8 also represents the task graph for the problem. Initially, there is only one sequence
(i.e., the root of the tree), and we can use only a single process to partition it. The completion
of the root task results in two subsequences (A0 and A1, corresponding to the two nodes at the
first level of the tree) and each one can be partitioned in parallel. Similarly, the concurrency
continues to increase as we move down the tree.
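The recursive decomposition in Figure 3.8 maps naturally onto task-based code. The sketch below (OpenMP tasks assumed, not from the text) creates one task per partitioning step; a production version would stop spawning tasks below some cutoff size to keep the granularity reasonable.

#include <stdio.h>
#include <omp.h>

static void swap(int *a, int *b) { int t = *a; *a = *b; *b = t; }

/* Lomuto partition: returns the final position of the pivot A[hi]. */
static int partition(int *A, int lo, int hi)
{
    int pivot = A[hi], i = lo;
    for (int j = lo; j < hi; j++)
        if (A[j] < pivot) swap(&A[i++], &A[j]);
    swap(&A[i], &A[hi]);
    return i;
}

static void quicksort(int *A, int lo, int hi)
{
    if (lo >= hi) return;              /* subsequence of length <= 1: done */
    int p = partition(A, lo, hi);      /* the "divide" step is this task's work */

    #pragma omp task                   /* sort the left subsequence concurrently */
    quicksort(A, lo, p - 1);
    #pragma omp task                   /* sort the right subsequence concurrently */
    quicksort(A, p + 1, hi);
    #pragma omp taskwait               /* wait for both child tasks */
}

int main(void)
{
    int A[12] = {33, 21, 13, 54, 82, 33, 40, 72, 11, 97, 25, 62};

    #pragma omp parallel
    #pragma omp single
    quicksort(A, 0, 11);

    for (int i = 0; i < 12; i++) printf("%d ", A[i]);
    printf("\n");
    return 0;
}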


3.2.2 Data Decomposition


Data decomposition is a powerful and commonly used method for deriving concurrency in
algorithms that operate on large data structures. In this method, the decomposition of
computations is done in two steps. In the first step, the data on which the computations are
performed is partitioned, and in the second step, this data partitioning is used to induce a
partitioning of the computations into tasks. The operations that these tasks perform on different
data partitions are usually similar (e.g., matrix multiplication introduced in Example 3.5) or are
chosen from a small set of operations (e.g., LU factorization introduced in Example 3.10).
The partitioning of data can be performed in many possible ways as discussed next. In general,
one must explore and evaluate all possible ways of partitioning the data and determine which
one yields a natural and efficient computational decomposition.
Partitioning Output Data In many computations, each element of the output can be computed
independently of others as a function of the input. In such computations, a partitioning of the
output data automatically induces a decomposition of the problem into tasks, where each task
is assigned the work of computing a portion of the output. We introduce the problem of matrix multiplication
in Example 3.5 to illustrate a decomposition based on partitioning output data.
Example 3.5 Matrix multiplication
Consider the problem of multiplying two n x n matrices A and B to yield a matrix C.
Figure 3.10 shows a decomposition of this problem into four tasks. Each matrix is
considered to be composed of four blocks or submatrices defined by splitting each
dimension of the matrix into half. The four submatrices of C, roughly of size n/2 x n/2
each, are then independently computed by four tasks as the sums of the appropriate
products of submatrices of A and B.
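As a rough illustration, the following C sketch spells out the four tasks (block_mac and task are hypothetical helper names; n is assumed even). Task t computes one n/2 x n/2 block of C as the sum of two block products of A and B; the four tasks write disjoint blocks of C and are independent of each other, so they could run concurrently.

#include <stdio.h>

/* Hypothetical helper: C_sub += A_sub * B_sub, where each sub-block is h x h and
   (ar,ac), (br,bc), (cr,cc) are the row/column offsets of the blocks inside the
   full n x n matrices (stored in row-major order). */
static void block_mac(const double *A, int ar, int ac,
                      const double *B, int br, int bc,
                      double *C, int cr, int cc, int h, int n) {
    for (int i = 0; i < h; i++)
        for (int j = 0; j < h; j++)
            for (int k = 0; k < h; k++)
                C[(cr + i) * n + (cc + j)] +=
                    A[(ar + i) * n + (ac + k)] * B[(br + k) * n + (bc + j)];
}

/* Task t (t = 0..3) computes block C(ti,tj) = A(ti,0)B(0,tj) + A(ti,1)B(1,tj). */
static void task(int t, const double *A, const double *B, double *C, int n) {
    int h = n / 2, ti = (t / 2) * h, tj = (t % 2) * h;
    block_mac(A, ti, 0, B, 0, tj, C, ti, tj, h, n);
    block_mac(A, ti, h, B, h, tj, C, ti, tj, h, n);
}

int main(void) {
    enum { N = 4 };
    double A[N * N], B[N * N], C[N * N] = {0};
    for (int i = 0; i < N * N; i++) { A[i] = 1.0; B[i] = 1.0; }
    for (int t = 0; t < 4; t++) task(t, A, B, C, N);  /* run the four tasks (here, serially) */
    printf("C[0][0] = %g\n", C[0]);                   /* 4.0 for the all-ones input */
    return 0;
}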


Most matrix algorithms, including matrix-vector and matrix-matrix multiplication, can be
formulated in terms of block matrix operations. In such a formulation, the matrix is viewed as
composed of blocks or submatrices and the scalar arithmetic operations on its elements are
replaced by the equivalent matrix operations on the blocks. The results of the element and the
block versions of the algorithm are mathematically equivalent (Problem 3.10). Block versions of
matrix algorithms are often used to aid decomposition.
The decomposition shown in Figure 3.10 is based on partitioning the output matrix C into four
submatrices and each of the four tasks computes one of these submatrices. The reader must
note that data-decomposition is distinct from the decomposition of the computation into tasks.
Although the two are often related and the former often aids the latter, a given data-decomposition
does not result in a unique decomposition into tasks. For example, Figure 3.11
shows two other decompositions of matrix multiplication, each into eight tasks, corresponding
to the same data-decomposition as used in Figure 3.10(a).

Figure 3.11. Two examples of decomposition of matrix multiplication
into eight tasks.


With Thanks to Ananth Grama, Anshul Gupta, George Karypis, Vipin Kumar
Reference
Content is taken from:
Introduction to Parallel Computing, Second Edition, by Ananth Grama, Anshul Gupta, George Karypis, and Vipin Kumar. Addison Wesley, January 16, 2003. ISBN 0-201-64865-2.

Tuesday, March 24, 2020

HPC Chapter No 4 Principles of Parallel Algorithm Design

Introduction 

Algorithm development is a critical component of problem solving using computers. A sequential
algorithm is essentially a recipe or a sequence of basic steps for solving a given problem using a
serial computer. Similarly, a parallel algorithm is a recipe that tells us how to solve a given
problem using multiple processors. However, specifying a parallel algorithm involves more than
just specifying the steps. At the very least, a parallel algorithm has the added dimension of
concurrency and the algorithm designer must specify sets of steps that can be executed
simultaneously. This is essential for obtaining any performance benefit from the use of a parallel
computer. In practice, specifying a nontrivial parallel algorithm may include some or all of the
following:
Identifying portions of the work that can be performed concurrently.
Mapping the concurrent pieces of work onto multiple processes running in parallel.
Distributing the input, output, and intermediate data associated with the program.
Managing accesses to data shared by multiple processors.
Synchronizing the processors at various stages of the parallel program execution.
Typically, there are several choices for each of the above steps, but usually, relatively few
combinations of choices lead to a parallel algorithm that yields performance commensurate with
the computational and storage resources employed to solve the problem. Often, different
choices yield the best performance on different parallel architectures or under different parallel
programming paradigms.
In this chapter, we methodically discuss the process of designing and implementing parallel
algorithms. We shall assume that the onus of providing a complete description of a parallel
algorithm or program lies on the programmer or the algorithm designer. Tools and compilers
for automatic parallelization at the current state of the art seem to work well only for highly
structured programs or portions of programs. Therefore, we do not consider these in this
chapter or elsewhere in this book.

3.1 Preliminaries

Dividing a computation into smaller computations and assigning them to different processors for parallel execution are the two key steps in the design of parallel algorithms. In this section, we present some basic terminology and introduce these two key steps in parallel algorithm design using matrix-vector multiplication and database query processing as examples.
3.1.1 Decomposition, Tasks, and Dependency Graphs
The process of dividing a computation into smaller parts, some or all of which may potentially
be executed in parallel, is called decomposition. Tasks are programmer-defined units of computation into which the main computation is subdivided by means of decomposition. Simultaneous execution of multiple tasks is the key to reducing the time required to solve the entire problem. Tasks can be of arbitrary size, but once defined, they are regarded as indivisible units of computation. The tasks into which a problem is decomposed may not all be of the same size.




Note that all tasks in Figure 3.1 are independent and can be performed all together or in any
sequence. However, in general, some tasks may use data produced by other tasks and thus
may need to wait for these tasks to finish execution. An abstraction used to express such
dependencies among tasks and their relative order of execution is known as a task-dependency
graph. A task-dependency graph is a directed acyclic graph in which the nodes
represent tasks and the directed edges indicate the dependencies amongst them. The task
corresponding to a node can be executed when all tasks connected to this node by incoming
edges have completed. Note that task-dependency graphs can be disconnected and the edge-set
of a task-dependency graph can be empty. This is the case for matrix-vector multiplication,
where each task computes a subset of the entries of the product vector. To see a more
interesting task-dependency graph, consider the following database query processing example.


3.1.2 Granularity, Concurrency, and Task-Interaction
The number and size of tasks into which a problem is decomposed determine the granularity of the decomposition. A decomposition into a large number of small tasks is called fine-grained and a decomposition into a small number of large tasks is called coarse-grained. For example, the decomposition for matrix-vector multiplication shown in Figure 3.1 would usually be considered fine-grained because each of a large number of tasks performs a single dot-product. Figure 3.4 shows a coarse-grained decomposition of the same problem into four tasks, where each task computes n/4 of the entries of the output vector of length n.


A concept related to granularity is that of degree of concurrency. The maximum number of tasks that can be executed simultaneously in a parallel program at any given time is known as its maximum degree of concurrency. In most cases, the maximum degree of concurrency is less than the total number of tasks due to dependencies among the tasks. For example, the maximum degree of concurrency in the task-graphs of Figures 3.2 and 3.3 is four. In these task-graphs, maximum concurrency is available right at the beginning when tables for Model, Year, Color Green, and Color White can be computed simultaneously. In general, for task dependency graphs that are trees, the maximum degree of concurrency is always equal to the number of leaves in the tree. A more useful indicator of a parallel program's performance is the average degree of concurrency, which is the average number of tasks that can run concurrently over the entire duration of execution of the program. Both the maximum and the average degrees of concurrency usually increase as the granularity of tasks becomes smaller (finer). For example, the decomposition of matrix-vector multiplication shown in Figure 3.1 has a fairly small granularity and a large degree of concurrency. The decomposition for the same problem shown in Figure 3.4 has a larger granularity and a smaller degree of concurrency. The degree of concurrency also depends on the shape of the task-dependency graph and the same granularity, in general, does not guarantee the same degree of concurrency. For example, consider the two task graphs in Figure 3.5, which are abstractions of the task graphs of Figures 3.2 and 3.3, respectively (Problem 3.1). The number inside each node represents the amount of work required to complete the task corresponding to that node. The average degree of concurrency of the task graph in Figure 3.5(a) is 2.33 and that of the task graph in Figure 3.5(b) is 1.88 (Problem 3.1), although both task-dependency graphs are based on the same decomposition.


A feature of a task-dependency graph that determines the average degree of concurrency for a
given granularity is its critical path. In a task-dependency graph, let us refer to the nodes with
no incoming edges by start nodes and the nodes with no outgoing edges by finish nodes. The
longest directed path between any pair of start and finish nodes is known as the critical path.
The sum of the weights of nodes along this path is known as the critical path length, where
the weight of a node is the size or the amount of work associated with the corresponding task.
The ratio of the total amount of work to the critical-path length is the average degree of
concurrency. Therefore, a shorter critical path favors a higher degree of concurrency. For
example, the critical path length is 27 in the task-dependency graph shown in Figure 3.5(a) and
is 34 in the task-dependency graph shown in Figure 3.5(b). Since the total amount of work
required to solve the problems using the two decompositions is 63 and 64, respectively, the
average degree of concurrency of the two task-dependency graphs is 2.33 and 1.88,
respectively.
Although it may appear that the time required to solve a problem can be reduced simply by
increasing the granularity of decomposition and utilizing the resulting concurrency to perform
more and more tasks in parallel, this is not the case in most practical scenarios. Usually, there
is an inherent bound on how fine-grained a decomposition a problem permits. For instance,
there are n^2 multiplications and additions in the matrix-vector multiplication considered in Example
3.1, and the problem cannot be decomposed into more than O(n^2) tasks even by using the most
fine-grained decomposition.
Other than limited granularity and degree of concurrency, there is another important practical factor that limits our ability to obtain unbounded speedup (the ratio of serial to parallel execution time) from parallelization. This factor is the interaction among tasks running on different physical processors. The tasks that a problem is decomposed into often share input, output, or intermediate data. The dependencies in a task-dependency graph usually result from the fact that the output of one task is the input for another. For example, in the database query example, tasks share intermediate data; the table generated by one task is often used by another task as input. Depending on the definition of the tasks and the parallel programming paradigm, there may be interactions among tasks that appear to be independent in a task-dependency graph. For example, in the decomposition for matrix-vector multiplication, although all tasks are independent, they all need access to the entire input vector b. Since originally there is only one copy of the vector b, tasks may have to send and receive messages for all of them to access the entire vector in the distributed-memory paradigm.
The pattern of interaction among tasks is captured by what is known as a task-interaction graph. The nodes in a task-interaction graph represent tasks and the edges connect tasks that interact with each other. The nodes and edges of a task-interaction graph can be assigned weights proportional to the amount of computation a task performs and the amount of interaction that occurs along an edge, if this information is known. The edges in a task-interaction graph are usually undirected, but directed edges can be used to indicate the direction of flow of data, if it is unidirectional. The edge-set of a task-interaction graph is usually a superset of the edge-set of the task-dependency graph. In the database query example discussed earlier, the task-interaction graph is the same as the task-dependency graph. We now give an example of a more interesting task-interaction graph that results from the problem of sparse matrix-vector multiplication.

Cloud Computing Lab No. 6

Study Of Installation and Configuration of Ulteo to demonstrate on demand Application delivery over web browser to explore SaaS Environment.


Ulteo is an open source virtual desktop infrastructure project that can deliver desktops or applications from various operating systems, including Windows and Linux, to end users. The Open Virtual Desktop allows organizations to deploy virtualized GNU/Linux and/or Windows desktops. Parts of the Ulteo products are based on Debian and Ubuntu. Ulteo Open Virtual Desktop is an open source alternative to Citrix and VMware solutions.
The steps for installation and configuration of Ulteo are given as follows
Step 1: Install Ulteo from the DVD, or open the Ulteo OVF file in VMware Player by selecting the Import VM button.
1) If you do not have an Ulteo OVD DVD-ROM yet, download the corresponding ISO file from www.ulteo.com and burn it to a fresh DVD.
2) Insert the Ulteo OVD DVD-ROM into your computer and restart it. If you selected the DVD-ROM as the first boot device, you will see the boot loader screen.
3) Select Install Ulteo Option
4) The first step is used to select the system language. Choose your language from the list and click on Forward.
5) In the second step, the system asks you to define your location. Either select a point on the map or choose one from the Selected city form and click on Forward.
6) The third step is used to define the keyboard layout. Select yours and click on Forward.
7) Then, you have to select the partitioning method. We suggest the automatic method: Erase and use the entire disk.
8) These questions are about the installed operating system itself, user login and password used to access the OS, along with the hostname of the machine.
9) Type a password and confirm it. A useful address is displayed for future use of OVD.
10) Read the installation summary carefully, then click on Install and wait until the installation completes.
11) Finally, click on Restart now to finish the installation process.
Step 2: On the management machine, open the following URLs:
https://Ulteo-Server-ipaddress/ovd for Client access
https://Ulteo-Server-ipaddress/admin for Admin access
Step 3: Log in to the admin portal; specify the username and password as Admin.


Under the Server tab, register the server: click on Manage to add the IP address of the Ulteo server.
a) Go to the User tab to add multiple users.
b) Go to the User tab, then select User Group, create a new user group, and add users to it.
c) Go to the Application tab to create an application group.

Map the user group to the application group and use the services on the client side.
The administrator panel is limited to the administrator, who can manage applications, users, and groups. Once logged in to this portal, the admin can create users, user groups, and application groups, map users to user groups and application groups, and manage applications or install software based on user requirements, as shown below.

Ulteo Administrator panel

The Application menu of the admin panel shows the available applications, which can be mapped to users or user groups, as shown below.

Ulteo Application Menu
Step 4: On the client side, open https://Ulteo-Server-ipaddress/ovd for client access, specify the username and password, and access the software added to the application group.
Once the user selects the Access Ulteo option, the login page of the Ulteo session manager is shown. The user can get a login name and password by filling in the registration form on the main page of the cloud portal, as shown below.

Ulteo user Login Portal
Once the user is validated, he can access the services using portal mode or desktop mode. Both modes give access to software applications installed on the Linux application server and the Windows application server. In portal mode, the user gets applications in a vertical pane, as shown below.

Ulteo Portal mode
In desktop mode, the user gets a full-fledged Linux desktop running in the browser with the selected applications, as shown below.

Ulteo Desktop mode

Thursday, March 19, 2020

HPC Lecture Notes for 19/03/2020

6.6 Collective Communication and Computation Operations

MPI provides an extensive set of functions for performing many commonly used collective
communication operations. In particular, the majority of the basic communication operations
described in Chapter 4 are supported by MPI. All of the collective communication functions
provided by MPI take as an argument a communicator that defines the group of processes that
participate in the collective operation. All the processes that belong to this communicator
participate in the operation, and all of them must call the collective communication function.
Even though collective communication operations do not act like barriers (i.e., it is possible for
a processor to go past its call for the collective communication operation even before other
processes have reached it), they act like a virtual synchronization step in the following sense: the
parallel program should be written such that it behaves correctly even if a global
synchronization is performed before and after the collective call. Since the operations are
virtually synchronous, they do not require tags. In some of the collective functions data is
required to be sent from a single process (source-process) or to be received by a single process
(target-process). In these functions, the source- or target-process is one of the arguments
supplied to the routines. All the processes in the group (i.e., communicator) must specify the
same source- or target-process. For most collective communication operations, MPI provides
two different variants. The first transfers equal-size data to or from each process, and the
second transfers data that can be of different sizes.
6.6.1 Barrier
The barrier synchronization operation is performed in MPI using the MPI_Barrier function.
int MPI_Barrier(MPI_Comm comm)
The only argument of MPI_Barrier is the communicator that defines the group of processes
that are synchronized. The call to MPI_Barrier returns only after all the processes in the group
have called this function.
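A minimal usage sketch (not from the book): every process prints a message, calls MPI_Barrier, and only then prints a second message; no process returns from the barrier until all processes in MPI_COMM_WORLD have called it.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    printf("process %d: before the barrier\n", rank);
    MPI_Barrier(MPI_COMM_WORLD);       /* returns only after all processes call it */
    printf("process %d: after the barrier\n", rank);
    MPI_Finalize();
    return 0;
}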
6.6.2 Broadcast
The one-to-all broadcast operation described in Section 4.1 is performed in MPI using the
MPI_Bcast function.
int MPI_Bcast(void *buf, int count, MPI_Datatype datatype,
int source, MPI_Comm comm)
MPI_Bcast sends the data stored in the buffer buf of process source to all the other processes
in the group. The data received by each process is stored in the buffer buf . The data that is
broadcast consist of count entries of type datatype . The amount of data sent by the source
process must be equal to the amount of data that is being received by each process; i.e., the
count and datatype fields must match on all processes.
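As a small illustration (the buffer contents are arbitrary), the following sketch broadcasts four integers from process 0 to all processes; note that count and datatype are the same on every process.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank, data[4] = {0, 0, 0, 0};
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    if (rank == 0) {                      /* only the source fills the buffer */
        for (int i = 0; i < 4; i++) data[i] = i + 1;
    }
    /* count and datatype must match on all processes; the source is rank 0 */
    MPI_Bcast(data, 4, MPI_INT, 0, MPI_COMM_WORLD);
    printf("process %d received %d %d %d %d\n", rank, data[0], data[1], data[2], data[3]);
    MPI_Finalize();
    return 0;
}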
6.6.3 Reduction
The all-to-one reduction operation described in Section 4.1 is performed in MPI using the
MPI_Reduce function.
int MPI_Reduce(void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, int target,
MPI_Comm comm)
MPI_Reduce combines the elements stored in the buffer sendbuf of each process in the group,
using the operation specified in op , and returns the combined values in the buffer recvbuf of
the process with rank target . Both the sendbuf and recvbuf must have the same number of
count items of type datatype . Note that all processes must provide a recvbuf array, even if
they are not the target of the reduction operation. When count is more than one, then the
combine operation is applied element-wise on each entry of the sequence. All the processes
must call MPI_Reduce with the same value for count , datatype , op , target , and comm .
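A minimal sketch (the contributed values are arbitrary): each process contributes one integer and process 0 receives their sum.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int sendbuf = rank + 1;               /* each process contributes one value */
    int recvbuf = 0;                      /* significant only on the target (rank 0) */
    MPI_Reduce(&sendbuf, &recvbuf, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0)
        printf("sum of 1..%d = %d\n", size, recvbuf);   /* expect size*(size+1)/2 */
    MPI_Finalize();
    return 0;
}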
MPI provides a list of predefined operations that can be used to combine the elements stored in
sendbuf . MPI also allows programmers to define their own operations, which is not covered in
this book. The predefined operations are shown in Table 6.3 . For example, in order to compute
the maximum of the elements stored in sendbuf , the MPI_MAX value must be used for the op
argument. Not all of these operations can be applied to all possible data-types supported by
MPI. For example, a bit-wise OR operation (i.e., op = MPI_BOR ) is not defined for real-valued
data-types such as MPI_FLOAT and MPI_REAL . The last column of Table 6.3 shows the various
data-types that can be used with each operation.
Table 6.3. Predefined reduction operations.

Operation    Meaning                  Datatypes
MPI_MAX      Maximum                  C integers and floating point
MPI_MIN      Minimum                  C integers and floating point
MPI_SUM      Sum                      C integers and floating point
MPI_PROD     Product                  C integers and floating point
MPI_LAND     Logical AND              C integers
MPI_BAND     Bit-wise AND             C integers and byte
MPI_LOR      Logical OR               C integers
MPI_BOR      Bit-wise OR              C integers and byte
MPI_LXOR     Logical XOR              C integers
MPI_BXOR     Bit-wise XOR             C integers and byte
MPI_MAXLOC   max-min value-location   Data-pairs
MPI_MINLOC   min-min value-location   Data-pairs
The operation MPI_MAXLOC combines pairs of values (vi, li) and returns the pair (v, l) such
that v is the maximum among all vi's and l is the smallest among all li's such that v = vi.
Similarly, MPI_MINLOC combines pairs of values and returns the pair (v, l) such that v is the
minimum among all vi's and l is the smallest among all li's such that v = vi. One possible
application of MPI_MAXLOC or MPI_MINLOC is to compute the maximum or minimum of a list of
numbers, each residing on a different process, and also the rank of the first process that stores
this maximum or minimum, as illustrated in Figure 6.6. Since both MPI_MAXLOC and
MPI_MINLOC require datatypes that correspond to pairs of values, a new set of MPI datatypes
has been defined, as shown in Table 6.4. In C, these datatypes are implemented as structures
containing the corresponding types.
Figure 6.6. An example use of the MPI_MINLOC and MPI_MAXLOC operators.
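A possible sketch of this use of MPI_MAXLOC (the local values are arbitrary test data): each process contributes a (value, rank) pair using the MPI_DOUBLE_INT datatype of Table 6.4, and process 0 learns the maximum value together with the smallest rank that holds it.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    struct { double value; int rank; } local, global;  /* layout matching MPI_DOUBLE_INT */
    local.value = (double)(rank % 4);   /* arbitrary test data; ties are possible */
    local.rank  = rank;
    MPI_Reduce(&local, &global, 1, MPI_DOUBLE_INT, MPI_MAXLOC, 0, MPI_COMM_WORLD);
    if (rank == 0)
        printf("maximum value %g first occurs on process %d\n", global.value, global.rank);
    MPI_Finalize();
    return 0;
}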
When the result of the reduction operation is needed by all the processes, MPI provides the
MPI_Allreduce operation that returns the result to all the processes. This function provides the
functionality of the all-reduce operation described in Section 4.3 .
Table 6.4. MPI datatypes for data-pairs used with the MPI_MAXLOC and
MPI_MINLOC reduction operations.

MPI Datatype          C Datatype
MPI_2INT              pair of ints
MPI_SHORT_INT         short and int
MPI_LONG_INT          long and int
MPI_LONG_DOUBLE_INT   long double and int
MPI_FLOAT_INT         float and int
MPI_DOUBLE_INT        double and int
int MPI_Allreduce(void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
Note that there is no target argument since all processes receive the result of the operation.
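A small sketch (using the result to form an average is just an illustration): every process obtains the global sum directly, with no target argument.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    double local = rank + 1.0, total = 0.0;
    /* every process obtains the global sum; no target argument is needed */
    MPI_Allreduce(&local, &total, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
    double average = total / size;        /* e.g., normalize a local value */
    printf("process %d: total = %g, average = %g\n", rank, total, average);
    MPI_Finalize();
    return 0;
}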
6.6.4 Prefix
The prefix-sum operation described in Section 4.3 is performed in MPI using the MPI_Scan
function.
int MPI_Scan(void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
MPI_Scan performs a prefix reduction of the data stored in the buffer sendbuf at each process
and returns the result in the buffer recvbuf . The receive buffer of the process with rank i will
store, at the end of the operation, the reduction of the send buffers of the processes whose
ranks range from 0 up to and including i . The type of supported operations (i.e., op ) as well as
the restrictions on the various arguments of MPI_Scan are the same as those for the reduction
operation MPI_Reduce .
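A minimal sketch: process i contributes the value i + 1, so after MPI_Scan its receive buffer holds (i + 1)(i + 2)/2.

#include <mpi.h>
#include <stdio.h>

int main(int argc, char *argv[]) {
    int rank, prefix = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    int value = rank + 1;                 /* process i contributes i + 1 */
    /* process i receives the sum of the contributions of processes 0..i */
    MPI_Scan(&value, &prefix, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    printf("process %d: prefix sum = %d\n", rank, prefix);   /* (i+1)(i+2)/2 */
    MPI_Finalize();
    return 0;
}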
6.6.5 Gather
The gather operation described in Section 4.4 is performed in MPI using the MPI_Gather
function.
int MPI_Gather(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf, int recvcount,
MPI_Datatype recvdatatype, int target, MPI_Comm comm)
Each process, including the target process, sends the data stored in the array sendbuf to the
target process. As a result, if p is the number of processes in the communicator comm , the
target process receives a total of p buffers. The data is stored in the array recvbuf of the target
process, in a rank order. That is, the data from process with rank i are stored in the recvbuf
starting at location i * sendcount (assuming that the array recvbuf is of the same type as
recvdatatype ).
The data sent by each process must be of the same size and type. That is, MPI_Gather must be
called with the sendcount and senddatatype arguments having the same values at each
process. The information about the receive buffer, its length and type applies only for the target
process and is ignored for all the other processes. The argument recvcount specifies the
number of elements received by each process and not the total number of elements it receives.
So, recvcount must be the same as sendcount and their datatypes must be matching.
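A minimal sketch (two elements per process, values arbitrary): only the target process allocates the receive buffer, and recvcount is the number of elements received from each individual process.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int sendbuf[2] = {10 * rank, 10 * rank + 1};   /* two elements per process */
    int *recvbuf = NULL;
    if (rank == 0)                                  /* only the target needs recvbuf */
        recvbuf = malloc(size * 2 * sizeof(int));
    /* recvcount is the number of elements received from EACH process (2 here) */
    MPI_Gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        for (int i = 0; i < 2 * size; i++) printf("%d ", recvbuf[i]);
        printf("\n");
        free(recvbuf);
    }
    MPI_Finalize();
    return 0;
}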
MPI also provides the MPI_Allgather function in which the data are gathered to all the
processes and not only at the target process.
int MPI_Allgather(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf, int recvcount,
MPI_Datatype recvdatatype, MPI_Comm comm)
The meanings of the various parameters are similar to those for MPI_Gather ; however, each
process must now supply a recvbuf array that will store the gathered data.
In addition to the above versions of the gather operation, in which the sizes of the arrays sent
by each process are the same, MPI also provides versions in which the size of the arrays can be
different. MPI refers to these operations as the vector variants. The vector variants of the
MPI_Gather and MPI_Allgather operations are provided by the functions MPI_Gatherv and
MPI_Allgatherv , respectively.
int MPI_Gatherv(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf,
int *recvcounts, int *displs,
MPI_Datatype recvdatatype, int target, MPI_Comm comm)
int MPI_Allgatherv(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf,
int *recvcounts, int *displs, MPI_Datatype recvdatatype,
MPI_Comm comm)
These functions allow a different number of data elements to be sent by each process by
replacing the recvcount parameter with the array recvcounts . The amount of data sent by
process i is equal to recvcounts[i] . Note that the size of recvcounts is equal to the size of
the communicator comm . The array parameter displs , which is also of the same size, is used
to determine where in recvbuf the data sent by each process will be stored. In particular, the
data sent by process i are stored in recvbuf starting at location displs[i] . Note that, as
opposed to the non-vector variants, the sendcount parameter can be different for different
processes.
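A possible sketch of the vector variant (the counts are arbitrary: process i sends i + 1 copies of its rank). The target computes recvcounts and displs so that the contributions are packed contiguously in rank order.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int mycount = rank + 1;                       /* process i sends i + 1 elements */
    int *sendbuf = malloc(mycount * sizeof(int));
    for (int i = 0; i < mycount; i++) sendbuf[i] = rank;
    int *recvcounts = NULL, *displs = NULL, *recvbuf = NULL;
    if (rank == 0) {                              /* target builds the count/offset arrays */
        recvcounts = malloc(size * sizeof(int));
        displs = malloc(size * sizeof(int));
        int offset = 0;
        for (int i = 0; i < size; i++) {
            recvcounts[i] = i + 1;                /* how much process i contributes */
            displs[i] = offset;                   /* where its data starts in recvbuf */
            offset += recvcounts[i];
        }
        recvbuf = malloc(offset * sizeof(int));
    }
    MPI_Gatherv(sendbuf, mycount, MPI_INT, recvbuf, recvcounts, displs,
                MPI_INT, 0, MPI_COMM_WORLD);
    if (rank == 0) {
        for (int i = 0; i < size * (size + 1) / 2; i++) printf("%d ", recvbuf[i]);
        printf("\n");
        free(recvcounts); free(displs); free(recvbuf);
    }
    free(sendbuf);
    MPI_Finalize();
    return 0;
}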
6.6.6 Scatter
The scatter operation described in Section 4.4 is performed in MPI using the MPI_Scatter
function.
int MPI_Scatter(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf, int recvcount,
MPI_Datatype recvdatatype, int source, MPI_Comm comm)
The source process sends a different part of the send buffer sendbuf to each process,
including itself. The data that are received are stored in recvbuf . Process i receives sendcount
contiguous elements of type senddatatype starting from the i * sendcount location of the
sendbuf of the source process (assuming that sendbuf is of the same type as senddatatype ).
MPI_Scatter must be called by all the processes with the same values for the sendcount ,
senddatatype , recvcount , recvdatatype , source , and comm arguments. Note again that
sendcount is the number of elements sent to each individual process.
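A minimal sketch (three elements per process, values arbitrary): only the source process fills sendbuf, and each process receives its own contiguous portion.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int *sendbuf = NULL;
    if (rank == 0) {                               /* only the source fills sendbuf */
        sendbuf = malloc(size * 3 * sizeof(int));
        for (int i = 0; i < size * 3; i++) sendbuf[i] = i;
    }
    int recvbuf[3];                                /* each process receives 3 elements */
    MPI_Scatter(sendbuf, 3, MPI_INT, recvbuf, 3, MPI_INT, 0, MPI_COMM_WORLD);
    printf("process %d got %d %d %d\n", rank, recvbuf[0], recvbuf[1], recvbuf[2]);
    if (rank == 0) free(sendbuf);
    MPI_Finalize();
    return 0;
}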
Similarly to the gather operation, MPI provides a vector variant of the scatter operation, called
MPI_Scatterv , that allows different amounts of data to be sent to different processes.
int MPI_Scatterv(void *sendbuf, int *sendcounts, int *displs,
MPI_Datatype senddatatype, void *recvbuf, int recvcount,
MPI_Datatype recvdatatype, int source, MPI_Comm comm)
As we can see, the parameter sendcount has been replaced by the array sendcounts that
determines the number of elements to be sent to each process. In particular, the source
process sends sendcounts[i] elements to process i . Also, the array displs is used to
determine where in sendbuf these elements will be sent from. In particular, if sendbuf is of the
same type as senddatatype , the data sent to process i start at location displs[i] of array
sendbuf . Both the sendcounts and displs arrays are of size equal to the number of processes
in the communicator. Note that by appropriately setting the displs array we can use
MPI_Scatterv to send overlapping regions of sendbuf .
6.6.7 All-to-All
The all-to-all personalized communication operation described in Section 4.5 is performed in
MPI by using the MPI_Alltoall function.
int MPI_Alltoall(void *sendbuf, int sendcount,
MPI_Datatype senddatatype, void *recvbuf, int recvcount,
MPI_Datatype recvdatatype, MPI_Comm comm)
Each process sends a different portion of the sendbuf array to each other process, including
itself. Each process sends to process i sendcount contiguous elements of type senddatatype
starting from the i * sendcount location of its sendbuf array. The data that are received are
stored in the recvbuf array. Each process receives from process i recvcount elements of type
recvdatatype and stores them in its recvbuf array starting at location i * recvcount .
MPI_Alltoall must be called by all the processes with the same values for the sendcount ,
senddatatype , recvcount , recvdatatype , and comm arguments. Note that sendcount and
recvcount are the number of elements sent to, and received from, each individual process.
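A small sketch (values arbitrary): element i of each process's sendbuf is destined for process i, so after the call recvbuf[i] on a process holds the element that process i sent to it.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    int *sendbuf = malloc(size * sizeof(int));
    int *recvbuf = malloc(size * sizeof(int));
    for (int i = 0; i < size; i++)
        sendbuf[i] = 100 * rank + i;      /* element i is destined for process i */
    /* after the call, recvbuf[i] on this process holds sendbuf[rank] of process i */
    MPI_Alltoall(sendbuf, 1, MPI_INT, recvbuf, 1, MPI_INT, MPI_COMM_WORLD);
    printf("process %d received:", rank);
    for (int i = 0; i < size; i++) printf(" %d", recvbuf[i]);
    printf("\n");
    free(sendbuf); free(recvbuf);
    MPI_Finalize();
    return 0;
}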
MPI also provides a vector variant of the all-to-all personalized communication operation called
MPI_Alltoallv that allows different amounts of data to be sent to and received from each
process.
int MPI_Alltoallv(void *sendbuf, int *sendcounts, int *sdispls,
MPI_Datatype senddatatype, void *recvbuf, int *recvcounts,
int *rdispls, MPI_Datatype recvdatatype, MPI_Comm comm)
The parameter sendcounts is used to specify the number of elements sent to each process, and
the parameter sdispls is used to specify the location in sendbuf in which these elements are
stored. In particular, each process sends to process i , starting at location sdispls[i] of the
array sendbuf , sendcounts[i] contiguous elements. The parameter recvcounts is used to
specify the number of elements received by each process, and the parameter rdispls is used to
specify the location in recvbuf in which these elements are stored. In particular, each process
receives from process i recvcounts[i] elements that are stored in contiguous locations of
recvbuf starting at location rdispls[i] . MPI_Alltoallv must be called by all the processes
with the same values for the senddatatype , recvdatatype , and comm arguments.

Introduction to OpenMP
