Runner
- class coffea.processor.Runner(executor: coffea.processor.executor.ExecutorBase, pre_executor: coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: collections.abc.MutableMapping | None = None, skipbadfiles: bool | tuple[type[BaseException], ...] = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, use_result_type: bool = False, schema: coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.nanoaod.NanoAODSchema'>, processor_compression: int = 1, format: str = 'root', checkpointer: coffea.processor.checkpointer.CheckpointerABC | None = None, cachestrategy: None | typing.Literal['dask-worker'] | collections.abc.Callable[[...], collections.abc.MutableMapping] = None)
Bases: object
A tool to run a processor using uproot for data delivery
A convenience wrapper to submit jobs for a file set, which is a dictionary of dataset: [file list] entries. Supports only uproot TTree reading, via NanoEvents. For more customized processing, e.g. to read other objects from the files and pass them into data frames, one can write a similar function in their user code.
- Parameters:
executor (ExecutorBase instance) – Executor, which implements a callable with inputs items, function, and accumulator, and performs some action equivalent to: for item in items: accumulator += function(item)
pre_executor (ExecutorBase instance) – Executor used to calculate fileset metadata. Defaults to executor.
chunksize (int, optional) – Maximum number of entries to process at a time in the data frame. Default: 100k.
maxchunks (int, optional) – Maximum number of chunks to process per dataset. Defaults to processing the whole dataset.
metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to an in-memory LRU cache that holds 100k entries (about 1MB, depending on the length of filenames, etc.). If you edit an input file (please don’t) during a session, the session can be restarted to clear the cache.
checkpointer (CheckpointerABC, optional) – A CheckpointerABC instance to manage checkpointing of each chunk output.
use_result_type (bool, optional) – If True, __call__ returns Ok(output) or Err(exception) instead of raising. Requires skipbadfiles to be set (True or a tuple of exception types); the same set of exception types controls which errors get captured as Err, and anything outside that set still propagates. If False (default), returns the output directly and raises on error.
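The executor contract described above (a callable over items, function, and accumulator) can be illustrated with a minimal sketch. The names `toy_executor` and `count_entries` are hypothetical and are not part of coffea; a real ExecutorBase subclass may add parallelism, retries, and a different return convention.

```python
from collections.abc import Callable, Iterable
from typing import Any


def toy_executor(items: Iterable[Any], function: Callable[[Any], Any], accumulator: Any) -> Any:
    """Hypothetical stand-in for an executor: fold function(item) into accumulator."""
    for item in items:
        accumulator += function(item)
    return accumulator


def count_entries(chunk: range) -> int:
    # Stand-in for a processor's work on one chunk: here it only counts entries.
    return len(chunk)


# Three fake "chunks" holding 100, 100, and 50 entries.
chunks = [range(0, 100), range(100, 200), range(200, 250)]
total = toy_executor(chunks, count_entries, 0)
print(total)
```

Because the only requirement on the accumulator is support for `+=`, the same loop would work for any addable output type, which is why the order in which chunks are processed does not need to be fixed.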
Attributes Summary
Methods Summary
__call__(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset
automatic_retries(retries, skipbadfiles, ...) – This should probably be defined at the Executor level.
get_cache(cachestrategy)
metadata_fetcher_parquet(item)
metadata_fetcher_root(xrootdtimeout, ...)
preprocess(fileset, *[, treename, ...]) – Preprocess the fileset and generate work items
run(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset
Attributes Documentation
- cachestrategy: None | Literal['dask-worker'] | Callable[[...], MutableMapping] = None
- checkpointer: CheckpointerABC | None = None
- executor: ExecutorBase = <dataclasses._MISSING_TYPE object>
- metadata_cache: MutableMapping | None = None
- retries
- skipbadfiles: bool | tuple[type[BaseException], ...] = False
- use_dataframes
Methods Documentation
- __call__(fileset: dict, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Result | Addable | MutableSet | MutableMapping
Run the processor_instance on a given fileset
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.
processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable.
treename (str) – Name of the tree inside each ROOT file; can be None. The treename can also be defined in the fileset, which overrides the passed treename.
uproot_options (dict, optional) – Any options to pass to uproot.open.
iteritems_options (dict, optional) – Any options to pass to tree.iteritems.
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
- Returns:
When use_result_type=True, returns Ok(output) or Err(exception). When use_result_type=False (default), returns the output directly and raises on error. When savemetrics=True, the output value is (output, metrics).
- Return type:
Result or Accumulatable
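The Ok/Err behavior described for use_result_type can be sketched in plain Python. The `Ok` and `Err` classes and the helper `call_with_result` below are hypothetical illustrations, not coffea's own Result implementation; the tuple of exception types plays the role that skipbadfiles is described as playing above.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Ok:
    value: Any


@dataclass
class Err:
    error: BaseException


def call_with_result(func: Callable[[], Any], capture: tuple):
    """Return Ok(result), or Err(exc) if exc is an instance of a type in `capture`."""
    try:
        return Ok(func())
    except capture as exc:
        # Only the listed exception types are captured; others propagate.
        return Err(exc)


def bad_file():
    raise OSError("file not readable")


def bug():
    raise ValueError("unrelated bug")


good = call_with_result(lambda: {"n": 3}, (OSError,))
bad = call_with_result(bad_file, (OSError,))
print(type(good).__name__, type(bad).__name__)
```

Calling `call_with_result(bug, (OSError,))` would raise ValueError, mirroring the statement that anything outside the configured set still propagates.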
- static automatic_retries(retries: int, skipbadfiles: bool | tuple[type[BaseException], ...], func, *args, use_result_type: bool = False, **kwargs)
This should probably be defined at the Executor level.
- static metadata_fetcher_root(xrootdtimeout: int, align_clusters: bool, uproot_options: dict, item: FileMeta) → Addable | MutableSet | MutableMapping
- preprocess(fileset: dict, *, treename: str | None = None, uproot_options: dict | None = {}, trace: Callable | None = None, processor_instance: ProcessorABC | Callable | None = None) → Generator
Preprocess the fileset and generate work items
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.
treename (str) – Name of the tree inside each ROOT file; can be None. The treename can also be defined in the fileset, which overrides the passed treename.
uproot_options (dict, optional) – Any options to pass to uproot.open.
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided, processor_instance must also be given. Tracing is performed using the first openable file per dataset, and the result overrides any preload specified in the fileset. If None, no tracing is performed and only fileset-level preload (if any) is used.
processor_instance (ProcessorABC or Callable, optional) – The processor whose column access will be traced. Required when trace is provided.
- Returns:
chunks – A generator yielding WorkItem chunks ready to be passed to run(). The caller may .send(new_chunksize) to adjust the target chunksize on the fly; the generator’s return value is the final chunksize used.
- Return type:
Generator[WorkItem, int, int]
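The Generator[WorkItem, int, int] protocol described above (yield chunks, accept a new chunksize via .send(), return the final chunksize) can be sketched with a toy generator. `toy_chunks` is hypothetical and yields plain (start, stop) tuples instead of WorkItem objects; how the real generator responds to .send() in detail is not specified here, so this only illustrates the general Python mechanism.

```python
from collections.abc import Generator


def toy_chunks(n_entries: int, chunksize: int) -> Generator:
    """Yield (start, stop) ranges; a value passed via .send() becomes the new chunksize."""
    start = 0
    while start < n_entries:
        stop = min(start + chunksize, n_entries)
        sent = yield (start, stop)
        if sent is not None:
            # The caller asked for a different target chunksize from now on.
            chunksize = sent
        start = stop
    # The return value is delivered to the caller as StopIteration.value.
    return chunksize


gen = toy_chunks(n_entries=10, chunksize=4)
seen = [next(gen)]          # first chunk, produced with the initial chunksize
seen.append(gen.send(2))    # shrink the chunksize; send() returns the next chunk
final = None
while True:
    try:
        seen.append(next(gen))
    except StopIteration as stop:
        final = stop.value  # final chunksize used
        break
print(seen, final)
```

Note that in Python, `send()` both delivers the value and advances the generator, so its return value is itself a chunk that must not be discarded.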
- run(fileset: dict | str | list[WorkItem] | Generator, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Result | Addable | MutableSet | MutableMapping
Run the processor_instance on a given fileset
- Parameters:
fileset (dict | str | List[WorkItem] | Generator) – Fileset can be one of the following:
  - A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.
  - A single file name
  - File chunks for self.preprocess()
  - Chunk generator
processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable.
treename (str, optional) – Name of the tree inside each ROOT file; can be None. The treename can also be defined in the fileset, which overrides the passed treename. Not needed if processing premade chunks.
uproot_options (dict, optional) – Any options to pass to uproot.open.
iteritems_options (dict, optional) – Any options to pass to tree.iteritems.
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
- Returns:
When use_result_type=True, returns Ok(output) on success and Err(exception) on failure (exceptions are captured, not raised). When use_result_type=False (default), returns the raw output dict and exceptions propagate. See __call__ for the user-facing output with savemetrics/use_dataframes extraction applied.
- Return type:
Result or Accumulatable
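The dictionary forms of fileset listed for run() can be compared side by side with a small sketch that flattens them into (dataset, file, treename, preload) rows. `flatten_fileset` is a hypothetical helper written for illustration and is not how coffea normalizes filesets internally; the file and branch names are made up.

```python
from typing import Optional


def flatten_fileset(fileset: dict, treename: Optional[str] = None) -> list:
    """Flatten the documented fileset forms into (dataset, file, treename, preload) rows."""
    rows = []
    for dataset, spec in fileset.items():
        if isinstance(spec, dict):
            # A treename given in the fileset overrides the passed treename.
            local_tree = spec.get("treename", treename)
            preload = list(spec.get("preload", []))
            files = spec["files"]
        else:
            local_tree, preload, files = treename, [], spec
        if isinstance(files, dict):
            # Per-file tree names: {file: 'name'}
            for name, tree in files.items():
                rows.append((dataset, name, tree, preload))
        else:
            for name in files:
                rows.append((dataset, name, local_tree, preload))
    return rows


fileset = {
    "ttbar": ["a.root", "b.root"],
    "data": {"treename": "Events2", "files": ["c.root"]},
    "wjets": {"files": {"d.root": "OtherTree"}},
    "zjets": {"preload": ["Muon_pt"], "files": ["e.root"]},
}
rows = flatten_fileset(fileset, treename="Events")
for row in rows:
    print(row)
```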