# Source code for ClearMap.processors.generic_tab_processor
"""
This module contains the generic processor classes that are used to define the processing steps and run the processing.
These classes are inherited by all processors in ClearMap.
"""
import os
import sys
import warnings
from concurrent.futures.process import BrokenProcessPool
class ProcessorSteps:
    """
    Base class describing an ordered list of processing steps, each one mapped to a file path.

    Subclasses must implement the `steps` property (ordered step names) and
    `path_from_step_name`. All the other methods are built on top of these two.

    Arguments
    ---------
    workspace:
        The workspace object. It is only stored here; not used by this base class.
    postfix: str
        Stored as `self.postfix`; not used by this base class.
    """

    def __init__(self, workspace, postfix=''):
        self.postfix = postfix
        self.workspace = workspace

    @property
    def steps(self):
        """The ordered sequence of step names. Must be implemented by subclasses."""
        raise NotImplementedError

    def path_from_step_name(self, step_name):
        """Return the file path associated with `step_name`. Must be implemented by subclasses."""
        raise NotImplementedError

    @property
    def existing_steps(self):
        """The steps (in order) whose file currently exists on disk."""
        return [s for s in self.steps if self.step_exists(s)]

    @property
    def last_step(self):
        """
        The last step (in `steps` order) whose file exists.

        Raises
        ------
        IndexError
            If no step file exists yet.
        """
        return self.existing_steps[-1]

    def get_next_steps(self, step_name):
        """Return all the steps located after `step_name` (ValueError if it is not a known step)."""
        return self.steps[self.steps.index(step_name)+1:]

    def step_exists(self, step_name):
        """Whether the file associated with `step_name` exists on disk."""
        return os.path.exists(self.path_from_step_name(step_name))

    def remove_next_steps_files(self, target_step_name):
        """Delete the existing files of every step located after `target_step_name`, warning for each one."""
        for step_name in self.get_next_steps(target_step_name):
            f_path = self.path_from_step_name(step_name)
            if os.path.exists(f_path):
                warnings.warn(f"WARNING: Remove previous step {step_name}, file {f_path}")
                os.remove(f_path)

    def _index_before(self, step_name, offset):
        """
        Return the index of the step located `offset` positions before `step_name`.

        Raises IndexError instead of returning a negative index, because a negative
        index would silently wrap around to the end of the steps list.
        """
        target = self.steps.index(step_name) - offset
        if target < 0:
            raise IndexError(f'There is no step {offset} position(s) before "{step_name}"')
        return target

    def path(self, step, step_back=False, n_before=0):
        """
        Return the path of the existing file for `step`.

        Arguments
        ---------
        step: str
            The name of the step (must be part of `steps`)
        step_back: bool
            If the file of the step does not exist, whether to fall back to the
            step immediately before it. Only one step back is attempted.
        n_before: int
            If not 0, use the step located `n_before` positions before `step` instead

        Returns
        -------
        str
            The path of the file of the selected step

        Raises
        ------
        IndexError
            If the file does not exist and `step_back` is False, if the fallback step
            file does not exist either, or if there is no step far enough before `step`.
        ValueError
            If `step` is not part of `steps`.
        """
        if n_before:
            step = self.steps[self._index_before(step, n_before)]
        f_path = self.path_from_step_name(step)
        if os.path.exists(f_path):
            return f_path
        if not step_back:
            raise IndexError(f'Could not find path "{f_path}" and not allowed to step back')
        # FIXME: steps back only once ??
        # The recursive call does not pass step_back, so a missing previous file raises IndexError
        return self.path(self.steps[self._index_before(step, 1)])
class TabProcessor:
    """
    Base class for processors. It holds the stop flag, the optional progress
    watcher, the workspace and the machine configuration, and provides helpers
    to report progress and to cancel the running computation.

    Subclasses must implement `run`.
    """

    def __init__(self):
        self.stopped = False  # Set to True by stop_process
        self.progress_watcher = None  # Optional; every watcher helper is a no-op while this is None
        self.workspace = None
        self.machine_config = {}

    def set_progress_watcher(self, watcher):
        """Attach the progress watcher that the other watcher helpers will forward to."""
        self.progress_watcher = watcher

    def update_watcher_progress(self, val):
        """Forward `val` to the `increment` method of the watcher, if there is one."""
        if self.progress_watcher is not None:
            self.progress_watcher.increment(val)

    def update_watcher_main_progress(self, val=1):
        """Forward `val` to the `increment_main_progress` method of the watcher, if there is one."""
        if self.progress_watcher is not None:
            self.progress_watcher.increment_main_progress(val)

    def set_watcher_step(self, step_name):
        """Set `main_step_name` of the watcher to `step_name`, if there is a watcher."""
        if self.progress_watcher is not None:
            self.progress_watcher.main_step_name = step_name

    def prepare_watcher_for_substep(self, counter_size, pattern, title, increment_main=False):
        """
        Prepare the progress watcher for the coming processing step. The watcher will in turn signal changes to the
        progress bar

        Arguments
        ---------
        counter_size: int
            The progress bar maximum
        pattern: str or re.Pattern or (str, re.Pattern)
            The string to search for in the log to signal an increment of 1
        title: str
            The title of the step for the progress bar
        increment_main: bool
            Whether a new step should be added to the main progress bar
        """
        if self.progress_watcher is not None:
            self.progress_watcher.prepare_for_substep(counter_size, pattern, title)
            if increment_main:
                self.update_watcher_main_progress()

    def stop_process(self):  # REFACTOR: put in parent class ??
        """
        Flag the processor as stopped and cancel the work attached to the workspace.

        If the workspace has a non-None `executor`, it is shut down and reset to None.
        Otherwise, if the workspace has a non-None `process`, it is terminated, reset
        to None and CanceledProcessing is raised.

        Raises
        ------
        CanceledProcessing
            After terminating the workspace `process`.
        """
        self.stopped = True
        workspace = self.workspace
        if getattr(workspace, 'executor', None) is not None:
            if sys.version_info[:2] >= (3, 9):
                print('Canceling process')
                workspace.executor.shutdown(cancel_futures=True)  # The new clean version
            else:
                workspace.executor.immediate_shutdown()  # Dirty but we have no choice in python < 3.9
            workspace.executor = None
            # raise BrokenProcessPool
        elif getattr(workspace, 'process', None) is not None:
            workspace.process.terminate()
            # self.workspace.process.wait()
            workspace.process = None
            # NOTE(review): raised only on this branch, mirroring the commented-out raise
            #   of the executor branch above -- confirm against the upstream file
            raise CanceledProcessing

    @property
    def verbose(self):
        """
        Whether the 'verbosity' entry of the machine config is 'debug'.

        Returns False when the entry is missing, which is the case for the
        empty default config set in `__init__`.
        """
        return self.machine_config.get('verbosity') == 'debug'

    def run(self):
        """Run the processing. Must be implemented by subclasses."""
        raise NotImplementedError

    # def setup(self):
    #     pass
class CanceledProcessing(BrokenProcessPool):  # TODO: better inheritance
    """
    Exception signaling that the processing was canceled (raised by `TabProcessor.stop_process`).

    It derives from BrokenProcessPool, so handlers catching BrokenProcessPool also catch it.
    """