
Wednesday, June 13, 2012

Programmatically execute simple MPI job on Ranger using Apache Airavata

In an earlier post [2] we looked at how to execute an MPI job on Ranger [1] using the XBaya GUI. This post describes how to run the same scenario using a Java client. This client does not use the AiravataClient API; instead it uses XML Beans generated from the schema to describe and run the MPI job programmatically. I will write a test client that uses the AiravataClient API in a later post.

1. Configure the gram.properties file that will be used in the test case (let's assume it is named gram_ranger.properties).
# The myproxy server to retrieve the grid credentials
myproxy.server=myproxy.teragrid.org
# Example: XSEDE myproxy server
#myproxy.server=myproxy.teragrid.org

# The user name and password to fetch grid proxy
myproxy.username=username
myproxy.password=********

# Directory with Certificate Authority certificates and CRLs
# The certificates for XSEDE can be downloaded from http://software.xsede.org/security/xsede-certs.tar.gz
ca.certificates.directory=/home/heshan/Dev/setup/gram-provider/certificates

# On computational grids, an allocation is awarded with a charge number. On XSEDE, the numbers are typically of the format TG-DIS123456
allocation.charge.number=TG-STA110014S

# Scratch space with enough room to create a temporary working directory on the target compute cluster
scratch.working.directory=/scratch/01437/ogce/test

# Name, FQDN, and GRAM and GridFTP endpoints of the remote compute cluster
host.commom.name=gram
host.fqdn.name=gatekeeper2.ranger.tacc.teragrid.org
gridftp.endpoint=gsiftp://gridftp.ranger.tacc.teragrid.org:2811/
gram.endpoints=gatekeeper.ranger.tacc.teragrid.org:2119/jobmanager-sge
defualt.queue=development
2. Using the properties file configured above (gram_ranger.properties), run the test case below, which executes the simple MPI job on Ranger.
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.commons.gfac.type.ApplicationDeploymentDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.commons.gfac.type.ServiceDescription;
import org.apache.airavata.core.gfac.context.invocation.impl.DefaultExecutionContext;
import org.apache.airavata.core.gfac.context.invocation.impl.DefaultInvocationContext;
import org.apache.airavata.core.gfac.context.message.impl.ParameterContextImpl;
import org.apache.airavata.core.gfac.context.security.impl.GSISecurityContext;
import org.apache.airavata.core.gfac.notification.impl.LoggingNotification;
import org.apache.airavata.core.gfac.services.impl.PropertiesBasedServiceImpl;
import org.apache.airavata.migrator.registry.MigrationUtil;
import org.apache.airavata.registry.api.impl.AiravataJCRRegistry;
import org.apache.airavata.schemas.gfac.*;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.net.URL;
import java.util.*;

import static org.junit.Assert.fail;

public class GramProviderMPIRangerTest {

    public static final String MYPROXY = "myproxy";
    public static final String GRAM_PROPERTIES = "gram_ranger.properties";
    private AiravataJCRRegistry jcrRegistry = null;

    @Before
    public void setUp() throws Exception {
        Map<String, String> config = new HashMap<String, String>();
        config.put("org.apache.jackrabbit.repository.home", "target");

        jcrRegistry = new AiravataJCRRegistry(null,
                "org.apache.jackrabbit.core.RepositoryFactoryImpl", "admin",
                "admin", config);

        // Host
        URL url = this.getClass().getClassLoader().getResource(GRAM_PROPERTIES);
        Properties properties = new Properties();
        properties.load(url.openStream());
        HostDescription host = new HostDescription();
        host.getType().changeType(GlobusHostType.type);
        host.getType().setHostName(properties.getProperty("host.commom.name"));
        host.getType().setHostAddress(properties.getProperty("host.fqdn.name"));
        ((GlobusHostType) host.getType()).setGridFTPEndPointArray(new String[]{properties.getProperty("gridftp.endpoint")});
        ((GlobusHostType) host.getType()).setGlobusGateKeeperEndPointArray(new String[]{properties.getProperty("gram.endpoints")});

        /* Application */
        ApplicationDeploymentDescription appDesc = new ApplicationDeploymentDescription(GramApplicationDeploymentType.type);
        GramApplicationDeploymentType app = (GramApplicationDeploymentType) appDesc.getType();
        app.setCpuCount(1);
        app.setNodeCount(1);
        ApplicationDeploymentDescriptionType.ApplicationName name = appDesc.getType().addNewApplicationName();
        name.setStringValue("EchoMPILocal");
        app.setExecutableLocation("/share/home/01437/ogce/airavata-test/mpi-hellow-world");
        app.setScratchWorkingDirectory(properties.getProperty("scratch.working.directory"));
        app.setCpuCount(16);
        app.setJobType(MigrationUtil.getJobTypeEnum("MPI"));
        //app.setMinMemory();
        ProjectAccountType projectAccountType = ((GramApplicationDeploymentType) appDesc.getType()).addNewProjectAccount();
        projectAccountType.setProjectAccountNumber(properties.getProperty("allocation.charge.number"));

        /* Service */
        ServiceDescription serv = new ServiceDescription();
        serv.getType().setName("SimpleMPIEcho");

        InputParameterType input = InputParameterType.Factory.newInstance();
        ParameterType parameterType = input.addNewParameterType();
        parameterType.setName("echo_mpi_input");
        List<InputParameterType> inputList = new ArrayList<InputParameterType>();
        inputList.add(input);
        InputParameterType[] inputParamList = inputList.toArray(new InputParameterType[inputList
                .size()]);

        OutputParameterType output = OutputParameterType.Factory.newInstance();
        ParameterType parameterType1 = output.addNewParameterType();
        parameterType1.setName("echo_mpi_output");
        List<OutputParameterType> outputList = new ArrayList<OutputParameterType>();
        outputList.add(output);
        OutputParameterType[] outputParamList = outputList
                .toArray(new OutputParameterType[outputList.size()]);
        serv.getType().setInputParametersArray(inputParamList);
        serv.getType().setOutputParametersArray(outputParamList);

        /* Save to Registry */
        jcrRegistry.saveHostDescription(host);
        jcrRegistry.saveDeploymentDescription(serv.getType().getName(), host.getType().getHostName(), appDesc);
        jcrRegistry.saveServiceDescription(serv);
        jcrRegistry.deployServiceOnHost(serv.getType().getName(), host.getType().getHostName());
    }

    @Test
    public void testExecute() {
        try {
            URL url = this.getClass().getClassLoader().getResource(GRAM_PROPERTIES);
            Properties properties = new Properties();
            properties.load(url.openStream());

            DefaultInvocationContext ct = new DefaultInvocationContext();
            DefaultExecutionContext ec = new DefaultExecutionContext();
            ec.addNotifiable(new LoggingNotification());
            ec.setRegistryService(jcrRegistry);
            ct.setExecutionContext(ec);


            GSISecurityContext gsiSecurityContext = new GSISecurityContext();
            gsiSecurityContext.setMyproxyServer(properties.getProperty("myproxy.server"));
            gsiSecurityContext.setMyproxyUserName(properties.getProperty("myproxy.username"));
            gsiSecurityContext.setMyproxyPasswd(properties.getProperty("myproxy.password"));
            gsiSecurityContext.setMyproxyLifetime(14400);
            gsiSecurityContext.setTrustedCertLoc(properties.getProperty("ca.certificates.directory"));

            ct.addSecurityContext(MYPROXY, gsiSecurityContext);

            ct.setServiceName("SimpleMPIEcho");

            /* Input */
            ParameterContextImpl input = new ParameterContextImpl();
            ActualParameter echo_input = new ActualParameter();
            ((StringParameterType) echo_input.getType()).setValue("echo_mpi_output=hi");
            input.add("echo_mpi_input", echo_input);

            /* Output */
            ParameterContextImpl output = new ParameterContextImpl();
            ActualParameter echo_output = new ActualParameter();
            output.add("echo_mpi_output", echo_output);

            /* parameter */
            ct.setInput(input);
            ct.setOutput(output);

            PropertiesBasedServiceImpl service = new PropertiesBasedServiceImpl();
            service.init();
            service.execute(ct);

            System.out.println("output              : " + ct.getOutput().toString());
            System.out.println("output from service : " + ct.getOutput().getValue("echo_mpi_output"));

            Assert.assertNotNull(ct.getOutput());
            Assert.assertNotNull(ct.getOutput().getValue("echo_mpi_output"));

            System.out.println("output              : " + ((StringParameterType) ((ActualParameter) ct.getOutput().getValue("echo_mpi_output")).getType()).getValue());

        } catch (Exception e) {
            e.printStackTrace();
            fail("ERROR");
        }
    }
}
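
If you want to run this test from a Maven build, the Surefire plugin can target it directly; the module layout assumed here (gram_ranger.properties kept under src/test/resources so it ends up on the test classpath) is an assumption, not part of the original setup:

mvn clean test -Dtest=GramProviderMPIRangerTest

The properties file must be on the test classpath so that getClass().getClassLoader().getResource(GRAM_PROPERTIES) can resolve it.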
[1] - http://www.tacc.utexas.edu/user-services/user-guides/ranger-user-guide 
[2] - http://heshans.blogspot.com/2012/06/execute-simple-mpi-job-on-ranger-using.html

Execute simple MPI job on Ranger using Apache Airavata

In an earlier post [3] we looked at how to install a simple hello world MPI program on Ranger [1]. In this post we will look at how to execute that previously installed application on Ranger using Apache Airavata.

1. Before starting Airavata, configure the repository.properties file by replacing the default placeholder values for the following properties.

trusted.cert.location=path to certificates for ranger
myproxy.server=myproxy.teragrid.org
myproxy.user=username
myproxy.pass=password
myproxy.life=3600

Configure your certificate location and MyProxy credentials, then start a Jackrabbit instance and the Airavata server with all the services.
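
As a rough sketch of that step (the Jackrabbit version, port, and the Airavata start-up script name below are assumptions that depend on your distribution, not something taken from this post):

# start a standalone Jackrabbit repository
java -jar jackrabbit-standalone-2.4.0.jar --port 8081

# from the Airavata binary distribution's bin directory, start the server with all services
./airavata-server.sh

If your distribution bundles Jackrabbit differently, use whatever mechanism its README describes; the important point is that the registry is up before the server starts.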

2. To create a workflow that runs on Ranger you need to create a Host Description, an Application Description, and a Service Description. If you don't know how to create them, refer to the Airavata in 10 minutes article [2]. Once you are familiar with the XBaya UI, create those documents through the XBaya GUI using the values given below.


Host Description
Tick the check box "Define this host as Globus host"; the following two entries will then become editable.

Service Description

Fill in the values as in the service description section of the Airavata in 10 minutes article [2].



Application Description

  • Executable path - /share/home/01437/ogce/airavata-test/mpi-hellow-world
  • Temporary Directory - /scratch/01437/ogce/test
Select the service descriptor and host descriptor created above. When you select the host descriptor, a new button named Gram Configuration appears; click it and fill in the following values.
  • Job Type - MPI
  • Project Account Number - You will see this when you log in to Ranger; enter your project account number in this field
  • Project Description - Description of the project (not mandatory)
  • Queue Type - development
Click the Update button and save the Application Description.

Now that you have successfully created the descriptors, build an MPI workflow as you did in the Airavata in 10 minutes article [2] and try running it.



[1] - http://www.tacc.utexas.edu/user-services/user-guides/ranger-user-guide
[2] - http://incubator.apache.org/airavata/documentation/system/airavata-in-10-minutes.html
[3] - http://heshans.blogspot.com/2012/06/running-simple-mpi-job-on-ranger.html

Running a simple MPI job on Ranger

Let's consider running a simple MPI job on Ranger [1]. The MPI program used here is a hello-world program.

1) Write a hello world application in C.
#include <stdio.h>
#include <mpi.h>


int main (int argc, char *argv[])
{
  int rank, size;

  MPI_Init (&argc, &argv);        /* starts MPI */
  MPI_Comm_rank (MPI_COMM_WORLD, &rank);        /* get current process id */
  MPI_Comm_size (MPI_COMM_WORLD, &size);        /* get number of processes */
  printf( "Hello world from process %d of %d\n", rank, size );
  MPI_Finalize();
  return 0;
}

2) Compile it on Ranger.
mpicc -o mpi-hellow-world mpi-hellow-world.c
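
Optionally, the binary can be smoke-tested with a small interactive run before wiring it into a scheduler script; mpirun is the generic MPI launcher, and whether interactive MPI runs are permitted on Ranger's login nodes is not covered here, so treat this as an assumption:

mpirun -np 4 ./mpi-hellow-world

The supported route on Ranger is the batch submission described in the next step.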

3) Write a scheduler script to run your application. (Let's assume that we have saved it under the name scheduler_sge_job_mpi_helloworld.)
#!/bin/bash
# Grid Engine batch job script built by Globus job manager

#$ -S /bin/bash
#$ -V
#$ -pe 16way 16
#$ -N MPI-Airavata-Testing-Script
#$ -M heshan@ogce.org
#$ -m n
#$ -q development
#$ -A ***********
#$ -l h_rt=0:09:00
#$ -o /share/home/01437/ogce/airavata-test/mpi-hello.stdout
#$ -e /share/home/01437/ogce/airavata-test/mpi-hello.stderr
ibrun /share/home/01437/ogce/airavata-test/mpi-hellow-world

4) Use the qsub command to submit a batch job to Ranger.
ogce@login3.ranger.tacc.utexas.edu:/airavata-test/{13}> qsub scheduler_sge_job_mpi_helloworld
Once the job is submitted, the following output can be seen.
-------------------------------------------------------------------
------- Welcome to TACC's Ranger System, an NSF XD Resource -------
-------------------------------------------------------------------
--> Checking that you specified -V...
--> Checking that you specified a time limit...
--> Checking that you specified a queue...
--> Setting project...
--> Checking that you specified a parallel environment...
--> Checking that you specified a valid parallel environment name...
--> Checking that the minimum and maximum PE counts are the same...
--> Checking that the number of PEs requested is valid...
--> Ensuring absence of dubious h_vmem,h_data,s_vmem,s_data limits...
--> Requesting valid memory configuration (31.3G)...
--> Verifying WORK file-system availability...
--> Verifying HOME file-system availability...
--> Verifying SCRATCH file-system availability...
--> Checking ssh setup...
--> Checking that you didn't request more cores than the maximum...
--> Checking that you don't already have the maximum number of jobs...
--> Checking that you don't already have the maximum number of jobs in queue development...
--> Checking that your time limit isn't over the maximum...
--> Checking available allocation...
--> Submitting job...


Your job 2518464 ("MPI-Airavata-Testing-Script2") has been submitted

5) Use the qstat command to check the status of the job.
ogce@login3.ranger.tacc.utexas.edu:/airavata-test/{17}> qstat
job-ID  prior   name       user         state submit/start at     queue                          slots ja-task-ID 
-----------------------------------------------------------------------------------------------------------------
2518464 0.00000 MPI-Airava ogce         qw    04/19/2012 11:42:01                                   16        

6) The result of the batch job is written to the specified output file.
TACC: Setting memory limits for job 2518464 to unlimited KB
TACC: Dumping job script:
--------------------------------------------------------------------------------
#!/bin/bash
# Grid Engine batch job script built by Globus job manager

#$ -S /bin/bash
#$ -V
#$ -pe 16way 16
#$ -N MPI-Airavata-Testing-Script2
#$ -M ***@ogce.org
#$ -m n
#$ -q development
#$ -A TG-STA110014S
#$ -l h_rt=0:09:00
#$ -o /share/home/01437/ogce/airavata-test/mpi-hello.stdout
#$ -e /share/home/01437/ogce/airavata-test/mpi-hello.stderr
ibrun /share/home/01437/ogce/airavata-test/mpi-hellow-world
--------------------------------------------------------------------------------
TACC: Done.
TACC: Starting up job 2518464
TACC: Setting up parallel environment for OpenMPI mpirun.
TACC: Setup complete. Running job script.
TACC: starting parallel tasks...
echo_mpi_output=Hello world from process 7 of 16
echo_mpi_output=Hello world from process 6 of 16
echo_mpi_output=Hello world from process 2 of 16
echo_mpi_output=Hello world from process 4 of 16
echo_mpi_output=Hello world from process 3 of 16
echo_mpi_output=Hello world from process 10 of 16
echo_mpi_output=Hello world from process 13 of 16
echo_mpi_output=Hello world from process 9 of 16
echo_mpi_output=Hello world from process 8 of 16
echo_mpi_output=Hello world from process 0 of 16
echo_mpi_output=Hello world from process 1 of 16
echo_mpi_output=Hello world from process 12 of 16

[1] - http://www.tacc.utexas.edu/user-services/user-guides/ranger-user-guide

Tuesday, June 2, 2009

MPI vs Pthread

I did a performance comparison between my MPI and pthread codes for matrix multiplication. The matrices used were 1000*1000, and the MPI code was run on UCSC's swelanka cluster (using 4 CPUs). While the MPI code took only 2.6 seconds to multiply two 1000*1000 matrices, the pthread code took more than 45 seconds on average.
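
For context, a minimal pthread matrix-multiplication sketch of the kind being compared is shown below; this is an illustrative reconstruction (the thread count, matrix initialisation, and names are my own choices), not the exact code used in the benchmark.

#include <stdio.h>
#include <pthread.h>

#define N 1000        /* matrix dimension, matching the benchmark */
#define NUM_THREADS 4 /* number of worker threads */

static double a[N][N], b[N][N], c[N][N];

/* Each thread computes a contiguous band of rows of c = a * b. */
static void *multiply_rows(void *arg)
{
    long t = (long) arg;
    int first = t * N / NUM_THREADS;
    int last  = (t + 1) * N / NUM_THREADS;
    int i, j, k;

    for (i = first; i < last; i++)
        for (j = 0; j < N; j++) {
            double sum = 0.0;
            for (k = 0; k < N; k++)
                sum += a[i][k] * b[k][j];
            c[i][j] = sum;
        }
    return NULL;
}

int main(void)
{
    pthread_t threads[NUM_THREADS];
    long t;
    int i, j;

    /* arbitrary input values, just to have something to multiply */
    for (i = 0; i < N; i++)
        for (j = 0; j < N; j++) {
            a[i][j] = i + j;
            b[i][j] = i * j;
        }

    for (t = 0; t < NUM_THREADS; t++)
        pthread_create(&threads[t], NULL, multiply_rows, (void *) t);
    for (t = 0; t < NUM_THREADS; t++)
        pthread_join(threads[t], NULL);

    printf("c[N-1][N-1] = %f\n", c[N - 1][N - 1]);
    return 0;
}

Compile with something like gcc -O2 -pthread matmul_pthread.c -o matmul_pthread.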

Wednesday, May 13, 2009

Matrix Multiplication using MPI

MPI (Message Passing Interface) is a library specification for message passing. MPI was designed for high performance on both massively parallel machines and workstation clusters. The following matrix multiplication program is written using MPI. You can download the code from here.

/******************************************************************************
* Matrix Multiplication Program
* Heshan Suriyaarachchi
* ABOUT:
* Master task distributes a matrix multiply
* operation to numtasks-1 worker tasks.
*
******************************************************************************/

#include <stdio.h>
#include <sys/time.h>   /* gettimeofday */
#include "mpi.h"

#define NRA 512         /* number of rows in matrix A */
#define NCA 512         /* number of columns in matrix A */
#define NCB 512         /* number of columns in matrix B */
#define MASTER 0        /* taskid of first task */
#define FROM_MASTER 1   /* setting a message type */
#define FROM_WORKER 2   /* setting a message type */

MPI_Status status;

double a[NRA][NCA],     /* matrix A to be multiplied */
       b[NCA][NCB],     /* matrix B to be multiplied */
       c[NRA][NCB];     /* result matrix C */

int main(int argc, char **argv)
{
    int numtasks,              /* number of tasks in partition */
        taskid,                /* a task identifier */
        numworkers,            /* number of worker tasks */
        source,                /* task id of message source */
        dest,                  /* task id of message destination */
        nbytes,                /* number of bytes in message */
        mtype,                 /* message type */
        intsize,               /* size of an integer in bytes */
        dbsize,                /* size of a double float in bytes */
        rows,                  /* rows of matrix A sent to each worker */
        averow, extra, offset, /* used to determine rows sent to each worker */
        i, j, k,               /* misc */
        count;

    struct timeval start, stop;

    intsize = sizeof(int);
    dbsize = sizeof(double);

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &taskid);
    MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
    numworkers = numtasks - 1;

    //printf(" size of matrix A = %d by %d\n",NRA,NCA);
    //printf(" size of matrix B = %d by %d\n",NRA,NCB);

    /*---------------------------- master ----------------------------*/
    if (taskid == MASTER) {
        printf("Number of worker tasks = %d\n", numworkers);
        for (i = 0; i < NRA; i++)
            for (j = 0; j < NCA; j++)
                a[i][j] = i + j;
        for (i = 0; i < NCA; i++)
            for (j = 0; j < NCB; j++)
                b[i][j] = i * j;

        gettimeofday(&start, 0);

        /* send matrix data to the worker tasks */
        averow = NRA / numworkers;
        extra = NRA % numworkers;
        offset = 0;
        mtype = FROM_MASTER;
        for (dest = 1; dest <= numworkers; dest++) {
            rows = (dest <= extra) ? averow + 1 : averow;
            //printf(" Sending %d rows to task %d\n",rows,dest);
            MPI_Send(&offset, 1, MPI_INT, dest, mtype, MPI_COMM_WORLD);
            MPI_Send(&rows, 1, MPI_INT, dest, mtype, MPI_COMM_WORLD);
            count = rows * NCA;
            MPI_Send(&a[offset][0], count, MPI_DOUBLE, dest, mtype, MPI_COMM_WORLD);
            count = NCA * NCB;
            MPI_Send(&b, count, MPI_DOUBLE, dest, mtype, MPI_COMM_WORLD);

            offset = offset + rows;
        }

        /* wait for results from all worker tasks */
        mtype = FROM_WORKER;
        for (i = 1; i <= numworkers; i++) {
            source = i;
            MPI_Recv(&offset, 1, MPI_INT, source, mtype, MPI_COMM_WORLD, &status);
            MPI_Recv(&rows, 1, MPI_INT, source, mtype, MPI_COMM_WORLD, &status);
            count = rows * NCB;
            MPI_Recv(&c[offset][0], count, MPI_DOUBLE, source, mtype, MPI_COMM_WORLD,
                     &status);
        }

#ifdef PRINT
        printf("Here is the result matrix\n");
        for (i = 0; i < NRA; i++) {
            printf("\n");
            for (j = 0; j < NCB; j++)
                printf("%6.2f ", c[i][j]);
        }
        printf("\n");
#endif

        gettimeofday(&stop, 0);

        fprintf(stdout, "Time = %.6f\n\n",
                (stop.tv_sec + stop.tv_usec * 1e-6) - (start.tv_sec + start.tv_usec * 1e-6));

    } /* end of master section */

    /*---------------------------- worker (slave) ----------------------------*/
    if (taskid > MASTER) {
        mtype = FROM_MASTER;
        source = MASTER;
#ifdef PRINT
        printf("Master =%d, mtype=%d\n", source, mtype);
#endif
        MPI_Recv(&offset, 1, MPI_INT, source, mtype, MPI_COMM_WORLD, &status);
#ifdef PRINT
        printf("offset =%d\n", offset);
#endif
        MPI_Recv(&rows, 1, MPI_INT, source, mtype, MPI_COMM_WORLD, &status);
#ifdef PRINT
        printf("row =%d\n", rows);
#endif
        count = rows * NCA;
        MPI_Recv(&a, count, MPI_DOUBLE, source, mtype, MPI_COMM_WORLD, &status);
#ifdef PRINT
        printf("a[0][0] =%e\n", a[0][0]);
#endif
        count = NCA * NCB;
        MPI_Recv(&b, count, MPI_DOUBLE, source, mtype, MPI_COMM_WORLD, &status);
#ifdef PRINT
        printf("b=\n");
#endif
        for (k = 0; k < NCB; k++)
            for (i = 0; i < rows; i++) {
                c[i][k] = 0.0;
                for (j = 0; j < NCA; j++)
                    c[i][k] = c[i][k] + a[i][j] * b[j][k];
            }

        //mtype = FROM_WORKER;
#ifdef PRINT
        printf("after compute\n");
#endif
        //MPI_Send(&offset, 1, MPI_INT, MASTER, mtype, MPI_COMM_WORLD);
        MPI_Send(&offset, 1, MPI_INT, MASTER, FROM_WORKER, MPI_COMM_WORLD);
        //MPI_Send(&rows, 1, MPI_INT, MASTER, mtype, MPI_COMM_WORLD);
        MPI_Send(&rows, 1, MPI_INT, MASTER, FROM_WORKER, MPI_COMM_WORLD);
        //MPI_Send(&c, rows*NCB, MPI_DOUBLE, MASTER, mtype, MPI_COMM_WORLD);
        MPI_Send(&c, rows * NCB, MPI_DOUBLE, MASTER, FROM_WORKER, MPI_COMM_WORLD);
#ifdef PRINT
        printf("after send\n");
#endif
    } /* end of worker */

    MPI_Finalize();
    return 0;
} /* end of main */