1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <cstdarg> 18 #include <string.h> 19 #include <ctype.h> 20 #include <stdlib.h> 21 #include <stdio.h> 22 #include <math.h> 23 #include <sstream> 24 #include "phastaIO.h" 25 #include "phiompi.h" 26 #include "mpi.h" 27 #include "phiotmrc.h" 28 #include "phiostats.h" 29 #include <assert.h> 30 31 /* OS-specific things try to stay here */ 32 #include <sys/stat.h> 33 #include <unistd.h> 34 #include <errno.h> 35 36 37 #define VERSION_INFO_HEADER_SIZE 8192 38 #define DB_HEADER_SIZE 1024 39 #define ONE_MEGABYTE 1048576 40 #define TWO_MEGABYTE 2097152 41 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 42 #define MAX_PHASTA_FILES 64 43 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 44 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 45 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 46 #define MAX_FIELDS_NAME_LENGTH 128 47 #define DefaultMHSize (4*ONE_MEGABYTE) 48 //#define DefaultMHSize (8350) //For test 49 #define LATEST_WRITE_VERSION 1 50 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 51 int MasterHeaderSize = -1; 52 53 bool PRINT_PERF = false; // default print no perf results 54 int irank = -1; // global rank, should never be manually manipulated 55 int mysize = -1; 56 57 // Static variables are bad but used here to store the subcommunicator and associated variables 58 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 59 static int s_assign_local_comm = 0; 60 static MPI_Comm s_local_comm; 61 static int s_local_size = -1; 62 static int s_local_rank = -1; 63 64 // the following defines are for debug printf 65 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 66 67 void phprintf(const char* fmt, ...) { 68 (void)fmt; 69 #if PHASTAIO_DEBUG 70 char format[1024]; 71 snprintf(format, sizeof(format), "phastaIO debug: %s", fmt); 72 va_list ap; 73 va_start(ap,fmt); 74 vprintf(format,ap); 75 va_end(ap); 76 #endif 77 } 78 79 void phprintf_0(const char* fmt, ...) { 80 (void)fmt; 81 #if PHASTAIO_DEBUG 82 int rank = 0; 83 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 84 if(rank == 0){ 85 char format[1024]; 86 snprintf(format, sizeof(format), "phastaIO debug: irank=0 %s", fmt); 87 va_list ap; 88 va_start(ap,fmt); 89 vprintf(format, ap); 90 va_end(ap); 91 } 92 #endif 93 } 94 95 enum PhastaIO_Errors 96 { 97 MAX_PHASTA_FILES_EXCEEDED = -1, 98 UNABLE_TO_OPEN_FILE = -2, 99 NOT_A_MPI_FILE = -3, 100 GPID_EXCEEDED = -4, 101 DATA_TYPE_ILLEGAL = -5 102 }; 103 104 using namespace std; 105 106 namespace{ 107 108 map<int, std::string> LastHeaderKey; 109 vector< FILE* > fileArray; 110 vector< bool > byte_order; 111 vector< int > header_type; 112 int DataSize=0; 113 bool LastHeaderNotFound = false; 114 bool Wrong_Endian = false ; 115 bool Strict_Error = false ; 116 bool binary_format = true; 117 118 /***********************************************************************/ 119 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 120 /***********************************************************************/ 121 122 typedef struct 123 { 124 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 125 unsigned long my_offset; 126 unsigned long next_start_address; 127 unsigned long **my_offset_table; 128 unsigned long **my_read_table; 129 130 double * double_chunk; 131 double * read_double_chunk; 132 133 int field_count; 134 int part_count; 135 int read_field_count; 136 int read_part_count; 137 int GPid; 138 int start_id; 139 140 int mhsize; 141 142 int myrank; 143 int numprocs; 144 int local_myrank; 145 int local_numprocs; 146 147 int nppp; 148 int nPPF; 149 int nFiles; 150 int nFields; 151 152 int * int_chunk; 153 int * read_int_chunk; 154 155 int Wrong_Endian; /* default to false */ 156 char * master_header; 157 MPI_File file_handle; 158 MPI_Comm local_comm; 159 } phastaio_file_t; 160 161 typedef struct 162 { 163 int nppf, nfields; 164 char * masterHeader; 165 }serial_file; 166 167 serial_file *SerialFile; 168 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 169 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 170 171 // the caller has the responsibility to delete the returned string 172 // TODO: StringStipper("nbc value? ") returns NULL? 173 char* StringStripper( const char istring[] ) { 174 int length = strlen( istring ); 175 char* dest = (char *)malloc( length + 1 ); 176 strcpy( dest, istring ); 177 dest[ length ] = '\0'; 178 if ( char* p = strpbrk( dest, " ") ) 179 *p = '\0'; 180 return dest; 181 } 182 183 inline int cscompare( const char teststring[], const char targetstring[] ) { 184 char* s1 = const_cast<char*>(teststring); 185 char* s2 = const_cast<char*>(targetstring); 186 187 while( *s1 == ' ') s1++; 188 while( *s2 == ' ') s2++; 189 while( ( *s1 ) 190 && ( *s2 ) 191 && ( *s2 != '?') 192 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 193 s1++; 194 s2++; 195 while( *s1 == ' ') s1++; 196 while( *s2 == ' ') s2++; 197 } 198 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 199 else return 0; 200 } 201 202 inline void isBinary( const char iotype[] ) { 203 char* fname = StringStripper( iotype ); 204 if ( cscompare( fname, "binary" ) ) binary_format = true; 205 else binary_format = false; 206 free (fname); 207 208 } 209 210 inline size_t typeSize( const char typestring[] ) { 211 char* ts1 = StringStripper( typestring ); 212 if ( cscompare( "integer", ts1 ) ) { 213 free (ts1); 214 return sizeof(int); 215 } else if ( cscompare( "double", ts1 ) ) { 216 free (ts1); 217 return sizeof( double ); 218 } else { 219 free (ts1); 220 fprintf(stderr,"unknown type : %s\n",ts1); 221 return 0; 222 } 223 } 224 225 int readHeader( 226 FILE* fileObject, 227 const char phrase[], 228 int* params, 229 int expect ) { 230 char* text_header; 231 char* token; 232 char Line[1024] = "\0"; 233 char junk; 234 bool FOUND = false ; 235 int real_length; 236 int skip_size, integer_value; 237 int rewind_count=0; 238 239 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 240 rewind( fileObject ); 241 clearerr( fileObject ); 242 rewind_count++; 243 fgets( Line, 1024, fileObject ); 244 } 245 246 while( !FOUND && ( rewind_count < 2 ) ) { 247 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ) { 248 text_header = (char *)malloc( real_length + 1 ); 249 strncpy( text_header, Line, real_length ); 250 text_header[ real_length ] =static_cast<char>(NULL); 251 token = strtok ( text_header, ":" ); 252 assert(token); 253 if( cscompare( phrase , token ) ) { 254 FOUND = true ; 255 token = strtok( NULL, " ,;<>" ); 256 assert(token); 257 skip_size = atoi( token ); 258 int i; 259 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 260 assert(token); 261 params[i] = atoi( token ); 262 } 263 if ( i < expect ) { 264 fprintf(stderr, 265 "Aloha Expected # of ints not found for: %s\n",phrase ); 266 } 267 } else if ( cscompare(token,"byteorder magic number") ) { 268 if ( binary_format ) { 269 fread((void*)&integer_value,sizeof(int),1,fileObject); 270 fread( &junk, sizeof(char), 1 , fileObject ); 271 if ( 362436 != integer_value ) Wrong_Endian = true; 272 } else{ 273 fscanf(fileObject, "%d\n", &integer_value ); 274 } 275 } else { 276 /* some other header, so just skip over */ 277 token = strtok( NULL, " ,;<>" ); 278 assert(token); 279 skip_size = atoi( token ); 280 if ( binary_format) 281 fseek( fileObject, skip_size, SEEK_CUR ); 282 else 283 for( int gama=0; gama < skip_size; gama++ ) 284 fgets( Line, 1024, fileObject ); 285 } 286 free (text_header); 287 } // end of if before while loop 288 289 if ( !FOUND ) 290 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 291 rewind( fileObject ); 292 clearerr( fileObject ); 293 rewind_count++; 294 fgets( Line, 1024, fileObject ); 295 } 296 } 297 298 if ( !FOUND ) { 299 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 300 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 301 return 1; 302 } 303 return 0; 304 } 305 306 } // end unnamed namespace 307 308 309 // begin of publicly visible functions 310 311 /** 312 * This function takes a long long pointer and assign (start) phiotmrc value to it 313 */ 314 void startTimer(double* start) { 315 if( !PRINT_PERF ) return; 316 MPI_Barrier(MPI_COMM_WORLD); 317 *start = phiotmrc(); 318 } 319 320 /** 321 * This function takes a long long pointer and assign (end) phiotmrc value to it 322 */ 323 void endTimer(double* end) { 324 if( !PRINT_PERF ) return; 325 *end = phiotmrc(); 326 MPI_Barrier(MPI_COMM_WORLD); 327 } 328 329 /** 330 * choose to print some performance results (or not) according to 331 * the PRINT_PERF macro 332 */ 333 void printPerf( 334 const char* func_name, 335 double start, 336 double end, 337 unsigned long datasize, 338 int printdatainfo, 339 const char* extra_msg) { 340 if( !PRINT_PERF ) return; 341 unsigned long data_size = datasize; 342 double time = end - start; 343 unsigned long isizemin,isizemax,isizetot; 344 double sizemin,sizemax,sizeavg,sizetot,rate; 345 double tmin, tmax, tavg, ttot; 346 347 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 348 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 349 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 350 tavg = ttot/mysize; 351 352 if(irank == 0) { 353 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 354 else printf("** syncIO "); 355 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 356 } 357 358 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 359 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 360 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 361 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 362 363 sizemin=(double)(isizemin*inv1024sq); 364 sizemax=(double)(isizemax*inv1024sq); 365 sizetot=(double)(isizetot*inv1024sq); 366 sizeavg=(double)(1.0*sizetot/mysize); 367 rate=(double)(1.0*sizetot/tmax); 368 369 if( irank == 0) { 370 printf(", Rate = %f MB/s [%s] \n \t\t\t" 371 " block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", 372 rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 373 } 374 } 375 else { 376 if(irank == 0) { 377 printf(" \n"); 378 //printf(" (%s) \n", extra_msg); 379 } 380 } 381 } 382 383 /** 384 * This function is normally called at the beginning of a read operation, before 385 * init function. 386 * This function (uses rank 0) reads out nfields, nppf, master header size, 387 * endianess and allocates for masterHeader string. 388 * These values are essential for following read operations. Rank 0 will bcast 389 * these values to other ranks in the commm world 390 * 391 * If the file set is of old POSIX format, it would throw error and exit 392 */ 393 void queryphmpiio(const char filename[],int *nfields, int *nppf) 394 { 395 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 396 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 397 398 if(irank == 0) { 399 FILE * fileHandle; 400 char* fname = StringStripper( filename ); 401 402 phioTime t0,t1; 403 phastaio_time(&t0); 404 fileHandle = fopen (fname,"rb"); 405 phastaio_time(&t1); 406 const size_t elapsed = phastaio_time_diff(&t0,&t1); 407 phastaio_addOpenTime(elapsed); 408 if (fileHandle == NULL ) { 409 printf("\nError: File %s doesn't exist! Please check!\n",fname); 410 } 411 else { 412 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 413 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 414 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 415 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 416 417 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 418 char version[MAX_FIELDS_NAME_LENGTH/4]; 419 int mhsize; 420 char * token; 421 int magic_number; 422 423 memcpy( read_out_tag, 424 SerialFile->masterHeader, 425 MAX_FIELDS_NAME_LENGTH-1 ); 426 427 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 428 // Test endianess ... 429 memcpy (&magic_number, 430 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 431 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 432 433 if ( magic_number != ENDIAN_TEST_NUMBER ) { 434 printf("Endian is different!\n"); 435 // Will do swap later 436 } 437 438 // test version, old version, default masterheader size is 4M 439 // newer version masterheader size is read from first line 440 memcpy(version, 441 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 442 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 443 444 if( cscompare ("version",version) ) { 445 // if there is "version" tag in the file, then it is newer format 446 // read master header size from here, otherwise use default 447 // Note: if version is "1", we know mhsize is at 3/4 place... 448 449 token = strtok(version, ":"); 450 token = strtok(NULL, " ,;<>" ); 451 int iversion = atoi(token); 452 453 if( iversion == 1) { 454 memcpy( &mhsize, 455 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 456 sizeof(int)); 457 if ( magic_number != ENDIAN_TEST_NUMBER ) 458 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 459 460 if( mhsize > DefaultMHSize ) { 461 //if actual headersize is larger than default, let's re-read 462 free(SerialFile->masterHeader); 463 SerialFile->masterHeader = (char *)malloc(mhsize); 464 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 465 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 466 } 467 } 468 //TODO: check if this is a valid int?? 469 MasterHeaderSize = mhsize; 470 } 471 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 472 MasterHeaderSize = DefaultMHSize; 473 } 474 475 memcpy( read_out_tag, 476 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 477 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 478 479 // Read in # fields ... 480 token = strtok( read_out_tag, ":" ); 481 token = strtok( NULL," ,;<>" ); 482 *nfields = atoi( token ); 483 if ( *nfields > MAX_FIELDS_NUMBER) { 484 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 485 } 486 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 487 488 memcpy( read_out_tag, 489 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 490 + *nfields * MAX_FIELDS_NAME_LENGTH, 491 MAX_FIELDS_NAME_LENGTH); 492 493 token = strtok( read_out_tag, ":" ); 494 token = strtok( NULL," ,;<>" ); 495 *nppf = atoi( token ); 496 SerialFile->nppf=*nppf; //TODO: sanity check of int 497 } // end of if("MPI_IO_TAG") 498 else { 499 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 500 exit(1); 501 } 502 phioTime t0,t1; 503 phastaio_time(&t0); 504 fclose(fileHandle); 505 phastaio_time(&t1); 506 const size_t elapsed = phastaio_time_diff(&t0,&t1); 507 phastaio_addCloseTime(elapsed); 508 free(SerialFile->masterHeader); 509 free(SerialFile); 510 } //end of else 511 free(fname); 512 } 513 514 // Bcast value to every one 515 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 516 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 517 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 518 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 519 } 520 521 /** 522 * This function computes the right master header size (round to size of 2^n). 523 * This is only needed for file format version 1 in "write" mode. 524 */ 525 int computeMHSize(int nfields, int nppf, int version) { 526 int mhsize=0; 527 if(version == 1) { 528 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 529 int meta_info_size = VERSION_INFO_HEADER_SIZE; 530 int actual_size = nfields * nppf * sizeof(unsigned long) + meta_info_size; 531 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 532 if (actual_size > DefaultMHSize) { 533 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 534 mhsize *= DefaultMHSize; 535 } 536 else { 537 mhsize = DefaultMHSize; 538 } 539 } else { 540 int rank = 0; 541 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 542 if(!rank) { 543 fprintf(stderr, 544 "ERROR invalid version passed to %s... exiting\n", __func__); 545 exit(EXIT_FAILURE); 546 } 547 } 548 return mhsize; 549 } 550 551 /** 552 * Computes correct color of a rank according to number of files. 553 */ 554 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 555 int color = 556 (int)(myrank / (numprocs / nfiles)); 557 return color; 558 } 559 560 561 /** 562 * Check the file descriptor. 563 */ 564 void checkFileDescriptor(const char fctname[], int* fileDescriptor ) { 565 if ( *fileDescriptor < 0 ) { 566 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 567 exit(1); 568 } 569 } 570 571 /** 572 * Initialize the file struct members and allocate space for file struct 573 * buffers. 574 * 575 * Note: this function is only called when we are using new format. Old POSIX 576 * format should skip this routine and call openfile() directly instead. 577 */ 578 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 579 { 580 // we init irank again in case query not called (e.g. syncIO write case) 581 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 582 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 583 584 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d, nfields %d, nppf %d, nfiles %d\n", irank, MasterHeaderSize, *nfields, *nppf, *nfiles); 585 586 double timer_start, timer_end; 587 startTimer(&timer_start); 588 589 char* imode = StringStripper( mode ); 590 591 // Note: if it's read, we presume query was called prior to init and 592 // MasterHeaderSize is already set to correct value from parsing header 593 // otherwise it's write then it needs some computation to be set 594 if ( cscompare( "read", imode ) ) { 595 // do nothing 596 } 597 else if( cscompare( "write", imode ) ) { 598 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 599 } 600 else { 601 printf("Error initphmpiio: can't recognize the mode %s", imode); 602 exit(1); 603 } 604 free ( imode ); 605 606 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 607 608 int i, j; 609 610 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 611 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 612 endTimer(&timer_end); 613 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 614 return MAX_PHASTA_FILES_EXCEEDED; 615 } 616 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 617 // { 618 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 619 // { 620 // PhastaIOActiveFiles[i] = NULL; 621 // } 622 // } 623 624 625 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 626 627 i = PhastaIONextActiveIndex; 628 PhastaIONextActiveIndex++; 629 630 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 631 632 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 633 634 PhastaIOActiveFiles[i]->Wrong_Endian = false; 635 636 PhastaIOActiveFiles[i]->nFields = *nfields; 637 PhastaIOActiveFiles[i]->nPPF = *nppf; 638 PhastaIOActiveFiles[i]->nFiles = *nfiles; 639 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 640 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 641 642 643 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 644 645 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 646 647 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 648 649 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 650 MPI_Comm_split(MPI_COMM_WORLD, 651 color, 652 PhastaIOActiveFiles[i]->myrank, 653 &(PhastaIOActiveFiles[i]->local_comm)); 654 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 655 &(PhastaIOActiveFiles[i]->local_numprocs)); 656 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 657 &(PhastaIOActiveFiles[i]->local_myrank)); 658 659 // back up now these variables so that we do not need to call comm_split again 660 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 661 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 662 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 663 s_assign_local_comm = 1; 664 } 665 else { // recycle the subcommunicator 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 667 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 668 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 669 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 670 } 671 } 672 else { // *nfiles == 1 here - no need to call mpi_comm_split here 673 674 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 675 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 676 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 677 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 678 679 } 680 681 PhastaIOActiveFiles[i]->nppp = 682 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 683 684 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 685 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 686 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 687 688 PhastaIOActiveFiles[i]->my_offset_table = 689 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 690 691 PhastaIOActiveFiles[i]->my_read_table = 692 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 693 694 for (j=0; j<*nfields; j++) 695 { 696 PhastaIOActiveFiles[i]->my_offset_table[j] = 697 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 698 699 PhastaIOActiveFiles[i]->my_read_table[j] = 700 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 701 } 702 *filehandle = i; 703 704 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 705 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 706 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 707 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 708 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 709 710 // Time monitoring 711 endTimer(&timer_end); 712 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 713 714 phprintf_0("Info initphmpiio: quiting function"); 715 716 return i; 717 } 718 719 /** 720 * Destruct the file struct and free buffers allocated in init function. 721 */ 722 void finalizephmpiio( int *fileDescriptor ) 723 { 724 double timer_start, timer_end; 725 startTimer(&timer_start); 726 727 int i, j; 728 i = *fileDescriptor; 729 //PhastaIONextActiveIndex--; 730 731 /* //free the offset table for this phasta file */ 732 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 733 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 734 { 735 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 736 free( PhastaIOActiveFiles[i]->my_read_table[j]); 737 } 738 free ( PhastaIOActiveFiles[i]->my_offset_table ); 739 free ( PhastaIOActiveFiles[i]->my_read_table ); 740 free ( PhastaIOActiveFiles[i]->master_header ); 741 free ( PhastaIOActiveFiles[i]->double_chunk ); 742 free ( PhastaIOActiveFiles[i]->int_chunk ); 743 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 744 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 745 746 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 747 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 748 s_assign_local_comm = 0; 749 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 750 } 751 752 free( PhastaIOActiveFiles[i]); 753 754 endTimer(&timer_end); 755 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 756 757 PhastaIONextActiveIndex--; 758 } 759 760 761 /** 762 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 763 * Initialize the file struct members and allocate space for file struct buffers. 764 * 765 * Note: this function is only called when we are using new format. Old POSIX 766 * format should skip this routine and call openfile() directly instead. 767 */ 768 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 769 { 770 // we init irank again in case query not called (e.g. syncIO write case) 771 772 MPI_Comm_rank(my_local_comm, &irank); 773 MPI_Comm_size(my_local_comm, &mysize); 774 775 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 776 777 double timer_start, timer_end; 778 startTimer(&timer_start); 779 780 char* imode = StringStripper( mode ); 781 782 // Note: if it's read, we presume query was called prior to init and 783 // MasterHeaderSize is already set to correct value from parsing header 784 // otherwise it's write then it needs some computation to be set 785 if ( cscompare( "read", imode ) ) { 786 // do nothing 787 } 788 else if( cscompare( "write", imode ) ) { 789 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 790 } 791 else { 792 printf("Error initphmpiio: can't recognize the mode %s", imode); 793 exit(1); 794 } 795 free ( imode ); 796 797 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 798 799 int i, j; 800 801 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 802 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 803 endTimer(&timer_end); 804 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 805 return MAX_PHASTA_FILES_EXCEEDED; 806 } 807 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 808 // { 809 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 810 // { 811 // PhastaIOActiveFiles[i] = NULL; 812 // } 813 // } 814 815 816 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 817 818 i = PhastaIONextActiveIndex; 819 PhastaIONextActiveIndex++; 820 821 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 822 823 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 824 825 PhastaIOActiveFiles[i]->Wrong_Endian = false; 826 827 PhastaIOActiveFiles[i]->nFields = *nfields; 828 PhastaIOActiveFiles[i]->nPPF = *nppf; 829 PhastaIOActiveFiles[i]->nFiles = *nfiles; 830 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 831 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 832 833 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 834 MPI_Comm_split(my_local_comm, 835 color, 836 PhastaIOActiveFiles[i]->myrank, 837 &(PhastaIOActiveFiles[i]->local_comm)); 838 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 839 &(PhastaIOActiveFiles[i]->local_numprocs)); 840 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 841 &(PhastaIOActiveFiles[i]->local_myrank)); 842 PhastaIOActiveFiles[i]->nppp = 843 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 844 845 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 846 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 847 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 848 849 PhastaIOActiveFiles[i]->my_offset_table = 850 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 851 852 PhastaIOActiveFiles[i]->my_read_table = 853 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 854 855 for (j=0; j<*nfields; j++) 856 { 857 PhastaIOActiveFiles[i]->my_offset_table[j] = 858 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 859 860 PhastaIOActiveFiles[i]->my_read_table[j] = 861 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 862 } 863 *filehandle = i; 864 865 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 866 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 867 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 868 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 869 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 870 871 // Time monitoring 872 endTimer(&timer_end); 873 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 874 875 phprintf_0("Info initphmpiiosub: quiting function"); 876 877 return i; 878 } 879 880 namespace { 881 882 enum { 883 DIR_MODE = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH 884 }; 885 886 bool my_mkdir(const char* name) { 887 errno = 0; 888 int err = mkdir(name, DIR_MODE); 889 if ((err == -1) && (errno == EEXIST)) { 890 errno = 0; 891 err = 0; 892 return false; 893 } 894 assert(!err); 895 return true; 896 } 897 898 enum { 899 DIR_FANOUT = 2048 900 }; 901 902 std::string getSubDirPrefix() { 903 if (phio_peers() <= DIR_FANOUT) 904 return string(""); 905 int self = phio_self(); 906 int subSelf = self % DIR_FANOUT; 907 int subGroup = self / DIR_FANOUT; 908 std::stringstream ss; 909 ss << subGroup << '/'; 910 return ss.str(); 911 } 912 } 913 914 /** open file for both POSIX and MPI-IO syncIO format. 915 * 916 * If it's old POSIX format, simply call posix fopen(). 917 * 918 * If it's MPI-IO foramt: 919 * in "read" mode, it builds the header table that points to the offset of 920 * fields for parts; 921 * in "write" mode, it opens the file with MPI-IO open routine. 922 */ 923 void openfile(const char filename[], const char mode[], int* fileDescriptor ) 924 { 925 phprintf_0("Info: entering openfile"); 926 927 double timer_start, timer_end; 928 startTimer(&timer_start); 929 930 if ( PhastaIONextActiveIndex == 0 ) 931 { 932 FILE* file=NULL ; 933 *fileDescriptor = 0; 934 char* fname = StringStripper( filename ); 935 char* imode = StringStripper( mode ); 936 937 std::string posixname = getSubDirPrefix(); 938 if (!phio_self()) 939 my_mkdir(posixname.c_str()); 940 phio_barrier(); 941 posixname += string(fname); 942 phioTime t0,t1; 943 phastaio_time(&t0); 944 if ( cscompare( "read", imode ) ) 945 file = fopen(posixname.c_str(), "rb" ); 946 else if( cscompare( "write", imode ) ) 947 file = fopen(posixname.c_str(), "wb" ); 948 else if( cscompare( "append", imode ) ) 949 file = fopen(posixname.c_str(), "ab" ); 950 phastaio_time(&t1); 951 const size_t elapsed = phastaio_time_diff(&t0,&t1); 952 phastaio_addOpenTime(elapsed); 953 954 if ( !file ){ 955 fprintf(stderr,"Error openfile: unable to open file %s\n",fname); 956 } else { 957 fileArray.push_back( file ); 958 byte_order.push_back( false ); 959 header_type.push_back( sizeof(int) ); 960 *fileDescriptor = fileArray.size(); 961 } 962 free (fname); 963 free (imode); 964 } 965 else // else it would be parallel I/O, opposed to posix io 966 { 967 char* fname = StringStripper( filename ); 968 char* imode = StringStripper( mode ); 969 int rc; 970 int i = *fileDescriptor; 971 checkFileDescriptor("openfile",&i); 972 char* token; 973 974 if ( cscompare( "read", imode ) ) 975 { 976 // if (PhastaIOActiveFiles[i]->myrank == 0) 977 // printf("\n **********\nRead open ... ... regular version\n"); 978 979 phioTime t0,t1; 980 phastaio_time(&t0); 981 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 982 fname, 983 MPI_MODE_RDONLY, 984 MPI_INFO_NULL, 985 &(PhastaIOActiveFiles[i]->file_handle) ); 986 phastaio_time(&t1); 987 const size_t elapsed = phastaio_time_diff(&t0,&t1); 988 phastaio_addOpenTime(elapsed); 989 990 if(rc) 991 { 992 *fileDescriptor = UNABLE_TO_OPEN_FILE; 993 int error_string_length; 994 char error_string[4096]; 995 MPI_Error_string(rc, error_string, &error_string_length); 996 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 997 endTimer(&timer_end); 998 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 999 return; 1000 } 1001 1002 MPI_Status read_tag_status; 1003 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1004 int j; 1005 int magic_number; 1006 1007 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1008 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1009 0, 1010 PhastaIOActiveFiles[i]->master_header, 1011 MasterHeaderSize, 1012 MPI_CHAR, 1013 &read_tag_status ); 1014 } 1015 1016 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1017 MasterHeaderSize, 1018 MPI_CHAR, 1019 0, 1020 PhastaIOActiveFiles[i]->local_comm ); 1021 1022 memcpy( read_out_tag, 1023 PhastaIOActiveFiles[i]->master_header, 1024 MAX_FIELDS_NAME_LENGTH-1 ); 1025 1026 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1027 { 1028 // Test endianess ... 1029 memcpy ( &magic_number, 1030 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1031 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1032 1033 if ( magic_number != ENDIAN_TEST_NUMBER ) 1034 { 1035 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1036 } 1037 1038 memcpy( read_out_tag, 1039 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1040 MAX_FIELDS_NAME_LENGTH ); 1041 1042 // Read in # fields ... 1043 token = strtok ( read_out_tag, ":" ); 1044 token = strtok( NULL," ,;<>" ); 1045 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1046 1047 unsigned long **header_table; 1048 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1049 1050 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1051 { 1052 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1053 } 1054 1055 // Read in the offset table ... 1056 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1057 { 1058 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1059 memcpy( header_table[j], 1060 PhastaIOActiveFiles[i]->master_header + 1061 VERSION_INFO_HEADER_SIZE + 1062 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1063 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1064 } 1065 1066 MPI_Scatter( header_table[j], 1067 PhastaIOActiveFiles[i]->nppp, 1068 MPI_LONG_LONG_INT, 1069 PhastaIOActiveFiles[i]->my_read_table[j], 1070 PhastaIOActiveFiles[i]->nppp, 1071 MPI_LONG_LONG_INT, 1072 0, 1073 PhastaIOActiveFiles[i]->local_comm ); 1074 1075 // Swap byte order if endianess is different ... 1076 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1077 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1078 sizeof(unsigned long), 1079 PhastaIOActiveFiles[i]->nppp ); 1080 } 1081 } 1082 1083 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1084 free ( header_table[j] ); 1085 } 1086 free (header_table); 1087 1088 } // end of if MPI_IO_TAG 1089 else //else not valid MPI file 1090 { 1091 *fileDescriptor = NOT_A_MPI_FILE; 1092 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1093 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1094 endTimer(&timer_end); 1095 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1096 return; 1097 } 1098 } // end of if "read" 1099 else if( cscompare( "write", imode ) ) 1100 { 1101 phioTime t0,t1; 1102 phastaio_time(&t0); 1103 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1104 fname, 1105 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1106 MPI_INFO_NULL, 1107 &(PhastaIOActiveFiles[i]->file_handle) ); 1108 phastaio_time(&t1); 1109 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1110 phastaio_addOpenTime(elapsed); 1111 if(rc != MPI_SUCCESS) 1112 { 1113 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1114 int error_string_length; 1115 char error_string[4096]; 1116 MPI_Error_string(rc, error_string, &error_string_length); 1117 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 1118 return; 1119 } 1120 } // end of if "write" 1121 free (fname); 1122 free (imode); 1123 } // end of if FileIndex != 0 1124 1125 endTimer(&timer_end); 1126 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1127 } 1128 1129 /** close file for both POSIX and MPI-IO syncIO format. 1130 * 1131 * If it's old POSIX format, simply call posix fclose(). 1132 * 1133 * If it's MPI-IO foramt: 1134 * in "read" mode, it simply close file with MPI-IO close routine. 1135 * in "write" mode, rank 0 in each group will re-assemble the master header and 1136 * offset table and write to the beginning of file, then close the file. 1137 */ 1138 void closefile( int* fileDescriptor, const char mode[] ) 1139 { 1140 double timer_start, timer_end; 1141 startTimer(&timer_start); 1142 1143 int i = *fileDescriptor; 1144 checkFileDescriptor("closefile",&i); 1145 1146 if ( PhastaIONextActiveIndex == 0 ) { 1147 char* imode = StringStripper( mode ); 1148 1149 if( cscompare( "write", imode ) 1150 || cscompare( "append", imode ) ) { 1151 fflush( fileArray[ *fileDescriptor - 1 ] ); 1152 } 1153 1154 phioTime t0,t1; 1155 phastaio_time(&t0); 1156 fclose( fileArray[ *fileDescriptor - 1 ] ); 1157 phastaio_time(&t1); 1158 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1159 phastaio_addCloseTime(elapsed); 1160 free (imode); 1161 } 1162 else { 1163 char* imode = StringStripper( mode ); 1164 1165 //write master header here: 1166 if ( cscompare( "write", imode ) ) { 1167 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1168 // MasterHeaderSize = 4*ONE_MEGABYTE; 1169 // else 1170 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1171 1172 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1173 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1174 1175 MPI_Status write_header_status; 1176 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1177 char version[MAX_FIELDS_NAME_LENGTH/4]; 1178 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1179 int magic_number = ENDIAN_TEST_NUMBER; 1180 1181 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1182 { 1183 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1184 sprintf(mpi_tag, "MPI_IO_Tag : "); 1185 memcpy(PhastaIOActiveFiles[i]->master_header, 1186 mpi_tag, 1187 MAX_FIELDS_NAME_LENGTH); 1188 1189 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1190 // this version is "1", print version in ASCII 1191 sprintf(version, "version : %d",1); 1192 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1193 version, 1194 MAX_FIELDS_NAME_LENGTH/4); 1195 1196 // master header size is computed using the formula above 1197 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1198 sprintf(mhsize, "mhsize : "); 1199 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1200 mhsize, 1201 MAX_FIELDS_NAME_LENGTH/4); 1202 1203 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1204 sprintf(mpi_tag, 1205 "\nnFields : %d\n", 1206 PhastaIOActiveFiles[i]->nFields); 1207 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1208 mpi_tag, 1209 MAX_FIELDS_NAME_LENGTH); 1210 1211 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1212 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1213 memcpy( PhastaIOActiveFiles[i]->master_header+ 1214 PhastaIOActiveFiles[i]->nFields * 1215 MAX_FIELDS_NAME_LENGTH + 1216 MAX_FIELDS_NAME_LENGTH * 2, 1217 mpi_tag, 1218 MAX_FIELDS_NAME_LENGTH); 1219 1220 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1221 &magic_number, 1222 sizeof(int)); 1223 1224 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1225 &MasterHeaderSize, 1226 sizeof(int)); 1227 } 1228 1229 int j = 0; 1230 unsigned long **header_table; 1231 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1232 1233 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1234 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1235 } 1236 1237 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1238 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1239 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1240 PhastaIOActiveFiles[i]->nppp, 1241 MPI_LONG_LONG_INT, 1242 header_table[j], 1243 PhastaIOActiveFiles[i]->nppp, 1244 MPI_LONG_LONG_INT, 1245 0, 1246 PhastaIOActiveFiles[i]->local_comm ); 1247 } 1248 1249 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1250 1251 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1252 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1253 memcpy ( PhastaIOActiveFiles[i]->master_header + 1254 VERSION_INFO_HEADER_SIZE + 1255 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1256 header_table[j], 1257 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1258 } 1259 1260 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1261 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1262 0, 1263 PhastaIOActiveFiles[i]->master_header, 1264 MasterHeaderSize, 1265 MPI_CHAR, 1266 &write_header_status ); 1267 } 1268 1269 ////free(PhastaIOActiveFiles[i]->master_header); 1270 1271 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1272 free ( header_table[j] ); 1273 } 1274 free (header_table); 1275 } 1276 1277 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1278 phioTime t0,t1; 1279 phastaio_time(&t0); 1280 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1281 phastaio_time(&t1); 1282 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1283 phastaio_addCloseTime(elapsed); 1284 free ( imode ); 1285 } 1286 1287 endTimer(&timer_end); 1288 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1289 } 1290 1291 int readHeader( FILE* f, const char phrase[], 1292 int* params, int numParams, const char* iotype) { 1293 isBinary(iotype); 1294 return readHeader(f,phrase,params,numParams); 1295 } 1296 1297 void readheader( 1298 int* fileDescriptor, 1299 const char keyphrase[], 1300 void* valueArray, 1301 int* nItems, 1302 const char datatype[], 1303 const char iotype[] ) 1304 { 1305 double timer_start, timer_end; 1306 1307 startTimer(&timer_start); 1308 1309 int i = *fileDescriptor; 1310 checkFileDescriptor("readheader",&i); 1311 1312 if ( PhastaIONextActiveIndex == 0 ) { 1313 int filePtr = *fileDescriptor - 1; 1314 FILE* fileObject; 1315 int* valueListInt; 1316 1317 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1318 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1319 fprintf(stderr,"openfile_ function has to be called before \n") ; 1320 fprintf(stderr,"acessing the file\n ") ; 1321 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1322 endTimer(&timer_end); 1323 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1324 return; 1325 } 1326 1327 LastHeaderKey[filePtr] = std::string(keyphrase); 1328 LastHeaderNotFound = false; 1329 1330 fileObject = fileArray[ filePtr ] ; 1331 Wrong_Endian = byte_order[ filePtr ]; 1332 1333 isBinary( iotype ); 1334 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1335 1336 // right now we are making the assumption that we will only write integers 1337 // on the header line. 1338 1339 valueListInt = static_cast< int* >( valueArray ); 1340 int ierr = readHeader( fileObject , 1341 keyphrase, 1342 valueListInt, 1343 *nItems ) ; 1344 1345 byte_order[ filePtr ] = Wrong_Endian ; 1346 1347 if ( ierr ) LastHeaderNotFound = true; 1348 1349 //return ; // don't return, go to the end to print perf 1350 } 1351 else { 1352 int* valueListInt; 1353 valueListInt = static_cast <int*>(valueArray); 1354 char* token = NULL; 1355 bool FOUND = false ; 1356 isBinary( iotype ); 1357 1358 MPI_Status read_offset_status; 1359 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1360 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1361 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1362 int j; 1363 1364 int string_length = strlen( keyphrase ); 1365 char* buffer = (char*) malloc ( string_length+1 ); 1366 1367 strcpy ( buffer, keyphrase ); 1368 buffer[ string_length ] = '\0'; 1369 1370 char* st2 = strtok ( buffer, "@" ); 1371 st2 = strtok (NULL, "@"); 1372 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1373 if ( char* p = strpbrk(buffer, "@") ) 1374 *p = '\0'; 1375 1376 // Check if the user has input the right GPid 1377 if ( ( PhastaIOActiveFiles[i]->GPid <= 1378 PhastaIOActiveFiles[i]->myrank * 1379 PhastaIOActiveFiles[i]->nppp )|| 1380 ( PhastaIOActiveFiles[i]->GPid > 1381 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1382 PhastaIOActiveFiles[i]->nppp ) ) 1383 { 1384 *fileDescriptor = NOT_A_MPI_FILE; 1385 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1386 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1387 endTimer(&timer_end); 1388 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1389 return; 1390 } 1391 1392 // Find the field we want ... 1393 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1394 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1395 { 1396 memcpy( readouttag[j], 1397 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1398 MAX_FIELDS_NAME_LENGTH-1 ); 1399 } 1400 1401 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1402 { 1403 token = strtok ( readouttag[j], ":" ); 1404 1405 //if ( cscompare( buffer, token ) ) 1406 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1407 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1408 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1409 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1410 { 1411 PhastaIOActiveFiles[i]->read_field_count = j; 1412 FOUND = true; 1413 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1414 break; 1415 } 1416 } 1417 free(buffer); 1418 1419 if (!FOUND) 1420 { 1421 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1422 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1423 endTimer(&timer_end); 1424 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1425 return; 1426 } 1427 1428 // Find the part we want ... 1429 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1430 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1431 1432 PhastaIOActiveFiles[i]->my_offset = 1433 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1434 1435 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1436 1437 // Read each datablock header here ... 1438 1439 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1440 PhastaIOActiveFiles[i]->my_offset+1, 1441 read_out_tag, 1442 MAX_FIELDS_NAME_LENGTH-1, 1443 MPI_CHAR, 1444 &read_offset_status ); 1445 token = strtok ( read_out_tag, ":" ); 1446 1447 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1448 1449 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1450 { 1451 FOUND = true ; 1452 token = strtok( NULL, " ,;<>" ); 1453 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1454 valueListInt[j] = atoi( token ); 1455 1456 if ( j < *nItems ) 1457 { 1458 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1459 } 1460 } 1461 else { 1462 //if(irank==0) 1463 if(PhastaIOActiveFiles[i]->myrank == 0) 1464 // If we enter this if, there is a problem with the name of some fields 1465 { 1466 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1467 } 1468 } 1469 } 1470 1471 endTimer(&timer_end); 1472 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1473 1474 } 1475 1476 void readDataBlock( 1477 FILE* fileObject, 1478 void* valueArray, 1479 int nItems, 1480 const char datatype[], 1481 const char iotype[] ) 1482 { 1483 isBinary(iotype); 1484 size_t type_size = typeSize( datatype ); 1485 phioTime t0,t1; 1486 phastaio_time(&t0); 1487 if ( binary_format ) { 1488 char junk = '\0'; 1489 fread( valueArray, type_size, nItems, fileObject ); 1490 fread( &junk, sizeof(char), 1 , fileObject ); 1491 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems ); 1492 } else { 1493 char* ts1 = StringStripper( datatype ); 1494 if ( cscompare( "integer", ts1 ) ) { 1495 for( int n=0; n < nItems ; n++ ) 1496 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1497 } else if ( cscompare( "double", ts1 ) ) { 1498 for( int n=0; n < nItems ; n++ ) 1499 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1500 } 1501 free (ts1); 1502 } 1503 phastaio_time(&t1); 1504 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1505 phastaio_addReadTime(elapsed); 1506 phastaio_addReadBytes(nItems*type_size); 1507 } 1508 1509 void readdatablock( 1510 int* fileDescriptor, 1511 const char keyphrase[], 1512 void* valueArray, 1513 int* nItems, 1514 const char datatype[], 1515 const char iotype[] ) 1516 { 1517 //if(irank == 0) printf("entering readdatablock()\n"); 1518 unsigned long data_size = 0; 1519 double timer_start, timer_end; 1520 startTimer(&timer_start); 1521 1522 int i = *fileDescriptor; 1523 checkFileDescriptor("readdatablock",&i); 1524 1525 if ( PhastaIONextActiveIndex == 0 ) { 1526 int filePtr = *fileDescriptor - 1; 1527 FILE* fileObject; 1528 1529 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1530 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1531 fprintf(stderr,"openfile_ function has to be called before\n") ; 1532 fprintf(stderr,"acessing the file\n ") ; 1533 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1534 endTimer(&timer_end); 1535 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1536 return; 1537 } 1538 1539 // error check.. 1540 // since we require that a consistant header always preceed the data block 1541 // let us check to see that it is actually the case. 1542 1543 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1544 fprintf(stderr, "Header not consistant with data block\n"); 1545 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1546 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1547 fprintf(stderr, "Please recheck read sequence \n"); 1548 if( Strict_Error ) { 1549 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1550 endTimer(&timer_end); 1551 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1552 return; 1553 } 1554 } 1555 1556 if ( LastHeaderNotFound ) { 1557 endTimer(&timer_end); 1558 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1559 return; 1560 } 1561 fileObject = fileArray[ filePtr ]; 1562 Wrong_Endian = byte_order[ filePtr ]; 1563 LastHeaderKey.erase(filePtr); 1564 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1565 1566 //return; 1567 } 1568 else { 1569 // printf("read data block\n"); 1570 MPI_Status read_data_status; 1571 size_t type_size = typeSize( datatype ); 1572 int nUnits = *nItems; 1573 isBinary( iotype ); 1574 1575 // read datablock then 1576 //MR CHANGE 1577 // if ( cscompare ( datatype, "double")) 1578 char* ts2 = StringStripper( datatype ); 1579 if ( cscompare ( "double" , ts2)) 1580 //MR CHANGE END 1581 { 1582 1583 phioTime t0,t1; 1584 phastaio_time(&t0); 1585 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1586 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1587 valueArray, 1588 nUnits, 1589 MPI_DOUBLE ); 1590 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1591 valueArray, 1592 &read_data_status ); 1593 data_size=8*nUnits; 1594 phastaio_time(&t1); 1595 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1596 phastaio_addReadTime(elapsed); 1597 phastaio_addReadBytes(nUnits*sizeof(double)); 1598 } 1599 //MR CHANGE 1600 // else if ( cscompare ( datatype, "integer")) 1601 else if ( cscompare ( "integer" , ts2)) 1602 //MR CHANGE END 1603 { 1604 phioTime t0,t1; 1605 phastaio_time(&t0); 1606 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1607 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1608 valueArray, 1609 nUnits, 1610 MPI_INT ); 1611 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1612 valueArray, 1613 &read_data_status ); 1614 data_size=4*nUnits; 1615 phastaio_time(&t1); 1616 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1617 phastaio_addReadTime(elapsed); 1618 phastaio_addReadBytes(nUnits*sizeof(int)); 1619 } 1620 else 1621 { 1622 *fileDescriptor = DATA_TYPE_ILLEGAL; 1623 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1624 endTimer(&timer_end); 1625 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1626 return; 1627 } 1628 free(ts2); 1629 1630 1631 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1632 1633 // Swap data byte order if endianess is different ... 1634 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1635 { 1636 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1637 } 1638 } 1639 1640 endTimer(&timer_end); 1641 char extra_msg[1024]; 1642 memset(extra_msg, '\0', 1024); 1643 char* key = StringStripper(keyphrase); 1644 sprintf(extra_msg, " field is %s ", key); 1645 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1646 free(key); 1647 1648 } 1649 1650 void writeHeader( 1651 FILE* f, 1652 const char keyphrase[], 1653 const void* valueArray, 1654 const int nItems, 1655 const int ndataItems, 1656 const char datatype[], 1657 const char iotype[]) 1658 { 1659 isBinary( iotype ); 1660 1661 const int _newline = 1662 ( ndataItems > 0 ) ? sizeof( char ) : 0; 1663 int size_of_nextblock = 1664 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems; 1665 1666 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock ); 1667 for( int i = 0; i < nItems; i++ ) 1668 fprintf( f, "%d ", *((int*)((int*)valueArray+i))); 1669 fprintf( f, "\n"); 1670 } 1671 1672 void writeheader( 1673 const int* fileDescriptor, 1674 const char keyphrase[], 1675 const void* valueArray, 1676 const int* nItems, 1677 const int* ndataItems, 1678 const char datatype[], 1679 const char iotype[]) 1680 { 1681 1682 //if(irank == 0) printf("entering writeheader()\n"); 1683 1684 double timer_start, timer_end; 1685 startTimer(&timer_start); 1686 1687 int i = *fileDescriptor; 1688 checkFileDescriptor("writeheader",&i); 1689 1690 if ( PhastaIONextActiveIndex == 0 ) { 1691 int filePtr = *fileDescriptor - 1; 1692 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1693 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1694 fprintf(stderr,"openfile_ function has to be called before \n") ; 1695 fprintf(stderr,"acessing the file\n ") ; 1696 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1697 endTimer(&timer_end); 1698 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1699 return; 1700 } 1701 1702 LastHeaderKey[filePtr] = std::string(keyphrase); 1703 DataSize = *ndataItems; 1704 FILE* fileObject = fileArray[ filePtr ] ; 1705 header_type[ filePtr ] = typeSize( datatype ); 1706 writeHeader(fileObject,keyphrase,valueArray,*nItems, 1707 *ndataItems,datatype,iotype); 1708 } 1709 else { // else it's parallel I/O 1710 DataSize = *ndataItems; 1711 size_t type_size = typeSize( datatype ); 1712 isBinary( iotype ); 1713 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1714 1715 int string_length = strlen( keyphrase ); 1716 char* buffer = (char*) malloc ( string_length+1 ); 1717 1718 strcpy ( buffer, keyphrase); 1719 buffer[ string_length ] = '\0'; 1720 1721 char* st2 = strtok ( buffer, "@" ); 1722 st2 = strtok (NULL, "@"); 1723 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1724 1725 if ( char* p = strpbrk(buffer, "@") ) 1726 *p = '\0'; 1727 1728 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1729 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1730 unsigned long offset_value; 1731 1732 int temp = *ndataItems; 1733 unsigned long number_of_items = (unsigned long)temp; 1734 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1735 1736 MPI_Scan( &number_of_items, 1737 &offset_value, 1738 1, 1739 MPI_LONG_LONG_INT, 1740 MPI_SUM, 1741 PhastaIOActiveFiles[i]->local_comm ); 1742 1743 offset_value = (offset_value - number_of_items) * type_size; 1744 1745 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1746 DB_HEADER_SIZE + 1747 PhastaIOActiveFiles[i]->next_start_address; 1748 // This offset is the starting address of each datablock header... 1749 PhastaIOActiveFiles[i]->my_offset = offset_value; 1750 1751 // Write in my offset table ... 1752 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1753 PhastaIOActiveFiles[i]->my_offset; 1754 1755 // Update the next-start-address ... 1756 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1757 number_of_items * type_size + 1758 DB_HEADER_SIZE; 1759 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1760 1, 1761 MPI_LONG_LONG_INT, 1762 PhastaIOActiveFiles[i]->local_numprocs-1, 1763 PhastaIOActiveFiles[i]->local_comm ); 1764 1765 // Prepare datablock header ... 1766 int _newline = (*ndataItems>0)?sizeof(char):0; 1767 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1768 1769 //char datablock_header[255]; 1770 //bzero((void*)datablock_header,255); 1771 char datablock_header[DB_HEADER_SIZE]; 1772 bzero((void*)datablock_header,DB_HEADER_SIZE); 1773 1774 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1775 sprintf( datablock_header, 1776 "\n%s : < %u >", 1777 keyphrase, 1778 size_of_nextblock ); 1779 1780 for ( int j = 0; j < *nItems; j++ ) 1781 { 1782 sprintf( datablock_header, 1783 "%s %d ", 1784 datablock_header, 1785 *((int*)((int*)valueArray+j))); 1786 } 1787 sprintf( datablock_header, 1788 "%s\n ", 1789 datablock_header ); 1790 1791 // Write datablock header ... 1792 //MR CHANGE 1793 // if ( cscompare(datatype,"double") ) 1794 char* ts1 = StringStripper( datatype ); 1795 if ( cscompare("double",ts1) ) 1796 //MR CHANGE END 1797 { 1798 free ( PhastaIOActiveFiles[i]->double_chunk ); 1799 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1800 1801 double * aa = ( double * )datablock_header; 1802 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1803 } 1804 //MR CHANGE 1805 // if ( cscompare(datatype,"integer") ) 1806 else if ( cscompare("integer",ts1) ) 1807 //MR CHANGE END 1808 { 1809 free ( PhastaIOActiveFiles[i]->int_chunk ); 1810 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1811 1812 int * aa = ( int * )datablock_header; 1813 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1814 } 1815 else { 1816 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1817 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1818 endTimer(&timer_end); 1819 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1820 return; 1821 } 1822 free(ts1); 1823 1824 PhastaIOActiveFiles[i]->part_count++; 1825 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1826 //A new field will be written 1827 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1828 memcpy( PhastaIOActiveFiles[i]->master_header + 1829 PhastaIOActiveFiles[i]->field_count * 1830 MAX_FIELDS_NAME_LENGTH + 1831 MAX_FIELDS_NAME_LENGTH * 2, 1832 mpi_tag, 1833 MAX_FIELDS_NAME_LENGTH-1); 1834 } 1835 PhastaIOActiveFiles[i]->field_count++; 1836 PhastaIOActiveFiles[i]->part_count=0; 1837 } 1838 free(buffer); 1839 } 1840 1841 endTimer(&timer_end); 1842 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1843 } 1844 1845 void writeDataBlock( 1846 FILE* f, 1847 const void* valueArray, 1848 const int nItems, 1849 const char datatype[], 1850 const char iotype[] ) 1851 { 1852 isBinary( iotype ); 1853 size_t type_size = typeSize( datatype ); 1854 phioTime t0,t1; 1855 phastaio_time(&t0); 1856 if ( binary_format ) { 1857 fwrite( valueArray, type_size, nItems, f ); 1858 fprintf( f,"\n"); 1859 } else { 1860 char* ts1 = StringStripper( datatype ); 1861 if ( cscompare( "integer", ts1 ) ) { 1862 const int* vals = (int*) valueArray; 1863 for( int n=0; n < nItems ; n++ ) 1864 fprintf(f,"%d\n",vals[n]); 1865 } else if ( cscompare( "double", ts1 ) ) { 1866 const double* vals = (double*) valueArray; 1867 for( int n=0; n < nItems ; n++ ) 1868 fprintf(f,"%f\n",vals[n]); 1869 } 1870 free (ts1); 1871 } 1872 phastaio_time(&t1); 1873 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1874 phastaio_addWriteTime(elapsed); 1875 phastaio_addWriteBytes(nItems*type_size); 1876 } 1877 1878 void writedatablock( 1879 const int* fileDescriptor, 1880 const char keyphrase[], 1881 const void* valueArray, 1882 const int* nItems, 1883 const char datatype[], 1884 const char iotype[] ) 1885 { 1886 //if(irank == 0) printf("entering writedatablock()\n"); 1887 1888 unsigned long data_size = 0; 1889 double timer_start, timer_end; 1890 startTimer(&timer_start); 1891 1892 int i = *fileDescriptor; 1893 checkFileDescriptor("writedatablock",&i); 1894 1895 if ( PhastaIONextActiveIndex == 0 ) { 1896 int filePtr = *fileDescriptor - 1; 1897 1898 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1899 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1900 fprintf(stderr,"openfile_ function has to be called before \n") ; 1901 fprintf(stderr,"acessing the file\n ") ; 1902 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1903 endTimer(&timer_end); 1904 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1905 return; 1906 } 1907 // since we require that a consistant header always preceed the data block 1908 // let us check to see that it is actually the case. 1909 1910 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1911 fprintf(stderr, "Header not consistant with data block\n"); 1912 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1913 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1914 fprintf(stderr, "Please recheck write sequence \n"); 1915 if( Strict_Error ) { 1916 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1917 endTimer(&timer_end); 1918 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1919 return; 1920 } 1921 } 1922 1923 FILE* fileObject = fileArray[ filePtr ] ; 1924 size_t type_size=typeSize( datatype ); 1925 isBinary( iotype ); 1926 1927 LastHeaderKey.erase(filePtr); 1928 1929 if ( header_type[filePtr] != (int)type_size ) { 1930 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1931 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1932 if( Strict_Error ) { 1933 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1934 endTimer(&timer_end); 1935 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1936 return; 1937 } 1938 } 1939 1940 int nUnits = *nItems; 1941 1942 if ( nUnits != DataSize ) { 1943 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1944 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1945 if( Strict_Error ) { 1946 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1947 endTimer(&timer_end); 1948 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1949 return; 1950 } 1951 } 1952 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1953 } 1954 else { // syncIO case 1955 MPI_Status write_data_status; 1956 isBinary( iotype ); 1957 int nUnits = *nItems; 1958 1959 //MR CHANGE 1960 // if ( cscompare(datatype,"double") ) 1961 char* ts1 = StringStripper( datatype ); 1962 if ( cscompare("double",ts1) ) 1963 //MR CHANGE END 1964 { 1965 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1966 phioTime t0,t1; 1967 phastaio_time(&t0); 1968 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1969 PhastaIOActiveFiles[i]->my_offset, 1970 PhastaIOActiveFiles[i]->double_chunk, 1971 //BLOCK_SIZE/sizeof(double), 1972 nUnits+DB_HEADER_SIZE/sizeof(double), 1973 MPI_DOUBLE ); 1974 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1975 PhastaIOActiveFiles[i]->double_chunk, 1976 &write_data_status ); 1977 data_size=8*nUnits; 1978 phastaio_time(&t1); 1979 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1980 phastaio_addWriteTime(elapsed); 1981 phastaio_addWriteBytes((nUnits*sizeof(double))+DB_HEADER_SIZE); 1982 } 1983 //MR CHANGE 1984 // else if ( cscompare ( datatype, "integer")) 1985 else if ( cscompare("integer",ts1) ) 1986 //MR CHANGE END 1987 { 1988 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 1989 phioTime t0,t1; 1990 phastaio_time(&t0); 1991 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1992 PhastaIOActiveFiles[i]->my_offset, 1993 PhastaIOActiveFiles[i]->int_chunk, 1994 nUnits+DB_HEADER_SIZE/sizeof(int), 1995 MPI_INT ); 1996 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1997 PhastaIOActiveFiles[i]->int_chunk, 1998 &write_data_status ); 1999 data_size=4*nUnits; 2000 phastaio_time(&t1); 2001 const size_t elapsed = phastaio_time_diff(&t0,&t1); 2002 phastaio_addWriteTime(elapsed); 2003 phastaio_addWriteBytes((nUnits*sizeof(int))+DB_HEADER_SIZE); 2004 } 2005 else { 2006 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2007 endTimer(&timer_end); 2008 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2009 return; 2010 } 2011 free(ts1); 2012 } 2013 2014 endTimer(&timer_end); 2015 char extra_msg[1024]; 2016 memset(extra_msg, '\0', 1024); 2017 char* key = StringStripper(keyphrase); 2018 sprintf(extra_msg, " field is %s ", key); 2019 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2020 free(key); 2021 2022 } 2023 2024 void SwapArrayByteOrder( void* array, int nbytes, int nItems ) 2025 { 2026 /* This swaps the byte order for the array of nItems each 2027 of size nbytes , This will be called only locally */ 2028 int i,j; 2029 unsigned char* ucDst = (unsigned char*)array; 2030 for(i=0; i < nItems; i++) { 2031 for(j=0; j < (nbytes/2); j++) 2032 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2033 ucDst += nbytes; 2034 } 2035 } 2036 2037 void writestring( int* fileDescriptor, const char inString[] ) 2038 { 2039 int filePtr = *fileDescriptor - 1; 2040 FILE* fileObject = fileArray[filePtr] ; 2041 fprintf(fileObject,"%s",inString ); 2042 return; 2043 } 2044 2045 void Gather_Headers( int* fileDescriptor, vector< string >& headers ) 2046 { 2047 FILE* fileObject; 2048 char Line[1024]; 2049 2050 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2051 2052 while( !feof(fileObject) ) { 2053 fgets( Line, 1024, fileObject); 2054 if ( Line[0] == '#' ) { 2055 headers.push_back( Line ); 2056 } else { 2057 break; 2058 } 2059 } 2060 rewind( fileObject ); 2061 clearerr( fileObject ); 2062 } 2063 2064 void isWrong( void ) { 2065 (Wrong_Endian) ? fprintf(stdout,"YES\n") : fprintf(stdout,"NO\n"); 2066 } 2067 2068 void togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2069 2070 int isLittleEndian( void ) 2071 { 2072 // this function returns a 1 if the current running architecture is 2073 // LittleEndian Byte Ordered, else it returns a zero 2074 union { 2075 long a; 2076 char c[sizeof( long )]; 2077 } endianUnion; 2078 endianUnion.a = 1 ; 2079 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2080 else return 0; 2081 } 2082 2083 namespace PHASTA { 2084 const char* const PhastaIO_traits<int>::type_string = "integer"; 2085 const char* const PhastaIO_traits<double>::type_string = "double"; 2086 } 2087