diff --git a/CMakeLists.txt b/CMakeLists.txt index a007bc42c0..082c7eb3bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -37,6 +37,7 @@ option (PIO_ENABLE_INTERNAL_TIMING "Gather and print GPTL timing stats" OFF) option (PIO_ENABLE_API_TIMING "Enable timer for all (costly) SCORPIO APIs (requires PIO_ENABLE_TIMING=ON)" ON) option (PIO_ENABLE_API_TRACING "Enable tracing of SCORPIO APIs" OFF) option (PIO_ENABLE_API_VAR_TRACING "Enable tracing of variables when tracing SCORPIO APIs" OFF) +option (PIO_ENABLE_API_DECOMP_TRACING "Enable tracing of I/O decompositions when tracing SCORPIO APIs" OFF) option (PIO_ENABLE_IO_STATS "Gather and print I/O performance stats" ON) option (PIO_ENABLE_LOGGING "Enable debug logging (large output possible)" OFF) option (PIO_ENABLE_DOC "Enable building SCORPIO documentation" ON) diff --git a/src/clib/CMakeLists.txt b/src/clib/CMakeLists.txt index 89c6ff3b84..7566402f0e 100644 --- a/src/clib/CMakeLists.txt +++ b/src/clib/CMakeLists.txt @@ -430,10 +430,18 @@ if (PIO_ENABLE_API_TRACING) message(STATUS "Disabling variable tracing in APIs (default)") set(ENABLE_API_VAR_TRACING 0) endif () + if (PIO_ENABLE_API_DECOMP_TRACING) + message(STATUS "Enabling I/O decomposition tracing in APIs") + set(ENABLE_API_DECOMP_TRACING 1) + else () + message(STATUS "Disabling I/O decomposition tracing in APIs (default)") + set(ENABLE_API_DECOMP_TRACING 0) + endif () else () message(STATUS "API tracing disabled (default)") set(ENABLE_API_TRACING 0) set(ENABLE_API_VAR_TRACING 0) + set(ENABLE_API_DECOMP_TRACING 0) endif () # The MPI library detection was done in the top level diff --git a/src/clib/api/spio_io_decomp_api.cpp b/src/clib/api/spio_io_decomp_api.cpp index ac1046300d..3a914b86d8 100644 --- a/src/clib/api/spio_io_decomp_api.cpp +++ b/src/clib/api/spio_io_decomp_api.cpp @@ -28,7 +28,9 @@ int PIOc_InitDecomp(int iosysid, int pio_type, int ndims, const int *gdimlen, in compmap, ioidp, rearr, iostart, iocount); #if SPIO_ENABLE_API_TRACING +#if SPIO_ENABLE_API_DECOMP_TRACING tr.set_decomp_info((ioidp) ? (*ioidp) : -1, compmap, maplen); +#endif tr.add_rval("*ioidp", (ioidp) ? (*ioidp) : -1); #endif return ret; @@ -80,7 +82,9 @@ int PIOc_init_decomp(int iosysid, int pio_type, int ndims, const int *gdimlen, i compmap, ioidp, rearranger, iostart, iocount); #if SPIO_ENABLE_API_TRACING +#if SPIO_ENABLE_API_DECOMP_TRACING tr.set_decomp_info((ioidp) ? (*ioidp) : -1, compmap, maplen); +#endif tr.add_rval("*ioidp", (ioidp) ? (*ioidp) : -1); #endif return ret; diff --git a/src/clib/core/iolib/hdf5/spio_async_hdf5_utils.cpp b/src/clib/core/iolib/hdf5/spio_async_hdf5_utils.cpp index 7d259d9a9d..7d0b901de4 100644 --- a/src/clib/core/iolib/hdf5/spio_async_hdf5_utils.cpp +++ b/src/clib/core/iolib/hdf5/spio_async_hdf5_utils.cpp @@ -29,6 +29,7 @@ extern "C"{ #include "spio_dbg_utils.hpp" #include "spio_dt_converter.hpp" #include "spio_hdf5_utils.hpp" +#include "spio_async_tcomm.hpp" struct Hdf5_create_info{ file_desc_t *file; @@ -124,6 +125,7 @@ static inline void update_reg_infos_start_frame(std::vector ®_in void spio_iosys_async_op_hdf5_create_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ delete(static_cast(pdata)); } @@ -136,6 +138,7 @@ int spio_iosys_async_op_hdf5_create(void *pdata) Hdf5_create_info *cinfo = static_cast(pdata); assert(cinfo && cinfo->file && cinfo->file->iosystem); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + cinfo->fname); ret = spio_hdf5_create(cinfo->file->iosystem, cinfo->file, cinfo->fname.c_str()); @@ -183,6 +186,7 @@ int spio_iosys_async_hdf5_create_op_add(file_desc_t *file, const char *filename) void spio_iosys_async_op_hdf5_def_var_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ delete(static_cast(pdata)); } @@ -193,6 +197,7 @@ int spio_iosys_async_op_hdf5_def_var(void *pdata) int ret = PIO_NOERR; Hdf5_def_var_info *def_var_info = static_cast(pdata); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ": fname = " + def_var_info->file->fname + ", vname: " + def_var_info->vname); assert(def_var_info && def_var_info->file && def_var_info->file->iosystem); @@ -246,6 +251,7 @@ int spio_iosys_async_hdf5_def_var_op_add(file_desc_t *file, const char *name, void spio_iosys_async_op_hdf5_put_att_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ Hdf5_put_att_info *info = static_cast(pdata); free(info->abuf); @@ -262,6 +268,7 @@ int spio_iosys_async_op_hdf5_put_att(void *pdata) assert(info && info->file && info->file->iosystem); assert((info->varid >= 0) || (info->varid == PIO_GLOBAL)); assert(info->alen >= 0); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ": fname = " + info->file->fname + ", aname: " + info->aname); ret = spio_hdf5_put_att(info->file->iosystem, info->file, info->varid, info->aname.c_str(), info->atype, @@ -321,6 +328,7 @@ int spio_iosys_async_hdf5_put_att_op_add(file_desc_t *file, int varid, void spio_iosys_async_op_hdf5_enddef_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ delete(static_cast(pdata)); } @@ -333,6 +341,7 @@ int spio_iosys_async_op_hdf5_enddef(void *pdata) Hdf5_enddef_info *info = static_cast(pdata); assert(info && info->file && info->file->iosystem); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ": fname = " + info->file->fname); ret = spio_hdf5_enddef(info->file->iosystem, info->file); @@ -380,6 +389,7 @@ int spio_iosys_async_hdf5_enddef_op_add(file_desc_t *file) void spio_iosys_async_op_hdf5_put_var_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ Hdf5_put_var_info *info = static_cast(pdata); free(info->vbuf); @@ -395,6 +405,7 @@ int spio_iosys_async_op_hdf5_put_var(void *pdata) assert(info && info->file && info->file->iosystem); assert((info->varid >= 0) || (info->varid == PIO_GLOBAL)); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ": fname = " + info->file->fname + ", varid=" + std::to_string(info->varid)); ret = spio_hdf5_put_var(info->file->iosystem, info->file, info->varid, @@ -476,6 +487,7 @@ int spio_iosys_async_hdf5_put_var_op_add(file_desc_t *file, int varid, void spio_iosys_async_op_hdf5_set_frame_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); if(pdata){ delete(static_cast(pdata)); } @@ -488,6 +500,7 @@ int spio_iosys_async_op_hdf5_set_frame(void *pdata) Hdf5_set_frame_info *info = static_cast(pdata); assert(info && info->file && info->file->iosystem); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ": fname = " + info->file->fname + ", frame=" + std::to_string(info->frame)); ret = spio_hdf5_set_frame(info->file, info->varid, info->frame); @@ -537,6 +550,7 @@ int spio_iosys_async_hdf5_set_frame_op_add(file_desc_t *file, int varid, int fra int spio_wait_all_hdf5_async_ops(int iosysid) { + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ", iosysid=" + std::to_string(iosysid)); unsigned long long int sleep_time = 0; /* Sleep for 0.5 seconds */ const int SLEEP_TIME_IN_MILLISECONDS = 500; @@ -562,6 +576,7 @@ int pio_iosys_async_op_hdf5_write(void *pdata) int ret = PIO_NOERR; Hdf5_wcache *wcache = static_cast(pdata); assert(wcache); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + ", fname=" + wcache->file->fname + ", nvars=" + std::to_string(wcache->nvars)); file_desc_t *file = wcache->file; int nvars = wcache->nvars; @@ -768,7 +783,7 @@ int pio_iosys_async_op_hdf5_write(void *pdata) } /* FIXME: Is this barrier needed ? */ - MPI_Barrier(ios->io_comm); + MPI_Barrier(ios->tcomm_info->get_io_comm()); iodesc->nasync_pend_ops--; file->npend_ops--; @@ -783,6 +798,7 @@ int pio_iosys_async_op_hdf5_write(void *pdata) void pio_iosys_async_op_hdf5_write_free(void *pdata) { + //SPIO_Util::Dbg_Util::Stdout_logger(__func__); #ifdef _HDF5 Hdf5_wcache *wcache = static_cast(pdata); assert(wcache); diff --git a/src/clib/core/pio_file.cpp b/src/clib/core/pio_file.cpp index 8e9cb48e89..8b566ed23b 100644 --- a/src/clib/core/pio_file.cpp +++ b/src/clib/core/pio_file.cpp @@ -14,6 +14,7 @@ #include #include "spio_hdf5_utils.hpp" #include "spio_async_tcomm.hpp" +#include "spio_dbg_utils.hpp" #ifdef _ADIOS2 #include "../../tools/adios2pio-nm/adios2pio-nm-lib-c.h" @@ -492,6 +493,7 @@ int spio_hard_closefile(iosystem_desc_t *ios, file_desc_t *file, #endif assert(ios && file); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + file->fname); /* Get the lock before proceeding */ std::lock_guard lg(*(file->pmtx)); @@ -500,7 +502,11 @@ int spio_hard_closefile(iosystem_desc_t *ios, file_desc_t *file, if(sync_with_ioprocs){ //if(ios->ioproc) { while(file->npend_ops){} } - MPI_Barrier(ios->union_comm); + /* For I/O async threads the union comm is NULL comm */ + MPI_Comm sync_comm = ios->tcomm_info->get_union_comm(); + if(sync_comm != MPI_COMM_NULL){ + MPI_Barrier(sync_comm); + } } if(file->is_hard_closed) { return PIO_NOERR; } @@ -802,7 +808,12 @@ int spio_hard_closefile(iosystem_desc_t *ios, file_desc_t *file, } } +#if PIO_USE_ASYNC_WR_THREAD + /* FIXME: Use the iosystem/file error handlers once error handlers are multi-threaded */ + ierr = spio_handle_err(NULL, file, PIO_RETURN_ERROR, ierr, __FILE__, __LINE__); +#else ierr = check_netcdf(NULL, file, ierr, __FILE__, __LINE__); +#endif if(ierr != PIO_NOERR){ LOG((1, "nc*_close failed, ierr = %d", ierr)); return pio_err(NULL, file, ierr, __FILE__, __LINE__, @@ -862,6 +873,7 @@ int spio_hard_closefile(iosystem_desc_t *ios, file_desc_t *file, int spio_soft_closefile(iosystem_desc_t *ios, file_desc_t *file) { assert(ios && file && ios->ioproc); + //SPIO_Util::Dbg_Util::Stdout_logger(std::string(__func__) + file->fname); return pio_iosys_async_file_close_op_add(file); } @@ -974,24 +986,9 @@ int PIOc_closefile_impl(int ncid) if(ios->ioproc){ soft_close = true; } - else{ - /* Wait on all hdf5 async ops before a "hard close" */ - /* + /* To wait on all hdf5 async ops before a "hard close" (for debugging) ierr = spio_wait_all_hdf5_async_ops(ios->iosysid); - if(ierr != PIO_NOERR){ - return pio_err(ios, file, ierr, __FILE__, __LINE__, - "Closing file (%s, ncid=%d) failed. Error sending async msg PIO_MSG_CLOSE_FILE", pio_get_fname_from_file(file), ncid); - } - */ - } - } - else{ - /* Wait on all hdf5 async ops before a "hard close" */ - ierr = spio_wait_all_hdf5_async_ops(ios->iosysid); - if(ierr != PIO_NOERR){ - return pio_err(ios, file, ierr, __FILE__, __LINE__, - "Closing file (%s, ncid=%d) failed. Error sending async msg PIO_MSG_CLOSE_FILE", pio_get_fname_from_file(file), ncid); - } + */ } #endif diff --git a/src/clib/core/pioc_support.cpp b/src/clib/core/pioc_support.cpp index c4d7430297..3e40f9e4e0 100644 --- a/src/clib/core/pioc_support.cpp +++ b/src/clib/core/pioc_support.cpp @@ -1379,73 +1379,78 @@ int pio_err(iosystem_desc_t *ios, file_desc_t *file, int err_num, const char *fname, int line, const char *uerr_msg_fmt, ...) { - char err_msg[PIO_MAX_NAME + 1]; - int err_handler = default_error_handler; /* Default error handler. */ - int ret; + char err_msg[PIO_MAX_NAME + 1]; + int err_handler = default_error_handler; /* Default error handler. */ + int ret; - /* User must provide this. */ - pioassert(fname, "file name must be provided", __FILE__, __LINE__); + /* User must provide this. */ + pioassert(fname, "file name must be provided", __FILE__, __LINE__); - /* No harm, no foul. */ - if (err_num == PIO_NOERR) - return PIO_NOERR; + /* No harm, no foul. */ + if(err_num == PIO_NOERR) { return PIO_NOERR; } - /* Get the error message. */ - if ((ret = PIOc_strerror_impl(err_num, err_msg, PIO_MAX_NAME))) - return ret; + /* Get the error message. */ + ret = PIOc_strerror_impl(err_num, err_msg, PIO_MAX_NAME); + if(ret != PIO_NOERR) { return ret; } - /* PIO_MAX_NAME may not be enough to store the entire user error - * message, especially when long filenames are provided by the - * user in the message - */ - const int UERR_MSG_MAX_LEN = 8192; - char uerr_msg[UERR_MSG_MAX_LEN + 1]; - va_list argp; - va_start(argp, uerr_msg_fmt); - vsnprintf(uerr_msg, UERR_MSG_MAX_LEN, uerr_msg_fmt, argp); - va_end(argp); + /* PIO_MAX_NAME may not be enough to store the entire user error + * message, especially when long filenames are provided by the + * user in the message + */ + const int UERR_MSG_MAX_LEN = 8192; + char uerr_msg[UERR_MSG_MAX_LEN + 1]; + va_list argp; - /* If logging is in use, log an error message. */ - LOG((1, "ERROR: %s. %s err_num = %d fname = %s line = %d", uerr_msg, err_msg, err_num, fname ? fname : "\0", line)); + va_start(argp, uerr_msg_fmt); + vsnprintf(uerr_msg, UERR_MSG_MAX_LEN, uerr_msg_fmt, argp); + va_end(argp); - /* What error handler should we use? */ - if (file) - { - ios = file->iosystem; - err_handler = ios->error_handler; - } - else if (ios) - err_handler = ios->error_handler; + /* If logging is in use, log an error message. */ + LOG((1, "ERROR: %s. %s err_num = %d fname = %s line = %d", uerr_msg, err_msg, err_num, fname ? fname : "\0", line)); - LOG((2, "pio_err chose error handler = %d", err_handler)); + /* What error handler should we use? */ + if(file){ + ios = file->iosystem; + err_handler = ios->error_handler; + } + else if(ios){ + err_handler = ios->error_handler; + } - /* Should we abort? */ - if (err_handler == PIO_INTERNAL_ERROR) - { - /* For debugging only, this will print a traceback of the call tree. */ - piodie(fname, line, "An error occured, %s. %s (err=%d). Aborting since the error handler was set to PIO_INTERNAL_ERROR...", uerr_msg, err_msg, err_num); + LOG((2, "pio_err chose error handler = %d", err_handler)); + + /* Should we abort? */ + if(err_handler == PIO_INTERNAL_ERROR){ + /* For debugging only, this will print a traceback of the call tree. */ + piodie(fname, line, "An error occured, %s. %s (err=%d). Aborting since the error handler was set to PIO_INTERNAL_ERROR...", uerr_msg, err_msg, err_num); + } + else if(err_handler != PIO_RETURN_ERROR){ + /* If the user does not explicitly ask to return error, print + * the error message in stderr on the root IO proc + */ + bool print_err_msg = true; + if(ios){ + if(ios->tcomm_info->get_union_comm() != MPI_COMM_NULL){ + print_err_msg = (ios->tcomm_info->get_union_comm_rank() == ios->tcomm_info->get_union_comm_io_root()) ? true : false; + } + else if(ios->tcomm_info->get_io_comm() != MPI_COMM_NULL){ + print_err_msg = (ios->tcomm_info->get_io_comm_rank() == 0) ? true : false; + } } - else if (err_handler != PIO_RETURN_ERROR) - { - /* If the user does not explicitly ask to return error, print - * the error message in stderr on the root IO proc - */ - bool print_err_msg = (ios) ? (ios->tcomm_info->get_union_comm_rank() == ios->tcomm_info->get_union_comm_io_root()) : true; - if (print_err_msg) - { - fprintf(stderr, "PIO: ERROR: %s. %s (error num=%d), (%s:%d)\n", uerr_msg, err_msg, err_num, (fname) ? fname : "\0", line); + if(print_err_msg){ + fprintf(stderr, "PIO: ERROR: %s. %s (error num=%d), (%s:%d)\n", uerr_msg, err_msg, err_num, (fname) ? fname : "\0", line); #ifdef _HDF5 - H5Eprint2(H5E_DEFAULT, stderr); + H5Eprint2(H5E_DEFAULT, stderr); #endif - fflush(stderr); - } + fflush(stderr); } + } - /* For PIO_BCAST_ERROR and PIO_RETURN_ERROR error handlers - * just return the error code back to the caller - */ - return err_num; + /* For PIO_BCAST_ERROR and PIO_RETURN_ERROR error handlers + * just return the error code back to the caller + */ + return err_num; } /** @@ -4843,14 +4848,9 @@ int PIOc_openfile_retry_impl(int iosysid, int *ncidp, int *iotype, const char *f "Creating file (%s) failed. Error closing previous soft closed file", filename); } - //if(((*iotype == PIO_IOTYPE_HDF5) || (*iotype == PIO_IOTYPE_HDF5C)) && !(mode & PIO_WRITE)){ - if((*iotype == PIO_IOTYPE_HDF5) || (*iotype == PIO_IOTYPE_HDF5C)){ - ierr = spio_wait_all_hdf5_async_ops(ios->iosysid); - if(ierr != PIO_NOERR){ - return pio_err(ios, NULL, ierr, __FILE__, __LINE__, - "Creating file (%s) failed. Error waiting on all pending asynchronous HDF5 ops", filename); - } - } + /* To wait on all hdf5 async ops (for debugging), + * ierr = spio_wait_all_hdf5_async_ops(ios->iosysid); + */ #endif /* Allocate space for the file info. */ diff --git a/src/clib/core/progress_engine/spio_async_tcomm.cpp b/src/clib/core/progress_engine/spio_async_tcomm.cpp index 5e7375e89c..b785932cde 100644 --- a/src/clib/core/progress_engine/spio_async_tcomm.cpp +++ b/src/clib/core/progress_engine/spio_async_tcomm.cpp @@ -12,8 +12,13 @@ thread_local bool SPIO_Util::TComm_info::is_thread_init_ = false; SPIO_Util::TComm_info::TComm_info(MPI_Comm union_comm, int union_comm_rank, int union_comm_io_root, int union_comm_comp_root, MPI_Comm io_comm, int io_comm_rank, bool is_io_master, MPI_Comm comp_comm, int comp_comm_rank, bool is_comp_master, MPI_Comm intercomm, MPI_Comm my_comm, MPI_Comm node_comm): union_comm_rank_(union_comm_rank), union_comm_io_root_(union_comm_io_root), union_comm_comp_root_(union_comm_comp_root), io_comm_rank_(io_comm_rank), is_io_master_(is_io_master), comp_comm_rank_(comp_comm_rank), is_comp_master_(is_comp_master) { + int nthreads = 1; #if PIO_USE_ASYNC_WR_THREAD - tids_ = PIO_Util::PIO_async_tpool_manager::get_tpool_instance()->get_thread_ids(); + nthreads += PIO_Util::PIO_async_tpool_manager::get_num_threads(); + if(io_comm != MPI_COMM_NULL){ + tids_ = PIO_Util::PIO_async_tpool_manager::get_tpool_instance()->get_thread_ids(); + assert(static_cast(tids_.size()) == nthreads - 1); + } #endif /* Total number of comms = Main/Default thread + Number of threads in thread pool @@ -29,12 +34,12 @@ SPIO_Util::TComm_info::TComm_info(MPI_Comm union_comm, int union_comm_rank, int node_comms_.push_back(node_comm); int ret = MPI_SUCCESS; - for(std::size_t i = 0; i < tids_.size(); i++){ - MPI_Comm tmp_union_comm = MPI_COMM_NULL; - if(union_comm != MPI_COMM_NULL){ - ret = MPI_Comm_dup(union_comm, &tmp_union_comm); assert(ret == MPI_SUCCESS); - } - union_comms_.push_back(tmp_union_comm); + for(int i = 0; i < nthreads - 1; i++){ + /* Since the I/O threads no longer have access (or should be using) + * to global communicators, except the I/O communicator (io_comm), + * all other communicators are assigned to COMM_NULL (not duped) + */ + union_comms_.push_back(MPI_COMM_NULL); MPI_Comm tmp_io_comm = MPI_COMM_NULL; if(io_comm != MPI_COMM_NULL){ @@ -42,39 +47,22 @@ SPIO_Util::TComm_info::TComm_info(MPI_Comm union_comm, int union_comm_rank, int } io_comms_.push_back(tmp_io_comm); - MPI_Comm tmp_comp_comm = MPI_COMM_NULL; - if(comp_comm != MPI_COMM_NULL){ - ret = MPI_Comm_dup(comp_comm, &tmp_comp_comm); assert(ret == MPI_SUCCESS); - } - comp_comms_.push_back(tmp_comp_comm); - - MPI_Comm tmp_comm = MPI_COMM_NULL; - if(intercomm != MPI_COMM_NULL){ - ret = MPI_Comm_dup(intercomm, &tmp_comm); assert(ret == MPI_SUCCESS); - } - intercomms_.push_back(tmp_comm); - + comp_comms_.push_back(MPI_COMM_NULL); + intercomms_.push_back(MPI_COMM_NULL); /* my_comm, see iosystem_desc_t for details, is just a copy (not dup) * of union_comm or comp_comm */ - if(my_comm == union_comm){ - my_comms_.push_back(tmp_union_comm); - } - else if(my_comm == comp_comm){ - my_comms_.push_back(tmp_comp_comm); - } - else if(my_comm == io_comm){ + if(my_comm == io_comm){ my_comms_.push_back(tmp_io_comm); } else{ - assert(0); + my_comms_.push_back(MPI_COMM_NULL); } - tmp_comm = MPI_COMM_NULL; - if(node_comm != MPI_COMM_NULL){ - ret = MPI_Comm_dup(node_comm, &tmp_comm); assert(ret == MPI_SUCCESS); - } - node_comms_.push_back(tmp_comm); + /* FIXME: To start using node_comm we need to create a node local + * comm with just the I/O procs and dup it + */ + node_comms_.push_back(MPI_COMM_NULL); } } @@ -128,15 +116,18 @@ SPIO_Util::TComm_info::~TComm_info() * Free other comms */ + int nthreads = 1; +#if PIO_USE_ASYNC_WR_THREAD /* Number of threads including the main thread */ - std::size_t nthreads = tids_.size() + 1; - assert(union_comms_.size() == nthreads); - for(std::size_t tidx = 1; tidx < nthreads; tidx++){ - if(union_comms_[tidx] != MPI_COMM_NULL) { MPI_Comm_free(&union_comms_[tidx]); } + nthreads += PIO_Util::PIO_async_tpool_manager::get_num_threads(); +#endif + assert(static_cast(union_comms_.size()) == nthreads); + for(int tidx = 1; tidx < nthreads; tidx++){ + assert(union_comms_[tidx] == MPI_COMM_NULL); if(io_comms_[tidx] != MPI_COMM_NULL) { MPI_Comm_free(&io_comms_[tidx]); } - if(comp_comms_[tidx] != MPI_COMM_NULL) { MPI_Comm_free(&comp_comms_[tidx]); } - if(intercomms_[tidx] != MPI_COMM_NULL) { MPI_Comm_free(&intercomms_[tidx]); } - if(node_comms_[tidx] != MPI_COMM_NULL) { MPI_Comm_free(&node_comms_[tidx]); } + assert(comp_comms_[tidx] == MPI_COMM_NULL); + assert(intercomms_[tidx] == MPI_COMM_NULL); + assert(node_comms_[tidx] == MPI_COMM_NULL); } std::for_each(comm_infos_.begin(), comm_infos_.end(), [](MPI_Info &info) { MPI_Info_free(&info); }); diff --git a/src/clib/core/progress_engine/spio_async_tpool.cpp b/src/clib/core/progress_engine/spio_async_tpool.cpp index f96d271717..d5bd254637 100644 --- a/src/clib/core/progress_engine/spio_async_tpool.cpp +++ b/src/clib/core/progress_engine/spio_async_tpool.cpp @@ -89,11 +89,9 @@ int PIO_Util::PIO_async_tpool::dequeue_and_process( PIO_Util::PIO_async_tpool * PIO_Util::PIO_async_tpool_manager::get_tpool_instance(void ) { - /* We need to make NUM_THREADS configurable by the user (compile-time) */ - const int NUM_THREADS = SPIO_ASYNC_NTHREADS; if(tpool_ == NULL){ LOG((2, "PIO_async_tpool_manager:get_tpool_instance: Creating new tpool instance")); - tpool_ = new PIO_Util::PIO_async_tpool(NUM_THREADS); + tpool_ = new PIO_Util::PIO_async_tpool(get_num_threads()); } else{ LOG((2, "PIO_async_tpool_manager:get_tpool_instance: Retrieving tpool instance")); diff --git a/src/clib/core/progress_engine/spio_async_tpool.hpp b/src/clib/core/progress_engine/spio_async_tpool.hpp index 8a6a9d0cf7..773a9c0e1e 100644 --- a/src/clib/core/progress_engine/spio_async_tpool.hpp +++ b/src/clib/core/progress_engine/spio_async_tpool.hpp @@ -27,6 +27,7 @@ class PIO_async_tpool{ class PIO_async_tpool_manager{ public: + static int get_num_threads(void) { return SPIO_ASYNC_NTHREADS; } static PIO_async_tpool *get_tpool_instance(void ); ~PIO_async_tpool_manager(); private: diff --git a/src/clib/pio_config.h.in b/src/clib/pio_config.h.in index 43105607b4..928f96566d 100644 --- a/src/clib/pio_config.h.in +++ b/src/clib/pio_config.h.in @@ -183,6 +183,11 @@ * 0 otherwise */ #define SPIO_ENABLE_API_VAR_TRACING @ENABLE_API_VAR_TRACING@ +/** Set to 1 if the library is configured to trace API calls & + * trace I/O decomposition data in the API calls, + * 0 otherwise */ +#define SPIO_ENABLE_API_DECOMP_TRACING @ENABLE_API_DECOMP_TRACING@ + /** Set to 1 if the library is configured to use PnetCDF independent data mode, * 0 otherwise */ #define PIO_USE_INDEP_MODE @USE_INDEP_MODE@ diff --git a/src/clib/util/spio_dbg_utils.hpp b/src/clib/util/spio_dbg_utils.hpp index 749afb120d..46f03c6a53 100644 --- a/src/clib/util/spio_dbg_utils.hpp +++ b/src/clib/util/spio_dbg_utils.hpp @@ -64,6 +64,15 @@ namespace SPIO_Util{ return ostr.str(); } + class Stdout_logger{ + public: + Stdout_logger(const std::string &info) : info_(info) { std::cout << "Entering " << info_.c_str() << "\n" << std::flush; } + ~Stdout_logger() { std::cout << "Leaving " << info_.c_str() << "\n" << std::flush; } + + private: + const std::string info_; + }; + std::string get_iodesc_info(io_desc_t *ios); void get_stack_trace(std::vector &st); diff --git a/src/clib/util/spio_tracer_decomp.cpp b/src/clib/util/spio_tracer_decomp.cpp index 1baa837ab1..6655b9c759 100644 --- a/src/clib/util/spio_tracer_decomp.cpp +++ b/src/clib/util/spio_tracer_decomp.cpp @@ -16,7 +16,7 @@ #include "pio.h" #include "pio_internal.h" #include "mpi.h" -#if PIO_USE_PNETCDF +#if SPIO_ENABLE_API_DECOMP_TRACING #include "pnetcdf.h" #endif #include "spio_tracer.hpp" @@ -120,7 +120,7 @@ void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::init(void ) fname_ = get_trace_decomp_tmp_fname(iosysid_, comm_, comm_rank_, wrank); -#if PIO_USE_PNETCDF +#if SPIO_ENABLE_API_DECOMP_TRACING int flags = NC_CLOBBER | NC_64BIT_DATA; ret = ncmpi_create(comm_, fname_.c_str(), flags, info, &ncid_); if(ret != NC_NOERR){ @@ -171,7 +171,7 @@ void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::init(void ) throw std::runtime_error(std::string("Ending NetCDF define mode in I/O decomposition log file, file = ") + fname_ + std::string(", failed. iosysid = ") + std::to_string(iosysid_)); } -#endif // PIO_USE_PNETCDF +#endif // SPIO_ENABLE_API_DECOMP_TRACING ret = MPI_Info_free(&info); assert(ret == MPI_SUCCESS); @@ -249,7 +249,7 @@ void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::log_decomp(int ioid, con std::string decomp_vname = std::string("decomp_") + std::to_string(ioid); int decomp_varid = INVALID_NCID; -#if PIO_USE_PNETCDF +#if SPIO_ENABLE_API_DECOMP_TRACING ret = ncmpi_redef(ncid_); if(ret != NC_NOERR){ throw std::runtime_error(std::string("Unable to enter redef mode to write decompositing map, ncid = ") + std::to_string(ncid_) + @@ -290,12 +290,12 @@ void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::log_decomp(int ioid, con throw std::runtime_error(std::string("Error while waiting after writing the decomposition map, ncid = ") + std::to_string(ncid_) + std::string(", vname = ") + decomp_vname); } -#endif // PIO_USE_PNETCDF +#endif // SPIO_ENABLE_API_DECOMP_TRACING } void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::finalize(void ) { -#if PIO_USE_PNETCDF +#if SPIO_ENABLE_API_DECOMP_TRACING if(ncid_ != INVALID_NCID){ int ret = MPI_SUCCESS; int dim_comm_sz_dimid = INVALID_NCID, dim_ndecompsx3_dimid = INVALID_NCID; @@ -367,14 +367,14 @@ void SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::finalize(void ) ncid_ = INVALID_NCID; } -#endif // PIO_USE_PNETCDF +#endif // SPIO_ENABLE_API_DECOMP_TRACING } int SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::get_dim_id(PIO_Offset dimsz) { int dimid = INVALID_NCID, ret = 0; -#if PIO_USE_PNETCDF +#if SPIO_ENABLE_API_DECOMP_TRACING std::map::iterator dim_id_iter = dim_sz_to_id_.find(dimsz); if(dim_id_iter == dim_sz_to_id_.end()){ assert(ncid_ != INVALID_NCID); @@ -405,7 +405,7 @@ int SPIO_Util::Tracer::Decomp_Utils::Decomp_nc_logger::get_dim_id(PIO_Offset dim else{ dimid = dim_id_iter->second; } -#endif // PIO_USE_PNETCDF +#endif // SPIO_ENABLE_API_DECOMP_TRACING return dimid; } diff --git a/tests/general/pio_decomp_tests.F90.in b/tests/general/pio_decomp_tests.F90.in index 17b78f5215..791228889e 100644 --- a/tests/general/pio_decomp_tests.F90.in +++ b/tests/general/pio_decomp_tests.F90.in @@ -326,3 +326,110 @@ PIO_TF_AUTO_TEST_SUB_BEGIN nc_write_1d_reuse_decomp call PIO_freedecomp(pio_tf_iosystem_, iodesc) PIO_TF_AUTO_TEST_SUB_END nc_write_1d_reuse_decomp + +! Create many decompositions and write to multiple files +! (1 decomp per file, many vars per file, writes using the same buffer) +! Note: For async+threaded case, the creation of the decompositions can +! run in parallel to the writes to files +PIO_TF_AUTO_TEST_SUB_BEGIN many_decomps_to_files + implicit none + integer, parameter :: NUM_FILES = 10 + integer, parameter :: NUM_VARS = 20 + integer, parameter :: VEC_LOCAL_SZ = 7 + integer, dimension(VEC_LOCAL_SZ) :: compdof, compdof_rel_disps + integer, dimension(VEC_LOCAL_SZ) :: buf, rbuf + integer, dimension(1) :: dims + character(len=PIO_TF_MAX_STR_LEN) :: tmp_idx_str + character(len=*), parameter :: PIO_VAR_NAME = 'PIO_TF_test_var1' + character(len=*), parameter :: PIO_DIM_NAME = 'PIO_TF_test_var_dim1' + + type :: test_file_info + character(len=PIO_TF_MAX_STR_LEN) :: filename + type(file_desc_t) :: pio_file + character(len=PIO_TF_MAX_STR_LEN) :: vnames(NUM_VARS) + type(var_desc_t) :: pio_var(NUM_VARS) + integer :: pio_dim + type(io_desc_t) :: iodesc + end type test_file_info + + type(test_file_info) :: tf_infos(NUM_FILES) + + integer :: i, j, k, ierr + ! iotypes = valid io types + integer, dimension(:), allocatable :: iotypes + character(len=PIO_TF_MAX_STR_LEN), dimension(:), allocatable :: iotype_descs + integer :: num_iotypes + + do i=1,VEC_LOCAL_SZ + compdof_rel_disps(i) = i + end do + dims(1) = VEC_LOCAL_SZ * pio_tf_world_sz_ + compdof = VEC_LOCAL_SZ * pio_tf_world_rank_ + compdof_rel_disps + buf = pio_tf_world_rank_; + + num_iotypes = 0 + call PIO_TF_Get_nc_iotypes(iotypes, iotype_descs, num_iotypes) + do i=1,num_iotypes + PIO_TF_LOG(0,*) "Testing : PIO_TF_DATA_TYPE : ", iotype_descs(i) + do j=1,NUM_FILES + call PIO_initdecomp(pio_tf_iosystem_, PIO_int, dims, compdof, tf_infos(j)%iodesc) + + write(tmp_idx_str, "(I0)") j + tf_infos(j)%filename = "many_decomps_to_files_test_" // trim(tmp_idx_str) // ".testfile" + ierr = PIO_createfile(pio_tf_iosystem_, tf_infos(j)%pio_file, iotypes(i), tf_infos(j)%filename, PIO_CLOBBER) + PIO_TF_CHECK_ERR(ierr, "Could not create file " // trim(tf_infos(j)%filename)) + + ierr = PIO_def_dim(tf_infos(j)%pio_file, PIO_DIM_NAME, dims(1), tf_infos(j)%pio_dim) + PIO_TF_CHECK_ERR(ierr, "Failed to define a dim : " // trim(tf_infos(j)%filename)) + + do k=1,NUM_VARS + write(tmp_idx_str, "(I0)") k + tf_infos(j)%vnames(k) = PIO_VAR_NAME // trim(tmp_idx_str) + ierr = PIO_def_var(tf_infos(j)%pio_file, tf_infos(j)%vnames(k), PIO_int, (/tf_infos(j)%pio_dim/), tf_infos(j)%pio_var(k)) + PIO_TF_CHECK_ERR(ierr, "Failed to define a var : " // trim(tf_infos(j)%filename)) + end do + + ierr = PIO_enddef(tf_infos(j)%pio_file) + PIO_TF_CHECK_ERR(ierr, "Failed to end redef mode : " // trim(tf_infos(j)%filename)) + + do k=1,NUM_VARS + ! Write the variable out + call PIO_write_darray(tf_infos(j)%pio_file, tf_infos(j)%pio_var(k), tf_infos(j)%iodesc, buf, ierr) + PIO_TF_CHECK_ERR(ierr, "Failed to write darray : " // trim(tf_infos(j)%filename)) + end do + + end do + + do j=1,NUM_FILES + ! For async+threaded, the re-open should sync the file +#ifdef PIO_TEST_CLOSE_OPEN_FOR_SYNC + call PIO_closefile(tf_infos(j)%pio_file) + + ierr = PIO_openfile(pio_tf_iosystem_, tf_infos(j)%pio_file, iotypes(i), tf_infos(j)%filename, PIO_nowrite) + PIO_TF_CHECK_ERR(ierr, "Could not reopen file " // trim(tf_infos(j)%filename)) + + do k=1,NUM_VARS + ierr = PIO_inq_varid(tf_infos(j)%pio_file, tf_infos(j)%vnames(k), tf_infos(j)%pio_var(k)) + PIO_TF_CHECK_ERR(ierr, "Could not inq var1 file2 :" // trim(tf_infos(j)%filename)) + end do +#else + call PIO_syncfile(tf_infos(j)%pio_file) +#endif + do k=1,NUM_VARS + rbuf = 0 + call PIO_read_darray(tf_infos(j)%pio_file, tf_infos(j)%pio_var(k), tf_infos(j)%iodesc, rbuf, ierr) + PIO_TF_CHECK_ERR(ierr, "Failed to read darray : " // trim(tf_infos(j)%filename)) + + PIO_TF_CHECK_VAL((rbuf, buf), "Got wrong val") + end do + + call PIO_closefile(tf_infos(j)%pio_file) + call PIO_deletefile(pio_tf_iosystem_, tf_infos(j)%filename) + call PIO_freedecomp(pio_tf_iosystem_, tf_infos(j)%iodesc) + end do + end do + if(allocated(iotypes)) then + deallocate(iotypes) + deallocate(iotype_descs) + end if +PIO_TF_AUTO_TEST_SUB_END many_decomps_to_files