/*
 *
 * This is the SyncIO library that uses MPI-IO collective functions to
 * implement a flexible I/O checkpoint solution for a large number of
 * processors.
 *
 * Previous developer: Ning Liu         (liun2@cs.rpi.edu)
 *                     Jing Fu          (fuj@cs.rpi.edu)
 * Current developers: Michel Rasquin   (Michel.Rasquin@colorado.edu),
 *                     Ben Matthews     (benjamin.a.matthews@colorado.edu)
 *
 */

#include <map>
#include <vector>
#include <string>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <sstream>
#include "phastaIO.h"
#include "mpi.h"
#include "phiotmrc.h"

// Number of leading bytes of a syncIO file that hold the tag/version line,
// the nFields line, the field-name lines and the nppf line. queryphmpiio reads
// exactly this many bytes first; computeMHSize adds it to the offset-table size.
#define VERSION_INFO_HEADER_SIZE 8192
#define DB_HEADER_SIZE 1024
#define ONE_MEGABYTE 1048576
#define TWO_MEGABYTE 2097152
// Integer stored in binary right after "MPI_IO_Tag : " in the master header;
// queryphmpiio reports "Endian is different!" when it reads back another value.
#define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!!
#define MAX_PHASTA_FILES 64
#define MAX_PHASTA_FILE_NAME_LENGTH 1024
#define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf
// -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles)
// With the values below this evaluates to 8192/128 - 4 = 60. Referencing
// MAX_FIELDS_NAME_LENGTH before its #define is fine: macros expand at use.
// Width in bytes of one metadata line (tag, nFields, one field name, nppf).
#define MAX_FIELDS_NAME_LENGTH 128
// Master header size used by version-0 files and the rounding unit of
// computeMHSize for version-1 files.
#define DefaultMHSize (4*ONE_MEGABYTE)
//#define DefaultMHSize (8350) //For test
#define LATEST_WRITE_VERSION 1
#define inv1024sq 953.674316406e-9 // = 1/1024/1024
// Master header size in bytes. Stays -1 until queryphmpiio (read mode) or
// initphmpiio/initphmpiiosub (write mode) set it.
int MasterHeaderSize = -1;

// When true, startTimer/endTimer/printPerf synchronize on MPI_COMM_WORLD and
// rank 0 prints timing (and optionally bandwidth) statistics.
bool PRINT_PERF = false; // default print no perf results
int irank = -1; // global rank, should never be manually manipulated
int mysize = -1; // communicator size, set alongside irank

// Static variables are bad but used here to store the subcommunicator and associated variables
// Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it)
static int s_assign_local_comm = 0;   // 1 once initphmpiio has split and cached the communicator
static MPI_Comm s_local_comm;         // cached result of MPI_Comm_split
static int s_local_size = -1;         // size of s_local_comm
static int s_local_rank = -1;         // this rank's index in s_local_comm

