/*
 *
 * This is the SyncIO library that uses MPI-IO collective functions to
 * implement a flexible I/O checkpoint solution for a large number of
 * processors.
 *
 * Previous developer: Ning Liu (liun2@cs.rpi.edu)
 *                     Jing Fu (fuj@cs.rpi.edu)
 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu),
 *                     Ben Matthews (benjamin.a.matthews@colorado.edu)
 *
 */

#include <map>
#include <vector>
#include <string>
#include <cstdarg>
#include <climits>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <sstream>
#include "phastaIO.h"
#include "phiotmrc.h"
#include "phiompi.h"
#include "mpi.h"
#include "phiostats.h"
#include "phiotimer.h"
#include <assert.h>

/* OS-specific things try to stay here */
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>


#define VERSION_INFO_HEADER_SIZE 8192
#define DB_HEADER_SIZE 1024
#define ONE_MEGABYTE 1048576
#define TWO_MEGABYTE 2097152
#define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!!
43 #define MAX_PHASTA_FILES 64 44 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 45 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 46 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 47 #define MAX_FIELDS_NAME_LENGTH 128 48 #define DefaultMHSize (4*ONE_MEGABYTE) 49 //#define DefaultMHSize (8350) //For test 50 #define LATEST_WRITE_VERSION 1 51 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 52 int MasterHeaderSize = -1; 53 54 bool PRINT_PERF = false; // default print no perf results 55 int irank = -1; // global rank, should never be manually manipulated 56 int mysize = -1; 57 58 // Static variables are bad but used here to store the subcommunicator and associated variables 59 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 60 static int s_assign_local_comm = 0; 61 static MPI_Comm s_local_comm; 62 static int s_local_size = -1; 63 static int s_local_rank = -1; 64 65 // the following defines are for debug printf 66 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 67 68 void phprintf(const char* fmt, ...) { 69 (void)fmt; 70 #if PHASTAIO_DEBUG 71 char format[1024]; 72 snprintf(format, sizeof(format), "phastaIO debug: %s", fmt); 73 va_list ap; 74 va_start(ap,fmt); 75 vprintf(format,ap); 76 va_end(ap); 77 #endif 78 } 79 80 void phprintf_0(const char* fmt, ...) 
{ 81 (void)fmt; 82 #if PHASTAIO_DEBUG 83 int rank = 0; 84 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 85 if(rank == 0){ 86 char format[1024]; 87 snprintf(format, sizeof(format), "phastaIO debug: irank=0 %s", fmt); 88 va_list ap; 89 va_start(ap,fmt); 90 vprintf(format, ap); 91 va_end(ap); 92 } 93 #endif 94 } 95 96 enum PhastaIO_Errors 97 { 98 MAX_PHASTA_FILES_EXCEEDED = -1, 99 UNABLE_TO_OPEN_FILE = -2, 100 NOT_A_MPI_FILE = -3, 101 GPID_EXCEEDED = -4, 102 DATA_TYPE_ILLEGAL = -5 103 }; 104 105 using namespace std; 106 107 namespace{ 108 109 map<int, std::string> LastHeaderKey; 110 vector< FILE* > fileArray; 111 vector< bool > byte_order; 112 vector< int > header_type; 113 int DataSize=0; 114 bool LastHeaderNotFound = false; 115 bool Wrong_Endian = false ; 116 bool Strict_Error = false ; 117 bool binary_format = true; 118 119 /***********************************************************************/ 120 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 121 /***********************************************************************/ 122 123 typedef struct 124 { 125 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 126 unsigned long my_offset; 127 unsigned long next_start_address; 128 unsigned long **my_offset_table; 129 unsigned long **my_read_table; 130 131 double * double_chunk; 132 double * read_double_chunk; 133 134 int field_count; 135 int part_count; 136 int read_field_count; 137 int read_part_count; 138 int GPid; 139 int start_id; 140 141 int mhsize; 142 143 int myrank; 144 int numprocs; 145 int local_myrank; 146 int local_numprocs; 147 148 int nppp; 149 int nPPF; 150 int nFiles; 151 int nFields; 152 153 int * int_chunk; 154 int * read_int_chunk; 155 156 int Wrong_Endian; /* default to false */ 157 char * master_header; 158 MPI_File file_handle; 159 MPI_Comm local_comm; 160 } phastaio_file_t; 161 162 typedef struct 163 { 164 int nppf, nfields; 165 char * masterHeader; 166 }serial_file; 167 168 serial_file *SerialFile; 169 
phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 170 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 171 172 // the caller has the responsibility to delete the returned string 173 // TODO: StringStipper("nbc value? ") returns NULL? 174 char* StringStripper( const char istring[] ) { 175 int length = strlen( istring ); 176 char* dest = (char *)malloc( length + 1 ); 177 strcpy( dest, istring ); 178 dest[ length ] = '\0'; 179 if ( char* p = strpbrk( dest, " ") ) 180 *p = '\0'; 181 return dest; 182 } 183 184 inline int cscompare( const char teststring[], const char targetstring[] ) { 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void isBinary( const char iotype[] ) { 204 char* fname = StringStripper( iotype ); 205 if ( cscompare( fname, "binary" ) ) binary_format = true; 206 else binary_format = false; 207 free (fname); 208 209 } 210 211 inline size_t typeSize( const char typestring[] ) { 212 char* ts1 = StringStripper( typestring ); 213 if ( cscompare( "integer", ts1 ) ) { 214 free (ts1); 215 return sizeof(int); 216 } else if ( cscompare( "double", ts1 ) ) { 217 free (ts1); 218 return sizeof( double ); 219 } else { 220 free (ts1); 221 fprintf(stderr,"unknown type : %s\n",ts1); 222 return 0; 223 } 224 } 225 226 int readHeader( 227 FILE* fileObject, 228 const char phrase[], 229 int* params, 230 int expect ) { 231 char* text_header; 232 char* token; 233 char Line[1024] = "\0"; 234 char junk; 235 bool FOUND = false ; 236 int real_length; 237 int skip_size, integer_value; 238 int rewind_count=0; 239 240 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 241 
rewind( fileObject ); 242 clearerr( fileObject ); 243 rewind_count++; 244 fgets( Line, 1024, fileObject ); 245 } 246 247 while( !FOUND && ( rewind_count < 2 ) ) { 248 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ) { 249 text_header = (char *)malloc( real_length + 1 ); 250 strncpy( text_header, Line, real_length ); 251 text_header[ real_length ] =static_cast<char>(NULL); 252 token = strtok ( text_header, ":" ); 253 assert(token); 254 if( cscompare( phrase , token ) ) { 255 FOUND = true ; 256 token = strtok( NULL, " ,;<>" ); 257 assert(token); 258 skip_size = atoi( token ); 259 int i; 260 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 261 assert(token); 262 params[i] = atoi( token ); 263 } 264 if ( i < expect ) { 265 fprintf(stderr, 266 "Aloha Expected # of ints not found for: %s\n",phrase ); 267 } 268 } else if ( cscompare(token,"byteorder magic number") ) { 269 if ( binary_format ) { 270 fread((void*)&integer_value,sizeof(int),1,fileObject); 271 fread( &junk, sizeof(char), 1 , fileObject ); 272 if ( 362436 != integer_value ) Wrong_Endian = true; 273 } else{ 274 fscanf(fileObject, "%d\n", &integer_value ); 275 } 276 } else { 277 /* some other header, so just skip over */ 278 token = strtok( NULL, " ,;<>" ); 279 assert(token); 280 skip_size = atoi( token ); 281 if ( binary_format) 282 fseek( fileObject, skip_size, SEEK_CUR ); 283 else 284 for( int gama=0; gama < skip_size; gama++ ) 285 fgets( Line, 1024, fileObject ); 286 } 287 free (text_header); 288 } // end of if before while loop 289 290 if ( !FOUND ) 291 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 292 rewind( fileObject ); 293 clearerr( fileObject ); 294 rewind_count++; 295 fgets( Line, 1024, fileObject ); 296 } 297 } 298 299 if ( !FOUND ) { 300 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 301 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 302 return 1; 303 } 304 return 0; 305 } 306 307 } // end unnamed namespace 308 309 310 
// begin of publicly visible functions 311 312 /** 313 * This function takes a long long pointer and assign (start) phiotmrc value to it 314 */ 315 void startTimer(double* start) { 316 if( !PRINT_PERF ) return; 317 MPI_Barrier(MPI_COMM_WORLD); 318 *start = phiotmrc(); 319 } 320 321 /** 322 * This function takes a long long pointer and assign (end) phiotmrc value to it 323 */ 324 void endTimer(double* end) { 325 if( !PRINT_PERF ) return; 326 *end = phiotmrc(); 327 MPI_Barrier(MPI_COMM_WORLD); 328 } 329 330 /** 331 * choose to print some performance results (or not) according to 332 * the PRINT_PERF macro 333 */ 334 void printPerf( 335 const char* func_name, 336 double start, 337 double end, 338 unsigned long datasize, 339 int printdatainfo, 340 const char* extra_msg) { 341 if( !PRINT_PERF ) return; 342 unsigned long data_size = datasize; 343 double time = end - start; 344 unsigned long isizemin,isizemax,isizetot; 345 double sizemin,sizemax,sizeavg,sizetot,rate; 346 double tmin, tmax, tavg, ttot; 347 348 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 349 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 350 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 351 tavg = ttot/mysize; 352 353 if(irank == 0) { 354 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 355 else printf("** syncIO "); 356 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 357 } 358 359 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 360 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 361 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 362 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 363 364 sizemin=(double)(isizemin*inv1024sq); 365 sizemax=(double)(isizemax*inv1024sq); 366 sizetot=(double)(isizetot*inv1024sq); 367 sizeavg=(double)(1.0*sizetot/mysize); 368 
rate=(double)(1.0*sizetot/tmax); 369 370 if( irank == 0) { 371 printf(", Rate = %f MB/s [%s] \n \t\t\t" 372 " block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", 373 rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 374 } 375 } 376 else { 377 if(irank == 0) { 378 printf(" \n"); 379 //printf(" (%s) \n", extra_msg); 380 } 381 } 382 } 383 384 /** 385 * This function is normally called at the beginning of a read operation, before 386 * init function. 387 * This function (uses rank 0) reads out nfields, nppf, master header size, 388 * endianess and allocates for masterHeader string. 389 * These values are essential for following read operations. Rank 0 will bcast 390 * these values to other ranks in the commm world 391 * 392 * If the file set is of old POSIX format, it would throw error and exit 393 */ 394 void queryphmpiio(const char filename[],int *nfields, int *nppf) 395 { 396 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 397 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 398 399 if(irank == 0) { 400 FILE * fileHandle; 401 char* fname = StringStripper( filename ); 402 403 PHASTAIO_OPENTIME(fileHandle = fopen (fname,"rb");) 404 if (fileHandle == NULL ) { 405 printf("\nError: File %s doesn't exist! Please check!\n",fname); 406 } 407 else { 408 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 409 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 410 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 411 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 412 413 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 414 char version[MAX_FIELDS_NAME_LENGTH/4]; 415 int mhsize; 416 char * token; 417 int magic_number; 418 419 memcpy( read_out_tag, 420 SerialFile->masterHeader, 421 MAX_FIELDS_NAME_LENGTH-1 ); 422 423 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 424 // Test endianess ... 
425 memcpy (&magic_number, 426 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 427 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 428 429 if ( magic_number != ENDIAN_TEST_NUMBER ) { 430 printf("Endian is different!\n"); 431 // Will do swap later 432 } 433 434 // test version, old version, default masterheader size is 4M 435 // newer version masterheader size is read from first line 436 memcpy(version, 437 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 438 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 439 440 if( cscompare ("version",version) ) { 441 // if there is "version" tag in the file, then it is newer format 442 // read master header size from here, otherwise use default 443 // Note: if version is "1", we know mhsize is at 3/4 place... 444 445 token = strtok(version, ":"); 446 token = strtok(NULL, " ,;<>" ); 447 int iversion = atoi(token); 448 449 if( iversion == 1) { 450 memcpy( &mhsize, 451 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 452 sizeof(int)); 453 if ( magic_number != ENDIAN_TEST_NUMBER ) 454 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 455 456 if( mhsize > DefaultMHSize ) { 457 //if actual headersize is larger than default, let's re-read 458 free(SerialFile->masterHeader); 459 SerialFile->masterHeader = (char *)malloc(mhsize); 460 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 461 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 462 } 463 } 464 //TODO: check if this is a valid int?? 465 MasterHeaderSize = mhsize; 466 } 467 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 468 MasterHeaderSize = DefaultMHSize; 469 } 470 471 memcpy( read_out_tag, 472 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 473 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 474 475 // Read in # fields ... 
476 token = strtok( read_out_tag, ":" ); 477 token = strtok( NULL," ,;<>" ); 478 *nfields = atoi( token ); 479 if ( *nfields > MAX_FIELDS_NUMBER) { 480 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 481 } 482 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 483 484 memcpy( read_out_tag, 485 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 486 + *nfields * MAX_FIELDS_NAME_LENGTH, 487 MAX_FIELDS_NAME_LENGTH); 488 489 token = strtok( read_out_tag, ":" ); 490 token = strtok( NULL," ,;<>" ); 491 *nppf = atoi( token ); 492 SerialFile->nppf=*nppf; //TODO: sanity check of int 493 } // end of if("MPI_IO_TAG") 494 else { 495 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 496 exit(1); 497 } 498 PHASTAIO_CLOSETIME(fclose(fileHandle);) 499 free(SerialFile->masterHeader); 500 free(SerialFile); 501 } //end of else 502 free(fname); 503 } 504 505 // Bcast value to every one 506 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 507 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 508 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 509 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 510 } 511 512 /** 513 * This function computes the right master header size (round to size of 2^n). 514 * This is only needed for file format version 1 in "write" mode. 
515 */ 516 int computeMHSize(int nfields, int nppf, int version) { 517 int mhsize=0; 518 if(version == 1) { 519 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 520 int meta_info_size = VERSION_INFO_HEADER_SIZE; 521 int actual_size = nfields * nppf * sizeof(unsigned long) + meta_info_size; 522 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 523 if (actual_size > DefaultMHSize) { 524 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 525 mhsize *= DefaultMHSize; 526 } 527 else { 528 mhsize = DefaultMHSize; 529 } 530 } else { 531 int rank = 0; 532 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 533 if(!rank) { 534 fprintf(stderr, 535 "ERROR invalid version passed to %s... exiting\n", __func__); 536 exit(EXIT_FAILURE); 537 } 538 } 539 return mhsize; 540 } 541 542 /** 543 * Computes correct color of a rank according to number of files. 544 */ 545 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 546 int color = 547 (int)(myrank / (numprocs / nfiles)); 548 return color; 549 } 550 551 552 /** 553 * Check the file descriptor. 554 */ 555 void checkFileDescriptor(const char fctname[], int* fileDescriptor ) { 556 if ( *fileDescriptor < 0 ) { 557 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 558 exit(1); 559 } 560 } 561 562 /** 563 * Initialize the file struct members and allocate space for file struct 564 * buffers. 565 * 566 * Note: this function is only called when we are using new format. Old POSIX 567 * format should skip this routine and call openfile() directly instead. 568 */ 569 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 570 { 571 // we init irank again in case query not called (e.g. 
syncIO write case) 572 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 573 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 574 575 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d, nfields %d, nppf %d, nfiles %d\n", irank, MasterHeaderSize, *nfields, *nppf, *nfiles); 576 577 double timer_start, timer_end; 578 startTimer(&timer_start); 579 580 char* imode = StringStripper( mode ); 581 582 // Note: if it's read, we presume query was called prior to init and 583 // MasterHeaderSize is already set to correct value from parsing header 584 // otherwise it's write then it needs some computation to be set 585 if ( cscompare( "read", imode ) ) { 586 // do nothing 587 } 588 else if( cscompare( "write", imode ) ) { 589 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 590 } 591 else { 592 printf("Error initphmpiio: can't recognize the mode %s", imode); 593 exit(1); 594 } 595 free ( imode ); 596 597 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 598 599 int i, j; 600 601 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 602 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 603 endTimer(&timer_end); 604 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 605 return MAX_PHASTA_FILES_EXCEEDED; 606 } 607 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 608 // { 609 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 610 // { 611 // PhastaIOActiveFiles[i] = NULL; 612 // } 613 // } 614 615 616 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 617 618 i = PhastaIONextActiveIndex; 619 PhastaIONextActiveIndex++; 620 621 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 622 623 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? 
TODO 624 625 PhastaIOActiveFiles[i]->Wrong_Endian = false; 626 627 PhastaIOActiveFiles[i]->nFields = *nfields; 628 PhastaIOActiveFiles[i]->nPPF = *nppf; 629 PhastaIOActiveFiles[i]->nFiles = *nfiles; 630 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 631 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 632 633 634 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 635 636 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 637 638 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 639 640 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 641 MPI_Comm_split(MPI_COMM_WORLD, 642 color, 643 PhastaIOActiveFiles[i]->myrank, 644 &(PhastaIOActiveFiles[i]->local_comm)); 645 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 646 &(PhastaIOActiveFiles[i]->local_numprocs)); 647 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 648 &(PhastaIOActiveFiles[i]->local_myrank)); 649 650 // back up now these variables so that we do not need to call comm_split again 651 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 652 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 653 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 654 s_assign_local_comm = 1; 655 } 656 else { // recycle the subcommunicator 657 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 658 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 659 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 660 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 661 } 662 } 663 else { // *nfiles == 1 here - no need to call mpi_comm_split here 664 665 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 666 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 667 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 668 PhastaIOActiveFiles[i]->local_myrank 
= PhastaIOActiveFiles[i]->myrank; 669 670 } 671 672 PhastaIOActiveFiles[i]->nppp = 673 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 674 675 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 676 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 677 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 678 679 PhastaIOActiveFiles[i]->my_offset_table = 680 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 681 682 PhastaIOActiveFiles[i]->my_read_table = 683 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 684 685 for (j=0; j<*nfields; j++) 686 { 687 PhastaIOActiveFiles[i]->my_offset_table[j] = 688 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 689 690 PhastaIOActiveFiles[i]->my_read_table[j] = 691 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 692 } 693 *filehandle = i; 694 695 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 696 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 697 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 698 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 699 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 700 701 // Time monitoring 702 endTimer(&timer_end); 703 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 704 705 phprintf_0("Info initphmpiio: quiting function"); 706 707 return i; 708 } 709 710 /** 711 * Destruct the file struct and free buffers allocated in init function. 
712 */ 713 void finalizephmpiio( int *fileDescriptor ) 714 { 715 double timer_start, timer_end; 716 startTimer(&timer_start); 717 718 int i, j; 719 i = *fileDescriptor; 720 //PhastaIONextActiveIndex--; 721 722 /* //free the offset table for this phasta file */ 723 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 724 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 725 { 726 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 727 free( PhastaIOActiveFiles[i]->my_read_table[j]); 728 } 729 free ( PhastaIOActiveFiles[i]->my_offset_table ); 730 free ( PhastaIOActiveFiles[i]->my_read_table ); 731 free ( PhastaIOActiveFiles[i]->master_header ); 732 free ( PhastaIOActiveFiles[i]->double_chunk ); 733 free ( PhastaIOActiveFiles[i]->int_chunk ); 734 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 735 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 736 737 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 738 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 739 s_assign_local_comm = 0; 740 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 741 } 742 743 free( PhastaIOActiveFiles[i]); 744 745 endTimer(&timer_end); 746 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 747 748 PhastaIONextActiveIndex--; 749 } 750 751 752 /** 753 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 754 * Initialize the file struct members and allocate space for file struct buffers. 755 * 756 * Note: this function is only called when we are using new format. Old POSIX 757 * format should skip this routine and call openfile() directly instead. 758 */ 759 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 760 { 761 // we init irank again in case query not called (e.g. 
syncIO write case) 762 763 MPI_Comm_rank(my_local_comm, &irank); 764 MPI_Comm_size(my_local_comm, &mysize); 765 766 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 767 768 double timer_start, timer_end; 769 startTimer(&timer_start); 770 771 char* imode = StringStripper( mode ); 772 773 // Note: if it's read, we presume query was called prior to init and 774 // MasterHeaderSize is already set to correct value from parsing header 775 // otherwise it's write then it needs some computation to be set 776 if ( cscompare( "read", imode ) ) { 777 // do nothing 778 } 779 else if( cscompare( "write", imode ) ) { 780 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 781 } 782 else { 783 printf("Error initphmpiio: can't recognize the mode %s", imode); 784 exit(1); 785 } 786 free ( imode ); 787 788 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 789 790 int i, j; 791 792 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 793 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 794 endTimer(&timer_end); 795 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 796 return MAX_PHASTA_FILES_EXCEEDED; 797 } 798 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 799 // { 800 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 801 // { 802 // PhastaIOActiveFiles[i] = NULL; 803 // } 804 // } 805 806 807 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 808 809 i = PhastaIONextActiveIndex; 810 PhastaIONextActiveIndex++; 811 812 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 813 814 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? 
TODO 815 816 PhastaIOActiveFiles[i]->Wrong_Endian = false; 817 818 PhastaIOActiveFiles[i]->nFields = *nfields; 819 PhastaIOActiveFiles[i]->nPPF = *nppf; 820 PhastaIOActiveFiles[i]->nFiles = *nfiles; 821 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 822 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 823 824 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 825 MPI_Comm_split(my_local_comm, 826 color, 827 PhastaIOActiveFiles[i]->myrank, 828 &(PhastaIOActiveFiles[i]->local_comm)); 829 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 830 &(PhastaIOActiveFiles[i]->local_numprocs)); 831 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 832 &(PhastaIOActiveFiles[i]->local_myrank)); 833 PhastaIOActiveFiles[i]->nppp = 834 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 835 836 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 837 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 838 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 839 840 PhastaIOActiveFiles[i]->my_offset_table = 841 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 842 843 PhastaIOActiveFiles[i]->my_read_table = 844 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 845 846 for (j=0; j<*nfields; j++) 847 { 848 PhastaIOActiveFiles[i]->my_offset_table[j] = 849 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 850 851 PhastaIOActiveFiles[i]->my_read_table[j] = 852 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 853 } 854 *filehandle = i; 855 856 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 857 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 858 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 859 
PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 860 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 861 862 // Time monitoring 863 endTimer(&timer_end); 864 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 865 866 phprintf_0("Info initphmpiiosub: quiting function"); 867 868 return i; 869 } 870 871 namespace { 872 873 enum { 874 DIR_MODE = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH 875 }; 876 877 bool my_mkdir(std::string name) { 878 if(name.empty()) 879 return true; 880 errno = 0; 881 int err = mkdir(name.c_str(), DIR_MODE); 882 if ((err == -1) && (errno == EEXIST)) { 883 errno = 0; 884 err = 0; 885 return false; 886 } 887 assert(!err); 888 return true; 889 } 890 891 enum { 892 DIR_FANOUT = 2048 893 }; 894 895 std::string getSubDirPrefix() { 896 if (phio_peers() <= DIR_FANOUT) 897 return string(""); 898 int self = phio_self(); 899 int subSelf = self % DIR_FANOUT; 900 int subGroup = self / DIR_FANOUT; 901 std::stringstream ss; 902 ss << subGroup << '/'; 903 return ss.str(); 904 } 905 } 906 907 /** open file for both POSIX and MPI-IO syncIO format. 908 * 909 * If it's old POSIX format, simply call posix fopen(). 910 * 911 * If it's MPI-IO foramt: 912 * in "read" mode, it builds the header table that points to the offset of 913 * fields for parts; 914 * in "write" mode, it opens the file with MPI-IO open routine. 
915 */ 916 void openfile(const char filename[], const char mode[], int* fileDescriptor ) 917 { 918 phprintf_0("Info: entering openfile"); 919 920 double timer_start, timer_end; 921 startTimer(&timer_start); 922 923 if ( PhastaIONextActiveIndex == 0 ) 924 { 925 FILE* file=NULL ; 926 *fileDescriptor = 0; 927 char* fname = StringStripper( filename ); 928 char* imode = StringStripper( mode ); 929 930 std::string posixname = getSubDirPrefix(); 931 if (!phio_self()) 932 my_mkdir(posixname); 933 phio_barrier(); 934 posixname += string(fname); 935 if ( cscompare( "read", imode ) ) 936 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "rb" );) 937 else if( cscompare( "write", imode ) ) 938 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "wb" );) 939 else if( cscompare( "append", imode ) ) 940 PHASTAIO_OPENTIME(file = fopen(posixname.c_str(), "ab" );) 941 942 if ( !file ){ 943 fprintf(stderr,"Error openfile: unable to open file %s\n",fname); 944 } else { 945 fileArray.push_back( file ); 946 byte_order.push_back( false ); 947 header_type.push_back( sizeof(int) ); 948 *fileDescriptor = fileArray.size(); 949 } 950 free (fname); 951 free (imode); 952 } 953 else // else it would be parallel I/O, opposed to posix io 954 { 955 char* fname = StringStripper( filename ); 956 char* imode = StringStripper( mode ); 957 int rc; 958 int i = *fileDescriptor; 959 checkFileDescriptor("openfile",&i); 960 char* token; 961 962 if ( cscompare( "read", imode ) ) 963 { 964 // if (PhastaIOActiveFiles[i]->myrank == 0) 965 // printf("\n **********\nRead open ... ... 
regular version\n"); 966 967 PHASTAIO_OPENTIME( 968 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 969 fname, 970 MPI_MODE_RDONLY, 971 MPI_INFO_NULL, 972 &(PhastaIOActiveFiles[i]->file_handle) ); 973 ) 974 975 if(rc) 976 { 977 *fileDescriptor = UNABLE_TO_OPEN_FILE; 978 int error_string_length; 979 char error_string[4096]; 980 MPI_Error_string(rc, error_string, &error_string_length); 981 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 982 endTimer(&timer_end); 983 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 984 return; 985 } 986 987 MPI_Status read_tag_status; 988 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 989 int j; 990 int magic_number; 991 992 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 993 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 994 0, 995 PhastaIOActiveFiles[i]->master_header, 996 MasterHeaderSize, 997 MPI_CHAR, 998 &read_tag_status ); 999 } 1000 1001 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1002 MasterHeaderSize, 1003 MPI_CHAR, 1004 0, 1005 PhastaIOActiveFiles[i]->local_comm ); 1006 1007 memcpy( read_out_tag, 1008 PhastaIOActiveFiles[i]->master_header, 1009 MAX_FIELDS_NAME_LENGTH-1 ); 1010 1011 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1012 { 1013 // Test endianess ... 1014 memcpy ( &magic_number, 1015 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1016 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1017 1018 if ( magic_number != ENDIAN_TEST_NUMBER ) 1019 { 1020 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1021 } 1022 1023 memcpy( read_out_tag, 1024 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1025 MAX_FIELDS_NAME_LENGTH ); 1026 1027 // Read in # fields ... 
1028 token = strtok ( read_out_tag, ":" ); 1029 token = strtok( NULL," ,;<>" ); 1030 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1031 1032 unsigned long **header_table; 1033 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1034 1035 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1036 { 1037 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1038 } 1039 1040 // Read in the offset table ... 1041 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1042 { 1043 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1044 memcpy( header_table[j], 1045 PhastaIOActiveFiles[i]->master_header + 1046 VERSION_INFO_HEADER_SIZE + 1047 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1048 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1049 } 1050 1051 MPI_Scatter( header_table[j], 1052 PhastaIOActiveFiles[i]->nppp, 1053 MPI_LONG_LONG_INT, 1054 PhastaIOActiveFiles[i]->my_read_table[j], 1055 PhastaIOActiveFiles[i]->nppp, 1056 MPI_LONG_LONG_INT, 1057 0, 1058 PhastaIOActiveFiles[i]->local_comm ); 1059 1060 // Swap byte order if endianess is different ... 1061 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1062 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1063 sizeof(unsigned long), 1064 PhastaIOActiveFiles[i]->nppp ); 1065 } 1066 } 1067 1068 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1069 free ( header_table[j] ); 1070 } 1071 free (header_table); 1072 1073 } // end of if MPI_IO_TAG 1074 else //else not valid MPI file 1075 { 1076 *fileDescriptor = NOT_A_MPI_FILE; 1077 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! 
File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1078 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1079 endTimer(&timer_end); 1080 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1081 return; 1082 } 1083 } // end of if "read" 1084 else if( cscompare( "write", imode ) ) 1085 { 1086 PHASTAIO_OPENTIME( 1087 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1088 fname, 1089 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1090 MPI_INFO_NULL, 1091 &(PhastaIOActiveFiles[i]->file_handle) ); 1092 ) 1093 if(rc != MPI_SUCCESS) 1094 { 1095 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1096 int error_string_length; 1097 char error_string[4096]; 1098 MPI_Error_string(rc, error_string, &error_string_length); 1099 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 1100 return; 1101 } 1102 } // end of if "write" 1103 free (fname); 1104 free (imode); 1105 } // end of if FileIndex != 0 1106 1107 endTimer(&timer_end); 1108 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1109 } 1110 1111 /** close file for both POSIX and MPI-IO syncIO format. 1112 * 1113 * If it's old POSIX format, simply call posix fclose(). 1114 * 1115 * If it's MPI-IO foramt: 1116 * in "read" mode, it simply close file with MPI-IO close routine. 1117 * in "write" mode, rank 0 in each group will re-assemble the master header and 1118 * offset table and write to the beginning of file, then close the file. 
1119 */ 1120 void closefile( int* fileDescriptor, const char mode[] ) 1121 { 1122 double timer_start, timer_end; 1123 startTimer(&timer_start); 1124 1125 int i = *fileDescriptor; 1126 checkFileDescriptor("closefile",&i); 1127 1128 if ( PhastaIONextActiveIndex == 0 ) { 1129 char* imode = StringStripper( mode ); 1130 1131 if( cscompare( "write", imode ) 1132 || cscompare( "append", imode ) ) { 1133 fflush( fileArray[ *fileDescriptor - 1 ] ); 1134 } 1135 1136 PHASTAIO_CLOSETIME(fclose( fileArray[ *fileDescriptor - 1 ] );) 1137 free (imode); 1138 } 1139 else { 1140 char* imode = StringStripper( mode ); 1141 1142 //write master header here: 1143 if ( cscompare( "write", imode ) ) { 1144 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1145 // MasterHeaderSize = 4*ONE_MEGABYTE; 1146 // else 1147 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1148 1149 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1150 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1151 1152 MPI_Status write_header_status; 1153 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1154 char version[MAX_FIELDS_NAME_LENGTH/4]; 1155 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1156 int magic_number = ENDIAN_TEST_NUMBER; 1157 1158 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1159 { 1160 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1161 sprintf(mpi_tag, "MPI_IO_Tag : "); 1162 memcpy(PhastaIOActiveFiles[i]->master_header, 1163 mpi_tag, 1164 MAX_FIELDS_NAME_LENGTH); 1165 1166 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1167 // this version is "1", print version in ASCII 1168 sprintf(version, "version : %d",1); 1169 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1170 version, 1171 MAX_FIELDS_NAME_LENGTH/4); 1172 1173 // 
master header size is computed using the formula above 1174 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1175 sprintf(mhsize, "mhsize : "); 1176 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1177 mhsize, 1178 MAX_FIELDS_NAME_LENGTH/4); 1179 1180 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1181 sprintf(mpi_tag, 1182 "\nnFields : %d\n", 1183 PhastaIOActiveFiles[i]->nFields); 1184 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1185 mpi_tag, 1186 MAX_FIELDS_NAME_LENGTH); 1187 1188 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1189 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1190 memcpy( PhastaIOActiveFiles[i]->master_header+ 1191 PhastaIOActiveFiles[i]->nFields * 1192 MAX_FIELDS_NAME_LENGTH + 1193 MAX_FIELDS_NAME_LENGTH * 2, 1194 mpi_tag, 1195 MAX_FIELDS_NAME_LENGTH); 1196 1197 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1198 &magic_number, 1199 sizeof(int)); 1200 1201 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1202 &MasterHeaderSize, 1203 sizeof(int)); 1204 } 1205 1206 int j = 0; 1207 unsigned long **header_table; 1208 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1209 1210 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1211 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1212 } 1213 1214 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1215 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1216 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1217 PhastaIOActiveFiles[i]->nppp, 1218 MPI_LONG_LONG_INT, 1219 header_table[j], 1220 PhastaIOActiveFiles[i]->nppp, 1221 MPI_LONG_LONG_INT, 1222 0, 1223 PhastaIOActiveFiles[i]->local_comm ); 1224 } 1225 1226 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1227 1228 
//if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1229 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1230 memcpy ( PhastaIOActiveFiles[i]->master_header + 1231 VERSION_INFO_HEADER_SIZE + 1232 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1233 header_table[j], 1234 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1235 } 1236 1237 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1238 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1239 0, 1240 PhastaIOActiveFiles[i]->master_header, 1241 MasterHeaderSize, 1242 MPI_CHAR, 1243 &write_header_status ); 1244 } 1245 1246 ////free(PhastaIOActiveFiles[i]->master_header); 1247 1248 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1249 free ( header_table[j] ); 1250 } 1251 free (header_table); 1252 } 1253 1254 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1255 PHASTAIO_CLOSETIME( 1256 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1257 ) 1258 free ( imode ); 1259 } 1260 1261 endTimer(&timer_end); 1262 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1263 } 1264 1265 int readHeader( FILE* f, const char phrase[], 1266 int* params, int numParams, const char* iotype) { 1267 isBinary(iotype); 1268 return readHeader(f,phrase,params,numParams); 1269 } 1270 1271 void readheader( 1272 int* fileDescriptor, 1273 const char keyphrase[], 1274 void* valueArray, 1275 int* nItems, 1276 const char datatype[], 1277 const char iotype[] ) 1278 { 1279 double timer_start, timer_end; 1280 1281 startTimer(&timer_start); 1282 1283 int i = *fileDescriptor; 1284 checkFileDescriptor("readheader",&i); 1285 1286 if ( PhastaIONextActiveIndex == 0 ) { 1287 int filePtr = *fileDescriptor - 1; 1288 FILE* fileObject; 1289 int* valueListInt; 1290 1291 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1292 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1293 
fprintf(stderr,"openfile_ function has to be called before \n") ; 1294 fprintf(stderr,"acessing the file\n ") ; 1295 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1296 endTimer(&timer_end); 1297 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1298 return; 1299 } 1300 1301 LastHeaderKey[filePtr] = std::string(keyphrase); 1302 LastHeaderNotFound = false; 1303 1304 fileObject = fileArray[ filePtr ] ; 1305 Wrong_Endian = byte_order[ filePtr ]; 1306 1307 isBinary( iotype ); 1308 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1309 1310 // right now we are making the assumption that we will only write integers 1311 // on the header line. 1312 1313 valueListInt = static_cast< int* >( valueArray ); 1314 int ierr = readHeader( fileObject , 1315 keyphrase, 1316 valueListInt, 1317 *nItems ) ; 1318 1319 byte_order[ filePtr ] = Wrong_Endian ; 1320 1321 if ( ierr ) LastHeaderNotFound = true; 1322 1323 //return ; // don't return, go to the end to print perf 1324 } 1325 else { 1326 int* valueListInt; 1327 valueListInt = static_cast <int*>(valueArray); 1328 char* token = NULL; 1329 bool FOUND = false ; 1330 isBinary( iotype ); 1331 1332 MPI_Status read_offset_status; 1333 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1334 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1335 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1336 int j; 1337 1338 int string_length = strlen( keyphrase ); 1339 char* buffer = (char*) malloc ( string_length+1 ); 1340 1341 strcpy ( buffer, keyphrase ); 1342 buffer[ string_length ] = '\0'; 1343 1344 char* st2 = strtok ( buffer, "@" ); 1345 st2 = strtok (NULL, "@"); 1346 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1347 if ( char* p = strpbrk(buffer, "@") ) 1348 *p = '\0'; 1349 1350 // Check if the user has input the right GPid 1351 if ( ( PhastaIOActiveFiles[i]->GPid <= 1352 PhastaIOActiveFiles[i]->myrank * 1353 PhastaIOActiveFiles[i]->nppp )|| 1354 ( PhastaIOActiveFiles[i]->GPid > 1355 ( 
PhastaIOActiveFiles[i]->myrank + 1 ) * 1356 PhastaIOActiveFiles[i]->nppp ) ) 1357 { 1358 *fileDescriptor = NOT_A_MPI_FILE; 1359 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1360 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1361 endTimer(&timer_end); 1362 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1363 return; 1364 } 1365 1366 // Find the field we want ... 1367 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1368 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1369 { 1370 memcpy( readouttag[j], 1371 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1372 MAX_FIELDS_NAME_LENGTH-1 ); 1373 } 1374 1375 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1376 { 1377 token = strtok ( readouttag[j], ":" ); 1378 1379 //if ( cscompare( buffer, token ) ) 1380 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1381 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1382 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1383 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 
1384 { 1385 PhastaIOActiveFiles[i]->read_field_count = j; 1386 FOUND = true; 1387 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1388 break; 1389 } 1390 } 1391 free(buffer); 1392 1393 if (!FOUND) 1394 { 1395 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1396 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1397 endTimer(&timer_end); 1398 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1399 return; 1400 } 1401 1402 // Find the part we want ... 1403 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1404 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1405 1406 PhastaIOActiveFiles[i]->my_offset = 1407 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1408 1409 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1410 1411 // Read each datablock header here ... 1412 1413 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1414 PhastaIOActiveFiles[i]->my_offset+1, 1415 read_out_tag, 1416 MAX_FIELDS_NAME_LENGTH-1, 1417 MPI_CHAR, 1418 &read_offset_status ); 1419 token = strtok ( read_out_tag, ":" ); 1420 1421 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1422 1423 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 
1424 { 1425 FOUND = true ; 1426 token = strtok( NULL, " ,;<>" ); 1427 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1428 valueListInt[j] = atoi( token ); 1429 1430 if ( j < *nItems ) 1431 { 1432 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1433 } 1434 } 1435 else { 1436 //if(irank==0) 1437 if(PhastaIOActiveFiles[i]->myrank == 0) 1438 // If we enter this if, there is a problem with the name of some fields 1439 { 1440 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1441 } 1442 } 1443 } 1444 1445 endTimer(&timer_end); 1446 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1447 1448 } 1449 1450 void readDataBlock( 1451 FILE* fileObject, 1452 void* valueArray, 1453 int nItems, 1454 const char datatype[], 1455 const char iotype[] ) 1456 { 1457 isBinary(iotype); 1458 size_t type_size = typeSize( datatype ); 1459 phastaioTime t0,t1; 1460 phastaio_time(&t0); 1461 if ( binary_format ) { 1462 char junk = '\0'; 1463 fread( valueArray, type_size, nItems, fileObject ); 1464 fread( &junk, sizeof(char), 1 , fileObject ); 1465 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems ); 1466 } else { 1467 char* ts1 = StringStripper( datatype ); 1468 if ( cscompare( "integer", ts1 ) ) { 1469 for( int n=0; n < nItems ; n++ ) 1470 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1471 } else if ( cscompare( "double", ts1 ) ) { 1472 for( int n=0; n < nItems ; n++ ) 1473 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1474 } 1475 free (ts1); 1476 } 1477 phastaio_time(&t1); 1478 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1479 phastaio_addReadTime(elapsed); 1480 phastaio_addReadBytes(nItems*type_size); 1481 } 1482 1483 void readdatablock( 1484 int* fileDescriptor, 1485 const char keyphrase[], 1486 void* valueArray, 1487 int* nItems, 1488 const char datatype[], 1489 const char iotype[] ) 1490 { 1491 //if(irank == 0) printf("entering 
readdatablock()\n"); 1492 unsigned long data_size = 0; 1493 double timer_start, timer_end; 1494 startTimer(&timer_start); 1495 1496 int i = *fileDescriptor; 1497 checkFileDescriptor("readdatablock",&i); 1498 1499 if ( PhastaIONextActiveIndex == 0 ) { 1500 int filePtr = *fileDescriptor - 1; 1501 FILE* fileObject; 1502 1503 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1504 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1505 fprintf(stderr,"openfile_ function has to be called before\n") ; 1506 fprintf(stderr,"acessing the file\n ") ; 1507 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1508 endTimer(&timer_end); 1509 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1510 return; 1511 } 1512 1513 // error check.. 1514 // since we require that a consistant header always preceed the data block 1515 // let us check to see that it is actually the case. 1516 1517 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1518 fprintf(stderr, "Header not consistant with data block\n"); 1519 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1520 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1521 fprintf(stderr, "Please recheck read sequence \n"); 1522 if( Strict_Error ) { 1523 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1524 endTimer(&timer_end); 1525 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1526 return; 1527 } 1528 } 1529 1530 if ( LastHeaderNotFound ) { 1531 endTimer(&timer_end); 1532 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1533 return; 1534 } 1535 fileObject = fileArray[ filePtr ]; 1536 Wrong_Endian = byte_order[ filePtr ]; 1537 LastHeaderKey.erase(filePtr); 1538 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1539 1540 //return; 1541 } 1542 else { 1543 // printf("read data block\n"); 1544 MPI_Status read_data_status; 1545 size_t type_size = typeSize( datatype ); 1546 
int nUnits = *nItems; 1547 isBinary( iotype ); 1548 1549 // read datablock then 1550 //MR CHANGE 1551 // if ( cscompare ( datatype, "double")) 1552 char* ts2 = StringStripper( datatype ); 1553 if ( cscompare ( "double" , ts2)) 1554 //MR CHANGE END 1555 { 1556 1557 phastaioTime t0,t1; 1558 phastaio_time(&t0); 1559 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1560 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1561 valueArray, 1562 nUnits, 1563 MPI_DOUBLE ); 1564 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1565 valueArray, 1566 &read_data_status ); 1567 data_size=8*nUnits; 1568 phastaio_time(&t1); 1569 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1570 phastaio_addReadTime(elapsed); 1571 phastaio_addReadBytes(nUnits*sizeof(double)); 1572 } 1573 //MR CHANGE 1574 // else if ( cscompare ( datatype, "integer")) 1575 else if ( cscompare ( "integer" , ts2)) 1576 //MR CHANGE END 1577 { 1578 phastaioTime t0,t1; 1579 phastaio_time(&t0); 1580 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1581 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1582 valueArray, 1583 nUnits, 1584 MPI_INT ); 1585 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1586 valueArray, 1587 &read_data_status ); 1588 data_size=4*nUnits; 1589 phastaio_time(&t1); 1590 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1591 phastaio_addReadTime(elapsed); 1592 phastaio_addReadBytes(nUnits*sizeof(int)); 1593 } 1594 else 1595 { 1596 *fileDescriptor = DATA_TYPE_ILLEGAL; 1597 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1598 endTimer(&timer_end); 1599 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1600 return; 1601 } 1602 free(ts2); 1603 1604 1605 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1606 1607 // Swap data byte order if endianess is different ... 
1608 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1609 { 1610 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1611 } 1612 } 1613 1614 endTimer(&timer_end); 1615 char extra_msg[1024]; 1616 memset(extra_msg, '\0', 1024); 1617 char* key = StringStripper(keyphrase); 1618 sprintf(extra_msg, " field is %s ", key); 1619 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1620 free(key); 1621 1622 } 1623 1624 void writeHeader( 1625 FILE* f, 1626 const char keyphrase[], 1627 const void* valueArray, 1628 const int nItems, 1629 const int ndataItems, 1630 const char datatype[], 1631 const char iotype[]) 1632 { 1633 isBinary( iotype ); 1634 1635 const int _newline = 1636 ( ndataItems > 0 ) ? sizeof( char ) : 0; 1637 int size_of_nextblock = 1638 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems; 1639 1640 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock ); 1641 for( int i = 0; i < nItems; i++ ) 1642 fprintf( f, "%d ", *((int*)((int*)valueArray+i))); 1643 fprintf( f, "\n"); 1644 } 1645 1646 void writeheader( 1647 const int* fileDescriptor, 1648 const char keyphrase[], 1649 const void* valueArray, 1650 const int* nItems, 1651 const int* ndataItems, 1652 const char datatype[], 1653 const char iotype[]) 1654 { 1655 1656 //if(irank == 0) printf("entering writeheader()\n"); 1657 1658 double timer_start, timer_end; 1659 startTimer(&timer_start); 1660 1661 int i = *fileDescriptor; 1662 checkFileDescriptor("writeheader",&i); 1663 1664 if ( PhastaIONextActiveIndex == 0 ) { 1665 int filePtr = *fileDescriptor - 1; 1666 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1667 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1668 fprintf(stderr,"openfile_ function has to be called before \n") ; 1669 fprintf(stderr,"acessing the file\n ") ; 1670 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1671 endTimer(&timer_end); 1672 printPerf("writeheader", timer_start, 
timer_end, 0, 0, ""); 1673 return; 1674 } 1675 1676 LastHeaderKey[filePtr] = std::string(keyphrase); 1677 DataSize = *ndataItems; 1678 FILE* fileObject = fileArray[ filePtr ] ; 1679 header_type[ filePtr ] = typeSize( datatype ); 1680 writeHeader(fileObject,keyphrase,valueArray,*nItems, 1681 *ndataItems,datatype,iotype); 1682 } 1683 else { // else it's parallel I/O 1684 DataSize = *ndataItems; 1685 size_t type_size = typeSize( datatype ); 1686 isBinary( iotype ); 1687 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1688 1689 int string_length = strlen( keyphrase ); 1690 char* buffer = (char*) malloc ( string_length+1 ); 1691 1692 strcpy ( buffer, keyphrase); 1693 buffer[ string_length ] = '\0'; 1694 1695 char* st2 = strtok ( buffer, "@" ); 1696 st2 = strtok (NULL, "@"); 1697 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1698 1699 if ( char* p = strpbrk(buffer, "@") ) 1700 *p = '\0'; 1701 printf("Printing field count %d\n", PhastaIOActiveFiles[i]->field_count); 1702 printf("Printing max field %d\n", MAX_FIELDS_NUMBER ); 1703 printf("Printing part count %d\n", PhastaIOActiveFiles[i]->part_count); 1704 printf("Printing nppp %d\n", PhastaIOActiveFiles[i]->nppp); 1705 assert(PhastaIOActiveFiles[i]->field_count < MAX_FIELDS_NUMBER); 1706 assert(PhastaIOActiveFiles[i]->part_count < PhastaIOActiveFiles[i]->nppp); 1707 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1708 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1709 unsigned long offset_value; 1710 1711 int temp = *ndataItems; 1712 unsigned long number_of_items = (unsigned long)temp; 1713 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1714 1715 MPI_Scan( &number_of_items, 1716 &offset_value, 1717 1, 1718 MPI_LONG_LONG_INT, 1719 MPI_SUM, 1720 PhastaIOActiveFiles[i]->local_comm ); 1721 1722 offset_value = (offset_value - number_of_items) * type_size; 1723 1724 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1725 DB_HEADER_SIZE + 1726 PhastaIOActiveFiles[i]->next_start_address; 1727 // This offset 
is the starting address of each datablock header... 1728 PhastaIOActiveFiles[i]->my_offset = offset_value; 1729 1730 // Write in my offset table ... 1731 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = PhastaIOActiveFiles[i]->my_offset; 1732 1733 // Update the next-start-address ... 1734 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1735 number_of_items * type_size + 1736 DB_HEADER_SIZE; 1737 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1738 1, 1739 MPI_LONG_LONG_INT, 1740 PhastaIOActiveFiles[i]->local_numprocs-1, 1741 PhastaIOActiveFiles[i]->local_comm ); 1742 1743 // Prepare datablock header ... 1744 int _newline = (*ndataItems>0)?sizeof(char):0; 1745 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1746 1747 //char datablock_header[255]; 1748 //bzero((void*)datablock_header,255); 1749 char datablock_header[DB_HEADER_SIZE]; 1750 bzero((void*)datablock_header,DB_HEADER_SIZE); 1751 1752 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1753 sprintf( datablock_header, 1754 "\n%s : < %u >", 1755 keyphrase, 1756 size_of_nextblock ); 1757 1758 for ( int j = 0; j < *nItems; j++ ) 1759 { 1760 sprintf( datablock_header, 1761 "%s %d ", 1762 datablock_header, 1763 *((int*)((int*)valueArray+j))); 1764 } 1765 sprintf( datablock_header, 1766 "%s\n ", 1767 datablock_header ); 1768 1769 // Write datablock header ... 
1770 //MR CHANGE 1771 // if ( cscompare(datatype,"double") ) 1772 char* ts1 = StringStripper( datatype ); 1773 if ( cscompare("double",ts1) ) 1774 //MR CHANGE END 1775 { 1776 free ( PhastaIOActiveFiles[i]->double_chunk ); 1777 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1778 1779 double * aa = ( double * )datablock_header; 1780 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1781 } 1782 //MR CHANGE 1783 // if ( cscompare(datatype,"integer") ) 1784 else if ( cscompare("integer",ts1) ) 1785 //MR CHANGE END 1786 { 1787 free ( PhastaIOActiveFiles[i]->int_chunk ); 1788 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1789 1790 int * aa = ( int * )datablock_header; 1791 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1792 } 1793 else { 1794 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1795 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1796 endTimer(&timer_end); 1797 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1798 return; 1799 } 1800 free(ts1); 1801 1802 PhastaIOActiveFiles[i]->part_count++; 1803 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1804 //A new field will be written 1805 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1806 memcpy( PhastaIOActiveFiles[i]->master_header + 1807 PhastaIOActiveFiles[i]->field_count * 1808 MAX_FIELDS_NAME_LENGTH + 1809 MAX_FIELDS_NAME_LENGTH * 2, 1810 mpi_tag, 1811 MAX_FIELDS_NAME_LENGTH-1); 1812 } 1813 PhastaIOActiveFiles[i]->field_count++; 1814 PhastaIOActiveFiles[i]->part_count=0; 1815 } 1816 free(buffer); 1817 } 1818 1819 endTimer(&timer_end); 1820 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1821 } 1822 1823 void writeDataBlock( 1824 FILE* f, 1825 const void* valueArray, 1826 const int nItems, 1827 const char datatype[], 1828 const char iotype[] ) 1829 { 1830 isBinary( iotype ); 1831 size_t type_size = 
typeSize( datatype ); 1832 phastaioTime t0,t1; 1833 phastaio_time(&t0); 1834 if ( binary_format ) { 1835 fwrite( valueArray, type_size, nItems, f ); 1836 fprintf( f,"\n"); 1837 } else { 1838 char* ts1 = StringStripper( datatype ); 1839 if ( cscompare( "integer", ts1 ) ) { 1840 const int* vals = (int*) valueArray; 1841 for( int n=0; n < nItems ; n++ ) 1842 fprintf(f,"%d\n",vals[n]); 1843 } else if ( cscompare( "double", ts1 ) ) { 1844 const double* vals = (double*) valueArray; 1845 for( int n=0; n < nItems ; n++ ) 1846 fprintf(f,"%f\n",vals[n]); 1847 } 1848 free (ts1); 1849 } 1850 phastaio_time(&t1); 1851 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1852 phastaio_addWriteTime(elapsed); 1853 phastaio_addWriteBytes(nItems*type_size); 1854 } 1855 1856 void writedatablock( 1857 const int* fileDescriptor, 1858 const char keyphrase[], 1859 const void* valueArray, 1860 const int* nItems, 1861 const char datatype[], 1862 const char iotype[] ) 1863 { 1864 //if(irank == 0) printf("entering writedatablock()\n"); 1865 1866 unsigned long data_size = 0; 1867 double timer_start, timer_end; 1868 startTimer(&timer_start); 1869 1870 int i = *fileDescriptor; 1871 checkFileDescriptor("writedatablock",&i); 1872 1873 if ( PhastaIONextActiveIndex == 0 ) { 1874 int filePtr = *fileDescriptor - 1; 1875 1876 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1877 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1878 fprintf(stderr,"openfile_ function has to be called before \n") ; 1879 fprintf(stderr,"acessing the file\n ") ; 1880 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1881 endTimer(&timer_end); 1882 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1883 return; 1884 } 1885 // since we require that a consistant header always preceed the data block 1886 // let us check to see that it is actually the case. 1887 1888 if ( ! 
cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1889 fprintf(stderr, "Header not consistant with data block\n"); 1890 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1891 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1892 fprintf(stderr, "Please recheck write sequence \n"); 1893 if( Strict_Error ) { 1894 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1895 endTimer(&timer_end); 1896 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1897 return; 1898 } 1899 } 1900 1901 FILE* fileObject = fileArray[ filePtr ] ; 1902 size_t type_size=typeSize( datatype ); 1903 isBinary( iotype ); 1904 1905 LastHeaderKey.erase(filePtr); 1906 1907 if ( header_type[filePtr] != (int)type_size ) { 1908 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1909 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1910 if( Strict_Error ) { 1911 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1912 endTimer(&timer_end); 1913 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1914 return; 1915 } 1916 } 1917 1918 int nUnits = *nItems; 1919 1920 if ( nUnits != DataSize ) { 1921 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1922 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1923 if( Strict_Error ) { 1924 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1925 endTimer(&timer_end); 1926 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1927 return; 1928 } 1929 } 1930 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1931 } 1932 else { // syncIO case 1933 MPI_Status write_data_status; 1934 isBinary( iotype ); 1935 int nUnits = *nItems; 1936 1937 //MR CHANGE 1938 // if ( cscompare(datatype,"double") ) 1939 char* ts1 = StringStripper( datatype ); 1940 if ( cscompare("double",ts1) ) 1941 //MR CHANGE END 1942 { 1943 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), 
valueArray, nUnits*sizeof(double)); 1944 phastaioTime t0,t1; 1945 phastaio_time(&t0); 1946 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1947 PhastaIOActiveFiles[i]->my_offset, 1948 PhastaIOActiveFiles[i]->double_chunk, 1949 //BLOCK_SIZE/sizeof(double), 1950 nUnits+DB_HEADER_SIZE/sizeof(double), 1951 MPI_DOUBLE ); 1952 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1953 PhastaIOActiveFiles[i]->double_chunk, 1954 &write_data_status ); 1955 data_size=8*nUnits; 1956 phastaio_time(&t1); 1957 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1958 phastaio_addWriteTime(elapsed); 1959 phastaio_addWriteBytes((nUnits*sizeof(double))+DB_HEADER_SIZE); 1960 } 1961 //MR CHANGE 1962 // else if ( cscompare ( datatype, "integer")) 1963 else if ( cscompare("integer",ts1) ) 1964 //MR CHANGE END 1965 { 1966 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 1967 phastaioTime t0,t1; 1968 phastaio_time(&t0); 1969 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1970 PhastaIOActiveFiles[i]->my_offset, 1971 PhastaIOActiveFiles[i]->int_chunk, 1972 nUnits+DB_HEADER_SIZE/sizeof(int), 1973 MPI_INT ); 1974 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1975 PhastaIOActiveFiles[i]->int_chunk, 1976 &write_data_status ); 1977 data_size=4*nUnits; 1978 phastaio_time(&t1); 1979 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1980 phastaio_addWriteTime(elapsed); 1981 phastaio_addWriteBytes((nUnits*sizeof(int))+DB_HEADER_SIZE); 1982 } 1983 else { 1984 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1985 endTimer(&timer_end); 1986 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1987 return; 1988 } 1989 free(ts1); 1990 } 1991 1992 endTimer(&timer_end); 1993 char extra_msg[1024]; 1994 memset(extra_msg, '\0', 1024); 1995 char* key = StringStripper(keyphrase); 1996 sprintf(extra_msg, " field is %s ", key); 1997 printPerf("writedatablock", 
timer_start, timer_end, data_size, 1, extra_msg); 1998 free(key); 1999 2000 } 2001 2002 void SwapArrayByteOrder( void* array, int nbytes, int nItems ) 2003 { 2004 /* This swaps the byte order for the array of nItems each 2005 of size nbytes , This will be called only locally */ 2006 int i,j; 2007 unsigned char* ucDst = (unsigned char*)array; 2008 for(i=0; i < nItems; i++) { 2009 for(j=0; j < (nbytes/2); j++) 2010 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2011 ucDst += nbytes; 2012 } 2013 } 2014 2015 void writestring( int* fileDescriptor, const char inString[] ) 2016 { 2017 int filePtr = *fileDescriptor - 1; 2018 FILE* fileObject = fileArray[filePtr] ; 2019 fprintf(fileObject,"%s",inString ); 2020 return; 2021 } 2022 2023 void Gather_Headers( int* fileDescriptor, vector< string >& headers ) 2024 { 2025 FILE* fileObject; 2026 char Line[1024]; 2027 2028 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2029 2030 while( !feof(fileObject) ) { 2031 fgets( Line, 1024, fileObject); 2032 if ( Line[0] == '#' ) { 2033 headers.push_back( Line ); 2034 } else { 2035 break; 2036 } 2037 } 2038 rewind( fileObject ); 2039 clearerr( fileObject ); 2040 } 2041 2042 void isWrong( void ) { 2043 (Wrong_Endian) ? fprintf(stdout,"YES\n") : fprintf(stdout,"NO\n"); 2044 } 2045 2046 void togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2047 2048 int isLittleEndian( void ) 2049 { 2050 // this function returns a 1 if the current running architecture is 2051 // LittleEndian Byte Ordered, else it returns a zero 2052 union { 2053 long a; 2054 char c[sizeof( long )]; 2055 } endianUnion; 2056 endianUnion.a = 1 ; 2057 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2058 else return 0; 2059 } 2060 2061 namespace PHASTA { 2062 const char* const PhastaIO_traits<int>::type_string = "integer"; 2063 const char* const PhastaIO_traits<double>::type_string = "double"; 2064 } 2065