Source code for luna.common.dask

import asyncio
import logging
import os
from functools import partial
from functools import wraps

import dask
from dask.distributed import Client
from dask.distributed import worker_client, get_client, get_worker

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)

# Task count at or above which prune_empty_delayed warns the user that its
# eager, per-task compute loop may be expensive.
LENGTH_MANY_TASKS = 50_000

def prune_empty_delayed(tasks):
    """Drop delayed tasks whose computed value is ``None``.

    This is a less-than-ideal approach: every task is computed eagerly so its
    result can be inspected, trading CPU and time for memory. The surviving
    results are re-wrapped as delayed objects so callers can keep composing
    them into a dask graph.

    Args:
        tasks (list): list of delayed dask tasks

    Returns:
        list[dask.delayed]: a reduced list of delayed dask tasks, one per
        non-``None`` result
    """
    n_tasks = len(tasks)
    if n_tasks >= LENGTH_MANY_TASKS:
        logger.warning(f"Hope you're okay with a length={n_tasks} for loop!")

    @dask.delayed
    def mock_return(task):
        # Identity function: only exists to re-wrap a concrete value as delayed.
        return task

    computed = dask.compute(*tasks)
    kept = (value for value in computed if value is not None)
    return [mock_return(value) for value in kept]
def get_or_create_dask_client():
    """Return the active dask client, or create one if none can be found.

    Returns:
        dask.distributed.Client: whatever ``get_client()`` returns, or a new
        ``Client(threads_per_worker=1)`` when ``get_client()`` raises
        ``ValueError`` (presumably: no client/scheduler is available).
    """
    try:
        return get_client()
    except ValueError:
        # Fall back to a fresh client whose workers each run one thread.
        return Client(threads_per_worker=1)
def get_local_dask_directory():
    """Return the ``dask-worker-space`` directory path.

    The base directory is dask's ``temporary-directory`` config value; when
    that value is falsy, the current working directory is used instead. The
    directory is not created here.

    Returns:
        str: ``<base directory>/dask-worker-space``
    """
    base_dir = dask.config.get("temporary-directory") or os.getcwd()
    return os.path.join(base_dir, "dask-worker-space")
class JobExecutionError(Exception):
    """Raised by ``dask_job``-decorated functions when the wrapped job fails."""
def with_event_loop(func, timeout=3600):
    """Decorate a function so it runs on a dask worker with a timeout.

    Calling the returned wrapper creates a new asyncio event loop and runs an
    async driver on it. The driver:

    1. looks up the current dask worker,
    2. opens a ``worker_client()`` and passes it to ``func`` as the
       ``runner`` keyword argument,
    3. runs ``func`` in the worker's executor (a background thread), and
    4. waits at most ``timeout`` seconds for it.

    Submission through ``runner`` looks the same regardless of whether it
    happens in a sub-process or a thread.

    This is mostly a workaround to implement some form of timeout for very
    long tasks on dask. The running thread is not killed (one cannot, or
    should not, kill it). On timeout the pending future is cancelled, which
    stops the wrapper from waiting but does not stop an already-running
    thread. The wrapper then returns ``None``.

    Args:
        func (callable): the job to run; it must accept a ``runner`` keyword
            argument.
        timeout (float): seconds to wait for ``func`` before giving up.
            Defaults to 3600 (one hour).

    Returns:
        callable: a synchronous wrapper taking the same arguments as ``func``
        (minus ``runner``). It returns ``func``'s result, re-raises any
        exception ``func`` raised, or returns ``None`` on timeout.

    Raises:
        RuntimeError: (from the wrapper) when no parent dask worker can be
            obtained.

    Examples:
        >>> @with_event_loop
        ... def my_job(*args, runner=None, **kwargs):
        ...     runner.submit(sleep, 10)
    """
    async def wrapped(*args, **kwargs):
        # Called from inside a coroutine, this returns the loop run_loop created.
        loop = asyncio.get_event_loop()

        # Functions wrapped with this decorator can only be run on dask workers.
        logger.info("Initializing job... getting parent worker")
        try:
            worker = get_worker()
        except ValueError as exc:
            logger.error("Could not get dask worker!")
            raise RuntimeError("Data-processing job called without parent dask worker") from exc
        except Exception as exc:
            # Without a worker there is no executor to run on; re-raise rather
            # than continuing with `worker` unbound.
            logger.exception("Unknown exception when getting dask worker")
            raise RuntimeError("Data-processing job called without parent dask worker") from exc

        logger.info(f"Successfully found worker {worker}")
        logger.info(f"Running job {func} with args: {args}, kwargs: {kwargs}")

        # The worker client is handed to the job as its dask client/executor.
        with worker_client() as runner:
            kwargs['runner'] = runner

            # Run func in the worker's executor so this loop can time it out.
            job = loop.run_in_executor(worker.executor, partial(func, *args, **kwargs))
            done, pending = await asyncio.wait([job], timeout=timeout)

            # Timed out: stop waiting on the job (the thread itself keeps running).
            if pending:
                logger.warning("Killing pending tasks!")
                for task in pending:
                    task.cancel()

            # .result() re-raises any exception raised by func.
            return_value = done.pop().result() if done else None

            logger.info(f"Done running job, returning {return_value}")
            return return_value

    @wraps(func)
    def run_loop(*args, **kwargs):
        """Run the async driver to completion on a fresh event loop.

        Note: use of the background thread has been observed to log an
        ``ERROR - asyncio`` message on shutdown about a pending tornado
        ``HTTP1ServerConnection._server_request_loop()`` task; it seems some
        async task gets hung up in the child thread.
        """
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(wrapped(*args, **kwargs))
        finally:
            # Always release the loop, even if the job or the driver raised.
            loop.close()

    return run_loop
