1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <cstdarg> 18 #include <string.h> 19 #include <ctype.h> 20 #include <stdlib.h> 21 #include <stdio.h> 22 #include <math.h> 23 #include <sstream> 24 #include "phastaIO.h" 25 #include "phiompi.h" 26 #include "mpi.h" 27 #include "phiotmrc.h" 28 #include "phiostats.h" 29 #include <assert.h> 30 31 /* OS-specific things try to stay here */ 32 #include <sys/stat.h> 33 #include <unistd.h> 34 #include <errno.h> 35 36 37 #define VERSION_INFO_HEADER_SIZE 8192 38 #define DB_HEADER_SIZE 1024 39 #define ONE_MEGABYTE 1048576 40 #define TWO_MEGABYTE 2097152 41 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 42 #define MAX_PHASTA_FILES 64 43 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 44 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 45 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 46 #define MAX_FIELDS_NAME_LENGTH 128 47 #define DefaultMHSize (4*ONE_MEGABYTE) 48 //#define DefaultMHSize (8350) //For test 49 #define LATEST_WRITE_VERSION 1 50 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 51 int MasterHeaderSize = -1; 52 53 bool PRINT_PERF = false; // default print no perf results 54 int irank = -1; // global rank, should never be manually manipulated 55 int mysize = -1; 56 57 // Static variables are bad but used here to store the subcommunicator and associated variables 58 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 59 static int s_assign_local_comm = 0; 60 static MPI_Comm s_local_comm; 61 static int s_local_size = -1; 62 static int s_local_rank = -1; 63 64 // the following defines are for debug printf 65 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 66 67 void phprintf(const char* fmt, ...) { 68 (void)fmt; 69 #if PHASTAIO_DEBUG 70 char format[1024]; 71 snprintf(format, sizeof(format), "phastaIO debug: %s", fmt); 72 va_list ap; 73 va_start(ap,fmt); 74 vprintf(format,ap); 75 va_end(ap); 76 #endif 77 } 78 79 void phprintf_0(const char* fmt, ...) { 80 (void)fmt; 81 #if PHASTAIO_DEBUG 82 int rank = 0; 83 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 84 if(rank == 0){ 85 char format[1024]; 86 snprintf(format, sizeof(format), "phastaIO debug: irank=0 %s", fmt); 87 va_list ap; 88 va_start(ap,fmt); 89 vprintf(format, ap); 90 va_end(ap); 91 } 92 #endif 93 } 94 95 enum PhastaIO_Errors 96 { 97 MAX_PHASTA_FILES_EXCEEDED = -1, 98 UNABLE_TO_OPEN_FILE = -2, 99 NOT_A_MPI_FILE = -3, 100 GPID_EXCEEDED = -4, 101 DATA_TYPE_ILLEGAL = -5 102 }; 103 104 using namespace std; 105 106 namespace{ 107 108 map<int, std::string> LastHeaderKey; 109 vector< FILE* > fileArray; 110 vector< bool > byte_order; 111 vector< int > header_type; 112 int DataSize=0; 113 bool LastHeaderNotFound = false; 114 bool Wrong_Endian = false ; 115 bool Strict_Error = false ; 116 bool binary_format = true; 117 118 /***********************************************************************/ 119 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 120 /***********************************************************************/ 121 122 typedef struct 123 { 124 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 125 unsigned long my_offset; 126 unsigned long next_start_address; 127 unsigned long **my_offset_table; 128 unsigned long **my_read_table; 129 130 double * double_chunk; 131 double * read_double_chunk; 132 133 int field_count; 134 int part_count; 135 int read_field_count; 136 int read_part_count; 137 int GPid; 138 int start_id; 139 140 int mhsize; 141 142 int myrank; 143 int numprocs; 144 int local_myrank; 145 int local_numprocs; 146 147 int nppp; 148 int nPPF; 149 int nFiles; 150 int nFields; 151 152 int * int_chunk; 153 int * read_int_chunk; 154 155 int Wrong_Endian; /* default to false */ 156 char * master_header; 157 MPI_File file_handle; 158 MPI_Comm local_comm; 159 } phastaio_file_t; 160 161 typedef struct 162 { 163 int nppf, nfields; 164 char * masterHeader; 165 }serial_file; 166 167 serial_file *SerialFile; 168 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 169 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 170 171 // the caller has the responsibility to delete the returned string 172 // TODO: StringStipper("nbc value? ") returns NULL? 173 char* StringStripper( const char istring[] ) { 174 int length = strlen( istring ); 175 char* dest = (char *)malloc( length + 1 ); 176 strcpy( dest, istring ); 177 dest[ length ] = '\0'; 178 if ( char* p = strpbrk( dest, " ") ) 179 *p = '\0'; 180 return dest; 181 } 182 183 inline int cscompare( const char teststring[], const char targetstring[] ) { 184 char* s1 = const_cast<char*>(teststring); 185 char* s2 = const_cast<char*>(targetstring); 186 187 while( *s1 == ' ') s1++; 188 while( *s2 == ' ') s2++; 189 while( ( *s1 ) 190 && ( *s2 ) 191 && ( *s2 != '?') 192 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 193 s1++; 194 s2++; 195 while( *s1 == ' ') s1++; 196 while( *s2 == ' ') s2++; 197 } 198 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 199 else return 0; 200 } 201 202 inline void isBinary( const char iotype[] ) { 203 char* fname = StringStripper( iotype ); 204 if ( cscompare( fname, "binary" ) ) binary_format = true; 205 else binary_format = false; 206 free (fname); 207 208 } 209 210 inline size_t typeSize( const char typestring[] ) { 211 char* ts1 = StringStripper( typestring ); 212 if ( cscompare( "integer", ts1 ) ) { 213 free (ts1); 214 return sizeof(int); 215 } else if ( cscompare( "double", ts1 ) ) { 216 free (ts1); 217 return sizeof( double ); 218 } else { 219 free (ts1); 220 fprintf(stderr,"unknown type : %s\n",ts1); 221 return 0; 222 } 223 } 224 225 int readHeader( 226 FILE* fileObject, 227 const char phrase[], 228 int* params, 229 int expect ) { 230 char* text_header; 231 char* token; 232 char Line[1024] = "\0"; 233 char junk; 234 bool FOUND = false ; 235 int real_length; 236 int skip_size, integer_value; 237 int rewind_count=0; 238 239 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 240 rewind( fileObject ); 241 clearerr( fileObject ); 242 rewind_count++; 243 fgets( Line, 1024, fileObject ); 244 } 245 246 while( !FOUND && ( rewind_count < 2 ) ) { 247 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ) { 248 text_header = (char *)malloc( real_length + 1 ); 249 strncpy( text_header, Line, real_length ); 250 text_header[ real_length ] =static_cast<char>(NULL); 251 token = strtok ( text_header, ":" ); 252 assert(token); 253 if( cscompare( phrase , token ) ) { 254 FOUND = true ; 255 token = strtok( NULL, " ,;<>" ); 256 assert(token); 257 skip_size = atoi( token ); 258 int i; 259 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 260 assert(token); 261 params[i] = atoi( token ); 262 } 263 if ( i < expect ) { 264 fprintf(stderr, 265 "Aloha Expected # of ints not found for: %s\n",phrase ); 266 } 267 } else if ( cscompare(token,"byteorder magic number") ) { 268 if ( binary_format ) { 269 fread((void*)&integer_value,sizeof(int),1,fileObject); 270 fread( &junk, sizeof(char), 1 , fileObject ); 271 if ( 362436 != integer_value ) Wrong_Endian = true; 272 } else{ 273 fscanf(fileObject, "%d\n", &integer_value ); 274 } 275 } else { 276 /* some other header, so just skip over */ 277 token = strtok( NULL, " ,;<>" ); 278 assert(token); 279 skip_size = atoi( token ); 280 if ( binary_format) 281 fseek( fileObject, skip_size, SEEK_CUR ); 282 else 283 for( int gama=0; gama < skip_size; gama++ ) 284 fgets( Line, 1024, fileObject ); 285 } 286 free (text_header); 287 } // end of if before while loop 288 289 if ( !FOUND ) 290 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 291 rewind( fileObject ); 292 clearerr( fileObject ); 293 rewind_count++; 294 fgets( Line, 1024, fileObject ); 295 } 296 } 297 298 if ( !FOUND ) { 299 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 300 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 301 return 1; 302 } 303 return 0; 304 } 305 306 } // end unnamed namespace 307 308 309 // begin of publicly visible functions 310 311 /** 312 * This function takes a long long pointer and assign (start) phiotmrc value to it 313 */ 314 void startTimer(double* start) { 315 if( !PRINT_PERF ) return; 316 MPI_Barrier(MPI_COMM_WORLD); 317 *start = phiotmrc(); 318 } 319 320 /** 321 * This function takes a long long pointer and assign (end) phiotmrc value to it 322 */ 323 void endTimer(double* end) { 324 if( !PRINT_PERF ) return; 325 *end = phiotmrc(); 326 MPI_Barrier(MPI_COMM_WORLD); 327 } 328 329 /** 330 * choose to print some performance results (or not) according to 331 * the PRINT_PERF macro 332 */ 333 void printPerf( 334 const char* func_name, 335 double start, 336 double end, 337 unsigned long datasize, 338 int printdatainfo, 339 const char* extra_msg) { 340 if( !PRINT_PERF ) return; 341 unsigned long data_size = datasize; 342 double time = end - start; 343 unsigned long isizemin,isizemax,isizetot; 344 double sizemin,sizemax,sizeavg,sizetot,rate; 345 double tmin, tmax, tavg, ttot; 346 347 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 348 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 349 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 350 tavg = ttot/mysize; 351 352 if(irank == 0) { 353 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 354 else printf("** syncIO "); 355 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 356 } 357 358 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 359 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 360 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 361 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 362 363 sizemin=(double)(isizemin*inv1024sq); 364 sizemax=(double)(isizemax*inv1024sq); 365 sizetot=(double)(isizetot*inv1024sq); 366 sizeavg=(double)(1.0*sizetot/mysize); 367 rate=(double)(1.0*sizetot/tmax); 368 369 if( irank == 0) { 370 printf(", Rate = %f MB/s [%s] \n \t\t\t" 371 " block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", 372 rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 373 } 374 } 375 else { 376 if(irank == 0) { 377 printf(" \n"); 378 //printf(" (%s) \n", extra_msg); 379 } 380 } 381 } 382 383 /** 384 * This function is normally called at the beginning of a read operation, before 385 * init function. 386 * This function (uses rank 0) reads out nfields, nppf, master header size, 387 * endianess and allocates for masterHeader string. 388 * These values are essential for following read operations. Rank 0 will bcast 389 * these values to other ranks in the commm world 390 * 391 * If the file set is of old POSIX format, it would throw error and exit 392 */ 393 void queryphmpiio(const char filename[],int *nfields, int *nppf) 394 { 395 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 396 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 397 398 if(irank == 0) { 399 FILE * fileHandle; 400 char* fname = StringStripper( filename ); 401 402 phioTime t0,t1; 403 phastaio_time(&t0); 404 fileHandle = fopen (fname,"rb"); 405 phastaio_time(&t1); 406 const size_t elapsed = phastaio_time_diff(&t0,&t1); 407 phastaio_addOpenTime(elapsed); 408 if (fileHandle == NULL ) { 409 printf("\nError: File %s doesn't exist! Please check!\n",fname); 410 } 411 else { 412 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 413 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 414 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 415 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 416 417 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 418 char version[MAX_FIELDS_NAME_LENGTH/4]; 419 int mhsize; 420 char * token; 421 int magic_number; 422 423 memcpy( read_out_tag, 424 SerialFile->masterHeader, 425 MAX_FIELDS_NAME_LENGTH-1 ); 426 427 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 428 // Test endianess ... 429 memcpy (&magic_number, 430 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 431 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 432 433 if ( magic_number != ENDIAN_TEST_NUMBER ) { 434 printf("Endian is different!\n"); 435 // Will do swap later 436 } 437 438 // test version, old version, default masterheader size is 4M 439 // newer version masterheader size is read from first line 440 memcpy(version, 441 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 442 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 443 444 if( cscompare ("version",version) ) { 445 // if there is "version" tag in the file, then it is newer format 446 // read master header size from here, otherwise use default 447 // Note: if version is "1", we know mhsize is at 3/4 place... 448 449 token = strtok(version, ":"); 450 token = strtok(NULL, " ,;<>" ); 451 int iversion = atoi(token); 452 453 if( iversion == 1) { 454 memcpy( &mhsize, 455 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 456 sizeof(int)); 457 if ( magic_number != ENDIAN_TEST_NUMBER ) 458 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 459 460 if( mhsize > DefaultMHSize ) { 461 //if actual headersize is larger than default, let's re-read 462 free(SerialFile->masterHeader); 463 SerialFile->masterHeader = (char *)malloc(mhsize); 464 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 465 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 466 } 467 } 468 //TODO: check if this is a valid int?? 469 MasterHeaderSize = mhsize; 470 } 471 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 472 MasterHeaderSize = DefaultMHSize; 473 } 474 475 memcpy( read_out_tag, 476 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 477 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 478 479 // Read in # fields ... 480 token = strtok( read_out_tag, ":" ); 481 token = strtok( NULL," ,;<>" ); 482 *nfields = atoi( token ); 483 if ( *nfields > MAX_FIELDS_NUMBER) { 484 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 485 } 486 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 487 488 memcpy( read_out_tag, 489 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 490 + *nfields * MAX_FIELDS_NAME_LENGTH, 491 MAX_FIELDS_NAME_LENGTH); 492 493 token = strtok( read_out_tag, ":" ); 494 token = strtok( NULL," ,;<>" ); 495 *nppf = atoi( token ); 496 SerialFile->nppf=*nppf; //TODO: sanity check of int 497 } // end of if("MPI_IO_TAG") 498 else { 499 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 500 exit(1); 501 } 502 phioTime t0,t1; 503 phastaio_time(&t0); 504 fclose(fileHandle); 505 phastaio_time(&t1); 506 const size_t elapsed = phastaio_time_diff(&t0,&t1); 507 phastaio_addCloseTime(elapsed); 508 free(SerialFile->masterHeader); 509 free(SerialFile); 510 } //end of else 511 free(fname); 512 } 513 514 // Bcast value to every one 515 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 516 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 517 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 518 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 519 } 520 521 /** 522 * This function computes the right master header size (round to size of 2^n). 523 * This is only needed for file format version 1 in "write" mode. 524 */ 525 int computeMHSize(int nfields, int nppf, int version) { 526 int mhsize=0; 527 if(version == 1) { 528 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 529 int meta_info_size = VERSION_INFO_HEADER_SIZE; 530 int actual_size = nfields * nppf * sizeof(unsigned long) + meta_info_size; 531 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 532 if (actual_size > DefaultMHSize) { 533 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 534 mhsize *= DefaultMHSize; 535 } 536 else { 537 mhsize = DefaultMHSize; 538 } 539 } else { 540 int rank = 0; 541 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 542 if(!rank) { 543 fprintf(stderr, 544 "ERROR invalid version passed to %s... exiting\n", __func__); 545 exit(EXIT_FAILURE); 546 } 547 } 548 return mhsize; 549 } 550 551 /** 552 * Computes correct color of a rank according to number of files. 553 */ 554 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 555 int color = 556 (int)(myrank / (numprocs / nfiles)); 557 return color; 558 } 559 560 561 /** 562 * Check the file descriptor. 563 */ 564 void checkFileDescriptor(const char fctname[], int* fileDescriptor ) { 565 if ( *fileDescriptor < 0 ) { 566 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 567 exit(1); 568 } 569 } 570 571 /** 572 * Initialize the file struct members and allocate space for file struct 573 * buffers. 574 * 575 * Note: this function is only called when we are using new format. Old POSIX 576 * format should skip this routine and call openfile() directly instead. 577 */ 578 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 579 { 580 // we init irank again in case query not called (e.g. syncIO write case) 581 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 582 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 583 584 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d, nfields %d, nppf %d, nfiles %d\n", irank, MasterHeaderSize, *nfields, *nppf, *nfiles); 585 586 double timer_start, timer_end; 587 startTimer(&timer_start); 588 589 char* imode = StringStripper( mode ); 590 591 // Note: if it's read, we presume query was called prior to init and 592 // MasterHeaderSize is already set to correct value from parsing header 593 // otherwise it's write then it needs some computation to be set 594 if ( cscompare( "read", imode ) ) { 595 // do nothing 596 } 597 else if( cscompare( "write", imode ) ) { 598 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 599 } 600 else { 601 printf("Error initphmpiio: can't recognize the mode %s", imode); 602 exit(1); 603 } 604 free ( imode ); 605 606 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 607 608 int i, j; 609 610 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 611 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 612 endTimer(&timer_end); 613 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 614 return MAX_PHASTA_FILES_EXCEEDED; 615 } 616 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 617 // { 618 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 619 // { 620 // PhastaIOActiveFiles[i] = NULL; 621 // } 622 // } 623 624 625 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 626 627 i = PhastaIONextActiveIndex; 628 PhastaIONextActiveIndex++; 629 630 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 631 632 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 633 634 PhastaIOActiveFiles[i]->Wrong_Endian = false; 635 636 PhastaIOActiveFiles[i]->nFields = *nfields; 637 PhastaIOActiveFiles[i]->nPPF = *nppf; 638 PhastaIOActiveFiles[i]->nFiles = *nfiles; 639 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 640 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 641 642 643 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 644 645 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 646 647 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 648 649 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 650 MPI_Comm_split(MPI_COMM_WORLD, 651 color, 652 PhastaIOActiveFiles[i]->myrank, 653 &(PhastaIOActiveFiles[i]->local_comm)); 654 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 655 &(PhastaIOActiveFiles[i]->local_numprocs)); 656 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 657 &(PhastaIOActiveFiles[i]->local_myrank)); 658 659 // back up now these variables so that we do not need to call comm_split again 660 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 661 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 662 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 663 s_assign_local_comm = 1; 664 } 665 else { // recycle the subcommunicator 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 667 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 668 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 669 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 670 } 671 } 672 else { // *nfiles == 1 here - no need to call mpi_comm_split here 673 674 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 675 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 676 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 677 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 678 679 } 680 681 PhastaIOActiveFiles[i]->nppp = 682 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 683 684 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 685 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 686 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 687 688 PhastaIOActiveFiles[i]->my_offset_table = 689 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 690 691 PhastaIOActiveFiles[i]->my_read_table = 692 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 693 694 for (j=0; j<*nfields; j++) 695 { 696 PhastaIOActiveFiles[i]->my_offset_table[j] = 697 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 698 699 PhastaIOActiveFiles[i]->my_read_table[j] = 700 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 701 } 702 *filehandle = i; 703 704 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 705 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 706 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 707 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 708 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 709 710 // Time monitoring 711 endTimer(&timer_end); 712 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 713 714 phprintf_0("Info initphmpiio: quiting function"); 715 716 return i; 717 } 718 719 /** 720 * Destruct the file struct and free buffers allocated in init function. 721 */ 722 void finalizephmpiio( int *fileDescriptor ) 723 { 724 double timer_start, timer_end; 725 startTimer(&timer_start); 726 727 int i, j; 728 i = *fileDescriptor; 729 //PhastaIONextActiveIndex--; 730 731 /* //free the offset table for this phasta file */ 732 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 733 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 734 { 735 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 736 free( PhastaIOActiveFiles[i]->my_read_table[j]); 737 } 738 free ( PhastaIOActiveFiles[i]->my_offset_table ); 739 free ( PhastaIOActiveFiles[i]->my_read_table ); 740 free ( PhastaIOActiveFiles[i]->master_header ); 741 free ( PhastaIOActiveFiles[i]->double_chunk ); 742 free ( PhastaIOActiveFiles[i]->int_chunk ); 743 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 744 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 745 746 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 747 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 748 s_assign_local_comm = 0; 749 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 750 } 751 752 free( PhastaIOActiveFiles[i]); 753 754 endTimer(&timer_end); 755 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 756 757 PhastaIONextActiveIndex--; 758 } 759 760 761 /** 762 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 763 * Initialize the file struct members and allocate space for file struct buffers. 764 * 765 * Note: this function is only called when we are using new format. Old POSIX 766 * format should skip this routine and call openfile() directly instead. 767 */ 768 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 769 { 770 // we init irank again in case query not called (e.g. syncIO write case) 771 772 MPI_Comm_rank(my_local_comm, &irank); 773 MPI_Comm_size(my_local_comm, &mysize); 774 775 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 776 777 double timer_start, timer_end; 778 startTimer(&timer_start); 779 780 char* imode = StringStripper( mode ); 781 782 // Note: if it's read, we presume query was called prior to init and 783 // MasterHeaderSize is already set to correct value from parsing header 784 // otherwise it's write then it needs some computation to be set 785 if ( cscompare( "read", imode ) ) { 786 // do nothing 787 } 788 else if( cscompare( "write", imode ) ) { 789 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 790 } 791 else { 792 printf("Error initphmpiio: can't recognize the mode %s", imode); 793 exit(1); 794 } 795 free ( imode ); 796 797 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 798 799 int i, j; 800 801 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 802 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 803 endTimer(&timer_end); 804 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 805 return MAX_PHASTA_FILES_EXCEEDED; 806 } 807 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 808 // { 809 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 810 // { 811 // PhastaIOActiveFiles[i] = NULL; 812 // } 813 // } 814 815 816 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 817 818 i = PhastaIONextActiveIndex; 819 PhastaIONextActiveIndex++; 820 821 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 822 823 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 824 825 PhastaIOActiveFiles[i]->Wrong_Endian = false; 826 827 PhastaIOActiveFiles[i]->nFields = *nfields; 828 PhastaIOActiveFiles[i]->nPPF = *nppf; 829 PhastaIOActiveFiles[i]->nFiles = *nfiles; 830 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 831 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 832 833 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 834 MPI_Comm_split(my_local_comm, 835 color, 836 PhastaIOActiveFiles[i]->myrank, 837 &(PhastaIOActiveFiles[i]->local_comm)); 838 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 839 &(PhastaIOActiveFiles[i]->local_numprocs)); 840 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 841 &(PhastaIOActiveFiles[i]->local_myrank)); 842 PhastaIOActiveFiles[i]->nppp = 843 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 844 845 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 846 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 847 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 848 849 PhastaIOActiveFiles[i]->my_offset_table = 850 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 851 852 PhastaIOActiveFiles[i]->my_read_table = 853 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 854 855 for (j=0; j<*nfields; j++) 856 { 857 PhastaIOActiveFiles[i]->my_offset_table[j] = 858 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 859 860 PhastaIOActiveFiles[i]->my_read_table[j] = 861 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 862 } 863 *filehandle = i; 864 865 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 866 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 867 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 868 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 869 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 870 871 // Time monitoring 872 endTimer(&timer_end); 873 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 874 875 phprintf_0("Info initphmpiiosub: quiting function"); 876 877 return i; 878 } 879 880 namespace { 881 882 enum { 883 DIR_MODE = S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH 884 }; 885 886 bool my_mkdir(std::string name) { 887 if(name.empty()) 888 return true; 889 errno = 0; 890 int err = mkdir(name.c_str(), DIR_MODE); 891 if ((err == -1) && (errno == EEXIST)) { 892 errno = 0; 893 err = 0; 894 return false; 895 } 896 assert(!err); 897 return true; 898 } 899 900 enum { 901 DIR_FANOUT = 2048 902 }; 903 904 std::string getSubDirPrefix() { 905 if (phio_peers() <= DIR_FANOUT) 906 return string(""); 907 int self = phio_self(); 908 int subSelf = self % DIR_FANOUT; 909 int subGroup = self / DIR_FANOUT; 910 std::stringstream ss; 911 ss << subGroup << '/'; 912 return ss.str(); 913 } 914 } 915 916 /** open file for both POSIX and MPI-IO syncIO format. 917 * 918 * If it's old POSIX format, simply call posix fopen(). 919 * 920 * If it's MPI-IO foramt: 921 * in "read" mode, it builds the header table that points to the offset of 922 * fields for parts; 923 * in "write" mode, it opens the file with MPI-IO open routine. 924 */ 925 void openfile(const char filename[], const char mode[], int* fileDescriptor ) 926 { 927 phprintf_0("Info: entering openfile"); 928 929 double timer_start, timer_end; 930 startTimer(&timer_start); 931 932 if ( PhastaIONextActiveIndex == 0 ) 933 { 934 FILE* file=NULL ; 935 *fileDescriptor = 0; 936 char* fname = StringStripper( filename ); 937 char* imode = StringStripper( mode ); 938 939 std::string posixname = getSubDirPrefix(); 940 if (!phio_self()) 941 my_mkdir(posixname); 942 phio_barrier(); 943 posixname += string(fname); 944 phioTime t0,t1; 945 phastaio_time(&t0); 946 if ( cscompare( "read", imode ) ) 947 file = fopen(posixname.c_str(), "rb" ); 948 else if( cscompare( "write", imode ) ) 949 file = fopen(posixname.c_str(), "wb" ); 950 else if( cscompare( "append", imode ) ) 951 file = fopen(posixname.c_str(), "ab" ); 952 phastaio_time(&t1); 953 const size_t elapsed = phastaio_time_diff(&t0,&t1); 954 phastaio_addOpenTime(elapsed); 955 956 if ( !file ){ 957 fprintf(stderr,"Error openfile: unable to open file %s\n",fname); 958 } else { 959 fileArray.push_back( file ); 960 byte_order.push_back( false ); 961 header_type.push_back( sizeof(int) ); 962 *fileDescriptor = fileArray.size(); 963 } 964 free (fname); 965 free (imode); 966 } 967 else // else it would be parallel I/O, opposed to posix io 968 { 969 char* fname = StringStripper( filename ); 970 char* imode = StringStripper( mode ); 971 int rc; 972 int i = *fileDescriptor; 973 checkFileDescriptor("openfile",&i); 974 char* token; 975 976 if ( cscompare( "read", imode ) ) 977 { 978 // if (PhastaIOActiveFiles[i]->myrank == 0) 979 // printf("\n **********\nRead open ... ... regular version\n"); 980 981 phioTime t0,t1; 982 phastaio_time(&t0); 983 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 984 fname, 985 MPI_MODE_RDONLY, 986 MPI_INFO_NULL, 987 &(PhastaIOActiveFiles[i]->file_handle) ); 988 phastaio_time(&t1); 989 const size_t elapsed = phastaio_time_diff(&t0,&t1); 990 phastaio_addOpenTime(elapsed); 991 992 if(rc) 993 { 994 *fileDescriptor = UNABLE_TO_OPEN_FILE; 995 int error_string_length; 996 char error_string[4096]; 997 MPI_Error_string(rc, error_string, &error_string_length); 998 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 999 endTimer(&timer_end); 1000 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1001 return; 1002 } 1003 1004 MPI_Status read_tag_status; 1005 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1006 int j; 1007 int magic_number; 1008 1009 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1010 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1011 0, 1012 PhastaIOActiveFiles[i]->master_header, 1013 MasterHeaderSize, 1014 MPI_CHAR, 1015 &read_tag_status ); 1016 } 1017 1018 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1019 MasterHeaderSize, 1020 MPI_CHAR, 1021 0, 1022 PhastaIOActiveFiles[i]->local_comm ); 1023 1024 memcpy( read_out_tag, 1025 PhastaIOActiveFiles[i]->master_header, 1026 MAX_FIELDS_NAME_LENGTH-1 ); 1027 1028 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1029 { 1030 // Test endianess ... 1031 memcpy ( &magic_number, 1032 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1033 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1034 1035 if ( magic_number != ENDIAN_TEST_NUMBER ) 1036 { 1037 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1038 } 1039 1040 memcpy( read_out_tag, 1041 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1042 MAX_FIELDS_NAME_LENGTH ); 1043 1044 // Read in # fields ... 1045 token = strtok ( read_out_tag, ":" ); 1046 token = strtok( NULL," ,;<>" ); 1047 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1048 1049 unsigned long **header_table; 1050 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1051 1052 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1053 { 1054 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1055 } 1056 1057 // Read in the offset table ... 1058 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1059 { 1060 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1061 memcpy( header_table[j], 1062 PhastaIOActiveFiles[i]->master_header + 1063 VERSION_INFO_HEADER_SIZE + 1064 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1065 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1066 } 1067 1068 MPI_Scatter( header_table[j], 1069 PhastaIOActiveFiles[i]->nppp, 1070 MPI_LONG_LONG_INT, 1071 PhastaIOActiveFiles[i]->my_read_table[j], 1072 PhastaIOActiveFiles[i]->nppp, 1073 MPI_LONG_LONG_INT, 1074 0, 1075 PhastaIOActiveFiles[i]->local_comm ); 1076 1077 // Swap byte order if endianess is different ... 1078 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1079 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1080 sizeof(unsigned long), 1081 PhastaIOActiveFiles[i]->nppp ); 1082 } 1083 } 1084 1085 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1086 free ( header_table[j] ); 1087 } 1088 free (header_table); 1089 1090 } // end of if MPI_IO_TAG 1091 else //else not valid MPI file 1092 { 1093 *fileDescriptor = NOT_A_MPI_FILE; 1094 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1095 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1096 endTimer(&timer_end); 1097 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1098 return; 1099 } 1100 } // end of if "read" 1101 else if( cscompare( "write", imode ) ) 1102 { 1103 phioTime t0,t1; 1104 phastaio_time(&t0); 1105 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1106 fname, 1107 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1108 MPI_INFO_NULL, 1109 &(PhastaIOActiveFiles[i]->file_handle) ); 1110 phastaio_time(&t1); 1111 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1112 phastaio_addOpenTime(elapsed); 1113 if(rc != MPI_SUCCESS) 1114 { 1115 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1116 int error_string_length; 1117 char error_string[4096]; 1118 MPI_Error_string(rc, error_string, &error_string_length); 1119 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 1120 return; 1121 } 1122 } // end of if "write" 1123 free (fname); 1124 free (imode); 1125 } // end of if FileIndex != 0 1126 1127 endTimer(&timer_end); 1128 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1129 } 1130 1131 /** close file for both POSIX and MPI-IO syncIO format. 1132 * 1133 * If it's old POSIX format, simply call posix fclose(). 1134 * 1135 * If it's MPI-IO foramt: 1136 * in "read" mode, it simply close file with MPI-IO close routine. 1137 * in "write" mode, rank 0 in each group will re-assemble the master header and 1138 * offset table and write to the beginning of file, then close the file. 1139 */ 1140 void closefile( int* fileDescriptor, const char mode[] ) 1141 { 1142 double timer_start, timer_end; 1143 startTimer(&timer_start); 1144 1145 int i = *fileDescriptor; 1146 checkFileDescriptor("closefile",&i); 1147 1148 if ( PhastaIONextActiveIndex == 0 ) { 1149 char* imode = StringStripper( mode ); 1150 1151 if( cscompare( "write", imode ) 1152 || cscompare( "append", imode ) ) { 1153 fflush( fileArray[ *fileDescriptor - 1 ] ); 1154 } 1155 1156 phioTime t0,t1; 1157 phastaio_time(&t0); 1158 fclose( fileArray[ *fileDescriptor - 1 ] ); 1159 phastaio_time(&t1); 1160 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1161 phastaio_addCloseTime(elapsed); 1162 free (imode); 1163 } 1164 else { 1165 char* imode = StringStripper( mode ); 1166 1167 //write master header here: 1168 if ( cscompare( "write", imode ) ) { 1169 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1170 // MasterHeaderSize = 4*ONE_MEGABYTE; 1171 // else 1172 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1173 1174 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1175 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1176 1177 MPI_Status write_header_status; 1178 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1179 char version[MAX_FIELDS_NAME_LENGTH/4]; 1180 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1181 int magic_number = ENDIAN_TEST_NUMBER; 1182 1183 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1184 { 1185 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1186 sprintf(mpi_tag, "MPI_IO_Tag : "); 1187 memcpy(PhastaIOActiveFiles[i]->master_header, 1188 mpi_tag, 1189 MAX_FIELDS_NAME_LENGTH); 1190 1191 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1192 // this version is "1", print version in ASCII 1193 sprintf(version, "version : %d",1); 1194 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1195 version, 1196 MAX_FIELDS_NAME_LENGTH/4); 1197 1198 // master header size is computed using the formula above 1199 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1200 sprintf(mhsize, "mhsize : "); 1201 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1202 mhsize, 1203 MAX_FIELDS_NAME_LENGTH/4); 1204 1205 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1206 sprintf(mpi_tag, 1207 "\nnFields : %d\n", 1208 PhastaIOActiveFiles[i]->nFields); 1209 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1210 mpi_tag, 1211 MAX_FIELDS_NAME_LENGTH); 1212 1213 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1214 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1215 memcpy( PhastaIOActiveFiles[i]->master_header+ 1216 PhastaIOActiveFiles[i]->nFields * 1217 MAX_FIELDS_NAME_LENGTH + 1218 MAX_FIELDS_NAME_LENGTH * 2, 1219 mpi_tag, 1220 MAX_FIELDS_NAME_LENGTH); 1221 1222 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1223 &magic_number, 1224 sizeof(int)); 1225 1226 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1227 &MasterHeaderSize, 1228 sizeof(int)); 1229 } 1230 1231 int j = 0; 1232 unsigned long **header_table; 1233 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1234 1235 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1236 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1237 } 1238 1239 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1240 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1241 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1242 PhastaIOActiveFiles[i]->nppp, 1243 MPI_LONG_LONG_INT, 1244 header_table[j], 1245 PhastaIOActiveFiles[i]->nppp, 1246 MPI_LONG_LONG_INT, 1247 0, 1248 PhastaIOActiveFiles[i]->local_comm ); 1249 } 1250 1251 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1252 1253 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1254 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1255 memcpy ( PhastaIOActiveFiles[i]->master_header + 1256 VERSION_INFO_HEADER_SIZE + 1257 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1258 header_table[j], 1259 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1260 } 1261 1262 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1263 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1264 0, 1265 PhastaIOActiveFiles[i]->master_header, 1266 MasterHeaderSize, 1267 MPI_CHAR, 1268 &write_header_status ); 1269 } 1270 1271 ////free(PhastaIOActiveFiles[i]->master_header); 1272 1273 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1274 free ( header_table[j] ); 1275 } 1276 free (header_table); 1277 } 1278 1279 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1280 phioTime t0,t1; 1281 phastaio_time(&t0); 1282 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1283 phastaio_time(&t1); 1284 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1285 phastaio_addCloseTime(elapsed); 1286 free ( imode ); 1287 } 1288 1289 endTimer(&timer_end); 1290 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1291 } 1292 1293 int readHeader( FILE* f, const char phrase[], 1294 int* params, int numParams, const char* iotype) { 1295 isBinary(iotype); 1296 return readHeader(f,phrase,params,numParams); 1297 } 1298 1299 void readheader( 1300 int* fileDescriptor, 1301 const char keyphrase[], 1302 void* valueArray, 1303 int* nItems, 1304 const char datatype[], 1305 const char iotype[] ) 1306 { 1307 double timer_start, timer_end; 1308 1309 startTimer(&timer_start); 1310 1311 int i = *fileDescriptor; 1312 checkFileDescriptor("readheader",&i); 1313 1314 if ( PhastaIONextActiveIndex == 0 ) { 1315 int filePtr = *fileDescriptor - 1; 1316 FILE* fileObject; 1317 int* valueListInt; 1318 1319 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1320 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1321 fprintf(stderr,"openfile_ function has to be called before \n") ; 1322 fprintf(stderr,"acessing the file\n ") ; 1323 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1324 endTimer(&timer_end); 1325 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1326 return; 1327 } 1328 1329 LastHeaderKey[filePtr] = std::string(keyphrase); 1330 LastHeaderNotFound = false; 1331 1332 fileObject = fileArray[ filePtr ] ; 1333 Wrong_Endian = byte_order[ filePtr ]; 1334 1335 isBinary( iotype ); 1336 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1337 1338 // right now we are making the assumption that we will only write integers 1339 // on the header line. 1340 1341 valueListInt = static_cast< int* >( valueArray ); 1342 int ierr = readHeader( fileObject , 1343 keyphrase, 1344 valueListInt, 1345 *nItems ) ; 1346 1347 byte_order[ filePtr ] = Wrong_Endian ; 1348 1349 if ( ierr ) LastHeaderNotFound = true; 1350 1351 //return ; // don't return, go to the end to print perf 1352 } 1353 else { 1354 int* valueListInt; 1355 valueListInt = static_cast <int*>(valueArray); 1356 char* token = NULL; 1357 bool FOUND = false ; 1358 isBinary( iotype ); 1359 1360 MPI_Status read_offset_status; 1361 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1362 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1363 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1364 int j; 1365 1366 int string_length = strlen( keyphrase ); 1367 char* buffer = (char*) malloc ( string_length+1 ); 1368 1369 strcpy ( buffer, keyphrase ); 1370 buffer[ string_length ] = '\0'; 1371 1372 char* st2 = strtok ( buffer, "@" ); 1373 st2 = strtok (NULL, "@"); 1374 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1375 if ( char* p = strpbrk(buffer, "@") ) 1376 *p = '\0'; 1377 1378 // Check if the user has input the right GPid 1379 if ( ( PhastaIOActiveFiles[i]->GPid <= 1380 PhastaIOActiveFiles[i]->myrank * 1381 PhastaIOActiveFiles[i]->nppp )|| 1382 ( PhastaIOActiveFiles[i]->GPid > 1383 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1384 PhastaIOActiveFiles[i]->nppp ) ) 1385 { 1386 *fileDescriptor = NOT_A_MPI_FILE; 1387 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1388 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1389 endTimer(&timer_end); 1390 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1391 return; 1392 } 1393 1394 // Find the field we want ... 1395 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1396 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1397 { 1398 memcpy( readouttag[j], 1399 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1400 MAX_FIELDS_NAME_LENGTH-1 ); 1401 } 1402 1403 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1404 { 1405 token = strtok ( readouttag[j], ":" ); 1406 1407 //if ( cscompare( buffer, token ) ) 1408 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1409 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1410 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1411 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1412 { 1413 PhastaIOActiveFiles[i]->read_field_count = j; 1414 FOUND = true; 1415 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1416 break; 1417 } 1418 } 1419 free(buffer); 1420 1421 if (!FOUND) 1422 { 1423 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1424 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1425 endTimer(&timer_end); 1426 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1427 return; 1428 } 1429 1430 // Find the part we want ... 1431 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1432 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1433 1434 PhastaIOActiveFiles[i]->my_offset = 1435 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1436 1437 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1438 1439 // Read each datablock header here ... 1440 1441 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1442 PhastaIOActiveFiles[i]->my_offset+1, 1443 read_out_tag, 1444 MAX_FIELDS_NAME_LENGTH-1, 1445 MPI_CHAR, 1446 &read_offset_status ); 1447 token = strtok ( read_out_tag, ":" ); 1448 1449 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1450 1451 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1452 { 1453 FOUND = true ; 1454 token = strtok( NULL, " ,;<>" ); 1455 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1456 valueListInt[j] = atoi( token ); 1457 1458 if ( j < *nItems ) 1459 { 1460 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1461 } 1462 } 1463 else { 1464 //if(irank==0) 1465 if(PhastaIOActiveFiles[i]->myrank == 0) 1466 // If we enter this if, there is a problem with the name of some fields 1467 { 1468 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1469 } 1470 } 1471 } 1472 1473 endTimer(&timer_end); 1474 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1475 1476 } 1477 1478 void readDataBlock( 1479 FILE* fileObject, 1480 void* valueArray, 1481 int nItems, 1482 const char datatype[], 1483 const char iotype[] ) 1484 { 1485 isBinary(iotype); 1486 size_t type_size = typeSize( datatype ); 1487 phioTime t0,t1; 1488 phastaio_time(&t0); 1489 if ( binary_format ) { 1490 char junk = '\0'; 1491 fread( valueArray, type_size, nItems, fileObject ); 1492 fread( &junk, sizeof(char), 1 , fileObject ); 1493 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems ); 1494 } else { 1495 char* ts1 = StringStripper( datatype ); 1496 if ( cscompare( "integer", ts1 ) ) { 1497 for( int n=0; n < nItems ; n++ ) 1498 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1499 } else if ( cscompare( "double", ts1 ) ) { 1500 for( int n=0; n < nItems ; n++ ) 1501 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1502 } 1503 free (ts1); 1504 } 1505 phastaio_time(&t1); 1506 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1507 phastaio_addReadTime(elapsed); 1508 phastaio_addReadBytes(nItems*type_size); 1509 } 1510 1511 void readdatablock( 1512 int* fileDescriptor, 1513 const char keyphrase[], 1514 void* valueArray, 1515 int* nItems, 1516 const char datatype[], 1517 const char iotype[] ) 1518 { 1519 //if(irank == 0) printf("entering readdatablock()\n"); 1520 unsigned long data_size = 0; 1521 double timer_start, timer_end; 1522 startTimer(&timer_start); 1523 1524 int i = *fileDescriptor; 1525 checkFileDescriptor("readdatablock",&i); 1526 1527 if ( PhastaIONextActiveIndex == 0 ) { 1528 int filePtr = *fileDescriptor - 1; 1529 FILE* fileObject; 1530 1531 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1532 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1533 fprintf(stderr,"openfile_ function has to be called before\n") ; 1534 fprintf(stderr,"acessing the file\n ") ; 1535 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1536 endTimer(&timer_end); 1537 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1538 return; 1539 } 1540 1541 // error check.. 1542 // since we require that a consistant header always preceed the data block 1543 // let us check to see that it is actually the case. 1544 1545 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1546 fprintf(stderr, "Header not consistant with data block\n"); 1547 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1548 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1549 fprintf(stderr, "Please recheck read sequence \n"); 1550 if( Strict_Error ) { 1551 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1552 endTimer(&timer_end); 1553 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1554 return; 1555 } 1556 } 1557 1558 if ( LastHeaderNotFound ) { 1559 endTimer(&timer_end); 1560 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1561 return; 1562 } 1563 fileObject = fileArray[ filePtr ]; 1564 Wrong_Endian = byte_order[ filePtr ]; 1565 LastHeaderKey.erase(filePtr); 1566 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1567 1568 //return; 1569 } 1570 else { 1571 // printf("read data block\n"); 1572 MPI_Status read_data_status; 1573 size_t type_size = typeSize( datatype ); 1574 int nUnits = *nItems; 1575 isBinary( iotype ); 1576 1577 // read datablock then 1578 //MR CHANGE 1579 // if ( cscompare ( datatype, "double")) 1580 char* ts2 = StringStripper( datatype ); 1581 if ( cscompare ( "double" , ts2)) 1582 //MR CHANGE END 1583 { 1584 1585 phioTime t0,t1; 1586 phastaio_time(&t0); 1587 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1588 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1589 valueArray, 1590 nUnits, 1591 MPI_DOUBLE ); 1592 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1593 valueArray, 1594 &read_data_status ); 1595 data_size=8*nUnits; 1596 phastaio_time(&t1); 1597 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1598 phastaio_addReadTime(elapsed); 1599 phastaio_addReadBytes(nUnits*sizeof(double)); 1600 } 1601 //MR CHANGE 1602 // else if ( cscompare ( datatype, "integer")) 1603 else if ( cscompare ( "integer" , ts2)) 1604 //MR CHANGE END 1605 { 1606 phioTime t0,t1; 1607 phastaio_time(&t0); 1608 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1609 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1610 valueArray, 1611 nUnits, 1612 MPI_INT ); 1613 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1614 valueArray, 1615 &read_data_status ); 1616 data_size=4*nUnits; 1617 phastaio_time(&t1); 1618 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1619 phastaio_addReadTime(elapsed); 1620 phastaio_addReadBytes(nUnits*sizeof(int)); 1621 } 1622 else 1623 { 1624 *fileDescriptor = DATA_TYPE_ILLEGAL; 1625 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1626 endTimer(&timer_end); 1627 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1628 return; 1629 } 1630 free(ts2); 1631 1632 1633 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1634 1635 // Swap data byte order if endianess is different ... 1636 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1637 { 1638 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1639 } 1640 } 1641 1642 endTimer(&timer_end); 1643 char extra_msg[1024]; 1644 memset(extra_msg, '\0', 1024); 1645 char* key = StringStripper(keyphrase); 1646 sprintf(extra_msg, " field is %s ", key); 1647 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1648 free(key); 1649 1650 } 1651 1652 void writeHeader( 1653 FILE* f, 1654 const char keyphrase[], 1655 const void* valueArray, 1656 const int nItems, 1657 const int ndataItems, 1658 const char datatype[], 1659 const char iotype[]) 1660 { 1661 isBinary( iotype ); 1662 1663 const int _newline = 1664 ( ndataItems > 0 ) ? sizeof( char ) : 0; 1665 int size_of_nextblock = 1666 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems; 1667 1668 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock ); 1669 for( int i = 0; i < nItems; i++ ) 1670 fprintf( f, "%d ", *((int*)((int*)valueArray+i))); 1671 fprintf( f, "\n"); 1672 } 1673 1674 void writeheader( 1675 const int* fileDescriptor, 1676 const char keyphrase[], 1677 const void* valueArray, 1678 const int* nItems, 1679 const int* ndataItems, 1680 const char datatype[], 1681 const char iotype[]) 1682 { 1683 1684 //if(irank == 0) printf("entering writeheader()\n"); 1685 1686 double timer_start, timer_end; 1687 startTimer(&timer_start); 1688 1689 int i = *fileDescriptor; 1690 checkFileDescriptor("writeheader",&i); 1691 1692 if ( PhastaIONextActiveIndex == 0 ) { 1693 int filePtr = *fileDescriptor - 1; 1694 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1695 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1696 fprintf(stderr,"openfile_ function has to be called before \n") ; 1697 fprintf(stderr,"acessing the file\n ") ; 1698 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1699 endTimer(&timer_end); 1700 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1701 return; 1702 } 1703 1704 LastHeaderKey[filePtr] = std::string(keyphrase); 1705 DataSize = *ndataItems; 1706 FILE* fileObject = fileArray[ filePtr ] ; 1707 header_type[ filePtr ] = typeSize( datatype ); 1708 writeHeader(fileObject,keyphrase,valueArray,*nItems, 1709 *ndataItems,datatype,iotype); 1710 } 1711 else { // else it's parallel I/O 1712 DataSize = *ndataItems; 1713 size_t type_size = typeSize( datatype ); 1714 isBinary( iotype ); 1715 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1716 1717 int string_length = strlen( keyphrase ); 1718 char* buffer = (char*) malloc ( string_length+1 ); 1719 1720 strcpy ( buffer, keyphrase); 1721 buffer[ string_length ] = '\0'; 1722 1723 char* st2 = strtok ( buffer, "@" ); 1724 st2 = strtok (NULL, "@"); 1725 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1726 1727 if ( char* p = strpbrk(buffer, "@") ) 1728 *p = '\0'; 1729 1730 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1731 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1732 unsigned long offset_value; 1733 1734 int temp = *ndataItems; 1735 unsigned long number_of_items = (unsigned long)temp; 1736 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1737 1738 MPI_Scan( &number_of_items, 1739 &offset_value, 1740 1, 1741 MPI_LONG_LONG_INT, 1742 MPI_SUM, 1743 PhastaIOActiveFiles[i]->local_comm ); 1744 1745 offset_value = (offset_value - number_of_items) * type_size; 1746 1747 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1748 DB_HEADER_SIZE + 1749 PhastaIOActiveFiles[i]->next_start_address; 1750 // This offset is the starting address of each datablock header... 1751 PhastaIOActiveFiles[i]->my_offset = offset_value; 1752 1753 // Write in my offset table ... 1754 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1755 PhastaIOActiveFiles[i]->my_offset; 1756 1757 // Update the next-start-address ... 1758 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1759 number_of_items * type_size + 1760 DB_HEADER_SIZE; 1761 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1762 1, 1763 MPI_LONG_LONG_INT, 1764 PhastaIOActiveFiles[i]->local_numprocs-1, 1765 PhastaIOActiveFiles[i]->local_comm ); 1766 1767 // Prepare datablock header ... 1768 int _newline = (*ndataItems>0)?sizeof(char):0; 1769 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1770 1771 //char datablock_header[255]; 1772 //bzero((void*)datablock_header,255); 1773 char datablock_header[DB_HEADER_SIZE]; 1774 bzero((void*)datablock_header,DB_HEADER_SIZE); 1775 1776 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1777 sprintf( datablock_header, 1778 "\n%s : < %u >", 1779 keyphrase, 1780 size_of_nextblock ); 1781 1782 for ( int j = 0; j < *nItems; j++ ) 1783 { 1784 sprintf( datablock_header, 1785 "%s %d ", 1786 datablock_header, 1787 *((int*)((int*)valueArray+j))); 1788 } 1789 sprintf( datablock_header, 1790 "%s\n ", 1791 datablock_header ); 1792 1793 // Write datablock header ... 1794 //MR CHANGE 1795 // if ( cscompare(datatype,"double") ) 1796 char* ts1 = StringStripper( datatype ); 1797 if ( cscompare("double",ts1) ) 1798 //MR CHANGE END 1799 { 1800 free ( PhastaIOActiveFiles[i]->double_chunk ); 1801 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1802 1803 double * aa = ( double * )datablock_header; 1804 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1805 } 1806 //MR CHANGE 1807 // if ( cscompare(datatype,"integer") ) 1808 else if ( cscompare("integer",ts1) ) 1809 //MR CHANGE END 1810 { 1811 free ( PhastaIOActiveFiles[i]->int_chunk ); 1812 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1813 1814 int * aa = ( int * )datablock_header; 1815 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1816 } 1817 else { 1818 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1819 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1820 endTimer(&timer_end); 1821 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1822 return; 1823 } 1824 free(ts1); 1825 1826 PhastaIOActiveFiles[i]->part_count++; 1827 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1828 //A new field will be written 1829 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1830 memcpy( PhastaIOActiveFiles[i]->master_header + 1831 PhastaIOActiveFiles[i]->field_count * 1832 MAX_FIELDS_NAME_LENGTH + 1833 MAX_FIELDS_NAME_LENGTH * 2, 1834 mpi_tag, 1835 MAX_FIELDS_NAME_LENGTH-1); 1836 } 1837 PhastaIOActiveFiles[i]->field_count++; 1838 PhastaIOActiveFiles[i]->part_count=0; 1839 } 1840 free(buffer); 1841 } 1842 1843 endTimer(&timer_end); 1844 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1845 } 1846 1847 void writeDataBlock( 1848 FILE* f, 1849 const void* valueArray, 1850 const int nItems, 1851 const char datatype[], 1852 const char iotype[] ) 1853 { 1854 isBinary( iotype ); 1855 size_t type_size = typeSize( datatype ); 1856 phioTime t0,t1; 1857 phastaio_time(&t0); 1858 if ( binary_format ) { 1859 fwrite( valueArray, type_size, nItems, f ); 1860 fprintf( f,"\n"); 1861 } else { 1862 char* ts1 = StringStripper( datatype ); 1863 if ( cscompare( "integer", ts1 ) ) { 1864 const int* vals = (int*) valueArray; 1865 for( int n=0; n < nItems ; n++ ) 1866 fprintf(f,"%d\n",vals[n]); 1867 } else if ( cscompare( "double", ts1 ) ) { 1868 const double* vals = (double*) valueArray; 1869 for( int n=0; n < nItems ; n++ ) 1870 fprintf(f,"%f\n",vals[n]); 1871 } 1872 free (ts1); 1873 } 1874 phastaio_time(&t1); 1875 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1876 phastaio_addWriteTime(elapsed); 1877 phastaio_addWriteBytes(nItems*type_size); 1878 } 1879 1880 void writedatablock( 1881 const int* fileDescriptor, 1882 const char keyphrase[], 1883 const void* valueArray, 1884 const int* nItems, 1885 const char datatype[], 1886 const char iotype[] ) 1887 { 1888 //if(irank == 0) printf("entering writedatablock()\n"); 1889 1890 unsigned long data_size = 0; 1891 double timer_start, timer_end; 1892 startTimer(&timer_start); 1893 1894 int i = *fileDescriptor; 1895 checkFileDescriptor("writedatablock",&i); 1896 1897 if ( PhastaIONextActiveIndex == 0 ) { 1898 int filePtr = *fileDescriptor - 1; 1899 1900 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1901 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1902 fprintf(stderr,"openfile_ function has to be called before \n") ; 1903 fprintf(stderr,"acessing the file\n ") ; 1904 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1905 endTimer(&timer_end); 1906 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1907 return; 1908 } 1909 // since we require that a consistant header always preceed the data block 1910 // let us check to see that it is actually the case. 1911 1912 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1913 fprintf(stderr, "Header not consistant with data block\n"); 1914 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1915 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1916 fprintf(stderr, "Please recheck write sequence \n"); 1917 if( Strict_Error ) { 1918 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1919 endTimer(&timer_end); 1920 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1921 return; 1922 } 1923 } 1924 1925 FILE* fileObject = fileArray[ filePtr ] ; 1926 size_t type_size=typeSize( datatype ); 1927 isBinary( iotype ); 1928 1929 LastHeaderKey.erase(filePtr); 1930 1931 if ( header_type[filePtr] != (int)type_size ) { 1932 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1933 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1934 if( Strict_Error ) { 1935 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1936 endTimer(&timer_end); 1937 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1938 return; 1939 } 1940 } 1941 1942 int nUnits = *nItems; 1943 1944 if ( nUnits != DataSize ) { 1945 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1946 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1947 if( Strict_Error ) { 1948 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1949 endTimer(&timer_end); 1950 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1951 return; 1952 } 1953 } 1954 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1955 } 1956 else { // syncIO case 1957 MPI_Status write_data_status; 1958 isBinary( iotype ); 1959 int nUnits = *nItems; 1960 1961 //MR CHANGE 1962 // if ( cscompare(datatype,"double") ) 1963 char* ts1 = StringStripper( datatype ); 1964 if ( cscompare("double",ts1) ) 1965 //MR CHANGE END 1966 { 1967 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1968 phioTime t0,t1; 1969 phastaio_time(&t0); 1970 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1971 PhastaIOActiveFiles[i]->my_offset, 1972 PhastaIOActiveFiles[i]->double_chunk, 1973 //BLOCK_SIZE/sizeof(double), 1974 nUnits+DB_HEADER_SIZE/sizeof(double), 1975 MPI_DOUBLE ); 1976 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1977 PhastaIOActiveFiles[i]->double_chunk, 1978 &write_data_status ); 1979 data_size=8*nUnits; 1980 phastaio_time(&t1); 1981 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1982 phastaio_addWriteTime(elapsed); 1983 phastaio_addWriteBytes((nUnits*sizeof(double))+DB_HEADER_SIZE); 1984 } 1985 //MR CHANGE 1986 // else if ( cscompare ( datatype, "integer")) 1987 else if ( cscompare("integer",ts1) ) 1988 //MR CHANGE END 1989 { 1990 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 1991 phioTime t0,t1; 1992 phastaio_time(&t0); 1993 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1994 PhastaIOActiveFiles[i]->my_offset, 1995 PhastaIOActiveFiles[i]->int_chunk, 1996 nUnits+DB_HEADER_SIZE/sizeof(int), 1997 MPI_INT ); 1998 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1999 PhastaIOActiveFiles[i]->int_chunk, 2000 &write_data_status ); 2001 data_size=4*nUnits; 2002 phastaio_time(&t1); 2003 const size_t elapsed = phastaio_time_diff(&t0,&t1); 2004 phastaio_addWriteTime(elapsed); 2005 phastaio_addWriteBytes((nUnits*sizeof(int))+DB_HEADER_SIZE); 2006 } 2007 else { 2008 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2009 endTimer(&timer_end); 2010 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2011 return; 2012 } 2013 free(ts1); 2014 } 2015 2016 endTimer(&timer_end); 2017 char extra_msg[1024]; 2018 memset(extra_msg, '\0', 1024); 2019 char* key = StringStripper(keyphrase); 2020 sprintf(extra_msg, " field is %s ", key); 2021 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2022 free(key); 2023 2024 } 2025 2026 void SwapArrayByteOrder( void* array, int nbytes, int nItems ) 2027 { 2028 /* This swaps the byte order for the array of nItems each 2029 of size nbytes , This will be called only locally */ 2030 int i,j; 2031 unsigned char* ucDst = (unsigned char*)array; 2032 for(i=0; i < nItems; i++) { 2033 for(j=0; j < (nbytes/2); j++) 2034 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2035 ucDst += nbytes; 2036 } 2037 } 2038 2039 void writestring( int* fileDescriptor, const char inString[] ) 2040 { 2041 int filePtr = *fileDescriptor - 1; 2042 FILE* fileObject = fileArray[filePtr] ; 2043 fprintf(fileObject,"%s",inString ); 2044 return; 2045 } 2046 2047 void Gather_Headers( int* fileDescriptor, vector< string >& headers ) 2048 { 2049 FILE* fileObject; 2050 char Line[1024]; 2051 2052 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2053 2054 while( !feof(fileObject) ) { 2055 fgets( Line, 1024, fileObject); 2056 if ( Line[0] == '#' ) { 2057 headers.push_back( Line ); 2058 } else { 2059 break; 2060 } 2061 } 2062 rewind( fileObject ); 2063 clearerr( fileObject ); 2064 } 2065 2066 void isWrong( void ) { 2067 (Wrong_Endian) ? fprintf(stdout,"YES\n") : fprintf(stdout,"NO\n"); 2068 } 2069 2070 void togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2071 2072 int isLittleEndian( void ) 2073 { 2074 // this function returns a 1 if the current running architecture is 2075 // LittleEndian Byte Ordered, else it returns a zero 2076 union { 2077 long a; 2078 char c[sizeof( long )]; 2079 } endianUnion; 2080 endianUnion.a = 1 ; 2081 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2082 else return 0; 2083 } 2084 2085 namespace PHASTA { 2086 const char* const PhastaIO_traits<int>::type_string = "integer"; 2087 const char* const PhastaIO_traits<double>::type_string = "double"; 2088 } 2089