Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 59 additions & 45 deletions weather_sp/splitter_pipeline/file_splitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class FileSplitter(abc.ABC):

def __init__(self, input_path: str, output_info: OutFileInfo,
force_split: bool = False, logging_level: int = logging.INFO,
grib_filter_expression: t.Optional[str] = None):
grib_filter_expression: t.Optional[str] = None, skip_on_invalid_grib_filter_expression:bool = False):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run a linter over the files.

self.input_path = input_path
self.output_info = output_info
self.force_split = force_split
Expand All @@ -81,6 +81,7 @@ def __init__(self, input_path: str, output_info: OutFileInfo,
self.logger.debug('Splitter for path=%s, output base=%s',
self.input_path, self.output_info)
self.grib_filter_expression = grib_filter_expression
self.skip_on_invalid_grib_filter_expression = skip_on_invalid_grib_filter_expression

@abc.abstractmethod
def split_data(self) -> None:
Expand Down Expand Up @@ -207,7 +208,7 @@ def split_data(self) -> None:
grib_copy_cmd = shutil.which('grib_copy')
grib_get_cmd = shutil.which('grib_get')
uniq_cmd = shutil.which('uniq')
for cmd, name in [(grib_get_cmd, 'grib_copy'), (grib_get_cmd, 'grib_get'), (uniq_cmd, 'uniq')]:
for cmd, name in [(grib_copy_cmd, 'grib_copy'), (grib_get_cmd, 'grib_get'), (uniq_cmd, 'uniq')]:

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this change in a separate PR please!

if not cmd:
raise EnvironmentError(f'binary {name!r} is not available in the current environment!')

Expand All @@ -227,49 +228,61 @@ def split_data(self) -> None:
# This ensures dims like time are represented as 0600 instead of 600.
split_dims_arg = ','.join(f'{dim}:s' for dim in split_dims)
with self._copy_to_local_file() as local_file:
self.logger.info('Skipping as needed...')
# Append -w flag to filter GRIB messages matching the given expression
if self.grib_filter_expression:
grib_get_args = [grib_get_cmd, '-p', split_dims_arg, '-w', self.grib_filter_expression, local_file.name]
else:
grib_get_args = [grib_get_cmd, '-p', split_dims_arg, local_file.name]
grib_get_process = subprocess.Popen(grib_get_args, stdout=subprocess.PIPE)
uniq_output = subprocess.check_output((uniq_cmd,), stdin=grib_get_process.stdout)
output_paths = []
skipped_paths = []
for line in uniq_output.decode('utf-8').rstrip('\n').split('\n'):
splits = dict(zip(split_dims, line.split(' ')))
output_path = self.output_info.formatted_output_path(splits)
if self.should_skip_file(output_path):
skipped_paths.append(output_path)
continue
output_paths.append(output_path)
if not output_paths:
metrics.Metrics.counter('file_splitters', 'skipped').inc()
self.logger.info('Skipping %s, file already split into: %s',
repr(self.input_path), ', '.join(skipped_paths))
return

with tempfile.TemporaryDirectory() as tmpdir:
self.logger.info('Performing split.')
dest = os.path.join(tmpdir, flat_output_template)
try:
self.logger.info('Skipping as needed...')
# Append -w flag to filter GRIB messages matching the given expression
if self.grib_filter_expression:
subprocess.run([grib_copy_cmd, "-w",
self.grib_filter_expression,
local_file.name, dest], check=True)
grib_get_args = [grib_get_cmd, '-p', split_dims_arg, '-w', self.grib_filter_expression, local_file.name]
else:
subprocess.run([grib_copy_cmd, local_file.name, dest],
check=True)

self.logger.info('Uploading %r...', self.input_path)
for flat_target in os.listdir(tmpdir):
dest_file_path = f'{prefix}{flat_target.replace(delimiter, slash)}'
self.logger.info([prefix, dest_file_path, local_file.name,
self.output_info.unformatted_output_path()])

copy(os.path.join(tmpdir, flat_target), dest_file_path)
self.logger.info('Finished uploading %r', self.input_path)

grib_get_args = [grib_get_cmd, '-p', split_dims_arg, local_file.name]
grib_get_process = subprocess.Popen(grib_get_args, stdout=subprocess.PIPE)
uniq_output = subprocess.check_output((uniq_cmd,), stdin=grib_get_process.stdout)
output_paths = []
skipped_paths = []
for line in uniq_output.decode('utf-8').rstrip('\n').split('\n'):
splits = dict(zip(split_dims, line.split(' ')))
output_path = self.output_info.formatted_output_path(splits)
if self.should_skip_file(output_path):
skipped_paths.append(output_path)
continue
output_paths.append(output_path)
if not output_paths:
metrics.Metrics.counter('file_splitters', 'skipped').inc()
self.logger.info('Skipping %s, file already split into: %s',
repr(self.input_path), ', '.join(skipped_paths))
return

with tempfile.TemporaryDirectory() as tmpdir:
self.logger.info('Performing split.')
dest = os.path.join(tmpdir, flat_output_template)
if self.grib_filter_expression:
subprocess.run([grib_copy_cmd, "-w",
self.grib_filter_expression,
local_file.name, dest], check=True)
else:
subprocess.run([grib_copy_cmd, local_file.name, dest],
check=True)

self.logger.info('Uploading %r...', self.input_path)
for flat_target in os.listdir(tmpdir):
dest_file_path = f'{prefix}{flat_target.replace(delimiter, slash)}'
self.logger.info([prefix, dest_file_path, local_file.name,
self.output_info.unformatted_output_path()])

copy(os.path.join(tmpdir, flat_target), dest_file_path)
self.logger.info('Finished uploading %r', self.input_path)
except Exception as e:
log_msg = (
f"GRIB tool failed for {self.input_path!r}. This means the requested "

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits:

  • GribSplitterV2 failed for ...
  • We can omit This means the requested filter expression {self.grib_filter_expression!r} does not exist in this file. and just log the error instead.

f"filter expression {self.grib_filter_expression!r} does not exist in this file. "
f"Error: {e}"
)
if self.skip_on_invalid_grib_filter_expression:
self.logger.warning(f"{log_msg} | Flag 'skip_on_invalid_grib_filter_expression' is True. Skipping file.")

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the log level as error.

nit: Can omit | Flag 'skip_on_invalid_grib_filter_expression' is True. Skipping file.

return
else:
self.logger.error(f"{log_msg} | Flag 'skip_on_invalid_grib_filter_expression' is False. Error raised")
raise

class NetCdfSplitter(FileSplitter):

Expand Down Expand Up @@ -353,7 +366,8 @@ def get_splitter(file_path: str,
dry_run: bool,
force_split: bool = False,
logging_level: int = logging.INFO,
grib_filter_expression: t.Optional[str] = None) -> FileSplitter:
grib_filter_expression: t.Optional[str] = None,
skip_on_invalid_grib_filter_expression: bool = False) -> FileSplitter:
if dry_run:
logger.info('Using splitter: DrySplitter')
return DrySplitter(file_path, output_info, logging_level=logging_level)
Expand All @@ -371,7 +385,7 @@ def get_splitter(file_path: str,
if cmd:
logger.info('Using splitter: GribSplitterV2')
return GribSplitterV2(file_path, output_info, force_split,
logging_level, grib_filter_expression)
logging_level, grib_filter_expression, skip_on_invalid_grib_filter_expression)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run a linter over the files.

