#include #include #include #include /* Paul McCarthy Microprocessors Spring 2002 Dr. Rommel */ /* This program started out as ForkJoin.c, pp. 63-68, from the PVM User's Guide and Tutorial for Networked Parallel Computing, MIT Press, 1994. Obviously I've modified it extensively from that base, however. */ /* defines and prototypes for the PVM library */ #include "pvm3.h" #define maxN 8 /* Maximum number of children this program will spawn */ #define MAXNCHILD 8 /* Tag to use for the joing message */ #define JOINTAG 11 int main(int argc, char* argv[]) { void mergesortBU(int a[], int l, int r); time_t t; /* number of tasks to spawn, use 8 as the default */ int ntask = 8; /* return code from pvm calls */ int info; /* my task id */ int mytid, mygid; /* my parents task id */ int myparent; /* children task id array */ int child[MAXNCHILD]; int i, j, x, y, list[64], mydata[8], pair_data[8], dummy[8], buf, len, tag, tid, size, CX_pair; int CX_data[] = {1,3,1,7,2,1}; int nhost, narch; char *workers; struct pvmhostinfo *hostp[32]; info = pvm_config(&nhost, &narch, hostp); if (strcmp(hostp[0]->hi_arch, "WIN32") == 0) { workers = "parmerge.exe"; } else if (strcmp(hostp[0]->hi_arch, "LINUX") == 0) { workers = "parmerge"; } else { workers = "parmerge"; } /* find out my task id number */ mytid = pvm_mytid(); /* check for error */ if (mytid < 0) { /* print out the error */ pvm_perror(argv[0]); /* exit the program */ return -1; } /* find my parent's task id number */ myparent = pvm_parent(); /* exit if there is some error other than PvmNoParent */ if ((myparent < 0) && (myparent != PvmNoParent)) { pvm_perror(argv[0]); pvm_exit(); return -1; } /* if I don't have a parent then I am the parent */ if (myparent == PvmNoParent) { /* spawn the child tasks */ for (i = 0; i < ntask; i++) { info = pvm_spawn(workers, (char**)0, PvmTaskDefault, NULL, 1, &child[i]); /* make sure spawn succeeded */ if (info == 0) { pvm_exit(); return -1; } /* wait for pvm to assign i'th group number */ while (pvm_getinst(workers, child[i]) < i); } /* srand(pvm_mytid()); */ t = time(NULL); srand(t); for (i = 0; i < 64; i++) { list[i] = rand() % 1000; } printf("Unsorted lists farmed out to workers...\n\n"); for (i = 0; i < ntask; i++) { info = pvm_initsend(PvmDataDefault); if (info < 0) { pvm_perror("calling pvm_initsend"); pvm_exit(); return -1; } for (j = i*8; j < (i+1)*8; j++) { mydata[j % 8] = list[j]; } info = pvm_psend(child[i], JOINTAG, mydata, 8, PVM_INT); if (info < 0) { pvm_perror("calling pvm_send"); pvm_exit(); return -1; } for (j = 0; j < 8; j++) { printf("%d ", mydata[j]); } printf(" %d\n", i); } printf("\n\n"); printf("Lists after sequential mergesort of each worker's data...\n\n"); for (i = 0; i < ntask; i++) { /* recv a message from any child process */ buf = pvm_recv(-1, JOINTAG); if (buf < 0) pvm_perror("calling recv"); info = pvm_bufinfo(buf, &len, &tag, &tid); if (info < 0) pvm_perror("calling pvm_bufinfo"); info = pvm_upkint(mydata, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); for (j = 0; j < 8; j++) printf("%d ", mydata[j]); info = pvm_upkint(&mygid, 1, 1); printf(" %d", mygid); printf("\n"); } printf("\n\n"); printf("Final list partitioned amongst the 8 processes after parallel merge ...\n\n"); for (i = 0; i < ntask; i++) { /* recv a message from any child process */ buf = pvm_recv(-1, JOINTAG); if (buf < 0) pvm_perror("calling recv"); info = pvm_bufinfo(buf, &len, &tag, &tid); if (info < 0) pvm_perror("calling pvm_bufinfo"); info = pvm_upkint(mydata, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); for (j = 0; j < 8; j++) printf("%d ", mydata[j]); info = pvm_upkint(&mygid, 1, 1); printf(" %d", mygid); printf("\n"); } /* only added to separate the different program runs printf("\n\n\n\n\n"); */ pvm_exit(); return 0; } /* I'm a child */ mygid = pvm_joingroup(workers); /* recv data from parent process */ buf = pvm_recv(myparent, JOINTAG); if (buf < 0) pvm_perror("calling recv"); info = pvm_bufinfo(buf, &len, &tag, &tid); if (info < 0) pvm_perror("calling pvm_bufinfo"); info = pvm_upkint(mydata, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); mergesortBU(mydata, 0, 7); info = pvm_initsend(PvmDataDefault); if (info < 0) { pvm_perror("calling pvm_initsend"); pvm_exit(); return -1; } info = pvm_pkint(mydata, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); info = pvm_pkint(&mygid, 1, 1); if (info < 0) { pvm_perror("calling pvm_pkint"); pvm_exit(); return -1; } info = pvm_send(myparent, JOINTAG); if (info < 0) { pvm_perror("calling pvm_send"); pvm_exit(); return -1; } for (i = 0; i < 6; i++) { CX_pair = mygid ^ CX_data[i]; /* send data to CX pair process */ info = pvm_initsend(PvmDataDefault); if (info < 0) { pvm_perror("calling pvm_initsend"); pvm_exit(); return -1; } info = pvm_psend(pvm_gettid(workers, CX_pair), JOINTAG, mydata, 8, PVM_INT); if (info < 0) { pvm_perror("calling pvm_psend"); pvm_exit(); return -1; } /* recv data from CX pair process */ buf = pvm_recv(pvm_gettid(workers, CX_pair), JOINTAG); if (buf < 0) pvm_perror("calling recv"); info = pvm_bufinfo(buf, &len, &tag, &tid); if (info < 0) pvm_perror("calling pvm_bufinfo"); info = pvm_upkint(pair_data, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); /* perform parallel merging */ for (j = 0; j < 8; j++) dummy[j] = mydata[j]; if (mygid < CX_pair) { x = y = 0; for (j = 0; j < 8; j++) { mydata[j] = (dummy[x] < pair_data[y]) ? dummy[x++] : pair_data[y++]; } } else { x = y = 7; for (j = 7; j >= 0; j--) { mydata[j] = (dummy[x] > pair_data[y]) ? dummy[x--] : pair_data[y--]; } } /* wait for all processes to complete parallel merge step */ info = pvm_barrier(workers, 8); if (info < 0) { pvm_perror("calling pvm_barrier"); pvm_exit(); return -1; } } info = pvm_initsend(PvmDataDefault); if (info < 0) { pvm_perror("calling pvm_initsend"); pvm_exit(); return -1; } info = pvm_pkint(mydata, 8, 1); if (info < 0) pvm_perror("calling pvm_upkint"); info = pvm_pkint(&mygid, 1, 1); if (info < 0) { pvm_perror("calling pvm_pkint"); pvm_exit(); return -1; } info = pvm_send(myparent, JOINTAG); if (info < 0) { pvm_perror("calling pvm_send"); pvm_exit(); return -1; } pvm_exit(); return 0; } /* These next two functions are pure Sedgewick... Functions merge and mergesortBU are only slight modifications, mostly just to make them C functions, of Sedgewick's algorithms as given in "Algorithms in C++", Robert Sedgewick, Addison-Wesley 1998, pp. 352, 360. */ void merge(int a[], int l, int m, int r) { int i, j, k; static int aux[maxN]; for (i = m + 1; i > l; i--) aux[i-1] = a[i-1]; for (j = m; j < r; j++) aux[r+m-j] = a[j+1]; for (k = l; k <= r; k++) if (aux[j] < aux[i]) a[k] = aux[j--]; else a[k] = aux[i++]; } void mergesortBU(int a[], int l, int r) { int m, i; for (m = 1; m <= r-l; m = m+m) for (i = l; i <= r-m; i += m+m) merge(a, i, i+m-1, lesser(i+m+m-1, r)); } /* Necessary because on Linux doesn't have the 'min' macro defined... so, the Windows gets mad if there's another min function defined and the Linux doesn't seem to have the macro. So, this seems an easy way to make the program more portable. */ int lesser(int a, int b) { return (a < b) ? a : b; }