9.1.2010
Mandelbrot with MPI
Mandelbrot fractal rendering using MPI -- need I say more?
I commented the code to a ridiculous extent, so.. I guess I need not. :-)
Drag on the window to select a zoom region. Click on the window to quit. Here you go:
/*
* THE MANDELBROT SET
*
* Notes:
* If you notice lines or blocks that seem to be skipped, do not worry;
* clients avoid traversing the whole row buffer on every iteration, so some
* rows may be left unprocessed until the very end. The clients are still
* busy the whole time. This happens most often when communication is very
* fast and the master is able to feed the clients one row at a time.
*
* The design idea:
* To have an implementation that scales to hundreds of clients and
* tolerates slow, lagging communication.
* For example, nothing is assumed about the message order,
* only that messages eventually get through in some order.
*
* Implementation:
* Each client has a buffer of rows. These rows can be one of the following:
* 1. Free
* 2. Reserved for receiving (only the amount is reserved, not specific rows)
* 3. Data for an old view (will get cleaned automatically once encountered)
* 4. Data for this current view (will be selected to be rendered in an arbitrary order)
* 5. Data for a future view (they are skipped until they are current)
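* (In the code these states map onto rowInfo.view: -1 means free, a value
* below the current viewCount means old, equal means current, and a larger
* value means future. The "reserved" state is tracked only as a count,
* reportedRows, not as specific rows.)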
* At startup, clients report to the master how many
* free (ready-to-accept) rows they have.
* In a loop, clients process one row and then check for new messages.
* If there are no rows to process, clients will block while waiting for messages.
* Upon receiving a new view (new dimensions), a view counter is incremented
* and all rows belonging to the previous view become obsolete
* (but they are purged only when encountered).
* New rows are accepted for current and future views, and each time a pack
* of rows is received, the client reports (asynchronously) how many newly
* freed rows it has. Clients don't spam the master and have at most one
* outstanding report at a time. In typical situations they report 5-50% of
* the buffer size as free, but if the master is underloaded it might be just one row.
* Note that very large buffer sizes can be used, because the buffer is not
* traversed on every loop iteration.
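* For example, with the default BUFSIZE of 25 a client starts by reporting
* 25 free slots. After a pack of rows arrives it keeps computing, and only
* once its previous report has completed and the master has replied does it
* send a new report covering every row freed in the meantime.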
*
* The master thread also avoids spamming the communication system.
* It sends row packs to clients in a synchronized fashion, letting the other
* clients accumulate their free counts while they keep processing. When it is
* a client's turn, the master sends it a pack of rows (filling all the
* reported free slots, unless a smaller pack size is forced (HEIGHT/client count)),
* and the client eventually replies with an update on its free slots.
* Because of this, communication stays rather constant: with very many
* clients the row packs just get bigger, but the master doesn't
* choke and clients do not run out of work.
*
* compile: mpicc -mpe=graphics mandelbrot.c -o mandelbrot -lm
* run: mpiexec -n 20 ./mandelbrot
*
* Click the window to quit.
*
* set "#define DEBUG 1" in the code below if you would like to see how the
* execution goes. It gets rather messy, though, but has allowed me to confirm
* that the architecture works as planned.
*/
#include <mpi.h>
#define MPE_GRAPHICS
#include <mpe.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define WIDTH 1024
#define HEIGHT 768
#define BUFSIZE 25 // How many row requests to keep in each client
#define DEBUG 0 // Debug messages (true/false)
int procc, rank; // Process count and the current rank
static MPE_XGraph graph;
static MPE_Color color[256];
static MPE_Point line[WIDTH];
typedef struct {
double r, i;
} complex;
typedef struct {
int rowNumber;
int view; // In which consecutive view this line belongs to
} rowInfo;
enum { // Communication tags. Enumeration type casts to integer, so we can use this directly
tag_free, // Clients inform the server how many free spots they have in their buffer
tag_dim, // Server informs clients of new dimensions
tag_rows // Server hands out rows to clients
};
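// Message payloads used with these tags:
// tag_free: one int, the number of newly freed buffer slots (0 = the client is done)
// tag_dim:  four doubles, the view rectangle as x, y, width, height
// tag_rows: ints [view, count, row, row, ...]; a view number of 0 means quit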
int mpiInit(int argc, char **argv) {
if (MPI_Init(&argc, &argv) != MPI_SUCCESS) {
fprintf(stderr, "Failure in initializing MPI\n");
return 100;
}
if (MPI_Comm_size(MPI_COMM_WORLD, &procc) != MPI_SUCCESS) {
fprintf(stderr, "Couldn't get the process count\n");
MPI_Finalize();
return 101;
} else if (procc < 2) {
fprintf(stderr, "Must have at least 2 processes (have %d)\n", procc);
MPI_Finalize();
return 102;
}
if (MPI_Comm_rank(MPI_COMM_WORLD, &rank) != MPI_SUCCESS) {
fprintf(stderr, "Couldn't get rank for a process\n");
MPI_Finalize();
return 103;
}
MPE_Open_graphics(&graph, MPI_COMM_WORLD, NULL, 0, 0, WIDTH, HEIGHT, 0);
MPE_Make_color_array(graph, 256, color);
if (DEBUG && !rank) // Root only
printf("MPI Initialized\n");
return 0;
} // mpiInit()
// We're using the same naming, and the same function.
// (As this assignment is about parallel programming,
// I think it's acceptable to just copy this)
int calc_mandel(complex c) {
int count = 0;
int max = 255;
complex z;
double len2, temp;
z.r = z.i = 0.0;
do {
temp = z.r*z.r - z.i*z.i + c.r;
z.i = 2.0*z.r*z.i + c.i;
z.r = temp;
len2 = z.r*z.r + z.i*z.i;
if (len2 > 4.0) break;
count++;
} while (count < max);
return count;
} // calc_mandel()
// This is the final step of a row
void processRow(int row, double *dim) {
int i;
double rowPosX; // Where in the complex plane this row belongs
double rowPosY;
double advanceX; // How wide is a pixel in the complex plane
complex c;
// If we had a framebuffer, we could just memcpy the color index data into it,
// but now we have to loop through the points (inefficient).
// (Don't blame me, blame the MPE requirement in the assignment :-)
rowPosY = dim[1] + dim[3]*((double)row/(double)HEIGHT);
rowPosX = dim[0];
advanceX = dim[2]/(double)WIDTH; // This gets to 0.0 when zoomed close enough == horizontal bands
c.i = rowPosY;
c.r = rowPosX;
for (i = 0; i < WIDTH; ++i) {
line[i].y = row;
line[i].x = i;
c.r += advanceX;
line[i].c = color[calc_mandel(c)];
}
MPE_Draw_points(graph, line, WIDTH); // Drawing the line
MPE_Update(graph);
}
void clientExec() {
rowInfo rowBuffer[BUFSIZE];
unsigned char free = BUFSIZE; // The amount of free buffer slots
// We only need to store one row at a time, as we dispose it immediately.
// The values are 0..255 so unsigned char suits fine.
unsigned char tempRowData[WIDTH];
int i;
int quit = 0;
int procIndex = 0, storeIndex = 0; // procIndex == processing, storeIndex == receiving
int viewCount = 0; // This is used to detect old row requests
double dim[4]; // Dimensions of the view
MPI_Status tempStatus;
MPI_Request *req; // Rows, dimension, send
// reportedRows keeps count of how many rows are still pending from the server,
// and reportRows is the send buffer of the last report message
int reportedRows = BUFSIZE, reportRows;
int freeRows = 0; // How many rows are free but not yet reported to the server
int nothingToProcess = 0;
// The incoming messages
double dimMsg[4]; // Dimensions
// Row message:
// The first value is view number (related to viewCount),
// the second is the amount of rows given,
// and is followed by the list of the rows (caps at BUFSIZE).
int rowMsg[2+BUFSIZE];
int msgId, commFlag;
// Whether we have already sent an update of our free situation,
// but not yet received data. This is used to avoid sending
// too many updates about how many free slots we have when the
// master is choking.
// If, due to master's choking or the delay of the communication,
// the row processing is N times faster than the round trip of
// a message, then we automatically settle to sending an update
// saying (on average) that "we have N rows free". (N < BUFSIZE)
// If N > BUFSIZE, we request row counts of: BUFSIZE, 1, BUFSIZE, 1, ...
// So you should make BUFSIZE large enough, but so that BUFSIZE < HEIGHT/processes
int responsePending;
// Sending an asynchronous message telling that we have BUFSIZE free spots
for (i = 0; i < BUFSIZE; ++i)
rowBuffer[i].view = -1; // Marking the row as being free
req = (MPI_Request*) malloc(sizeof(MPI_Request) * 3);
reportRows = reportedRows;
MPI_Isend(&reportRows, 1, MPI_INT, 0, tag_free, MPI_COMM_WORLD, &req[2]);
responsePending = 1;
// Starting 2 nonblocking receives, for dimensions and row data
MPI_Irecv(dimMsg, 4, MPI_DOUBLE, 0, tag_dim, MPI_COMM_WORLD, &req[1]);
MPI_Irecv(rowMsg, 2+BUFSIZE, MPI_INT, 0, tag_rows, MPI_COMM_WORLD, &req[0]);
if (DEBUG)
printf("Client %d: Initialized (buffer size %d), switching to main loop\n", rank, BUFSIZE);
while (!quit) {
if (DEBUG)
printf("Client %d: Begin loop. %d processable rows\n\tBUFSIZE %d, free rows %d, rows in request %d, procIndex %d, storeIndex %d\n",
rank, BUFSIZE-freeRows-reportedRows, BUFSIZE, freeRows, reportedRows, procIndex, storeIndex);
// If reportedRows + freeRows == BUFSIZE, we have nothing to do until we get
// a task from the master. Therefore we start waiting for a message.
// Also, if we previously looped through the data and found out that all we have
// is future rows, we need to wait for more data or a dimension update.
if (reportedRows + freeRows == BUFSIZE
|| nothingToProcess) {
if (DEBUG)
printf("Client %d: Empty or only future rows. Waiting for any message.\n", rank);
MPI_Waitany(2, req, // Rows and dimension messages were the first 2
&msgId, &tempStatus); // No need for status, we know the length of the row message after parsing
} else {
// Testing
MPI_Testany(2, req, &msgId, &commFlag, &tempStatus);
if (!commFlag)
msgId = -1; // No messages
}
// Checking out messages
switch (msgId) {
case -1:
if (DEBUG)
printf("Child %d: No messages\n", rank);
break;
case 0: // Receiving new rows
// If the view number is 0, we quit.
if (!rowMsg[0]) {
if (DEBUG)
printf("Child %d: We got quit signal, aborting\n", rank);
quit = 1;
break;
}
responsePending = 0; // We should send a new request, if we have free spots
if (DEBUG)
printf("Child %d: We got %d rows for view %d (reported buffer spots after this: %d)\n",
rank, rowMsg[1], rowMsg[0], reportedRows-rowMsg[1]);
// Since we haven't sent more requests than we can handle,
// no overflow can happen. (We would be forever in this loop in an overflow)
reportedRows -= rowMsg[1];
while (rowMsg[1]) {
// Finding a spot from the buffer for the new row.
// This is a row with old view (view < viewCount), or free (-1).
// Note that this also accepts rows for future views!
// (storeIndex is not connected to procIndex; they traverse
// the buffer separately from each other.)
for (; rowBuffer[storeIndex].view >= viewCount;
storeIndex = (storeIndex+1)%BUFSIZE);
// Now storeIndex points to a buffer spot which is free
// (either -1 (== free) or belongs to an old view)
if (rowBuffer[storeIndex].view != -1) { // We're freeing it if it was old
if (DEBUG)
printf("Child %d: Spot %d was obsolete (%d < %d)\n",
rank, storeIndex, rowBuffer[storeIndex].view, viewCount);
freeRows++; // Freeing this spot
rowBuffer[storeIndex].view = -1;
}
if (DEBUG)
printf("Child %d: Storing request for row %d (view %d) to buffer spot %d.\n",
rank, rowMsg[1+rowMsg[1]], rowMsg[0], storeIndex);
rowBuffer[storeIndex].view = rowMsg[0];
rowBuffer[storeIndex].rowNumber = rowMsg[1+rowMsg[1]];
rowMsg[1]--; // One less to go
}
if (DEBUG)
printf("Child %d: New row processing ended, spawning a new recv\n", rank);
// And receiving a new row message (at some point)
MPI_Irecv(rowMsg, 2+BUFSIZE, MPI_INT, 0, tag_rows, MPI_COMM_WORLD, &req[0]);
break;
case 1: // New dimensions
viewCount++;
memcpy((void*)dim, (void*)dimMsg, sizeof(double)*4);
if (DEBUG)
printf("Child %d: Received new dimensions (x %.3f y %.3f w %.3f h %.3f), increasing viewCount to %d\n",
rank, dim[0], dim[1], dim[2], dim[3], viewCount);
MPI_Irecv(dimMsg, 4, MPI_DOUBLE, 0, tag_dim, MPI_COMM_WORLD, &req[1]);
break; // Unnecessary
} // switch(msgId)
// If we received a quit signal, this is where we break out of the main loop.
if (quit)
break;
// It might be that we received a dimension update while having nothing to process.
// In that case we can't enter the loop below, because we still have nothing to process.
if (freeRows + reportedRows == BUFSIZE)
continue;
// Searching for the next processable row.
// It is possible that we were given a full buffer of row requests
// for a future view, for example because the preceding
// dimension message lagged. Therefore we must escape if we find nothing
// to process after BUFSIZE iterations. If that is the case, we
// set nothingToProcess to indicate that we should not enter this loop
// again before receiving more messages.
for (i = 0; i < BUFSIZE; ++i, procIndex = (procIndex+1)%BUFSIZE) {
if (rowBuffer[procIndex].view == -1); // Free, ignore
else if (rowBuffer[procIndex].view < viewCount) {
// Obsolete, free
if (DEBUG)
printf("Child %d: Slot %d is obsolete (%d < %d), freeing in proc loop\n",
rank, procIndex, rowBuffer[procIndex].view, viewCount);
rowBuffer[procIndex].view = -1;
freeRows++;
} else if (rowBuffer[procIndex].view == viewCount) {
// This needs processing
if (DEBUG)
printf("Child %d: Processing slot %d (view %d, row %d)\n",
rank, procIndex, rowBuffer[procIndex].view, rowBuffer[procIndex].rowNumber);
processRow(rowBuffer[procIndex].rowNumber, dim);
rowBuffer[procIndex].view = -1;
freeRows++;
// We could keep looping, but we try to avoid running out of work,
// so we check the status of communication after each processed row
nothingToProcess = 0;
break;
} // What remains are the future rows (i.e. rows for a view whose dimensions aren't declared yet)
}
if (i == BUFSIZE) // We didn't find anything
nothingToProcess = 1;
// If there are free rows, the master has replied to us since our last report,
// and the previous asynchronous send has already finished,
// we report to the master all the rows freed since that last report.
// (We wait for the master's reply so that we don't flood it; if communication is slow
// and there are hundreds of clients, no client should have several reports pending at a time.)
if (freeRows > 0 && !responsePending) {
MPI_Test(&req[2], &commFlag, &tempStatus);
//printf("rvalue: %d, comm %d, success? %d\n", rvalue, commFlag, rvalue == MPI_SUCCESS);
if (commFlag) {
if (DEBUG)
printf("Child %d: Previous send of %d slots went through, sending a new one with %d\n",
rank, reportRows, freeRows);
reportedRows += freeRows;
reportRows = freeRows;
freeRows = 0;
MPI_Isend(&reportRows, 1, MPI_INT, 0, tag_free, MPI_COMM_WORLD, &req[2]);
responsePending = 1;
}
}
} // while (!quit)
if (DEBUG)
printf("Child %d: Telling the master we're done\n", rank);
// We send a blocking message to the master telling it that we're ready to quit
reportRows = 0;
MPI_Ssend(&reportRows, 1, MPI_INT, 0, tag_free, MPI_COMM_WORLD);
if (DEBUG)
printf("Child %d: Terminating execution\n", rank);
} // clientExec()
// We already have open listeners for tag_free,
// so we keep listening on them (req).
void terminate(MPI_Request *req, int *freeSlots) {
int i;
int msg;
MPI_Status tempStatus;
// Quitting, we send each client the quit signal and wait
for (i = 1; i < procc; ++i) {
if (DEBUG)
printf("Master: Sending client %d the termination signal\n", i);
msg = 0; // We might as well use this as the data
MPI_Ssend(&msg, 1, MPI_INT, i, tag_rows, MPI_COMM_WORLD);
if (DEBUG)
printf("Master: Waiting for acknowledgement...\n");
do {
MPI_Wait(&req[i-1], &tempStatus);
// Starting a listen for the next value
MPI_Irecv(&freeSlots[i-1], 1, MPI_INT, i, tag_free, MPI_COMM_WORLD, &req[i-1]);
if (DEBUG)
printf("Master: Received %d free slots from client %d, expected 0\n",
freeSlots[i-1], i);
} while (freeSlots[i-1]); // We ignore any remaining "give more rows"-calls
if (DEBUG)
printf("Master: Client %d has terminated\n", i);
}
printf("All clients have terminated. Master terminating.\n");
} // terminate()
// Returns true on exit, and stores new dimensions over the old ones
int getDrag(double *dim) { // Returns 1 on exit
int dragX1, dragX2,
dragY1, dragY2;
int dragArea;
static int dragAreaThreshold = 10;
// Reading the new dimensions from the user
MPE_Get_drag_region(graph, 1, 1, &dragX1, &dragY1, &dragX2, &dragY2);
if (DEBUG)
printf("Dragged %d %d %d %d\n", dragX1, dragY1, dragX2, dragY2);
dragArea = (dragX2-dragX1)*(dragY2-dragY1); // It seems that always dragX2 > dragX1, dragY2 > dragY1
// We simply compute the new dimensions.
dim[0] += dim[2]*((double)dragX1/(double)WIDTH);
dim[1] += dim[3]*((double)dragY1/(double)HEIGHT);
dim[2] *= ((double)(dragX2-dragX1)/(double)WIDTH);
dim[3] *= ((double)(dragY2-dragY1)/(double)HEIGHT);
if (DEBUG)
printf("Drag area %d, threshold %d\n", dragArea, dragAreaThreshold);
// If the area that was dragged was < threshold, we exit.
if (dragArea < dragAreaThreshold)
return 1;
else
return 0;
} // getDrag()
void serverExec() {
// Initial values
double dim[] = { // X, Y, W, H
-2.0, -2.0, 4.0, 4.0
};
int quit = 0;
int i, j;
int rowsLeft;
int viewCount = 0;
double startTime, tempTime;
int clients = procc - 1;
MPI_Request req_rows[clients];
MPI_Request req_dim[clients];
MPI_Status sta[clients];
int completeCount;
// There should be at most procc-1 completed requests, one per client,
// but we allow room for twice that just in case.
int completed[clients*2];
int freeSlots[clients]; // Total free slots
int newFreeSlots[clients]; // Received new free slots
int tempRowCount;
int tempRowData[2+HEIGHT]; // Send buffer; sized for the worst case where one client is handed every row (BUFSIZE >= HEIGHT)
// Initially we fork a listener for each client
for (i = 0; i < clients; ++i) {
freeSlots[i] = 0; // Resetting free slots
MPI_Irecv(&newFreeSlots[i], 1, MPI_INT, i+1, tag_free, MPI_COMM_WORLD, &req_rows[i]);
}
if (DEBUG)
printf("Master: Listeners for %d clients forked\n", clients);
do {
startTime = MPI_Wtime();
viewCount++;
// Announce dimensions. The optimal solution would be to use broadcast,
// but we rely on tags, so we can't use it. Fortunately this phase
// isn't a big time-consumer.
if (DEBUG)
printf("Master: Announcing dimensions (%.3f, %.3f, %.3f, %.3f)\n",
dim[0], dim[1], dim[2], dim[3]);
for (i = 0; i < clients; ++i)
// We can fork and forget, because clients can receive row data
// before they know the dimensions.
MPI_Isend(dim, 4, MPI_DOUBLE, i+1, tag_dim, MPI_COMM_WORLD, &req_dim[i]);
rowsLeft = HEIGHT;
// While we have rows to give out, we block and wait for any client announcements
while (rowsLeft) {
// We need to get responses from clients fairly, and
// Waitany() might favor some clients more than others,
// so we use Waitsome, and process incoming requests from all clients.
if (MPI_Waitsome(clients, req_rows, &completeCount, completed, sta) != MPI_SUCCESS)
fprintf(stderr, "Master: Error in reading children status\n");
if (DEBUG)
printf("Master: Got %d announcements\n", completeCount);
// For each client, we read the results and refork the receiver.
for (i = 0; i < completeCount; ++i) {
if (DEBUG)
printf("Master: Client %d reported %d free slots\n",
completed[i]+1, newFreeSlots[completed[i]]);
// Adding the new spots to what we previously knew
freeSlots[completed[i]] += newFreeSlots[completed[i]];
// Restarting the listener
MPI_Irecv(&newFreeSlots[completed[i]], 1, MPI_INT,
completed[i]+1, tag_free, MPI_COMM_WORLD, &req_rows[completed[i]]);
// We hand out N rows to the client.
// N is min(rows the client can accept, HEIGHT/clients, rowsLeft).
// We cap N at HEIGHT/clients so that we have something for every client.
// We could also do any type of scheduling here, for example give more
// rows in the beginning of the calculation, etc.
tempRowCount = (HEIGHT/clients < freeSlots[completed[i]]) ?
HEIGHT/clients : freeSlots[completed[i]]; // Rounding errors in int/int division don't matter
if (tempRowCount > rowsLeft)
tempRowCount = rowsLeft;
// Example:
// If we start at HEIGHT = 10, and this client eats up 5 requests,
// we give it rows 9, 8, 7, 6 and 5. The next client gets
// 4, 3, 2, 1 and 0. After that, while(rowsLeft) exits.
tempRowData[0] = viewCount;
tempRowData[1] = tempRowCount;
for (j = 0; j < tempRowCount; ++j)
tempRowData[2+j] = --rowsLeft;
// Counterintuitively, we are doing a synchronous send here.
// We can actually sleep in the master loop for quite a while before the clients
// run out of work. And it is really beneficial not to
// send large numbers of small messages and build up long pending message queues.
// Each client accepts new rows after processing one line, therefore
// BUFSIZE for a client needs to be at least 1*client_count for no stalling to occur.
// This also keeps the communication at a minimum.
// NOTE: clients < BUFSIZE is not a serious limitation unless we want to use
// thousands of clients for small fractal resolution. But if we really feel that
// it is constricting, we can spawn several asynchronous sends for different clients,
// but this increases communication.
if (DEBUG)
printf("Master: Sending %d new rows to client %d, leaving %d free\n",
tempRowCount, completed[i]+1, freeSlots[completed[i]]-tempRowCount);
MPI_Ssend(tempRowData, tempRowCount+2, MPI_INT, completed[i]+1, tag_rows, MPI_COMM_WORLD);
// Removing the sent rows from the available pool
freeSlots[completed[i]] -= tempRowCount;
} // for
} // while(rowsLeft)
// We should wait here for all the clients to finish, but that would have an
// impact on the user experience: We designed clients so that they can be
// interrupted before they finish, and they discard old rows automatically.
// Therefore the user can make another zoom as soon as they like (well, the master
// has to have handed out all of rowsLeft first).
// This affects the timing: the reported time tells how fast the master thread
// is ready to take another drag event. There might still be up to
// BUFSIZE*clients rows left to process.
tempTime = MPI_Wtime();
printf("Master: View %d took %.3f seconds\n",
viewCount, tempTime - startTime);
// PS. If we wanted to do this properly, we would need an extra thread to catch
// "I'm finished" calls from clients while we sit in the MPE_Get_drag_region function,
// and then report the time...
if (getDrag(dim))
quit = 1;
} while (!quit);
if (DEBUG)
printf("Master: Exited the main loop\n");
terminate(req_rows, newFreeSlots);
} // serverExec
int main(int argc, char **argv) {
if (mpiInit(argc, argv)) {
fprintf(stderr, "Couldn't init MPI\n");
return 10;
}
if (!rank && procc-1 > BUFSIZE)
printf("WARNING: You have %d clients and %d BUFSIZE.\nTo keep all clients busy, BUFSIZE is recommended to be larger than client count.\n", procc-1, BUFSIZE);
if (rank)
clientExec();
else
serverExec();
MPI_Finalize();
return 0;
} // main()
mandelbrot.c