1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <string.h> 18 #include <ctype.h> 19 #include <stdlib.h> 20 #include <stdio.h> 21 #include <math.h> 22 #include <sstream> 23 #include "phastaIO.h" 24 #include "mpi.h" 25 #include "phiotmrc.h" 26 27 #define VERSION_INFO_HEADER_SIZE 8192 28 #define DB_HEADER_SIZE 1024 29 #define ONE_MEGABYTE 1048576 30 #define TWO_MEGABYTE 2097152 31 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 32 #define MAX_PHASTA_FILES 64 33 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 34 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 35 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 36 #define MAX_FIELDS_NAME_LENGTH 128 37 #define DefaultMHSize (4*ONE_MEGABYTE) 38 //#define DefaultMHSize (8350) //For test 39 #define LATEST_WRITE_VERSION 1 40 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 41 int MasterHeaderSize = -1; 42 43 bool PRINT_PERF = false; // default print no perf results 44 int irank = -1; // global rank, should never be manually manipulated 45 int mysize = -1; 46 47 // Static variables are bad but used here to store the subcommunicator and associated variables 48 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 49 static int s_assign_local_comm = 0; 50 static MPI_Comm s_local_comm; 51 static int s_local_size = -1; 52 static int s_local_rank = -1; 53 54 //unsigned long long pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 typedef map<int, char*> mic; 88 mic LastHeaderKey; 89 vector< FILE* > fileArray; 90 vector< bool > byte_order; 91 vector< int > header_type; 92 int DataSize=0; 93 bool LastHeaderNotFound = false; 94 bool Wrong_Endian = false ; 95 bool Strict_Error = false ; 96 bool binary_format = true; 97 98 /***********************************************************************/ 99 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 100 /***********************************************************************/ 101 102 typedef struct 103 { 104 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 105 unsigned long long my_offset; 106 unsigned long long next_start_address; 107 unsigned long long **my_offset_table; 108 unsigned long long **my_read_table; 109 110 double * double_chunk; 111 double * read_double_chunk; 112 113 int field_count; 114 int part_count; 115 int read_field_count; 116 int read_part_count; 117 int GPid; 118 int start_id; 119 120 int mhsize; 121 122 int myrank; 123 int numprocs; 124 int local_myrank; 125 int local_numprocs; 126 127 int nppp; 128 int nPPF; 129 int nFiles; 130 int nFields; 131 132 int * int_chunk; 133 int * read_int_chunk; 134 135 int Wrong_Endian; /* default to false */ 136 char * master_header; 137 MPI_File file_handle; 138 MPI_Comm local_comm; 139 } phastaio_file_t; 140 141 typedef struct 142 { 143 //unsigned long long my_offset; 144 //unsigned long long **offset_table; 145 //int fileID; 146 int nppf, nfields; 147 //int GPid; 148 //int read_field_count; 149 char * masterHeader; 150 }serial_file; 151 152 serial_file *SerialFile; 153 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 154 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 155 156 // the caller has the responsibility to delete the returned string 157 // TODO: StringStipper("nbc value? ") returns NULL? 158 char* 159 StringStripper( const char istring[] ) { 160 161 int length = strlen( istring ); 162 163 //char* dest = new char [ length + 1 ]; 164 char* dest = (char *)malloc( length + 1 ); 165 166 //char * dest; 167 //dest = (char *)malloc( length + 1 + pool_align ); 168 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 169 //mem_address = (long long )dest; 170 //if( mem_address & (pool_align -1) ) 171 // dest += pool_align - (mem_address & (pool_align -1)); 172 173 strcpy( dest, istring ); 174 dest[ length ] = '\0'; 175 176 if ( char* p = strpbrk( dest, " ") ) 177 *p = '\0'; 178 179 return dest; 180 } 181 182 inline int 183 cscompare( const char teststring[], 184 const char targetstring[] ) { 185 186 char* s1 = const_cast<char*>(teststring); 187 char* s2 = const_cast<char*>(targetstring); 188 189 while( *s1 == ' ') s1++; 190 while( *s2 == ' ') s2++; 191 while( ( *s1 ) 192 && ( *s2 ) 193 && ( *s2 != '?') 194 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 195 s1++; 196 s2++; 197 while( *s1 == ' ') s1++; 198 while( *s2 == ' ') s2++; 199 } 200 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 201 else return 0; 202 } 203 204 inline void 205 isBinary( const char iotype[] ) { 206 207 char* fname = StringStripper( iotype ); 208 if ( cscompare( fname, "binary" ) ) binary_format = true; 209 else binary_format = false; 210 free (fname); 211 212 } 213 214 inline size_t 215 typeSize( const char typestring[] ) { 216 217 char* ts1 = StringStripper( typestring ); 218 219 if ( cscompare( "integer", ts1 ) ) { 220 free (ts1); 221 return sizeof(int); 222 } else if ( cscompare( "double", ts1 ) ) { 223 free (ts1); 224 return sizeof( double ); 225 } else { 226 free (ts1); 227 fprintf(stderr,"unknown type : %s\n",ts1); 228 return 0; 229 } 230 } 231 232 int 233 readHeader( FILE* fileObject, 234 const char phrase[], 235 int* params, 236 int expect ) { 237 238 char* text_header; 239 char* token; 240 char Line[1024]; 241 char junk; 242 bool FOUND = false ; 243 int real_length; 244 int skip_size, integer_value; 245 int rewind_count=0; 246 247 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 248 rewind( fileObject ); 249 clearerr( fileObject ); 250 rewind_count++; 251 fgets( Line, 1024, fileObject ); 252 } 253 254 while( !FOUND && ( rewind_count < 2 ) ) { 255 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 256 //text_header = new char [ real_length + 1 ]; 257 text_header = (char *)malloc( real_length + 1 ); 258 //text_header = (char *)malloc( real_length + 1 + pool_align ); 259 //mem_address = (long long )text_header; 260 //if( mem_address & (pool_align -1) ) 261 // text_header += pool_align - (mem_address & (pool_align -1)); 262 263 strncpy( text_header, Line, real_length ); 264 text_header[ real_length ] =static_cast<char>(NULL); 265 token = strtok ( text_header, ":" ); 266 //if( cscompare( phrase , token ) ) { 267 // Double comparison required because different fields can still start 268 // with the same name for mixed meshes (nbc code, nbc values, etc). 269 // Would be easy to fix cscompare instead but would it break sth else? 270 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 271 FOUND = true ; 272 token = strtok( NULL, " ,;<>" ); 273 skip_size = atoi( token ); 274 int i; 275 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 276 params[i] = atoi( token ); 277 } 278 if ( i < expect ) { 279 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 280 } 281 } else if ( cscompare(token,"byteorder magic number") ) { 282 if ( binary_format ) { 283 fread((void*)&integer_value,sizeof(int),1,fileObject); 284 fread( &junk, sizeof(char), 1 , fileObject ); 285 if ( 362436 != integer_value ) Wrong_Endian = true; 286 } else{ 287 fscanf(fileObject, "%d\n", &integer_value ); 288 } 289 } else { 290 /* some other header, so just skip over */ 291 token = strtok( NULL, " ,;<>" ); 292 skip_size = atoi( token ); 293 if ( binary_format) 294 fseek( fileObject, skip_size, SEEK_CUR ); 295 else 296 for( int gama=0; gama < skip_size; gama++ ) 297 fgets( Line, 1024, fileObject ); 298 } 299 free (text_header); 300 } // end of if before while loop 301 302 if ( !FOUND ) 303 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 304 rewind( fileObject ); 305 clearerr( fileObject ); 306 rewind_count++; 307 fgets( Line, 1024, fileObject ); 308 } 309 } 310 311 if ( !FOUND ) { 312 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 313 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 314 return 1; 315 } 316 return 0; 317 } 318 319 } // end unnamed namespace 320 321 322 // begin of publicly visible functions 323 324 /** 325 * This function takes a long long pointer and assign (start) phiotmrc value to it 326 */ 327 //void startTimer(unsigned long long* start) { 328 void startTimer(double* start) { 329 330 if( !PRINT_PERF ) return; 331 332 MPI_Barrier(MPI_COMM_WORLD); 333 *start = phiotmrc(); 334 } 335 336 /** 337 * This function takes a long long pointer and assign (end) phiotmrc value to it 338 */ 339 void endTimer(double* end) { 340 341 if( !PRINT_PERF ) return; 342 343 *end = phiotmrc(); 344 MPI_Barrier(MPI_COMM_WORLD); 345 } 346 347 /** 348 * choose to print some performance results (or not) according to 349 * the PRINT_PERF macro 350 */ 351 void printPerf( 352 const char* func_name, 353 double start, 354 double end, 355 unsigned long long datasize, 356 int printdatainfo, 357 const char* extra_msg) { 358 359 if( !PRINT_PERF ) return; 360 361 unsigned long long data_size = datasize; 362 363 double time = end - start; 364 365 unsigned long long isizemin,isizemax,isizetot; 366 double sizemin,sizemax,sizeavg,sizetot,rate; 367 double tmin, tmax, tavg, ttot; 368 369 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 371 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 372 tavg = ttot/mysize; 373 374 if(irank == 0) { 375 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 376 else printf("** syncIO "); 377 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 378 } 379 380 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 381 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 383 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 384 385 sizemin=(double)(isizemin*inv1024sq); 386 sizemax=(double)(isizemax*inv1024sq); 387 sizetot=(double)(isizetot*inv1024sq); 388 sizeavg=(double)(1.0*sizetot/mysize); 389 rate=(double)(1.0*sizetot/tmax); 390 391 if( irank == 0) { 392 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 393 } 394 } 395 else { 396 if(irank == 0) { 397 printf(" \n"); 398 //printf(" (%s) \n", extra_msg); 399 } 400 } 401 } 402 403 /** 404 * This function is normally called at the beginning of a read operation, before 405 * init function. 406 * This function (uses rank 0) reads out nfields, nppf, master header size, 407 * endianess and allocates for masterHeader string. 408 * These values are essential for following read operations. Rank 0 will bcast 409 * these values to other ranks in the commm world 410 * 411 * If the file set is of old POSIX format, it would throw error and exit 412 */ 413 void queryphmpiio(const char filename[],int *nfields, int *nppf) 414 { 415 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 416 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 417 418 if(irank == 0) { 419 FILE * fileHandle; 420 char* fname = StringStripper( filename ); 421 422 fileHandle = fopen (fname,"rb"); 423 if (fileHandle == NULL ) { 424 printf("\nError: File %s doesn't exist! Please check!\n",fname); 425 } 426 else { 427 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 428 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 429 //mem_address = (long long )SerialFile; 430 //if( mem_address & (pool_align -1) ) 431 // SerialFile += pool_align - (mem_address & (pool_align -1)); 432 433 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 434 //int meta_size_limit = 4*ONE_MEGABYTE; 435 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 436 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 437 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 438 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 439 //mem_address = (long long )SerialFile; 440 //if( mem_address & (pool_align -1) ) 441 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 442 443 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 444 445 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 446 char version[MAX_FIELDS_NAME_LENGTH/4]; 447 int mhsize; 448 char * token; 449 int magic_number; 450 451 memcpy( read_out_tag, 452 SerialFile->masterHeader, 453 MAX_FIELDS_NAME_LENGTH-1 ); 454 455 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 456 // Test endianess ... 457 memcpy (&magic_number, 458 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 459 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 460 461 if ( magic_number != ENDIAN_TEST_NUMBER ) { 462 printf("Endian is different!\n"); 463 // Will do swap later 464 } 465 466 // test version, old version, default masterheader size is 4M 467 // newer version masterheader size is read from first line 468 memcpy(version, 469 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 470 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 471 472 if( cscompare ("version",version) ) { 473 // if there is "version" tag in the file, then it is newer format 474 // read master header size from here, otherwise use default 475 // Note: if version is "1", we know mhsize is at 3/4 place... 476 477 token = strtok(version, ":"); 478 token = strtok(NULL, " ,;<>" ); 479 int iversion = atoi(token); 480 481 if( iversion == 1) { 482 memcpy( &mhsize, 483 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 484 sizeof(int)); 485 if ( magic_number != ENDIAN_TEST_NUMBER ) 486 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 487 488 if( mhsize > DefaultMHSize ) { 489 //if actual headersize is larger than default, let's re-read 490 free(SerialFile->masterHeader); 491 SerialFile->masterHeader = (char *)malloc(mhsize); 492 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 493 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 494 } 495 } 496 //TODO: check if this is a valid int?? 497 MasterHeaderSize = mhsize; 498 } 499 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 500 MasterHeaderSize = DefaultMHSize; 501 } 502 503 memcpy( read_out_tag, 504 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 505 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 506 507 // Read in # fields ... 508 token = strtok( read_out_tag, ":" ); 509 token = strtok( NULL," ,;<>" ); 510 *nfields = atoi( token ); 511 if ( *nfields > MAX_FIELDS_NUMBER) { 512 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 513 } 514 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 515 516 memcpy( read_out_tag, 517 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 518 + *nfields * MAX_FIELDS_NAME_LENGTH, 519 MAX_FIELDS_NAME_LENGTH); 520 521 token = strtok( read_out_tag, ":" ); 522 token = strtok( NULL," ,;<>" ); 523 *nppf = atoi( token ); 524 SerialFile->nppf=*nppf; //TODO: sanity check of int 525 } // end of if("MPI_IO_TAG") 526 else { 527 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 528 exit(1); 529 } 530 fclose(fileHandle); 531 free(SerialFile->masterHeader); 532 free(SerialFile); 533 } //end of else 534 free(fname); 535 } 536 537 // Bcast value to every one 538 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 541 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 542 } 543 544 /** 545 * This function computes the right master header size (round to size of 2^n). 546 * This is only needed for file format version 1 in "write" mode. 547 */ 548 int computeMHSize(int nfields, int nppf, int version) { 549 int mhsize; 550 if(version == 1) { 551 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 552 int meta_info_size = VERSION_INFO_HEADER_SIZE; 553 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 554 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 555 if (actual_size > DefaultMHSize) { 556 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 557 mhsize *= DefaultMHSize; 558 } 559 else { 560 mhsize = DefaultMHSize; 561 } 562 } 563 return mhsize; 564 } 565 566 /** 567 * Computes correct color of a rank according to number of files. 568 */ 569 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 570 int color = 571 (int)(myrank / (numprocs / nfiles)); 572 return color; 573 } 574 575 576 /** 577 * Check the file descriptor. 578 */ 579 void checkFileDescriptor(const char fctname[], 580 int* fileDescriptor ) { 581 if ( *fileDescriptor < 0 ) { 582 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 583 exit(1); 584 } 585 } 586 587 /** 588 * Initialize the file struct members and allocate space for file struct 589 * buffers. 590 * 591 * Note: this function is only called when we are using new format. Old POSIX 592 * format should skip this routine and call openfile() directly instead. 593 */ 594 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 595 { 596 // we init irank again in case query not called (e.g. syncIO write case) 597 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 598 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 599 600 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 601 602 double timer_start, timer_end; 603 startTimer(&timer_start); 604 605 char* imode = StringStripper( mode ); 606 607 // Note: if it's read, we presume query was called prior to init and 608 // MasterHeaderSize is already set to correct value from parsing header 609 // otherwise it's write then it needs some computation to be set 610 if ( cscompare( "read", imode ) ) { 611 // do nothing 612 } 613 else if( cscompare( "write", imode ) ) { 614 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 615 } 616 else { 617 printf("Error initphmpiio: can't recognize the mode %s", imode); 618 exit(1); 619 } 620 free ( imode ); 621 622 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 623 624 int i, j; 625 626 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 627 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 628 endTimer(&timer_end); 629 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 630 return MAX_PHASTA_FILES_EXCEEDED; 631 } 632 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 633 // { 634 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 635 // { 636 // PhastaIOActiveFiles[i] = NULL; 637 // } 638 // } 639 640 641 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 642 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 643 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 644 //if( mem_address & (pool_align -1) ) 645 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 646 647 i = PhastaIONextActiveIndex; 648 PhastaIONextActiveIndex++; 649 650 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 651 652 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 653 654 PhastaIOActiveFiles[i]->Wrong_Endian = false; 655 656 PhastaIOActiveFiles[i]->nFields = *nfields; 657 PhastaIOActiveFiles[i]->nPPF = *nppf; 658 PhastaIOActiveFiles[i]->nFiles = *nfiles; 659 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 660 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 661 662 663 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 664 665 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 666 667 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 668 669 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 670 MPI_Comm_split(MPI_COMM_WORLD, 671 color, 672 PhastaIOActiveFiles[i]->myrank, 673 &(PhastaIOActiveFiles[i]->local_comm)); 674 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 675 &(PhastaIOActiveFiles[i]->local_numprocs)); 676 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 677 &(PhastaIOActiveFiles[i]->local_myrank)); 678 679 // back up now these variables so that we do not need to call comm_split again 680 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 681 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 682 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 683 s_assign_local_comm = 1; 684 } 685 else { // recycle the subcommunicator 686 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 687 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 688 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 689 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 690 } 691 } 692 else { // *nfiles == 1 here - no need to call mpi_comm_split here 693 694 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 695 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 696 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 697 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 698 699 } 700 701 PhastaIOActiveFiles[i]->nppp = 702 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 703 704 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 705 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 706 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 707 708 PhastaIOActiveFiles[i]->my_offset_table = 709 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 710 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 711 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 712 //if( mem_address & (pool_align -1) ) 713 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 714 715 PhastaIOActiveFiles[i]->my_read_table = 716 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 717 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 718 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 719 //if( mem_address & (pool_align -1) ) 720 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 721 722 for (j=0; j<*nfields; j++) 723 { 724 PhastaIOActiveFiles[i]->my_offset_table[j] = 725 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 726 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 727 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 728 //if( mem_address & (pool_align -1) ) 729 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 730 731 PhastaIOActiveFiles[i]->my_read_table[j] = 732 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 733 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 734 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 735 //if( mem_address & (pool_align -1) ) 736 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 737 } 738 *filehandle = i; 739 740 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 741 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 742 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 743 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 744 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 745 746 /* 747 PhastaIOActiveFiles[i]->master_header = 748 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 749 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 750 if( mem_address & (pool_align -1) ) 751 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 752 753 PhastaIOActiveFiles[i]->double_chunk = 754 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 755 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 756 if( mem_address & (pool_align -1) ) 757 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 758 759 PhastaIOActiveFiles[i]->int_chunk = 760 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 761 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 762 if( mem_address & (pool_align -1) ) 763 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 764 765 PhastaIOActiveFiles[i]->read_double_chunk = 766 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 767 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 768 if( mem_address & (pool_align -1) ) 769 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 770 771 PhastaIOActiveFiles[i]->read_int_chunk = 772 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 773 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 774 if( mem_address & (pool_align -1) ) 775 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 776 */ 777 778 // Time monitoring 779 endTimer(&timer_end); 780 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 781 782 phprintf_0("Info initphmpiio: quiting function"); 783 784 return i; 785 } 786 787 /** 788 * Destruct the file struct and free buffers allocated in init function. 789 */ 790 void finalizephmpiio( int *fileDescriptor ) 791 { 792 double timer_start, timer_end; 793 startTimer(&timer_start); 794 795 int i, j; 796 i = *fileDescriptor; 797 //PhastaIONextActiveIndex--; 798 799 /* //free the offset table for this phasta file */ 800 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 801 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 802 { 803 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 804 free( PhastaIOActiveFiles[i]->my_read_table[j]); 805 } 806 free ( PhastaIOActiveFiles[i]->my_offset_table ); 807 free ( PhastaIOActiveFiles[i]->my_read_table ); 808 free ( PhastaIOActiveFiles[i]->master_header ); 809 free ( PhastaIOActiveFiles[i]->double_chunk ); 810 free ( PhastaIOActiveFiles[i]->int_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 812 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 813 814 free( PhastaIOActiveFiles[i]); 815 816 endTimer(&timer_end); 817 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 818 819 PhastaIONextActiveIndex--; 820 } 821 822 823 /** 824 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 825 * Initialize the file struct members and allocate space for file struct buffers. 826 * 827 * Note: this function is only called when we are using new format. Old POSIX 828 * format should skip this routine and call openfile() directly instead. 829 */ 830 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 831 { 832 // we init irank again in case query not called (e.g. syncIO write case) 833 834 MPI_Comm_rank(my_local_comm, &irank); 835 MPI_Comm_size(my_local_comm, &mysize); 836 837 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 838 839 double timer_start, timer_end; 840 startTimer(&timer_start); 841 842 char* imode = StringStripper( mode ); 843 844 // Note: if it's read, we presume query was called prior to init and 845 // MasterHeaderSize is already set to correct value from parsing header 846 // otherwise it's write then it needs some computation to be set 847 if ( cscompare( "read", imode ) ) { 848 // do nothing 849 } 850 else if( cscompare( "write", imode ) ) { 851 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 852 } 853 else { 854 printf("Error initphmpiio: can't recognize the mode %s", imode); 855 exit(1); 856 } 857 free ( imode ); 858 859 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 860 861 int i, j; 862 863 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 864 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 865 endTimer(&timer_end); 866 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 867 return MAX_PHASTA_FILES_EXCEEDED; 868 } 869 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 870 // { 871 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 872 // { 873 // PhastaIOActiveFiles[i] = NULL; 874 // } 875 // } 876 877 878 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 879 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 880 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 881 //if( mem_address & (pool_align -1) ) 882 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 883 884 i = PhastaIONextActiveIndex; 885 PhastaIONextActiveIndex++; 886 887 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 888 889 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 890 891 PhastaIOActiveFiles[i]->Wrong_Endian = false; 892 893 PhastaIOActiveFiles[i]->nFields = *nfields; 894 PhastaIOActiveFiles[i]->nPPF = *nppf; 895 PhastaIOActiveFiles[i]->nFiles = *nfiles; 896 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 897 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 898 899 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 900 MPI_Comm_split(my_local_comm, 901 color, 902 PhastaIOActiveFiles[i]->myrank, 903 &(PhastaIOActiveFiles[i]->local_comm)); 904 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 905 &(PhastaIOActiveFiles[i]->local_numprocs)); 906 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 907 &(PhastaIOActiveFiles[i]->local_myrank)); 908 PhastaIOActiveFiles[i]->nppp = 909 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 910 911 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 912 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 913 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 914 915 PhastaIOActiveFiles[i]->my_offset_table = 916 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 917 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 918 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 919 //if( mem_address & (pool_align -1) ) 920 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 921 922 PhastaIOActiveFiles[i]->my_read_table = 923 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 924 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 925 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 926 //if( mem_address & (pool_align -1) ) 927 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 928 929 for (j=0; j<*nfields; j++) 930 { 931 PhastaIOActiveFiles[i]->my_offset_table[j] = 932 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 933 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 934 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 935 //if( mem_address & (pool_align -1) ) 936 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 937 938 PhastaIOActiveFiles[i]->my_read_table[j] = 939 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 940 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 941 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 942 //if( mem_address & (pool_align -1) ) 943 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 944 } 945 *filehandle = i; 946 947 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 948 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 949 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 950 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 951 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 952 953 /* 954 PhastaIOActiveFiles[i]->master_header = 955 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 956 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 957 if( mem_address & (pool_align -1) ) 958 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 959 960 PhastaIOActiveFiles[i]->double_chunk = 961 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 962 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 963 if( mem_address & (pool_align -1) ) 964 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 965 966 PhastaIOActiveFiles[i]->int_chunk = 967 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 968 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 969 if( mem_address & (pool_align -1) ) 970 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 971 972 PhastaIOActiveFiles[i]->read_double_chunk = 973 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 974 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 975 if( mem_address & (pool_align -1) ) 976 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 977 978 PhastaIOActiveFiles[i]->read_int_chunk = 979 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 980 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 981 if( mem_address & (pool_align -1) ) 982 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 983 */ 984 985 // Time monitoring 986 endTimer(&timer_end); 987 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 988 989 phprintf_0("Info initphmpiiosub: quiting function"); 990 991 return i; 992 } 993 994 995 996 /** open file for both POSIX and MPI-IO syncIO format. 997 * 998 * If it's old POSIX format, simply call posix fopen(). 999 * 1000 * If it's MPI-IO foramt: 1001 * in "read" mode, it builds the header table that points to the offset of 1002 * fields for parts; 1003 * in "write" mode, it opens the file with MPI-IO open routine. 1004 */ 1005 void openfile(const char filename[], 1006 const char mode[], 1007 int* fileDescriptor ) 1008 { 1009 phprintf_0("Info: entering openfile"); 1010 1011 double timer_start, timer_end; 1012 startTimer(&timer_start); 1013 1014 if ( PhastaIONextActiveIndex == 0 ) 1015 { 1016 FILE* file=NULL ; 1017 *fileDescriptor = 0; 1018 char* fname = StringStripper( filename ); 1019 char* imode = StringStripper( mode ); 1020 1021 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1022 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1023 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1024 1025 if ( !file ){ 1026 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1027 } else { 1028 fileArray.push_back( file ); 1029 byte_order.push_back( false ); 1030 header_type.push_back( sizeof(int) ); 1031 *fileDescriptor = fileArray.size(); 1032 } 1033 free (fname); 1034 free (imode); 1035 } 1036 else // else it would be parallel I/O, opposed to posix io 1037 { 1038 char* fname = StringStripper( filename ); 1039 char* imode = StringStripper( mode ); 1040 int rc; 1041 int i = *fileDescriptor; 1042 checkFileDescriptor("openfile",&i); 1043 char* token; 1044 1045 if ( cscompare( "read", imode ) ) 1046 { 1047 // if (PhastaIOActiveFiles[i]->myrank == 0) 1048 // printf("\n **********\nRead open ... ... regular version\n"); 1049 1050 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1051 fname, 1052 MPI_MODE_RDONLY, 1053 MPI_INFO_NULL, 1054 &(PhastaIOActiveFiles[i]->file_handle) ); 1055 1056 if(rc) 1057 { 1058 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1059 printf("Error openfile: Unable to open file %s! File descriptor = %d\n",fname,*fileDescriptor); 1060 endTimer(&timer_end); 1061 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1062 return; 1063 } 1064 1065 MPI_Status read_tag_status; 1066 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1067 int j; 1068 int magic_number; 1069 1070 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1071 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1072 0, 1073 PhastaIOActiveFiles[i]->master_header, 1074 MasterHeaderSize, 1075 MPI_CHAR, 1076 &read_tag_status ); 1077 } 1078 1079 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1080 MasterHeaderSize, 1081 MPI_CHAR, 1082 0, 1083 PhastaIOActiveFiles[i]->local_comm ); 1084 1085 memcpy( read_out_tag, 1086 PhastaIOActiveFiles[i]->master_header, 1087 MAX_FIELDS_NAME_LENGTH-1 ); 1088 1089 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1090 { 1091 // Test endianess ... 1092 memcpy ( &magic_number, 1093 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1094 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1095 1096 if ( magic_number != ENDIAN_TEST_NUMBER ) 1097 { 1098 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1099 } 1100 1101 memcpy( read_out_tag, 1102 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1103 MAX_FIELDS_NAME_LENGTH ); 1104 1105 // Read in # fields ... 1106 token = strtok ( read_out_tag, ":" ); 1107 token = strtok( NULL," ,;<>" ); 1108 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1109 1110 unsigned long long **header_table; 1111 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1112 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1113 //mem_address = (long long )header_table; 1114 //if( mem_address & (pool_align -1) ) 1115 // header_table += pool_align - (mem_address & (pool_align -1)); 1116 1117 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1118 { 1119 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1120 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1121 //mem_address = (long long )header_table[j]; 1122 //if( mem_address & (pool_align -1) ) 1123 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1124 } 1125 1126 // Read in the offset table ... 1127 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1128 { 1129 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1130 memcpy( header_table[j], 1131 PhastaIOActiveFiles[i]->master_header + 1132 VERSION_INFO_HEADER_SIZE + 1133 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1134 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1135 } 1136 1137 MPI_Scatter( header_table[j], 1138 PhastaIOActiveFiles[i]->nppp, 1139 MPI_LONG_LONG_INT, 1140 PhastaIOActiveFiles[i]->my_read_table[j], 1141 PhastaIOActiveFiles[i]->nppp, 1142 MPI_LONG_LONG_INT, 1143 0, 1144 PhastaIOActiveFiles[i]->local_comm ); 1145 1146 // Swap byte order if endianess is different ... 1147 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1148 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1149 sizeof(long long int), 1150 PhastaIOActiveFiles[i]->nppp ); 1151 } 1152 } 1153 1154 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1155 free ( header_table[j] ); 1156 } 1157 free (header_table); 1158 1159 } // end of if MPI_IO_TAG 1160 else //else not valid MPI file 1161 { 1162 *fileDescriptor = NOT_A_MPI_FILE; 1163 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1164 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1165 endTimer(&timer_end); 1166 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1167 return; 1168 } 1169 } // end of if "read" 1170 else if( cscompare( "write", imode ) ) 1171 { 1172 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1173 fname, 1174 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1175 MPI_INFO_NULL, 1176 &(PhastaIOActiveFiles[i]->file_handle) ); 1177 if(rc) 1178 { 1179 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1180 return; 1181 } 1182 } // end of if "write" 1183 free (fname); 1184 free (imode); 1185 } // end of if FileIndex != 0 1186 1187 endTimer(&timer_end); 1188 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1189 } 1190 1191 /** close file for both POSIX and MPI-IO syncIO format. 1192 * 1193 * If it's old POSIX format, simply call posix fclose(). 1194 * 1195 * If it's MPI-IO foramt: 1196 * in "read" mode, it simply close file with MPI-IO close routine. 1197 * in "write" mode, rank 0 in each group will re-assemble the master header and 1198 * offset table and write to the beginning of file, then close the file. 1199 */ 1200 void closefile( int* fileDescriptor, 1201 const char mode[] ) 1202 { 1203 double timer_start, timer_end; 1204 startTimer(&timer_start); 1205 1206 int i = *fileDescriptor; 1207 checkFileDescriptor("closefile",&i); 1208 1209 if ( PhastaIONextActiveIndex == 0 ) { 1210 char* imode = StringStripper( mode ); 1211 1212 if( cscompare( "write", imode ) 1213 || cscompare( "append", imode ) ) { 1214 fflush( fileArray[ *fileDescriptor - 1 ] ); 1215 } 1216 1217 fclose( fileArray[ *fileDescriptor - 1 ] ); 1218 free (imode); 1219 for (mic::iterator it=LastHeaderKey.begin(); it!=LastHeaderKey.end(); ++it) 1220 free(it->second); 1221 } 1222 else { 1223 char* imode = StringStripper( mode ); 1224 1225 //write master header here: 1226 if ( cscompare( "write", imode ) ) { 1227 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1228 // MasterHeaderSize = 4*ONE_MEGABYTE; 1229 // else 1230 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1231 1232 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1233 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1234 1235 MPI_Status write_header_status; 1236 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1237 char version[MAX_FIELDS_NAME_LENGTH/4]; 1238 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1239 int magic_number = ENDIAN_TEST_NUMBER; 1240 1241 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1242 { 1243 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1244 sprintf(mpi_tag, "MPI_IO_Tag : "); 1245 memcpy(PhastaIOActiveFiles[i]->master_header, 1246 mpi_tag, 1247 MAX_FIELDS_NAME_LENGTH); 1248 1249 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1250 // this version is "1", print version in ASCII 1251 sprintf(version, "version : %d",1); 1252 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1253 version, 1254 MAX_FIELDS_NAME_LENGTH/4); 1255 1256 // master header size is computed using the formula above 1257 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1258 sprintf(mhsize, "mhsize : "); 1259 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1260 mhsize, 1261 MAX_FIELDS_NAME_LENGTH/4); 1262 1263 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1264 sprintf(mpi_tag, 1265 "\nnFields : %d\n", 1266 PhastaIOActiveFiles[i]->nFields); 1267 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1268 mpi_tag, 1269 MAX_FIELDS_NAME_LENGTH); 1270 1271 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1272 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1273 memcpy( PhastaIOActiveFiles[i]->master_header+ 1274 PhastaIOActiveFiles[i]->nFields * 1275 MAX_FIELDS_NAME_LENGTH + 1276 MAX_FIELDS_NAME_LENGTH * 2, 1277 mpi_tag, 1278 MAX_FIELDS_NAME_LENGTH); 1279 1280 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1281 &magic_number, 1282 sizeof(int)); 1283 1284 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1285 &MasterHeaderSize, 1286 sizeof(int)); 1287 } 1288 1289 int j = 0; 1290 unsigned long long **header_table; 1291 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1292 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1293 //mem_address = (long long )header_table; 1294 //if( mem_address & (pool_align -1) ) 1295 // header_table += pool_align - (mem_address & (pool_align -1)); 1296 1297 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1298 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1299 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1300 //mem_address = (long long )header_table[j]; 1301 //if( mem_address & (pool_align -1) ) 1302 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1303 } 1304 1305 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1306 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1307 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1308 PhastaIOActiveFiles[i]->nppp, 1309 MPI_LONG_LONG_INT, 1310 header_table[j], 1311 PhastaIOActiveFiles[i]->nppp, 1312 MPI_LONG_LONG_INT, 1313 0, 1314 PhastaIOActiveFiles[i]->local_comm ); 1315 } 1316 1317 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1318 1319 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1320 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1321 memcpy ( PhastaIOActiveFiles[i]->master_header + 1322 VERSION_INFO_HEADER_SIZE + 1323 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1324 header_table[j], 1325 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1326 } 1327 1328 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1329 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1330 0, 1331 PhastaIOActiveFiles[i]->master_header, 1332 MasterHeaderSize, 1333 MPI_CHAR, 1334 &write_header_status ); 1335 } 1336 1337 ////free(PhastaIOActiveFiles[i]->master_header); 1338 1339 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1340 free ( header_table[j] ); 1341 } 1342 free (header_table); 1343 } 1344 1345 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1346 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1347 free ( imode ); 1348 } 1349 1350 endTimer(&timer_end); 1351 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1352 } 1353 1354 void readheader( int* fileDescriptor, 1355 const char keyphrase[], 1356 void* valueArray, 1357 int* nItems, 1358 const char datatype[], 1359 const char iotype[] ) 1360 { 1361 double timer_start, timer_end; 1362 1363 startTimer(&timer_start); 1364 1365 int i = *fileDescriptor; 1366 checkFileDescriptor("readheader",&i); 1367 1368 if ( PhastaIONextActiveIndex == 0 ) { 1369 int filePtr = *fileDescriptor - 1; 1370 FILE* fileObject; 1371 int* valueListInt; 1372 1373 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1374 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1375 fprintf(stderr,"openfile_ function has to be called before \n") ; 1376 fprintf(stderr,"acessing the file\n ") ; 1377 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1378 endTimer(&timer_end); 1379 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1380 return; 1381 } 1382 1383 if( LastHeaderKey.count(filePtr) ) 1384 free(LastHeaderKey[filePtr]); 1385 const int l = strlen(keyphrase)+1; 1386 LastHeaderKey[filePtr] = (char*) malloc(l*sizeof(char)); 1387 strcpy(LastHeaderKey[filePtr], keyphrase); 1388 LastHeaderNotFound = false; 1389 1390 fileObject = fileArray[ filePtr ] ; 1391 Wrong_Endian = byte_order[ filePtr ]; 1392 1393 isBinary( iotype ); 1394 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1395 1396 // right now we are making the assumption that we will only write integers 1397 // on the header line. 1398 1399 valueListInt = static_cast< int* >( valueArray ); 1400 int ierr = readHeader( fileObject , 1401 keyphrase, 1402 valueListInt, 1403 *nItems ) ; 1404 1405 byte_order[ filePtr ] = Wrong_Endian ; 1406 1407 if ( ierr ) LastHeaderNotFound = true; 1408 1409 //return ; // don't return, go to the end to print perf 1410 } 1411 else { 1412 unsigned int skip_size; 1413 int* valueListInt; 1414 valueListInt = static_cast <int*>(valueArray); 1415 char* token; 1416 bool FOUND = false ; 1417 isBinary( iotype ); 1418 1419 MPI_Status read_offset_status; 1420 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1421 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1422 int j; 1423 1424 int string_length = strlen( keyphrase ); 1425 char* buffer = (char*) malloc ( string_length+1 ); 1426 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1427 //mem_address = (long long )buffer; 1428 //if( mem_address & (pool_align -1) ) 1429 // buffer += pool_align - (mem_address & (pool_align -1)); 1430 1431 strcpy ( buffer, keyphrase ); 1432 buffer[ string_length ] = '\0'; 1433 1434 char* st2 = strtok ( buffer, "@" ); 1435 st2 = strtok (NULL, "@"); 1436 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1437 if ( char* p = strpbrk(buffer, "@") ) 1438 *p = '\0'; 1439 1440 // Check if the user has input the right GPid 1441 if ( ( PhastaIOActiveFiles[i]->GPid <= 1442 PhastaIOActiveFiles[i]->myrank * 1443 PhastaIOActiveFiles[i]->nppp )|| 1444 ( PhastaIOActiveFiles[i]->GPid > 1445 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1446 PhastaIOActiveFiles[i]->nppp ) ) 1447 { 1448 *fileDescriptor = NOT_A_MPI_FILE; 1449 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1450 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1451 endTimer(&timer_end); 1452 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1453 return; 1454 } 1455 1456 // Find the field we want ... 1457 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1458 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1459 { 1460 memcpy( readouttag[j], 1461 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1462 MAX_FIELDS_NAME_LENGTH-1 ); 1463 } 1464 1465 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1466 { 1467 token = strtok ( readouttag[j], ":" ); 1468 1469 //if ( cscompare( buffer, token ) ) 1470 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1471 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1472 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1473 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1474 { 1475 PhastaIOActiveFiles[i]->read_field_count = j; 1476 FOUND = true; 1477 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1478 break; 1479 } 1480 } 1481 free(buffer); 1482 1483 if (!FOUND) 1484 { 1485 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1486 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1487 endTimer(&timer_end); 1488 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1489 return; 1490 } 1491 1492 // Find the part we want ... 1493 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1494 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1495 1496 PhastaIOActiveFiles[i]->my_offset = 1497 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1498 1499 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1500 1501 // Read each datablock header here ... 1502 1503 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1504 PhastaIOActiveFiles[i]->my_offset+1, 1505 read_out_tag, 1506 MAX_FIELDS_NAME_LENGTH-1, 1507 MPI_CHAR, 1508 &read_offset_status ); 1509 token = strtok ( read_out_tag, ":" ); 1510 1511 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1512 1513 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1514 { 1515 FOUND = true ; 1516 token = strtok( NULL, " ,;<>" ); 1517 skip_size = atoi( token ); 1518 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1519 valueListInt[j] = atoi( token ); 1520 1521 if ( j < *nItems ) 1522 { 1523 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1524 } 1525 } 1526 else { 1527 //if(irank==0) 1528 if(PhastaIOActiveFiles[i]->myrank == 0) 1529 // If we enter this if, there is a problem with the name of some fields 1530 { 1531 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1532 } 1533 } 1534 } 1535 1536 endTimer(&timer_end); 1537 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1538 1539 } 1540 1541 void readdatablock( int* fileDescriptor, 1542 const char keyphrase[], 1543 void* valueArray, 1544 int* nItems, 1545 const char datatype[], 1546 const char iotype[] ) 1547 { 1548 //if(irank == 0) printf("entering readdatablock()\n"); 1549 unsigned long long data_size = 0; 1550 double timer_start, timer_end; 1551 startTimer(&timer_start); 1552 1553 int i = *fileDescriptor; 1554 checkFileDescriptor("readdatablock",&i); 1555 1556 if ( PhastaIONextActiveIndex == 0 ) { 1557 int filePtr = *fileDescriptor - 1; 1558 FILE* fileObject; 1559 char junk; 1560 1561 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1562 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1563 fprintf(stderr,"openfile_ function has to be called before\n") ; 1564 fprintf(stderr,"acessing the file\n ") ; 1565 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1566 endTimer(&timer_end); 1567 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1568 return; 1569 } 1570 1571 // error check.. 1572 // since we require that a consistant header always preceed the data block 1573 // let us check to see that it is actually the case. 1574 1575 if ( ! cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1576 fprintf(stderr, "Header not consistant with data block\n"); 1577 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1578 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1579 fprintf(stderr, "Please recheck read sequence \n"); 1580 if( Strict_Error ) { 1581 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1582 endTimer(&timer_end); 1583 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1584 return; 1585 } 1586 } 1587 1588 if ( LastHeaderNotFound ) { 1589 endTimer(&timer_end); 1590 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1591 return; 1592 } 1593 fileObject = fileArray[ filePtr ]; 1594 Wrong_Endian = byte_order[ filePtr ]; 1595 1596 size_t type_size = typeSize( datatype ); 1597 int nUnits = *nItems; 1598 isBinary( iotype ); 1599 1600 if ( binary_format ) { 1601 fread( valueArray, type_size, nUnits, fileObject ); 1602 fread( &junk, sizeof(char), 1 , fileObject ); 1603 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nUnits ); 1604 } else { 1605 1606 char* ts1 = StringStripper( datatype ); 1607 if ( cscompare( "integer", ts1 ) ) { 1608 for( int n=0; n < nUnits ; n++ ) 1609 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1610 } else if ( cscompare( "double", ts1 ) ) { 1611 for( int n=0; n < nUnits ; n++ ) 1612 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1613 } 1614 free (ts1); 1615 } 1616 1617 //return; 1618 } 1619 else { 1620 // printf("read data block\n"); 1621 MPI_Status read_data_status; 1622 size_t type_size = typeSize( datatype ); 1623 int nUnits = *nItems; 1624 isBinary( iotype ); 1625 1626 // read datablock then 1627 //MR CHANGE 1628 // if ( cscompare ( datatype, "double")) 1629 char* ts2 = StringStripper( datatype ); 1630 if ( cscompare ( "double" , ts2)) 1631 //MR CHANGE END 1632 { 1633 1634 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1635 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1636 valueArray, 1637 nUnits, 1638 MPI_DOUBLE ); 1639 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1640 valueArray, 1641 &read_data_status ); 1642 data_size=8*nUnits; 1643 1644 } 1645 //MR CHANGE 1646 // else if ( cscompare ( datatype, "integer")) 1647 else if ( cscompare ( "integer" , ts2)) 1648 //MR CHANGE END 1649 { 1650 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1651 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1652 valueArray, 1653 nUnits, 1654 MPI_INT ); 1655 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1656 valueArray, 1657 &read_data_status ); 1658 data_size=4*nUnits; 1659 } 1660 else 1661 { 1662 *fileDescriptor = DATA_TYPE_ILLEGAL; 1663 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1664 endTimer(&timer_end); 1665 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1666 return; 1667 } 1668 free(ts2); 1669 1670 1671 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1672 1673 // Swap data byte order if endianess is different ... 1674 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1675 { 1676 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1677 } 1678 } 1679 1680 endTimer(&timer_end); 1681 char extra_msg[1024]; 1682 memset(extra_msg, '\0', 1024); 1683 char* key = StringStripper(keyphrase); 1684 sprintf(extra_msg, " field is %s ", key); 1685 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1686 free(key); 1687 1688 } 1689 1690 void writeheader( const int* fileDescriptor, 1691 const char keyphrase[], 1692 const void* valueArray, 1693 const int* nItems, 1694 const int* ndataItems, 1695 const char datatype[], 1696 const char iotype[]) 1697 { 1698 1699 //if(irank == 0) printf("entering writeheader()\n"); 1700 1701 double timer_start, timer_end; 1702 startTimer(&timer_start); 1703 1704 int i = *fileDescriptor; 1705 checkFileDescriptor("writeheader",&i); 1706 1707 if ( PhastaIONextActiveIndex == 0 ) { 1708 int filePtr = *fileDescriptor - 1; 1709 FILE* fileObject; 1710 1711 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1712 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1713 fprintf(stderr,"openfile_ function has to be called before \n") ; 1714 fprintf(stderr,"acessing the file\n ") ; 1715 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1716 endTimer(&timer_end); 1717 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1718 return; 1719 } 1720 1721 if( LastHeaderKey.count(filePtr) ) 1722 free(LastHeaderKey[filePtr]); 1723 const int l = strlen(keyphrase)+1; 1724 LastHeaderKey[filePtr] = (char*) malloc(l*sizeof(char)); 1725 strcpy(LastHeaderKey[filePtr], keyphrase); 1726 DataSize = *ndataItems; 1727 fileObject = fileArray[ filePtr ] ; 1728 size_t type_size = typeSize( datatype ); 1729 isBinary( iotype ); 1730 header_type[ filePtr ] = type_size; 1731 1732 int _newline = ( *ndataItems > 0 ) ? sizeof( char ) : 0; 1733 int size_of_nextblock = 1734 ( binary_format ) ? type_size*( *ndataItems )+ _newline : *ndataItems ; 1735 1736 fprintf( fileObject, "%s : < %d > ", keyphrase, size_of_nextblock ); 1737 for( int i = 0; i < *nItems; i++ ) 1738 fprintf(fileObject, "%d ", *((int*)((int*)valueArray+i))); 1739 fprintf(fileObject, "\n"); 1740 1741 //return ; 1742 } 1743 else { // else it's parallel I/O 1744 DataSize = *ndataItems; 1745 size_t type_size = typeSize( datatype ); 1746 isBinary( iotype ); 1747 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1748 1749 int string_length = strlen( keyphrase ); 1750 char* buffer = (char*) malloc ( string_length+1 ); 1751 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1752 //mem_address = (long long )buffer; 1753 //if( mem_address & (pool_align -1) ) 1754 // buffer += pool_align - (mem_address & (pool_align -1)); 1755 1756 strcpy ( buffer, keyphrase); 1757 buffer[ string_length ] = '\0'; 1758 1759 char* st2 = strtok ( buffer, "@" ); 1760 st2 = strtok (NULL, "@"); 1761 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1762 1763 if ( char* p = strpbrk(buffer, "@") ) 1764 *p = '\0'; 1765 1766 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1767 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1768 unsigned long long offset_value; 1769 1770 int temp = *ndataItems; 1771 unsigned long long number_of_items = (unsigned long long)temp; 1772 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1773 1774 MPI_Scan( &number_of_items, 1775 &offset_value, 1776 1, 1777 MPI_LONG_LONG_INT, 1778 MPI_SUM, 1779 PhastaIOActiveFiles[i]->local_comm ); 1780 1781 offset_value = (offset_value - number_of_items) * type_size; 1782 1783 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1784 DB_HEADER_SIZE + 1785 PhastaIOActiveFiles[i]->next_start_address; 1786 // This offset is the starting address of each datablock header... 1787 PhastaIOActiveFiles[i]->my_offset = offset_value; 1788 1789 // Write in my offset table ... 1790 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1791 PhastaIOActiveFiles[i]->my_offset; 1792 1793 // Update the next-start-address ... 1794 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1795 number_of_items * type_size + 1796 DB_HEADER_SIZE; 1797 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1798 1, 1799 MPI_LONG_LONG_INT, 1800 PhastaIOActiveFiles[i]->local_numprocs-1, 1801 PhastaIOActiveFiles[i]->local_comm ); 1802 1803 // Prepare datablock header ... 1804 int _newline = (*ndataItems>0)?sizeof(char):0; 1805 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1806 1807 //char datablock_header[255]; 1808 //bzero((void*)datablock_header,255); 1809 char datablock_header[DB_HEADER_SIZE]; 1810 bzero((void*)datablock_header,DB_HEADER_SIZE); 1811 1812 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1813 sprintf( datablock_header, 1814 "\n%s : < %u >", 1815 keyphrase, 1816 size_of_nextblock ); 1817 1818 for ( int j = 0; j < *nItems; j++ ) 1819 { 1820 sprintf( datablock_header, 1821 "%s %d ", 1822 datablock_header, 1823 *((int*)((int*)valueArray+j))); 1824 } 1825 sprintf( datablock_header, 1826 "%s\n ", 1827 datablock_header ); 1828 1829 // Write datablock header ... 1830 //MR CHANGE 1831 // if ( cscompare(datatype,"double") ) 1832 char* ts1 = StringStripper( datatype ); 1833 if ( cscompare("double",ts1) ) 1834 //MR CHANGE END 1835 { 1836 free ( PhastaIOActiveFiles[i]->double_chunk ); 1837 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1838 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1839 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1840 //if( mem_address & (pool_align -1) ) 1841 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1842 1843 double * aa = ( double * )datablock_header; 1844 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1845 } 1846 //MR CHANGE 1847 // if ( cscompare(datatype,"integer") ) 1848 else if ( cscompare("integer",ts1) ) 1849 //MR CHANGE END 1850 { 1851 free ( PhastaIOActiveFiles[i]->int_chunk ); 1852 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1853 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1854 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1855 //if( mem_address & (pool_align -1) ) 1856 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1857 1858 int * aa = ( int * )datablock_header; 1859 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1860 } 1861 else { 1862 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1863 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1864 endTimer(&timer_end); 1865 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1866 return; 1867 } 1868 free(ts1); 1869 1870 PhastaIOActiveFiles[i]->part_count++; 1871 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1872 //A new field will be written 1873 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1874 memcpy( PhastaIOActiveFiles[i]->master_header + 1875 PhastaIOActiveFiles[i]->field_count * 1876 MAX_FIELDS_NAME_LENGTH + 1877 MAX_FIELDS_NAME_LENGTH * 2, 1878 mpi_tag, 1879 MAX_FIELDS_NAME_LENGTH-1); 1880 } 1881 PhastaIOActiveFiles[i]->field_count++; 1882 PhastaIOActiveFiles[i]->part_count=0; 1883 } 1884 free(buffer); 1885 } 1886 1887 endTimer(&timer_end); 1888 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1889 } 1890 1891 void writedatablock( const int* fileDescriptor, 1892 const char keyphrase[], 1893 const void* valueArray, 1894 const int* nItems, 1895 const char datatype[], 1896 const char iotype[] ) 1897 { 1898 //if(irank == 0) printf("entering writedatablock()\n"); 1899 1900 unsigned long long data_size = 0; 1901 double timer_start, timer_end; 1902 startTimer(&timer_start); 1903 1904 int i = *fileDescriptor; 1905 checkFileDescriptor("writedatablock",&i); 1906 1907 if ( PhastaIONextActiveIndex == 0 ) { 1908 int filePtr = *fileDescriptor - 1; 1909 1910 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1911 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1912 fprintf(stderr,"openfile_ function has to be called before \n") ; 1913 fprintf(stderr,"acessing the file\n ") ; 1914 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1915 endTimer(&timer_end); 1916 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1917 return; 1918 } 1919 // since we require that a consistant header always preceed the data block 1920 // let us check to see that it is actually the case. 1921 1922 if ( ! cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1923 fprintf(stderr, "Header not consistant with data block\n"); 1924 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1925 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1926 fprintf(stderr, "Please recheck write sequence \n"); 1927 if( Strict_Error ) { 1928 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1929 endTimer(&timer_end); 1930 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1931 return; 1932 } 1933 } 1934 1935 FILE* fileObject = fileArray[ filePtr ] ; 1936 size_t type_size=typeSize( datatype ); 1937 isBinary( iotype ); 1938 1939 if ( header_type[filePtr] != (int)type_size ) { 1940 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1941 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1942 if( Strict_Error ) { 1943 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1944 endTimer(&timer_end); 1945 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1946 return; 1947 } 1948 } 1949 1950 int nUnits = *nItems; 1951 1952 if ( nUnits != DataSize ) { 1953 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1954 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1955 if( Strict_Error ) { 1956 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1957 endTimer(&timer_end); 1958 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1959 return; 1960 } 1961 } 1962 1963 if ( binary_format ) { 1964 1965 fwrite( valueArray, type_size, nUnits, fileObject ); 1966 fprintf( fileObject,"\n"); 1967 1968 } else { 1969 1970 char* ts1 = StringStripper( datatype ); 1971 if ( cscompare( "integer", ts1 ) ) { 1972 for( int n=0; n < nUnits ; n++ ) 1973 fprintf(fileObject,"%d\n",*((int*)((int*)valueArray+n))); 1974 } else if ( cscompare( "double", ts1 ) ) { 1975 for( int n=0; n < nUnits ; n++ ) 1976 fprintf(fileObject,"%lf\n",*((double*)((double*)valueArray+n))); 1977 } 1978 free (ts1); 1979 } 1980 //return ; 1981 } 1982 else { // syncIO case 1983 MPI_Status write_data_status; 1984 isBinary( iotype ); 1985 int nUnits = *nItems; 1986 1987 //MR CHANGE 1988 // if ( cscompare(datatype,"double") ) 1989 char* ts1 = StringStripper( datatype ); 1990 if ( cscompare("double",ts1) ) 1991 //MR CHANGE END 1992 { 1993 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1994 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1995 PhastaIOActiveFiles[i]->my_offset, 1996 PhastaIOActiveFiles[i]->double_chunk, 1997 //BLOCK_SIZE/sizeof(double), 1998 nUnits+DB_HEADER_SIZE/sizeof(double), 1999 MPI_DOUBLE ); 2000 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2001 PhastaIOActiveFiles[i]->double_chunk, 2002 &write_data_status ); 2003 data_size=8*nUnits; 2004 } 2005 //MR CHANGE 2006 // else if ( cscompare ( datatype, "integer")) 2007 else if ( cscompare("integer",ts1) ) 2008 //MR CHANGE END 2009 { 2010 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2011 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2012 PhastaIOActiveFiles[i]->my_offset, 2013 PhastaIOActiveFiles[i]->int_chunk, 2014 nUnits+DB_HEADER_SIZE/sizeof(int), 2015 MPI_INT ); 2016 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2017 PhastaIOActiveFiles[i]->int_chunk, 2018 &write_data_status ); 2019 data_size=4*nUnits; 2020 } 2021 else { 2022 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2023 endTimer(&timer_end); 2024 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2025 return; 2026 } 2027 free(ts1); 2028 } 2029 2030 endTimer(&timer_end); 2031 char extra_msg[1024]; 2032 memset(extra_msg, '\0', 1024); 2033 char* key = StringStripper(keyphrase); 2034 sprintf(extra_msg, " field is %s ", key); 2035 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2036 free(key); 2037 2038 } 2039 2040 void 2041 SwapArrayByteOrder( void* array, 2042 int nbytes, 2043 int nItems ) 2044 { 2045 /* This swaps the byte order for the array of nItems each 2046 of size nbytes , This will be called only locally */ 2047 int i,j; 2048 unsigned char* ucDst = (unsigned char*)array; 2049 2050 for(i=0; i < nItems; i++) { 2051 for(j=0; j < (nbytes/2); j++) 2052 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2053 ucDst += nbytes; 2054 } 2055 } 2056 2057 void 2058 writestring( int* fileDescriptor, 2059 const char inString[] ) 2060 { 2061 2062 int filePtr = *fileDescriptor - 1; 2063 FILE* fileObject = fileArray[filePtr] ; 2064 fprintf(fileObject,"%s",inString ); 2065 return; 2066 } 2067 2068 void 2069 Gather_Headers( int* fileDescriptor, 2070 vector< string >& headers ) 2071 { 2072 2073 FILE* fileObject; 2074 char Line[1024]; 2075 2076 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2077 2078 while( !feof(fileObject) ) { 2079 fgets( Line, 1024, fileObject); 2080 if ( Line[0] == '#' ) { 2081 headers.push_back( Line ); 2082 } else { 2083 break; 2084 } 2085 } 2086 rewind( fileObject ); 2087 clearerr( fileObject ); 2088 } 2089 void 2090 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2091 2092 void 2093 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2094 2095 int 2096 isLittleEndian( void ) 2097 { 2098 // this function returns a 1 if the current running architecture is 2099 // LittleEndian Byte Ordered, else it returns a zero 2100 2101 union { 2102 long a; 2103 char c[sizeof( long )]; 2104 } endianUnion; 2105 2106 endianUnion.a = 1 ; 2107 2108 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2109 else return 0; 2110 } 2111 2112 namespace PHASTA { 2113 const char* const PhastaIO_traits<int>::type_string = "integer"; 2114 const char* const PhastaIO_traits<double>::type_string = "double"; 2115 } 2116