//unsigned long long
pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 map< int , char* > LastHeaderKey; 88 vector< FILE* > fileArray; 89 vector< bool > byte_order; 90 vector< int > header_type; 91 int DataSize=0; 92 bool LastHeaderNotFound = false; 93 bool Wrong_Endian = false ; 94 bool Strict_Error = false ; 95 bool binary_format = true; 96 97 /***********************************************************************/ 98 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 99 /***********************************************************************/ 100 101 typedef struct 102 { 103 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 104 unsigned long long my_offset; 105 unsigned long long next_start_address; 106 unsigned long long **my_offset_table; 107 unsigned long long **my_read_table; 108 109 double * double_chunk; 110 double * read_double_chunk; 111 112 int field_count; 113 int part_count; 114 int read_field_count; 115 int read_part_count; 116 int GPid; 117 int start_id; 118 119 int mhsize; 120 121 int myrank; 122 int numprocs; 123 int local_myrank; 124 int local_numprocs; 125 126 int nppp; 127 int nPPF; 128 int nFiles; 129 int nFields; 130 131 int * int_chunk; 132 int * read_int_chunk; 133 134 int 
Wrong_Endian; /* default to false */ 135 char * master_header; 136 MPI_File file_handle; 137 MPI_Comm local_comm; 138 } phastaio_file_t; 139 140 typedef struct 141 { 142 //unsigned long long my_offset; 143 //unsigned long long **offset_table; 144 //int fileID; 145 int nppf, nfields; 146 //int GPid; 147 //int read_field_count; 148 char * masterHeader; 149 }serial_file; 150 151 serial_file *SerialFile; 152 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 153 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 154 155 // the caller has the responsibility to delete the returned string 156 // TODO: StringStipper("nbc value? ") returns NULL? 157 char* 158 StringStripper( const char istring[] ) { 159 160 int length = strlen( istring ); 161 162 //char* dest = new char [ length + 1 ]; 163 char* dest = (char *)malloc( length + 1 ); 164 165 //char * dest; 166 //dest = (char *)malloc( length + 1 + pool_align ); 167 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 168 //mem_address = (long long )dest; 169 //if( mem_address & (pool_align -1) ) 170 // dest += pool_align - (mem_address & (pool_align -1)); 171 172 strcpy( dest, istring ); 173 dest[ length ] = '\0'; 174 175 if ( char* p = strpbrk( dest, " ") ) 176 *p = '\0'; 177 178 return dest; 179 } 180 181 inline int 182 cscompare( const char teststring[], 183 const char targetstring[] ) { 184 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void 204 isBinary( const char iotype[] ) { 205 206 char* fname = StringStripper( iotype ); 207 if ( cscompare( fname, "binary" ) ) binary_format = true; 208 else binary_format = false; 209 
free (fname); 210 211 } 212 213 inline size_t 214 typeSize( const char typestring[] ) { 215 216 char* ts1 = StringStripper( typestring ); 217 218 if ( cscompare( "integer", ts1 ) ) { 219 free (ts1); 220 return sizeof(int); 221 } else if ( cscompare( "double", ts1 ) ) { 222 free (ts1); 223 return sizeof( double ); 224 } else { 225 free (ts1); 226 fprintf(stderr,"unknown type : %s\n",ts1); 227 return 0; 228 } 229 } 230 231 int 232 readHeader( FILE* fileObject, 233 const char phrase[], 234 int* params, 235 int expect ) { 236 237 char* text_header; 238 char* token; 239 char Line[1024]; 240 char junk; 241 bool FOUND = false ; 242 int real_length; 243 int skip_size, integer_value; 244 int rewind_count=0; 245 246 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 247 rewind( fileObject ); 248 clearerr( fileObject ); 249 rewind_count++; 250 fgets( Line, 1024, fileObject ); 251 } 252 253 while( !FOUND && ( rewind_count < 2 ) ) { 254 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 255 //text_header = new char [ real_length + 1 ]; 256 text_header = (char *)malloc( real_length + 1 ); 257 //text_header = (char *)malloc( real_length + 1 + pool_align ); 258 //mem_address = (long long )text_header; 259 //if( mem_address & (pool_align -1) ) 260 // text_header += pool_align - (mem_address & (pool_align -1)); 261 262 strncpy( text_header, Line, real_length ); 263 text_header[ real_length ] =static_cast<char>(NULL); 264 token = strtok ( text_header, ":" ); 265 //if( cscompare( phrase , token ) ) { 266 // Double comparison required because different fields can still start 267 // with the same name for mixed meshes (nbc code, nbc values, etc). 268 // Would be easy to fix cscompare instead but would it break sth else? 
269 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 270 FOUND = true ; 271 token = strtok( NULL, " ,;<>" ); 272 skip_size = atoi( token ); 273 int i; 274 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 275 params[i] = atoi( token ); 276 } 277 if ( i < expect ) { 278 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 279 } 280 } else if ( cscompare(token,"byteorder magic number") ) { 281 if ( binary_format ) { 282 fread((void*)&integer_value,sizeof(int),1,fileObject); 283 fread( &junk, sizeof(char), 1 , fileObject ); 284 if ( 362436 != integer_value ) Wrong_Endian = true; 285 } else{ 286 fscanf(fileObject, "%d\n", &integer_value ); 287 } 288 } else { 289 /* some other header, so just skip over */ 290 token = strtok( NULL, " ,;<>" ); 291 skip_size = atoi( token ); 292 if ( binary_format) 293 fseek( fileObject, skip_size, SEEK_CUR ); 294 else 295 for( int gama=0; gama < skip_size; gama++ ) 296 fgets( Line, 1024, fileObject ); 297 } 298 free (text_header); 299 } // end of if before while loop 300 301 if ( !FOUND ) 302 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 303 rewind( fileObject ); 304 clearerr( fileObject ); 305 rewind_count++; 306 fgets( Line, 1024, fileObject ); 307 } 308 } 309 310 if ( !FOUND ) { 311 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 312 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 313 return 1; 314 } 315 return 0; 316 } 317 318 } // end unnamed namespace 319 320 321 // begin of publicly visible functions 322 323 /** 324 * This function takes a long long pointer and assign (start) phiotmrc value to it 325 */ 326 //void startTimer(unsigned long long* start) { 327 void startTimer(double* start) { 328 329 if( !PRINT_PERF ) return; 330 331 MPI_Barrier(MPI_COMM_WORLD); 332 *start = phiotmrc(); 333 } 334 335 /** 336 * This function takes a long long pointer and assign (end) phiotmrc value to it 337 */ 338 void endTimer(double* end) { 339 340 if( 
!PRINT_PERF ) return; 341 342 *end = phiotmrc(); 343 MPI_Barrier(MPI_COMM_WORLD); 344 } 345 346 /** 347 * choose to print some performance results (or not) according to 348 * the PRINT_PERF macro 349 */ 350 void printPerf( 351 const char* func_name, 352 double start, 353 double end, 354 unsigned long long datasize, 355 int printdatainfo, 356 const char* extra_msg) { 357 358 if( !PRINT_PERF ) return; 359 360 unsigned long long data_size = datasize; 361 362 double time = end - start; 363 364 unsigned long long isizemin,isizemax,isizetot; 365 double sizemin,sizemax,sizeavg,sizetot,rate; 366 double tmin, tmax, tavg, ttot; 367 368 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 369 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 371 tavg = ttot/mysize; 372 373 if(irank == 0) { 374 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 375 else printf("** syncIO "); 376 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 377 } 378 379 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 380 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 381 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 383 384 sizemin=(double)(isizemin*inv1024sq); 385 sizemax=(double)(isizemax*inv1024sq); 386 sizetot=(double)(isizetot*inv1024sq); 387 sizeavg=(double)(1.0*sizetot/mysize); 388 rate=(double)(1.0*sizetot/tmax); 389 390 if( irank == 0) { 391 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 392 } 393 } 394 else { 395 if(irank == 0) { 396 printf(" \n"); 397 //printf(" (%s) \n", extra_msg); 398 } 399 } 400 } 401 402 /** 403 * This function is normally called at the 
beginning of a read operation, before 404 * init function. 405 * This function (uses rank 0) reads out nfields, nppf, master header size, 406 * endianess and allocates for masterHeader string. 407 * These values are essential for following read operations. Rank 0 will bcast 408 * these values to other ranks in the commm world 409 * 410 * If the file set is of old POSIX format, it would throw error and exit 411 */ 412 void queryphmpiio(const char filename[],int *nfields, int *nppf) 413 { 414 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 415 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 416 417 if(irank == 0) { 418 FILE * fileHandle; 419 char* fname = StringStripper( filename ); 420 421 fileHandle = fopen (fname,"rb"); 422 if (fileHandle == NULL ) { 423 printf("\nError: File %s doesn't exist! Please check!\n",fname); 424 } 425 else { 426 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 427 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 428 //mem_address = (long long )SerialFile; 429 //if( mem_address & (pool_align -1) ) 430 // SerialFile += pool_align - (mem_address & (pool_align -1)); 431 432 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 433 //int meta_size_limit = 4*ONE_MEGABYTE; 434 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 435 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 436 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 437 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 438 //mem_address = (long long )SerialFile; 439 //if( mem_address & (pool_align -1) ) 440 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 441 442 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 443 444 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 445 char version[MAX_FIELDS_NAME_LENGTH/4]; 446 int mhsize; 447 char * token; 448 int magic_number; 449 450 memcpy( read_out_tag, 451 SerialFile->masterHeader, 452 
MAX_FIELDS_NAME_LENGTH-1 ); 453 454 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 455 // Test endianess ... 456 memcpy (&magic_number, 457 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 458 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 459 460 if ( magic_number != ENDIAN_TEST_NUMBER ) { 461 printf("Endian is different!\n"); 462 // Will do swap later 463 } 464 465 // test version, old version, default masterheader size is 4M 466 // newer version masterheader size is read from first line 467 memcpy(version, 468 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 469 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 470 471 if( cscompare ("version",version) ) { 472 // if there is "version" tag in the file, then it is newer format 473 // read master header size from here, otherwise use default 474 // Note: if version is "1", we know mhsize is at 3/4 place... 475 476 token = strtok(version, ":"); 477 token = strtok(NULL, " ,;<>" ); 478 int iversion = atoi(token); 479 480 if( iversion == 1) { 481 memcpy( &mhsize, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 483 sizeof(int)); 484 if ( magic_number != ENDIAN_TEST_NUMBER ) 485 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 486 487 if( mhsize > DefaultMHSize ) { 488 //if actual headersize is larger than default, let's re-read 489 free(SerialFile->masterHeader); 490 SerialFile->masterHeader = (char *)malloc(mhsize); 491 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 492 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 493 } 494 } 495 //TODO: check if this is a valid int?? 
496 MasterHeaderSize = mhsize; 497 } 498 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 499 MasterHeaderSize = DefaultMHSize; 500 } 501 502 memcpy( read_out_tag, 503 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 504 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 505 506 // Read in # fields ... 507 token = strtok( read_out_tag, ":" ); 508 token = strtok( NULL," ,;<>" ); 509 *nfields = atoi( token ); 510 if ( *nfields > MAX_FIELDS_NUMBER) { 511 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 512 } 513 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 514 515 memcpy( read_out_tag, 516 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 517 + *nfields * MAX_FIELDS_NAME_LENGTH, 518 MAX_FIELDS_NAME_LENGTH); 519 520 token = strtok( read_out_tag, ":" ); 521 token = strtok( NULL," ,;<>" ); 522 *nppf = atoi( token ); 523 SerialFile->nppf=*nppf; //TODO: sanity check of int 524 } // end of if("MPI_IO_TAG") 525 else { 526 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 527 exit(1); 528 } 529 fclose(fileHandle); 530 free(SerialFile->masterHeader); 531 free(SerialFile); 532 } //end of else 533 free(fname); 534 } 535 536 // Bcast value to every one 537 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 538 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 541 } 542 543 /** 544 * This function computes the right master header size (round to size of 2^n). 545 * This is only needed for file format version 1 in "write" mode. 
546 */ 547 int computeMHSize(int nfields, int nppf, int version) { 548 int mhsize; 549 if(version == 1) { 550 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 551 int meta_info_size = VERSION_INFO_HEADER_SIZE; 552 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 553 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 554 if (actual_size > DefaultMHSize) { 555 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 556 mhsize *= DefaultMHSize; 557 } 558 else { 559 mhsize = DefaultMHSize; 560 } 561 } 562 return mhsize; 563 } 564 565 /** 566 * Computes correct color of a rank according to number of files. 567 */ 568 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 569 int color = 570 (int)(myrank / (numprocs / nfiles)); 571 return color; 572 } 573 574 575 /** 576 * Check the file descriptor. 577 */ 578 void checkFileDescriptor(const char fctname[], 579 int* fileDescriptor ) { 580 if ( *fileDescriptor < 0 ) { 581 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 582 exit(1); 583 } 584 } 585 586 /** 587 * Initialize the file struct members and allocate space for file struct 588 * buffers. 589 * 590 * Note: this function is only called when we are using new format. Old POSIX 591 * format should skip this routine and call openfile() directly instead. 592 */ 593 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 594 { 595 // we init irank again in case query not called (e.g. 
syncIO write case) 596 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 597 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 598 599 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 600 601 double timer_start, timer_end; 602 startTimer(&timer_start); 603 604 char* imode = StringStripper( mode ); 605 606 // Note: if it's read, we presume query was called prior to init and 607 // MasterHeaderSize is already set to correct value from parsing header 608 // otherwise it's write then it needs some computation to be set 609 if ( cscompare( "read", imode ) ) { 610 // do nothing 611 } 612 else if( cscompare( "write", imode ) ) { 613 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 614 } 615 else { 616 printf("Error initphmpiio: can't recognize the mode %s", imode); 617 exit(1); 618 } 619 free ( imode ); 620 621 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 622 623 int i, j; 624 625 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 626 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 627 endTimer(&timer_end); 628 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 629 return MAX_PHASTA_FILES_EXCEEDED; 630 } 631 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 632 // { 633 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 634 // { 635 // PhastaIOActiveFiles[i] = NULL; 636 // } 637 // } 638 639 640 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 641 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 642 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 643 //if( mem_address & (pool_align -1) ) 644 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 645 646 i = PhastaIONextActiveIndex; 647 PhastaIONextActiveIndex++; 648 649 
//PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 650 651 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 652 653 PhastaIOActiveFiles[i]->Wrong_Endian = false; 654 655 PhastaIOActiveFiles[i]->nFields = *nfields; 656 PhastaIOActiveFiles[i]->nPPF = *nppf; 657 PhastaIOActiveFiles[i]->nFiles = *nfiles; 658 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 659 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 660 661 662 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 663 664 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 665 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 667 668 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 669 MPI_Comm_split(MPI_COMM_WORLD, 670 color, 671 PhastaIOActiveFiles[i]->myrank, 672 &(PhastaIOActiveFiles[i]->local_comm)); 673 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 674 &(PhastaIOActiveFiles[i]->local_numprocs)); 675 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 676 &(PhastaIOActiveFiles[i]->local_myrank)); 677 678 // back up now these variables so that we do not need to call comm_split again 679 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 680 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 681 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 682 s_assign_local_comm = 1; 683 } 684 else { // recycle the subcommunicator 685 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 686 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 687 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 688 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 689 } 690 } 691 else { // *nfiles == 1 here - no need to call mpi_comm_split here 692 693 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 694 
PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 695 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 696 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 697 698 } 699 700 PhastaIOActiveFiles[i]->nppp = 701 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 702 703 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 704 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 705 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 706 707 PhastaIOActiveFiles[i]->my_offset_table = 708 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 709 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 710 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 711 //if( mem_address & (pool_align -1) ) 712 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 713 714 PhastaIOActiveFiles[i]->my_read_table = 715 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 716 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 717 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 718 //if( mem_address & (pool_align -1) ) 719 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 720 721 for (j=0; j<*nfields; j++) 722 { 723 PhastaIOActiveFiles[i]->my_offset_table[j] = 724 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 725 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 726 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 727 //if( mem_address & (pool_align -1) ) 728 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 729 730 
PhastaIOActiveFiles[i]->my_read_table[j] = 731 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 732 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 733 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 734 //if( mem_address & (pool_align -1) ) 735 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 736 } 737 *filehandle = i; 738 739 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 740 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 741 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 742 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 743 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 744 745 /* 746 PhastaIOActiveFiles[i]->master_header = 747 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 748 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 749 if( mem_address & (pool_align -1) ) 750 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 751 752 PhastaIOActiveFiles[i]->double_chunk = 753 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 754 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 755 if( mem_address & (pool_align -1) ) 756 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 757 758 PhastaIOActiveFiles[i]->int_chunk = 759 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 760 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 761 if( mem_address & (pool_align -1) ) 762 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 763 764 PhastaIOActiveFiles[i]->read_double_chunk = 765 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 766 mem_address = (long long 
)PhastaIOActiveFiles[i]->read_double_chunk; 767 if( mem_address & (pool_align -1) ) 768 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 769 770 PhastaIOActiveFiles[i]->read_int_chunk = 771 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 772 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 773 if( mem_address & (pool_align -1) ) 774 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 775 */ 776 777 // Time monitoring 778 endTimer(&timer_end); 779 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 780 781 phprintf_0("Info initphmpiio: quiting function"); 782 783 return i; 784 } 785 786 /** 787 * Destruct the file struct and free buffers allocated in init function. 788 */ 789 void finalizephmpiio( int *fileDescriptor ) 790 { 791 double timer_start, timer_end; 792 startTimer(&timer_start); 793 794 int i, j; 795 i = *fileDescriptor; 796 //PhastaIONextActiveIndex--; 797 798 /* //free the offset table for this phasta file */ 799 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 800 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 801 { 802 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 803 free( PhastaIOActiveFiles[i]->my_read_table[j]); 804 } 805 free ( PhastaIOActiveFiles[i]->my_offset_table ); 806 free ( PhastaIOActiveFiles[i]->my_read_table ); 807 free ( PhastaIOActiveFiles[i]->master_header ); 808 free ( PhastaIOActiveFiles[i]->double_chunk ); 809 free ( PhastaIOActiveFiles[i]->int_chunk ); 810 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 812 813 free( PhastaIOActiveFiles[i]); 814 815 endTimer(&timer_end); 816 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 817 818 PhastaIONextActiveIndex--; 819 } 820 821 822 /** 823 * Special init for M2N in order to create a subcommunicator for the reduced solution 
(requires PRINT_PERF to be false for now) 824 * Initialize the file struct members and allocate space for file struct buffers. 825 * 826 * Note: this function is only called when we are using new format. Old POSIX 827 * format should skip this routine and call openfile() directly instead. 828 */ 829 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 830 { 831 // we init irank again in case query not called (e.g. syncIO write case) 832 833 MPI_Comm_rank(my_local_comm, &irank); 834 MPI_Comm_size(my_local_comm, &mysize); 835 836 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 837 838 double timer_start, timer_end; 839 startTimer(&timer_start); 840 841 char* imode = StringStripper( mode ); 842 843 // Note: if it's read, we presume query was called prior to init and 844 // MasterHeaderSize is already set to correct value from parsing header 845 // otherwise it's write then it needs some computation to be set 846 if ( cscompare( "read", imode ) ) { 847 // do nothing 848 } 849 else if( cscompare( "write", imode ) ) { 850 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 851 } 852 else { 853 printf("Error initphmpiio: can't recognize the mode %s", imode); 854 exit(1); 855 } 856 free ( imode ); 857 858 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 859 860 int i, j; 861 862 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 863 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 864 endTimer(&timer_end); 865 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 866 return MAX_PHASTA_FILES_EXCEEDED; 867 } 868 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 869 // { 870 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 871 // { 872 // PhastaIOActiveFiles[i] = NULL; 873 // } 874 // } 875 876 877 
PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 878 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 879 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 880 //if( mem_address & (pool_align -1) ) 881 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 882 883 i = PhastaIONextActiveIndex; 884 PhastaIONextActiveIndex++; 885 886 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 887 888 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 889 890 PhastaIOActiveFiles[i]->Wrong_Endian = false; 891 892 PhastaIOActiveFiles[i]->nFields = *nfields; 893 PhastaIOActiveFiles[i]->nPPF = *nppf; 894 PhastaIOActiveFiles[i]->nFiles = *nfiles; 895 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 896 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 897 898 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 899 MPI_Comm_split(my_local_comm, 900 color, 901 PhastaIOActiveFiles[i]->myrank, 902 &(PhastaIOActiveFiles[i]->local_comm)); 903 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 904 &(PhastaIOActiveFiles[i]->local_numprocs)); 905 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 906 &(PhastaIOActiveFiles[i]->local_myrank)); 907 PhastaIOActiveFiles[i]->nppp = 908 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 909 910 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 911 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 912 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 913 914 PhastaIOActiveFiles[i]->my_offset_table = 915 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 916 //( unsigned long long **)calloc( 
MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 917 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 918 //if( mem_address & (pool_align -1) ) 919 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 920 921 PhastaIOActiveFiles[i]->my_read_table = 922 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 923 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 924 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 925 //if( mem_address & (pool_align -1) ) 926 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 927 928 for (j=0; j<*nfields; j++) 929 { 930 PhastaIOActiveFiles[i]->my_offset_table[j] = 931 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 932 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 933 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 934 //if( mem_address & (pool_align -1) ) 935 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 936 937 PhastaIOActiveFiles[i]->my_read_table[j] = 938 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 939 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 940 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 941 //if( mem_address & (pool_align -1) ) 942 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 943 } 944 *filehandle = i; 945 946 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 947 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 948 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 
949 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 950 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 951 952 /* 953 PhastaIOActiveFiles[i]->master_header = 954 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 955 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 956 if( mem_address & (pool_align -1) ) 957 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 958 959 PhastaIOActiveFiles[i]->double_chunk = 960 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 961 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 962 if( mem_address & (pool_align -1) ) 963 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 964 965 PhastaIOActiveFiles[i]->int_chunk = 966 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 967 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 968 if( mem_address & (pool_align -1) ) 969 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 970 971 PhastaIOActiveFiles[i]->read_double_chunk = 972 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 973 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 974 if( mem_address & (pool_align -1) ) 975 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 976 977 PhastaIOActiveFiles[i]->read_int_chunk = 978 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 979 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 980 if( mem_address & (pool_align -1) ) 981 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 982 */ 983 984 // Time monitoring 985 endTimer(&timer_end); 986 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 987 988 phprintf_0("Info initphmpiiosub: quiting function"); 989 990 return i; 991 } 992 993 994 995 /** open file for both POSIX and MPI-IO syncIO 
format. 996 * 997 * If it's old POSIX format, simply call posix fopen(). 998 * 999 * If it's MPI-IO foramt: 1000 * in "read" mode, it builds the header table that points to the offset of 1001 * fields for parts; 1002 * in "write" mode, it opens the file with MPI-IO open routine. 1003 */ 1004 void openfile(const char filename[], 1005 const char mode[], 1006 int* fileDescriptor ) 1007 { 1008 phprintf_0("Info: entering openfile"); 1009 1010 double timer_start, timer_end; 1011 startTimer(&timer_start); 1012 1013 if ( PhastaIONextActiveIndex == 0 ) 1014 { 1015 FILE* file=NULL ; 1016 *fileDescriptor = 0; 1017 char* fname = StringStripper( filename ); 1018 char* imode = StringStripper( mode ); 1019 1020 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1021 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1022 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1023 1024 if ( !file ){ 1025 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1026 } else { 1027 fileArray.push_back( file ); 1028 byte_order.push_back( false ); 1029 header_type.push_back( sizeof(int) ); 1030 *fileDescriptor = fileArray.size(); 1031 } 1032 free (fname); 1033 free (imode); 1034 } 1035 else // else it would be parallel I/O, opposed to posix io 1036 { 1037 char* fname = StringStripper( filename ); 1038 char* imode = StringStripper( mode ); 1039 int rc; 1040 int i = *fileDescriptor; 1041 checkFileDescriptor("openfile",&i); 1042 char* token; 1043 1044 if ( cscompare( "read", imode ) ) 1045 { 1046 // if (PhastaIOActiveFiles[i]->myrank == 0) 1047 // printf("\n **********\nRead open ... ... regular version\n"); 1048 1049 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1050 fname, 1051 MPI_MODE_RDONLY, 1052 MPI_INFO_NULL, 1053 &(PhastaIOActiveFiles[i]->file_handle) ); 1054 1055 if(rc) 1056 { 1057 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1058 printf("Error openfile: Unable to open file %s! 
File descriptor = %d\n",fname,*fileDescriptor); 1059 endTimer(&timer_end); 1060 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1061 return; 1062 } 1063 1064 MPI_Status read_tag_status; 1065 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1066 int j; 1067 int magic_number; 1068 1069 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1070 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1071 0, 1072 PhastaIOActiveFiles[i]->master_header, 1073 MasterHeaderSize, 1074 MPI_CHAR, 1075 &read_tag_status ); 1076 } 1077 1078 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1079 MasterHeaderSize, 1080 MPI_CHAR, 1081 0, 1082 PhastaIOActiveFiles[i]->local_comm ); 1083 1084 memcpy( read_out_tag, 1085 PhastaIOActiveFiles[i]->master_header, 1086 MAX_FIELDS_NAME_LENGTH-1 ); 1087 1088 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1089 { 1090 // Test endianess ... 1091 memcpy ( &magic_number, 1092 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1093 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1094 1095 if ( magic_number != ENDIAN_TEST_NUMBER ) 1096 { 1097 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1098 } 1099 1100 memcpy( read_out_tag, 1101 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1102 MAX_FIELDS_NAME_LENGTH ); 1103 1104 // Read in # fields ... 
1105 token = strtok ( read_out_tag, ":" ); 1106 token = strtok( NULL," ,;<>" ); 1107 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1108 1109 unsigned long long **header_table; 1110 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1111 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1112 //mem_address = (long long )header_table; 1113 //if( mem_address & (pool_align -1) ) 1114 // header_table += pool_align - (mem_address & (pool_align -1)); 1115 1116 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1117 { 1118 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1119 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1120 //mem_address = (long long )header_table[j]; 1121 //if( mem_address & (pool_align -1) ) 1122 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1123 } 1124 1125 // Read in the offset table ... 1126 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1127 { 1128 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1129 memcpy( header_table[j], 1130 PhastaIOActiveFiles[i]->master_header + 1131 VERSION_INFO_HEADER_SIZE + 1132 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1133 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1134 } 1135 1136 MPI_Scatter( header_table[j], 1137 PhastaIOActiveFiles[i]->nppp, 1138 MPI_LONG_LONG_INT, 1139 PhastaIOActiveFiles[i]->my_read_table[j], 1140 PhastaIOActiveFiles[i]->nppp, 1141 MPI_LONG_LONG_INT, 1142 0, 1143 PhastaIOActiveFiles[i]->local_comm ); 1144 1145 // Swap byte order if endianess is different ... 
1146 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1147 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1148 sizeof(long long int), 1149 PhastaIOActiveFiles[i]->nppp ); 1150 } 1151 } 1152 1153 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1154 free ( header_table[j] ); 1155 } 1156 free (header_table); 1157 1158 } // end of if MPI_IO_TAG 1159 else //else not valid MPI file 1160 { 1161 *fileDescriptor = NOT_A_MPI_FILE; 1162 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1163 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1164 endTimer(&timer_end); 1165 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1166 return; 1167 } 1168 } // end of if "read" 1169 else if( cscompare( "write", imode ) ) 1170 { 1171 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1172 fname, 1173 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1174 MPI_INFO_NULL, 1175 &(PhastaIOActiveFiles[i]->file_handle) ); 1176 if(rc) 1177 { 1178 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1179 return; 1180 } 1181 } // end of if "write" 1182 free (fname); 1183 free (imode); 1184 } // end of if FileIndex != 0 1185 1186 endTimer(&timer_end); 1187 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1188 } 1189 1190 /** close file for both POSIX and MPI-IO syncIO format. 1191 * 1192 * If it's old POSIX format, simply call posix fclose(). 1193 * 1194 * If it's MPI-IO foramt: 1195 * in "read" mode, it simply close file with MPI-IO close routine. 1196 * in "write" mode, rank 0 in each group will re-assemble the master header and 1197 * offset table and write to the beginning of file, then close the file. 
1198 */ 1199 void closefile( int* fileDescriptor, 1200 const char mode[] ) 1201 { 1202 double timer_start, timer_end; 1203 startTimer(&timer_start); 1204 1205 int i = *fileDescriptor; 1206 checkFileDescriptor("closefile",&i); 1207 1208 if ( PhastaIONextActiveIndex == 0 ) { 1209 char* imode = StringStripper( mode ); 1210 1211 if( cscompare( "write", imode ) 1212 || cscompare( "append", imode ) ) { 1213 fflush( fileArray[ *fileDescriptor - 1 ] ); 1214 } 1215 1216 fclose( fileArray[ *fileDescriptor - 1 ] ); 1217 free (imode); 1218 } 1219 else { 1220 char* imode = StringStripper( mode ); 1221 1222 //write master header here: 1223 if ( cscompare( "write", imode ) ) { 1224 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1225 // MasterHeaderSize = 4*ONE_MEGABYTE; 1226 // else 1227 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1228 1229 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1230 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1231 1232 MPI_Status write_header_status; 1233 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1234 char version[MAX_FIELDS_NAME_LENGTH/4]; 1235 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1236 int magic_number = ENDIAN_TEST_NUMBER; 1237 1238 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1239 { 1240 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1241 sprintf(mpi_tag, "MPI_IO_Tag : "); 1242 memcpy(PhastaIOActiveFiles[i]->master_header, 1243 mpi_tag, 1244 MAX_FIELDS_NAME_LENGTH); 1245 1246 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1247 // this version is "1", print version in ASCII 1248 sprintf(version, "version : %d",1); 1249 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1250 version, 1251 MAX_FIELDS_NAME_LENGTH/4); 1252 1253 // master header 
size is computed using the formula above 1254 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1255 sprintf(mhsize, "mhsize : "); 1256 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1257 mhsize, 1258 MAX_FIELDS_NAME_LENGTH/4); 1259 1260 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1261 sprintf(mpi_tag, 1262 "\nnFields : %d\n", 1263 PhastaIOActiveFiles[i]->nFields); 1264 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1265 mpi_tag, 1266 MAX_FIELDS_NAME_LENGTH); 1267 1268 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1269 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1270 memcpy( PhastaIOActiveFiles[i]->master_header+ 1271 PhastaIOActiveFiles[i]->nFields * 1272 MAX_FIELDS_NAME_LENGTH + 1273 MAX_FIELDS_NAME_LENGTH * 2, 1274 mpi_tag, 1275 MAX_FIELDS_NAME_LENGTH); 1276 1277 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1278 &magic_number, 1279 sizeof(int)); 1280 1281 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1282 &MasterHeaderSize, 1283 sizeof(int)); 1284 } 1285 1286 int j = 0; 1287 unsigned long long **header_table; 1288 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1289 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1290 //mem_address = (long long )header_table; 1291 //if( mem_address & (pool_align -1) ) 1292 // header_table += pool_align - (mem_address & (pool_align -1)); 1293 1294 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1295 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1296 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1297 //mem_address = (long long 
)header_table[j]; 1298 //if( mem_address & (pool_align -1) ) 1299 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1300 } 1301 1302 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1303 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1304 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1305 PhastaIOActiveFiles[i]->nppp, 1306 MPI_LONG_LONG_INT, 1307 header_table[j], 1308 PhastaIOActiveFiles[i]->nppp, 1309 MPI_LONG_LONG_INT, 1310 0, 1311 PhastaIOActiveFiles[i]->local_comm ); 1312 } 1313 1314 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1315 1316 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1317 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1318 memcpy ( PhastaIOActiveFiles[i]->master_header + 1319 VERSION_INFO_HEADER_SIZE + 1320 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1321 header_table[j], 1322 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1323 } 1324 1325 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1326 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1327 0, 1328 PhastaIOActiveFiles[i]->master_header, 1329 MasterHeaderSize, 1330 MPI_CHAR, 1331 &write_header_status ); 1332 } 1333 1334 ////free(PhastaIOActiveFiles[i]->master_header); 1335 1336 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1337 free ( header_table[j] ); 1338 } 1339 free (header_table); 1340 } 1341 1342 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1343 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1344 free ( imode ); 1345 } 1346 1347 endTimer(&timer_end); 1348 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1349 } 1350 1351 int commRank() { 1352 int r; 1353 MPI_Comm_rank(MPI_COMM_WORLD, &r); 1354 return r; 1355 } 1356 1357 void readheader( int* fileDescriptor, 1358 const char keyphrase[], 1359 void* valueArray, 1360 int* nItems, 1361 const char datatype[], 
1362 const char iotype[] ) 1363 { 1364 std::stringstream ss; 1365 ss << keyphrase << "@" << commRank()+1 << "?"; 1366 std::string s = ss.str(); 1367 keyphrase = s.c_str(); 1368 double timer_start, timer_end; 1369 1370 startTimer(&timer_start); 1371 1372 int i = *fileDescriptor; 1373 checkFileDescriptor("readheader",&i); 1374 1375 if ( PhastaIONextActiveIndex == 0 ) { 1376 int filePtr = *fileDescriptor - 1; 1377 FILE* fileObject; 1378 int* valueListInt; 1379 1380 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1381 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1382 fprintf(stderr,"openfile_ function has to be called before \n") ; 1383 fprintf(stderr,"acessing the file\n ") ; 1384 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1385 endTimer(&timer_end); 1386 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1387 return; 1388 } 1389 1390 LastHeaderKey[ filePtr ] = const_cast< char* >( keyphrase ); 1391 LastHeaderNotFound = false; 1392 1393 fileObject = fileArray[ filePtr ] ; 1394 Wrong_Endian = byte_order[ filePtr ]; 1395 1396 isBinary( iotype ); 1397 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1398 1399 // right now we are making the assumption that we will only write integers 1400 // on the header line. 
1401 1402 valueListInt = static_cast< int* >( valueArray ); 1403 int ierr = readHeader( fileObject , 1404 keyphrase, 1405 valueListInt, 1406 *nItems ) ; 1407 1408 byte_order[ filePtr ] = Wrong_Endian ; 1409 1410 if ( ierr ) LastHeaderNotFound = true; 1411 1412 //return ; // don't return, go to the end to print perf 1413 } 1414 else { 1415 unsigned int skip_size; 1416 int* valueListInt; 1417 valueListInt = static_cast <int*>(valueArray); 1418 char* token; 1419 bool FOUND = false ; 1420 isBinary( iotype ); 1421 1422 MPI_Status read_offset_status; 1423 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1424 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1425 int j; 1426 1427 int string_length = strlen( keyphrase ); 1428 char* buffer = (char*) malloc ( string_length+1 ); 1429 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1430 //mem_address = (long long )buffer; 1431 //if( mem_address & (pool_align -1) ) 1432 // buffer += pool_align - (mem_address & (pool_align -1)); 1433 1434 strcpy ( buffer, keyphrase ); 1435 buffer[ string_length ] = '\0'; 1436 1437 char* st2 = strtok ( buffer, "@" ); 1438 st2 = strtok (NULL, "@"); 1439 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1440 if ( char* p = strpbrk(buffer, "@") ) 1441 *p = '\0'; 1442 1443 // Check if the user has input the right GPid 1444 if ( ( PhastaIOActiveFiles[i]->GPid <= 1445 PhastaIOActiveFiles[i]->myrank * 1446 PhastaIOActiveFiles[i]->nppp )|| 1447 ( PhastaIOActiveFiles[i]->GPid > 1448 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1449 PhastaIOActiveFiles[i]->nppp ) ) 1450 { 1451 *fileDescriptor = NOT_A_MPI_FILE; 1452 printf("Error readheader: The file is not in syncIO new format, please check! 
myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1453 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1454 endTimer(&timer_end); 1455 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1456 return; 1457 } 1458 1459 // Find the field we want ... 1460 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1461 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1462 { 1463 memcpy( readouttag[j], 1464 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1465 MAX_FIELDS_NAME_LENGTH-1 ); 1466 } 1467 1468 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1469 { 1470 token = strtok ( readouttag[j], ":" ); 1471 1472 //if ( cscompare( buffer, token ) ) 1473 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1474 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1475 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1476 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1477 { 1478 PhastaIOActiveFiles[i]->read_field_count = j; 1479 FOUND = true; 1480 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1481 break; 1482 } 1483 } 1484 free(buffer); 1485 1486 if (!FOUND) 1487 { 1488 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 
1489 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1490 endTimer(&timer_end); 1491 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1492 return; 1493 } 1494 1495 // Find the part we want ... 1496 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1497 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1498 1499 PhastaIOActiveFiles[i]->my_offset = 1500 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1501 1502 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1503 1504 // Read each datablock header here ... 1505 1506 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1507 PhastaIOActiveFiles[i]->my_offset+1, 1508 read_out_tag, 1509 MAX_FIELDS_NAME_LENGTH-1, 1510 MPI_CHAR, 1511 &read_offset_status ); 1512 token = strtok ( read_out_tag, ":" ); 1513 1514 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1515 1516 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 
1517 { 1518 FOUND = true ; 1519 token = strtok( NULL, " ,;<>" ); 1520 skip_size = atoi( token ); 1521 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1522 valueListInt[j] = atoi( token ); 1523 1524 if ( j < *nItems ) 1525 { 1526 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1527 } 1528 } 1529 else { 1530 //if(irank==0) 1531 if(PhastaIOActiveFiles[i]->myrank == 0) 1532 // If we enter this if, there is a problem with the name of some fields 1533 { 1534 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1535 } 1536 } 1537 } 1538 1539 endTimer(&timer_end); 1540 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1541 1542 } 1543 1544 void readdatablock( int* fileDescriptor, 1545 const char keyphrase[], 1546 void* valueArray, 1547 int* nItems, 1548 const char datatype[], 1549 const char iotype[] ) 1550 { 1551 std::stringstream ss; 1552 ss << keyphrase << "@" << commRank()+1 << "?"; 1553 std::string s = ss.str(); 1554 keyphrase = s.c_str(); 1555 1556 //if(irank == 0) printf("entering readdatablock()\n"); 1557 unsigned long long data_size = 0; 1558 double timer_start, timer_end; 1559 startTimer(&timer_start); 1560 1561 int i = *fileDescriptor; 1562 checkFileDescriptor("readdatablock",&i); 1563 1564 if ( PhastaIONextActiveIndex == 0 ) { 1565 int filePtr = *fileDescriptor - 1; 1566 FILE* fileObject; 1567 char junk; 1568 1569 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1570 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1571 fprintf(stderr,"openfile_ function has to be called before\n") ; 1572 fprintf(stderr,"acessing the file\n ") ; 1573 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1574 endTimer(&timer_end); 1575 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1576 return; 1577 } 1578 1579 // error check.. 
1580 // since we require that a consistant header always preceed the data block 1581 // let us check to see that it is actually the case. 1582 1583 if ( ! cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1584 fprintf(stderr, "Header not consistant with data block\n"); 1585 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1586 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1587 fprintf(stderr, "Please recheck read sequence \n"); 1588 if( Strict_Error ) { 1589 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1590 endTimer(&timer_end); 1591 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1592 return; 1593 } 1594 } 1595 1596 if ( LastHeaderNotFound ) { 1597 endTimer(&timer_end); 1598 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1599 return; 1600 } 1601 fileObject = fileArray[ filePtr ]; 1602 Wrong_Endian = byte_order[ filePtr ]; 1603 1604 size_t type_size = typeSize( datatype ); 1605 int nUnits = *nItems; 1606 isBinary( iotype ); 1607 1608 if ( binary_format ) { 1609 fread( valueArray, type_size, nUnits, fileObject ); 1610 fread( &junk, sizeof(char), 1 , fileObject ); 1611 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nUnits ); 1612 } else { 1613 1614 char* ts1 = StringStripper( datatype ); 1615 if ( cscompare( "integer", ts1 ) ) { 1616 for( int n=0; n < nUnits ; n++ ) 1617 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1618 } else if ( cscompare( "double", ts1 ) ) { 1619 for( int n=0; n < nUnits ; n++ ) 1620 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1621 } 1622 free (ts1); 1623 } 1624 1625 //return; 1626 } 1627 else { 1628 // printf("read data block\n"); 1629 MPI_Status read_data_status; 1630 size_t type_size = typeSize( datatype ); 1631 int nUnits = *nItems; 1632 isBinary( iotype ); 1633 1634 // read datablock then 1635 //MR CHANGE 1636 // if ( cscompare ( datatype, "double")) 1637 char* ts2 = StringStripper( datatype ); 1638 if ( cscompare ( 
"double" , ts2)) 1639 //MR CHANGE END 1640 { 1641 1642 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1643 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1644 valueArray, 1645 nUnits, 1646 MPI_DOUBLE ); 1647 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1648 valueArray, 1649 &read_data_status ); 1650 data_size=8*nUnits; 1651 1652 } 1653 //MR CHANGE 1654 // else if ( cscompare ( datatype, "integer")) 1655 else if ( cscompare ( "integer" , ts2)) 1656 //MR CHANGE END 1657 { 1658 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1659 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1660 valueArray, 1661 nUnits, 1662 MPI_INT ); 1663 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1664 valueArray, 1665 &read_data_status ); 1666 data_size=4*nUnits; 1667 } 1668 else 1669 { 1670 *fileDescriptor = DATA_TYPE_ILLEGAL; 1671 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1672 endTimer(&timer_end); 1673 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1674 return; 1675 } 1676 free(ts2); 1677 1678 1679 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1680 1681 // Swap data byte order if endianess is different ... 
1682 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1683 { 1684 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1685 } 1686 } 1687 1688 endTimer(&timer_end); 1689 char extra_msg[1024]; 1690 memset(extra_msg, '\0', 1024); 1691 char* key = StringStripper(keyphrase); 1692 sprintf(extra_msg, " field is %s ", key); 1693 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1694 free(key); 1695 1696 } 1697 1698 void writeheader( const int* fileDescriptor, 1699 const char keyphrase[], 1700 const void* valueArray, 1701 const int* nItems, 1702 const int* ndataItems, 1703 const char datatype[], 1704 const char iotype[]) 1705 { 1706 1707 //if(irank == 0) printf("entering writeheader()\n"); 1708 1709 double timer_start, timer_end; 1710 startTimer(&timer_start); 1711 1712 int i = *fileDescriptor; 1713 checkFileDescriptor("writeheader",&i); 1714 1715 if ( PhastaIONextActiveIndex == 0 ) { 1716 int filePtr = *fileDescriptor - 1; 1717 FILE* fileObject; 1718 1719 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1720 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1721 fprintf(stderr,"openfile_ function has to be called before \n") ; 1722 fprintf(stderr,"acessing the file\n ") ; 1723 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1724 endTimer(&timer_end); 1725 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1726 return; 1727 } 1728 1729 LastHeaderKey[ filePtr ] = const_cast< char* >( keyphrase ); 1730 DataSize = *ndataItems; 1731 fileObject = fileArray[ filePtr ] ; 1732 size_t type_size = typeSize( datatype ); 1733 isBinary( iotype ); 1734 header_type[ filePtr ] = type_size; 1735 1736 int _newline = ( *ndataItems > 0 ) ? sizeof( char ) : 0; 1737 int size_of_nextblock = 1738 ( binary_format ) ? 
type_size*( *ndataItems )+ _newline : *ndataItems ; 1739 1740 fprintf( fileObject, "%s : < %d > ", keyphrase, size_of_nextblock ); 1741 for( int i = 0; i < *nItems; i++ ) 1742 fprintf(fileObject, "%d ", *((int*)((int*)valueArray+i))); 1743 fprintf(fileObject, "\n"); 1744 1745 //return ; 1746 } 1747 else { // else it's parallel I/O 1748 DataSize = *ndataItems; 1749 size_t type_size = typeSize( datatype ); 1750 isBinary( iotype ); 1751 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1752 1753 int string_length = strlen( keyphrase ); 1754 char* buffer = (char*) malloc ( string_length+1 ); 1755 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1756 //mem_address = (long long )buffer; 1757 //if( mem_address & (pool_align -1) ) 1758 // buffer += pool_align - (mem_address & (pool_align -1)); 1759 1760 strcpy ( buffer, keyphrase); 1761 buffer[ string_length ] = '\0'; 1762 1763 char* st2 = strtok ( buffer, "@" ); 1764 st2 = strtok (NULL, "@"); 1765 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1766 1767 if ( char* p = strpbrk(buffer, "@") ) 1768 *p = '\0'; 1769 1770 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1771 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1772 unsigned long long offset_value; 1773 1774 int temp = *ndataItems; 1775 unsigned long long number_of_items = (unsigned long long)temp; 1776 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1777 1778 MPI_Scan( &number_of_items, 1779 &offset_value, 1780 1, 1781 MPI_LONG_LONG_INT, 1782 MPI_SUM, 1783 PhastaIOActiveFiles[i]->local_comm ); 1784 1785 offset_value = (offset_value - number_of_items) * type_size; 1786 1787 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1788 DB_HEADER_SIZE + 1789 PhastaIOActiveFiles[i]->next_start_address; 1790 // This offset is the starting address of each datablock header... 1791 PhastaIOActiveFiles[i]->my_offset = offset_value; 1792 1793 // Write in my offset table ... 
1794 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1795 PhastaIOActiveFiles[i]->my_offset; 1796 1797 // Update the next-start-address ... 1798 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1799 number_of_items * type_size + 1800 DB_HEADER_SIZE; 1801 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1802 1, 1803 MPI_LONG_LONG_INT, 1804 PhastaIOActiveFiles[i]->local_numprocs-1, 1805 PhastaIOActiveFiles[i]->local_comm ); 1806 1807 // Prepare datablock header ... 1808 int _newline = (*ndataItems>0)?sizeof(char):0; 1809 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1810 1811 //char datablock_header[255]; 1812 //bzero((void*)datablock_header,255); 1813 char datablock_header[DB_HEADER_SIZE]; 1814 bzero((void*)datablock_header,DB_HEADER_SIZE); 1815 1816 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1817 sprintf( datablock_header, 1818 "\n%s : < %u >", 1819 keyphrase, 1820 size_of_nextblock ); 1821 1822 for ( int j = 0; j < *nItems; j++ ) 1823 { 1824 sprintf( datablock_header, 1825 "%s %d ", 1826 datablock_header, 1827 *((int*)((int*)valueArray+j))); 1828 } 1829 sprintf( datablock_header, 1830 "%s\n ", 1831 datablock_header ); 1832 1833 // Write datablock header ... 
1834 //MR CHANGE 1835 // if ( cscompare(datatype,"double") ) 1836 char* ts1 = StringStripper( datatype ); 1837 if ( cscompare("double",ts1) ) 1838 //MR CHANGE END 1839 { 1840 free ( PhastaIOActiveFiles[i]->double_chunk ); 1841 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1842 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1843 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1844 //if( mem_address & (pool_align -1) ) 1845 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1846 1847 double * aa = ( double * )datablock_header; 1848 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1849 } 1850 //MR CHANGE 1851 // if ( cscompare(datatype,"integer") ) 1852 else if ( cscompare("integer",ts1) ) 1853 //MR CHANGE END 1854 { 1855 free ( PhastaIOActiveFiles[i]->int_chunk ); 1856 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1857 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1858 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1859 //if( mem_address & (pool_align -1) ) 1860 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1861 1862 int * aa = ( int * )datablock_header; 1863 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1864 } 1865 else { 1866 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1867 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1868 endTimer(&timer_end); 1869 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1870 return; 1871 } 1872 free(ts1); 1873 1874 PhastaIOActiveFiles[i]->part_count++; 1875 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1876 //A new field will be written 1877 if ( 
PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1878 memcpy( PhastaIOActiveFiles[i]->master_header + 1879 PhastaIOActiveFiles[i]->field_count * 1880 MAX_FIELDS_NAME_LENGTH + 1881 MAX_FIELDS_NAME_LENGTH * 2, 1882 mpi_tag, 1883 MAX_FIELDS_NAME_LENGTH-1); 1884 } 1885 PhastaIOActiveFiles[i]->field_count++; 1886 PhastaIOActiveFiles[i]->part_count=0; 1887 } 1888 free(buffer); 1889 } 1890 1891 endTimer(&timer_end); 1892 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1893 } 1894 1895 void writedatablock( const int* fileDescriptor, 1896 const char keyphrase[], 1897 const void* valueArray, 1898 const int* nItems, 1899 const char datatype[], 1900 const char iotype[] ) 1901 { 1902 //if(irank == 0) printf("entering writedatablock()\n"); 1903 1904 unsigned long long data_size = 0; 1905 double timer_start, timer_end; 1906 startTimer(&timer_start); 1907 1908 int i = *fileDescriptor; 1909 checkFileDescriptor("writedatablock",&i); 1910 1911 if ( PhastaIONextActiveIndex == 0 ) { 1912 int filePtr = *fileDescriptor - 1; 1913 1914 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1915 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1916 fprintf(stderr,"openfile_ function has to be called before \n") ; 1917 fprintf(stderr,"acessing the file\n ") ; 1918 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1919 endTimer(&timer_end); 1920 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1921 return; 1922 } 1923 // since we require that a consistant header always preceed the data block 1924 // let us check to see that it is actually the case. 1925 1926 if ( ! 
cscompare( LastHeaderKey[ filePtr ], keyphrase ) ) { 1927 fprintf(stderr, "Header not consistant with data block\n"); 1928 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ] ); 1929 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1930 fprintf(stderr, "Please recheck write sequence \n"); 1931 if( Strict_Error ) { 1932 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1933 endTimer(&timer_end); 1934 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1935 return; 1936 } 1937 } 1938 1939 FILE* fileObject = fileArray[ filePtr ] ; 1940 size_t type_size=typeSize( datatype ); 1941 isBinary( iotype ); 1942 1943 if ( header_type[filePtr] != (int)type_size ) { 1944 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1945 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1946 if( Strict_Error ) { 1947 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1948 endTimer(&timer_end); 1949 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1950 return; 1951 } 1952 } 1953 1954 int nUnits = *nItems; 1955 1956 if ( nUnits != DataSize ) { 1957 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1958 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1959 if( Strict_Error ) { 1960 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1961 endTimer(&timer_end); 1962 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1963 return; 1964 } 1965 } 1966 1967 if ( binary_format ) { 1968 1969 fwrite( valueArray, type_size, nUnits, fileObject ); 1970 fprintf( fileObject,"\n"); 1971 1972 } else { 1973 1974 char* ts1 = StringStripper( datatype ); 1975 if ( cscompare( "integer", ts1 ) ) { 1976 for( int n=0; n < nUnits ; n++ ) 1977 fprintf(fileObject,"%d\n",*((int*)((int*)valueArray+n))); 1978 } else if ( cscompare( "double", ts1 ) ) { 1979 for( int n=0; n < nUnits ; n++ ) 1980 fprintf(fileObject,"%lf\n",*((double*)((double*)valueArray+n))); 1981 } 
1982 free (ts1); 1983 } 1984 //return ; 1985 } 1986 else { // syncIO case 1987 MPI_Status write_data_status; 1988 isBinary( iotype ); 1989 int nUnits = *nItems; 1990 1991 //MR CHANGE 1992 // if ( cscompare(datatype,"double") ) 1993 char* ts1 = StringStripper( datatype ); 1994 if ( cscompare("double",ts1) ) 1995 //MR CHANGE END 1996 { 1997 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1998 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1999 PhastaIOActiveFiles[i]->my_offset, 2000 PhastaIOActiveFiles[i]->double_chunk, 2001 //BLOCK_SIZE/sizeof(double), 2002 nUnits+DB_HEADER_SIZE/sizeof(double), 2003 MPI_DOUBLE ); 2004 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2005 PhastaIOActiveFiles[i]->double_chunk, 2006 &write_data_status ); 2007 data_size=8*nUnits; 2008 } 2009 //MR CHANGE 2010 // else if ( cscompare ( datatype, "integer")) 2011 else if ( cscompare("integer",ts1) ) 2012 //MR CHANGE END 2013 { 2014 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2015 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2016 PhastaIOActiveFiles[i]->my_offset, 2017 PhastaIOActiveFiles[i]->int_chunk, 2018 nUnits+DB_HEADER_SIZE/sizeof(int), 2019 MPI_INT ); 2020 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2021 PhastaIOActiveFiles[i]->int_chunk, 2022 &write_data_status ); 2023 data_size=4*nUnits; 2024 } 2025 else { 2026 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2027 endTimer(&timer_end); 2028 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2029 return; 2030 } 2031 free(ts1); 2032 } 2033 2034 endTimer(&timer_end); 2035 char extra_msg[1024]; 2036 memset(extra_msg, '\0', 1024); 2037 char* key = StringStripper(keyphrase); 2038 sprintf(extra_msg, " field is %s ", key); 2039 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2040 
free(key); 2041 2042 } 2043 2044 void 2045 SwapArrayByteOrder( void* array, 2046 int nbytes, 2047 int nItems ) 2048 { 2049 /* This swaps the byte order for the array of nItems each 2050 of size nbytes , This will be called only locally */ 2051 int i,j; 2052 unsigned char* ucDst = (unsigned char*)array; 2053 2054 for(i=0; i < nItems; i++) { 2055 for(j=0; j < (nbytes/2); j++) 2056 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2057 ucDst += nbytes; 2058 } 2059 } 2060 2061 void 2062 writestring( int* fileDescriptor, 2063 const char inString[] ) 2064 { 2065 2066 int filePtr = *fileDescriptor - 1; 2067 FILE* fileObject = fileArray[filePtr] ; 2068 fprintf(fileObject,"%s",inString ); 2069 return; 2070 } 2071 2072 void 2073 Gather_Headers( int* fileDescriptor, 2074 vector< string >& headers ) 2075 { 2076 2077 FILE* fileObject; 2078 char Line[1024]; 2079 2080 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2081 2082 while( !feof(fileObject) ) { 2083 fgets( Line, 1024, fileObject); 2084 if ( Line[0] == '#' ) { 2085 headers.push_back( Line ); 2086 } else { 2087 break; 2088 } 2089 } 2090 rewind( fileObject ); 2091 clearerr( fileObject ); 2092 } 2093 void 2094 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2095 2096 void 2097 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2098 2099 int 2100 isLittleEndian( void ) 2101 { 2102 // this function returns a 1 if the current running architecture is 2103 // LittleEndian Byte Ordered, else it returns a zero 2104 2105 union { 2106 long a; 2107 char c[sizeof( long )]; 2108 } endianUnion; 2109 2110 endianUnion.a = 1 ; 2111 2112 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2113 else return 0; 2114 } 2115 2116 namespace PHASTA { 2117 const char* const PhastaIO_traits<int>::type_string = "integer"; 2118 const char* const PhastaIO_traits<double>::type_string = "double"; 2119 } 2120