Runner
- class coffea.processor.Runner(executor: coffea.processor.executor.ExecutorBase, pre_executor: coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: collections.abc.MutableMapping | None = None, skipbadfiles: bool | tuple[type[BaseException], ...] = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, schema: coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.nanoaod.NanoAODSchema'>, processor_compression: int = 1, format: str = 'root', checkpointer: coffea.processor.checkpointer.CheckpointerABC | None = None, cachestrategy: None | typing.Literal['dask-worker'] | collections.abc.Callable[[...], collections.abc.MutableMapping] = None)
Bases: object
A tool to run a processor using uproot for data delivery
A convenience wrapper to submit jobs for a file set, which is a dictionary of dataset: [file list] entries. Supports only uproot TTree reading, via NanoEvents. For more customized processing, e.g. to read other objects from the files and pass them into data frames, one can write a similar function in their user code.
- Parameters:
executor (ExecutorBase instance) – Executor, which implements a callable with inputs: items, function, accumulator and performs some action equivalent to: for item in items: accumulator += function(item)
pre_executor (ExecutorBase instance) – Executor used to calculate fileset metadata. Defaults to executor.
chunksize (int, optional) – Maximum number of entries to process at a time in the data frame. Default: 100k.
maxchunks (int, optional) – Maximum number of chunks to process per dataset. Defaults to processing the whole dataset.
metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to an in-memory LRU cache that holds 100k entries (about 1MB depending on the length of filenames, etc.). If you edit an input file (please don’t) during a session, the session can be restarted to clear the cache.
checkpointer (CheckpointerABC, optional) – A CheckpointerABC instance to manage checkpointing of each chunk output
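The executor contract described above (for item in items: accumulator += function(item)) can be illustrated with a small pure-Python sketch. The names toy_executor and count_entries are hypothetical and are not part of coffea; real executors may also distribute the work and handle retries, compression, and merging.

```python
from collections import Counter
from typing import Callable, Iterable


def toy_executor(items: Iterable, function: Callable, accumulator):
    """Hypothetical stand-in for an executor: fold function(item) into accumulator."""
    for item in items:
        accumulator += function(item)
    return accumulator


def count_entries(item: range) -> Counter:
    # Stand-in for processing one chunk: report how many entries it held.
    return Counter(entries=len(item), chunks=1)


# Three chunks of at most 4 entries each, covering 10 entries in total.
chunks = [range(0, 4), range(4, 8), range(8, 10)]
result = toy_executor(chunks, count_entries, Counter())
print(result)
```

Any accumulator type that supports += would work in this sketch; Counter is used only because it needs no extra dependencies.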
Attributes Summary
Methods Summary
__call__(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset
automatic_retries(retries, skipbadfiles, ...) – This should probably be defined at the Executor level.
get_cache(cachestrategy)
metadata_fetcher_parquet(item)
metadata_fetcher_root(xrootdtimeout, ...)
preprocess(fileset, *[, treename, ...]) – Preprocess the fileset and generate work items
run(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset
Attributes Documentation
- cachestrategy: None | Literal['dask-worker'] | Callable[[...], MutableMapping] = None
- checkpointer: CheckpointerABC | None = None
- executor: ExecutorBase = <dataclasses._MISSING_TYPE object>
- metadata_cache: MutableMapping | None = None
- retries
- skipbadfiles: bool | tuple[type[BaseException], ...] = False
- use_dataframes
Methods Documentation
- __call__(fileset: dict, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Addable | MutableSet | MutableMapping
Run the processor_instance on a given fileset
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }
processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable
treename (str) – Name of the tree inside each ROOT file, can be None; treename can also be defined in fileset, which will override the passed treename
uproot_options (dict, optional) – Any options to pass to uproot.open
iteritems_options (dict, optional) – Any options to pass to tree.iteritems
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
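The contract of the trace argument (a callable taking a processing function and an events object, returning an iterable of column names) can be sketched with plain Python objects. RecordingEvents and toy_trace are hypothetical names used only for illustration. This is not how coffea.nanoevents.trace.trace is implemented, and the toy tracer works on a dict rather than a NanoEvents array, so it is not meant to be passed to Runner as written.

```python
from typing import Callable, Iterable


class RecordingEvents(dict):
    """Toy stand-in for an events array that records which columns are read."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.accessed = set()

    def __getitem__(self, key):
        self.accessed.add(key)
        return super().__getitem__(key)


def toy_trace(fun: Callable, events: RecordingEvents) -> Iterable[str]:
    """Hypothetical tracer: run fun once and report the columns it touched."""
    events.accessed.clear()
    fun(events)
    return sorted(events.accessed)


def process(events):
    # Reads two of the three available columns.
    return sum(events["Muon_pt"]) + len(events["Jet_pt"])


events = RecordingEvents(Muon_pt=[30.0, 45.0], Jet_pt=[50.0], Electron_pt=[20.0])
columns = toy_trace(process, events)
print(columns)
```

The column names above are placeholders; the point is only that the tracer reports the columns the function read (Muon_pt and Jet_pt) and omits the one it never touched.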
- static automatic_retries(retries: int, skipbadfiles: bool | tuple[type[BaseException], ...], func, *args, **kwargs)
This should probably be defined at the Executor level.
- static metadata_fetcher_root(xrootdtimeout: int, align_clusters: bool, uproot_options: dict, item: FileMeta) → Addable | MutableSet | MutableMapping
- preprocess(fileset: dict, *, treename: str | None = None, uproot_options: dict | None = {}, trace: Callable | None = None, processor_instance: ProcessorABC | Callable | None = None) → Generator
Preprocess the fileset and generate work items
- Parameters:
fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }
treename (str) – Name of the tree inside each ROOT file, can be None; treename can also be defined in fileset, which will override the passed treename
uproot_options (dict, optional) – Any options to pass to uproot.open
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided, processor_instance must also be given. Tracing is performed using the first openable file per dataset and the result overrides any preload specified in the fileset. If None, no tracing is performed and only fileset-level preload (if any) is used.
processor_instance (ProcessorABC or Callable, optional) – The processor whose column access will be traced. Required when trace is provided.
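The fileset forms listed above can be compared side by side in a short sketch. The helper iter_files is hypothetical and is not coffea's implementation; it only mirrors the documented rules that a dataset-level treename overrides the passed treename and that a {file: 'name'} mapping assigns a tree name per file. All dataset and file names are placeholders.

```python
from typing import Iterator, Optional, Tuple


def iter_files(
    fileset: dict, treename: Optional[str] = None
) -> Iterator[Tuple[str, str, Optional[str]]]:
    """Hypothetical helper: yield (dataset, file, treename) for each documented form."""
    for dataset, spec in fileset.items():
        if isinstance(spec, dict):
            # A dataset-level 'treename' overrides the one passed in.
            dataset_tree = spec.get("treename", treename)
            files = spec["files"]
        else:
            dataset_tree, files = treename, spec
        if isinstance(files, dict):
            # {file: 'name'} gives each file its own tree name.
            for filename, file_tree in files.items():
                yield dataset, filename, file_tree
        else:
            for filename in files:
                yield dataset, filename, dataset_tree


fileset = {
    "plain": ["a.root", "b.root"],
    "named": {"treename": "Runs", "files": ["c.root"]},
    "per_file": {"files": {"d.root": "Events", "e.root": "Friends"}},
    "preloaded": {"preload": ["branch1", "branch2"], "files": ["f.root"]},
}
resolved = list(iter_files(fileset, treename="Events"))
for row in resolved:
    print(row)
```

The preload list is carried in the fileset but ignored by this helper, since it affects which branches are read rather than which tree is opened.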
- run(fileset: dict | str | list[WorkItem] | Generator, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Addable | MutableSet | MutableMapping
Run the processor_instance on a given fileset
- Parameters:
fileset (dict | str | List[WorkItem] | Generator) – Fileset can be one of the following:
  - A dictionary {dataset: [file, file], }. Optionally, if some files’ tree names differ, the dictionary can be specified as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }
  - A single file name
  - File chunks for self.preprocess()
  - Chunk generator
processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable
treename (str, optional) – Name of the tree inside each ROOT file, can be None; treename can also be defined in fileset, which will override the passed treename. Not needed if processing premade chunks.
uproot_options (dict, optional) – Any options to pass to uproot.open
iteritems_options (dict, optional) – Any options to pass to tree.iteritems
trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments, a processing function (accepting events) and a NanoEvents array, and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
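A minimal usage sketch follows, assuming coffea is installed and that the placeholder file names are replaced with real NanoAOD files. The names make_fileset, run_event_count, and EventCounter are hypothetical. The call uses only arguments from the signatures documented above; the executor choice (IterativeExecutor) and the shape of the returned accumulator are assumptions for illustration.

```python
def make_fileset() -> dict:
    # Placeholder dataset and file names; substitute real NanoAOD files.
    return {
        "dataset_a": ["a1.root", "a2.root"],
        "dataset_b": {"treename": "Events", "files": ["b1.root"]},
    }


def run_event_count(fileset: dict):
    # Imported lazily so the fileset can be built without coffea installed.
    from coffea import processor
    from coffea.nanoevents import NanoAODSchema

    class EventCounter(processor.ProcessorABC):
        def process(self, events):
            # One counter per dataset; chunk outputs are summed by the executor.
            return {events.metadata["dataset"]: {"nevents": len(events)}}

        def postprocess(self, accumulator):
            return accumulator

    runner = processor.Runner(
        executor=processor.IterativeExecutor(),
        schema=NanoAODSchema,
        chunksize=100_000,  # at most 100k entries per chunk
        maxchunks=2,  # stop after two chunks per dataset, useful for quick tests
    )
    return runner(fileset, processor_instance=EventCounter(), treename="Events")


fileset = make_fileset()
# result = run_event_count(fileset)  # requires coffea and readable input files
```

Setting maxchunks to a small number keeps a test run short; leaving it at None processes each dataset in full.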