Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/clib/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ message(STATUS "===== Configuring SCORPIO C Core... =====")
set (spio_core_src
pio_spmd.cpp
pioc_support.cpp
spio_iodesc_utils.cpp
pioc.cpp
pio_nc.cpp
pio_getput_int.cpp
Expand Down
63 changes: 40 additions & 23 deletions src/clib/core/iolib/hdf5/spio_async_hdf5_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,50 @@ struct Hdf5_wcache{
int nvars;
int fndims;
std::vector<int> varids;
io_desc_t *iodesc;
std::shared_ptr<io_desc_t> iodesc;
std::vector<int> frame;

bool wr_fillbuf;
void *iobuf;
std::size_t iobuf_sz;
void *fillbuf;
std::size_t fillbuf_sz;

Hdf5_wcache(file_desc_t *file, int nvars, int fndims,
const int *varids, std::shared_ptr<io_desc_t> iodesc, bool wr_fillbuf, const int *frame);
~Hdf5_wcache();
};

Hdf5_wcache::Hdf5_wcache(file_desc_t *file, int nvars, int fndims,
const int *varids, std::shared_ptr<io_desc_t> iodesc, bool wr_fillbuf, const int *frame):
file(file), nvars(nvars), fndims(fndims), iodesc(iodesc),
wr_fillbuf(wr_fillbuf), iobuf(NULL), iobuf_sz(0),
fillbuf(NULL), fillbuf_sz(0)
{
assert(file && (nvars > 0) && (fndims > 0) && varids && iodesc);

/* Cache varids and frames (one frame, the frame being written,
* for each varid)
*/
this->varids.resize(nvars);
std::copy(varids, varids + nvars, this->varids.begin());

if(frame){
this->frame.resize(nvars);
std::copy(frame, frame + nvars, this->frame.begin());
}

/* FIXME: Copy/init iobuf and fillbuf here */
}

Hdf5_wcache::~Hdf5_wcache()
{
/* Don't delete cached iodesc ptr (we don't own it) */

if(iobuf) { brel(iobuf); iobuf = NULL; }
if(fillbuf) { brel(fillbuf); fillbuf = NULL; }
}

/* Global vars */
std::atomic<int> SPIO_Util::GVars::npend_hdf5_async_ops;

Expand Down Expand Up @@ -566,7 +600,7 @@ int pio_iosys_async_op_hdf5_write(void *pdata)
file_desc_t *file = wcache->file;
int nvars = wcache->nvars;
int fndims = wcache->fndims;
io_desc_t *iodesc = wcache->iodesc;
io_desc_t *iodesc = wcache->iodesc.get();

assert(file && (nvars > 0) && (fndims > 0) && iodesc);
assert((file->iotype == PIO_IOTYPE_HDF5) || (file->iotype == PIO_IOTYPE_HDF5C));
Expand Down Expand Up @@ -787,25 +821,15 @@ void pio_iosys_async_op_hdf5_write_free(void *pdata)
Hdf5_wcache *wcache = static_cast<struct Hdf5_wcache *>(pdata);
assert(wcache);

/* Using swap trick to free vectors
* - swap vector with an empty local/temp vector that gets deallocated when func exits
*/
//wcache->varids.clear();
std::vector<int>().swap(wcache->varids);
//wcache->frame.clear();
std::vector<int>().swap(wcache->frame);

if(wcache->iobuf){ brel(wcache->iobuf); }
if(wcache->fillbuf){ brel(wcache->fillbuf); }

free(wcache);
delete(wcache);
#else // _HDF5
assert(0);
#endif // _HDF5
}