def dask_job(job_name):
    """Build a decorator that runs a job with standard output locations.

    The decorated function is replaced by
    ``run_simple(namespace, index, *args, **kwargs)``, which:

    1. looks up the current dask worker (best effort; ``None`` when not on a
       worker),
    2. creates ``$MIND_GPFS_DIR/data_dev/<namespace>/<index>`` as the job's
       output directory and
       ``$MIND_GPFS_DIR/data_dev/<namespace>/<JOB_NAME>_DS`` as the dataset
       directory, and
    3. calls ``func(index, output_dir, output_segment, *args, **kwargs)``,
       where ``output_segment`` is
       ``<dataset directory>/ResultSegment-<index>.parquet``, and returns its
       result.

    No threading is used and no dask client/runner is passed to ``func``.

    Args:
        job_name (str): becomes the wrapper's ``__name__``. Upper-cased with a
            ``_DS`` suffix, it also names the dataset directory.

    Returns:
        callable: a decorator that takes ``func`` and returns ``run_simple``.

    Raises:
        KeyError: (from ``run_simple``) if ``MIND_GPFS_DIR`` is not set.
        JobExecutionError: (from ``run_simple``) if ``func`` raises. The
            original exception is chained as ``__cause__``.

    Examples:
        >>> @dask_job('my_job')
        ... def my_job(index, output_dir, output_segment, *args, **kwargs):
        ...     ...
    """
    def wrapped(func):
        def run_simple(namespace, index, *args, **kwargs):
            """Run ``func`` for one ``namespace``/``index``; no threading."""
            logger.info(f"Initializing {job_name} @ {namespace}/{index}")

            # Best effort: the job still runs when not on a dask worker.
            try:
                worker = get_worker()
            except ValueError:
                logger.warning("Could not get dask worker!")
                worker = None
            except Exception:
                logger.exception("Unknown exception when getting dask worker")
                worker = None
            if worker is not None:
                # Only report success when a worker was actually found.
                logger.info(f"Successfully found worker {worker}")

            # Jobs get an output directory and an output parquet slice.
            namespace_dir = os.path.join(os.environ['MIND_GPFS_DIR'], "data_dev", namespace)
            output_dir = os.path.join(namespace_dir, index)
            output_ds = os.path.join(namespace_dir, job_name.upper() + "_DS")
            os.makedirs(output_dir, exist_ok=True)
            os.makedirs(output_ds, exist_ok=True)
            output_segment = os.path.join(output_ds, f"ResultSegment-{index}.parquet")
            logger.info(f"Setup ouput dir={output_dir} slice={output_ds}")

            # Kick off the job; any failure is surfaced as JobExecutionError.
            try:
                logger.info(f"Running job {func} with args: {args}, kwargs: {kwargs}")
                return_value = func(index, output_dir, output_segment, *args, **kwargs)
            except Exception as exc:
                logger.exception(f"Job execution failed due to: {exc}", extra={"namespace": namespace, "key": index})
                raise JobExecutionError(
                    f"Job {func} did not run successfully, please check input data! {args}, {kwargs}"
                ) from exc
            return return_value

        run_simple.__name__ = job_name
        return run_simple
    return wrapped