/*
 *
 *  This is the SyncIO library that uses MPI-IO collective functions to
 *  implement a flexible I/O checkpoint solution for a large number of
 *  processors.
 *
 *  Previous developer: Ning Liu         (liun2@cs.rpi.edu)
 *                      Jing Fu          (fuj@cs.rpi.edu)
 *  Current developers: Michel Rasquin   (Michel.Rasquin@colorado.edu),
 *                      Ben Matthews     (benjamin.a.matthews@colorado.edu)
 *
 */

#include <map>
#include <vector>
#include <string>
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <sstream>
#include "phastaIO.h"
#include "mpi.h"
#include "phiotmrc.h"

// Size in bytes of the leading meta-data region that queryphmpiio() reads first.
#define VERSION_INFO_HEADER_SIZE 8192
#define DB_HEADER_SIZE 1024
#define ONE_MEGABYTE 1048576
#define TWO_MEGABYTE 2097152
// Magic integer stored after "MPI_IO_Tag : " in the master header; queryphmpiio()
// compares against it to detect a byte-order mismatch.
#define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!!
// Capacity of the PhastaIOActiveFiles table.
#define MAX_PHASTA_FILES 64
#define MAX_PHASTA_FILE_NAME_LENGTH 1024
// Note: MAX_FIELDS_NAME_LENGTH is defined a few lines below; this is fine because
// macros are only expanded at their point of use.
#define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf
// -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles)
#define MAX_FIELDS_NAME_LENGTH 128
// Master header size used when the file carries no "version" tag (4 MB).
#define DefaultMHSize (4*ONE_MEGABYTE)
//#define DefaultMHSize (8350) //For test
#define LATEST_WRITE_VERSION 1
#define inv1024sq 953.674316406e-9 // = 1/1024/1024
// Master header size in bytes; -1 until set by queryphmpiio() (read) or
// initphmpiio() (write).
int MasterHeaderSize = -1;

bool PRINT_PERF = false; // default print no perf results
int irank = -1; // global rank, should never be manually manipulated
int mysize = -1; // communicator size, filled in by MPI_Comm_size in query/init

// Static variables are bad but used here to store the subcommunicator and associated variables
// Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it)
static int s_assign_local_comm = 0; // 1 once s_local_comm holds a split communicator
static MPI_Comm s_local_comm;
static int s_local_size = -1;
static int s_local_rank = -1;

