1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <string.h> 18 #include <ctype.h> 19 #include <stdlib.h> 20 #include <stdio.h> 21 #include <math.h> 22 #include <sstream> 23 #include "phastaIO.h" 24 #include "mpi.h" 25 #include "phiotmrc.h" 26 27 #define VERSION_INFO_HEADER_SIZE 8192 28 #define DB_HEADER_SIZE 1024 29 #define ONE_MEGABYTE 1048576 30 #define TWO_MEGABYTE 2097152 31 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 32 #define MAX_PHASTA_FILES 64 33 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 34 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 35 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 36 #define MAX_FIELDS_NAME_LENGTH 128 37 #define DefaultMHSize (4*ONE_MEGABYTE) 38 //#define DefaultMHSize (8350) //For test 39 #define LATEST_WRITE_VERSION 1 40 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 41 int MasterHeaderSize = -1; 42 43 bool PRINT_PERF = false; // default print no perf results 44 int irank = -1; // global rank, should never be manually manipulated 45 int mysize = -1; 46 47 // Static variables are bad but used here to store the subcommunicator and associated variables 48 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 49 static int s_assign_local_comm = 0; 50 static MPI_Comm s_local_comm; 51 static int s_local_size = -1; 52 static int s_local_rank = -1; 53 54 //unsigned long long pool_align = 8; 55 //unsigned long long mem_address; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_PREFIX "phastaIO debug: " 59 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 60 61 #if PHASTAIO_DEBUG 62 #define phprintf( s, arg...) printf(PHASTAIO_PREFIX s "\n", ##arg) 63 #define phprintf_0( s, arg...) do{ \ 64 MPI_Comm_rank(MPI_COMM_WORLD, &irank); \ 65 if(irank == 0){ \ 66 printf(PHASTAIO_PREFIX "irank=0: " s "\n", ##arg); \ 67 } \ 68 } while(0) 69 #else 70 #define phprintf( s, arg...) 71 #define phprintf_0( s, arg...) 72 #endif 73 74 enum PhastaIO_Errors 75 { 76 MAX_PHASTA_FILES_EXCEEDED = -1, 77 UNABLE_TO_OPEN_FILE = -2, 78 NOT_A_MPI_FILE = -3, 79 GPID_EXCEEDED = -4, 80 DATA_TYPE_ILLEGAL = -5, 81 }; 82 83 using namespace std; 84 85 namespace{ 86 87 map<int, std::string> LastHeaderKey; 88 vector< FILE* > fileArray; 89 vector< bool > byte_order; 90 vector< int > header_type; 91 int DataSize=0; 92 bool LastHeaderNotFound = false; 93 bool Wrong_Endian = false ; 94 bool Strict_Error = false ; 95 bool binary_format = true; 96 97 /***********************************************************************/ 98 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 99 /***********************************************************************/ 100 101 typedef struct 102 { 103 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 104 unsigned long long my_offset; 105 unsigned long long next_start_address; 106 unsigned long long **my_offset_table; 107 unsigned long long **my_read_table; 108 109 double * double_chunk; 110 double * read_double_chunk; 111 112 int field_count; 113 int part_count; 114 int read_field_count; 115 int read_part_count; 116 int GPid; 117 int start_id; 118 119 int mhsize; 120 121 int myrank; 122 int numprocs; 123 int local_myrank; 124 int local_numprocs; 125 126 int nppp; 127 int nPPF; 128 int nFiles; 129 int nFields; 130 131 int * int_chunk; 132 int * read_int_chunk; 133 134 int Wrong_Endian; /* default to false */ 135 char * master_header; 136 MPI_File file_handle; 137 MPI_Comm local_comm; 138 } phastaio_file_t; 139 140 typedef struct 141 { 142 //unsigned long long my_offset; 143 //unsigned long long **offset_table; 144 //int fileID; 145 int nppf, nfields; 146 //int GPid; 147 //int read_field_count; 148 char * masterHeader; 149 }serial_file; 150 151 serial_file *SerialFile; 152 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 153 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 154 155 // the caller has the responsibility to delete the returned string 156 // TODO: StringStipper("nbc value? ") returns NULL? 157 char* 158 StringStripper( const char istring[] ) { 159 160 int length = strlen( istring ); 161 162 //char* dest = new char [ length + 1 ]; 163 char* dest = (char *)malloc( length + 1 ); 164 165 //char * dest; 166 //dest = (char *)malloc( length + 1 + pool_align ); 167 //if (dest & 0x0F != 0) printf("not aligned!!!!\n"); 168 //mem_address = (long long )dest; 169 //if( mem_address & (pool_align -1) ) 170 // dest += pool_align - (mem_address & (pool_align -1)); 171 172 strcpy( dest, istring ); 173 dest[ length ] = '\0'; 174 175 if ( char* p = strpbrk( dest, " ") ) 176 *p = '\0'; 177 178 return dest; 179 } 180 181 inline int 182 cscompare( const char teststring[], 183 const char targetstring[] ) { 184 185 char* s1 = const_cast<char*>(teststring); 186 char* s2 = const_cast<char*>(targetstring); 187 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 while( ( *s1 ) 191 && ( *s2 ) 192 && ( *s2 != '?') 193 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 194 s1++; 195 s2++; 196 while( *s1 == ' ') s1++; 197 while( *s2 == ' ') s2++; 198 } 199 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 200 else return 0; 201 } 202 203 inline void 204 isBinary( const char iotype[] ) { 205 206 char* fname = StringStripper( iotype ); 207 if ( cscompare( fname, "binary" ) ) binary_format = true; 208 else binary_format = false; 209 free (fname); 210 211 } 212 213 inline size_t 214 typeSize( const char typestring[] ) { 215 216 char* ts1 = StringStripper( typestring ); 217 218 if ( cscompare( "integer", ts1 ) ) { 219 free (ts1); 220 return sizeof(int); 221 } else if ( cscompare( "double", ts1 ) ) { 222 free (ts1); 223 return sizeof( double ); 224 } else { 225 free (ts1); 226 fprintf(stderr,"unknown type : %s\n",ts1); 227 return 0; 228 } 229 } 230 231 int 232 readHeader( FILE* fileObject, 233 const char phrase[], 234 int* params, 235 int expect ) { 236 237 char* text_header; 238 char* token; 239 char Line[1024]; 240 char junk; 241 bool FOUND = false ; 242 int real_length; 243 int skip_size, integer_value; 244 int rewind_count=0; 245 246 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 247 rewind( fileObject ); 248 clearerr( fileObject ); 249 rewind_count++; 250 fgets( Line, 1024, fileObject ); 251 } 252 253 while( !FOUND && ( rewind_count < 2 ) ) { 254 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ){ 255 //text_header = new char [ real_length + 1 ]; 256 text_header = (char *)malloc( real_length + 1 ); 257 //text_header = (char *)malloc( real_length + 1 + pool_align ); 258 //mem_address = (long long )text_header; 259 //if( mem_address & (pool_align -1) ) 260 // text_header += pool_align - (mem_address & (pool_align -1)); 261 262 strncpy( text_header, Line, real_length ); 263 text_header[ real_length ] =static_cast<char>(NULL); 264 token = strtok ( text_header, ":" ); 265 //if( cscompare( phrase , token ) ) { 266 // Double comparison required because different fields can still start 267 // with the same name for mixed meshes (nbc code, nbc values, etc). 268 // Would be easy to fix cscompare instead but would it break sth else? 269 if( cscompare( phrase , token ) && cscompare( token , phrase ) ) { 270 FOUND = true ; 271 token = strtok( NULL, " ,;<>" ); 272 skip_size = atoi( token ); 273 int i; 274 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 275 params[i] = atoi( token ); 276 } 277 if ( i < expect ) { 278 fprintf(stderr,"Aloha Expected # of ints not found for: %s\n",phrase ); 279 } 280 } else if ( cscompare(token,"byteorder magic number") ) { 281 if ( binary_format ) { 282 fread((void*)&integer_value,sizeof(int),1,fileObject); 283 fread( &junk, sizeof(char), 1 , fileObject ); 284 if ( 362436 != integer_value ) Wrong_Endian = true; 285 } else{ 286 fscanf(fileObject, "%d\n", &integer_value ); 287 } 288 } else { 289 /* some other header, so just skip over */ 290 token = strtok( NULL, " ,;<>" ); 291 skip_size = atoi( token ); 292 if ( binary_format) 293 fseek( fileObject, skip_size, SEEK_CUR ); 294 else 295 for( int gama=0; gama < skip_size; gama++ ) 296 fgets( Line, 1024, fileObject ); 297 } 298 free (text_header); 299 } // end of if before while loop 300 301 if ( !FOUND ) 302 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 303 rewind( fileObject ); 304 clearerr( fileObject ); 305 rewind_count++; 306 fgets( Line, 1024, fileObject ); 307 } 308 } 309 310 if ( !FOUND ) { 311 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 312 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 313 return 1; 314 } 315 return 0; 316 } 317 318 } // end unnamed namespace 319 320 321 // begin of publicly visible functions 322 323 /** 324 * This function takes a long long pointer and assign (start) phiotmrc value to it 325 */ 326 //void startTimer(unsigned long long* start) { 327 void startTimer(double* start) { 328 329 if( !PRINT_PERF ) return; 330 331 MPI_Barrier(MPI_COMM_WORLD); 332 *start = phiotmrc(); 333 } 334 335 /** 336 * This function takes a long long pointer and assign (end) phiotmrc value to it 337 */ 338 void endTimer(double* end) { 339 340 if( !PRINT_PERF ) return; 341 342 *end = phiotmrc(); 343 MPI_Barrier(MPI_COMM_WORLD); 344 } 345 346 /** 347 * choose to print some performance results (or not) according to 348 * the PRINT_PERF macro 349 */ 350 void printPerf( 351 const char* func_name, 352 double start, 353 double end, 354 unsigned long long datasize, 355 int printdatainfo, 356 const char* extra_msg) { 357 358 if( !PRINT_PERF ) return; 359 360 unsigned long long data_size = datasize; 361 362 double time = end - start; 363 364 unsigned long long isizemin,isizemax,isizetot; 365 double sizemin,sizemax,sizeavg,sizetot,rate; 366 double tmin, tmax, tavg, ttot; 367 368 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 369 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 370 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 371 tavg = ttot/mysize; 372 373 if(irank == 0) { 374 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 375 else printf("** syncIO "); 376 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 377 } 378 379 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 380 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 381 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 382 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 383 384 sizemin=(double)(isizemin*inv1024sq); 385 sizemax=(double)(isizemax*inv1024sq); 386 sizetot=(double)(isizetot*inv1024sq); 387 sizeavg=(double)(1.0*sizetot/mysize); 388 rate=(double)(1.0*sizetot/tmax); 389 390 if( irank == 0) { 391 printf(", Rate = %f MB/s [%s] \n \t\t\t block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 392 } 393 } 394 else { 395 if(irank == 0) { 396 printf(" \n"); 397 //printf(" (%s) \n", extra_msg); 398 } 399 } 400 } 401 402 /** 403 * This function is normally called at the beginning of a read operation, before 404 * init function. 405 * This function (uses rank 0) reads out nfields, nppf, master header size, 406 * endianess and allocates for masterHeader string. 407 * These values are essential for following read operations. Rank 0 will bcast 408 * these values to other ranks in the commm world 409 * 410 * If the file set is of old POSIX format, it would throw error and exit 411 */ 412 void queryphmpiio(const char filename[],int *nfields, int *nppf) 413 { 414 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 415 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 416 417 if(irank == 0) { 418 FILE * fileHandle; 419 char* fname = StringStripper( filename ); 420 421 fileHandle = fopen (fname,"rb"); 422 if (fileHandle == NULL ) { 423 printf("\nError: File %s doesn't exist! Please check!\n",fname); 424 } 425 else { 426 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 427 //SerialFile = (serial_file *)malloc( sizeof( serial_file ) + pool_align ); 428 //mem_address = (long long )SerialFile; 429 //if( mem_address & (pool_align -1) ) 430 // SerialFile += pool_align - (mem_address & (pool_align -1)); 431 432 //SerialFile->masterHeader = (char *)malloc(MasterHeaderSize); 433 //int meta_size_limit = 4*ONE_MEGABYTE; 434 //int meta_size_limit = (4+ MAX_FIELDS_NUMBER ) * MAX_FIELDS_NAME_LENGTH; 435 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 436 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 437 //SerialFile->masterHeader = (char *)malloc( meta_size_limit + pool_align ); 438 //mem_address = (long long )SerialFile; 439 //if( mem_address & (pool_align -1) ) 440 // SerialFile->masterHeader += pool_align - (mem_address & (pool_align -1)); 441 442 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 443 444 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 445 char version[MAX_FIELDS_NAME_LENGTH/4]; 446 int mhsize; 447 char * token; 448 int magic_number; 449 450 memcpy( read_out_tag, 451 SerialFile->masterHeader, 452 MAX_FIELDS_NAME_LENGTH-1 ); 453 454 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 455 // Test endianess ... 456 memcpy (&magic_number, 457 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 458 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 459 460 if ( magic_number != ENDIAN_TEST_NUMBER ) { 461 printf("Endian is different!\n"); 462 // Will do swap later 463 } 464 465 // test version, old version, default masterheader size is 4M 466 // newer version masterheader size is read from first line 467 memcpy(version, 468 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 469 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 470 471 if( cscompare ("version",version) ) { 472 // if there is "version" tag in the file, then it is newer format 473 // read master header size from here, otherwise use default 474 // Note: if version is "1", we know mhsize is at 3/4 place... 475 476 token = strtok(version, ":"); 477 token = strtok(NULL, " ,;<>" ); 478 int iversion = atoi(token); 479 480 if( iversion == 1) { 481 memcpy( &mhsize, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 483 sizeof(int)); 484 if ( magic_number != ENDIAN_TEST_NUMBER ) 485 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 486 487 if( mhsize > DefaultMHSize ) { 488 //if actual headersize is larger than default, let's re-read 489 free(SerialFile->masterHeader); 490 SerialFile->masterHeader = (char *)malloc(mhsize); 491 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 492 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 493 } 494 } 495 //TODO: check if this is a valid int?? 496 MasterHeaderSize = mhsize; 497 } 498 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 499 MasterHeaderSize = DefaultMHSize; 500 } 501 502 memcpy( read_out_tag, 503 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 504 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 505 506 // Read in # fields ... 507 token = strtok( read_out_tag, ":" ); 508 token = strtok( NULL," ,;<>" ); 509 *nfields = atoi( token ); 510 if ( *nfields > MAX_FIELDS_NUMBER) { 511 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 512 } 513 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 514 515 memcpy( read_out_tag, 516 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 517 + *nfields * MAX_FIELDS_NAME_LENGTH, 518 MAX_FIELDS_NAME_LENGTH); 519 520 token = strtok( read_out_tag, ":" ); 521 token = strtok( NULL," ,;<>" ); 522 *nppf = atoi( token ); 523 SerialFile->nppf=*nppf; //TODO: sanity check of int 524 } // end of if("MPI_IO_TAG") 525 else { 526 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 527 exit(1); 528 } 529 fclose(fileHandle); 530 free(SerialFile->masterHeader); 531 free(SerialFile); 532 } //end of else 533 free(fname); 534 } 535 536 // Bcast value to every one 537 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 538 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 539 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 540 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 541 } 542 543 /** 544 * This function computes the right master header size (round to size of 2^n). 545 * This is only needed for file format version 1 in "write" mode. 546 */ 547 int computeMHSize(int nfields, int nppf, int version) { 548 int mhsize; 549 if(version == 1) { 550 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 551 int meta_info_size = VERSION_INFO_HEADER_SIZE; 552 int actual_size = nfields * nppf * sizeof(long long) + meta_info_size; 553 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 554 if (actual_size > DefaultMHSize) { 555 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 556 mhsize *= DefaultMHSize; 557 } 558 else { 559 mhsize = DefaultMHSize; 560 } 561 } 562 return mhsize; 563 } 564 565 /** 566 * Computes correct color of a rank according to number of files. 567 */ 568 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 569 int color = 570 (int)(myrank / (numprocs / nfiles)); 571 return color; 572 } 573 574 575 /** 576 * Check the file descriptor. 577 */ 578 void checkFileDescriptor(const char fctname[], 579 int* fileDescriptor ) { 580 if ( *fileDescriptor < 0 ) { 581 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 582 exit(1); 583 } 584 } 585 586 /** 587 * Initialize the file struct members and allocate space for file struct 588 * buffers. 589 * 590 * Note: this function is only called when we are using new format. Old POSIX 591 * format should skip this routine and call openfile() directly instead. 592 */ 593 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 594 { 595 // we init irank again in case query not called (e.g. syncIO write case) 596 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 597 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 598 599 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 600 601 double timer_start, timer_end; 602 startTimer(&timer_start); 603 604 char* imode = StringStripper( mode ); 605 606 // Note: if it's read, we presume query was called prior to init and 607 // MasterHeaderSize is already set to correct value from parsing header 608 // otherwise it's write then it needs some computation to be set 609 if ( cscompare( "read", imode ) ) { 610 // do nothing 611 } 612 else if( cscompare( "write", imode ) ) { 613 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 614 } 615 else { 616 printf("Error initphmpiio: can't recognize the mode %s", imode); 617 exit(1); 618 } 619 free ( imode ); 620 621 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 622 623 int i, j; 624 625 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 626 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 627 endTimer(&timer_end); 628 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 629 return MAX_PHASTA_FILES_EXCEEDED; 630 } 631 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 632 // { 633 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 634 // { 635 // PhastaIOActiveFiles[i] = NULL; 636 // } 637 // } 638 639 640 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 641 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 642 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 643 //if( mem_address & (pool_align -1) ) 644 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 645 646 i = PhastaIONextActiveIndex; 647 PhastaIONextActiveIndex++; 648 649 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 650 651 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 652 653 PhastaIOActiveFiles[i]->Wrong_Endian = false; 654 655 PhastaIOActiveFiles[i]->nFields = *nfields; 656 PhastaIOActiveFiles[i]->nPPF = *nppf; 657 PhastaIOActiveFiles[i]->nFiles = *nfiles; 658 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 659 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 660 661 662 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 663 664 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 665 666 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 667 668 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 669 MPI_Comm_split(MPI_COMM_WORLD, 670 color, 671 PhastaIOActiveFiles[i]->myrank, 672 &(PhastaIOActiveFiles[i]->local_comm)); 673 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 674 &(PhastaIOActiveFiles[i]->local_numprocs)); 675 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 676 &(PhastaIOActiveFiles[i]->local_myrank)); 677 678 // back up now these variables so that we do not need to call comm_split again 679 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 680 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 681 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 682 s_assign_local_comm = 1; 683 } 684 else { // recycle the subcommunicator 685 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 686 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 687 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 688 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 689 } 690 } 691 else { // *nfiles == 1 here - no need to call mpi_comm_split here 692 693 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 694 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 695 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 696 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 697 698 } 699 700 PhastaIOActiveFiles[i]->nppp = 701 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 702 703 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 704 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 705 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 706 707 PhastaIOActiveFiles[i]->my_offset_table = 708 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 709 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 710 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 711 //if( mem_address & (pool_align -1) ) 712 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 713 714 PhastaIOActiveFiles[i]->my_read_table = 715 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 716 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 717 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 718 //if( mem_address & (pool_align -1) ) 719 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 720 721 for (j=0; j<*nfields; j++) 722 { 723 PhastaIOActiveFiles[i]->my_offset_table[j] = 724 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 725 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 726 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 727 //if( mem_address & (pool_align -1) ) 728 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 729 730 PhastaIOActiveFiles[i]->my_read_table[j] = 731 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 732 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 733 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 734 //if( mem_address & (pool_align -1) ) 735 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 736 } 737 *filehandle = i; 738 739 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 740 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 741 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 742 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 743 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 744 745 /* 746 PhastaIOActiveFiles[i]->master_header = 747 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 748 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 749 if( mem_address & (pool_align -1) ) 750 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 751 752 PhastaIOActiveFiles[i]->double_chunk = 753 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 754 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 755 if( mem_address & (pool_align -1) ) 756 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 757 758 PhastaIOActiveFiles[i]->int_chunk = 759 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 760 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 761 if( mem_address & (pool_align -1) ) 762 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 763 764 PhastaIOActiveFiles[i]->read_double_chunk = 765 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 766 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 767 if( mem_address & (pool_align -1) ) 768 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 769 770 PhastaIOActiveFiles[i]->read_int_chunk = 771 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 772 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 773 if( mem_address & (pool_align -1) ) 774 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 775 */ 776 777 // Time monitoring 778 endTimer(&timer_end); 779 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 780 781 phprintf_0("Info initphmpiio: quiting function"); 782 783 return i; 784 } 785 786 /** 787 * Destruct the file struct and free buffers allocated in init function. 788 */ 789 void finalizephmpiio( int *fileDescriptor ) 790 { 791 double timer_start, timer_end; 792 startTimer(&timer_start); 793 794 int i, j; 795 i = *fileDescriptor; 796 //PhastaIONextActiveIndex--; 797 798 /* //free the offset table for this phasta file */ 799 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 800 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 801 { 802 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 803 free( PhastaIOActiveFiles[i]->my_read_table[j]); 804 } 805 free ( PhastaIOActiveFiles[i]->my_offset_table ); 806 free ( PhastaIOActiveFiles[i]->my_read_table ); 807 free ( PhastaIOActiveFiles[i]->master_header ); 808 free ( PhastaIOActiveFiles[i]->double_chunk ); 809 free ( PhastaIOActiveFiles[i]->int_chunk ); 810 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 811 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 812 813 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 814 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 815 s_assign_local_comm = 0; 816 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 817 } 818 819 free( PhastaIOActiveFiles[i]); 820 821 endTimer(&timer_end); 822 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 823 824 PhastaIONextActiveIndex--; 825 } 826 827 828 /** 829 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 830 * Initialize the file struct members and allocate space for file struct buffers. 831 * 832 * Note: this function is only called when we are using new format. Old POSIX 833 * format should skip this routine and call openfile() directly instead. 834 */ 835 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 836 { 837 // we init irank again in case query not called (e.g. syncIO write case) 838 839 MPI_Comm_rank(my_local_comm, &irank); 840 MPI_Comm_size(my_local_comm, &mysize); 841 842 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 843 844 double timer_start, timer_end; 845 startTimer(&timer_start); 846 847 char* imode = StringStripper( mode ); 848 849 // Note: if it's read, we presume query was called prior to init and 850 // MasterHeaderSize is already set to correct value from parsing header 851 // otherwise it's write then it needs some computation to be set 852 if ( cscompare( "read", imode ) ) { 853 // do nothing 854 } 855 else if( cscompare( "write", imode ) ) { 856 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 857 } 858 else { 859 printf("Error initphmpiio: can't recognize the mode %s", imode); 860 exit(1); 861 } 862 free ( imode ); 863 864 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d", irank, MasterHeaderSize); 865 866 int i, j; 867 868 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 869 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 870 endTimer(&timer_end); 871 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 872 return MAX_PHASTA_FILES_EXCEEDED; 873 } 874 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 875 // { 876 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 877 // { 878 // PhastaIOActiveFiles[i] = NULL; 879 // } 880 // } 881 882 883 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 884 //PhastaIOActiveFiles[PhastaIONextActiveIndex] = ( phastaio_file_t *)calloc( 1 + 1, sizeof( phastaio_file_t ) ); 885 //mem_address = (long long )PhastaIOActiveFiles[PhastaIONextActiveIndex]; 886 //if( mem_address & (pool_align -1) ) 887 // PhastaIOActiveFiles[PhastaIONextActiveIndex] += pool_align - (mem_address & (pool_align -1)); 888 889 i = PhastaIONextActiveIndex; 890 PhastaIONextActiveIndex++; 891 892 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 893 894 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 895 896 PhastaIOActiveFiles[i]->Wrong_Endian = false; 897 898 PhastaIOActiveFiles[i]->nFields = *nfields; 899 PhastaIOActiveFiles[i]->nPPF = *nppf; 900 PhastaIOActiveFiles[i]->nFiles = *nfiles; 901 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 902 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 903 904 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 905 MPI_Comm_split(my_local_comm, 906 color, 907 PhastaIOActiveFiles[i]->myrank, 908 &(PhastaIOActiveFiles[i]->local_comm)); 909 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 910 &(PhastaIOActiveFiles[i]->local_numprocs)); 911 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 912 &(PhastaIOActiveFiles[i]->local_myrank)); 913 PhastaIOActiveFiles[i]->nppp = 914 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 915 916 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 917 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 918 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 919 920 PhastaIOActiveFiles[i]->my_offset_table = 921 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 922 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof(unsigned long long *) ); 923 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table; 924 //if( mem_address & (pool_align -1) ) 925 // PhastaIOActiveFiles[i]->my_offset_table += pool_align - (mem_address & (pool_align -1)); 926 927 PhastaIOActiveFiles[i]->my_read_table = 928 ( unsigned long long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long long *) ); 929 //( unsigned long long **)calloc( MAX_FIELDS_NUMBER + pool_align, sizeof( unsigned long long *) ); 930 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table; 931 //if( mem_address & (pool_align -1) ) 932 // PhastaIOActiveFiles[i]->my_read_table += pool_align - (mem_address & (pool_align -1)); 933 934 for (j=0; j<*nfields; j++) 935 { 936 PhastaIOActiveFiles[i]->my_offset_table[j] = 937 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 938 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp + pool_align, sizeof( unsigned long long) ); 939 //mem_address = (long long )PhastaIOActiveFiles[i]->my_offset_table[j]; 940 //if( mem_address & (pool_align -1) ) 941 // PhastaIOActiveFiles[i]->my_offset_table[j] += pool_align - (mem_address & (pool_align -1)); 942 943 PhastaIOActiveFiles[i]->my_read_table[j] = 944 ( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) ); 945 //( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long long) + pool_align ); 946 //mem_address = (long long )PhastaIOActiveFiles[i]->my_read_table[j]; 947 //if( mem_address & (pool_align -1) ) 948 // PhastaIOActiveFiles[i]->my_read_table[j] += pool_align - (mem_address & (pool_align -1)); 949 } 950 *filehandle = i; 951 952 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 953 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 954 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 955 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 956 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 957 958 /* 959 PhastaIOActiveFiles[i]->master_header = 960 ( char * ) calloc( MasterHeaderSize + pool_align, sizeof( char ) ); 961 mem_address = (long long )PhastaIOActiveFiles[i]->master_header; 962 if( mem_address & (pool_align -1) ) 963 PhastaIOActiveFiles[i]->master_header += pool_align - (mem_address & (pool_align -1)); 964 965 PhastaIOActiveFiles[i]->double_chunk = 966 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 967 mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 968 if( mem_address & (pool_align -1) ) 969 PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 970 971 PhastaIOActiveFiles[i]->int_chunk = 972 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 973 mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 974 if( mem_address & (pool_align -1) ) 975 PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & (pool_align -1)); 976 977 PhastaIOActiveFiles[i]->read_double_chunk = 978 ( double * ) calloc( 1 + pool_align , sizeof( double ) ); 979 mem_address = (long long )PhastaIOActiveFiles[i]->read_double_chunk; 980 if( mem_address & (pool_align -1) ) 981 PhastaIOActiveFiles[i]->read_double_chunk += pool_align - (mem_address & (pool_align -1)); 982 983 PhastaIOActiveFiles[i]->read_int_chunk = 984 ( int * ) calloc( 1 + pool_align , sizeof( int ) ); 985 mem_address = (long long )PhastaIOActiveFiles[i]->read_int_chunk; 986 if( mem_address & (pool_align -1) ) 987 PhastaIOActiveFiles[i]->read_int_chunk += pool_align - (mem_address & (pool_align -1)); 988 */ 989 990 // Time monitoring 991 endTimer(&timer_end); 992 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 993 994 phprintf_0("Info initphmpiiosub: quiting function"); 995 996 return i; 997 } 998 999 1000 1001 /** open file for both POSIX and MPI-IO syncIO format. 1002 * 1003 * If it's old POSIX format, simply call posix fopen(). 1004 * 1005 * If it's MPI-IO foramt: 1006 * in "read" mode, it builds the header table that points to the offset of 1007 * fields for parts; 1008 * in "write" mode, it opens the file with MPI-IO open routine. 1009 */ 1010 void openfile(const char filename[], 1011 const char mode[], 1012 int* fileDescriptor ) 1013 { 1014 phprintf_0("Info: entering openfile"); 1015 1016 double timer_start, timer_end; 1017 startTimer(&timer_start); 1018 1019 if ( PhastaIONextActiveIndex == 0 ) 1020 { 1021 FILE* file=NULL ; 1022 *fileDescriptor = 0; 1023 char* fname = StringStripper( filename ); 1024 char* imode = StringStripper( mode ); 1025 1026 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 1027 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 1028 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 1029 1030 if ( !file ){ 1031 fprintf(stderr,"Error openfile: unable to open file %s",fname ) ; 1032 } else { 1033 fileArray.push_back( file ); 1034 byte_order.push_back( false ); 1035 header_type.push_back( sizeof(int) ); 1036 *fileDescriptor = fileArray.size(); 1037 } 1038 free (fname); 1039 free (imode); 1040 } 1041 else // else it would be parallel I/O, opposed to posix io 1042 { 1043 char* fname = StringStripper( filename ); 1044 char* imode = StringStripper( mode ); 1045 int rc; 1046 int i = *fileDescriptor; 1047 checkFileDescriptor("openfile",&i); 1048 char* token; 1049 1050 if ( cscompare( "read", imode ) ) 1051 { 1052 // if (PhastaIOActiveFiles[i]->myrank == 0) 1053 // printf("\n **********\nRead open ... ... regular version\n"); 1054 1055 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1056 fname, 1057 MPI_MODE_RDONLY, 1058 MPI_INFO_NULL, 1059 &(PhastaIOActiveFiles[i]->file_handle) ); 1060 1061 if(rc) 1062 { 1063 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1064 printf("Error openfile: Unable to open file %s! File descriptor = %d\n",fname,*fileDescriptor); 1065 endTimer(&timer_end); 1066 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1067 return; 1068 } 1069 1070 MPI_Status read_tag_status; 1071 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1072 int j; 1073 int magic_number; 1074 1075 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1076 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 1077 0, 1078 PhastaIOActiveFiles[i]->master_header, 1079 MasterHeaderSize, 1080 MPI_CHAR, 1081 &read_tag_status ); 1082 } 1083 1084 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 1085 MasterHeaderSize, 1086 MPI_CHAR, 1087 0, 1088 PhastaIOActiveFiles[i]->local_comm ); 1089 1090 memcpy( read_out_tag, 1091 PhastaIOActiveFiles[i]->master_header, 1092 MAX_FIELDS_NAME_LENGTH-1 ); 1093 1094 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 1095 { 1096 // Test endianess ... 1097 memcpy ( &magic_number, 1098 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1099 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 1100 1101 if ( magic_number != ENDIAN_TEST_NUMBER ) 1102 { 1103 PhastaIOActiveFiles[i]->Wrong_Endian = true; 1104 } 1105 1106 memcpy( read_out_tag, 1107 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 1108 MAX_FIELDS_NAME_LENGTH ); 1109 1110 // Read in # fields ... 1111 token = strtok ( read_out_tag, ":" ); 1112 token = strtok( NULL," ,;<>" ); 1113 PhastaIOActiveFiles[i]->nFields = atoi( token ); 1114 1115 unsigned long long **header_table; 1116 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1117 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1118 //mem_address = (long long )header_table; 1119 //if( mem_address & (pool_align -1) ) 1120 // header_table += pool_align - (mem_address & (pool_align -1)); 1121 1122 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1123 { 1124 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1125 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof(unsigned long long *)); 1126 //mem_address = (long long )header_table[j]; 1127 //if( mem_address & (pool_align -1) ) 1128 // header_table[j] += pool_align - (mem_address & (pool_align -1)); 1129 } 1130 1131 // Read in the offset table ... 1132 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1133 { 1134 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1135 memcpy( header_table[j], 1136 PhastaIOActiveFiles[i]->master_header + 1137 VERSION_INFO_HEADER_SIZE + 1138 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1139 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1140 } 1141 1142 MPI_Scatter( header_table[j], 1143 PhastaIOActiveFiles[i]->nppp, 1144 MPI_LONG_LONG_INT, 1145 PhastaIOActiveFiles[i]->my_read_table[j], 1146 PhastaIOActiveFiles[i]->nppp, 1147 MPI_LONG_LONG_INT, 1148 0, 1149 PhastaIOActiveFiles[i]->local_comm ); 1150 1151 // Swap byte order if endianess is different ... 1152 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1153 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1154 sizeof(long long int), 1155 PhastaIOActiveFiles[i]->nppp ); 1156 } 1157 } 1158 1159 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1160 free ( header_table[j] ); 1161 } 1162 free (header_table); 1163 1164 } // end of if MPI_IO_TAG 1165 else //else not valid MPI file 1166 { 1167 *fileDescriptor = NOT_A_MPI_FILE; 1168 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1169 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1170 endTimer(&timer_end); 1171 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1172 return; 1173 } 1174 } // end of if "read" 1175 else if( cscompare( "write", imode ) ) 1176 { 1177 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1178 fname, 1179 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1180 MPI_INFO_NULL, 1181 &(PhastaIOActiveFiles[i]->file_handle) ); 1182 if(rc) 1183 { 1184 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1185 return; 1186 } 1187 } // end of if "write" 1188 free (fname); 1189 free (imode); 1190 } // end of if FileIndex != 0 1191 1192 endTimer(&timer_end); 1193 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1194 } 1195 1196 /** close file for both POSIX and MPI-IO syncIO format. 1197 * 1198 * If it's old POSIX format, simply call posix fclose(). 1199 * 1200 * If it's MPI-IO foramt: 1201 * in "read" mode, it simply close file with MPI-IO close routine. 1202 * in "write" mode, rank 0 in each group will re-assemble the master header and 1203 * offset table and write to the beginning of file, then close the file. 1204 */ 1205 void closefile( int* fileDescriptor, 1206 const char mode[] ) 1207 { 1208 double timer_start, timer_end; 1209 startTimer(&timer_start); 1210 1211 int i = *fileDescriptor; 1212 checkFileDescriptor("closefile",&i); 1213 1214 if ( PhastaIONextActiveIndex == 0 ) { 1215 char* imode = StringStripper( mode ); 1216 1217 if( cscompare( "write", imode ) 1218 || cscompare( "append", imode ) ) { 1219 fflush( fileArray[ *fileDescriptor - 1 ] ); 1220 } 1221 1222 fclose( fileArray[ *fileDescriptor - 1 ] ); 1223 free (imode); 1224 } 1225 else { 1226 char* imode = StringStripper( mode ); 1227 1228 //write master header here: 1229 if ( cscompare( "write", imode ) ) { 1230 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1231 // MasterHeaderSize = 4*ONE_MEGABYTE; 1232 // else 1233 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1234 1235 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1236 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1237 1238 MPI_Status write_header_status; 1239 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1240 char version[MAX_FIELDS_NAME_LENGTH/4]; 1241 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1242 int magic_number = ENDIAN_TEST_NUMBER; 1243 1244 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1245 { 1246 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1247 sprintf(mpi_tag, "MPI_IO_Tag : "); 1248 memcpy(PhastaIOActiveFiles[i]->master_header, 1249 mpi_tag, 1250 MAX_FIELDS_NAME_LENGTH); 1251 1252 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1253 // this version is "1", print version in ASCII 1254 sprintf(version, "version : %d",1); 1255 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1256 version, 1257 MAX_FIELDS_NAME_LENGTH/4); 1258 1259 // master header size is computed using the formula above 1260 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1261 sprintf(mhsize, "mhsize : "); 1262 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1263 mhsize, 1264 MAX_FIELDS_NAME_LENGTH/4); 1265 1266 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1267 sprintf(mpi_tag, 1268 "\nnFields : %d\n", 1269 PhastaIOActiveFiles[i]->nFields); 1270 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1271 mpi_tag, 1272 MAX_FIELDS_NAME_LENGTH); 1273 1274 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1275 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1276 memcpy( PhastaIOActiveFiles[i]->master_header+ 1277 PhastaIOActiveFiles[i]->nFields * 1278 MAX_FIELDS_NAME_LENGTH + 1279 MAX_FIELDS_NAME_LENGTH * 2, 1280 mpi_tag, 1281 MAX_FIELDS_NAME_LENGTH); 1282 1283 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1284 &magic_number, 1285 sizeof(int)); 1286 1287 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1288 &MasterHeaderSize, 1289 sizeof(int)); 1290 } 1291 1292 int j = 0; 1293 unsigned long long **header_table; 1294 header_table = ( unsigned long long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long long *)); 1295 //header_table = ( unsigned long long ** ) calloc( PhastaIOActiveFiles[i]->nFields + pool_align, sizeof(unsigned long long *)); 1296 //mem_address = (long long )header_table; 1297 //if( mem_address & (pool_align -1) ) 1298 // header_table += pool_align - (mem_address & (pool_align -1)); 1299 1300 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1301 header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long long)); 1302 //header_table[j]=( unsigned long long * ) calloc( PhastaIOActiveFiles[i]->nPPF + pool_align, sizeof (unsigned long long *)); 1303 //mem_address = (long long )header_table[j]; 1304 //if( mem_address & (pool_align -1) ) 1305 // header_table[j] += pool_align - (mem_address & (pool_align - 1)); 1306 } 1307 1308 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1309 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1310 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1311 PhastaIOActiveFiles[i]->nppp, 1312 MPI_LONG_LONG_INT, 1313 header_table[j], 1314 PhastaIOActiveFiles[i]->nppp, 1315 MPI_LONG_LONG_INT, 1316 0, 1317 PhastaIOActiveFiles[i]->local_comm ); 1318 } 1319 1320 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1321 1322 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1323 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1324 memcpy ( PhastaIOActiveFiles[i]->master_header + 1325 VERSION_INFO_HEADER_SIZE + 1326 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long), 1327 header_table[j], 1328 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long long) ); 1329 } 1330 1331 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1332 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1333 0, 1334 PhastaIOActiveFiles[i]->master_header, 1335 MasterHeaderSize, 1336 MPI_CHAR, 1337 &write_header_status ); 1338 } 1339 1340 ////free(PhastaIOActiveFiles[i]->master_header); 1341 1342 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1343 free ( header_table[j] ); 1344 } 1345 free (header_table); 1346 } 1347 1348 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1349 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1350 free ( imode ); 1351 } 1352 1353 endTimer(&timer_end); 1354 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1355 } 1356 1357 int readHeader( FILE* f, const char phrase[], 1358 int* params, int numParams, const char* iotype) { 1359 isBinary(iotype); 1360 return readHeader(f,phrase,params,numParams); 1361 } 1362 1363 void readheader( int* fileDescriptor, 1364 const char keyphrase[], 1365 void* valueArray, 1366 int* nItems, 1367 const char datatype[], 1368 const char iotype[] ) 1369 { 1370 double timer_start, timer_end; 1371 1372 startTimer(&timer_start); 1373 1374 int i = *fileDescriptor; 1375 checkFileDescriptor("readheader",&i); 1376 1377 if ( PhastaIONextActiveIndex == 0 ) { 1378 int filePtr = *fileDescriptor - 1; 1379 FILE* fileObject; 1380 int* valueListInt; 1381 1382 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1383 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1384 fprintf(stderr,"openfile_ function has to be called before \n") ; 1385 fprintf(stderr,"acessing the file\n ") ; 1386 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1387 endTimer(&timer_end); 1388 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1389 return; 1390 } 1391 1392 LastHeaderKey[filePtr] = std::string(keyphrase); 1393 LastHeaderNotFound = false; 1394 1395 fileObject = fileArray[ filePtr ] ; 1396 Wrong_Endian = byte_order[ filePtr ]; 1397 1398 isBinary( iotype ); 1399 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1400 1401 // right now we are making the assumption that we will only write integers 1402 // on the header line. 1403 1404 valueListInt = static_cast< int* >( valueArray ); 1405 int ierr = readHeader( fileObject , 1406 keyphrase, 1407 valueListInt, 1408 *nItems ) ; 1409 1410 byte_order[ filePtr ] = Wrong_Endian ; 1411 1412 if ( ierr ) LastHeaderNotFound = true; 1413 1414 //return ; // don't return, go to the end to print perf 1415 } 1416 else { 1417 unsigned int skip_size; 1418 int* valueListInt; 1419 valueListInt = static_cast <int*>(valueArray); 1420 char* token = NULL; 1421 bool FOUND = false ; 1422 isBinary( iotype ); 1423 1424 MPI_Status read_offset_status; 1425 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1426 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1427 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1428 int j; 1429 1430 int string_length = strlen( keyphrase ); 1431 char* buffer = (char*) malloc ( string_length+1 ); 1432 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1433 //mem_address = (long long )buffer; 1434 //if( mem_address & (pool_align -1) ) 1435 // buffer += pool_align - (mem_address & (pool_align -1)); 1436 1437 strcpy ( buffer, keyphrase ); 1438 buffer[ string_length ] = '\0'; 1439 1440 char* st2 = strtok ( buffer, "@" ); 1441 st2 = strtok (NULL, "@"); 1442 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1443 if ( char* p = strpbrk(buffer, "@") ) 1444 *p = '\0'; 1445 1446 // Check if the user has input the right GPid 1447 if ( ( PhastaIOActiveFiles[i]->GPid <= 1448 PhastaIOActiveFiles[i]->myrank * 1449 PhastaIOActiveFiles[i]->nppp )|| 1450 ( PhastaIOActiveFiles[i]->GPid > 1451 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1452 PhastaIOActiveFiles[i]->nppp ) ) 1453 { 1454 *fileDescriptor = NOT_A_MPI_FILE; 1455 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1456 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1457 endTimer(&timer_end); 1458 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1459 return; 1460 } 1461 1462 // Find the field we want ... 1463 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1464 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1465 { 1466 memcpy( readouttag[j], 1467 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1468 MAX_FIELDS_NAME_LENGTH-1 ); 1469 } 1470 1471 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1472 { 1473 token = strtok ( readouttag[j], ":" ); 1474 1475 //if ( cscompare( buffer, token ) ) 1476 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1477 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1478 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1479 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1480 { 1481 PhastaIOActiveFiles[i]->read_field_count = j; 1482 FOUND = true; 1483 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1484 break; 1485 } 1486 } 1487 free(buffer); 1488 1489 if (!FOUND) 1490 { 1491 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1492 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1493 endTimer(&timer_end); 1494 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1495 return; 1496 } 1497 1498 // Find the part we want ... 1499 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1500 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1501 1502 PhastaIOActiveFiles[i]->my_offset = 1503 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1504 1505 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1506 1507 // Read each datablock header here ... 1508 1509 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1510 PhastaIOActiveFiles[i]->my_offset+1, 1511 read_out_tag, 1512 MAX_FIELDS_NAME_LENGTH-1, 1513 MPI_CHAR, 1514 &read_offset_status ); 1515 token = strtok ( read_out_tag, ":" ); 1516 1517 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1518 1519 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1520 { 1521 FOUND = true ; 1522 token = strtok( NULL, " ,;<>" ); 1523 skip_size = atoi( token ); 1524 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1525 valueListInt[j] = atoi( token ); 1526 1527 if ( j < *nItems ) 1528 { 1529 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1530 } 1531 } 1532 else { 1533 //if(irank==0) 1534 if(PhastaIOActiveFiles[i]->myrank == 0) 1535 // If we enter this if, there is a problem with the name of some fields 1536 { 1537 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1538 } 1539 } 1540 } 1541 1542 endTimer(&timer_end); 1543 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1544 1545 } 1546 1547 void readDataBlock( 1548 FILE* fileObject, 1549 void* valueArray, 1550 int nItems, 1551 const char datatype[], 1552 const char iotype[] ) 1553 { 1554 isBinary(iotype); 1555 size_t type_size = typeSize( datatype ); 1556 if ( binary_format ) { 1557 char junk = '\0'; 1558 fread( valueArray, type_size, nItems, fileObject ); 1559 fread( &junk, sizeof(char), 1 , fileObject ); 1560 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems ); 1561 } else { 1562 char* ts1 = StringStripper( datatype ); 1563 if ( cscompare( "integer", ts1 ) ) { 1564 for( int n=0; n < nItems ; n++ ) 1565 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1566 } else if ( cscompare( "double", ts1 ) ) { 1567 for( int n=0; n < nItems ; n++ ) 1568 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1569 } 1570 free (ts1); 1571 } 1572 } 1573 1574 void readdatablock( int* fileDescriptor, 1575 const char keyphrase[], 1576 void* valueArray, 1577 int* nItems, 1578 const char datatype[], 1579 const char iotype[] ) 1580 { 1581 //if(irank == 0) printf("entering readdatablock()\n"); 1582 unsigned long long data_size = 0; 1583 double timer_start, timer_end; 1584 startTimer(&timer_start); 1585 1586 int i = *fileDescriptor; 1587 checkFileDescriptor("readdatablock",&i); 1588 1589 if ( PhastaIONextActiveIndex == 0 ) { 1590 int filePtr = *fileDescriptor - 1; 1591 FILE* fileObject; 1592 char junk; 1593 1594 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1595 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1596 fprintf(stderr,"openfile_ function has to be called before\n") ; 1597 fprintf(stderr,"acessing the file\n ") ; 1598 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1599 endTimer(&timer_end); 1600 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1601 return; 1602 } 1603 1604 // error check.. 1605 // since we require that a consistant header always preceed the data block 1606 // let us check to see that it is actually the case. 1607 1608 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1609 fprintf(stderr, "Header not consistant with data block\n"); 1610 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1611 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1612 fprintf(stderr, "Please recheck read sequence \n"); 1613 if( Strict_Error ) { 1614 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1615 endTimer(&timer_end); 1616 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1617 return; 1618 } 1619 } 1620 1621 if ( LastHeaderNotFound ) { 1622 endTimer(&timer_end); 1623 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1624 return; 1625 } 1626 fileObject = fileArray[ filePtr ]; 1627 Wrong_Endian = byte_order[ filePtr ]; 1628 LastHeaderKey.erase(filePtr); 1629 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1630 1631 //return; 1632 } 1633 else { 1634 // printf("read data block\n"); 1635 MPI_Status read_data_status; 1636 size_t type_size = typeSize( datatype ); 1637 int nUnits = *nItems; 1638 isBinary( iotype ); 1639 1640 // read datablock then 1641 //MR CHANGE 1642 // if ( cscompare ( datatype, "double")) 1643 char* ts2 = StringStripper( datatype ); 1644 if ( cscompare ( "double" , ts2)) 1645 //MR CHANGE END 1646 { 1647 1648 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1649 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1650 valueArray, 1651 nUnits, 1652 MPI_DOUBLE ); 1653 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1654 valueArray, 1655 &read_data_status ); 1656 data_size=8*nUnits; 1657 1658 } 1659 //MR CHANGE 1660 // else if ( cscompare ( datatype, "integer")) 1661 else if ( cscompare ( "integer" , ts2)) 1662 //MR CHANGE END 1663 { 1664 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1665 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1666 valueArray, 1667 nUnits, 1668 MPI_INT ); 1669 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1670 valueArray, 1671 &read_data_status ); 1672 data_size=4*nUnits; 1673 } 1674 else 1675 { 1676 *fileDescriptor = DATA_TYPE_ILLEGAL; 1677 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1678 endTimer(&timer_end); 1679 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1680 return; 1681 } 1682 free(ts2); 1683 1684 1685 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1686 1687 // Swap data byte order if endianess is different ... 1688 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1689 { 1690 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1691 } 1692 } 1693 1694 endTimer(&timer_end); 1695 char extra_msg[1024]; 1696 memset(extra_msg, '\0', 1024); 1697 char* key = StringStripper(keyphrase); 1698 sprintf(extra_msg, " field is %s ", key); 1699 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1700 free(key); 1701 1702 } 1703 1704 void writeHeader(FILE* f, 1705 const char keyphrase[], 1706 const void* valueArray, 1707 const int nItems, 1708 const int ndataItems, 1709 const char datatype[], 1710 const char iotype[]) 1711 { 1712 isBinary( iotype ); 1713 1714 const int _newline = 1715 ( ndataItems > 0 ) ? sizeof( char ) : 0; 1716 int size_of_nextblock = 1717 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems; 1718 1719 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock ); 1720 for( int i = 0; i < nItems; i++ ) 1721 fprintf( f, "%d ", *((int*)((int*)valueArray+i))); 1722 fprintf( f, "\n"); 1723 } 1724 1725 void writeheader( const int* fileDescriptor, 1726 const char keyphrase[], 1727 const void* valueArray, 1728 const int* nItems, 1729 const int* ndataItems, 1730 const char datatype[], 1731 const char iotype[]) 1732 { 1733 1734 //if(irank == 0) printf("entering writeheader()\n"); 1735 1736 double timer_start, timer_end; 1737 startTimer(&timer_start); 1738 1739 int i = *fileDescriptor; 1740 checkFileDescriptor("writeheader",&i); 1741 1742 if ( PhastaIONextActiveIndex == 0 ) { 1743 int filePtr = *fileDescriptor - 1; 1744 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1745 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1746 fprintf(stderr,"openfile_ function has to be called before \n") ; 1747 fprintf(stderr,"acessing the file\n ") ; 1748 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1749 endTimer(&timer_end); 1750 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1751 return; 1752 } 1753 1754 LastHeaderKey[filePtr] = std::string(keyphrase); 1755 DataSize = *ndataItems; 1756 FILE* fileObject = fileArray[ filePtr ] ; 1757 header_type[ filePtr ] = typeSize( datatype ); 1758 writeHeader(fileObject,keyphrase,valueArray,*nItems, 1759 *ndataItems,datatype,iotype); 1760 } 1761 else { // else it's parallel I/O 1762 DataSize = *ndataItems; 1763 size_t type_size = typeSize( datatype ); 1764 isBinary( iotype ); 1765 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1766 1767 int string_length = strlen( keyphrase ); 1768 char* buffer = (char*) malloc ( string_length+1 ); 1769 //char* buffer = ( char * ) malloc( string_length + 1 + pool_align ); 1770 //mem_address = (long long )buffer; 1771 //if( mem_address & (pool_align -1) ) 1772 // buffer += pool_align - (mem_address & (pool_align -1)); 1773 1774 strcpy ( buffer, keyphrase); 1775 buffer[ string_length ] = '\0'; 1776 1777 char* st2 = strtok ( buffer, "@" ); 1778 st2 = strtok (NULL, "@"); 1779 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1780 1781 if ( char* p = strpbrk(buffer, "@") ) 1782 *p = '\0'; 1783 1784 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1785 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1786 unsigned long long offset_value; 1787 1788 int temp = *ndataItems; 1789 unsigned long long number_of_items = (unsigned long long)temp; 1790 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1791 1792 MPI_Scan( &number_of_items, 1793 &offset_value, 1794 1, 1795 MPI_LONG_LONG_INT, 1796 MPI_SUM, 1797 PhastaIOActiveFiles[i]->local_comm ); 1798 1799 offset_value = (offset_value - number_of_items) * type_size; 1800 1801 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1802 DB_HEADER_SIZE + 1803 PhastaIOActiveFiles[i]->next_start_address; 1804 // This offset is the starting address of each datablock header... 1805 PhastaIOActiveFiles[i]->my_offset = offset_value; 1806 1807 // Write in my offset table ... 1808 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1809 PhastaIOActiveFiles[i]->my_offset; 1810 1811 // Update the next-start-address ... 1812 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1813 number_of_items * type_size + 1814 DB_HEADER_SIZE; 1815 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1816 1, 1817 MPI_LONG_LONG_INT, 1818 PhastaIOActiveFiles[i]->local_numprocs-1, 1819 PhastaIOActiveFiles[i]->local_comm ); 1820 1821 // Prepare datablock header ... 1822 int _newline = (*ndataItems>0)?sizeof(char):0; 1823 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1824 1825 //char datablock_header[255]; 1826 //bzero((void*)datablock_header,255); 1827 char datablock_header[DB_HEADER_SIZE]; 1828 bzero((void*)datablock_header,DB_HEADER_SIZE); 1829 1830 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1831 sprintf( datablock_header, 1832 "\n%s : < %u >", 1833 keyphrase, 1834 size_of_nextblock ); 1835 1836 for ( int j = 0; j < *nItems; j++ ) 1837 { 1838 sprintf( datablock_header, 1839 "%s %d ", 1840 datablock_header, 1841 *((int*)((int*)valueArray+j))); 1842 } 1843 sprintf( datablock_header, 1844 "%s\n ", 1845 datablock_header ); 1846 1847 // Write datablock header ... 1848 //MR CHANGE 1849 // if ( cscompare(datatype,"double") ) 1850 char* ts1 = StringStripper( datatype ); 1851 if ( cscompare("double",ts1) ) 1852 //MR CHANGE END 1853 { 1854 free ( PhastaIOActiveFiles[i]->double_chunk ); 1855 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1856 //PhastaIOActiveFiles[i]->double_chunk = ( double * ) malloc( sizeof( double )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1857 //mem_address = (long long )PhastaIOActiveFiles[i]->double_chunk; 1858 //if( mem_address & (pool_align -1) ) 1859 // PhastaIOActiveFiles[i]->double_chunk += pool_align - (mem_address & (pool_align -1)); 1860 1861 double * aa = ( double * )datablock_header; 1862 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1863 } 1864 //MR CHANGE 1865 // if ( cscompare(datatype,"integer") ) 1866 else if ( cscompare("integer",ts1) ) 1867 //MR CHANGE END 1868 { 1869 free ( PhastaIOActiveFiles[i]->int_chunk ); 1870 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1871 //PhastaIOActiveFiles[i]->int_chunk = ( int * ) malloc( sizeof( int )*number_of_items+ DB_HEADER_SIZE + pool_align ); 1872 //mem_address = (long long )PhastaIOActiveFiles[i]->int_chunk; 1873 //if( mem_address & (pool_align -1) ) 1874 // PhastaIOActiveFiles[i]->int_chunk += pool_align - (mem_address & ( pool_align -1)); 1875 1876 int * aa = ( int * )datablock_header; 1877 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1878 } 1879 else { 1880 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1881 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1882 endTimer(&timer_end); 1883 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1884 return; 1885 } 1886 free(ts1); 1887 1888 PhastaIOActiveFiles[i]->part_count++; 1889 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1890 //A new field will be written 1891 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1892 memcpy( PhastaIOActiveFiles[i]->master_header + 1893 PhastaIOActiveFiles[i]->field_count * 1894 MAX_FIELDS_NAME_LENGTH + 1895 MAX_FIELDS_NAME_LENGTH * 2, 1896 mpi_tag, 1897 MAX_FIELDS_NAME_LENGTH-1); 1898 } 1899 PhastaIOActiveFiles[i]->field_count++; 1900 PhastaIOActiveFiles[i]->part_count=0; 1901 } 1902 free(buffer); 1903 } 1904 1905 endTimer(&timer_end); 1906 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1907 } 1908 1909 void writeDataBlock( FILE* f, 1910 const void* valueArray, 1911 const int nItems, 1912 const char datatype[], 1913 const char iotype[] ) 1914 { 1915 isBinary( iotype ); 1916 size_t type_size = typeSize( datatype ); 1917 if ( binary_format ) { 1918 fwrite( valueArray, type_size, nItems, f ); 1919 fprintf( f,"\n"); 1920 } else { 1921 char* ts1 = StringStripper( datatype ); 1922 if ( cscompare( "integer", ts1 ) ) { 1923 for( int n=0; n < nItems ; n++ ) 1924 fprintf(f,"%d\n",*((int*)((int*)valueArray+n))); 1925 } else if ( cscompare( "double", ts1 ) ) { 1926 for( int n=0; n < nItems ; n++ ) 1927 fprintf(f,"%lf\n",*((double*)((double*)valueArray+n))); 1928 } 1929 free (ts1); 1930 } 1931 } 1932 1933 void writedatablock( const int* fileDescriptor, 1934 const char keyphrase[], 1935 const void* valueArray, 1936 const int* nItems, 1937 const char datatype[], 1938 const char iotype[] ) 1939 { 1940 //if(irank == 0) printf("entering writedatablock()\n"); 1941 1942 unsigned long long data_size = 0; 1943 double timer_start, timer_end; 1944 startTimer(&timer_start); 1945 1946 int i = *fileDescriptor; 1947 checkFileDescriptor("writedatablock",&i); 1948 1949 if ( PhastaIONextActiveIndex == 0 ) { 1950 int filePtr = *fileDescriptor - 1; 1951 1952 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1953 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1954 fprintf(stderr,"openfile_ function has to be called before \n") ; 1955 fprintf(stderr,"acessing the file\n ") ; 1956 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1957 endTimer(&timer_end); 1958 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1959 return; 1960 } 1961 // since we require that a consistant header always preceed the data block 1962 // let us check to see that it is actually the case. 1963 1964 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1965 fprintf(stderr, "Header not consistant with data block\n"); 1966 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1967 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1968 fprintf(stderr, "Please recheck write sequence \n"); 1969 if( Strict_Error ) { 1970 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1971 endTimer(&timer_end); 1972 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1973 return; 1974 } 1975 } 1976 1977 FILE* fileObject = fileArray[ filePtr ] ; 1978 size_t type_size=typeSize( datatype ); 1979 isBinary( iotype ); 1980 1981 LastHeaderKey.erase(filePtr); 1982 1983 if ( header_type[filePtr] != (int)type_size ) { 1984 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1985 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1986 if( Strict_Error ) { 1987 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1988 endTimer(&timer_end); 1989 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1990 return; 1991 } 1992 } 1993 1994 int nUnits = *nItems; 1995 1996 if ( nUnits != DataSize ) { 1997 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1998 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1999 if( Strict_Error ) { 2000 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 2001 endTimer(&timer_end); 2002 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2003 return; 2004 } 2005 } 2006 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 2007 } 2008 else { // syncIO case 2009 MPI_Status write_data_status; 2010 isBinary( iotype ); 2011 int nUnits = *nItems; 2012 2013 //MR CHANGE 2014 // if ( cscompare(datatype,"double") ) 2015 char* ts1 = StringStripper( datatype ); 2016 if ( cscompare("double",ts1) ) 2017 //MR CHANGE END 2018 { 2019 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 2020 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2021 PhastaIOActiveFiles[i]->my_offset, 2022 PhastaIOActiveFiles[i]->double_chunk, 2023 //BLOCK_SIZE/sizeof(double), 2024 nUnits+DB_HEADER_SIZE/sizeof(double), 2025 MPI_DOUBLE ); 2026 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2027 PhastaIOActiveFiles[i]->double_chunk, 2028 &write_data_status ); 2029 data_size=8*nUnits; 2030 } 2031 //MR CHANGE 2032 // else if ( cscompare ( datatype, "integer")) 2033 else if ( cscompare("integer",ts1) ) 2034 //MR CHANGE END 2035 { 2036 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 2037 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 2038 PhastaIOActiveFiles[i]->my_offset, 2039 PhastaIOActiveFiles[i]->int_chunk, 2040 nUnits+DB_HEADER_SIZE/sizeof(int), 2041 MPI_INT ); 2042 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 2043 PhastaIOActiveFiles[i]->int_chunk, 2044 &write_data_status ); 2045 data_size=4*nUnits; 2046 } 2047 else { 2048 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 2049 endTimer(&timer_end); 2050 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 2051 return; 2052 } 2053 free(ts1); 2054 } 2055 2056 endTimer(&timer_end); 2057 char extra_msg[1024]; 2058 memset(extra_msg, '\0', 1024); 2059 char* key = StringStripper(keyphrase); 2060 sprintf(extra_msg, " field is %s ", key); 2061 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 2062 free(key); 2063 2064 } 2065 2066 void 2067 SwapArrayByteOrder( void* array, 2068 int nbytes, 2069 int nItems ) 2070 { 2071 /* This swaps the byte order for the array of nItems each 2072 of size nbytes , This will be called only locally */ 2073 int i,j; 2074 unsigned char* ucDst = (unsigned char*)array; 2075 2076 for(i=0; i < nItems; i++) { 2077 for(j=0; j < (nbytes/2); j++) 2078 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 2079 ucDst += nbytes; 2080 } 2081 } 2082 2083 void 2084 writestring( int* fileDescriptor, 2085 const char inString[] ) 2086 { 2087 2088 int filePtr = *fileDescriptor - 1; 2089 FILE* fileObject = fileArray[filePtr] ; 2090 fprintf(fileObject,"%s",inString ); 2091 return; 2092 } 2093 2094 void 2095 Gather_Headers( int* fileDescriptor, 2096 vector< string >& headers ) 2097 { 2098 2099 FILE* fileObject; 2100 char Line[1024]; 2101 2102 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2103 2104 while( !feof(fileObject) ) { 2105 fgets( Line, 1024, fileObject); 2106 if ( Line[0] == '#' ) { 2107 headers.push_back( Line ); 2108 } else { 2109 break; 2110 } 2111 } 2112 rewind( fileObject ); 2113 clearerr( fileObject ); 2114 } 2115 void 2116 isWrong( void ) { (Wrong_Endian) ? fprintf(stdout,"YES\n"): fprintf(stdout,"NO\n") ; } 2117 2118 void 2119 togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2120 2121 int 2122 isLittleEndian( void ) 2123 { 2124 // this function returns a 1 if the current running architecture is 2125 // LittleEndian Byte Ordered, else it returns a zero 2126 2127 union { 2128 long a; 2129 char c[sizeof( long )]; 2130 } endianUnion; 2131 2132 endianUnion.a = 1 ; 2133 2134 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2135 else return 0; 2136 } 2137 2138 namespace PHASTA { 2139 const char* const PhastaIO_traits<int>::type_string = "integer"; 2140 const char* const PhastaIO_traits<double>::type_string = "double"; 2141 } 2142