Runner#

class coffea.processor.Runner(executor: ~coffea.processor.executor.ExecutorBase, pre_executor: ~coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: ~collections.abc.MutableMapping | None = None, skipbadfiles: bool | tuple[type[BaseException], ...] = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, use_result_type: bool = False, schema: ~coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.nanoaod.NanoAODSchema'>, processor_compression: int = 1, format: str = 'root', checkpointer: ~coffea.processor.checkpointer.CheckpointerABC | None = None, cachestrategy: None | ~typing.Literal['dask-worker'] | ~collections.abc.Callable[[...], ~collections.abc.MutableMapping] = None)[source]#

Bases: object

A tool to run a processor using uproot for data delivery

A convenience wrapper to submit jobs for a file set, which is a dictionary of dataset: [file list] entries. Supports only uproot TTree reading, via NanoEvents. For more customized processing, e.g. to read other objects from the files and pass them into data frames, one can write a similar function in their user code.

Parameters:
  • executor (ExecutorBase instance) – Executor, which implements a callable with inputs: items, function, accumulator and performs some action equivalent to: for item in items: accumulator += function(item)

  • pre_executor (ExecutorBase instance) – Executor, used to calculate fileset metadata Defaults to executor

  • chunksize (int, optional) – Maximum number of entries to process at a time in the data frame, default: 100k

  • maxchunks (int, optional) – Maximum number of chunks to process per dataset Defaults to processing the whole dataset

  • metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to a in-memory LRU cache that holds 100k entries (about 1MB depending on the length of filenames, etc.) If you edit an input file (please don’t) during a session, the session can be restarted to clear the cache.

  • checkpointer (CheckpointerABC, optional) – A CheckpointerABC instance to manage checkpointing of each chunk output

  • use_result_type (bool, optional) – If True, __call__ returns Ok(output) or Err(exception) instead of raising. Requires skipbadfiles to be set (True or a tuple of exception types); the same set of exception types controls which errors get captured as Err — anything outside that set still propagates. If False (default), returns the output directly and raises on error.

Attributes Summary

Methods Summary

__call__(fileset, processor_instance, *[, ...])

Run the processor_instance on a given fileset

automatic_retries(retries, skipbadfiles, ...)

This should probably defined on Executor-level.

get_cache(cachestrategy)

metadata_fetcher_parquet(item)

metadata_fetcher_root(xrootdtimeout, ...)

preprocess(fileset, *[, treename, ...])

Preprocess the fileset and generate work items

run(fileset, processor_instance, *[, ...])

Run the processor_instance on a given fileset

Attributes Documentation

align_clusters: bool = False#
cachestrategy: None | Literal['dask-worker'] | Callable[[...], MutableMapping] = None#
checkpointer: CheckpointerABC | None = None#
chunksize: int = 100000#
executor: ExecutorBase = <dataclasses._MISSING_TYPE object>#
format: str = 'root'#
maxchunks: int | None = None#
metadata_cache: MutableMapping | None = None#
pre_executor: ExecutorBase | None = None#
processor_compression: int = 1#
retries#
savemetrics: bool = False#
skipbadfiles: bool | tuple[type[BaseException], ...] = False#
use_dataframes#
use_result_type: bool = False#
xrootdtimeout: int | None = 60#

Methods Documentation

__call__(fileset: dict, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) Result | Addable | MutableSet | MutableMapping[source]#

Run the processor_instance on a given fileset

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], } Optionally, if some files’ tree name differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, } You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }

  • processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable

  • treename (str) – name of tree inside each root file, can be None; treename can also be defined in fileset, which will override the passed treename

  • uproot_options (dict, optional) – Any options to pass to uproot.open

  • iteritems_options (dict, optional) – Any options to pass to tree.iteritems

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.

Returns:

When use_result_type=True, returns Ok(output) or Err(exception). When use_result_type=False (default), returns the output directly and raises on error. When savemetrics=True, the output value is (output, metrics).

Return type:

Result or Accumulatable

static automatic_retries(retries: int, skipbadfiles: bool | tuple[type[BaseException], ...], func, *args, use_result_type: bool = False, **kwargs)[source]#

This should probably defined on Executor-level.

static get_cache(cachestrategy)[source]#
static metadata_fetcher_parquet(item: FileMeta)[source]#
static metadata_fetcher_root(xrootdtimeout: int, align_clusters: bool, uproot_options: dict, item: FileMeta) Addable | MutableSet | MutableMapping[source]#
preprocess(fileset: dict, *, treename: str | None = None, uproot_options: dict | None = {}, trace: Callable | None = None, processor_instance: ProcessorABC | Callable | None = None) Generator[source]#

Preprocess the fileset and generate work items

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], } Optionally, if some files’ tree name differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, } You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, } You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }

  • treename (str) – name of tree inside each root file, can be None; treename can also be defined in fileset, which will override the passed treename

  • uproot_options (dict, optional) – Any options to pass to uproot.open

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided, processor_instance must also be given. Tracing is performed using the first openable file per dataset and the result overrides any preload specified in the fileset. If None, no tracing is performed and only fileset-level preload (if any) is used.

  • processor_instance (ProcessorABC or Callable, optional) – The processor whose column access will be traced. Required when trace is provided.

Returns:

chunks – A generator yielding WorkItem chunks ready to be passed to run(). The caller may .send(new_chunksize) to adjust the target chunksize on the fly; the generator’s return value is the final chunksize used.

Return type:

Generator[WorkItem, int, int]

run(fileset: dict | str | list[WorkItem] | Generator, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) Result | Addable | MutableSet | MutableMapping[source]#

Run the processor_instance on a given fileset

Parameters:
  • fileset (dict | str | List[WorkItem] | Generator) –

    Fileset can be one of the following:

    • A dictionary {dataset: [file, file], } Optionally, if some files’ tree name differ, the dictionary can be specified: {dataset: {'treename': 'name', 'files': [file, file]}, } You can also define a different tree name per file in the dictionary: {dataset: {'files': {file: 'name'}}, } You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }

    • A single file name

    • File chunks for self.preprocess()

    • Chunk generator

  • processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC or a single-argument callable

  • treename (str, optional) – name of tree inside each root file, can be None; treename can also be defined in fileset, which will override the passed treename Not needed if processing premade chunks

  • uproot_options (dict, optional) – Any options to pass to uproot.open

  • iteritems_options (dict, optional) – Any options to pass to tree.iteritems

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.

Returns:

When use_result_type=True, returns Ok(output) on success and Err(exception) on failure (exceptions are captured, not raised). When use_result_type=False (default), returns the raw output dict and exceptions propagate. See __call__ for the user-facing output with savemetrics / use_dataframes extraction applied.

Return type:

Result or Accumulatable