1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <string.h> 18 #include <ctype.h> 19 #include <stdlib.h> 20 #include <stdio.h> 21 #include <math.h> 22 #include <sstream> 23 #include "phastaIO.h" 24 #include "mpi.h" 25 #include "phiotmrc.h" 26 27 #define VERSION_INFO_HEADER_SIZE 8192 28 #define DB_HEADER_SIZE 1024 29 #define ONE_MEGABYTE 1048576 30 #define TWO_MEGABYTE 2097152 31 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 32 #define MAX_PHASTA_FILES 64 33 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 34 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 35 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 36 #define MAX_FIELDS_NAME_LENGTH 128 37 #define DefaultMHSize (4*ONE_MEGABYTE) 38 //#define DefaultMHSize (8350) //For test 39 #define LATEST_WRITE_VERSION 1 40 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 41 int MasterHeaderSize = -1; 42 43 bool PRINT_PERF = false; // default print no perf results 44 int irank = -1; // global rank, should never be manually manipulated 45 int mysize = -1; 46 47 // Static variables are bad but used here to store the subcommunicator and associated variables 48 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 49 static int s_assign_local_comm = 0; 50 static MPI_Comm s_local_comm; 51 static int s_local_size = -1; 52 static int s_local_rank = -1; 53 54 //unsigned long long pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 map<int, std::string> LastHeaderKey; 88 vector< FILE* > fileArray; 89 vector< bool > byte_order; 90 vector< int > header_type; 91 int DataSize=0; 92 bool LastHeaderNotFound = false; 93 bool Wrong_Endian = false ; 94 bool Strict_Error = false ; 95 bool binary_format = true; 96 97 /***********************************************************************/ 98 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 99 /***********************************************************************/ 100 101 typedef struct 102 { 103 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 104 unsigned long long my_offset; 105 unsigned long long next_start_address; 106 unsigned long long **my_offset_table; 107 unsigned long long **my_read_table; 108 109 double * double_chunk; 110 double * read_double_chunk; 111 112 int field_count; 113 int part_count; 114 int read_field_count; 115 int read_part_count; 116 int GPid; 117 int start_id; 118 119 int mhsize; 120 121 int myrank; 122 int numprocs; 123 int local_myrank; 124 int local_numprocs; 125 126 int nppp; 127 int nPPF; 128 int nFiles; 129 int nFields; 130 131 int * int_chunk; 132 int * read_int_chunk; 133 134 int Wrong_Endian; /* default to false */ 135 char * master_header; 136 MPI_File file_handle; 137 MPI_Comm local_comm; 138 } phastaio_file_t; 139 140 typedef struct 141 { 142 //unsigned long long my_offset; 143 //unsigned long long **offset_table; 144 //int fileID; 145 int nppf, nfields; 146 //int GPid; 147 //int read_field_count; 148 char * masterHeader; 149 }serial_file; 150 151 serial_file *SerialFile; 152 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 153 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 154 155 // the caller has the responsibility to delete the returned string 156 // TODO: StringStipper("nbc value? ") returns NULL? 157 char* 158 StringStripper( const char istring[] ) { 159 160 int length = strlen( istring ); 161 162 //char* dest = new char [ length + 1 ]; 163 char* dest = (char *)malloc( length + 1 ); 164 165 //char * dest; 166 //dest = (char *)malloc( length + 1 + pool_align ); 167 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 168 //mem_address = (long long )dest; 169 //if( mem_address & (pool_align -1) ) 170 // dest += pool_align - (mem_address & (pool_align -1)); 171 172 strcpy( dest, istring ); 173 dest[ length ] = '\0'; 174 175 if ( char* p = strpbrk( dest, " ") ) 176 *p = '\0'; 177 178 return dest; 179 } 180 181 inline int 182 cscompare( const char teststring[], 183 const char targetstring[] ) { 184 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void 204 isBinary( const char iotype[] ) { 205 206 char* fname = StringStripper( iotype ); 207 if ( cscompare( fname, "binary" ) ) binary_format = true; 208 else binary_format = false; 209 free (fname); 210 211 } 212 213 inline size_t 214 typeSize( const char typestring[] ) { 215 216 char* ts1 = StringStripper( typestring ); 217 218 if ( cscompare( "integer", ts1 ) ) { 219 free (ts1); 220 return sizeof(int); 221 } else if ( cscompare( "double", ts1 ) ) { 222 free (ts1); 223 return sizeof( double ); 224 } else { 225 free (ts1); 226 fprintf(stderr,"unknown type : %s\n",ts1); 227 return 0; 228 } 229 } 230 231 int 232 readHeader( FILE* fileObject, 233 const char phrase[], 234 int* params, 235 int expect ) { 236 237 char* text_header; 238 char* token; 239 char Line[1024]; 240 char junk; 241 bool FOUND = false ; 242 int real_length; 243 int skip_size, integer_value; 244 int rewind_count=0; 245 246 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 247 rewind( fileObject ); 248 clearerr( fileObject ); 249 rewind_count++; 250 fgets( Line, 1024, fileObject ); 251 } 252 253 while( !FOUND && ( rewind_count < 2 ) ) { 254 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 255 //text_header = new char [ real_length + 1 ]; 256 text_header = (char *)malloc( real_length + 1 ); 257 //text_header = (char *)malloc( real_length + 1 + pool_align ); 258 //mem_address = (long long )text_header; 259 //if( mem_address & (pool_align -1) ) 260 // text_header += pool_align - (mem_address & (pool_align -1)); 261 262 strncpy( text_header, Line, real_length ); 263 text_header[ real_length ] =static_cast<char>(NULL); 264 token = strtok ( text_header, ":" ); 265 //if( cscompare( phrase , token ) ) { 266 // Double comparison required because different fields can still start 267 // with the same name for mixed meshes (nbc code, nbc values, etc). 268 // Would be easy to fix cscompare instead but would it break sth else? 269 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 270 FOUND = true ; 271 token = strtok( NULL, " ,;<>" ); 272 skip_size = atoi( token ); 273 int i; 274 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 275 params[i] = atoi( token ); 276 } 277 if ( i < expect ) { 278 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 279 } 280 } else if ( cscompare(token,"byteorder magic number") ) { 281 if ( binary_format ) { 282 fread((void*)&integer_value,sizeof(int),1,fileObject); 283 fread( &junk, sizeof(char), 1 , fileObject ); 284 if ( 362436 != integer_value ) Wrong_Endian = true; 285 } else{ 286 fscanf(fileObject, "%d\n", &integer_value ); 287 } 288 } else { 289 /* some other header, so just skip over */ 290 token = strtok( NULL, " ,;<>" ); 291 skip_size = atoi( token ); 292 if ( binary_format) 293 fseek( fileObject, skip_size, SEEK_CUR ); 294 else 295 for( int gama=0; gama < skip_size; gama++ ) 296 fgets( Line, 1024, fileObject ); 297 } 298 free (text_header); 299 } // end of if before while loop 300 301 if ( !FOUND ) 302 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 303 rewind( fileObject ); 304 clearerr( fileObject ); 305 rewind_count++; 306 fgets( Line, 1024, fileObject ); 307 } 308 } 309 310 if ( !FOUND ) { 311 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 312 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 313 return 1; 314 } 315 return 0; 316 } 317 318 } // end unnamed namespace 319 320 321 // begin of publicly visible functions 322 323 /** 324 * This function takes a long long pointer and assign (start) phiotmrc value to it 325 */ 326 //void startTimer(unsigned long long* start) { 327 void startTimer(double* start) { 328 329 if( !PRINT_PERF ) return; 330 331 MPI_Barrier(MPI_COMM_WORLD); 332 *start = phiotmrc(); 333 } 334 335 /** 336 * This function takes a long long pointer and assign (end) phiotmrc value to it 337 */ 338 void endTimer(double* end) { 339 340 if( !PRINT_PERF ) return; 341 342 *end = phiotmrc(); 343 MPI_Barrier(MPI_COMM_WORLD); 344 } 345 346 /** 347 * choose to print some performance results (or not) according to 348 * the PRINT_PERF macro 349 */ 350 void printPerf( 351 const char* func_name, 352 double start, 353 double end, 354 unsigned long long datasize, 355 int printdatainfo, 356 const char* extra_msg) { 357 358 if( !PRINT_PERF ) return; 359 360 unsigned long long data_size = datasize; 361 362 double time = end - start; 363 364 unsigned long long isizemin,isizemax,isizetot; 365 double sizemin,sizemax,sizeavg,sizetot,rate; 366 double tmin, tmax, tavg, ttot; 367 368 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 369 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 371 tavg = ttot/mysize; 372 373 if(irank == 0) { 374 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 375 else printf("** syncIO "); 376 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 377 } 378 379 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 380 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 381 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 383 384 sizemin=(double)(isizemin*inv1024sq); 385 sizemax=(double)(isizemax*inv1024sq); 386 sizetot=(double)(isizetot*inv1024sq); 387 sizeavg=(double)(1.0*sizetot/mysize); 388 rate=(double)(1.0*sizetot/tmax); 389 390 if( irank == 0) { 391 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 392 } 393 } 394 else { 395 if(irank == 0) { 396 printf(" \n"); 397 //printf(" (%s) \n", extra_msg); 398 } 399 } 400 } 401 402 /** 403 * This function is normally called at the beginning of a read operation, before 404 * init function. 405 * This function (uses rank 0) reads out nfields, nppf, master header size, 406 * endianess and allocates for masterHeader string. 407 * These values are essential for following read operations. Rank 0 will bcast 408 * these values to other ranks in the commm world 409 * 410 * If the file set is of old POSIX format, it would throw error and exit 411 */ 412 void queryphmpiio(const char filename[],int *nfields, int *nppf) 413 { 414 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 415 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 416 417 if(irank == 0) { 418 FILE * fileHandle; 419 char* fname = StringStripper( filename ); 420 421 fileHandle = fopen (fname,"rb"); 422 if (fileHandle == NULL ) { 423 printf("\nError: File %s doesn't exist! Please check!\n",fname); 424 } 425 else { 426 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 427 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 428 //mem_address = (long long )SerialFile; 429 //if( mem_address & (pool_align -1) ) 430 // SerialFile += pool_align - (mem_address & (pool_align -1)); 431 432 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 433 //int meta_size_limit = 4*ONE_MEGABYTE; 434 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 435 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 436 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 437 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 438 //mem_address = (long long )SerialFile; 439 //if( mem_address & (pool_align -1) ) 440 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 441 442 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 443 444 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 445 char version[MAX_FIELDS_NAME_LENGTH/4]; 446 int mhsize; 447 char * token; 448 int magic_number; 449 450 memcpy( read_out_tag, 451 SerialFile->masterHeader, 452 MAX_FIELDS_NAME_LENGTH-1 ); 453 454 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 455 // Test endianess ... 456 memcpy (&magic_number, 457 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 458 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 459 460 if ( magic_number != ENDIAN_TEST_NUMBER ) { 461 printf("Endian is different!\n"); 462 // Will do swap later 463 } 464 465 // test version, old version, default masterheader size is 4M 466 // newer version masterheader size is read from first line 467 memcpy(version, 468 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 469 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 470 471 if( cscompare ("version",version) ) { 472 // if there is "version" tag in the file, then it is newer format 473 // read master header size from here, otherwise use default 474 // Note: if version is "1", we know mhsize is at 3/4 place... 475 476 token = strtok(version, ":"); 477 token = strtok(NULL, " ,;<>" ); 478 int iversion = atoi(token); 479 480 if( iversion == 1) { 481 memcpy( &mhsize, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 483 sizeof(int)); 484 if ( magic_number != ENDIAN_TEST_NUMBER ) 485 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 486 487 if( mhsize > DefaultMHSize ) { 488 //if actual headersize is larger than default, let's re-read 489 free(SerialFile->masterHeader); 490 SerialFile->masterHeader = (char *)malloc(mhsize); 491 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 492 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 493 } 494 } 495 //TODO: check if this is a valid int?? 496 MasterHeaderSize = mhsize; 497 } 498 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 499 MasterHeaderSize = DefaultMHSize; 500 } 501 502 memcpy( read_out_tag, 503 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 504 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 505 506 // Read in # fields ... 507 token = strtok( read_out_tag, ":" ); 508 token = strtok( NULL," ,;<>" ); 509 *nfields = atoi( token ); 510 if ( *nfields > MAX_FIELDS_NUMBER) { 511 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 512 } 513 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 514 515 memcpy( read_out_tag, 516 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 517 + *nfields * MAX_FIELDS_NAME_LENGTH, 518 MAX_FIELDS_NAME_LENGTH); 519 520 token = strtok( read_out_tag, ":" ); 521 token = strtok( NULL," ,;<>" ); 522 *nppf = atoi( token ); 523 SerialFile->nppf=*nppf; //TODO: sanity check of int 524 } // end of if("MPI_IO_TAG") 525 else { 526 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 527 exit(1); 528 } 529 fclose(fileHandle); 530 free(SerialFile->masterHeader); 531 free(SerialFile); 532 } //end of else 533 free(fname); 534 } 535 536 // Bcast value to every one 537 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 538 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 541 } 542 543 /** 544 * This function computes the right master header size (round to size of 2^n). 545 * This is only needed for file format version 1 in "write" mode. 546 */ 547 int computeMHSize(int nfields, int nppf, int version) { 548 int mhsize; 549 if(version == 1) { 550 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 551 int meta_info_size = VERSION_INFO_HEADER_SIZE; 552 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 553 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 554 if (actual_size > DefaultMHSize) { 555 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 556 mhsize *= DefaultMHSize; 557 } 558 else { 559 mhsize = DefaultMHSize; 560 } 561 } 562 return mhsize; 563 } 564 565 /** 566 * Computes correct color of a rank according to number of files. 567 */ 568 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 569 int color = 570 (int)(myrank / (numprocs / nfiles)); 571 return color; 572 } 573 574 575 /** 576 * Check the file descriptor. 577 */ 578 void checkFileDescriptor(const char fctname[], 579 int* fileDescriptor ) { 580 if ( *fileDescriptor < 0 ) { 581 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 582 exit(1); 583 } 584 } 585 586 /** 587 * Initialize the file struct members and allocate space for file struct 588 * buffers. 589 * 590 * Note: this function is only called when we are using new format. Old POSIX 591 * format should skip this routine and call openfile() directly instead. 592 */ 593 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 594 { 595 // we init irank again in case query not called (e.g. syncIO write case) 596 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 597 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 598 599 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 600 601 double timer_start, timer_end; 602 startTimer(&timer_start); 603 604 char* imode = StringStripper( mode ); 605 606 // Note: if it's read, we presume query was called prior to init and 607 // MasterHeaderSize is already set to correct value from parsing header 608 // otherwise it's write then it needs some computation to be set 609 if ( cscompare( "read", imode ) ) { 610 // do nothing 611 } 612 else if( cscompare( "write", imode ) ) { 613 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 614 } 615 else { 616 printf("Error initphmpiio: can't recognize the mode %s", imode); 617 exit(1); 618 } 619 free ( imode ); 620 621 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 622 623 int i, j; 624 625 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 626 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 627 endTimer(&timer_end); 628 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 629 return MAX_PHASTA_FILES_EXCEEDED; 630 } 631 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 632 // { 633 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 634 // { 635 // PhastaIOActiveFiles[i] = NULL; 636 // } 637 // } 638 639 640 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 641 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 642 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 643 //if( mem_address & (pool_align -1) ) 644 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 645 646 i = PhastaIONextActiveIndex; 647 PhastaIONextActiveIndex++; 648 649 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 650 651 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 652 653 PhastaIOActiveFiles[i]->Wrong_Endian = false; 654 655 PhastaIOActiveFiles[i]->nFields = *nfields; 656 PhastaIOActiveFiles[i]->nPPF = *nppf; 657 PhastaIOActiveFiles[i]->nFiles = *nfiles; 658 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 659 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 660 661 662 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 663 664 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 665 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 667 668 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 669 MPI_Comm_split(MPI_COMM_WORLD, 670 color, 671 PhastaIOActiveFiles[i]->myrank, 672 &(PhastaIOActiveFiles[i]->local_comm)); 673 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 674 &(PhastaIOActiveFiles[i]->local_numprocs)); 675 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 676 &(PhastaIOActiveFiles[i]->local_myrank)); 677 678 // back up now these variables so that we do not need to call comm_split again 679 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 680 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 681 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 682 s_assign_local_comm = 1; 683 } 684 else { // recycle the subcommunicator 685 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 686 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 687 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 688 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 689 } 690 } 691 else { // *nfiles == 1 here - no need to call mpi_comm_split here 692 693 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 694 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 695 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 696 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 697 698 } 699 700 PhastaIOActiveFiles[i]->nppp = 701 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 702 703 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 704 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 705 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 706 707 PhastaIOActiveFiles[i]->my_offset_table = 708 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 709 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 710 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 711 //if( mem_address & (pool_align -1) ) 712 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 713 714 PhastaIOActiveFiles[i]->my_read_table = 715 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 716 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 717 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 718 //if( mem_address & (pool_align -1) ) 719 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 720 721 for (j=0; j<*nfields; j++) 722 { 723 PhastaIOActiveFiles[i]->my_offset_table[j] = 724 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 725 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 726 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 727 //if( mem_address & (pool_align -1) ) 728 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 729 730 PhastaIOActiveFiles[i]->my_read_table[j] = 731 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 732 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 733 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 734 //if( mem_address & (pool_align -1) ) 735 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 736 } 737 *filehandle = i; 738 739 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 740 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 741 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 742 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 743 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 744 745 /* 746 PhastaIOActiveFiles[i]->master_header = 747 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 748 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 749 if( mem_address & (pool_align -1) ) 750 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 751 752 PhastaIOActiveFiles[i]->double_chunk = 753 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 754 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 755 if( mem_address & (pool_align -1) ) 756 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 757 758 PhastaIOActiveFiles[i]->int_chunk = 759 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 760 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 761 if( mem_address & (pool_align -1) ) 762 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 763 764 PhastaIOActiveFiles[i]->read_double_chunk = 765 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 766 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 767 if( mem_address & (pool_align -1) ) 768 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 769 770 PhastaIOActiveFiles[i]->read_int_chunk = 771 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 772 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 773 if( mem_address & (pool_align -1) ) 774 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 775 */ 776 777 // Time monitoring 778 endTimer(&timer_end); 779 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 780 781 phprintf_0("Info initphmpiio: quiting function"); 782 783 return i; 784 } 785 786 /** 787 * Destruct the file struct and free buffers allocated in init function. 788 */ 789 void finalizephmpiio( int *fileDescriptor ) 790 { 791 double timer_start, timer_end; 792 startTimer(&timer_start); 793 794 int i, j; 795 i = *fileDescriptor; 796 //PhastaIONextActiveIndex--; 797 798 /* //free the offset table for this phasta file */ 799 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 800 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 801 { 802 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 803 free( PhastaIOActiveFiles[i]->my_read_table[j]); 804 } 805 free ( PhastaIOActiveFiles[i]->my_offset_table ); 806 free ( PhastaIOActiveFiles[i]->my_read_table ); 807 free ( PhastaIOActiveFiles[i]->master_header ); 808 free ( PhastaIOActiveFiles[i]->double_chunk ); 809 free ( PhastaIOActiveFiles[i]->int_chunk ); 810 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 812 813 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 814 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 815 s_assign_local_comm = 0; 816 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 817 } 818 819 free( PhastaIOActiveFiles[i]); 820 821 endTimer(&timer_end); 822 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 823 824 PhastaIONextActiveIndex--; 825 } 826 827 828 /** 829 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 830 * Initialize the file struct members and allocate space for file struct buffers. 831 * 832 * Note: this function is only called when we are using new format. Old POSIX 833 * format should skip this routine and call openfile() directly instead. 834 */ 835 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 836 { 837 // we init irank again in case query not called (e.g. syncIO write case) 838 839 MPI_Comm_rank(my_local_comm, &irank); 840 MPI_Comm_size(my_local_comm, &mysize); 841 842 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 843 844 double timer_start, timer_end; 845 startTimer(&timer_start); 846 847 char* imode = StringStripper( mode ); 848 849 // Note: if it's read, we presume query was called prior to init and 850 // MasterHeaderSize is already set to correct value from parsing header 851 // otherwise it's write then it needs some computation to be set 852 if ( cscompare( "read", imode ) ) { 853 // do nothing 854 } 855 else if( cscompare( "write", imode ) ) { 856 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 857 } 858 else { 859 printf("Error initphmpiio: can't recognize the mode %s", imode); 860 exit(1); 861 } 862 free ( imode ); 863 864 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 865 866 int i, j; 867 868 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 869 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 870 endTimer(&timer_end); 871 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 872 return MAX_PHASTA_FILES_EXCEEDED; 873 } 874 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 875 // { 876 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 877 // { 878 // PhastaIOActiveFiles[i] = NULL; 879 // } 880 // } 881 882 883 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 884 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 885 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 886 //if( mem_address & (pool_align -1) ) 887 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 888 889 i = PhastaIONextActiveIndex; 890 PhastaIONextActiveIndex++; 891 892 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 893 894 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 895 896 PhastaIOActiveFiles[i]->Wrong_Endian = false; 897 898 PhastaIOActiveFiles[i]->nFields = *nfields; 899 PhastaIOActiveFiles[i]->nPPF = *nppf; 900 PhastaIOActiveFiles[i]->nFiles = *nfiles; 901 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 902 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 903 904 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 905 MPI_Comm_split(my_local_comm, 906 color, 907 PhastaIOActiveFiles[i]->myrank, 908 &(PhastaIOActiveFiles[i]->local_comm)); 909 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 910 &(PhastaIOActiveFiles[i]->local_numprocs)); 911 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 912 &(PhastaIOActiveFiles[i]->local_myrank)); 913 PhastaIOActiveFiles[i]->nppp = 914 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 915 916 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 917 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 918 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 919 920 PhastaIOActiveFiles[i]->my_offset_table = 921 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 922 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 923 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 924 //if( mem_address & (pool_align -1) ) 925 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 926 927 PhastaIOActiveFiles[i]->my_read_table = 928 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 929 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 930 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 931 //if( mem_address & (pool_align -1) ) 932 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 933 934 for (j=0; j<*nfields; j++) 935 { 936 PhastaIOActiveFiles[i]->my_offset_table[j] = 937 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 938 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 939 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 940 //if( mem_address & (pool_align -1) ) 941 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 942 943 PhastaIOActiveFiles[i]->my_read_table[j] = 944 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 945 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 946 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 947 //if( mem_address & (pool_align -1) ) 948 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 949 } 950 *filehandle = i; 951 952 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 953 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 954 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 955 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 956 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 957 958 /* 959 PhastaIOActiveFiles[i]->master_header = 960 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 961 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 962 if( mem_address & (pool_align -1) ) 963 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 964 965 PhastaIOActiveFiles[i]->double_chunk = 966 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 967 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 968 if( mem_address & (pool_align -1) ) 969 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 970 971 PhastaIOActiveFiles[i]->int_chunk = 972 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 973 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 974 if( mem_address & (pool_align -1) ) 975 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 976 977 PhastaIOActiveFiles[i]->read_double_chunk = 978 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 979 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 980 if( mem_address & (pool_align -1) ) 981 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 982 983 PhastaIOActiveFiles[i]->read_int_chunk = 984 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 985 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 986 if( mem_address & (pool_align -1) ) 987 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 988 */ 989 990 // Time monitoring 991 endTimer(&timer_end); 992 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 993 994 phprintf_0("Info initphmpiiosub: quiting function"); 995 996 return i; 997 } 998 999 1000 1001 /** open file for both POSIX and MPI-IO syncIO format. 1002 * 1003 * If it's old POSIX format, simply call posix fopen(). 1004 * 1005 * If it's MPI-IO foramt: 1006 * in "read" mode, it builds the header table that points to the offset of 1007 * fields for parts; 1008 * in "write" mode, it opens the file with MPI-IO open routine. 1009 */ 1010 void openfile(const char filename[], 1011 const char mode[], 1012 int* fileDescriptor ) 1013 { 1014 phprintf_0("Info: entering openfile"); 1015 1016 double timer_start, timer_end; 1017 startTimer(&timer_start); 1018 1019 if ( PhastaIONextActiveIndex == 0 ) 1020 { 1021 FILE* file=NULL ; 1022 *fileDescriptor = 0; 1023 char* fname = StringStripper( filename ); 1024 char* imode = StringStripper( mode ); 1025 1026 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1027 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1028 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1029 1030 if ( !file ){ 1031 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1032 } else { 1033 fileArray.push_back( file ); 1034 byte_order.push_back( false ); 1035 header_type.push_back( sizeof(int) ); 1036 *fileDescriptor = fileArray.size(); 1037 } 1038 free (fname); 1039 free (imode); 1040 } 1041 else // else it would be parallel I/O, opposed to posix io 1042 { 1043 char* fname = StringStripper( filename ); 1044 char* imode = StringStripper( mode ); 1045 int rc; 1046 int i = *fileDescriptor; 1047 checkFileDescriptor("openfile",&i); 1048 char* token; 1049 1050 if ( cscompare( "read", imode ) ) 1051 { 1052 // if (PhastaIOActiveFiles[i]->myrank == 0) 1053 // printf("\n **********\nRead open ... ... regular version\n"); 1054 1055 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1056 fname, 1057 MPI_MODE_RDONLY, 1058 MPI_INFO_NULL, 1059 &(PhastaIOActiveFiles[i]->file_handle) ); 1060 1061 if(rc) 1062 { 1063 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1064 printf("Error openfile: Unable to open file %s! File descriptor = %d\n",fname,*fileDescriptor); 1065 endTimer(&timer_end); 1066 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1067 return; 1068 } 1069 1070 MPI_Status read_tag_status; 1071 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1072 int j; 1073 int magic_number; 1074 1075 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1076 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1077 0, 1078 PhastaIOActiveFiles[i]->master_header, 1079 MasterHeaderSize, 1080 MPI_CHAR, 1081 &read_tag_status ); 1082 } 1083 1084 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1085 MasterHeaderSize, 1086 MPI_CHAR, 1087 0, 1088 PhastaIOActiveFiles[i]->local_comm ); 1089 1090 memcpy( read_out_tag, 1091 PhastaIOActiveFiles[i]->master_header, 1092 MAX_FIELDS_NAME_LENGTH-1 ); 1093 1094 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1095 { 1096 // Test endianess ... 1097 memcpy ( &magic_number, 1098 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1099 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1100 1101 if ( magic_number != ENDIAN_TEST_NUMBER ) 1102 { 1103 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1104 } 1105 1106 memcpy( read_out_tag, 1107 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1108 MAX_FIELDS_NAME_LENGTH ); 1109 1110 // Read in # fields ... 1111 token = strtok ( read_out_tag, ":" ); 1112 token = strtok( NULL," ,;<>" ); 1113 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1114 1115 unsigned long long **header_table; 1116 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1117 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1118 //mem_address = (long long )header_table; 1119 //if( mem_address & (pool_align -1) ) 1120 // header_table += pool_align - (mem_address & (pool_align -1)); 1121 1122 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1123 { 1124 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1125 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1126 //mem_address = (long long )header_table[j]; 1127 //if( mem_address & (pool_align -1) ) 1128 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1129 } 1130 1131 // Read in the offset table ... 1132 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1133 { 1134 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1135 memcpy( header_table[j], 1136 PhastaIOActiveFiles[i]->master_header + 1137 VERSION_INFO_HEADER_SIZE + 1138 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1139 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1140 } 1141 1142 MPI_Scatter( header_table[j], 1143 PhastaIOActiveFiles[i]->nppp, 1144 MPI_LONG_LONG_INT, 1145 PhastaIOActiveFiles[i]->my_read_table[j], 1146 PhastaIOActiveFiles[i]->nppp, 1147 MPI_LONG_LONG_INT, 1148 0, 1149 PhastaIOActiveFiles[i]->local_comm ); 1150 1151 // Swap byte order if endianess is different ... 1152 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1153 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1154 sizeof(long long int), 1155 PhastaIOActiveFiles[i]->nppp ); 1156 } 1157 } 1158 1159 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1160 free ( header_table[j] ); 1161 } 1162 free (header_table); 1163 1164 } // end of if MPI_IO_TAG 1165 else //else not valid MPI file 1166 { 1167 *fileDescriptor = NOT_A_MPI_FILE; 1168 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1169 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1170 endTimer(&timer_end); 1171 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1172 return; 1173 } 1174 } // end of if "read" 1175 else if( cscompare( "write", imode ) ) 1176 { 1177 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1178 fname, 1179 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1180 MPI_INFO_NULL, 1181 &(PhastaIOActiveFiles[i]->file_handle) ); 1182 if(rc) 1183 { 1184 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1185 return; 1186 } 1187 } // end of if "write" 1188 free (fname); 1189 free (imode); 1190 } // end of if FileIndex != 0 1191 1192 endTimer(&timer_end); 1193 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1194 } 1195 1196 /** close file for both POSIX and MPI-IO syncIO format. 1197 * 1198 * If it's old POSIX format, simply call posix fclose(). 1199 * 1200 * If it's MPI-IO foramt: 1201 * in "read" mode, it simply close file with MPI-IO close routine. 1202 * in "write" mode, rank 0 in each group will re-assemble the master header and 1203 * offset table and write to the beginning of file, then close the file. 1204 */ 1205 void closefile( int* fileDescriptor, 1206 const char mode[] ) 1207 { 1208 double timer_start, timer_end; 1209 startTimer(&timer_start); 1210 1211 int i = *fileDescriptor; 1212 checkFileDescriptor("closefile",&i); 1213 1214 if ( PhastaIONextActiveIndex == 0 ) { 1215 char* imode = StringStripper( mode ); 1216 1217 if( cscompare( "write", imode ) 1218 || cscompare( "append", imode ) ) { 1219 fflush( fileArray[ *fileDescriptor - 1 ] ); 1220 } 1221 1222 fclose( fileArray[ *fileDescriptor - 1 ] ); 1223 free (imode); 1224 } 1225 else { 1226 char* imode = StringStripper( mode ); 1227 1228 //write master header here: 1229 if ( cscompare( "write", imode ) ) { 1230 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1231 // MasterHeaderSize = 4*ONE_MEGABYTE; 1232 // else 1233 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1234 1235 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1236 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1237 1238 MPI_Status write_header_status; 1239 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1240 char version[MAX_FIELDS_NAME_LENGTH/4]; 1241 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1242 int magic_number = ENDIAN_TEST_NUMBER; 1243 1244 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1245 { 1246 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1247 sprintf(mpi_tag, "MPI_IO_Tag : "); 1248 memcpy(PhastaIOActiveFiles[i]->master_header, 1249 mpi_tag, 1250 MAX_FIELDS_NAME_LENGTH); 1251 1252 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1253 // this version is "1", print version in ASCII 1254 sprintf(version, "version : %d",1); 1255 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1256 version, 1257 MAX_FIELDS_NAME_LENGTH/4); 1258 1259 // master header size is computed using the formula above 1260 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1261 sprintf(mhsize, "mhsize : "); 1262 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1263 mhsize, 1264 MAX_FIELDS_NAME_LENGTH/4); 1265 1266 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1267 sprintf(mpi_tag, 1268 "\nnFields : %d\n", 1269 PhastaIOActiveFiles[i]->nFields); 1270 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1271 mpi_tag, 1272 MAX_FIELDS_NAME_LENGTH); 1273 1274 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1275 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1276 memcpy( PhastaIOActiveFiles[i]->master_header+ 1277 PhastaIOActiveFiles[i]->nFields * 1278 MAX_FIELDS_NAME_LENGTH + 1279 MAX_FIELDS_NAME_LENGTH * 2, 1280 mpi_tag, 1281 MAX_FIELDS_NAME_LENGTH); 1282 1283 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1284 &magic_number, 1285 sizeof(int)); 1286 1287 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1288 &MasterHeaderSize, 1289 sizeof(int)); 1290 } 1291 1292 int j = 0; 1293 unsigned long long **header_table; 1294 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1295 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1296 //mem_address = (long long )header_table; 1297 //if( mem_address & (pool_align -1) ) 1298 // header_table += pool_align - (mem_address & (pool_align -1)); 1299 1300 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1301 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1302 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1303 //mem_address = (long long )header_table[j]; 1304 //if( mem_address & (pool_align -1) ) 1305 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1306 } 1307 1308 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1309 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1310 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1311 PhastaIOActiveFiles[i]->nppp, 1312 MPI_LONG_LONG_INT, 1313 header_table[j], 1314 PhastaIOActiveFiles[i]->nppp, 1315 MPI_LONG_LONG_INT, 1316 0, 1317 PhastaIOActiveFiles[i]->local_comm ); 1318 } 1319 1320 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1321 1322 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1323 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1324 memcpy ( PhastaIOActiveFiles[i]->master_header + 1325 VERSION_INFO_HEADER_SIZE + 1326 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1327 header_table[j], 1328 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1329 } 1330 1331 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1332 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1333 0, 1334 PhastaIOActiveFiles[i]->master_header, 1335 MasterHeaderSize, 1336 MPI_CHAR, 1337 &write_header_status ); 1338 } 1339 1340 ////free(PhastaIOActiveFiles[i]->master_header); 1341 1342 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1343 free ( header_table[j] ); 1344 } 1345 free (header_table); 1346 } 1347 1348 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1349 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1350 free ( imode ); 1351 } 1352 1353 endTimer(&timer_end); 1354 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1355 } 1356 1357 void readheader( int* fileDescriptor, 1358 const char keyphrase[], 1359 void* valueArray, 1360 int* nItems, 1361 const char datatype[], 1362 const char iotype[] ) 1363 { 1364 double timer_start, timer_end; 1365 1366 startTimer(&timer_start); 1367 1368 int i = *fileDescriptor; 1369 checkFileDescriptor("readheader",&i); 1370 1371 if ( PhastaIONextActiveIndex == 0 ) { 1372 int filePtr = *fileDescriptor - 1; 1373 FILE* fileObject; 1374 int* valueListInt; 1375 1376 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1377 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1378 fprintf(stderr,"openfile_ function has to be called before \n") ; 1379 fprintf(stderr,"acessing the file\n ") ; 1380 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1381 endTimer(&timer_end); 1382 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1383 return; 1384 } 1385 1386 LastHeaderKey[filePtr] = std::string(keyphrase); 1387 LastHeaderNotFound = false; 1388 1389 fileObject = fileArray[ filePtr ] ; 1390 Wrong_Endian = byte_order[ filePtr ]; 1391 1392 isBinary( iotype ); 1393 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1394 1395 // right now we are making the assumption that we will only write integers 1396 // on the header line. 1397 1398 valueListInt = static_cast< int* >( valueArray ); 1399 int ierr = readHeader( fileObject , 1400 keyphrase, 1401 valueListInt, 1402 *nItems ) ; 1403 1404 byte_order[ filePtr ] = Wrong_Endian ; 1405 1406 if ( ierr ) LastHeaderNotFound = true; 1407 1408 //return ; // don't return, go to the end to print perf 1409 } 1410 else { 1411 unsigned int skip_size; 1412 int* valueListInt; 1413 valueListInt = static_cast <int*>(valueArray); 1414 char* token = NULL; 1415 bool FOUND = false ; 1416 isBinary( iotype ); 1417 1418 MPI_Status read_offset_status; 1419 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1420 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1421 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1422 int j; 1423 1424 int string_length = strlen( keyphrase ); 1425 char* buffer = (char*) malloc ( string_length+1 ); 1426 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1427 //mem_address = (long long )buffer; 1428 //if( mem_address & (pool_align -1) ) 1429 // buffer += pool_align - (mem_address & (pool_align -1)); 1430 1431 strcpy ( buffer, keyphrase ); 1432 buffer[ string_length ] = '\0'; 1433 1434 char* st2 = strtok ( buffer, "@" ); 1435 st2 = strtok (NULL, "@"); 1436 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1437 if ( char* p = strpbrk(buffer, "@") ) 1438 *p = '\0'; 1439 1440 // Check if the user has input the right GPid 1441 if ( ( PhastaIOActiveFiles[i]->GPid <= 1442 PhastaIOActiveFiles[i]->myrank * 1443 PhastaIOActiveFiles[i]->nppp )|| 1444 ( PhastaIOActiveFiles[i]->GPid > 1445 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1446 PhastaIOActiveFiles[i]->nppp ) ) 1447 { 1448 *fileDescriptor = NOT_A_MPI_FILE; 1449 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1450 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1451 endTimer(&timer_end); 1452 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1453 return; 1454 } 1455 1456 // Find the field we want ... 1457 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1458 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1459 { 1460 memcpy( readouttag[j], 1461 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1462 MAX_FIELDS_NAME_LENGTH-1 ); 1463 } 1464 1465 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1466 { 1467 token = strtok ( readouttag[j], ":" ); 1468 1469 //if ( cscompare( buffer, token ) ) 1470 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1471 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1472 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1473 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1474 { 1475 PhastaIOActiveFiles[i]->read_field_count = j; 1476 FOUND = true; 1477 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1478 break; 1479 } 1480 } 1481 free(buffer); 1482 1483 if (!FOUND) 1484 { 1485 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1486 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1487 endTimer(&timer_end); 1488 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1489 return; 1490 } 1491 1492 // Find the part we want ... 1493 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1494 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1495 1496 PhastaIOActiveFiles[i]->my_offset = 1497 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1498 1499 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1500 1501 // Read each datablock header here ... 1502 1503 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1504 PhastaIOActiveFiles[i]->my_offset+1, 1505 read_out_tag, 1506 MAX_FIELDS_NAME_LENGTH-1, 1507 MPI_CHAR, 1508 &read_offset_status ); 1509 token = strtok ( read_out_tag, ":" ); 1510 1511 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1512 1513 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1514 { 1515 FOUND = true ; 1516 token = strtok( NULL, " ,;<>" ); 1517 skip_size = atoi( token ); 1518 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1519 valueListInt[j] = atoi( token ); 1520 1521 if ( j < *nItems ) 1522 { 1523 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1524 } 1525 } 1526 else { 1527 //if(irank==0) 1528 if(PhastaIOActiveFiles[i]->myrank == 0) 1529 // If we enter this if, there is a problem with the name of some fields 1530 { 1531 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1532 } 1533 } 1534 } 1535 1536 endTimer(&timer_end); 1537 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1538 1539 } 1540 1541 void readdatablock( int* fileDescriptor, 1542 const char keyphrase[], 1543 void* valueArray, 1544 int* nItems, 1545 const char datatype[], 1546 const char iotype[] ) 1547 { 1548 //if(irank == 0) printf("entering readdatablock()\n"); 1549 unsigned long long data_size = 0; 1550 double timer_start, timer_end; 1551 startTimer(&timer_start); 1552 1553 int i = *fileDescriptor; 1554 checkFileDescriptor("readdatablock",&i); 1555 1556 if ( PhastaIONextActiveIndex == 0 ) { 1557 int filePtr = *fileDescriptor - 1; 1558 FILE* fileObject; 1559 char junk; 1560 1561 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1562 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1563 fprintf(stderr,"openfile_ function has to be called before\n") ; 1564 fprintf(stderr,"acessing the file\n ") ; 1565 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1566 endTimer(&timer_end); 1567 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1568 return; 1569 } 1570 1571 // error check.. 1572 // since we require that a consistant header always preceed the data block 1573 // let us check to see that it is actually the case. 1574 1575 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1576 fprintf(stderr, "Header not consistant with data block\n"); 1577 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1578 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1579 fprintf(stderr, "Please recheck read sequence \n"); 1580 if( Strict_Error ) { 1581 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1582 endTimer(&timer_end); 1583 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1584 return; 1585 } 1586 } 1587 1588 if ( LastHeaderNotFound ) { 1589 endTimer(&timer_end); 1590 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1591 return; 1592 } 1593 fileObject = fileArray[ filePtr ]; 1594 Wrong_Endian = byte_order[ filePtr ]; 1595 1596 size_t type_size = typeSize( datatype ); 1597 int nUnits = *nItems; 1598 isBinary( iotype ); 1599 1600 LastHeaderKey.erase(filePtr); 1601 1602 if ( binary_format ) { 1603 fread( valueArray, type_size, nUnits, fileObject ); 1604 fread( &junk, sizeof(char), 1 , fileObject ); 1605 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nUnits ); 1606 } else { 1607 1608 char* ts1 = StringStripper( datatype ); 1609 if ( cscompare( "integer", ts1 ) ) { 1610 for( int n=0; n < nUnits ; n++ ) 1611 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1612 } else if ( cscompare( "double", ts1 ) ) { 1613 for( int n=0; n < nUnits ; n++ ) 1614 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1615 } 1616 free (ts1); 1617 } 1618 1619 //return; 1620 } 1621 else { 1622 // printf("read data block\n"); 1623 MPI_Status read_data_status; 1624 size_t type_size = typeSize( datatype ); 1625 int nUnits = *nItems; 1626 isBinary( iotype ); 1627 1628 // read datablock then 1629 //MR CHANGE 1630 // if ( cscompare ( datatype, "double")) 1631 char* ts2 = StringStripper( datatype ); 1632 if ( cscompare ( "double" , ts2)) 1633 //MR CHANGE END 1634 { 1635 1636 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1637 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1638 valueArray, 1639 nUnits, 1640 MPI_DOUBLE ); 1641 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1642 valueArray, 1643 &read_data_status ); 1644 data_size=8*nUnits; 1645 1646 } 1647 //MR CHANGE 1648 // else if ( cscompare ( datatype, "integer")) 1649 else if ( cscompare ( "integer" , ts2)) 1650 //MR CHANGE END 1651 { 1652 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1653 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1654 valueArray, 1655 nUnits, 1656 MPI_INT ); 1657 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1658 valueArray, 1659 &read_data_status ); 1660 data_size=4*nUnits; 1661 } 1662 else 1663 { 1664 *fileDescriptor = DATA_TYPE_ILLEGAL; 1665 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1666 endTimer(&timer_end); 1667 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1668 return; 1669 } 1670 free(ts2); 1671 1672 1673 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1674 1675 // Swap data byte order if endianess is different ... 1676 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1677 { 1678 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1679 } 1680 } 1681 1682 endTimer(&timer_end); 1683 char extra_msg[1024]; 1684 memset(extra_msg, '\0', 1024); 1685 char* key = StringStripper(keyphrase); 1686 sprintf(extra_msg, " field is %s ", key); 1687 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1688 free(key); 1689 1690 } 1691 1692 void writeheader( const int* fileDescriptor, 1693 const char keyphrase[], 1694 const void* valueArray, 1695 const int* nItems, 1696 const int* ndataItems, 1697 const char datatype[], 1698 const char iotype[]) 1699 { 1700 1701 //if(irank == 0) printf("entering writeheader()\n"); 1702 1703 double timer_start, timer_end; 1704 startTimer(&timer_start); 1705 1706 int i = *fileDescriptor; 1707 checkFileDescriptor("writeheader",&i); 1708 1709 if ( PhastaIONextActiveIndex == 0 ) { 1710 int filePtr = *fileDescriptor - 1; 1711 FILE* fileObject; 1712 1713 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1714 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1715 fprintf(stderr,"openfile_ function has to be called before \n") ; 1716 fprintf(stderr,"acessing the file\n ") ; 1717 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1718 endTimer(&timer_end); 1719 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1720 return; 1721 } 1722 1723 LastHeaderKey[filePtr] = std::string(keyphrase); 1724 DataSize = *ndataItems; 1725 fileObject = fileArray[ filePtr ] ; 1726 size_t type_size = typeSize( datatype ); 1727 isBinary( iotype ); 1728 header_type[ filePtr ] = type_size; 1729 1730 int _newline = ( *ndataItems > 0 ) ? sizeof( char ) : 0; 1731 int size_of_nextblock = 1732 ( binary_format ) ? type_size*( *ndataItems )+ _newline : *ndataItems ; 1733 1734 fprintf( fileObject, "%s : < %d > ", keyphrase, size_of_nextblock ); 1735 for( int i = 0; i < *nItems; i++ ) 1736 fprintf(fileObject, "%d ", *((int*)((int*)valueArray+i))); 1737 fprintf(fileObject, "\n"); 1738 1739 //return ; 1740 } 1741 else { // else it's parallel I/O 1742 DataSize = *ndataItems; 1743 size_t type_size = typeSize( datatype ); 1744 isBinary( iotype ); 1745 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1746 1747 int string_length = strlen( keyphrase ); 1748 char* buffer = (char*) malloc ( string_length+1 ); 1749 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1750 //mem_address = (long long )buffer; 1751 //if( mem_address & (pool_align -1) ) 1752 // buffer += pool_align - (mem_address & (pool_align -1)); 1753 1754 strcpy ( buffer, keyphrase); 1755 buffer[ string_length ] = '\0'; 1756 1757 char* st2 = strtok ( buffer, "@" ); 1758 st2 = strtok (NULL, "@"); 1759 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1760 1761 if ( char* p = strpbrk(buffer, "@") ) 1762 *p = '\0'; 1763 1764 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1765 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1766 unsigned long long offset_value; 1767 1768 int temp = *ndataItems; 1769 unsigned long long number_of_items = (unsigned long long)temp; 1770 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1771 1772 MPI_Scan( &number_of_items, 1773 &offset_value, 1774 1, 1775 MPI_LONG_LONG_INT, 1776 MPI_SUM, 1777 PhastaIOActiveFiles[i]->local_comm ); 1778 1779 offset_value = (offset_value - number_of_items) * type_size; 1780 1781 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1782 DB_HEADER_SIZE + 1783 PhastaIOActiveFiles[i]->next_start_address; 1784 // This offset is the starting address of each datablock header... 1785 PhastaIOActiveFiles[i]->my_offset = offset_value; 1786 1787 // Write in my offset table ... 1788 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1789 PhastaIOActiveFiles[i]->my_offset; 1790 1791 // Update the next-start-address ... 1792 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1793 number_of_items * type_size + 1794 DB_HEADER_SIZE; 1795 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1796 1, 1797 MPI_LONG_LONG_INT, 1798 PhastaIOActiveFiles[i]->local_numprocs-1, 1799 PhastaIOActiveFiles[i]->local_comm ); 1800 1801 // Prepare datablock header ... 1802 int _newline = (*ndataItems>0)?sizeof(char):0; 1803 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1804 1805 //char datablock_header[255]; 1806 //bzero((void*)datablock_header,255); 1807 char datablock_header[DB_HEADER_SIZE]; 1808 bzero((void*)datablock_header,DB_HEADER_SIZE); 1809 1810 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1811 sprintf( datablock_header, 1812 "\n%s : < %u >", 1813 keyphrase, 1814 size_of_nextblock ); 1815 1816 for ( int j = 0; j < *nItems; j++ ) 1817 { 1818 sprintf( datablock_header, 1819 "%s %d ", 1820 datablock_header, 1821 *((int*)((int*)valueArray+j))); 1822 } 1823 sprintf( datablock_header, 1824 "%s\n ", 1825 datablock_header ); 1826 1827 // Write datablock header ... 1828 //MR CHANGE 1829 // if ( cscompare(datatype,"double") ) 1830 char* ts1 = StringStripper( datatype ); 1831 if ( cscompare("double",ts1) ) 1832 //MR CHANGE END 1833 { 1834 free ( PhastaIOActiveFiles[i]->double_chunk ); 1835 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1836 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1837 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1838 //if( mem_address & (pool_align -1) ) 1839 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1840 1841 double * aa = ( double * )datablock_header; 1842 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1843 } 1844 //MR CHANGE 1845 // if ( cscompare(datatype,"integer") ) 1846 else if ( cscompare("integer",ts1) ) 1847 //MR CHANGE END 1848 { 1849 free ( PhastaIOActiveFiles[i]->int_chunk ); 1850 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1851 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1852 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1853 //if( mem_address & (pool_align -1) ) 1854 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1855 1856 int * aa = ( int * )datablock_header; 1857 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1858 } 1859 else { 1860 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1861 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1862 endTimer(&timer_end); 1863 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1864 return; 1865 } 1866 free(ts1); 1867 1868 PhastaIOActiveFiles[i]->part_count++; 1869 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1870 //A new field will be written 1871 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1872 memcpy( PhastaIOActiveFiles[i]->master_header + 1873 PhastaIOActiveFiles[i]->field_count * 1874 MAX_FIELDS_NAME_LENGTH + 1875 MAX_FIELDS_NAME_LENGTH * 2, 1876 mpi_tag, 1877 MAX_FIELDS_NAME_LENGTH-1); 1878 } 1879 PhastaIOActiveFiles[i]->field_count++; 1880 PhastaIOActiveFiles[i]->part_count=0; 1881 } 1882 free(buffer); 1883 } 1884 1885 endTimer(&timer_end); 1886 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1887 } 1888 1889 void writedatablock( const int* fileDescriptor, 1890 const char keyphrase[], 1891 const void* valueArray, 1892 const int* nItems, 1893 const char datatype[], 1894 const char iotype[] ) 1895 { 1896 //if(irank == 0) printf("entering writedatablock()\n"); 1897 1898 unsigned long long data_size = 0; 1899 double timer_start, timer_end; 1900 startTimer(&timer_start); 1901 1902 int i = *fileDescriptor; 1903 checkFileDescriptor("writedatablock",&i); 1904 1905 if ( PhastaIONextActiveIndex == 0 ) { 1906 int filePtr = *fileDescriptor - 1; 1907 1908 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1909 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1910 fprintf(stderr,"openfile_ function has to be called before \n") ; 1911 fprintf(stderr,"acessing the file\n ") ; 1912 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1913 endTimer(&timer_end); 1914 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1915 return; 1916 } 1917 // since we require that a consistant header always preceed the data block 1918 // let us check to see that it is actually the case. 1919 1920 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1921 fprintf(stderr, "Header not consistant with data block\n"); 1922 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1923 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1924 fprintf(stderr, "Please recheck write sequence \n"); 1925 if( Strict_Error ) { 1926 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1927 endTimer(&timer_end); 1928 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1929 return; 1930 } 1931 } 1932 1933 FILE* fileObject = fileArray[ filePtr ] ; 1934 size_t type_size=typeSize( datatype ); 1935 isBinary( iotype ); 1936 1937 LastHeaderKey.erase(filePtr); 1938 1939 if ( header_type[filePtr] != (int)type_size ) { 1940 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1941 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1942 if( Strict_Error ) { 1943 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1944 endTimer(&timer_end); 1945 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1946 return; 1947 } 1948 } 1949 1950 int nUnits = *nItems; 1951 1952 if ( nUnits != DataSize ) { 1953 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1954 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1955 if( Strict_Error ) { 1956 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1957 endTimer(&timer_end); 1958 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1959 return; 1960 } 1961 } 1962 1963 if ( binary_format ) { 1964 1965 fwrite( valueArray, type_size, nUnits, fileObject ); 1966 fprintf( fileObject,"\n"); 1967 1968 } else { 1969 1970 char* ts1 = StringStripper( datatype ); 1971 if ( cscompare( "integer", ts1 ) ) { 1972 for( int n=0; n < nUnits ; n++ ) 1973 fprintf(fileObject,"%d\n",*((int*)((int*)valueArray+n))); 1974 } else if ( cscompare( "double", ts1 ) ) { 1975 for( int n=0; n < nUnits ; n++ ) 1976 fprintf(fileObject,"%lf\n",*((double*)((double*)valueArray+n))); 1977 } 1978 free (ts1); 1979 } 1980 //return ; 1981 } 1982 else { // syncIO case 1983 MPI_Status write_data_status; 1984 isBinary( iotype ); 1985 int nUnits = *nItems; 1986 1987 //MR CHANGE 1988 // if ( cscompare(datatype,"double") ) 1989 char* ts1 = StringStripper( datatype ); 1990 if ( cscompare("double",ts1) ) 1991 //MR CHANGE END 1992 { 1993 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1994 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1995 PhastaIOActiveFiles[i]->my_offset, 1996 PhastaIOActiveFiles[i]->double_chunk, 1997 //BLOCK_SIZE/sizeof(double), 1998 nUnits+DB_HEADER_SIZE/sizeof(double), 1999 MPI_DOUBLE ); 2000 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2001 PhastaIOActiveFiles[i]->double_chunk, 2002 &write_data_status ); 2003 data_size=8*nUnits; 2004 } 2005 //MR CHANGE 2006 // else if ( cscompare ( datatype, "integer")) 2007 else if ( cscompare("integer",ts1) ) 2008 //MR CHANGE END 2009 { 2010 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2011 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2012 PhastaIOActiveFiles[i]->my_offset, 2013 PhastaIOActiveFiles[i]->int_chunk, 2014 nUnits+DB_HEADER_SIZE/sizeof(int), 2015 MPI_INT ); 2016 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2017 PhastaIOActiveFiles[i]->int_chunk, 2018 &write_data_status ); 2019 data_size=4*nUnits; 2020 } 2021 else { 2022 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2023 endTimer(&timer_end); 2024 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2025 return; 2026 } 2027 free(ts1); 2028 } 2029 2030 endTimer(&timer_end); 2031 char extra_msg[1024]; 2032 memset(extra_msg, '\0', 1024); 2033 char* key = StringStripper(keyphrase); 2034 sprintf(extra_msg, " field is %s ", key); 2035 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2036 free(key); 2037 2038 } 2039 2040 void 2041 SwapArrayByteOrder( void* array, 2042 int nbytes, 2043 int nItems ) 2044 { 2045 /* This swaps the byte order for the array of nItems each 2046 of size nbytes , This will be called only locally */ 2047 int i,j; 2048 unsigned char* ucDst = (unsigned char*)array; 2049 2050 for(i=0; i < nItems; i++) { 2051 for(j=0; j < (nbytes/2); j++) 2052 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2053 ucDst += nbytes; 2054 } 2055 } 2056 2057 void 2058 writestring( int* fileDescriptor, 2059 const char inString[] ) 2060 { 2061 2062 int filePtr = *fileDescriptor - 1; 2063 FILE* fileObject = fileArray[filePtr] ; 2064 fprintf(fileObject,"%s",inString ); 2065 return; 2066 } 2067 2068 void 2069 Gather_Headers( int* fileDescriptor, 2070 vector< string >& headers ) 2071 { 2072 2073 FILE* fileObject; 2074 char Line[1024]; 2075 2076 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2077 2078 while( !feof(fileObject) ) { 2079 fgets( Line, 1024, fileObject); 2080 if ( Line[0] == '#' ) { 2081 headers.push_back( Line ); 2082 } else { 2083 break; 2084 } 2085 } 2086 rewind( fileObject ); 2087 clearerr( fileObject ); 2088 } 2089 void 2090 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2091 2092 void 2093 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2094 2095 int 2096 isLittleEndian( void ) 2097 { 2098 // this function returns a 1 if the current running architecture is 2099 // LittleEndian Byte Ordered, else it returns a zero 2100 2101 union { 2102 long a; 2103 char c[sizeof( long )]; 2104 } endianUnion; 2105 2106 endianUnion.a = 1 ; 2107 2108 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2109 else return 0; 2110 } 2111 2112 namespace PHASTA { 2113 const char* const PhastaIO_traits<int>::type_string = "integer"; 2114 const char* const PhastaIO_traits<double>::type_string = "double"; 2115 } 2116