//unsigned long long
pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 map<int, std::string> LastHeaderKey; 88 vector< FILE* > fileArray; 89 vector< bool > byte_order; 90 vector< int > header_type; 91 int DataSize=0; 92 bool LastHeaderNotFound = false; 93 bool Wrong_Endian = false ; 94 bool Strict_Error = false ; 95 bool binary_format = true; 96 97 /***********************************************************************/ 98 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 99 /***********************************************************************/ 100 101 typedef struct 102 { 103 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 104 unsigned long long my_offset; 105 unsigned long long next_start_address; 106 unsigned long long **my_offset_table; 107 unsigned long long **my_read_table; 108 109 double * double_chunk; 110 double * read_double_chunk; 111 112 int field_count; 113 int part_count; 114 int read_field_count; 115 int read_part_count; 116 int GPid; 117 int start_id; 118 119 int mhsize; 120 121 int myrank; 122 int numprocs; 123 int local_myrank; 124 int local_numprocs; 125 126 int nppp; 127 int nPPF; 128 int nFiles; 129 int nFields; 130 131 int * int_chunk; 132 int * read_int_chunk; 133 134 int 
Wrong_Endian; /* default to false */ 135 char * master_header; 136 MPI_File file_handle; 137 MPI_Comm local_comm; 138 } phastaio_file_t; 139 140 typedef struct 141 { 142 //unsigned long long my_offset; 143 //unsigned long long **offset_table; 144 //int fileID; 145 int nppf, nfields; 146 //int GPid; 147 //int read_field_count; 148 char * masterHeader; 149 }serial_file; 150 151 serial_file *SerialFile; 152 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 153 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 154 155 // the caller has the responsibility to delete the returned string 156 // TODO: StringStipper("nbc value? ") returns NULL? 157 char* 158 StringStripper( const char istring[] ) { 159 160 int length = strlen( istring ); 161 162 //char* dest = new char [ length + 1 ]; 163 char* dest = (char *)malloc( length + 1 ); 164 165 //char * dest; 166 //dest = (char *)malloc( length + 1 + pool_align ); 167 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 168 //mem_address = (long long )dest; 169 //if( mem_address & (pool_align -1) ) 170 // dest += pool_align - (mem_address & (pool_align -1)); 171 172 strcpy( dest, istring ); 173 dest[ length ] = '\0'; 174 175 if ( char* p = strpbrk( dest, " ") ) 176 *p = '\0'; 177 178 return dest; 179 } 180 181 inline int 182 cscompare( const char teststring[], 183 const char targetstring[] ) { 184 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void 204 isBinary( const char iotype[] ) { 205 206 char* fname = StringStripper( iotype ); 207 if ( cscompare( fname, "binary" ) ) binary_format = true; 208 else binary_format = false; 209 
free (fname); 210 211 } 212 213 inline size_t 214 typeSize( const char typestring[] ) { 215 216 char* ts1 = StringStripper( typestring ); 217 218 if ( cscompare( "integer", ts1 ) ) { 219 free (ts1); 220 return sizeof(int); 221 } else if ( cscompare( "double", ts1 ) ) { 222 free (ts1); 223 return sizeof( double ); 224 } else { 225 free (ts1); 226 fprintf(stderr,"unknown type : %s\n",ts1); 227 return 0; 228 } 229 } 230 231 int 232 readHeader( FILE* fileObject, 233 const char phrase[], 234 int* params, 235 int expect ) { 236 237 char* text_header; 238 char* token; 239 char Line[1024]; 240 char junk; 241 bool FOUND = false ; 242 int real_length; 243 int skip_size, integer_value; 244 int rewind_count=0; 245 246 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 247 rewind( fileObject ); 248 clearerr( fileObject ); 249 rewind_count++; 250 fgets( Line, 1024, fileObject ); 251 } 252 253 while( !FOUND && ( rewind_count < 2 ) ) { 254 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 255 //text_header = new char [ real_length + 1 ]; 256 text_header = (char *)malloc( real_length + 1 ); 257 //text_header = (char *)malloc( real_length + 1 + pool_align ); 258 //mem_address = (long long )text_header; 259 //if( mem_address & (pool_align -1) ) 260 // text_header += pool_align - (mem_address & (pool_align -1)); 261 262 strncpy( text_header, Line, real_length ); 263 text_header[ real_length ] =static_cast<char>(NULL); 264 token = strtok ( text_header, ":" ); 265 //if( cscompare( phrase , token ) ) { 266 // Double comparison required because different fields can still start 267 // with the same name for mixed meshes (nbc code, nbc values, etc). 268 // Would be easy to fix cscompare instead but would it break sth else? 
269 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 270 FOUND = true ; 271 token = strtok( NULL, " ,;<>" ); 272 skip_size = atoi( token ); 273 int i; 274 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 275 params[i] = atoi( token ); 276 } 277 if ( i < expect ) { 278 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 279 } 280 } else if ( cscompare(token,"byteorder magic number") ) { 281 if ( binary_format ) { 282 fread((void*)&integer_value,sizeof(int),1,fileObject); 283 fread( &junk, sizeof(char), 1 , fileObject ); 284 if ( 362436 != integer_value ) Wrong_Endian = true; 285 } else{ 286 fscanf(fileObject, "%d\n", &integer_value ); 287 } 288 } else { 289 /* some other header, so just skip over */ 290 token = strtok( NULL, " ,;<>" ); 291 skip_size = atoi( token ); 292 if ( binary_format) 293 fseek( fileObject, skip_size, SEEK_CUR ); 294 else 295 for( int gama=0; gama < skip_size; gama++ ) 296 fgets( Line, 1024, fileObject ); 297 } 298 free (text_header); 299 } // end of if before while loop 300 301 if ( !FOUND ) 302 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 303 rewind( fileObject ); 304 clearerr( fileObject ); 305 rewind_count++; 306 fgets( Line, 1024, fileObject ); 307 } 308 } 309 310 if ( !FOUND ) { 311 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 312 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 313 return 1; 314 } 315 return 0; 316 } 317 318 } // end unnamed namespace 319 320 321 // begin of publicly visible functions 322 323 /** 324 * This function takes a long long pointer and assign (start) phiotmrc value to it 325 */ 326 //void startTimer(unsigned long long* start) { 327 void startTimer(double* start) { 328 329 if( !PRINT_PERF ) return; 330 331 MPI_Barrier(MPI_COMM_WORLD); 332 *start = phiotmrc(); 333 } 334 335 /** 336 * This function takes a long long pointer and assign (end) phiotmrc value to it 337 */ 338 void endTimer(double* end) { 339 340 if( 
!PRINT_PERF ) return; 341 342 *end = phiotmrc(); 343 MPI_Barrier(MPI_COMM_WORLD); 344 } 345 346 /** 347 * choose to print some performance results (or not) according to 348 * the PRINT_PERF macro 349 */ 350 void printPerf( 351 const char* func_name, 352 double start, 353 double end, 354 unsigned long long datasize, 355 int printdatainfo, 356 const char* extra_msg) { 357 358 if( !PRINT_PERF ) return; 359 360 unsigned long long data_size = datasize; 361 362 double time = end - start; 363 364 unsigned long long isizemin,isizemax,isizetot; 365 double sizemin,sizemax,sizeavg,sizetot,rate; 366 double tmin, tmax, tavg, ttot; 367 368 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 369 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 371 tavg = ttot/mysize; 372 373 if(irank == 0) { 374 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 375 else printf("** syncIO "); 376 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 377 } 378 379 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 380 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 381 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 383 384 sizemin=(double)(isizemin*inv1024sq); 385 sizemax=(double)(isizemax*inv1024sq); 386 sizetot=(double)(isizetot*inv1024sq); 387 sizeavg=(double)(1.0*sizetot/mysize); 388 rate=(double)(1.0*sizetot/tmax); 389 390 if( irank == 0) { 391 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 392 } 393 } 394 else { 395 if(irank == 0) { 396 printf(" \n"); 397 //printf(" (%s) \n", extra_msg); 398 } 399 } 400 } 401 402 /** 403 * This function is normally called at the 
beginning of a read operation, before 404 * init function. 405 * This function (uses rank 0) reads out nfields, nppf, master header size, 406 * endianess and allocates for masterHeader string. 407 * These values are essential for following read operations. Rank 0 will bcast 408 * these values to other ranks in the commm world 409 * 410 * If the file set is of old POSIX format, it would throw error and exit 411 */ 412 void queryphmpiio(const char filename[],int *nfields, int *nppf) 413 { 414 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 415 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 416 417 if(irank == 0) { 418 FILE * fileHandle; 419 char* fname = StringStripper( filename ); 420 421 fileHandle = fopen (fname,"rb"); 422 if (fileHandle == NULL ) { 423 printf("\nError: File %s doesn't exist! Please check!\n",fname); 424 } 425 else { 426 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 427 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 428 //mem_address = (long long )SerialFile; 429 //if( mem_address & (pool_align -1) ) 430 // SerialFile += pool_align - (mem_address & (pool_align -1)); 431 432 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 433 //int meta_size_limit = 4*ONE_MEGABYTE; 434 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 435 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 436 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 437 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 438 //mem_address = (long long )SerialFile; 439 //if( mem_address & (pool_align -1) ) 440 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 441 442 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 443 444 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 445 char version[MAX_FIELDS_NAME_LENGTH/4]; 446 int mhsize; 447 char * token; 448 int magic_number; 449 450 memcpy( read_out_tag, 451 SerialFile->masterHeader, 452 
MAX_FIELDS_NAME_LENGTH-1 ); 453 454 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 455 // Test endianess ... 456 memcpy (&magic_number, 457 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 458 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 459 460 if ( magic_number != ENDIAN_TEST_NUMBER ) { 461 printf("Endian is different!\n"); 462 // Will do swap later 463 } 464 465 // test version, old version, default masterheader size is 4M 466 // newer version masterheader size is read from first line 467 memcpy(version, 468 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 469 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 470 471 if( cscompare ("version",version) ) { 472 // if there is "version" tag in the file, then it is newer format 473 // read master header size from here, otherwise use default 474 // Note: if version is "1", we know mhsize is at 3/4 place... 475 476 token = strtok(version, ":"); 477 token = strtok(NULL, " ,;<>" ); 478 int iversion = atoi(token); 479 480 if( iversion == 1) { 481 memcpy( &mhsize, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 483 sizeof(int)); 484 if ( magic_number != ENDIAN_TEST_NUMBER ) 485 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 486 487 if( mhsize > DefaultMHSize ) { 488 //if actual headersize is larger than default, let's re-read 489 free(SerialFile->masterHeader); 490 SerialFile->masterHeader = (char *)malloc(mhsize); 491 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 492 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 493 } 494 } 495 //TODO: check if this is a valid int?? 
496 MasterHeaderSize = mhsize; 497 } 498 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 499 MasterHeaderSize = DefaultMHSize; 500 } 501 502 memcpy( read_out_tag, 503 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 504 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 505 506 // Read in # fields ... 507 token = strtok( read_out_tag, ":" ); 508 token = strtok( NULL," ,;<>" ); 509 *nfields = atoi( token ); 510 if ( *nfields > MAX_FIELDS_NUMBER) { 511 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 512 } 513 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 514 515 memcpy( read_out_tag, 516 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 517 + *nfields * MAX_FIELDS_NAME_LENGTH, 518 MAX_FIELDS_NAME_LENGTH); 519 520 token = strtok( read_out_tag, ":" ); 521 token = strtok( NULL," ,;<>" ); 522 *nppf = atoi( token ); 523 SerialFile->nppf=*nppf; //TODO: sanity check of int 524 } // end of if("MPI_IO_TAG") 525 else { 526 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 527 exit(1); 528 } 529 fclose(fileHandle); 530 free(SerialFile->masterHeader); 531 free(SerialFile); 532 } //end of else 533 free(fname); 534 } 535 536 // Bcast value to every one 537 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 538 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 541 } 542 543 /** 544 * This function computes the right master header size (round to size of 2^n). 545 * This is only needed for file format version 1 in "write" mode. 
546 */ 547 int computeMHSize(int nfields, int nppf, int version) { 548 int mhsize; 549 if(version == 1) { 550 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 551 int meta_info_size = VERSION_INFO_HEADER_SIZE; 552 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 553 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 554 if (actual_size > DefaultMHSize) { 555 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 556 mhsize *= DefaultMHSize; 557 } 558 else { 559 mhsize = DefaultMHSize; 560 } 561 } 562 return mhsize; 563 } 564 565 /** 566 * Computes correct color of a rank according to number of files. 567 */ 568 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 569 int color = 570 (int)(myrank / (numprocs / nfiles)); 571 return color; 572 } 573 574 575 /** 576 * Check the file descriptor. 577 */ 578 void checkFileDescriptor(const char fctname[], 579 int* fileDescriptor ) { 580 if ( *fileDescriptor < 0 ) { 581 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 582 exit(1); 583 } 584 } 585 586 /** 587 * Initialize the file struct members and allocate space for file struct 588 * buffers. 589 * 590 * Note: this function is only called when we are using new format. Old POSIX 591 * format should skip this routine and call openfile() directly instead. 592 */ 593 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 594 { 595 // we init irank again in case query not called (e.g. 
syncIO write case) 596 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 597 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 598 599 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 600 601 double timer_start, timer_end; 602 startTimer(&timer_start); 603 604 char* imode = StringStripper( mode ); 605 606 // Note: if it's read, we presume query was called prior to init and 607 // MasterHeaderSize is already set to correct value from parsing header 608 // otherwise it's write then it needs some computation to be set 609 if ( cscompare( "read", imode ) ) { 610 // do nothing 611 } 612 else if( cscompare( "write", imode ) ) { 613 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 614 } 615 else { 616 printf("Error initphmpiio: can't recognize the mode %s", imode); 617 exit(1); 618 } 619 free ( imode ); 620 621 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 622 623 int i, j; 624 625 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 626 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 627 endTimer(&timer_end); 628 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 629 return MAX_PHASTA_FILES_EXCEEDED; 630 } 631 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 632 // { 633 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 634 // { 635 // PhastaIOActiveFiles[i] = NULL; 636 // } 637 // } 638 639 640 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 641 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 642 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 643 //if( mem_address & (pool_align -1) ) 644 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 645 646 i = PhastaIONextActiveIndex; 647 PhastaIONextActiveIndex++; 648 649 
//PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 650 651 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 652 653 PhastaIOActiveFiles[i]->Wrong_Endian = false; 654 655 PhastaIOActiveFiles[i]->nFields = *nfields; 656 PhastaIOActiveFiles[i]->nPPF = *nppf; 657 PhastaIOActiveFiles[i]->nFiles = *nfiles; 658 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 659 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 660 661 662 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 663 664 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 665 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 667 668 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 669 MPI_Comm_split(MPI_COMM_WORLD, 670 color, 671 PhastaIOActiveFiles[i]->myrank, 672 &(PhastaIOActiveFiles[i]->local_comm)); 673 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 674 &(PhastaIOActiveFiles[i]->local_numprocs)); 675 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 676 &(PhastaIOActiveFiles[i]->local_myrank)); 677 678 // back up now these variables so that we do not need to call comm_split again 679 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 680 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 681 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 682 s_assign_local_comm = 1; 683 } 684 else { // recycle the subcommunicator 685 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 686 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 687 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 688 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 689 } 690 } 691 else { // *nfiles == 1 here - no need to call mpi_comm_split here 692 693 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 694 
PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 695 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 696 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 697 698 } 699 700 PhastaIOActiveFiles[i]->nppp = 701 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 702 703 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 704 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 705 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 706 707 PhastaIOActiveFiles[i]->my_offset_table = 708 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 709 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 710 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 711 //if( mem_address & (pool_align -1) ) 712 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 713 714 PhastaIOActiveFiles[i]->my_read_table = 715 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 716 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 717 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 718 //if( mem_address & (pool_align -1) ) 719 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 720 721 for (j=0; j<*nfields; j++) 722 { 723 PhastaIOActiveFiles[i]->my_offset_table[j] = 724 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 725 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 726 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 727 //if( mem_address & (pool_align -1) ) 728 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 729 730 
PhastaIOActiveFiles[i]->my_read_table[j] = 731 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 732 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 733 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 734 //if( mem_address & (pool_align -1) ) 735 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 736 } 737 *filehandle = i; 738 739 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 740 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 741 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 742 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 743 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 744 745 /* 746 PhastaIOActiveFiles[i]->master_header = 747 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 748 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 749 if( mem_address & (pool_align -1) ) 750 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 751 752 PhastaIOActiveFiles[i]->double_chunk = 753 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 754 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 755 if( mem_address & (pool_align -1) ) 756 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 757 758 PhastaIOActiveFiles[i]->int_chunk = 759 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 760 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 761 if( mem_address & (pool_align -1) ) 762 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 763 764 PhastaIOActiveFiles[i]->read_double_chunk = 765 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 766 mem_address = (long long 
)PhastaIOActiveFiles[i]->read_double_chunk; 767 if( mem_address & (pool_align -1) ) 768 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 769 770 PhastaIOActiveFiles[i]->read_int_chunk = 771 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 772 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 773 if( mem_address & (pool_align -1) ) 774 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 775 */ 776 777 // Time monitoring 778 endTimer(&timer_end); 779 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 780 781 phprintf_0("Info initphmpiio: quiting function"); 782 783 return i; 784 } 785 786 /** 787 * Destruct the file struct and free buffers allocated in init function. 788 */ 789 void finalizephmpiio( int *fileDescriptor ) 790 { 791 double timer_start, timer_end; 792 startTimer(&timer_start); 793 794 int i, j; 795 i = *fileDescriptor; 796 //PhastaIONextActiveIndex--; 797 798 /* //free the offset table for this phasta file */ 799 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 800 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 801 { 802 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 803 free( PhastaIOActiveFiles[i]->my_read_table[j]); 804 } 805 free ( PhastaIOActiveFiles[i]->my_offset_table ); 806 free ( PhastaIOActiveFiles[i]->my_read_table ); 807 free ( PhastaIOActiveFiles[i]->master_header ); 808 free ( PhastaIOActiveFiles[i]->double_chunk ); 809 free ( PhastaIOActiveFiles[i]->int_chunk ); 810 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 812 813 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 814 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 815 s_assign_local_comm = 0; 816 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 817 } 818 819 free( 
PhastaIOActiveFiles[i]); 820 821 endTimer(&timer_end); 822 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 823 824 PhastaIONextActiveIndex--; 825 } 826 827 828 /** 829 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 830 * Initialize the file struct members and allocate space for file struct buffers. 831 * 832 * Note: this function is only called when we are using new format. Old POSIX 833 * format should skip this routine and call openfile() directly instead. 834 */ 835 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 836 { 837 // we init irank again in case query not called (e.g. syncIO write case) 838 839 MPI_Comm_rank(my_local_comm, &irank); 840 MPI_Comm_size(my_local_comm, &mysize); 841 842 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 843 844 double timer_start, timer_end; 845 startTimer(&timer_start); 846 847 char* imode = StringStripper( mode ); 848 849 // Note: if it's read, we presume query was called prior to init and 850 // MasterHeaderSize is already set to correct value from parsing header 851 // otherwise it's write then it needs some computation to be set 852 if ( cscompare( "read", imode ) ) { 853 // do nothing 854 } 855 else if( cscompare( "write", imode ) ) { 856 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 857 } 858 else { 859 printf("Error initphmpiio: can't recognize the mode %s", imode); 860 exit(1); 861 } 862 free ( imode ); 863 864 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 865 866 int i, j; 867 868 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 869 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 870 endTimer(&timer_end); 871 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 872 return 
MAX_PHASTA_FILES_EXCEEDED; 873 } 874 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 875 // { 876 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 877 // { 878 // PhastaIOActiveFiles[i] = NULL; 879 // } 880 // } 881 882 883 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 884 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 885 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 886 //if( mem_address & (pool_align -1) ) 887 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 888 889 i = PhastaIONextActiveIndex; 890 PhastaIONextActiveIndex++; 891 892 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 893 894 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 895 896 PhastaIOActiveFiles[i]->Wrong_Endian = false; 897 898 PhastaIOActiveFiles[i]->nFields = *nfields; 899 PhastaIOActiveFiles[i]->nPPF = *nppf; 900 PhastaIOActiveFiles[i]->nFiles = *nfiles; 901 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 902 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 903 904 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 905 MPI_Comm_split(my_local_comm, 906 color, 907 PhastaIOActiveFiles[i]->myrank, 908 &(PhastaIOActiveFiles[i]->local_comm)); 909 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 910 &(PhastaIOActiveFiles[i]->local_numprocs)); 911 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 912 &(PhastaIOActiveFiles[i]->local_myrank)); 913 PhastaIOActiveFiles[i]->nppp = 914 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 915 916 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 917 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 918 
(PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 919 920 PhastaIOActiveFiles[i]->my_offset_table = 921 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 922 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 923 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 924 //if( mem_address & (pool_align -1) ) 925 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 926 927 PhastaIOActiveFiles[i]->my_read_table = 928 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 929 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 930 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 931 //if( mem_address & (pool_align -1) ) 932 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 933 934 for (j=0; j<*nfields; j++) 935 { 936 PhastaIOActiveFiles[i]->my_offset_table[j] = 937 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 938 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 939 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 940 //if( mem_address & (pool_align -1) ) 941 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 942 943 PhastaIOActiveFiles[i]->my_read_table[j] = 944 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 945 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 946 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 947 //if( mem_address & (pool_align -1) ) 948 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 949 } 950 *filehandle = i; 
951 952 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 953 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 954 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 955 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 956 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 957 958 /* 959 PhastaIOActiveFiles[i]->master_header = 960 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 961 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 962 if( mem_address & (pool_align -1) ) 963 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 964 965 PhastaIOActiveFiles[i]->double_chunk = 966 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 967 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 968 if( mem_address & (pool_align -1) ) 969 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 970 971 PhastaIOActiveFiles[i]->int_chunk = 972 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 973 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 974 if( mem_address & (pool_align -1) ) 975 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 976 977 PhastaIOActiveFiles[i]->read_double_chunk = 978 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 979 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 980 if( mem_address & (pool_align -1) ) 981 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 982 983 PhastaIOActiveFiles[i]->read_int_chunk = 984 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 985 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 986 if( mem_address & (pool_align -1) ) 987 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 988 */ 989 990 // Time 
monitoring 991 endTimer(&timer_end); 992 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 993 994 phprintf_0("Info initphmpiiosub: quiting function"); 995 996 return i; 997 } 998 999 1000 1001 /** open file for both POSIX and MPI-IO syncIO format. 1002 * 1003 * If it's old POSIX format, simply call posix fopen(). 1004 * 1005 * If it's MPI-IO foramt: 1006 * in "read" mode, it builds the header table that points to the offset of 1007 * fields for parts; 1008 * in "write" mode, it opens the file with MPI-IO open routine. 1009 */ 1010 void openfile(const char filename[], 1011 const char mode[], 1012 int* fileDescriptor ) 1013 { 1014 phprintf_0("Info: entering openfile"); 1015 1016 double timer_start, timer_end; 1017 startTimer(&timer_start); 1018 1019 if ( PhastaIONextActiveIndex == 0 ) 1020 { 1021 FILE* file=NULL ; 1022 *fileDescriptor = 0; 1023 char* fname = StringStripper( filename ); 1024 char* imode = StringStripper( mode ); 1025 1026 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1027 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1028 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1029 1030 if ( !file ){ 1031 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1032 } else { 1033 fileArray.push_back( file ); 1034 byte_order.push_back( false ); 1035 header_type.push_back( sizeof(int) ); 1036 *fileDescriptor = fileArray.size(); 1037 } 1038 free (fname); 1039 free (imode); 1040 } 1041 else // else it would be parallel I/O, opposed to posix io 1042 { 1043 char* fname = StringStripper( filename ); 1044 char* imode = StringStripper( mode ); 1045 int rc; 1046 int i = *fileDescriptor; 1047 checkFileDescriptor("openfile",&i); 1048 char* token; 1049 1050 if ( cscompare( "read", imode ) ) 1051 { 1052 // if (PhastaIOActiveFiles[i]->myrank == 0) 1053 // printf("\n **********\nRead open ... ... 
regular version\n"); 1054 1055 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1056 fname, 1057 MPI_MODE_RDONLY, 1058 MPI_INFO_NULL, 1059 &(PhastaIOActiveFiles[i]->file_handle) ); 1060 1061 if(rc) 1062 { 1063 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1064 printf("Error openfile: Unable to open file %s! File descriptor = %d\n",fname,*fileDescriptor); 1065 endTimer(&timer_end); 1066 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1067 return; 1068 } 1069 1070 MPI_Status read_tag_status; 1071 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1072 int j; 1073 int magic_number; 1074 1075 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1076 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1077 0, 1078 PhastaIOActiveFiles[i]->master_header, 1079 MasterHeaderSize, 1080 MPI_CHAR, 1081 &read_tag_status ); 1082 } 1083 1084 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1085 MasterHeaderSize, 1086 MPI_CHAR, 1087 0, 1088 PhastaIOActiveFiles[i]->local_comm ); 1089 1090 memcpy( read_out_tag, 1091 PhastaIOActiveFiles[i]->master_header, 1092 MAX_FIELDS_NAME_LENGTH-1 ); 1093 1094 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1095 { 1096 // Test endianess ... 1097 memcpy ( &magic_number, 1098 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1099 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1100 1101 if ( magic_number != ENDIAN_TEST_NUMBER ) 1102 { 1103 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1104 } 1105 1106 memcpy( read_out_tag, 1107 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1108 MAX_FIELDS_NAME_LENGTH ); 1109 1110 // Read in # fields ... 
1111 token = strtok ( read_out_tag, ":" ); 1112 token = strtok( NULL," ,;<>" ); 1113 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1114 1115 unsigned long long **header_table; 1116 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1117 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1118 //mem_address = (long long )header_table; 1119 //if( mem_address & (pool_align -1) ) 1120 // header_table += pool_align - (mem_address & (pool_align -1)); 1121 1122 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1123 { 1124 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1125 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1126 //mem_address = (long long )header_table[j]; 1127 //if( mem_address & (pool_align -1) ) 1128 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1129 } 1130 1131 // Read in the offset table ... 1132 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1133 { 1134 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1135 memcpy( header_table[j], 1136 PhastaIOActiveFiles[i]->master_header + 1137 VERSION_INFO_HEADER_SIZE + 1138 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1139 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1140 } 1141 1142 MPI_Scatter( header_table[j], 1143 PhastaIOActiveFiles[i]->nppp, 1144 MPI_LONG_LONG_INT, 1145 PhastaIOActiveFiles[i]->my_read_table[j], 1146 PhastaIOActiveFiles[i]->nppp, 1147 MPI_LONG_LONG_INT, 1148 0, 1149 PhastaIOActiveFiles[i]->local_comm ); 1150 1151 // Swap byte order if endianess is different ... 
1152 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1153 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1154 sizeof(long long int), 1155 PhastaIOActiveFiles[i]->nppp ); 1156 } 1157 } 1158 1159 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1160 free ( header_table[j] ); 1161 } 1162 free (header_table); 1163 1164 } // end of if MPI_IO_TAG 1165 else //else not valid MPI file 1166 { 1167 *fileDescriptor = NOT_A_MPI_FILE; 1168 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1169 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1170 endTimer(&timer_end); 1171 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1172 return; 1173 } 1174 } // end of if "read" 1175 else if( cscompare( "write", imode ) ) 1176 { 1177 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1178 fname, 1179 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1180 MPI_INFO_NULL, 1181 &(PhastaIOActiveFiles[i]->file_handle) ); 1182 if(rc) 1183 { 1184 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1185 return; 1186 } 1187 } // end of if "write" 1188 free (fname); 1189 free (imode); 1190 } // end of if FileIndex != 0 1191 1192 endTimer(&timer_end); 1193 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1194 } 1195 1196 /** close file for both POSIX and MPI-IO syncIO format. 1197 * 1198 * If it's old POSIX format, simply call posix fclose(). 1199 * 1200 * If it's MPI-IO foramt: 1201 * in "read" mode, it simply close file with MPI-IO close routine. 1202 * in "write" mode, rank 0 in each group will re-assemble the master header and 1203 * offset table and write to the beginning of file, then close the file. 
1204 */ 1205 void closefile( int* fileDescriptor, 1206 const char mode[] ) 1207 { 1208 double timer_start, timer_end; 1209 startTimer(&timer_start); 1210 1211 int i = *fileDescriptor; 1212 checkFileDescriptor("closefile",&i); 1213 1214 if ( PhastaIONextActiveIndex == 0 ) { 1215 char* imode = StringStripper( mode ); 1216 1217 if( cscompare( "write", imode ) 1218 || cscompare( "append", imode ) ) { 1219 fflush( fileArray[ *fileDescriptor - 1 ] ); 1220 } 1221 1222 fclose( fileArray[ *fileDescriptor - 1 ] ); 1223 free (imode); 1224 } 1225 else { 1226 char* imode = StringStripper( mode ); 1227 1228 //write master header here: 1229 if ( cscompare( "write", imode ) ) { 1230 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1231 // MasterHeaderSize = 4*ONE_MEGABYTE; 1232 // else 1233 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1234 1235 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1236 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1237 1238 MPI_Status write_header_status; 1239 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1240 char version[MAX_FIELDS_NAME_LENGTH/4]; 1241 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1242 int magic_number = ENDIAN_TEST_NUMBER; 1243 1244 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1245 { 1246 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1247 sprintf(mpi_tag, "MPI_IO_Tag : "); 1248 memcpy(PhastaIOActiveFiles[i]->master_header, 1249 mpi_tag, 1250 MAX_FIELDS_NAME_LENGTH); 1251 1252 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1253 // this version is "1", print version in ASCII 1254 sprintf(version, "version : %d",1); 1255 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1256 version, 1257 MAX_FIELDS_NAME_LENGTH/4); 1258 1259 // master header 
size is computed using the formula above 1260 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1261 sprintf(mhsize, "mhsize : "); 1262 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1263 mhsize, 1264 MAX_FIELDS_NAME_LENGTH/4); 1265 1266 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1267 sprintf(mpi_tag, 1268 "\nnFields : %d\n", 1269 PhastaIOActiveFiles[i]->nFields); 1270 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1271 mpi_tag, 1272 MAX_FIELDS_NAME_LENGTH); 1273 1274 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1275 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1276 memcpy( PhastaIOActiveFiles[i]->master_header+ 1277 PhastaIOActiveFiles[i]->nFields * 1278 MAX_FIELDS_NAME_LENGTH + 1279 MAX_FIELDS_NAME_LENGTH * 2, 1280 mpi_tag, 1281 MAX_FIELDS_NAME_LENGTH); 1282 1283 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1284 &magic_number, 1285 sizeof(int)); 1286 1287 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1288 &MasterHeaderSize, 1289 sizeof(int)); 1290 } 1291 1292 int j = 0; 1293 unsigned long long **header_table; 1294 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1295 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1296 //mem_address = (long long )header_table; 1297 //if( mem_address & (pool_align -1) ) 1298 // header_table += pool_align - (mem_address & (pool_align -1)); 1299 1300 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1301 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1302 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1303 //mem_address = (long long 
)header_table[j]; 1304 //if( mem_address & (pool_align -1) ) 1305 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1306 } 1307 1308 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1309 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1310 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1311 PhastaIOActiveFiles[i]->nppp, 1312 MPI_LONG_LONG_INT, 1313 header_table[j], 1314 PhastaIOActiveFiles[i]->nppp, 1315 MPI_LONG_LONG_INT, 1316 0, 1317 PhastaIOActiveFiles[i]->local_comm ); 1318 } 1319 1320 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1321 1322 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1323 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1324 memcpy ( PhastaIOActiveFiles[i]->master_header + 1325 VERSION_INFO_HEADER_SIZE + 1326 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1327 header_table[j], 1328 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1329 } 1330 1331 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1332 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1333 0, 1334 PhastaIOActiveFiles[i]->master_header, 1335 MasterHeaderSize, 1336 MPI_CHAR, 1337 &write_header_status ); 1338 } 1339 1340 ////free(PhastaIOActiveFiles[i]->master_header); 1341 1342 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1343 free ( header_table[j] ); 1344 } 1345 free (header_table); 1346 } 1347 1348 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1349 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1350 free ( imode ); 1351 } 1352 1353 endTimer(&timer_end); 1354 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1355 } 1356 1357 int readHeader( FILE* f, const char phrase[], 1358 int* params, int numParams, const char* iotype) { 1359 isBinary(iotype); 1360 return readHeader(f,phrase,params,numParams); 1361 } 1362 1363 void readheader( int* fileDescriptor, 1364 
const char keyphrase[], 1365 void* valueArray, 1366 int* nItems, 1367 const char datatype[], 1368 const char iotype[] ) 1369 { 1370 double timer_start, timer_end; 1371 1372 startTimer(&timer_start); 1373 1374 int i = *fileDescriptor; 1375 checkFileDescriptor("readheader",&i); 1376 1377 if ( PhastaIONextActiveIndex == 0 ) { 1378 int filePtr = *fileDescriptor - 1; 1379 FILE* fileObject; 1380 int* valueListInt; 1381 1382 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1383 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1384 fprintf(stderr,"openfile_ function has to be called before \n") ; 1385 fprintf(stderr,"acessing the file\n ") ; 1386 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1387 endTimer(&timer_end); 1388 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1389 return; 1390 } 1391 1392 LastHeaderKey[filePtr] = std::string(keyphrase); 1393 LastHeaderNotFound = false; 1394 1395 fileObject = fileArray[ filePtr ] ; 1396 Wrong_Endian = byte_order[ filePtr ]; 1397 1398 isBinary( iotype ); 1399 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1400 1401 // right now we are making the assumption that we will only write integers 1402 // on the header line. 
1403 1404 valueListInt = static_cast< int* >( valueArray ); 1405 int ierr = readHeader( fileObject , 1406 keyphrase, 1407 valueListInt, 1408 *nItems ) ; 1409 1410 byte_order[ filePtr ] = Wrong_Endian ; 1411 1412 if ( ierr ) LastHeaderNotFound = true; 1413 1414 //return ; // don't return, go to the end to print perf 1415 } 1416 else { 1417 unsigned int skip_size; 1418 int* valueListInt; 1419 valueListInt = static_cast <int*>(valueArray); 1420 char* token = NULL; 1421 bool FOUND = false ; 1422 isBinary( iotype ); 1423 1424 MPI_Status read_offset_status; 1425 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1426 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1427 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1428 int j; 1429 1430 int string_length = strlen( keyphrase ); 1431 char* buffer = (char*) malloc ( string_length+1 ); 1432 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1433 //mem_address = (long long )buffer; 1434 //if( mem_address & (pool_align -1) ) 1435 // buffer += pool_align - (mem_address & (pool_align -1)); 1436 1437 strcpy ( buffer, keyphrase ); 1438 buffer[ string_length ] = '\0'; 1439 1440 char* st2 = strtok ( buffer, "@" ); 1441 st2 = strtok (NULL, "@"); 1442 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1443 if ( char* p = strpbrk(buffer, "@") ) 1444 *p = '\0'; 1445 1446 // Check if the user has input the right GPid 1447 if ( ( PhastaIOActiveFiles[i]->GPid <= 1448 PhastaIOActiveFiles[i]->myrank * 1449 PhastaIOActiveFiles[i]->nppp )|| 1450 ( PhastaIOActiveFiles[i]->GPid > 1451 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1452 PhastaIOActiveFiles[i]->nppp ) ) 1453 { 1454 *fileDescriptor = NOT_A_MPI_FILE; 1455 printf("Error readheader: The file is not in syncIO new format, please check! 
myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1456 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1457 endTimer(&timer_end); 1458 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1459 return; 1460 } 1461 1462 // Find the field we want ... 1463 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1464 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1465 { 1466 memcpy( readouttag[j], 1467 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1468 MAX_FIELDS_NAME_LENGTH-1 ); 1469 } 1470 1471 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1472 { 1473 token = strtok ( readouttag[j], ":" ); 1474 1475 //if ( cscompare( buffer, token ) ) 1476 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1477 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1478 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1479 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1480 { 1481 PhastaIOActiveFiles[i]->read_field_count = j; 1482 FOUND = true; 1483 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1484 break; 1485 } 1486 } 1487 free(buffer); 1488 1489 if (!FOUND) 1490 { 1491 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 
1492 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1493 endTimer(&timer_end); 1494 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1495 return; 1496 } 1497 1498 // Find the part we want ... 1499 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1500 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1501 1502 PhastaIOActiveFiles[i]->my_offset = 1503 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1504 1505 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1506 1507 // Read each datablock header here ... 1508 1509 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1510 PhastaIOActiveFiles[i]->my_offset+1, 1511 read_out_tag, 1512 MAX_FIELDS_NAME_LENGTH-1, 1513 MPI_CHAR, 1514 &read_offset_status ); 1515 token = strtok ( read_out_tag, ":" ); 1516 1517 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1518 1519 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 
1520 { 1521 FOUND = true ; 1522 token = strtok( NULL, " ,;<>" ); 1523 skip_size = atoi( token ); 1524 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1525 valueListInt[j] = atoi( token ); 1526 1527 if ( j < *nItems ) 1528 { 1529 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1530 } 1531 } 1532 else { 1533 //if(irank==0) 1534 if(PhastaIOActiveFiles[i]->myrank == 0) 1535 // If we enter this if, there is a problem with the name of some fields 1536 { 1537 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1538 } 1539 } 1540 } 1541 1542 endTimer(&timer_end); 1543 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1544 1545 } 1546 1547 void readdatablock( int* fileDescriptor, 1548 const char keyphrase[], 1549 void* valueArray, 1550 int* nItems, 1551 const char datatype[], 1552 const char iotype[] ) 1553 { 1554 //if(irank == 0) printf("entering readdatablock()\n"); 1555 unsigned long long data_size = 0; 1556 double timer_start, timer_end; 1557 startTimer(&timer_start); 1558 1559 int i = *fileDescriptor; 1560 checkFileDescriptor("readdatablock",&i); 1561 1562 if ( PhastaIONextActiveIndex == 0 ) { 1563 int filePtr = *fileDescriptor - 1; 1564 FILE* fileObject; 1565 char junk; 1566 1567 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1568 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1569 fprintf(stderr,"openfile_ function has to be called before\n") ; 1570 fprintf(stderr,"acessing the file\n ") ; 1571 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1572 endTimer(&timer_end); 1573 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1574 return; 1575 } 1576 1577 // error check.. 1578 // since we require that a consistant header always preceed the data block 1579 // let us check to see that it is actually the case. 1580 1581 if ( ! 
cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1582 fprintf(stderr, "Header not consistant with data block\n"); 1583 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1584 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1585 fprintf(stderr, "Please recheck read sequence \n"); 1586 if( Strict_Error ) { 1587 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1588 endTimer(&timer_end); 1589 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1590 return; 1591 } 1592 } 1593 1594 if ( LastHeaderNotFound ) { 1595 endTimer(&timer_end); 1596 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1597 return; 1598 } 1599 fileObject = fileArray[ filePtr ]; 1600 Wrong_Endian = byte_order[ filePtr ]; 1601 1602 size_t type_size = typeSize( datatype ); 1603 int nUnits = *nItems; 1604 isBinary( iotype ); 1605 1606 LastHeaderKey.erase(filePtr); 1607 1608 if ( binary_format ) { 1609 fread( valueArray, type_size, nUnits, fileObject ); 1610 fread( &junk, sizeof(char), 1 , fileObject ); 1611 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nUnits ); 1612 } else { 1613 1614 char* ts1 = StringStripper( datatype ); 1615 if ( cscompare( "integer", ts1 ) ) { 1616 for( int n=0; n < nUnits ; n++ ) 1617 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1618 } else if ( cscompare( "double", ts1 ) ) { 1619 for( int n=0; n < nUnits ; n++ ) 1620 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1621 } 1622 free (ts1); 1623 } 1624 1625 //return; 1626 } 1627 else { 1628 // printf("read data block\n"); 1629 MPI_Status read_data_status; 1630 size_t type_size = typeSize( datatype ); 1631 int nUnits = *nItems; 1632 isBinary( iotype ); 1633 1634 // read datablock then 1635 //MR CHANGE 1636 // if ( cscompare ( datatype, "double")) 1637 char* ts2 = StringStripper( datatype ); 1638 if ( cscompare ( "double" , ts2)) 1639 //MR CHANGE END 1640 { 1641 1642 MPI_File_read_at_all_begin( 
PhastaIOActiveFiles[i]->file_handle, 1643 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1644 valueArray, 1645 nUnits, 1646 MPI_DOUBLE ); 1647 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1648 valueArray, 1649 &read_data_status ); 1650 data_size=8*nUnits; 1651 1652 } 1653 //MR CHANGE 1654 // else if ( cscompare ( datatype, "integer")) 1655 else if ( cscompare ( "integer" , ts2)) 1656 //MR CHANGE END 1657 { 1658 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1659 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1660 valueArray, 1661 nUnits, 1662 MPI_INT ); 1663 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1664 valueArray, 1665 &read_data_status ); 1666 data_size=4*nUnits; 1667 } 1668 else 1669 { 1670 *fileDescriptor = DATA_TYPE_ILLEGAL; 1671 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1672 endTimer(&timer_end); 1673 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1674 return; 1675 } 1676 free(ts2); 1677 1678 1679 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1680 1681 // Swap data byte order if endianess is different ... 
1682 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1683 { 1684 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1685 } 1686 } 1687 1688 endTimer(&timer_end); 1689 char extra_msg[1024]; 1690 memset(extra_msg, '\0', 1024); 1691 char* key = StringStripper(keyphrase); 1692 sprintf(extra_msg, " field is %s ", key); 1693 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1694 free(key); 1695 1696 } 1697 1698 void writeheader( const int* fileDescriptor, 1699 const char keyphrase[], 1700 const void* valueArray, 1701 const int* nItems, 1702 const int* ndataItems, 1703 const char datatype[], 1704 const char iotype[]) 1705 { 1706 1707 //if(irank == 0) printf("entering writeheader()\n"); 1708 1709 double timer_start, timer_end; 1710 startTimer(&timer_start); 1711 1712 int i = *fileDescriptor; 1713 checkFileDescriptor("writeheader",&i); 1714 1715 if ( PhastaIONextActiveIndex == 0 ) { 1716 int filePtr = *fileDescriptor - 1; 1717 FILE* fileObject; 1718 1719 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1720 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1721 fprintf(stderr,"openfile_ function has to be called before \n") ; 1722 fprintf(stderr,"acessing the file\n ") ; 1723 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1724 endTimer(&timer_end); 1725 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1726 return; 1727 } 1728 1729 LastHeaderKey[filePtr] = std::string(keyphrase); 1730 DataSize = *ndataItems; 1731 fileObject = fileArray[ filePtr ] ; 1732 size_t type_size = typeSize( datatype ); 1733 isBinary( iotype ); 1734 header_type[ filePtr ] = type_size; 1735 1736 int _newline = ( *ndataItems > 0 ) ? sizeof( char ) : 0; 1737 int size_of_nextblock = 1738 ( binary_format ) ? 
type_size*( *ndataItems )+ _newline : *ndataItems ; 1739 1740 fprintf( fileObject, "%s : < %d > ", keyphrase, size_of_nextblock ); 1741 for( int i = 0; i < *nItems; i++ ) 1742 fprintf(fileObject, "%d ", *((int*)((int*)valueArray+i))); 1743 fprintf(fileObject, "\n"); 1744 1745 //return ; 1746 } 1747 else { // else it's parallel I/O 1748 DataSize = *ndataItems; 1749 size_t type_size = typeSize( datatype ); 1750 isBinary( iotype ); 1751 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1752 1753 int string_length = strlen( keyphrase ); 1754 char* buffer = (char*) malloc ( string_length+1 ); 1755 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1756 //mem_address = (long long )buffer; 1757 //if( mem_address & (pool_align -1) ) 1758 // buffer += pool_align - (mem_address & (pool_align -1)); 1759 1760 strcpy ( buffer, keyphrase); 1761 buffer[ string_length ] = '\0'; 1762 1763 char* st2 = strtok ( buffer, "@" ); 1764 st2 = strtok (NULL, "@"); 1765 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1766 1767 if ( char* p = strpbrk(buffer, "@") ) 1768 *p = '\0'; 1769 1770 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1771 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1772 unsigned long long offset_value; 1773 1774 int temp = *ndataItems; 1775 unsigned long long number_of_items = (unsigned long long)temp; 1776 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1777 1778 MPI_Scan( &number_of_items, 1779 &offset_value, 1780 1, 1781 MPI_LONG_LONG_INT, 1782 MPI_SUM, 1783 PhastaIOActiveFiles[i]->local_comm ); 1784 1785 offset_value = (offset_value - number_of_items) * type_size; 1786 1787 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1788 DB_HEADER_SIZE + 1789 PhastaIOActiveFiles[i]->next_start_address; 1790 // This offset is the starting address of each datablock header... 1791 PhastaIOActiveFiles[i]->my_offset = offset_value; 1792 1793 // Write in my offset table ... 
1794 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1795 PhastaIOActiveFiles[i]->my_offset; 1796 1797 // Update the next-start-address ... 1798 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1799 number_of_items * type_size + 1800 DB_HEADER_SIZE; 1801 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1802 1, 1803 MPI_LONG_LONG_INT, 1804 PhastaIOActiveFiles[i]->local_numprocs-1, 1805 PhastaIOActiveFiles[i]->local_comm ); 1806 1807 // Prepare datablock header ... 1808 int _newline = (*ndataItems>0)?sizeof(char):0; 1809 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1810 1811 //char datablock_header[255]; 1812 //bzero((void*)datablock_header,255); 1813 char datablock_header[DB_HEADER_SIZE]; 1814 bzero((void*)datablock_header,DB_HEADER_SIZE); 1815 1816 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1817 sprintf( datablock_header, 1818 "\n%s : < %u >", 1819 keyphrase, 1820 size_of_nextblock ); 1821 1822 for ( int j = 0; j < *nItems; j++ ) 1823 { 1824 sprintf( datablock_header, 1825 "%s %d ", 1826 datablock_header, 1827 *((int*)((int*)valueArray+j))); 1828 } 1829 sprintf( datablock_header, 1830 "%s\n ", 1831 datablock_header ); 1832 1833 // Write datablock header ... 
1834 //MR CHANGE 1835 // if ( cscompare(datatype,"double") ) 1836 char* ts1 = StringStripper( datatype ); 1837 if ( cscompare("double",ts1) ) 1838 //MR CHANGE END 1839 { 1840 free ( PhastaIOActiveFiles[i]->double_chunk ); 1841 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1842 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1843 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1844 //if( mem_address & (pool_align -1) ) 1845 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1846 1847 double * aa = ( double * )datablock_header; 1848 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1849 } 1850 //MR CHANGE 1851 // if ( cscompare(datatype,"integer") ) 1852 else if ( cscompare("integer",ts1) ) 1853 //MR CHANGE END 1854 { 1855 free ( PhastaIOActiveFiles[i]->int_chunk ); 1856 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1857 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1858 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1859 //if( mem_address & (pool_align -1) ) 1860 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1861 1862 int * aa = ( int * )datablock_header; 1863 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1864 } 1865 else { 1866 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1867 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1868 endTimer(&timer_end); 1869 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1870 return; 1871 } 1872 free(ts1); 1873 1874 PhastaIOActiveFiles[i]->part_count++; 1875 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1876 //A new field will be written 1877 if ( 
PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1878 memcpy( PhastaIOActiveFiles[i]->master_header + 1879 PhastaIOActiveFiles[i]->field_count * 1880 MAX_FIELDS_NAME_LENGTH + 1881 MAX_FIELDS_NAME_LENGTH * 2, 1882 mpi_tag, 1883 MAX_FIELDS_NAME_LENGTH-1); 1884 } 1885 PhastaIOActiveFiles[i]->field_count++; 1886 PhastaIOActiveFiles[i]->part_count=0; 1887 } 1888 free(buffer); 1889 } 1890 1891 endTimer(&timer_end); 1892 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1893 } 1894 1895 void writedatablock( const int* fileDescriptor, 1896 const char keyphrase[], 1897 const void* valueArray, 1898 const int* nItems, 1899 const char datatype[], 1900 const char iotype[] ) 1901 { 1902 //if(irank == 0) printf("entering writedatablock()\n"); 1903 1904 unsigned long long data_size = 0; 1905 double timer_start, timer_end; 1906 startTimer(&timer_start); 1907 1908 int i = *fileDescriptor; 1909 checkFileDescriptor("writedatablock",&i); 1910 1911 if ( PhastaIONextActiveIndex == 0 ) { 1912 int filePtr = *fileDescriptor - 1; 1913 1914 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1915 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1916 fprintf(stderr,"openfile_ function has to be called before \n") ; 1917 fprintf(stderr,"acessing the file\n ") ; 1918 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1919 endTimer(&timer_end); 1920 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1921 return; 1922 } 1923 // since we require that a consistant header always preceed the data block 1924 // let us check to see that it is actually the case. 1925 1926 if ( ! 
cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1927 fprintf(stderr, "Header not consistant with data block\n"); 1928 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1929 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1930 fprintf(stderr, "Please recheck write sequence \n"); 1931 if( Strict_Error ) { 1932 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1933 endTimer(&timer_end); 1934 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1935 return; 1936 } 1937 } 1938 1939 FILE* fileObject = fileArray[ filePtr ] ; 1940 size_t type_size=typeSize( datatype ); 1941 isBinary( iotype ); 1942 1943 LastHeaderKey.erase(filePtr); 1944 1945 if ( header_type[filePtr] != (int)type_size ) { 1946 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1947 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1948 if( Strict_Error ) { 1949 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1950 endTimer(&timer_end); 1951 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1952 return; 1953 } 1954 } 1955 1956 int nUnits = *nItems; 1957 1958 if ( nUnits != DataSize ) { 1959 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1960 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1961 if( Strict_Error ) { 1962 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1963 endTimer(&timer_end); 1964 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1965 return; 1966 } 1967 } 1968 1969 if ( binary_format ) { 1970 1971 fwrite( valueArray, type_size, nUnits, fileObject ); 1972 fprintf( fileObject,"\n"); 1973 1974 } else { 1975 1976 char* ts1 = StringStripper( datatype ); 1977 if ( cscompare( "integer", ts1 ) ) { 1978 for( int n=0; n < nUnits ; n++ ) 1979 fprintf(fileObject,"%d\n",*((int*)((int*)valueArray+n))); 1980 } else if ( cscompare( "double", ts1 ) ) { 1981 for( int n=0; n < nUnits ; n++ ) 1982 
fprintf(fileObject,"%lf\n",*((double*)((double*)valueArray+n))); 1983 } 1984 free (ts1); 1985 } 1986 //return ; 1987 } 1988 else { // syncIO case 1989 MPI_Status write_data_status; 1990 isBinary( iotype ); 1991 int nUnits = *nItems; 1992 1993 //MR CHANGE 1994 // if ( cscompare(datatype,"double") ) 1995 char* ts1 = StringStripper( datatype ); 1996 if ( cscompare("double",ts1) ) 1997 //MR CHANGE END 1998 { 1999 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 2000 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2001 PhastaIOActiveFiles[i]->my_offset, 2002 PhastaIOActiveFiles[i]->double_chunk, 2003 //BLOCK_SIZE/sizeof(double), 2004 nUnits+DB_HEADER_SIZE/sizeof(double), 2005 MPI_DOUBLE ); 2006 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2007 PhastaIOActiveFiles[i]->double_chunk, 2008 &write_data_status ); 2009 data_size=8*nUnits; 2010 } 2011 //MR CHANGE 2012 // else if ( cscompare ( datatype, "integer")) 2013 else if ( cscompare("integer",ts1) ) 2014 //MR CHANGE END 2015 { 2016 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2017 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2018 PhastaIOActiveFiles[i]->my_offset, 2019 PhastaIOActiveFiles[i]->int_chunk, 2020 nUnits+DB_HEADER_SIZE/sizeof(int), 2021 MPI_INT ); 2022 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2023 PhastaIOActiveFiles[i]->int_chunk, 2024 &write_data_status ); 2025 data_size=4*nUnits; 2026 } 2027 else { 2028 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2029 endTimer(&timer_end); 2030 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2031 return; 2032 } 2033 free(ts1); 2034 } 2035 2036 endTimer(&timer_end); 2037 char extra_msg[1024]; 2038 memset(extra_msg, '\0', 1024); 2039 char* key = StringStripper(keyphrase); 2040 sprintf(extra_msg, " field is %s ", key); 2041 
printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2042 free(key); 2043 2044 } 2045 2046 void 2047 SwapArrayByteOrder( void* array, 2048 int nbytes, 2049 int nItems ) 2050 { 2051 /* This swaps the byte order for the array of nItems each 2052 of size nbytes , This will be called only locally */ 2053 int i,j; 2054 unsigned char* ucDst = (unsigned char*)array; 2055 2056 for(i=0; i < nItems; i++) { 2057 for(j=0; j < (nbytes/2); j++) 2058 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2059 ucDst += nbytes; 2060 } 2061 } 2062 2063 void 2064 writestring( int* fileDescriptor, 2065 const char inString[] ) 2066 { 2067 2068 int filePtr = *fileDescriptor - 1; 2069 FILE* fileObject = fileArray[filePtr] ; 2070 fprintf(fileObject,"%s",inString ); 2071 return; 2072 } 2073 2074 void 2075 Gather_Headers( int* fileDescriptor, 2076 vector< string >& headers ) 2077 { 2078 2079 FILE* fileObject; 2080 char Line[1024]; 2081 2082 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2083 2084 while( !feof(fileObject) ) { 2085 fgets( Line, 1024, fileObject); 2086 if ( Line[0] == '#' ) { 2087 headers.push_back( Line ); 2088 } else { 2089 break; 2090 } 2091 } 2092 rewind( fileObject ); 2093 clearerr( fileObject ); 2094 } 2095 void 2096 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2097 2098 void 2099 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2100 2101 int 2102 isLittleEndian( void ) 2103 { 2104 // this function returns a 1 if the current running architecture is 2105 // LittleEndian Byte Ordered, else it returns a zero 2106 2107 union { 2108 long a; 2109 char c[sizeof( long )]; 2110 } endianUnion; 2111 2112 endianUnion.a = 1 ; 2113 2114 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2115 else return 0; 2116 } 2117 2118 namespace PHASTA { 2119 const char* const PhastaIO_traits<int>::type_string = "integer"; 2120 const char* const PhastaIO_traits<double>::type_string = "double"; 2121 } 2122