else:
logger.info('Using splitter: GribSplitter')
return GribSplitter(file_path, output_info, force_split,
Expand Down
12 changes: 10 additions & 2 deletions weather_sp/splitter_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def split_file(input_file: str,
dry_run: bool,
force_split: bool = False,
logging_level: int = logging.INFO,
grib_filter_expression: t.Optional[str] = None):
grib_filter_expression: t.Optional[str] = None,
skip_on_invalid_grib_filter_expression: bool = False):
output_base_name = get_output_base_name(input_path=input_file,
input_base=input_base_dir,
output_template=output_template,
Expand All @@ -61,7 +62,8 @@ def split_file(input_file: str,
dry_run,
force_split,
level,
grib_filter_expression)
grib_filter_expression,
skip_on_invalid_grib_filter_expression)
splitter.split_data()


Expand Down Expand Up @@ -134,6 +136,10 @@ def run(argv: t.List[str], save_main_session: bool = True):
'specifically supported by the GribSplitterV2'
'implementation.'
'Example: typeOfLevel=isobaricInhPa,level=1000')
parser.add_argument('--skip-on-invalid-grib-filter-expression', action='store_true', default=False,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flag's name is a bit ambiguous. Rather than validating whether the GRIB filter expression itself is invalid, it seems we are just toggling how the pipeline handles a failure (crashing vs. logging and continuing).

Suggestion: A better name would be suppress_splitter_pipeline_failures.

help='If provided (True), files that do not contain the key-values specified '
'in the --where filter will be logged and skipped. By default (False), '
'the pipeline will raise an error and break.')
parser.add_argument('--topic', type=str, default=None,
help='Pub/Sub topic to read from for streaming mode.')
parser.add_argument('--subscription', type=str, default=None,
Expand Down Expand Up @@ -162,6 +168,7 @@ def run(argv: t.List[str], save_main_session: bool = True):
formatting = known_args.formatting
dry_run = known_args.dry_run
grib_filter_expression = known_args.where
skip_on_invalid_grib_filter_expression = known_args.skip_on_invalid_grib_filter_expression

if not output_template and not output_dir:
raise ValueError('No output specified')
Expand Down Expand Up @@ -214,6 +221,7 @@ def run(argv: t.List[str], save_main_session: bool = True):
known_args.force,
known_args.log_level,
grib_filter_expression,
skip_on_invalid_grib_filter_expression,
)
)

Expand Down
Loading