int pio_iosys_async_hdf5_write_op_add(file_desc_t *file, int nvars, int fndims,
const int *varids, io_desc_t *iodesc, int fill, const int *frame)
const int *varids, std::shared_ptr<io_desc_t> iodesc, int fill, const int *frame)
{
#ifdef _HDF5
int ret = PIO_NOERR;
Expand All @@ -819,15 +843,8 @@ int pio_iosys_async_hdf5_write_op_add(file_desc_t *file, int nvars, int fndims,
return PIO_NOERR;
}

std::vector<int> vids(varids, varids + nvars);
std::vector<int> frms;
if(frame){
frms.resize(nvars);
std::copy(frame, frame + nvars, frms.begin());
}

Hdf5_wcache *wcache = static_cast<Hdf5_wcache *>(calloc(1, sizeof(Hdf5_wcache)));
*wcache = {file, nvars, fndims, vids, iodesc, frms, (fill) ? true : false, NULL, 0, NULL, 0};
Hdf5_wcache *wcache = new Hdf5_wcache(file, nvars, fndims, varids, iodesc,
(fill) ? true : false, frame);

/* We need to copy the iobuf/fillbuf since the mvcache gets reused for future writes */
/* Copy iobuf/fillvalue */
Expand Down
3 changes: 2 additions & 1 deletion src/clib/core/iolib/hdf5/spio_async_hdf5_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "pio_config.h"
#include "pio.h"
#include "pio_internal.h"
#include <memory>

namespace SPIO_Util{
namespace GVars{
Expand All @@ -23,7 +24,7 @@ int spio_wait_all_hdf5_async_ops(int iosysid);
int pio_iosys_async_op_hdf5_write(void *pdata);
void pio_iosys_async_op_hdf5_write_free(void *pdata);
int pio_iosys_async_hdf5_write_op_add(file_desc_t *file, int nvars, int fndims,
const int *varids, io_desc_t *iodesc, int fill, const int *frame);
const int *varids, std::shared_ptr<io_desc_t> iodesc, int fill, const int *frame);
int spio_iosys_async_hdf5_set_frame_op_add(file_desc_t *file, int varid, int frame);

#define __SPIO_ASYNC_HDF5_UTILS_HPP__
Expand Down
15 changes: 11 additions & 4 deletions src/clib/core/pio_darray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "spio_dt_converter.hpp"
#include "spio_async_utils.hpp"
#include <string>
#include <vector>
#include <algorithm>

/* uint64_t definition */
#ifdef _ADIOS2
Expand Down Expand Up @@ -165,11 +167,13 @@ int PIOc_write_darray_multi_impl(int ncid, const int *varids, int ioid, int nvar
"Writing multiple variables to file (%s, ncid=%d) failed. Trying to write to a read only file, try reopening the file in write mode (use the PIO_WRITE flag)", pio_get_fname_from_file(file), ncid);
}

/* Get iodesc. */
if(!(iodesc = pio_get_iodesc_from_id(ioid))){
/* Get cached iodesc. */
std::shared_ptr<io_desc_t> sp_iodesc = spio_get_iodesc_ref_from_file(file, ioid);
if(!sp_iodesc){
return pio_err(ios, file, PIO_EBADID, __FILE__, __LINE__,
"Writing multiple variables to file (%s, ncid=%d) failed. Invalid arguments, invalid PIO decomposition id (%d) provided", pio_get_fname_from_file(file), ncid, ioid);
}
iodesc = sp_iodesc.get();
pioassert(iodesc->rearranger == PIO_REARR_BOX || iodesc->rearranger == PIO_REARR_SUBSET || iodesc->rearranger == PIO_REARR_CONTIG,
"unknown rearranger", __FILE__, __LINE__);

Expand Down Expand Up @@ -411,7 +415,7 @@ int PIOc_write_darray_multi_impl(int ncid, const int *varids, int ioid, int nvar
case PIO_IOTYPE_HDF5:
case PIO_IOTYPE_HDF5C:
#if PIO_USE_ASYNC_WR_THREAD
ierr = pio_iosys_async_hdf5_write_op_add(file, nvars, fndims, varids, iodesc,
ierr = pio_iosys_async_hdf5_write_op_add(file, nvars, fndims, varids, sp_iodesc,
DARRAY_DATA, frame);
if(ierr != PIO_NOERR){
return pio_err(ios, file, ierr, __FILE__, __LINE__,
Expand Down Expand Up @@ -503,7 +507,7 @@ int PIOc_write_darray_multi_impl(int ncid, const int *varids, int ioid, int nvar
case PIO_IOTYPE_HDF5:
case PIO_IOTYPE_HDF5C:
#if PIO_USE_ASYNC_WR_THREAD
ierr = pio_iosys_async_hdf5_write_op_add(file, nvars, fndims, varids, iodesc,
ierr = pio_iosys_async_hdf5_write_op_add(file, nvars, fndims, varids, sp_iodesc,
DARRAY_FILL, frame);
if(ierr != PIO_NOERR){
return pio_err(ios, file, ierr, __FILE__, __LINE__,
Expand Down Expand Up @@ -2470,6 +2474,9 @@ int PIOc_write_darray_impl(int ncid, int varid, int ioid, PIO_Offset arraylen, c
}
wmb->num_arrays++;

/* Cache a ref to the iodesc in the file */
spio_add_iodesc_ref_to_file(file, ioid);

LOG((2, "wmb->num_arrays = %d iodesc->maxbytes / iodesc->mpitype_size = %d "
"iodesc->ndof = %d iodesc->llen = %d", wmb->num_arrays,
iodesc->maxbytes / iodesc->mpitype_size, iodesc->ndof, iodesc->llen));
Expand Down
36 changes: 36 additions & 0 deletions src/clib/core/pio_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <thread>
#include <chrono>
#include <string>
#include <algorithm>
#include "spio_hdf5_utils.hpp"
#include "spio_async_tcomm.hpp"

Expand Down Expand Up @@ -477,6 +478,41 @@ int spio_wait_on_hard_close(iosystem_desc_t *ios, file_desc_t *file)
return PIO_NOERR;
}

void spio_add_iodesc_ref_to_file(file_desc_t *file, int ioid)
{
assert(file && file->pmtx && file->io_desc_refs);

/* Get the lock before proceeding */
std::lock_guard<std::mutex> lg(*(file->pmtx));

/* Don't add duplicates, just one ref to iodesc used by file.
* When cached iodesc is used, for writing data, its used for all
* cached variables at once. So we just need one ref to iodesc here
*/
std::shared_ptr<io_desc_t> iodesc = pio_get_iodesc_sptr_from_id(ioid);
file->io_desc_refs->insert({ioid, iodesc});
}

std::shared_ptr<io_desc_t> spio_get_iodesc_ref_from_file(file_desc_t *file, int ioid)
{
assert(file && file->pmtx && file->io_desc_refs);

/* Get the lock before proceeding */
std::lock_guard<std::mutex> lg(*(file->pmtx));

/* Search through the cached I/O descs */
std::map<int, std::shared_ptr<io_desc_t> >::iterator iter = file->io_desc_refs->find(ioid);

if(iter != file->io_desc_refs->end()){
/* The caller now has ownership of this iodesc */
std::shared_ptr<io_desc_t> sp = iter->second;
file->io_desc_refs->erase(iter);
return sp;
}

return nullptr;
}

/* Close the file ("hard close")
* @param ios: Pointer to the iosystem_desc
* @param file: Pointer to the file_desc for the file
Expand Down
70 changes: 15 additions & 55 deletions src/clib/core/pioc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ static int subset_rearranger_init(iosystem_desc_t *ios, io_desc_t *iodesc, const
int ret = PIO_NOERR;

assert(ios && iodesc);
assert(iodesc->map && iodesc->dimlen);
assert(((iodesc->maplen == 0) || iodesc->map) && iodesc->dimlen);

iodesc->num_aiotasks = ios->num_iotasks;
LOG((2, "creating subset rearranger iodesc->num_aiotasks = %d", iodesc->num_aiotasks));
Expand All @@ -552,7 +552,7 @@ static int box_rearranger_init(iosystem_desc_t *ios, io_desc_t *iodesc, const PI
{
int ret = PIO_NOERR;
assert(ios && iodesc);
assert(iodesc->map && iodesc->dimlen && iodesc->firstregion);
assert(((iodesc->maplen == 0) || iodesc->map) && ((iodesc->ndims == 0) || iodesc->dimlen) && iodesc->firstregion);

if(ios->ioproc){
/* Unless the user specifies the start and count for each
Expand Down Expand Up @@ -652,7 +652,7 @@ static int contig_rearranger_init(iosystem_desc_t *ios, io_desc_t *iodesc, const
int ret = PIO_NOERR;

assert(ios && iodesc);
assert(iodesc->map && iodesc->dimlen);
assert(((iodesc->maplen == 0) || iodesc->map) && ((iodesc->ndims == 0) || iodesc->dimlen));

if(iostart && iocount){
/* FIXME: We should at least convert iostart/iocount arrays to decomp maps */
Expand Down Expand Up @@ -746,7 +746,6 @@ static int initdecomp(int iosysid, int pio_type, int ndims, const int *gdimlen,
const PIO_Offset *iostart, const PIO_Offset *iocount, bool map_zero_based=false)
{
iosystem_desc_t *ios; /* Pointer to io system information. */
io_desc_t *iodesc; /* The IO description. */
int mpierr = MPI_SUCCESS; /* Return code from MPI function calls. */
int ierr; /* Return code. */

Expand Down Expand Up @@ -810,22 +809,7 @@ static int initdecomp(int iosysid, int pio_type, int ndims, const int *gdimlen,
}
}

/* Allocate space for the iodesc info. This also allocates the
* first region and copies the rearranger opts into this
* iodesc. */
if((ierr = malloc_iodesc(ios, pio_type, ndims, maplen, &iodesc))){
return pio_err(ios, NULL, ierr, __FILE__, __LINE__,
"Initializing the PIO decomposition failed. Out of memory allocating memory for I/O descriptor (ndims = %d, maplen = %d)", ndims, maplen);
}

/* Set the rearranger. */
if(!rearranger){
iodesc->rearranger = ios->default_rearranger;
}
else{
iodesc->rearranger = *rearranger;
}
LOG((2, "iodesc->rearranger = %d", iodesc->rearranger));
int rearr = (rearranger) ? *rearranger : ios->default_rearranger;

/* In scenarios involving ultra-high resolution E3SM/SCREAM cases,
* the default BOX rearranger may encounter significant performance
Expand All @@ -844,45 +828,20 @@ static int initdecomp(int iosysid, int pio_type, int ndims, const int *gdimlen,
* potentially negating the benefits of reduced initialization time
* associated with this approach.
*/
if(iodesc->rearranger == PIO_REARR_ANY){
iodesc->rearranger = spio_get_opt_pio_rearr(ios, maplen);
}

/* Cache the local decomposition map */
if(map_zero_based){
/* BOX and SUBSET rearrangers expect map to the 1-based */
if((iodesc->rearranger == PIO_REARR_BOX) || (iodesc->rearranger == PIO_REARR_SUBSET)){
std::transform(compmap, compmap + maplen, iodesc->map,
[](PIO_Offset off) { return off + 1; });
}
else{
std::copy(compmap, compmap + maplen, iodesc->map);
}
}
else{
/* The decomposition map is 1-based */
if(iodesc->rearranger == PIO_REARR_CONTIG){
/* CONTIG rearranger expects map to be 0-based */
std::transform(compmap, compmap + maplen, iodesc->map,
[](PIO_Offset off) { return off - 1; });
}
else{
std::copy(compmap, compmap + maplen, iodesc->map);
}
}
if(rearr == PIO_REARR_ANY) { rearr = spio_get_opt_pio_rearr(ios, maplen); }

/* Cache the dimension lengths */
std::copy(gdimlen, gdimlen + ndims, iodesc->dimlen);
std::shared_ptr<io_desc_t> iodesc = std::make_shared<io_desc_t>(ios, pio_type,
ndims, gdimlen, maplen, compmap, rearr, map_zero_based);

/* Initialize the rearranger */
if(iodesc->rearranger == PIO_REARR_SUBSET){
ierr = subset_rearranger_init(ios, iodesc, iostart, iocount);
ierr = subset_rearranger_init(ios, iodesc.get(), iostart, iocount);
}
else if(iodesc->rearranger == PIO_REARR_BOX){
ierr = box_rearranger_init(ios, iodesc, iostart, iocount);
ierr = box_rearranger_init(ios, iodesc.get(), iostart, iocount);
}
else if(iodesc->rearranger == PIO_REARR_CONTIG){
ierr = contig_rearranger_init(ios, iodesc, iostart, iocount);
ierr = contig_rearranger_init(ios, iodesc.get(), iostart, iocount);
}
else{
return pio_err(ios, NULL, ierr, __FILE__, __LINE__,
Expand All @@ -905,20 +864,21 @@ static int initdecomp(int iosysid, int pio_type, int ndims, const int *gdimlen,
*/
comm = ios->union_comm;
}

*ioidp = pio_add_to_iodesc_list(iodesc, comm);

#if PIO_SAVE_DECOMPS
if(pio_save_decomps_regex_match(*ioidp, NULL, NULL)){
std::string filename;
pio_create_uniq_str(ios, iodesc, filename, "piodecomp", ".dat");
pio_create_uniq_str(ios, iodesc.get(), filename, "piodecomp", ".dat");

LOG((2, "Saving decomp map to %s", filename.c_str()));
PIOc_writemap_impl(filename.c_str(), *ioidp, ndims, gdimlen, maplen, (PIO_Offset *)compmap, ios->my_comm);

std::string log_fname;
pio_create_uniq_str(ios, iodesc, log_fname, "piodecomp", ".nc");
pio_create_uniq_str(ios, iodesc.get(), log_fname, "piodecomp", ".nc");
SPIO_Util::Decomp_Util::Decomp_logger *logger = SPIO_Util::Decomp_Util::create_decomp_logger(ios->comp_comm, log_fname);
(*logger).write_only().open().put(iodesc).close();
(*logger).write_only().open().put(iodesc.get()).close();
delete logger;

iodesc->is_saved = true;
Expand All @@ -928,7 +888,7 @@ static int initdecomp(int iosysid, int pio_type, int ndims, const int *gdimlen,
#endif

#if PIO_ENABLE_LOGGING
dbg_log_iodesc(ios, iodesc);
dbg_log_iodesc(ios, iodesc.get());
#endif /* PIO_ENABLE_LOGGING */

return PIO_NOERR;
Expand Down
Loading
Loading