1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <string.h> 18 #include <ctype.h> 19 #include <stdlib.h> 20 #include <stdio.h> 21 #include <math.h> 22 #include <sstream> 23 #include "phastaIO.h" 24 #include "mpi.h" 25 #include "phiotmrc.h" 26 27 #define VERSION_INFO_HEADER_SIZE 8192 28 #define DB_HEADER_SIZE 1024 29 #define ONE_MEGABYTE 1048576 30 #define TWO_MEGABYTE 2097152 31 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 32 #define MAX_PHASTA_FILES 64 33 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 34 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 35 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 36 #define MAX_FIELDS_NAME_LENGTH 128 37 #define DefaultMHSize (4*ONE_MEGABYTE) 38 //#define DefaultMHSize (8350) //For test 39 #define LATEST_WRITE_VERSION 1 40 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 41 int MasterHeaderSize = -1; 42 43 bool PRINT_PERF = false; // default print no perf results 44 int irank = -1; // global rank, should never be manually manipulated 45 int mysize = -1; 46 47 // Static variables are bad but used here to store the subcommunicator and associated variables 48 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 49 static int s_assign_local_comm = 0; 50 static MPI_Comm s_local_comm; 51 static int s_local_size = -1; 52 static int s_local_rank = -1; 53 54 //unsigned long long pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 map< int , char* > LastHeaderKey; 88 vector< FILE* > fileArray; 89 vector< bool > byte_order; 90 vector< int > header_type; 91 int DataSize=0; 92 bool LastHeaderNotFound = false; 93 bool Wrong_Endian = false ; 94 bool Strict_Error = false ; 95 bool binary_format = true; 96 97 /***********************************************************************/ 98 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 99 /***********************************************************************/ 100 101 typedef struct 102 { 103 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 104 unsigned long long my_offset; 105 unsigned long long next_start_address; 106 unsigned long long **my_offset_table; 107 unsigned long long **my_read_table; 108 109 double * double_chunk; 110 double * read_double_chunk; 111 112 int field_count; 113 int part_count; 114 int read_field_count; 115 int read_part_count; 116 int GPid; 117 int start_id; 118 119 int mhsize; 120 121 int myrank; 122 int numprocs; 123 int local_myrank; 124 int local_numprocs; 125 126 int nppp; 127 int nPPF; 128 int nFiles; 129 int nFields; 130 131 int * int_chunk; 132 int * read_int_chunk; 133 134 int Wrong_Endian; /* default to false */ 135 char * master_header; 136 MPI_File file_handle; 137 MPI_Comm local_comm; 138 } phastaio_file_t; 139 140 typedef struct 141 { 142 //unsigned long long my_offset; 143 //unsigned long long **offset_table; 144 //int fileID; 145 int nppf, nfields; 146 //int GPid; 147 //int read_field_count; 148 char * masterHeader; 149 }serial_file; 150 151 serial_file *SerialFile; 152 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 153 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 154 155 // the caller has the responsibility to delete the returned string 156 // TODO: StringStipper("nbc value? ") returns NULL? 157 char* 158 StringStripper( const char istring[] ) { 159 160 int length = strlen( istring ); 161 162 //char* dest = new char [ length + 1 ]; 163 char* dest = (char *)malloc( length + 1 ); 164 165 //char * dest; 166 //dest = (char *)malloc( length + 1 + pool_align ); 167 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 168 //mem_address = (long long )dest; 169 //if( mem_address & (pool_align -1) ) 170 // dest += pool_align - (mem_address & (pool_align -1)); 171 172 strcpy( dest, istring ); 173 dest[ length ] = '\0'; 174 175 if ( char* p = strpbrk( dest, " ") ) 176 *p = '\0'; 177 178 return dest; 179 } 180 181 inline int 182 cscompare( const char teststring[], 183 const char targetstring[] ) { 184 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void 204 isBinary( const char iotype[] ) { 205 206 char* fname = StringStripper( iotype ); 207 if ( cscompare( fname, "binary" ) ) binary_format = true; 208 else binary_format = false; 209 free (fname); 210 211 } 212 213 inline size_t 214 typeSize( const char typestring[] ) { 215 216 char* ts1 = StringStripper( typestring ); 217 218 if ( cscompare( "integer", ts1 ) ) { 219 free (ts1); 220 return sizeof(int); 221 } else if ( cscompare( "double", ts1 ) ) { 222 free (ts1); 223 return sizeof( double ); 224 } else { 225 free (ts1); 226 fprintf(stderr,"unknown type : %s\n",ts1); 227 return 0; 228 } 229 } 230 231 int 232 readHeader( FILE* fileObject, 233 const char phrase[], 234 int* params, 235 int expect ) { 236 237 char* text_header; 238 char* token; 239 char Line[1024]; 240 char junk; 241 bool FOUND = false ; 242 int real_length; 243 int skip_size, integer_value; 244 int rewind_count=0; 245 246 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 247 rewind( fileObject ); 248 clearerr( fileObject ); 249 rewind_count++; 250 fgets( Line, 1024, fileObject ); 251 } 252 253 while( !FOUND && ( rewind_count < 2 ) ) { 254 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 255 //text_header = new char [ real_length + 1 ]; 256 text_header = (char *)malloc( real_length + 1 ); 257 //text_header = (char *)malloc( real_length + 1 + pool_align ); 258 //mem_address = (long long )text_header; 259 //if( mem_address & (pool_align -1) ) 260 // text_header += pool_align - (mem_address & (pool_align -1)); 261 262 strncpy( text_header, Line, real_length ); 263 text_header[ real_length ] =static_cast<char>(NULL); 264 token = strtok ( text_header, ":" ); 265 //if( cscompare( phrase , token ) ) { 266 // Double comparison required because different fields can still start 267 // with the same name for mixed meshes (nbc code, nbc values, etc). 268 // Would be easy to fix cscompare instead but would it break sth else? 269 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 270 FOUND = true ; 271 token = strtok( NULL, " ,;<>" ); 272 skip_size = atoi( token ); 273 int i; 274 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 275 params[i] = atoi( token ); 276 } 277 if ( i < expect ) { 278 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 279 } 280 } else if ( cscompare(token,"byteorder magic number") ) { 281 if ( binary_format ) { 282 fread((void*)&integer_value,sizeof(int),1,fileObject); 283 fread( &junk, sizeof(char), 1 , fileObject ); 284 if ( 362436 != integer_value ) Wrong_Endian = true; 285 } else{ 286 fscanf(fileObject, "%d\n", &integer_value ); 287 } 288 } else { 289 /* some other header, so just skip over */ 290 token = strtok( NULL, " ,;<>" ); 291 skip_size = atoi( token ); 292 if ( binary_format) 293 fseek( fileObject, skip_size, SEEK_CUR ); 294 else 295 for( int gama=0; gama < skip_size; gama++ ) 296 fgets( Line, 1024, fileObject ); 297 } 298 free (text_header); 299 } // end of if before while loop 300 301 if ( !FOUND ) 302 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 303 rewind( fileObject ); 304 clearerr( fileObject ); 305 rewind_count++; 306 fgets( Line, 1024, fileObject ); 307 } 308 } 309 310 if ( !FOUND ) { 311 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 312 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 313 return 1; 314 } 315 return 0; 316 } 317 318 } // end unnamed namespace 319 320 321 // begin of publicly visible functions 322 323 /** 324 * This function takes a long long pointer and assign (start) phiotmrc value to it 325 */ 326 //void startTimer(unsigned long long* start) { 327 void startTimer(double* start) { 328 329 if( !PRINT_PERF ) return; 330 331 MPI_Barrier(MPI_COMM_WORLD); 332 *start = phiotmrc(); 333 } 334 335 /** 336 * This function takes a long long pointer and assign (end) phiotmrc value to it 337 */ 338 void endTimer(double* end) { 339 340 if( !PRINT_PERF ) return; 341 342 *end = phiotmrc(); 343 MPI_Barrier(MPI_COMM_WORLD); 344 } 345 346 /** 347 * choose to print some performance results (or not) according to 348 * the PRINT_PERF macro 349 */ 350 void printPerf( 351 const char* func_name, 352 double start, 353 double end, 354 unsigned long long datasize, 355 int printdatainfo, 356 const char* extra_msg) { 357 358 if( !PRINT_PERF ) return; 359 360 unsigned long long data_size = datasize; 361 362 double time = end - start; 363 364 unsigned long long isizemin,isizemax,isizetot; 365 double sizemin,sizemax,sizeavg,sizetot,rate; 366 double tmin, tmax, tavg, ttot; 367 368 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 369 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 371 tavg = ttot/mysize; 372 373 if(irank == 0) { 374 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 375 else printf("** syncIO "); 376 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 377 } 378 379 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 380 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 381 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 383 384 sizemin=(double)(isizemin*inv1024sq); 385 sizemax=(double)(isizemax*inv1024sq); 386 sizetot=(double)(isizetot*inv1024sq); 387 sizeavg=(double)(1.0*sizetot/mysize); 388 rate=(double)(1.0*sizetot/tmax); 389 390 if( irank == 0) { 391 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 392 } 393 } 394 else { 395 if(irank == 0) { 396 printf(" \n"); 397 //printf(" (%s) \n", extra_msg); 398 } 399 } 400 } 401 402 /** 403 * This function is normally called at the beginning of a read operation, before 404 * init function. 405 * This function (uses rank 0) reads out nfields, nppf, master header size, 406 * endianess and allocates for masterHeader string. 407 * These values are essential for following read operations. Rank 0 will bcast 408 * these values to other ranks in the commm world 409 * 410 * If the file set is of old POSIX format, it would throw error and exit 411 */ 412 void queryphmpiio(const char filename[],int *nfields, int *nppf) 413 { 414 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 415 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 416 417 if(irank == 0) { 418 FILE * fileHandle; 419 char* fname = StringStripper( filename ); 420 421 fileHandle = fopen (fname,"rb"); 422 if (fileHandle == NULL ) { 423 printf("\nError: File %s doesn't exist! Please check!\n",fname); 424 } 425 else { 426 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 427 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 428 //mem_address = (long long )SerialFile; 429 //if( mem_address & (pool_align -1) ) 430 // SerialFile += pool_align - (mem_address & (pool_align -1)); 431 432 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 433 //int meta_size_limit = 4*ONE_MEGABYTE; 434 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 435 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 436 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 437 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 438 //mem_address = (long long )SerialFile; 439 //if( mem_address & (pool_align -1) ) 440 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 441 442 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 443 444 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 445 char version[MAX_FIELDS_NAME_LENGTH/4]; 446 int mhsize; 447 char * token; 448 int magic_number; 449 450 memcpy( read_out_tag, 451 SerialFile->masterHeader, 452 MAX_FIELDS_NAME_LENGTH-1 ); 453 454 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 455 // Test endianess ... 456 memcpy (&magic_number, 457 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 458 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 459 460 if ( magic_number != ENDIAN_TEST_NUMBER ) { 461 printf("Endian is different!\n"); 462 // Will do swap later 463 } 464 465 // test version, old version, default masterheader size is 4M 466 // newer version masterheader size is read from first line 467 memcpy(version, 468 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 469 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 470 471 if( cscompare ("version",version) ) { 472 // if there is "version" tag in the file, then it is newer format 473 // read master header size from here, otherwise use default 474 // Note: if version is "1", we know mhsize is at 3/4 place... 475 476 token = strtok(version, ":"); 477 token = strtok(NULL, " ,;<>" ); 478 int iversion = atoi(token); 479 480 if( iversion == 1) { 481 memcpy( &mhsize, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 483 sizeof(int)); 484 if ( magic_number != ENDIAN_TEST_NUMBER ) 485 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 486 487 if( mhsize > DefaultMHSize ) { 488 //if actual headersize is larger than default, let's re-read 489 free(SerialFile->masterHeader); 490 SerialFile->masterHeader = (char *)malloc(mhsize); 491 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 492 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 493 } 494 } 495 //TODO: check if this is a valid int?? 496 MasterHeaderSize = mhsize; 497 } 498 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 499 MasterHeaderSize = DefaultMHSize; 500 } 501 502 memcpy( read_out_tag, 503 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 504 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 505 506 // Read in # fields ... 507 token = strtok( read_out_tag, ":" ); 508 token = strtok( NULL," ,;<>" ); 509 *nfields = atoi( token ); 510 if ( *nfields > MAX_FIELDS_NUMBER) { 511 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 512 } 513 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 514 515 memcpy( read_out_tag, 516 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 517 + *nfields * MAX_FIELDS_NAME_LENGTH, 518 MAX_FIELDS_NAME_LENGTH); 519 520 token = strtok( read_out_tag, ":" ); 521 token = strtok( NULL," ,;<>" ); 522 *nppf = atoi( token ); 523 SerialFile->nppf=*nppf; //TODO: sanity check of int 524 } // end of if("MPI_IO_TAG") 525 else { 526 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 527 exit(1); 528 } 529 fclose(fileHandle); 530 free(SerialFile->masterHeader); 531 free(SerialFile); 532 } //end of else 533 free(fname); 534 } 535 536 // Bcast value to every one 537 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 538 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 541 } 542 543 /** 544 * This function computes the right master header size (round to size of 2^n). 545 * This is only needed for file format version 1 in "write" mode. 546 */ 547 int computeMHSize(int nfields, int nppf, int version) { 548 int mhsize; 549 if(version == 1) { 550 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 551 int meta_info_size = VERSION_INFO_HEADER_SIZE; 552 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 553 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 554 if (actual_size > DefaultMHSize) { 555 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 556 mhsize *= DefaultMHSize; 557 } 558 else { 559 mhsize = DefaultMHSize; 560 } 561 } 562 return mhsize; 563 } 564 565 /** 566 * Computes correct color of a rank according to number of files. 567 */ 568 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 569 int color = 570 (int)(myrank / (numprocs / nfiles)); 571 return color; 572 } 573 574 575 /** 576 * Check the file descriptor. 577 */ 578 void checkFileDescriptor(const char fctname[], 579 int* fileDescriptor ) { 580 if ( *fileDescriptor < 0 ) { 581 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 582 exit(1); 583 } 584 } 585 586 /** 587 * Initialize the file struct members and allocate space for file struct 588 * buffers. 589 * 590 * Note: this function is only called when we are using new format. Old POSIX 591 * format should skip this routine and call openfile() directly instead. 592 */ 593 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 594 { 595 // we init irank again in case query not called (e.g. syncIO write case) 596 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 597 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 598 599 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 600 601 double timer_start, timer_end; 602 startTimer(&timer_start); 603 604 char* imode = StringStripper( mode ); 605 606 // Note: if it's read, we presume query was called prior to init and 607 // MasterHeaderSize is already set to correct value from parsing header 608 // otherwise it's write then it needs some computation to be set 609 if ( cscompare( "read", imode ) ) { 610 // do nothing 611 } 612 else if( cscompare( "write", imode ) ) { 613 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 614 } 615 else { 616 printf("Error initphmpiio: can't recognize the mode %s", imode); 617 exit(1); 618 } 619 free ( imode ); 620 621 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 622 623 int i, j; 624 625 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 626 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 627 endTimer(&timer_end); 628 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 629 return MAX_PHASTA_FILES_EXCEEDED; 630 } 631 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 632 // { 633 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 634 // { 635 // PhastaIOActiveFiles[i] = NULL; 636 // } 637 // } 638 639 640 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 641 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 642 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 643 //if( mem_address & (pool_align -1) ) 644 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 645 646 i = PhastaIONextActiveIndex; 647 PhastaIONextActiveIndex++; 648 649 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 650 651 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 652 653 PhastaIOActiveFiles[i]->Wrong_Endian = false; 654 655 PhastaIOActiveFiles[i]->nFields = *nfields; 656 PhastaIOActiveFiles[i]->nPPF = *nppf; 657 PhastaIOActiveFiles[i]->nFiles = *nfiles; 658 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 659 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 660 661 662 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 663 664 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 665 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 667 668 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 669 MPI_Comm_split(MPI_COMM_WORLD, 670 color, 671 PhastaIOActiveFiles[i]->myrank, 672 &(PhastaIOActiveFiles[i]->local_comm)); 673 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 674 &(PhastaIOActiveFiles[i]->local_numprocs)); 675 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 676 &(PhastaIOActiveFiles[i]->local_myrank)); 677 678 // back up now these variables so that we do not need to call comm_split again 679 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 680 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 681 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 682 s_assign_local_comm = 1; 683 } 684 else { // recycle the subcommunicator 685 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 686 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 687 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 688 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 689 } 690 } 691 else { // *nfiles == 1 here - no need to call mpi_comm_split here 692 693 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 694 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 695 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 696 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 697 698 } 699 700 PhastaIOActiveFiles[i]->nppp = 701 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 702 703 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 704 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 705 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 706 707 PhastaIOActiveFiles[i]->my_offset_table = 708 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 709 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 710 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 711 //if( mem_address & (pool_align -1) ) 712 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 713 714 PhastaIOActiveFiles[i]->my_read_table = 715 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 716 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 717 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 718 //if( mem_address & (pool_align -1) ) 719 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 720 721 for (j=0; j<*nfields; j++) 722 { 723 PhastaIOActiveFiles[i]->my_offset_table[j] = 724 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 725 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 726 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 727 //if( mem_address & (pool_align -1) ) 728 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 729 730 PhastaIOActiveFiles[i]->my_read_table[j] = 731 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 732 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 733 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 734 //if( mem_address & (pool_align -1) ) 735 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 736 } 737 *filehandle = i; 738 739 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 740 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 741 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 742 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 743 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 744 745 /* 746 PhastaIOActiveFiles[i]->master_header = 747 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 748 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 749 if( mem_address & (pool_align -1) ) 750 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 751 752 PhastaIOActiveFiles[i]->double_chunk = 753 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 754 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 755 if( mem_address & (pool_align -1) ) 756 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 757 758 PhastaIOActiveFiles[i]->int_chunk = 759 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 760 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 761 if( mem_address & (pool_align -1) ) 762 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 763 764 PhastaIOActiveFiles[i]->read_double_chunk = 765 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 766 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 767 if( mem_address & (pool_align -1) ) 768 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 769 770 PhastaIOActiveFiles[i]->read_int_chunk = 771 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 772 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 773 if( mem_address & (pool_align -1) ) 774 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 775 */ 776 777 // Time monitoring 778 endTimer(&timer_end); 779 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 780 781 phprintf_0("Info initphmpiio: quiting function"); 782 783 return i; 784 } 785 786 /** 787 * Destruct the file struct and free buffers allocated in init function. 788 */ 789 void finalizephmpiio( int *fileDescriptor ) 790 { 791 double timer_start, timer_end; 792 startTimer(&timer_start); 793 794 int i, j; 795 i = *fileDescriptor; 796 //PhastaIONextActiveIndex--; 797 798 /* //free the offset table for this phasta file */ 799 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 800 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 801 { 802 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 803 free( PhastaIOActiveFiles[i]->my_read_table[j]); 804 } 805 free ( PhastaIOActiveFiles[i]->my_offset_table ); 806 free ( PhastaIOActiveFiles[i]->my_read_table ); 807 free ( PhastaIOActiveFiles[i]->master_header ); 808 free ( PhastaIOActiveFiles[i]->double_chunk ); 809 free ( PhastaIOActiveFiles[i]->int_chunk ); 810 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 812 813 free( PhastaIOActiveFiles[i]); 814 815 endTimer(&timer_end); 816 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 817 818 PhastaIONextActiveIndex--; 819 } 820 821 822 /** 823 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 824 * Initialize the file struct members and allocate space for file struct buffers. 825 * 826 * Note: this function is only called when we are using new format. Old POSIX 827 * format should skip this routine and call openfile() directly instead. 828 */ 829 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 830 { 831 // we init irank again in case query not called (e.g. syncIO write case) 832 833 MPI_Comm_rank(my_local_comm, &irank); 834 MPI_Comm_size(my_local_comm, &mysize); 835 836 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 837 838 double timer_start, timer_end; 839 startTimer(&timer_start); 840 841 char* imode = StringStripper( mode ); 842 843 // Note: if it's read, we presume query was called prior to init and 844 // MasterHeaderSize is already set to correct value from parsing header 845 // otherwise it's write then it needs some computation to be set 846 if ( cscompare( "read", imode ) ) { 847 // do nothing 848 } 849 else if( cscompare( "write", imode ) ) { 850 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 851 } 852 else { 853 printf("Error initphmpiio: can't recognize the mode %s", imode); 854 exit(1); 855 } 856 free ( imode ); 857 858 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 859 860 int i, j; 861 862 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 863 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 864 endTimer(&timer_end); 865 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 866 return MAX_PHASTA_FILES_EXCEEDED; 867 } 868 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 869 // { 870 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 871 // { 872 // PhastaIOActiveFiles[i] = NULL; 873 // } 874 // } 875 876 877 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 878 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 879 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 880 //if( mem_address & (pool_align -1) ) 881 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 882 883 i = PhastaIONextActiveIndex; 884 PhastaIONextActiveIndex++; 885 886 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 887 888 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 889 890 PhastaIOActiveFiles[i]->Wrong_Endian = false; 891 892 PhastaIOActiveFiles[i]->nFields = *nfields; 893 PhastaIOActiveFiles[i]->nPPF = *nppf; 894 PhastaIOActiveFiles[i]->nFiles = *nfiles; 895 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 896 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 897 898 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 899 MPI_Comm_split(my_local_comm, 900 color, 901 PhastaIOActiveFiles[i]->myrank, 902 &(PhastaIOActiveFiles[i]->local_comm)); 903 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 904 &(PhastaIOActiveFiles[i]->local_numprocs)); 905 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 906 &(PhastaIOActiveFiles[i]->local_myrank)); 907 PhastaIOActiveFiles[i]->nppp = 908 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 909 910 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 911 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 912 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 913 914 PhastaIOActiveFiles[i]->my_offset_table = 915 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 916 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 917 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 918 //if( mem_address & (pool_align -1) ) 919 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 920 921 PhastaIOActiveFiles[i]->my_read_table = 922 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 923 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 924 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 925 //if( mem_address & (pool_align -1) ) 926 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 927 928 for (j=0; j<*nfields; j++) 929 { 930 PhastaIOActiveFiles[i]->my_offset_table[j] = 931 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 932 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 933 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 934 //if( mem_address & (pool_align -1) ) 935 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 936 937 PhastaIOActiveFiles[i]->my_read_table[j] = 938 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 939 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 940 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 941 //if( mem_address & (pool_align -1) ) 942 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 943 } 944 *filehandle = i; 945 946 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 947 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 948 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 949 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 950 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 951 952 /* 953 PhastaIOActiveFiles[i]->master_header = 954 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 955 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 956 if( mem_address & (pool_align -1) ) 957 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 958 959 PhastaIOActiveFiles[i]->double_chunk = 960 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 961 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 962 if( mem_address & (pool_align -1) ) 963 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 964 965 PhastaIOActiveFiles[i]->int_chunk = 966 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 967 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 968 if( mem_address & (pool_align -1) ) 969 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 970 971 PhastaIOActiveFiles[i]->read_double_chunk = 972 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 973 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 974 if( mem_address & (pool_align -1) ) 975 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 976 977 PhastaIOActiveFiles[i]->read_int_chunk = 978 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 979 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 980 if( mem_address & (pool_align -1) ) 981 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 982 */ 983 984 // Time monitoring 985 endTimer(&timer_end); 986 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 987 988 phprintf_0("Info initphmpiiosub: quiting function"); 989 990 return i; 991 } 992 993 994 995 /** open file for both POSIX and MPI-IO syncIO format. 996 * 997 * If it's old POSIX format, simply call posix fopen(). 998 * 999 * If it's MPI-IO foramt: 1000 * in "read" mode, it builds the header table that points to the offset of 1001 * fields for parts; 1002 * in "write" mode, it opens the file with MPI-IO open routine. 1003 */ 1004 void openfile(const char filename[], 1005 const char mode[], 1006 int* fileDescriptor ) 1007 { 1008 phprintf_0("Info: entering openfile"); 1009 1010 double timer_start, timer_end; 1011 startTimer(&timer_start); 1012 1013 if ( PhastaIONextActiveIndex == 0 ) 1014 { 1015 FILE* file=NULL ; 1016 *fileDescriptor = 0; 1017 char* fname = StringStripper( filename ); 1018 char* imode = StringStripper( mode ); 1019 1020 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1021 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1022 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1023 1024 if ( !file ){ 1025 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1026 } else { 1027 fileArray.push_back( file ); 1028 byte_order.push_back( false ); 1029 header_type.push_back( sizeof(int) ); 1030 *fileDescriptor = fileArray.size(); 1031 } 1032 free (fname); 1033 free (imode); 1034 } 1035 else // else it would be parallel I/O, opposed to posix io 1036 { 1037 char* fname = StringStripper( filename ); 1038 char* imode = StringStripper( mode ); 1039 int rc; 1040 int i = *fileDescriptor; 1041 checkFileDescriptor("openfile",&i); 1042 char* token; 1043 1044 if ( cscompare( "read", imode ) ) 1045 { 1046 // if (PhastaIOActiveFiles[i]->myrank == 0) 1047 // printf("\n **********\nRead open ... ... regular version\n"); 1048 1049 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1050 fname, 1051 MPI_MODE_RDONLY, 1052 MPI_INFO_NULL, 1053 &(PhastaIOActiveFiles[i]->file_handle) ); 1054 1055 if(rc) 1056 { 1057 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1058 printf("Error openfile: Unable to open file %s! File descriptor = %d\n",fname,*fileDescriptor); 1059 endTimer(&timer_end); 1060 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1061 return; 1062 } 1063 1064 MPI_Status read_tag_status; 1065 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1066 int j; 1067 int magic_number; 1068 1069 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1070 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1071 0, 1072 PhastaIOActiveFiles[i]->master_header, 1073 MasterHeaderSize, 1074 MPI_CHAR, 1075 &read_tag_status ); 1076 } 1077 1078 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1079 MasterHeaderSize, 1080 MPI_CHAR, 1081 0, 1082 PhastaIOActiveFiles[i]->local_comm ); 1083 1084 memcpy( read_out_tag, 1085 PhastaIOActiveFiles[i]->master_header, 1086 MAX_FIELDS_NAME_LENGTH-1 ); 1087 1088 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1089 { 1090 // Test endianess ... 1091 memcpy ( &magic_number, 1092 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1093 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1094 1095 if ( magic_number != ENDIAN_TEST_NUMBER ) 1096 { 1097 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1098 } 1099 1100 memcpy( read_out_tag, 1101 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1102 MAX_FIELDS_NAME_LENGTH ); 1103 1104 // Read in # fields ... 1105 token = strtok ( read_out_tag, ":" ); 1106 token = strtok( NULL," ,;<>" ); 1107 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1108 1109 unsigned long long **header_table; 1110 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1111 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1112 //mem_address = (long long )header_table; 1113 //if( mem_address & (pool_align -1) ) 1114 // header_table += pool_align - (mem_address & (pool_align -1)); 1115 1116 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1117 { 1118 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1119 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1120 //mem_address = (long long )header_table[j]; 1121 //if( mem_address & (pool_align -1) ) 1122 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1123 } 1124 1125 // Read in the offset table ... 1126 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1127 { 1128 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1129 memcpy( header_table[j], 1130 PhastaIOActiveFiles[i]->master_header + 1131 VERSION_INFO_HEADER_SIZE + 1132 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1133 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1134 } 1135 1136 MPI_Scatter( header_table[j], 1137 PhastaIOActiveFiles[i]->nppp, 1138 MPI_LONG_LONG_INT, 1139 PhastaIOActiveFiles[i]->my_read_table[j], 1140 PhastaIOActiveFiles[i]->nppp, 1141 MPI_LONG_LONG_INT, 1142 0, 1143 PhastaIOActiveFiles[i]->local_comm ); 1144 1145 // Swap byte order if endianess is different ... 1146 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1147 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1148 sizeof(long long int), 1149 PhastaIOActiveFiles[i]->nppp ); 1150 } 1151 } 1152 1153 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1154 free ( header_table[j] ); 1155 } 1156 free (header_table); 1157 1158 } // end of if MPI_IO_TAG 1159 else //else not valid MPI file 1160 { 1161 *fileDescriptor = NOT_A_MPI_FILE; 1162 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1163 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1164 endTimer(&timer_end); 1165 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1166 return; 1167 } 1168 } // end of if "read" 1169 else if( cscompare( "write", imode ) ) 1170 { 1171 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1172 fname, 1173 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1174 MPI_INFO_NULL, 1175 &(PhastaIOActiveFiles[i]->file_handle) ); 1176 if(rc) 1177 { 1178 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1179 return; 1180 } 1181 } // end of if "write" 1182 free (fname); 1183 free (imode); 1184 } // end of if FileIndex != 0 1185 1186 endTimer(&timer_end); 1187 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1188 } 1189 1190 /** close file for both POSIX and MPI-IO syncIO format. 1191 * 1192 * If it's old POSIX format, simply call posix fclose(). 1193 * 1194 * If it's MPI-IO foramt: 1195 * in "read" mode, it simply close file with MPI-IO close routine. 1196 * in "write" mode, rank 0 in each group will re-assemble the master header and 1197 * offset table and write to the beginning of file, then close the file. 1198 */ 1199 void closefile( int* fileDescriptor, 1200 const char mode[] ) 1201 { 1202 double timer_start, timer_end; 1203 startTimer(&timer_start); 1204 1205 int i = *fileDescriptor; 1206 checkFileDescriptor("closefile",&i); 1207 1208 if ( PhastaIONextActiveIndex == 0 ) { 1209 char* imode = StringStripper( mode ); 1210 1211 if( cscompare( "write", imode ) 1212 || cscompare( "append", imode ) ) { 1213 fflush( fileArray[ *fileDescriptor - 1 ] ); 1214 } 1215 1216 fclose( fileArray[ *fileDescriptor - 1 ] ); 1217 free (imode); 1218 } 1219 else { 1220 char* imode = StringStripper( mode ); 1221 1222 //write master header here: 1223 if ( cscompare( "write", imode ) ) { 1224 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1225 // MasterHeaderSize = 4*ONE_MEGABYTE; 1226 // else 1227 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1228 1229 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1230 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1231 1232 MPI_Status write_header_status; 1233 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1234 char version[MAX_FIELDS_NAME_LENGTH/4]; 1235 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1236 int magic_number = ENDIAN_TEST_NUMBER; 1237 1238 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1239 { 1240 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1241 sprintf(mpi_tag, "MPI_IO_Tag : "); 1242 memcpy(PhastaIOActiveFiles[i]->master_header, 1243 mpi_tag, 1244 MAX_FIELDS_NAME_LENGTH); 1245 1246 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1247 // this version is "1", print version in ASCII 1248 sprintf(version, "version : %d",1); 1249 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1250 version, 1251 MAX_FIELDS_NAME_LENGTH/4); 1252 1253 // master header size is computed using the formula above 1254 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1255 sprintf(mhsize, "mhsize : "); 1256 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1257 mhsize, 1258 MAX_FIELDS_NAME_LENGTH/4); 1259 1260 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1261 sprintf(mpi_tag, 1262 "\nnFields : %d\n", 1263 PhastaIOActiveFiles[i]->nFields); 1264 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1265 mpi_tag, 1266 MAX_FIELDS_NAME_LENGTH); 1267 1268 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1269 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1270 memcpy( PhastaIOActiveFiles[i]->master_header+ 1271 PhastaIOActiveFiles[i]->nFields * 1272 MAX_FIELDS_NAME_LENGTH + 1273 MAX_FIELDS_NAME_LENGTH * 2, 1274 mpi_tag, 1275 MAX_FIELDS_NAME_LENGTH); 1276 1277 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1278 &magic_number, 1279 sizeof(int)); 1280 1281 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1282 &MasterHeaderSize, 1283 sizeof(int)); 1284 } 1285 1286 int j = 0; 1287 unsigned long long **header_table; 1288 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1289 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1290 //mem_address = (long long )header_table; 1291 //if( mem_address & (pool_align -1) ) 1292 // header_table += pool_align - (mem_address & (pool_align -1)); 1293 1294 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1295 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1296 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1297 //mem_address = (long long )header_table[j]; 1298 //if( mem_address & (pool_align -1) ) 1299 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1300 } 1301 1302 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1303 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1304 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1305 PhastaIOActiveFiles[i]->nppp, 1306 MPI_LONG_LONG_INT, 1307 header_table[j], 1308 PhastaIOActiveFiles[i]->nppp, 1309 MPI_LONG_LONG_INT, 1310 0, 1311 PhastaIOActiveFiles[i]->local_comm ); 1312 } 1313 1314 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1315 1316 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1317 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1318 memcpy ( PhastaIOActiveFiles[i]->master_header + 1319 VERSION_INFO_HEADER_SIZE + 1320 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1321 header_table[j], 1322 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1323 } 1324 1325 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1326 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1327 0, 1328 PhastaIOActiveFiles[i]->master_header, 1329 MasterHeaderSize, 1330 MPI_CHAR, 1331 &write_header_status ); 1332 } 1333 1334 ////free(PhastaIOActiveFiles[i]->master_header); 1335 1336 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1337 free ( header_table[j] ); 1338 } 1339 free (header_table); 1340 } 1341 1342 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1343 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1344 free ( imode ); 1345 } 1346 1347 endTimer(&timer_end); 1348 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1349 } 1350 1351 int commRank() { 1352 int r; 1353 MPI_Comm_rank(MPI_COMM_WORLD, &r); 1354 return r; 1355 } 1356 1357 void readheader( int* fileDescriptor, 1358 const char keyphrase[], 1359 void* valueArray, 1360 int* nItems, 1361 const char datatype[], 1362 const char iotype[] ) 1363 { 1364 double timer_start, timer_end; 1365 1366 startTimer(&timer_start); 1367 1368 int i = *fileDescriptor; 1369 checkFileDescriptor("readheader",&i); 1370 1371 if ( PhastaIONextActiveIndex == 0 ) { 1372 int filePtr = *fileDescriptor - 1; 1373 FILE* fileObject; 1374 int* valueListInt; 1375 1376 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1377 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1378 fprintf(stderr,"openfile_ function has to be called before \n") ; 1379 fprintf(stderr,"acessing the file\n ") ; 1380 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1381 endTimer(&timer_end); 1382 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1383 return; 1384 } 1385 1386 LastHeaderKey[ filePtr ] = const_cast< char* >( keyphrase ); 1387 LastHeaderNotFound = false; 1388 1389 fileObject = fileArray[ filePtr ] ; 1390 Wrong_Endian = byte_order[ filePtr ]; 1391 1392 isBinary( iotype ); 1393 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1394 1395 // right now we are making the assumption that we will only write integers 1396 // on the header line. 1397 1398 valueListInt = static_cast< int* >( valueArray ); 1399 int ierr = readHeader( fileObject , 1400 keyphrase, 1401 valueListInt, 1402 *nItems ) ; 1403 1404 byte_order[ filePtr ] = Wrong_Endian ; 1405 1406 if ( ierr ) LastHeaderNotFound = true; 1407 1408 //return ; // don't return, go to the end to print perf 1409 } 1410 else { 1411 unsigned int skip_size; 1412 int* valueListInt; 1413 valueListInt = static_cast <int*>(valueArray); 1414 char* token; 1415 bool FOUND = false ; 1416 isBinary( iotype ); 1417 1418 MPI_Status read_offset_status; 1419 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1420 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1421 int j; 1422 1423 int string_length = strlen( keyphrase ); 1424 char* buffer = (char*) malloc ( string_length+1 ); 1425 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1426 //mem_address = (long long )buffer; 1427 //if( mem_address & (pool_align -1) ) 1428 // buffer += pool_align - (mem_address & (pool_align -1)); 1429 1430 strcpy ( buffer, keyphrase ); 1431 buffer[ string_length ] = '\0'; 1432 1433 char* st2 = strtok ( buffer, "@" ); 1434 st2 = strtok (NULL, "@"); 1435 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1436 if ( char* p = strpbrk(buffer, "@") ) 1437 *p = '\0'; 1438 1439 // Check if the user has input the right GPid 1440 if ( ( PhastaIOActiveFiles[i]->GPid <= 1441 PhastaIOActiveFiles[i]->myrank * 1442 PhastaIOActiveFiles[i]->nppp )|| 1443 ( PhastaIOActiveFiles[i]->GPid > 1444 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1445 PhastaIOActiveFiles[i]->nppp ) ) 1446 { 1447 *fileDescriptor = NOT_A_MPI_FILE; 1448 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1449 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1450 endTimer(&timer_end); 1451 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1452 return; 1453 } 1454 1455 // Find the field we want ... 1456 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1457 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1458 { 1459 memcpy( readouttag[j], 1460 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1461 MAX_FIELDS_NAME_LENGTH-1 ); 1462 } 1463 1464 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1465 { 1466 token = strtok ( readouttag[j], ":" ); 1467 1468 //if ( cscompare( buffer, token ) ) 1469 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1470 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1471 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1472 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1473 { 1474 PhastaIOActiveFiles[i]->read_field_count = j; 1475 FOUND = true; 1476 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1477 break; 1478 } 1479 } 1480 free(buffer); 1481 1482 if (!FOUND) 1483 { 1484 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1485 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1486 endTimer(&timer_end); 1487 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1488 return; 1489 } 1490 1491 // Find the part we want ... 1492 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1493 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1494 1495 PhastaIOActiveFiles[i]->my_offset = 1496 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1497 1498 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1499 1500 // Read each datablock header here ... 1501 1502 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1503 PhastaIOActiveFiles[i]->my_offset+1, 1504 read_out_tag, 1505 MAX_FIELDS_NAME_LENGTH-1, 1506 MPI_CHAR, 1507 &read_offset_status ); 1508 token = strtok ( read_out_tag, ":" ); 1509 1510 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1511 1512 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1513 { 1514 FOUND = true ; 1515 token = strtok( NULL, " ,;<>" ); 1516 skip_size = atoi( token ); 1517 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1518 valueListInt[j] = atoi( token ); 1519 1520 if ( j < *nItems ) 1521 { 1522 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1523 } 1524 } 1525 else { 1526 //if(irank==0) 1527 if(PhastaIOActiveFiles[i]->myrank == 0) 1528 // If we enter this if, there is a problem with the name of some fields 1529 { 1530 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1531 } 1532 } 1533 } 1534 1535 endTimer(&timer_end); 1536 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1537 1538 } 1539 1540 void readdatablock( int* fileDescriptor, 1541 const char keyphrase[], 1542 void* valueArray, 1543 int* nItems, 1544 const char datatype[], 1545 const char iotype[] ) 1546 { 1547 std::stringstream ss; 1548 ss << keyphrase << "@" << commRank()+1 << "?"; 1549 std::string s = ss.str(); 1550 keyphrase = s.c_str(); 1551 1552 //if(irank == 0) printf("entering readdatablock()\n"); 1553 unsigned long long data_size = 0; 1554 double timer_start, timer_end; 1555 startTimer(&timer_start); 1556 1557 int i = *fileDescriptor; 1558 checkFileDescriptor("readdatablock",&i); 1559 1560 if ( PhastaIONextActiveIndex == 0 ) { 1561 int filePtr = *fileDescriptor - 1; 1562 FILE* fileObject; 1563 char junk; 1564 1565 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1566 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1567 fprintf(stderr,"openfile_ function has to be called before\n") ; 1568 fprintf(stderr,"acessing the file\n ") ; 1569 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1570 endTimer(&timer_end); 1571 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1572 return; 1573 } 1574 1575 // error check.. 1576 // since we require that a consistant header always preceed the data block 1577 // let us check to see that it is actually the case. 1578 1579 if ( ! cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1580 fprintf(stderr, "Header not consistant with data block\n"); 1581 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1582 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1583 fprintf(stderr, "Please recheck read sequence \n"); 1584 if( Strict_Error ) { 1585 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1586 endTimer(&timer_end); 1587 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1588 return; 1589 } 1590 } 1591 1592 if ( LastHeaderNotFound ) { 1593 endTimer(&timer_end); 1594 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1595 return; 1596 } 1597 fileObject = fileArray[ filePtr ]; 1598 Wrong_Endian = byte_order[ filePtr ]; 1599 1600 size_t type_size = typeSize( datatype ); 1601 int nUnits = *nItems; 1602 isBinary( iotype ); 1603 1604 if ( binary_format ) { 1605 fread( valueArray, type_size, nUnits, fileObject ); 1606 fread( &junk, sizeof(char), 1 , fileObject ); 1607 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nUnits ); 1608 } else { 1609 1610 char* ts1 = StringStripper( datatype ); 1611 if ( cscompare( "integer", ts1 ) ) { 1612 for( int n=0; n < nUnits ; n++ ) 1613 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1614 } else if ( cscompare( "double", ts1 ) ) { 1615 for( int n=0; n < nUnits ; n++ ) 1616 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1617 } 1618 free (ts1); 1619 } 1620 1621 //return; 1622 } 1623 else { 1624 // printf("read data block\n"); 1625 MPI_Status read_data_status; 1626 size_t type_size = typeSize( datatype ); 1627 int nUnits = *nItems; 1628 isBinary( iotype ); 1629 1630 // read datablock then 1631 //MR CHANGE 1632 // if ( cscompare ( datatype, "double")) 1633 char* ts2 = StringStripper( datatype ); 1634 if ( cscompare ( "double" , ts2)) 1635 //MR CHANGE END 1636 { 1637 1638 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1639 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1640 valueArray, 1641 nUnits, 1642 MPI_DOUBLE ); 1643 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1644 valueArray, 1645 &read_data_status ); 1646 data_size=8*nUnits; 1647 1648 } 1649 //MR CHANGE 1650 // else if ( cscompare ( datatype, "integer")) 1651 else if ( cscompare ( "integer" , ts2)) 1652 //MR CHANGE END 1653 { 1654 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1655 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1656 valueArray, 1657 nUnits, 1658 MPI_INT ); 1659 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1660 valueArray, 1661 &read_data_status ); 1662 data_size=4*nUnits; 1663 } 1664 else 1665 { 1666 *fileDescriptor = DATA_TYPE_ILLEGAL; 1667 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1668 endTimer(&timer_end); 1669 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1670 return; 1671 } 1672 free(ts2); 1673 1674 1675 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1676 1677 // Swap data byte order if endianess is different ... 1678 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1679 { 1680 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1681 } 1682 } 1683 1684 endTimer(&timer_end); 1685 char extra_msg[1024]; 1686 memset(extra_msg, '\0', 1024); 1687 char* key = StringStripper(keyphrase); 1688 sprintf(extra_msg, " field is %s ", key); 1689 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1690 free(key); 1691 1692 } 1693 1694 void writeheader( const int* fileDescriptor, 1695 const char keyphrase[], 1696 const void* valueArray, 1697 const int* nItems, 1698 const int* ndataItems, 1699 const char datatype[], 1700 const char iotype[]) 1701 { 1702 1703 //if(irank == 0) printf("entering writeheader()\n"); 1704 1705 double timer_start, timer_end; 1706 startTimer(&timer_start); 1707 1708 int i = *fileDescriptor; 1709 checkFileDescriptor("writeheader",&i); 1710 1711 if ( PhastaIONextActiveIndex == 0 ) { 1712 int filePtr = *fileDescriptor - 1; 1713 FILE* fileObject; 1714 1715 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1716 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1717 fprintf(stderr,"openfile_ function has to be called before \n") ; 1718 fprintf(stderr,"acessing the file\n ") ; 1719 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1720 endTimer(&timer_end); 1721 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1722 return; 1723 } 1724 1725 LastHeaderKey[ filePtr ] = const_cast< char* >( keyphrase ); 1726 DataSize = *ndataItems; 1727 fileObject = fileArray[ filePtr ] ; 1728 size_t type_size = typeSize( datatype ); 1729 isBinary( iotype ); 1730 header_type[ filePtr ] = type_size; 1731 1732 int _newline = ( *ndataItems > 0 ) ? sizeof( char ) : 0; 1733 int size_of_nextblock = 1734 ( binary_format ) ? type_size*( *ndataItems )+ _newline : *ndataItems ; 1735 1736 fprintf( fileObject, "%s : < %d > ", keyphrase, size_of_nextblock ); 1737 for( int i = 0; i < *nItems; i++ ) 1738 fprintf(fileObject, "%d ", *((int*)((int*)valueArray+i))); 1739 fprintf(fileObject, "\n"); 1740 1741 //return ; 1742 } 1743 else { // else it's parallel I/O 1744 DataSize = *ndataItems; 1745 size_t type_size = typeSize( datatype ); 1746 isBinary( iotype ); 1747 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1748 1749 int string_length = strlen( keyphrase ); 1750 char* buffer = (char*) malloc ( string_length+1 ); 1751 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1752 //mem_address = (long long )buffer; 1753 //if( mem_address & (pool_align -1) ) 1754 // buffer += pool_align - (mem_address & (pool_align -1)); 1755 1756 strcpy ( buffer, keyphrase); 1757 buffer[ string_length ] = '\0'; 1758 1759 char* st2 = strtok ( buffer, "@" ); 1760 st2 = strtok (NULL, "@"); 1761 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1762 1763 if ( char* p = strpbrk(buffer, "@") ) 1764 *p = '\0'; 1765 1766 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1767 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1768 unsigned long long offset_value; 1769 1770 int temp = *ndataItems; 1771 unsigned long long number_of_items = (unsigned long long)temp; 1772 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1773 1774 MPI_Scan( &number_of_items, 1775 &offset_value, 1776 1, 1777 MPI_LONG_LONG_INT, 1778 MPI_SUM, 1779 PhastaIOActiveFiles[i]->local_comm ); 1780 1781 offset_value = (offset_value - number_of_items) * type_size; 1782 1783 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1784 DB_HEADER_SIZE + 1785 PhastaIOActiveFiles[i]->next_start_address; 1786 // This offset is the starting address of each datablock header... 1787 PhastaIOActiveFiles[i]->my_offset = offset_value; 1788 1789 // Write in my offset table ... 1790 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1791 PhastaIOActiveFiles[i]->my_offset; 1792 1793 // Update the next-start-address ... 1794 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1795 number_of_items * type_size + 1796 DB_HEADER_SIZE; 1797 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1798 1, 1799 MPI_LONG_LONG_INT, 1800 PhastaIOActiveFiles[i]->local_numprocs-1, 1801 PhastaIOActiveFiles[i]->local_comm ); 1802 1803 // Prepare datablock header ... 1804 int _newline = (*ndataItems>0)?sizeof(char):0; 1805 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1806 1807 //char datablock_header[255]; 1808 //bzero((void*)datablock_header,255); 1809 char datablock_header[DB_HEADER_SIZE]; 1810 bzero((void*)datablock_header,DB_HEADER_SIZE); 1811 1812 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1813 sprintf( datablock_header, 1814 "\n%s : < %u >", 1815 keyphrase, 1816 size_of_nextblock ); 1817 1818 for ( int j = 0; j < *nItems; j++ ) 1819 { 1820 sprintf( datablock_header, 1821 "%s %d ", 1822 datablock_header, 1823 *((int*)((int*)valueArray+j))); 1824 } 1825 sprintf( datablock_header, 1826 "%s\n ", 1827 datablock_header ); 1828 1829 // Write datablock header ... 1830 //MR CHANGE 1831 // if ( cscompare(datatype,"double") ) 1832 char* ts1 = StringStripper( datatype ); 1833 if ( cscompare("double",ts1) ) 1834 //MR CHANGE END 1835 { 1836 free ( PhastaIOActiveFiles[i]->double_chunk ); 1837 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1838 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1839 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1840 //if( mem_address & (pool_align -1) ) 1841 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1842 1843 double * aa = ( double * )datablock_header; 1844 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1845 } 1846 //MR CHANGE 1847 // if ( cscompare(datatype,"integer") ) 1848 else if ( cscompare("integer",ts1) ) 1849 //MR CHANGE END 1850 { 1851 free ( PhastaIOActiveFiles[i]->int_chunk ); 1852 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1853 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1854 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1855 //if( mem_address & (pool_align -1) ) 1856 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1857 1858 int * aa = ( int * )datablock_header; 1859 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1860 } 1861 else { 1862 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1863 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1864 endTimer(&timer_end); 1865 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1866 return; 1867 } 1868 free(ts1); 1869 1870 PhastaIOActiveFiles[i]->part_count++; 1871 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1872 //A new field will be written 1873 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1874 memcpy( PhastaIOActiveFiles[i]->master_header + 1875 PhastaIOActiveFiles[i]->field_count * 1876 MAX_FIELDS_NAME_LENGTH + 1877 MAX_FIELDS_NAME_LENGTH * 2, 1878 mpi_tag, 1879 MAX_FIELDS_NAME_LENGTH-1); 1880 } 1881 PhastaIOActiveFiles[i]->field_count++; 1882 PhastaIOActiveFiles[i]->part_count=0; 1883 } 1884 free(buffer); 1885 } 1886 1887 endTimer(&timer_end); 1888 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1889 } 1890 1891 void writedatablock( const int* fileDescriptor, 1892 const char keyphrase[], 1893 const void* valueArray, 1894 const int* nItems, 1895 const char datatype[], 1896 const char iotype[] ) 1897 { 1898 //if(irank == 0) printf("entering writedatablock()\n"); 1899 1900 unsigned long long data_size = 0; 1901 double timer_start, timer_end; 1902 startTimer(&timer_start); 1903 1904 int i = *fileDescriptor; 1905 checkFileDescriptor("writedatablock",&i); 1906 1907 if ( PhastaIONextActiveIndex == 0 ) { 1908 int filePtr = *fileDescriptor - 1; 1909 1910 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1911 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1912 fprintf(stderr,"openfile_ function has to be called before \n") ; 1913 fprintf(stderr,"acessing the file\n ") ; 1914 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1915 endTimer(&timer_end); 1916 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1917 return; 1918 } 1919 // since we require that a consistant header always preceed the data block 1920 // let us check to see that it is actually the case. 1921 1922 if ( ! cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1923 fprintf(stderr, "Header not consistant with data block\n"); 1924 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1925 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1926 fprintf(stderr, "Please recheck write sequence \n"); 1927 if( Strict_Error ) { 1928 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1929 endTimer(&timer_end); 1930 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1931 return; 1932 } 1933 } 1934 1935 FILE* fileObject = fileArray[ filePtr ] ; 1936 size_t type_size=typeSize( datatype ); 1937 isBinary( iotype ); 1938 1939 if ( header_type[filePtr] != (int)type_size ) { 1940 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1941 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1942 if( Strict_Error ) { 1943 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1944 endTimer(&timer_end); 1945 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1946 return; 1947 } 1948 } 1949 1950 int nUnits = *nItems; 1951 1952 if ( nUnits != DataSize ) { 1953 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1954 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1955 if( Strict_Error ) { 1956 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1957 endTimer(&timer_end); 1958 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1959 return; 1960 } 1961 } 1962 1963 if ( binary_format ) { 1964 1965 fwrite( valueArray, type_size, nUnits, fileObject ); 1966 fprintf( fileObject,"\n"); 1967 1968 } else { 1969 1970 char* ts1 = StringStripper( datatype ); 1971 if ( cscompare( "integer", ts1 ) ) { 1972 for( int n=0; n < nUnits ; n++ ) 1973 fprintf(fileObject,"%d\n",*((int*)((int*)valueArray+n))); 1974 } else if ( cscompare( "double", ts1 ) ) { 1975 for( int n=0; n < nUnits ; n++ ) 1976 fprintf(fileObject,"%lf\n",*((double*)((double*)valueArray+n))); 1977 } 1978 free (ts1); 1979 } 1980 //return ; 1981 } 1982 else { // syncIO case 1983 MPI_Status write_data_status; 1984 isBinary( iotype ); 1985 int nUnits = *nItems; 1986 1987 //MR CHANGE 1988 // if ( cscompare(datatype,"double") ) 1989 char* ts1 = StringStripper( datatype ); 1990 if ( cscompare("double",ts1) ) 1991 //MR CHANGE END 1992 { 1993 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1994 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1995 PhastaIOActiveFiles[i]->my_offset, 1996 PhastaIOActiveFiles[i]->double_chunk, 1997 //BLOCK_SIZE/sizeof(double), 1998 nUnits+DB_HEADER_SIZE/sizeof(double), 1999 MPI_DOUBLE ); 2000 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2001 PhastaIOActiveFiles[i]->double_chunk, 2002 &write_data_status ); 2003 data_size=8*nUnits; 2004 } 2005 //MR CHANGE 2006 // else if ( cscompare ( datatype, "integer")) 2007 else if ( cscompare("integer",ts1) ) 2008 //MR CHANGE END 2009 { 2010 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2011 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2012 PhastaIOActiveFiles[i]->my_offset, 2013 PhastaIOActiveFiles[i]->int_chunk, 2014 nUnits+DB_HEADER_SIZE/sizeof(int), 2015 MPI_INT ); 2016 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2017 PhastaIOActiveFiles[i]->int_chunk, 2018 &write_data_status ); 2019 data_size=4*nUnits; 2020 } 2021 else { 2022 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2023 endTimer(&timer_end); 2024 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2025 return; 2026 } 2027 free(ts1); 2028 } 2029 2030 endTimer(&timer_end); 2031 char extra_msg[1024]; 2032 memset(extra_msg, '\0', 1024); 2033 char* key = StringStripper(keyphrase); 2034 sprintf(extra_msg, " field is %s ", key); 2035 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2036 free(key); 2037 2038 } 2039 2040 void 2041 SwapArrayByteOrder( void* array, 2042 int nbytes, 2043 int nItems ) 2044 { 2045 /* This swaps the byte order for the array of nItems each 2046 of size nbytes , This will be called only locally */ 2047 int i,j; 2048 unsigned char* ucDst = (unsigned char*)array; 2049 2050 for(i=0; i < nItems; i++) { 2051 for(j=0; j < (nbytes/2); j++) 2052 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2053 ucDst += nbytes; 2054 } 2055 } 2056 2057 void 2058 writestring( int* fileDescriptor, 2059 const char inString[] ) 2060 { 2061 2062 int filePtr = *fileDescriptor - 1; 2063 FILE* fileObject = fileArray[filePtr] ; 2064 fprintf(fileObject,"%s",inString ); 2065 return; 2066 } 2067 2068 void 2069 Gather_Headers( int* fileDescriptor, 2070 vector< string >& headers ) 2071 { 2072 2073 FILE* fileObject; 2074 char Line[1024]; 2075 2076 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2077 2078 while( !feof(fileObject) ) { 2079 fgets( Line, 1024, fileObject); 2080 if ( Line[0] == '#' ) { 2081 headers.push_back( Line ); 2082 } else { 2083 break; 2084 } 2085 } 2086 rewind( fileObject ); 2087 clearerr( fileObject ); 2088 } 2089 void 2090 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2091 2092 void 2093 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2094 2095 int 2096 isLittleEndian( void ) 2097 { 2098 // this function returns a 1 if the current running architecture is 2099 // LittleEndian Byte Ordered, else it returns a zero 2100 2101 union { 2102 long a; 2103 char c[sizeof( long )]; 2104 } endianUnion; 2105 2106 endianUnion.a = 1 ; 2107 2108 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2109 else return 0; 2110 } 2111 2112 namespace PHASTA { 2113 const char* const PhastaIO_traits<int>::type_string = "integer"; 2114 const char* const PhastaIO_traits<double>::type_string = "double"; 2115 } 2116