1 /* 2 * 3 * This is the SyncIO library that uses MPI-IO collective functions to 4 * implement a flexible I/O checkpoint solution for a large number of 5 * processors. 6 * 7 * Previous developer: Ning Liu (liun2@cs.rpi.edu) 8 * Jing Fu (fuj@cs.rpi.edu) 9 * Current developers: Michel Rasquin (Michel.Rasquin@colorado.edu), 10 * Ben Matthews (benjamin.a.matthews@colorado.edu) 11 * 12 */ 13 14 #include <map> 15 #include <vector> 16 #include <string> 17 #include <cstdarg> 18 #include <string.h> 19 #include <ctype.h> 20 #include <stdlib.h> 21 #include <stdio.h> 22 #include <math.h> 23 #include <sstream> 24 #include "phastaIO.h" 25 #include "mpi.h" 26 #include "phiotmrc.h" 27 #include "phiostats.h" 28 #include <assert.h> 29 30 #define VERSION_INFO_HEADER_SIZE 8192 31 #define DB_HEADER_SIZE 1024 32 #define ONE_MEGABYTE 1048576 33 #define TWO_MEGABYTE 2097152 34 #define ENDIAN_TEST_NUMBER 12180 // Troy's Zip Code!! 35 #define MAX_PHASTA_FILES 64 36 #define MAX_PHASTA_FILE_NAME_LENGTH 1024 37 #define MAX_FIELDS_NUMBER ((VERSION_INFO_HEADER_SIZE/MAX_FIELDS_NAME_LENGTH)-4) // The meta data include - MPI_IO_Tag, nFields, nFields*names of the fields, nppf 38 // -3 for MPI_IO_Tag, nFields and nppf, -4 for extra security (former nFiles) 39 #define MAX_FIELDS_NAME_LENGTH 128 40 #define DefaultMHSize (4*ONE_MEGABYTE) 41 //#define DefaultMHSize (8350) //For test 42 #define LATEST_WRITE_VERSION 1 43 #define inv1024sq 953.674316406e-9 // = 1/1024/1024 44 int MasterHeaderSize = -1; 45 46 bool PRINT_PERF = false; // default print no perf results 47 int irank = -1; // global rank, should never be manually manipulated 48 int mysize = -1; 49 50 // Static variables are bad but used here to store the subcommunicator and associated variables 51 // Prevent MPI_Comm_split to be called more than once, especially on BGQ with the V1R2M1 driver (leak detected in MPI_Comm_split - IBM working on it) 52 static int s_assign_local_comm = 0; 53 static MPI_Comm s_local_comm; 54 static int s_local_size = -1; 55 static int s_local_rank = -1; 56 57 // the following defines are for debug printf 58 #define PHASTAIO_DEBUG 0 //default to not print any debugging info 59 60 void phprintf(const char* fmt, ...) { 61 (void)fmt; 62 #if PHASTAIO_DEBUG 63 char format[1024]; 64 snprintf(format, sizeof(format), "phastaIO debug: %s", fmt); 65 va_list ap; 66 va_start(ap,fmt); 67 vprintf(format,ap); 68 va_end(ap); 69 #endif 70 } 71 72 void phprintf_0(const char* fmt, ...) { 73 (void)fmt; 74 #if PHASTAIO_DEBUG 75 int rank = 0; 76 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 77 if(rank == 0){ 78 char format[1024]; 79 snprintf(format, sizeof(format), "phastaIO debug: irank=0 %s", fmt); 80 va_list ap; 81 va_start(ap,fmt); 82 vprintf(format, ap); 83 va_end(ap); 84 } 85 #endif 86 } 87 88 enum PhastaIO_Errors 89 { 90 MAX_PHASTA_FILES_EXCEEDED = -1, 91 UNABLE_TO_OPEN_FILE = -2, 92 NOT_A_MPI_FILE = -3, 93 GPID_EXCEEDED = -4, 94 DATA_TYPE_ILLEGAL = -5 95 }; 96 97 using namespace std; 98 99 namespace{ 100 101 map<int, std::string> LastHeaderKey; 102 vector< FILE* > fileArray; 103 vector< bool > byte_order; 104 vector< int > header_type; 105 int DataSize=0; 106 bool LastHeaderNotFound = false; 107 bool Wrong_Endian = false ; 108 bool Strict_Error = false ; 109 bool binary_format = true; 110 111 /***********************************************************************/ 112 /***************** NEW PHASTA IO CODE STARTS HERE **********************/ 113 /***********************************************************************/ 114 115 typedef struct 116 { 117 char filename[MAX_PHASTA_FILE_NAME_LENGTH]; /* defafults to 1024 */ 118 unsigned long my_offset; 119 unsigned long next_start_address; 120 unsigned long **my_offset_table; 121 unsigned long **my_read_table; 122 123 double * double_chunk; 124 double * read_double_chunk; 125 126 int field_count; 127 int part_count; 128 int read_field_count; 129 int read_part_count; 130 int GPid; 131 int start_id; 132 133 int mhsize; 134 135 int myrank; 136 int numprocs; 137 int local_myrank; 138 int local_numprocs; 139 140 int nppp; 141 int nPPF; 142 int nFiles; 143 int nFields; 144 145 int * int_chunk; 146 int * read_int_chunk; 147 148 int Wrong_Endian; /* default to false */ 149 char * master_header; 150 MPI_File file_handle; 151 MPI_Comm local_comm; 152 } phastaio_file_t; 153 154 typedef struct 155 { 156 int nppf, nfields; 157 char * masterHeader; 158 }serial_file; 159 160 serial_file *SerialFile; 161 phastaio_file_t *PhastaIOActiveFiles[MAX_PHASTA_FILES]; 162 int PhastaIONextActiveIndex = 0; /* indicates next index to allocate */ 163 164 // the caller has the responsibility to delete the returned string 165 // TODO: StringStipper("nbc value? ") returns NULL? 166 char* StringStripper( const char istring[] ) { 167 int length = strlen( istring ); 168 char* dest = (char *)malloc( length + 1 ); 169 strcpy( dest, istring ); 170 dest[ length ] = '\0'; 171 if ( char* p = strpbrk( dest, " ") ) 172 *p = '\0'; 173 return dest; 174 } 175 176 inline int cscompare( const char teststring[], const char targetstring[] ) { 177 char* s1 = const_cast<char*>(teststring); 178 char* s2 = const_cast<char*>(targetstring); 179 180 while( *s1 == ' ') s1++; 181 while( *s2 == ' ') s2++; 182 while( ( *s1 ) 183 && ( *s2 ) 184 && ( *s2 != '?') 185 && ( tolower( *s1 )==tolower( *s2 ) ) ) { 186 s1++; 187 s2++; 188 while( *s1 == ' ') s1++; 189 while( *s2 == ' ') s2++; 190 } 191 if ( !( *s1 ) || ( *s1 == '?') ) return 1; 192 else return 0; 193 } 194 195 inline void isBinary( const char iotype[] ) { 196 char* fname = StringStripper( iotype ); 197 if ( cscompare( fname, "binary" ) ) binary_format = true; 198 else binary_format = false; 199 free (fname); 200 201 } 202 203 inline size_t typeSize( const char typestring[] ) { 204 char* ts1 = StringStripper( typestring ); 205 if ( cscompare( "integer", ts1 ) ) { 206 free (ts1); 207 return sizeof(int); 208 } else if ( cscompare( "double", ts1 ) ) { 209 free (ts1); 210 return sizeof( double ); 211 } else { 212 free (ts1); 213 fprintf(stderr,"unknown type : %s\n",ts1); 214 return 0; 215 } 216 } 217 218 int readHeader( 219 FILE* fileObject, 220 const char phrase[], 221 int* params, 222 int expect ) { 223 char* text_header; 224 char* token; 225 char Line[1024] = "\0"; 226 char junk; 227 bool FOUND = false ; 228 int real_length; 229 int skip_size, integer_value; 230 int rewind_count=0; 231 232 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 233 rewind( fileObject ); 234 clearerr( fileObject ); 235 rewind_count++; 236 fgets( Line, 1024, fileObject ); 237 } 238 239 while( !FOUND && ( rewind_count < 2 ) ) { 240 if ( ( Line[0] != '\n' ) && ( real_length = strcspn( Line, "#" )) ) { 241 text_header = (char *)malloc( real_length + 1 ); 242 strncpy( text_header, Line, real_length ); 243 text_header[ real_length ] =static_cast<char>(NULL); 244 token = strtok ( text_header, ":" ); 245 assert(token); 246 if( cscompare( phrase , token ) ) { 247 FOUND = true ; 248 token = strtok( NULL, " ,;<>" ); 249 assert(token); 250 skip_size = atoi( token ); 251 int i; 252 for( i=0; i < expect && ( token = strtok( NULL," ,;<>") ); i++) { 253 assert(token); 254 params[i] = atoi( token ); 255 } 256 if ( i < expect ) { 257 fprintf(stderr, 258 "Aloha Expected # of ints not found for: %s\n",phrase ); 259 } 260 } else if ( cscompare(token,"byteorder magic number") ) { 261 if ( binary_format ) { 262 fread((void*)&integer_value,sizeof(int),1,fileObject); 263 fread( &junk, sizeof(char), 1 , fileObject ); 264 if ( 362436 != integer_value ) Wrong_Endian = true; 265 } else{ 266 fscanf(fileObject, "%d\n", &integer_value ); 267 } 268 } else { 269 /* some other header, so just skip over */ 270 token = strtok( NULL, " ,;<>" ); 271 assert(token); 272 skip_size = atoi( token ); 273 if ( binary_format) 274 fseek( fileObject, skip_size, SEEK_CUR ); 275 else 276 for( int gama=0; gama < skip_size; gama++ ) 277 fgets( Line, 1024, fileObject ); 278 } 279 free (text_header); 280 } // end of if before while loop 281 282 if ( !FOUND ) 283 if( !fgets( Line, 1024, fileObject ) && feof( fileObject ) ) { 284 rewind( fileObject ); 285 clearerr( fileObject ); 286 rewind_count++; 287 fgets( Line, 1024, fileObject ); 288 } 289 } 290 291 if ( !FOUND ) { 292 //fprintf(stderr, "Error: Could not find: %s\n", phrase); 293 if(irank == 0) printf("WARNING: Could not find: %s\n", phrase); 294 return 1; 295 } 296 return 0; 297 } 298 299 } // end unnamed namespace 300 301 302 // begin of publicly visible functions 303 304 /** 305 * This function takes a long long pointer and assign (start) phiotmrc value to it 306 */ 307 void startTimer(double* start) { 308 if( !PRINT_PERF ) return; 309 MPI_Barrier(MPI_COMM_WORLD); 310 *start = phiotmrc(); 311 } 312 313 /** 314 * This function takes a long long pointer and assign (end) phiotmrc value to it 315 */ 316 void endTimer(double* end) { 317 if( !PRINT_PERF ) return; 318 *end = phiotmrc(); 319 MPI_Barrier(MPI_COMM_WORLD); 320 } 321 322 /** 323 * choose to print some performance results (or not) according to 324 * the PRINT_PERF macro 325 */ 326 void printPerf( 327 const char* func_name, 328 double start, 329 double end, 330 unsigned long datasize, 331 int printdatainfo, 332 const char* extra_msg) { 333 if( !PRINT_PERF ) return; 334 unsigned long data_size = datasize; 335 double time = end - start; 336 unsigned long isizemin,isizemax,isizetot; 337 double sizemin,sizemax,sizeavg,sizetot,rate; 338 double tmin, tmax, tavg, ttot; 339 340 MPI_Allreduce(&time, &tmin,1, MPI_DOUBLE, MPI_MIN, MPI_COMM_WORLD); 341 MPI_Allreduce(&time, &tmax,1, MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD); 342 MPI_Allreduce(&time, &ttot,1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD); 343 tavg = ttot/mysize; 344 345 if(irank == 0) { 346 if ( PhastaIONextActiveIndex == 0 ) printf("** 1PFPP "); 347 else printf("** syncIO "); 348 printf("%s(): Tmax = %f sec, Tmin = %f sec, Tavg = %f sec", func_name, tmax, tmin, tavg); 349 } 350 351 if(printdatainfo == 1) { // if printdatainfo ==1, compute I/O rate and block size 352 MPI_Allreduce(&data_size,&isizemin,1,MPI_LONG_LONG_INT,MPI_MIN,MPI_COMM_WORLD); 353 MPI_Allreduce(&data_size,&isizemax,1,MPI_LONG_LONG_INT,MPI_MAX,MPI_COMM_WORLD); 354 MPI_Allreduce(&data_size,&isizetot,1,MPI_LONG_LONG_INT,MPI_SUM,MPI_COMM_WORLD); 355 356 sizemin=(double)(isizemin*inv1024sq); 357 sizemax=(double)(isizemax*inv1024sq); 358 sizetot=(double)(isizetot*inv1024sq); 359 sizeavg=(double)(1.0*sizetot/mysize); 360 rate=(double)(1.0*sizetot/tmax); 361 362 if( irank == 0) { 363 printf(", Rate = %f MB/s [%s] \n \t\t\t" 364 " block size: Min= %f MB; Max= %f MB; Avg= %f MB; Tot= %f MB\n", 365 rate, extra_msg, sizemin,sizemax,sizeavg,sizetot); 366 } 367 } 368 else { 369 if(irank == 0) { 370 printf(" \n"); 371 //printf(" (%s) \n", extra_msg); 372 } 373 } 374 } 375 376 /** 377 * This function is normally called at the beginning of a read operation, before 378 * init function. 379 * This function (uses rank 0) reads out nfields, nppf, master header size, 380 * endianess and allocates for masterHeader string. 381 * These values are essential for following read operations. Rank 0 will bcast 382 * these values to other ranks in the commm world 383 * 384 * If the file set is of old POSIX format, it would throw error and exit 385 */ 386 void queryphmpiio(const char filename[],int *nfields, int *nppf) 387 { 388 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 389 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 390 391 if(irank == 0) { 392 FILE * fileHandle; 393 char* fname = StringStripper( filename ); 394 395 phioTime t0,t1; 396 phastaio_time(&t0); 397 fileHandle = fopen (fname,"rb"); 398 phastaio_time(&t1); 399 const size_t elapsed = phastaio_time_diff(&t0,&t1); 400 phastaio_addOpenTime(elapsed); 401 if (fileHandle == NULL ) { 402 printf("\nError: File %s doesn't exist! Please check!\n",fname); 403 } 404 else { 405 SerialFile =(serial_file *)calloc( 1, sizeof( serial_file) ); 406 int meta_size_limit = VERSION_INFO_HEADER_SIZE; 407 SerialFile->masterHeader = (char *)malloc( meta_size_limit ); 408 fread(SerialFile->masterHeader, 1, meta_size_limit, fileHandle); 409 410 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 411 char version[MAX_FIELDS_NAME_LENGTH/4]; 412 int mhsize; 413 char * token; 414 int magic_number; 415 416 memcpy( read_out_tag, 417 SerialFile->masterHeader, 418 MAX_FIELDS_NAME_LENGTH-1 ); 419 420 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) { 421 // Test endianess ... 422 memcpy (&magic_number, 423 SerialFile->masterHeader + sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 424 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 425 426 if ( magic_number != ENDIAN_TEST_NUMBER ) { 427 printf("Endian is different!\n"); 428 // Will do swap later 429 } 430 431 // test version, old version, default masterheader size is 4M 432 // newer version masterheader size is read from first line 433 memcpy(version, 434 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/2, 435 MAX_FIELDS_NAME_LENGTH/4 - 1); //TODO: why -1? 436 437 if( cscompare ("version",version) ) { 438 // if there is "version" tag in the file, then it is newer format 439 // read master header size from here, otherwise use default 440 // Note: if version is "1", we know mhsize is at 3/4 place... 441 442 token = strtok(version, ":"); 443 token = strtok(NULL, " ,;<>" ); 444 int iversion = atoi(token); 445 446 if( iversion == 1) { 447 memcpy( &mhsize, 448 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH/4*3 + sizeof("mhsize : ")-1, 449 sizeof(int)); 450 if ( magic_number != ENDIAN_TEST_NUMBER ) 451 SwapArrayByteOrder(&mhsize, sizeof(int), 1); 452 453 if( mhsize > DefaultMHSize ) { 454 //if actual headersize is larger than default, let's re-read 455 free(SerialFile->masterHeader); 456 SerialFile->masterHeader = (char *)malloc(mhsize); 457 fseek(fileHandle, 0, SEEK_SET); // reset the file stream position 458 fread(SerialFile->masterHeader,1,mhsize,fileHandle); 459 } 460 } 461 //TODO: check if this is a valid int?? 462 MasterHeaderSize = mhsize; 463 } 464 else { // else it's version 0's format w/o version tag, implicating MHSize=4M 465 MasterHeaderSize = DefaultMHSize; 466 } 467 468 memcpy( read_out_tag, 469 SerialFile->masterHeader+MAX_FIELDS_NAME_LENGTH+1, 470 MAX_FIELDS_NAME_LENGTH ); //TODO: why +1 471 472 // Read in # fields ... 473 token = strtok( read_out_tag, ":" ); 474 token = strtok( NULL," ,;<>" ); 475 *nfields = atoi( token ); 476 if ( *nfields > MAX_FIELDS_NUMBER) { 477 printf("Error queryphmpiio: nfields is larger than MAX_FIELDS_NUMBER!\n"); 478 } 479 SerialFile->nfields=*nfields; //TODO: sanity check of this int? 480 481 memcpy( read_out_tag, 482 SerialFile->masterHeader + MAX_FIELDS_NAME_LENGTH * 2 483 + *nfields * MAX_FIELDS_NAME_LENGTH, 484 MAX_FIELDS_NAME_LENGTH); 485 486 token = strtok( read_out_tag, ":" ); 487 token = strtok( NULL," ,;<>" ); 488 *nppf = atoi( token ); 489 SerialFile->nppf=*nppf; //TODO: sanity check of int 490 } // end of if("MPI_IO_TAG") 491 else { 492 printf("Error queryphmpiio: The file you opened is not of syncIO new format, please check! read_out_tag = %s\n",read_out_tag); 493 exit(1); 494 } 495 phioTime t0,t1; 496 phastaio_time(&t0); 497 fclose(fileHandle); 498 phastaio_time(&t1); 499 const size_t elapsed = phastaio_time_diff(&t0,&t1); 500 phastaio_addCloseTime(elapsed); 501 free(SerialFile->masterHeader); 502 free(SerialFile); 503 } //end of else 504 free(fname); 505 } 506 507 // Bcast value to every one 508 MPI_Bcast( nfields, 1, MPI_INT, 0, MPI_COMM_WORLD ); 509 MPI_Bcast( nppf, 1, MPI_INT, 0, MPI_COMM_WORLD ); 510 MPI_Bcast( &MasterHeaderSize, 1, MPI_INT, 0, MPI_COMM_WORLD ); 511 phprintf("Info queryphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 512 } 513 514 /** 515 * This function computes the right master header size (round to size of 2^n). 516 * This is only needed for file format version 1 in "write" mode. 517 */ 518 int computeMHSize(int nfields, int nppf, int version) { 519 int mhsize=0; 520 if(version == 1) { 521 //int meta_info_size = (2+nfields+1) * MAX_FIELDS_NAME_LENGTH; // 2 is MPI_IO_TAG and nFields, the others 1 is nppf 522 int meta_info_size = VERSION_INFO_HEADER_SIZE; 523 int actual_size = nfields * nppf * sizeof(unsigned long) + meta_info_size; 524 //printf("actual_size = %d, offset table size = %d\n", actual_size, nfields * nppf * sizeof(long long)); 525 if (actual_size > DefaultMHSize) { 526 mhsize = (int) ceil( (double) actual_size/DefaultMHSize); // it's rounded to ceiling of this value 527 mhsize *= DefaultMHSize; 528 } 529 else { 530 mhsize = DefaultMHSize; 531 } 532 } else { 533 int rank = 0; 534 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 535 if(!rank) { 536 fprintf(stderr, 537 "ERROR invalid version passed to %s... exiting\n", __func__); 538 exit(EXIT_FAILURE); 539 } 540 } 541 return mhsize; 542 } 543 544 /** 545 * Computes correct color of a rank according to number of files. 546 */ 547 extern "C" int computeColor( int myrank, int numprocs, int nfiles) { 548 int color = 549 (int)(myrank / (numprocs / nfiles)); 550 return color; 551 } 552 553 554 /** 555 * Check the file descriptor. 556 */ 557 void checkFileDescriptor(const char fctname[], int* fileDescriptor ) { 558 if ( *fileDescriptor < 0 ) { 559 printf("Error: File descriptor = %d in %s\n",*fileDescriptor,fctname); 560 exit(1); 561 } 562 } 563 564 /** 565 * Initialize the file struct members and allocate space for file struct 566 * buffers. 567 * 568 * Note: this function is only called when we are using new format. Old POSIX 569 * format should skip this routine and call openfile() directly instead. 570 */ 571 int initphmpiio( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[]) 572 { 573 // we init irank again in case query not called (e.g. syncIO write case) 574 MPI_Comm_rank(MPI_COMM_WORLD, &irank); 575 MPI_Comm_size(MPI_COMM_WORLD, &mysize); 576 577 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d, nfields %d, nppf %d, nfiles %d\n", irank, MasterHeaderSize, *nfields, *nppf, *nfiles); 578 579 double timer_start, timer_end; 580 startTimer(&timer_start); 581 582 char* imode = StringStripper( mode ); 583 584 // Note: if it's read, we presume query was called prior to init and 585 // MasterHeaderSize is already set to correct value from parsing header 586 // otherwise it's write then it needs some computation to be set 587 if ( cscompare( "read", imode ) ) { 588 // do nothing 589 } 590 else if( cscompare( "write", imode ) ) { 591 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 592 } 593 else { 594 printf("Error initphmpiio: can't recognize the mode %s", imode); 595 exit(1); 596 } 597 free ( imode ); 598 599 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 600 601 int i, j; 602 603 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 604 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 605 endTimer(&timer_end); 606 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 607 return MAX_PHASTA_FILES_EXCEEDED; 608 } 609 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 610 // { 611 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 612 // { 613 // PhastaIOActiveFiles[i] = NULL; 614 // } 615 // } 616 617 618 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 619 620 i = PhastaIONextActiveIndex; 621 PhastaIONextActiveIndex++; 622 623 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 624 625 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 626 627 PhastaIOActiveFiles[i]->Wrong_Endian = false; 628 629 PhastaIOActiveFiles[i]->nFields = *nfields; 630 PhastaIOActiveFiles[i]->nPPF = *nppf; 631 PhastaIOActiveFiles[i]->nFiles = *nfiles; 632 MPI_Comm_rank(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->myrank)); 633 MPI_Comm_size(MPI_COMM_WORLD, &(PhastaIOActiveFiles[i]->numprocs)); 634 635 636 if( *nfiles > 1 ) { // split the ranks according to each mpiio file 637 638 if ( s_assign_local_comm == 0) { // call mpi_comm_split for the first (and only) time 639 640 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Building subcommunicator\n"); 641 642 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 643 MPI_Comm_split(MPI_COMM_WORLD, 644 color, 645 PhastaIOActiveFiles[i]->myrank, 646 &(PhastaIOActiveFiles[i]->local_comm)); 647 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 648 &(PhastaIOActiveFiles[i]->local_numprocs)); 649 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 650 &(PhastaIOActiveFiles[i]->local_myrank)); 651 652 // back up now these variables so that we do not need to call comm_split again 653 s_local_comm = PhastaIOActiveFiles[i]->local_comm; 654 s_local_size = PhastaIOActiveFiles[i]->local_numprocs; 655 s_local_rank = PhastaIOActiveFiles[i]->local_myrank; 656 s_assign_local_comm = 1; 657 } 658 else { // recycle the subcommunicator 659 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Recycling subcommunicator\n"); 660 PhastaIOActiveFiles[i]->local_comm = s_local_comm; 661 PhastaIOActiveFiles[i]->local_numprocs = s_local_size; 662 PhastaIOActiveFiles[i]->local_myrank = s_local_rank; 663 } 664 } 665 else { // *nfiles == 1 here - no need to call mpi_comm_split here 666 667 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Bypassing subcommunicator\n"); 668 PhastaIOActiveFiles[i]->local_comm = MPI_COMM_WORLD; 669 PhastaIOActiveFiles[i]->local_numprocs = PhastaIOActiveFiles[i]->numprocs; 670 PhastaIOActiveFiles[i]->local_myrank = PhastaIOActiveFiles[i]->myrank; 671 672 } 673 674 PhastaIOActiveFiles[i]->nppp = 675 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 676 677 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 678 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 679 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 680 681 PhastaIOActiveFiles[i]->my_offset_table = 682 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 683 684 PhastaIOActiveFiles[i]->my_read_table = 685 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 686 687 for (j=0; j<*nfields; j++) 688 { 689 PhastaIOActiveFiles[i]->my_offset_table[j] = 690 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 691 692 PhastaIOActiveFiles[i]->my_read_table[j] = 693 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 694 } 695 *filehandle = i; 696 697 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 698 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 699 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 700 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 701 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 702 703 // Time monitoring 704 endTimer(&timer_end); 705 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 706 707 phprintf_0("Info initphmpiio: quiting function"); 708 709 return i; 710 } 711 712 /** 713 * Destruct the file struct and free buffers allocated in init function. 714 */ 715 void finalizephmpiio( int *fileDescriptor ) 716 { 717 double timer_start, timer_end; 718 startTimer(&timer_start); 719 720 int i, j; 721 i = *fileDescriptor; 722 //PhastaIONextActiveIndex--; 723 724 /* //free the offset table for this phasta file */ 725 //for(j=0; j<MAX_FIELDS_NUMBER; j++) //Danger: undefined behavior for my_*_table.[j] not allocated or not initialized to NULL 726 for(j=0; j<PhastaIOActiveFiles[i]->nFields; j++) 727 { 728 free( PhastaIOActiveFiles[i]->my_offset_table[j]); 729 free( PhastaIOActiveFiles[i]->my_read_table[j]); 730 } 731 free ( PhastaIOActiveFiles[i]->my_offset_table ); 732 free ( PhastaIOActiveFiles[i]->my_read_table ); 733 free ( PhastaIOActiveFiles[i]->master_header ); 734 free ( PhastaIOActiveFiles[i]->double_chunk ); 735 free ( PhastaIOActiveFiles[i]->int_chunk ); 736 free ( PhastaIOActiveFiles[i]->read_double_chunk ); 737 free ( PhastaIOActiveFiles[i]->read_int_chunk ); 738 739 if( PhastaIOActiveFiles[i]->nFiles > 1 && s_assign_local_comm ) { // the comm was split 740 if (PhastaIOActiveFiles[i]->myrank == 0) printf("Freeing subcommunicator\n"); 741 s_assign_local_comm = 0; 742 MPI_Comm_free(&(PhastaIOActiveFiles[i]->local_comm)); 743 } 744 745 free( PhastaIOActiveFiles[i]); 746 747 endTimer(&timer_end); 748 printPerf("finalizempiio", timer_start, timer_end, 0, 0, ""); 749 750 PhastaIONextActiveIndex--; 751 } 752 753 754 /** 755 * Special init for M2N in order to create a subcommunicator for the reduced solution (requires PRINT_PERF to be false for now) 756 * Initialize the file struct members and allocate space for file struct buffers. 757 * 758 * Note: this function is only called when we are using new format. Old POSIX 759 * format should skip this routine and call openfile() directly instead. 760 */ 761 int initphmpiiosub( int *nfields, int *nppf, int *nfiles, int *filehandle, const char mode[],MPI_Comm my_local_comm) 762 { 763 // we init irank again in case query not called (e.g. syncIO write case) 764 765 MPI_Comm_rank(my_local_comm, &irank); 766 MPI_Comm_size(my_local_comm, &mysize); 767 768 phprintf("Info initphmpiio: entering function, myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 769 770 double timer_start, timer_end; 771 startTimer(&timer_start); 772 773 char* imode = StringStripper( mode ); 774 775 // Note: if it's read, we presume query was called prior to init and 776 // MasterHeaderSize is already set to correct value from parsing header 777 // otherwise it's write then it needs some computation to be set 778 if ( cscompare( "read", imode ) ) { 779 // do nothing 780 } 781 else if( cscompare( "write", imode ) ) { 782 MasterHeaderSize = computeMHSize(*nfields, *nppf, LATEST_WRITE_VERSION); 783 } 784 else { 785 printf("Error initphmpiio: can't recognize the mode %s", imode); 786 exit(1); 787 } 788 free ( imode ); 789 790 phprintf("Info initphmpiio: myrank = %d, MasterHeaderSize = %d\n", irank, MasterHeaderSize); 791 792 int i, j; 793 794 if( PhastaIONextActiveIndex == MAX_PHASTA_FILES ) { 795 printf("Error initphmpiio: PhastaIONextActiveIndex = MAX_PHASTA_FILES"); 796 endTimer(&timer_end); 797 printPerf("initphmpiio", timer_start, timer_end, 0, 0, ""); 798 return MAX_PHASTA_FILES_EXCEEDED; 799 } 800 // else if( PhastaIONextActiveIndex == 0 ) //Hang in debug mode on Intrepid 801 // { 802 // for( i = 0; i < MAX_PHASTA_FILES; i++ ); 803 // { 804 // PhastaIOActiveFiles[i] = NULL; 805 // } 806 // } 807 808 809 PhastaIOActiveFiles[PhastaIONextActiveIndex] = (phastaio_file_t *)calloc( 1, sizeof( phastaio_file_t) ); 810 811 i = PhastaIONextActiveIndex; 812 PhastaIONextActiveIndex++; 813 814 //PhastaIOActiveFiles[i]->next_start_address = 2*TWO_MEGABYTE; 815 816 PhastaIOActiveFiles[i]->next_start_address = MasterHeaderSize; // what does this mean??? TODO 817 818 PhastaIOActiveFiles[i]->Wrong_Endian = false; 819 820 PhastaIOActiveFiles[i]->nFields = *nfields; 821 PhastaIOActiveFiles[i]->nPPF = *nppf; 822 PhastaIOActiveFiles[i]->nFiles = *nfiles; 823 MPI_Comm_rank(my_local_comm, &(PhastaIOActiveFiles[i]->myrank)); 824 MPI_Comm_size(my_local_comm, &(PhastaIOActiveFiles[i]->numprocs)); 825 826 int color = computeColor(PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->numprocs, PhastaIOActiveFiles[i]->nFiles); 827 MPI_Comm_split(my_local_comm, 828 color, 829 PhastaIOActiveFiles[i]->myrank, 830 &(PhastaIOActiveFiles[i]->local_comm)); 831 MPI_Comm_size(PhastaIOActiveFiles[i]->local_comm, 832 &(PhastaIOActiveFiles[i]->local_numprocs)); 833 MPI_Comm_rank(PhastaIOActiveFiles[i]->local_comm, 834 &(PhastaIOActiveFiles[i]->local_myrank)); 835 PhastaIOActiveFiles[i]->nppp = 836 PhastaIOActiveFiles[i]->nPPF/PhastaIOActiveFiles[i]->local_numprocs; 837 838 PhastaIOActiveFiles[i]->start_id = PhastaIOActiveFiles[i]->nPPF * 839 (int)(PhastaIOActiveFiles[i]->myrank/PhastaIOActiveFiles[i]->local_numprocs) + 840 (PhastaIOActiveFiles[i]->local_myrank * PhastaIOActiveFiles[i]->nppp); 841 842 PhastaIOActiveFiles[i]->my_offset_table = 843 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 844 845 PhastaIOActiveFiles[i]->my_read_table = 846 ( unsigned long ** ) calloc( MAX_FIELDS_NUMBER , sizeof( unsigned long *) ); 847 848 for (j=0; j<*nfields; j++) 849 { 850 PhastaIOActiveFiles[i]->my_offset_table[j] = 851 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 852 853 PhastaIOActiveFiles[i]->my_read_table[j] = 854 ( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nppp , sizeof( unsigned long) ); 855 } 856 *filehandle = i; 857 858 PhastaIOActiveFiles[i]->master_header = (char *)calloc(MasterHeaderSize,sizeof( char )); 859 PhastaIOActiveFiles[i]->double_chunk = (double *)calloc(1,sizeof( double )); 860 PhastaIOActiveFiles[i]->int_chunk = (int *)calloc(1,sizeof( int )); 861 PhastaIOActiveFiles[i]->read_double_chunk = (double *)calloc(1,sizeof( double )); 862 PhastaIOActiveFiles[i]->read_int_chunk = (int *)calloc(1,sizeof( int )); 863 864 // Time monitoring 865 endTimer(&timer_end); 866 printPerf("initphmpiiosub", timer_start, timer_end, 0, 0, ""); 867 868 phprintf_0("Info initphmpiiosub: quiting function"); 869 870 return i; 871 } 872 873 874 875 /** open file for both POSIX and MPI-IO syncIO format. 876 * 877 * If it's old POSIX format, simply call posix fopen(). 878 * 879 * If it's MPI-IO foramt: 880 * in "read" mode, it builds the header table that points to the offset of 881 * fields for parts; 882 * in "write" mode, it opens the file with MPI-IO open routine. 883 */ 884 void openfile(const char filename[], const char mode[], int* fileDescriptor ) 885 { 886 phprintf_0("Info: entering openfile"); 887 888 double timer_start, timer_end; 889 startTimer(&timer_start); 890 891 if ( PhastaIONextActiveIndex == 0 ) 892 { 893 FILE* file=NULL ; 894 *fileDescriptor = 0; 895 char* fname = StringStripper( filename ); 896 char* imode = StringStripper( mode ); 897 898 phioTime t0,t1; 899 phastaio_time(&t0); 900 if ( cscompare( "read", imode ) ) file = fopen(fname, "rb" ); 901 else if( cscompare( "write", imode ) ) file = fopen(fname, "wb" ); 902 else if( cscompare( "append", imode ) ) file = fopen(fname, "ab" ); 903 phastaio_time(&t1); 904 const size_t elapsed = phastaio_time_diff(&t0,&t1); 905 phastaio_addOpenTime(elapsed); 906 907 if ( !file ){ 908 fprintf(stderr,"Error openfile: unable to open file %s\n",fname); 909 } else { 910 fileArray.push_back( file ); 911 byte_order.push_back( false ); 912 header_type.push_back( sizeof(int) ); 913 *fileDescriptor = fileArray.size(); 914 } 915 free (fname); 916 free (imode); 917 } 918 else // else it would be parallel I/O, opposed to posix io 919 { 920 char* fname = StringStripper( filename ); 921 char* imode = StringStripper( mode ); 922 int rc; 923 int i = *fileDescriptor; 924 checkFileDescriptor("openfile",&i); 925 char* token; 926 927 if ( cscompare( "read", imode ) ) 928 { 929 // if (PhastaIOActiveFiles[i]->myrank == 0) 930 // printf("\n **********\nRead open ... ... regular version\n"); 931 932 phioTime t0,t1; 933 phastaio_time(&t0); 934 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 935 fname, 936 MPI_MODE_RDONLY, 937 MPI_INFO_NULL, 938 &(PhastaIOActiveFiles[i]->file_handle) ); 939 phastaio_time(&t1); 940 const size_t elapsed = phastaio_time_diff(&t0,&t1); 941 phastaio_addOpenTime(elapsed); 942 943 if(rc) 944 { 945 *fileDescriptor = UNABLE_TO_OPEN_FILE; 946 int error_string_length; 947 char error_string[4096]; 948 MPI_Error_string(rc, error_string, &error_string_length); 949 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 950 endTimer(&timer_end); 951 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 952 return; 953 } 954 955 MPI_Status read_tag_status; 956 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 957 int j; 958 int magic_number; 959 960 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 961 MPI_File_read_at( PhastaIOActiveFiles[i]->file_handle, 962 0, 963 PhastaIOActiveFiles[i]->master_header, 964 MasterHeaderSize, 965 MPI_CHAR, 966 &read_tag_status ); 967 } 968 969 MPI_Bcast( PhastaIOActiveFiles[i]->master_header, 970 MasterHeaderSize, 971 MPI_CHAR, 972 0, 973 PhastaIOActiveFiles[i]->local_comm ); 974 975 memcpy( read_out_tag, 976 PhastaIOActiveFiles[i]->master_header, 977 MAX_FIELDS_NAME_LENGTH-1 ); 978 979 if ( cscompare ("MPI_IO_Tag",read_out_tag) ) 980 { 981 // Test endianess ... 982 memcpy ( &magic_number, 983 PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 984 sizeof(int) ); // masterheader should look like "MPI_IO_Tag : 12180 " with 12180 in binary format 985 986 if ( magic_number != ENDIAN_TEST_NUMBER ) 987 { 988 PhastaIOActiveFiles[i]->Wrong_Endian = true; 989 } 990 991 memcpy( read_out_tag, 992 PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH+1, // TODO: WHY +1??? 993 MAX_FIELDS_NAME_LENGTH ); 994 995 // Read in # fields ... 996 token = strtok ( read_out_tag, ":" ); 997 token = strtok( NULL," ,;<>" ); 998 PhastaIOActiveFiles[i]->nFields = atoi( token ); 999 1000 unsigned long **header_table; 1001 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1002 1003 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1004 { 1005 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1006 } 1007 1008 // Read in the offset table ... 1009 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) 1010 { 1011 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1012 memcpy( header_table[j], 1013 PhastaIOActiveFiles[i]->master_header + 1014 VERSION_INFO_HEADER_SIZE + 1015 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1016 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1017 } 1018 1019 MPI_Scatter( header_table[j], 1020 PhastaIOActiveFiles[i]->nppp, 1021 MPI_LONG_LONG_INT, 1022 PhastaIOActiveFiles[i]->my_read_table[j], 1023 PhastaIOActiveFiles[i]->nppp, 1024 MPI_LONG_LONG_INT, 1025 0, 1026 PhastaIOActiveFiles[i]->local_comm ); 1027 1028 // Swap byte order if endianess is different ... 1029 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) { 1030 SwapArrayByteOrder( PhastaIOActiveFiles[i]->my_read_table[j], 1031 sizeof(unsigned long), 1032 PhastaIOActiveFiles[i]->nppp ); 1033 } 1034 } 1035 1036 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1037 free ( header_table[j] ); 1038 } 1039 free (header_table); 1040 1041 } // end of if MPI_IO_TAG 1042 else //else not valid MPI file 1043 { 1044 *fileDescriptor = NOT_A_MPI_FILE; 1045 printf("Error openfile: The file %s you opened is not in syncIO new format, please check again! File descriptor = %d, MasterHeaderSize = %d, read_out_tag = %s\n",fname,*fileDescriptor,MasterHeaderSize,read_out_tag); 1046 //Printing MasterHeaderSize is useful to test a compiler bug on Intrepid BGP 1047 endTimer(&timer_end); 1048 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1049 return; 1050 } 1051 } // end of if "read" 1052 else if( cscompare( "write", imode ) ) 1053 { 1054 phioTime t0,t1; 1055 phastaio_time(&t0); 1056 rc = MPI_File_open( PhastaIOActiveFiles[i]->local_comm, 1057 fname, 1058 MPI_MODE_WRONLY | MPI_MODE_CREATE, 1059 MPI_INFO_NULL, 1060 &(PhastaIOActiveFiles[i]->file_handle) ); 1061 phastaio_time(&t1); 1062 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1063 phastaio_addOpenTime(elapsed); 1064 if(rc != MPI_SUCCESS) 1065 { 1066 *fileDescriptor = UNABLE_TO_OPEN_FILE; 1067 int error_string_length; 1068 char error_string[4096]; 1069 MPI_Error_string(rc, error_string, &error_string_length); 1070 fprintf(stderr, "Error openfile: Unable to open file %s! MPI reports \"%s\"\n",fname,error_string); 1071 return; 1072 } 1073 } // end of if "write" 1074 free (fname); 1075 free (imode); 1076 } // end of if FileIndex != 0 1077 1078 endTimer(&timer_end); 1079 printPerf("openfile", timer_start, timer_end, 0, 0, ""); 1080 } 1081 1082 /** close file for both POSIX and MPI-IO syncIO format. 1083 * 1084 * If it's old POSIX format, simply call posix fclose(). 1085 * 1086 * If it's MPI-IO foramt: 1087 * in "read" mode, it simply close file with MPI-IO close routine. 1088 * in "write" mode, rank 0 in each group will re-assemble the master header and 1089 * offset table and write to the beginning of file, then close the file. 1090 */ 1091 void closefile( int* fileDescriptor, const char mode[] ) 1092 { 1093 double timer_start, timer_end; 1094 startTimer(&timer_start); 1095 1096 int i = *fileDescriptor; 1097 checkFileDescriptor("closefile",&i); 1098 1099 if ( PhastaIONextActiveIndex == 0 ) { 1100 char* imode = StringStripper( mode ); 1101 1102 if( cscompare( "write", imode ) 1103 || cscompare( "append", imode ) ) { 1104 fflush( fileArray[ *fileDescriptor - 1 ] ); 1105 } 1106 1107 phioTime t0,t1; 1108 phastaio_time(&t0); 1109 fclose( fileArray[ *fileDescriptor - 1 ] ); 1110 phastaio_time(&t1); 1111 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1112 phastaio_addCloseTime(elapsed); 1113 free (imode); 1114 } 1115 else { 1116 char* imode = StringStripper( mode ); 1117 1118 //write master header here: 1119 if ( cscompare( "write", imode ) ) { 1120 // if ( PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields < 2*ONE_MEGABYTE/8 ) //SHOULD BE CHECKED 1121 // MasterHeaderSize = 4*ONE_MEGABYTE; 1122 // else 1123 // MasterHeaderSize = 4*ONE_MEGABYTE + PhastaIOActiveFiles[i]->nPPF * PhastaIOActiveFiles[i]->nFields * 8 - 2*ONE_MEGABYTE; 1124 1125 MasterHeaderSize = computeMHSize( PhastaIOActiveFiles[i]->nFields, PhastaIOActiveFiles[i]->nPPF, LATEST_WRITE_VERSION); 1126 phprintf_0("Info closefile: myrank = %d, MasterHeaderSize = %d\n", PhastaIOActiveFiles[i]->myrank, MasterHeaderSize); 1127 1128 MPI_Status write_header_status; 1129 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1130 char version[MAX_FIELDS_NAME_LENGTH/4]; 1131 char mhsize[MAX_FIELDS_NAME_LENGTH/4]; 1132 int magic_number = ENDIAN_TEST_NUMBER; 1133 1134 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) 1135 { 1136 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1137 sprintf(mpi_tag, "MPI_IO_Tag : "); 1138 memcpy(PhastaIOActiveFiles[i]->master_header, 1139 mpi_tag, 1140 MAX_FIELDS_NAME_LENGTH); 1141 1142 bzero((void*)version,MAX_FIELDS_NAME_LENGTH/4); 1143 // this version is "1", print version in ASCII 1144 sprintf(version, "version : %d",1); 1145 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/2, 1146 version, 1147 MAX_FIELDS_NAME_LENGTH/4); 1148 1149 // master header size is computed using the formula above 1150 bzero((void*)mhsize,MAX_FIELDS_NAME_LENGTH/4); 1151 sprintf(mhsize, "mhsize : "); 1152 memcpy(PhastaIOActiveFiles[i]->master_header + MAX_FIELDS_NAME_LENGTH/4*3, 1153 mhsize, 1154 MAX_FIELDS_NAME_LENGTH/4); 1155 1156 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1157 sprintf(mpi_tag, 1158 "\nnFields : %d\n", 1159 PhastaIOActiveFiles[i]->nFields); 1160 memcpy(PhastaIOActiveFiles[i]->master_header+MAX_FIELDS_NAME_LENGTH, 1161 mpi_tag, 1162 MAX_FIELDS_NAME_LENGTH); 1163 1164 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1165 sprintf(mpi_tag, "\nnPPF : %d\n", PhastaIOActiveFiles[i]->nPPF); 1166 memcpy( PhastaIOActiveFiles[i]->master_header+ 1167 PhastaIOActiveFiles[i]->nFields * 1168 MAX_FIELDS_NAME_LENGTH + 1169 MAX_FIELDS_NAME_LENGTH * 2, 1170 mpi_tag, 1171 MAX_FIELDS_NAME_LENGTH); 1172 1173 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("MPI_IO_Tag : ")-1, //-1 sizeof returns the size of the string+1 for "\0" 1174 &magic_number, 1175 sizeof(int)); 1176 1177 memcpy( PhastaIOActiveFiles[i]->master_header+sizeof("mhsize : ") -1 + MAX_FIELDS_NAME_LENGTH/4*3, 1178 &MasterHeaderSize, 1179 sizeof(int)); 1180 } 1181 1182 int j = 0; 1183 unsigned long **header_table; 1184 header_table = ( unsigned long ** )calloc(PhastaIOActiveFiles[i]->nFields, sizeof(unsigned long *)); 1185 1186 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1187 header_table[j]=( unsigned long * ) calloc( PhastaIOActiveFiles[i]->nPPF , sizeof( unsigned long)); 1188 } 1189 1190 //if( irank == 0 ) printf("gonna mpi_gather, myrank = %d\n", irank); 1191 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1192 MPI_Gather( PhastaIOActiveFiles[i]->my_offset_table[j], 1193 PhastaIOActiveFiles[i]->nppp, 1194 MPI_LONG_LONG_INT, 1195 header_table[j], 1196 PhastaIOActiveFiles[i]->nppp, 1197 MPI_LONG_LONG_INT, 1198 0, 1199 PhastaIOActiveFiles[i]->local_comm ); 1200 } 1201 1202 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1203 1204 //if( irank == 0 ) printf("gonna memcpy for every procs, myrank = %d\n", irank); 1205 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1206 memcpy ( PhastaIOActiveFiles[i]->master_header + 1207 VERSION_INFO_HEADER_SIZE + 1208 j * PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long), 1209 header_table[j], 1210 PhastaIOActiveFiles[i]->nPPF * sizeof(unsigned long) ); 1211 } 1212 1213 //if( irank == 0 ) printf("gonna file_write_at(), myrank = %d\n", irank); 1214 MPI_File_write_at( PhastaIOActiveFiles[i]->file_handle, 1215 0, 1216 PhastaIOActiveFiles[i]->master_header, 1217 MasterHeaderSize, 1218 MPI_CHAR, 1219 &write_header_status ); 1220 } 1221 1222 ////free(PhastaIOActiveFiles[i]->master_header); 1223 1224 for ( j = 0; j < PhastaIOActiveFiles[i]->nFields; j++ ) { 1225 free ( header_table[j] ); 1226 } 1227 free (header_table); 1228 } 1229 1230 //if( irank == 0 ) printf("gonna file_close(), myrank = %d\n", irank); 1231 phioTime t0,t1; 1232 phastaio_time(&t0); 1233 MPI_File_close( &( PhastaIOActiveFiles[i]->file_handle ) ); 1234 phastaio_time(&t1); 1235 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1236 phastaio_addCloseTime(elapsed); 1237 free ( imode ); 1238 } 1239 1240 endTimer(&timer_end); 1241 printPerf("closefile_", timer_start, timer_end, 0, 0, ""); 1242 } 1243 1244 int readHeader( FILE* f, const char phrase[], 1245 int* params, int numParams, const char* iotype) { 1246 isBinary(iotype); 1247 return readHeader(f,phrase,params,numParams); 1248 } 1249 1250 void readheader( 1251 int* fileDescriptor, 1252 const char keyphrase[], 1253 void* valueArray, 1254 int* nItems, 1255 const char datatype[], 1256 const char iotype[] ) 1257 { 1258 double timer_start, timer_end; 1259 1260 startTimer(&timer_start); 1261 1262 int i = *fileDescriptor; 1263 checkFileDescriptor("readheader",&i); 1264 1265 if ( PhastaIONextActiveIndex == 0 ) { 1266 int filePtr = *fileDescriptor - 1; 1267 FILE* fileObject; 1268 int* valueListInt; 1269 1270 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1271 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1272 fprintf(stderr,"openfile_ function has to be called before \n") ; 1273 fprintf(stderr,"acessing the file\n ") ; 1274 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1275 endTimer(&timer_end); 1276 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1277 return; 1278 } 1279 1280 LastHeaderKey[filePtr] = std::string(keyphrase); 1281 LastHeaderNotFound = false; 1282 1283 fileObject = fileArray[ filePtr ] ; 1284 Wrong_Endian = byte_order[ filePtr ]; 1285 1286 isBinary( iotype ); 1287 typeSize( datatype ); //redundant call, just avoid a compiler warning. 1288 1289 // right now we are making the assumption that we will only write integers 1290 // on the header line. 1291 1292 valueListInt = static_cast< int* >( valueArray ); 1293 int ierr = readHeader( fileObject , 1294 keyphrase, 1295 valueListInt, 1296 *nItems ) ; 1297 1298 byte_order[ filePtr ] = Wrong_Endian ; 1299 1300 if ( ierr ) LastHeaderNotFound = true; 1301 1302 //return ; // don't return, go to the end to print perf 1303 } 1304 else { 1305 int* valueListInt; 1306 valueListInt = static_cast <int*>(valueArray); 1307 char* token = NULL; 1308 bool FOUND = false ; 1309 isBinary( iotype ); 1310 1311 MPI_Status read_offset_status; 1312 char read_out_tag[MAX_FIELDS_NAME_LENGTH]; 1313 memset(read_out_tag, '\0', MAX_FIELDS_NAME_LENGTH); 1314 char readouttag[MAX_FIELDS_NUMBER][MAX_FIELDS_NAME_LENGTH]; 1315 int j; 1316 1317 int string_length = strlen( keyphrase ); 1318 char* buffer = (char*) malloc ( string_length+1 ); 1319 1320 strcpy ( buffer, keyphrase ); 1321 buffer[ string_length ] = '\0'; 1322 1323 char* st2 = strtok ( buffer, "@" ); 1324 st2 = strtok (NULL, "@"); 1325 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1326 if ( char* p = strpbrk(buffer, "@") ) 1327 *p = '\0'; 1328 1329 // Check if the user has input the right GPid 1330 if ( ( PhastaIOActiveFiles[i]->GPid <= 1331 PhastaIOActiveFiles[i]->myrank * 1332 PhastaIOActiveFiles[i]->nppp )|| 1333 ( PhastaIOActiveFiles[i]->GPid > 1334 ( PhastaIOActiveFiles[i]->myrank + 1 ) * 1335 PhastaIOActiveFiles[i]->nppp ) ) 1336 { 1337 *fileDescriptor = NOT_A_MPI_FILE; 1338 printf("Error readheader: The file is not in syncIO new format, please check! myrank = %d, GPid = %d, nppp = %d, keyphrase = %s\n", PhastaIOActiveFiles[i]->myrank, PhastaIOActiveFiles[i]->GPid, PhastaIOActiveFiles[i]->nppp, keyphrase); 1339 // It is possible atoi could not generate a clear integer from st2 because of additional garbage character in keyphrase 1340 endTimer(&timer_end); 1341 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1342 return; 1343 } 1344 1345 // Find the field we want ... 1346 //for ( j = 0; j<MAX_FIELDS_NUMBER; j++ ) 1347 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1348 { 1349 memcpy( readouttag[j], 1350 PhastaIOActiveFiles[i]->master_header + j*MAX_FIELDS_NAME_LENGTH+MAX_FIELDS_NAME_LENGTH*2+1, 1351 MAX_FIELDS_NAME_LENGTH-1 ); 1352 } 1353 1354 for ( j = 0; j<PhastaIOActiveFiles[i]->nFields; j++ ) 1355 { 1356 token = strtok ( readouttag[j], ":" ); 1357 1358 //if ( cscompare( buffer, token ) ) 1359 if ( cscompare( token , buffer ) && cscompare( buffer, token ) ) 1360 // This double comparison is required for the field "number of nodes" and all the other fields that start with "number of nodes" (i.g. number of nodes in the mesh"). 1361 // Would be safer to rename "number of nodes" by "number of nodes in the part" so that the name are completely unique. But much more work to do that (Nspre, phParAdapt, etc). 1362 // Since the field name are unique in SyncIO (as it includes part ID), this should be safe and there should be no issue with the "?" trailing character. 1363 { 1364 PhastaIOActiveFiles[i]->read_field_count = j; 1365 FOUND = true; 1366 //printf("buffer: %s | token: %s | j: %d\n",buffer,token,j); 1367 break; 1368 } 1369 } 1370 free(buffer); 1371 1372 if (!FOUND) 1373 { 1374 //if(irank==0) printf("Warning readheader: Not found %s \n",keyphrase); //PhastaIOActiveFiles[i]->myrank is certainly initialized here. 1375 if(PhastaIOActiveFiles[i]->myrank == 0) printf("WARNING readheader: Not found %s\n",keyphrase); 1376 endTimer(&timer_end); 1377 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1378 return; 1379 } 1380 1381 // Find the part we want ... 1382 PhastaIOActiveFiles[i]->read_part_count = PhastaIOActiveFiles[i]->GPid - 1383 PhastaIOActiveFiles[i]->myrank * PhastaIOActiveFiles[i]->nppp - 1; 1384 1385 PhastaIOActiveFiles[i]->my_offset = 1386 PhastaIOActiveFiles[i]->my_read_table[PhastaIOActiveFiles[i]->read_field_count][PhastaIOActiveFiles[i]->read_part_count]; 1387 1388 // printf("****Rank %d offset is %d\n",PhastaIOActiveFiles[i]->myrank,PhastaIOActiveFiles[i]->my_offset); 1389 1390 // Read each datablock header here ... 1391 1392 MPI_File_read_at_all( PhastaIOActiveFiles[i]->file_handle, 1393 PhastaIOActiveFiles[i]->my_offset+1, 1394 read_out_tag, 1395 MAX_FIELDS_NAME_LENGTH-1, 1396 MPI_CHAR, 1397 &read_offset_status ); 1398 token = strtok ( read_out_tag, ":" ); 1399 1400 // printf("&&&&Rank %d read_out_tag is %s\n",PhastaIOActiveFiles[i]->myrank,read_out_tag); 1401 1402 if( cscompare( keyphrase , token ) ) //No need to compare also token with keyphrase like above. We should already have the right one. Otherwise there is a problem. 1403 { 1404 FOUND = true ; 1405 token = strtok( NULL, " ,;<>" ); 1406 for( j=0; j < *nItems && ( token = strtok( NULL," ,;<>") ); j++ ) 1407 valueListInt[j] = atoi( token ); 1408 1409 if ( j < *nItems ) 1410 { 1411 fprintf( stderr, "Expected # of ints not found for: %s\n", keyphrase ); 1412 } 1413 } 1414 else { 1415 //if(irank==0) 1416 if(PhastaIOActiveFiles[i]->myrank == 0) 1417 // If we enter this if, there is a problem with the name of some fields 1418 { 1419 printf("Error readheader: Unexpected mismatch between keyphrase = %s and token = %s\n",keyphrase,token); 1420 } 1421 } 1422 } 1423 1424 endTimer(&timer_end); 1425 printPerf("readheader", timer_start, timer_end, 0, 0, ""); 1426 1427 } 1428 1429 void readDataBlock( 1430 FILE* fileObject, 1431 void* valueArray, 1432 int nItems, 1433 const char datatype[], 1434 const char iotype[] ) 1435 { 1436 isBinary(iotype); 1437 size_t type_size = typeSize( datatype ); 1438 phioTime t0,t1; 1439 phastaio_time(&t0); 1440 if ( binary_format ) { 1441 char junk = '\0'; 1442 fread( valueArray, type_size, nItems, fileObject ); 1443 fread( &junk, sizeof(char), 1 , fileObject ); 1444 if ( Wrong_Endian ) SwapArrayByteOrder( valueArray, type_size, nItems ); 1445 } else { 1446 char* ts1 = StringStripper( datatype ); 1447 if ( cscompare( "integer", ts1 ) ) { 1448 for( int n=0; n < nItems ; n++ ) 1449 fscanf(fileObject, "%d\n",(int*)((int*)valueArray+n) ); 1450 } else if ( cscompare( "double", ts1 ) ) { 1451 for( int n=0; n < nItems ; n++ ) 1452 fscanf(fileObject, "%lf\n",(double*)((double*)valueArray+n) ); 1453 } 1454 free (ts1); 1455 } 1456 phastaio_time(&t1); 1457 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1458 phastaio_addReadTime(elapsed); 1459 phastaio_addReadBytes(nItems*type_size); 1460 } 1461 1462 void readdatablock( 1463 int* fileDescriptor, 1464 const char keyphrase[], 1465 void* valueArray, 1466 int* nItems, 1467 const char datatype[], 1468 const char iotype[] ) 1469 { 1470 //if(irank == 0) printf("entering readdatablock()\n"); 1471 unsigned long data_size = 0; 1472 double timer_start, timer_end; 1473 startTimer(&timer_start); 1474 1475 int i = *fileDescriptor; 1476 checkFileDescriptor("readdatablock",&i); 1477 1478 if ( PhastaIONextActiveIndex == 0 ) { 1479 int filePtr = *fileDescriptor - 1; 1480 FILE* fileObject; 1481 1482 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1483 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1484 fprintf(stderr,"openfile_ function has to be called before\n") ; 1485 fprintf(stderr,"acessing the file\n ") ; 1486 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1487 endTimer(&timer_end); 1488 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1489 return; 1490 } 1491 1492 // error check.. 1493 // since we require that a consistant header always preceed the data block 1494 // let us check to see that it is actually the case. 1495 1496 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1497 fprintf(stderr, "Header not consistant with data block\n"); 1498 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1499 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1500 fprintf(stderr, "Please recheck read sequence \n"); 1501 if( Strict_Error ) { 1502 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1503 endTimer(&timer_end); 1504 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1505 return; 1506 } 1507 } 1508 1509 if ( LastHeaderNotFound ) { 1510 endTimer(&timer_end); 1511 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1512 return; 1513 } 1514 fileObject = fileArray[ filePtr ]; 1515 Wrong_Endian = byte_order[ filePtr ]; 1516 LastHeaderKey.erase(filePtr); 1517 readDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1518 1519 //return; 1520 } 1521 else { 1522 // printf("read data block\n"); 1523 MPI_Status read_data_status; 1524 size_t type_size = typeSize( datatype ); 1525 int nUnits = *nItems; 1526 isBinary( iotype ); 1527 1528 // read datablock then 1529 //MR CHANGE 1530 // if ( cscompare ( datatype, "double")) 1531 char* ts2 = StringStripper( datatype ); 1532 if ( cscompare ( "double" , ts2)) 1533 //MR CHANGE END 1534 { 1535 1536 phioTime t0,t1; 1537 phastaio_time(&t0); 1538 MPI_File_read_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1539 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1540 valueArray, 1541 nUnits, 1542 MPI_DOUBLE ); 1543 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1544 valueArray, 1545 &read_data_status ); 1546 data_size=8*nUnits; 1547 phastaio_time(&t1); 1548 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1549 phastaio_addReadTime(elapsed); 1550 phastaio_addReadBytes(nUnits*sizeof(double)); 1551 } 1552 //MR CHANGE 1553 // else if ( cscompare ( datatype, "integer")) 1554 else if ( cscompare ( "integer" , ts2)) 1555 //MR CHANGE END 1556 { 1557 phioTime t0,t1; 1558 phastaio_time(&t0); 1559 MPI_File_read_at_all_begin(PhastaIOActiveFiles[i]->file_handle, 1560 PhastaIOActiveFiles[i]->my_offset + DB_HEADER_SIZE, 1561 valueArray, 1562 nUnits, 1563 MPI_INT ); 1564 MPI_File_read_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1565 valueArray, 1566 &read_data_status ); 1567 data_size=4*nUnits; 1568 phastaio_time(&t1); 1569 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1570 phastaio_addReadTime(elapsed); 1571 phastaio_addReadBytes(nUnits*sizeof(int)); 1572 } 1573 else 1574 { 1575 *fileDescriptor = DATA_TYPE_ILLEGAL; 1576 printf("readdatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1577 endTimer(&timer_end); 1578 printPerf("readdatablock", timer_start, timer_end, 0, 0, ""); 1579 return; 1580 } 1581 free(ts2); 1582 1583 1584 // printf("%d Read finishe\n",PhastaIOActiveFiles[i]->myrank); 1585 1586 // Swap data byte order if endianess is different ... 1587 if ( PhastaIOActiveFiles[i]->Wrong_Endian ) 1588 { 1589 SwapArrayByteOrder( valueArray, type_size, nUnits ); 1590 } 1591 } 1592 1593 endTimer(&timer_end); 1594 char extra_msg[1024]; 1595 memset(extra_msg, '\0', 1024); 1596 char* key = StringStripper(keyphrase); 1597 sprintf(extra_msg, " field is %s ", key); 1598 printPerf("readdatablock", timer_start, timer_end, data_size, 1, extra_msg); 1599 free(key); 1600 1601 } 1602 1603 void writeHeader( 1604 FILE* f, 1605 const char keyphrase[], 1606 const void* valueArray, 1607 const int nItems, 1608 const int ndataItems, 1609 const char datatype[], 1610 const char iotype[]) 1611 { 1612 isBinary( iotype ); 1613 1614 const int _newline = 1615 ( ndataItems > 0 ) ? sizeof( char ) : 0; 1616 int size_of_nextblock = 1617 ( binary_format ) ? typeSize(datatype) * ndataItems + _newline : ndataItems; 1618 1619 fprintf( f, "%s : < %d > ", keyphrase, size_of_nextblock ); 1620 for( int i = 0; i < nItems; i++ ) 1621 fprintf( f, "%d ", *((int*)((int*)valueArray+i))); 1622 fprintf( f, "\n"); 1623 } 1624 1625 void writeheader( 1626 const int* fileDescriptor, 1627 const char keyphrase[], 1628 const void* valueArray, 1629 const int* nItems, 1630 const int* ndataItems, 1631 const char datatype[], 1632 const char iotype[]) 1633 { 1634 1635 //if(irank == 0) printf("entering writeheader()\n"); 1636 1637 double timer_start, timer_end; 1638 startTimer(&timer_start); 1639 1640 int i = *fileDescriptor; 1641 checkFileDescriptor("writeheader",&i); 1642 1643 if ( PhastaIONextActiveIndex == 0 ) { 1644 int filePtr = *fileDescriptor - 1; 1645 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1646 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1647 fprintf(stderr,"openfile_ function has to be called before \n") ; 1648 fprintf(stderr,"acessing the file\n ") ; 1649 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1650 endTimer(&timer_end); 1651 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1652 return; 1653 } 1654 1655 LastHeaderKey[filePtr] = std::string(keyphrase); 1656 DataSize = *ndataItems; 1657 FILE* fileObject = fileArray[ filePtr ] ; 1658 header_type[ filePtr ] = typeSize( datatype ); 1659 writeHeader(fileObject,keyphrase,valueArray,*nItems, 1660 *ndataItems,datatype,iotype); 1661 } 1662 else { // else it's parallel I/O 1663 DataSize = *ndataItems; 1664 size_t type_size = typeSize( datatype ); 1665 isBinary( iotype ); 1666 char mpi_tag[MAX_FIELDS_NAME_LENGTH]; 1667 1668 int string_length = strlen( keyphrase ); 1669 char* buffer = (char*) malloc ( string_length+1 ); 1670 1671 strcpy ( buffer, keyphrase); 1672 buffer[ string_length ] = '\0'; 1673 1674 char* st2 = strtok ( buffer, "@" ); 1675 st2 = strtok (NULL, "@"); 1676 PhastaIOActiveFiles[i]->GPid = atoi(st2); 1677 1678 if ( char* p = strpbrk(buffer, "@") ) 1679 *p = '\0'; 1680 1681 bzero((void*)mpi_tag,MAX_FIELDS_NAME_LENGTH); 1682 sprintf(mpi_tag, "\n%s : %d\n", buffer, PhastaIOActiveFiles[i]->field_count); 1683 unsigned long offset_value; 1684 1685 int temp = *ndataItems; 1686 unsigned long number_of_items = (unsigned long)temp; 1687 MPI_Barrier(PhastaIOActiveFiles[i]->local_comm); 1688 1689 MPI_Scan( &number_of_items, 1690 &offset_value, 1691 1, 1692 MPI_LONG_LONG_INT, 1693 MPI_SUM, 1694 PhastaIOActiveFiles[i]->local_comm ); 1695 1696 offset_value = (offset_value - number_of_items) * type_size; 1697 1698 offset_value += PhastaIOActiveFiles[i]->local_myrank * 1699 DB_HEADER_SIZE + 1700 PhastaIOActiveFiles[i]->next_start_address; 1701 // This offset is the starting address of each datablock header... 1702 PhastaIOActiveFiles[i]->my_offset = offset_value; 1703 1704 // Write in my offset table ... 1705 PhastaIOActiveFiles[i]->my_offset_table[PhastaIOActiveFiles[i]->field_count][PhastaIOActiveFiles[i]->part_count] = 1706 PhastaIOActiveFiles[i]->my_offset; 1707 1708 // Update the next-start-address ... 1709 PhastaIOActiveFiles[i]->next_start_address = offset_value + 1710 number_of_items * type_size + 1711 DB_HEADER_SIZE; 1712 MPI_Bcast( &(PhastaIOActiveFiles[i]->next_start_address), 1713 1, 1714 MPI_LONG_LONG_INT, 1715 PhastaIOActiveFiles[i]->local_numprocs-1, 1716 PhastaIOActiveFiles[i]->local_comm ); 1717 1718 // Prepare datablock header ... 1719 int _newline = (*ndataItems>0)?sizeof(char):0; 1720 unsigned int size_of_nextblock = type_size * (*ndataItems) + _newline; 1721 1722 //char datablock_header[255]; 1723 //bzero((void*)datablock_header,255); 1724 char datablock_header[DB_HEADER_SIZE]; 1725 bzero((void*)datablock_header,DB_HEADER_SIZE); 1726 1727 PhastaIOActiveFiles[i]->GPid = PhastaIOActiveFiles[i]->nppp*PhastaIOActiveFiles[i]->myrank+PhastaIOActiveFiles[i]->part_count; 1728 sprintf( datablock_header, 1729 "\n%s : < %u >", 1730 keyphrase, 1731 size_of_nextblock ); 1732 1733 for ( int j = 0; j < *nItems; j++ ) 1734 { 1735 sprintf( datablock_header, 1736 "%s %d ", 1737 datablock_header, 1738 *((int*)((int*)valueArray+j))); 1739 } 1740 sprintf( datablock_header, 1741 "%s\n ", 1742 datablock_header ); 1743 1744 // Write datablock header ... 1745 //MR CHANGE 1746 // if ( cscompare(datatype,"double") ) 1747 char* ts1 = StringStripper( datatype ); 1748 if ( cscompare("double",ts1) ) 1749 //MR CHANGE END 1750 { 1751 free ( PhastaIOActiveFiles[i]->double_chunk ); 1752 PhastaIOActiveFiles[i]->double_chunk = ( double * )malloc( (sizeof( double )*number_of_items+ DB_HEADER_SIZE)); 1753 1754 double * aa = ( double * )datablock_header; 1755 memcpy(PhastaIOActiveFiles[i]->double_chunk, aa, DB_HEADER_SIZE); 1756 } 1757 //MR CHANGE 1758 // if ( cscompare(datatype,"integer") ) 1759 else if ( cscompare("integer",ts1) ) 1760 //MR CHANGE END 1761 { 1762 free ( PhastaIOActiveFiles[i]->int_chunk ); 1763 PhastaIOActiveFiles[i]->int_chunk = ( int * )malloc( (sizeof( int )*number_of_items+ DB_HEADER_SIZE)); 1764 1765 int * aa = ( int * )datablock_header; 1766 memcpy(PhastaIOActiveFiles[i]->int_chunk, aa, DB_HEADER_SIZE); 1767 } 1768 else { 1769 // *fileDescriptor = DATA_TYPE_ILLEGAL; 1770 printf("writeheader - DATA_TYPE_ILLEGAL - %s\n",datatype); 1771 endTimer(&timer_end); 1772 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1773 return; 1774 } 1775 free(ts1); 1776 1777 PhastaIOActiveFiles[i]->part_count++; 1778 if ( PhastaIOActiveFiles[i]->part_count == PhastaIOActiveFiles[i]->nppp ) { 1779 //A new field will be written 1780 if ( PhastaIOActiveFiles[i]->local_myrank == 0 ) { 1781 memcpy( PhastaIOActiveFiles[i]->master_header + 1782 PhastaIOActiveFiles[i]->field_count * 1783 MAX_FIELDS_NAME_LENGTH + 1784 MAX_FIELDS_NAME_LENGTH * 2, 1785 mpi_tag, 1786 MAX_FIELDS_NAME_LENGTH-1); 1787 } 1788 PhastaIOActiveFiles[i]->field_count++; 1789 PhastaIOActiveFiles[i]->part_count=0; 1790 } 1791 free(buffer); 1792 } 1793 1794 endTimer(&timer_end); 1795 printPerf("writeheader", timer_start, timer_end, 0, 0, ""); 1796 } 1797 1798 void writeDataBlock( 1799 FILE* f, 1800 const void* valueArray, 1801 const int nItems, 1802 const char datatype[], 1803 const char iotype[] ) 1804 { 1805 isBinary( iotype ); 1806 size_t type_size = typeSize( datatype ); 1807 phioTime t0,t1; 1808 phastaio_time(&t0); 1809 if ( binary_format ) { 1810 fwrite( valueArray, type_size, nItems, f ); 1811 fprintf( f,"\n"); 1812 } else { 1813 char* ts1 = StringStripper( datatype ); 1814 if ( cscompare( "integer", ts1 ) ) { 1815 const int* vals = (int*) valueArray; 1816 for( int n=0; n < nItems ; n++ ) 1817 fprintf(f,"%d\n",vals[n]); 1818 } else if ( cscompare( "double", ts1 ) ) { 1819 const double* vals = (double*) valueArray; 1820 for( int n=0; n < nItems ; n++ ) 1821 fprintf(f,"%f\n",vals[n]); 1822 } 1823 free (ts1); 1824 } 1825 phastaio_time(&t1); 1826 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1827 phastaio_addWriteTime(elapsed); 1828 phastaio_addWriteBytes(nItems*type_size); 1829 } 1830 1831 void writedatablock( 1832 const int* fileDescriptor, 1833 const char keyphrase[], 1834 const void* valueArray, 1835 const int* nItems, 1836 const char datatype[], 1837 const char iotype[] ) 1838 { 1839 //if(irank == 0) printf("entering writedatablock()\n"); 1840 1841 unsigned long data_size = 0; 1842 double timer_start, timer_end; 1843 startTimer(&timer_start); 1844 1845 int i = *fileDescriptor; 1846 checkFileDescriptor("writedatablock",&i); 1847 1848 if ( PhastaIONextActiveIndex == 0 ) { 1849 int filePtr = *fileDescriptor - 1; 1850 1851 if ( *fileDescriptor < 1 || *fileDescriptor > (int)fileArray.size() ) { 1852 fprintf(stderr,"No file associated with Descriptor %d\n",*fileDescriptor); 1853 fprintf(stderr,"openfile_ function has to be called before \n") ; 1854 fprintf(stderr,"acessing the file\n ") ; 1855 fprintf(stderr,"fatal error: cannot continue, returning out of call\n"); 1856 endTimer(&timer_end); 1857 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1858 return; 1859 } 1860 // since we require that a consistant header always preceed the data block 1861 // let us check to see that it is actually the case. 1862 1863 if ( ! cscompare( LastHeaderKey[ filePtr ].c_str(), keyphrase ) ) { 1864 fprintf(stderr, "Header not consistant with data block\n"); 1865 fprintf(stderr, "Header: %s\n", LastHeaderKey[ filePtr ].c_str() ); 1866 fprintf(stderr, "DataBlock: %s\n ", keyphrase ); 1867 fprintf(stderr, "Please recheck write sequence \n"); 1868 if( Strict_Error ) { 1869 fprintf(stderr, "fatal error: cannot continue, returning out of call\n"); 1870 endTimer(&timer_end); 1871 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1872 return; 1873 } 1874 } 1875 1876 FILE* fileObject = fileArray[ filePtr ] ; 1877 size_t type_size=typeSize( datatype ); 1878 isBinary( iotype ); 1879 1880 LastHeaderKey.erase(filePtr); 1881 1882 if ( header_type[filePtr] != (int)type_size ) { 1883 fprintf(stderr,"header and datablock differ on typeof data in the block for\n"); 1884 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1885 if( Strict_Error ) { 1886 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1887 endTimer(&timer_end); 1888 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1889 return; 1890 } 1891 } 1892 1893 int nUnits = *nItems; 1894 1895 if ( nUnits != DataSize ) { 1896 fprintf(stderr,"header and datablock differ on number of data items for\n"); 1897 fprintf(stderr,"keyphrase : %s\n", keyphrase); 1898 if( Strict_Error ) { 1899 fprintf(stderr,"fatal error: cannot continue, returning out of call\n" ); 1900 endTimer(&timer_end); 1901 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1902 return; 1903 } 1904 } 1905 writeDataBlock(fileObject,valueArray,*nItems,datatype,iotype); 1906 } 1907 else { // syncIO case 1908 MPI_Status write_data_status; 1909 isBinary( iotype ); 1910 int nUnits = *nItems; 1911 1912 //MR CHANGE 1913 // if ( cscompare(datatype,"double") ) 1914 char* ts1 = StringStripper( datatype ); 1915 if ( cscompare("double",ts1) ) 1916 //MR CHANGE END 1917 { 1918 memcpy((PhastaIOActiveFiles[i]->double_chunk+DB_HEADER_SIZE/sizeof(double)), valueArray, nUnits*sizeof(double)); 1919 phioTime t0,t1; 1920 phastaio_time(&t0); 1921 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1922 PhastaIOActiveFiles[i]->my_offset, 1923 PhastaIOActiveFiles[i]->double_chunk, 1924 //BLOCK_SIZE/sizeof(double), 1925 nUnits+DB_HEADER_SIZE/sizeof(double), 1926 MPI_DOUBLE ); 1927 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1928 PhastaIOActiveFiles[i]->double_chunk, 1929 &write_data_status ); 1930 data_size=8*nUnits; 1931 phastaio_time(&t1); 1932 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1933 phastaio_addWriteTime(elapsed); 1934 phastaio_addWriteBytes((nUnits*sizeof(double))+DB_HEADER_SIZE); 1935 } 1936 //MR CHANGE 1937 // else if ( cscompare ( datatype, "integer")) 1938 else if ( cscompare("integer",ts1) ) 1939 //MR CHANGE END 1940 { 1941 memcpy((PhastaIOActiveFiles[i]->int_chunk+DB_HEADER_SIZE/sizeof(int)), valueArray, nUnits*sizeof(int)); 1942 phioTime t0,t1; 1943 phastaio_time(&t0); 1944 MPI_File_write_at_all_begin( PhastaIOActiveFiles[i]->file_handle, 1945 PhastaIOActiveFiles[i]->my_offset, 1946 PhastaIOActiveFiles[i]->int_chunk, 1947 nUnits+DB_HEADER_SIZE/sizeof(int), 1948 MPI_INT ); 1949 MPI_File_write_at_all_end( PhastaIOActiveFiles[i]->file_handle, 1950 PhastaIOActiveFiles[i]->int_chunk, 1951 &write_data_status ); 1952 data_size=4*nUnits; 1953 phastaio_time(&t1); 1954 const size_t elapsed = phastaio_time_diff(&t0,&t1); 1955 phastaio_addWriteTime(elapsed); 1956 phastaio_addWriteBytes((nUnits*sizeof(int))+DB_HEADER_SIZE); 1957 } 1958 else { 1959 printf("Error: writedatablock - DATA_TYPE_ILLEGAL - %s\n",datatype); 1960 endTimer(&timer_end); 1961 printPerf("writedatablock", timer_start, timer_end, 0, 0, ""); 1962 return; 1963 } 1964 free(ts1); 1965 } 1966 1967 endTimer(&timer_end); 1968 char extra_msg[1024]; 1969 memset(extra_msg, '\0', 1024); 1970 char* key = StringStripper(keyphrase); 1971 sprintf(extra_msg, " field is %s ", key); 1972 printPerf("writedatablock", timer_start, timer_end, data_size, 1, extra_msg); 1973 free(key); 1974 1975 } 1976 1977 void SwapArrayByteOrder( void* array, int nbytes, int nItems ) 1978 { 1979 /* This swaps the byte order for the array of nItems each 1980 of size nbytes , This will be called only locally */ 1981 int i,j; 1982 unsigned char* ucDst = (unsigned char*)array; 1983 for(i=0; i < nItems; i++) { 1984 for(j=0; j < (nbytes/2); j++) 1985 std::swap( ucDst[j] , ucDst[(nbytes - 1) - j] ); 1986 ucDst += nbytes; 1987 } 1988 } 1989 1990 void writestring( int* fileDescriptor, const char inString[] ) 1991 { 1992 int filePtr = *fileDescriptor - 1; 1993 FILE* fileObject = fileArray[filePtr] ; 1994 fprintf(fileObject,"%s",inString ); 1995 return; 1996 } 1997 1998 void Gather_Headers( int* fileDescriptor, vector< string >& headers ) 1999 { 2000 FILE* fileObject; 2001 char Line[1024]; 2002 2003 fileObject = fileArray[ (*fileDescriptor)-1 ]; 2004 2005 while( !feof(fileObject) ) { 2006 fgets( Line, 1024, fileObject); 2007 if ( Line[0] == '#' ) { 2008 headers.push_back( Line ); 2009 } else { 2010 break; 2011 } 2012 } 2013 rewind( fileObject ); 2014 clearerr( fileObject ); 2015 } 2016 2017 void isWrong( void ) { 2018 (Wrong_Endian) ? fprintf(stdout,"YES\n") : fprintf(stdout,"NO\n"); 2019 } 2020 2021 void togglestrictmode( void ) { Strict_Error = !Strict_Error; } 2022 2023 int isLittleEndian( void ) 2024 { 2025 // this function returns a 1 if the current running architecture is 2026 // LittleEndian Byte Ordered, else it returns a zero 2027 union { 2028 long a; 2029 char c[sizeof( long )]; 2030 } endianUnion; 2031 endianUnion.a = 1 ; 2032 if ( endianUnion.c[sizeof(long)-1] != 1 ) return 1 ; 2033 else return 0; 2034 } 2035 2036 namespace PHASTA { 2037 const char* const PhastaIO_traits<int>::type_string = "integer"; 2038 const char* const PhastaIO_traits<double>::type_string = "double"; 2039 } 2040