pyluna-common.luna.common package#

Submodules#

pyluna-common.luna.common.CodeTimer module#

Created on November 02, 2020

@author: pashaa@mskcc.org

class luna.common.CodeTimer.CodeTimer(logger, name=None)[source]#

Bases: object

pyluna-common.luna.common.DataStore module#

pyluna-common.luna.common.GraphEnum module#

pyluna-common.luna.common.Neo4jConnection module#

class luna.common.Neo4jConnection.Neo4jConnection(uri, user, pwd)[source]#

Bases: object

close()[source]#
commute_all_sink_id(sc, sqlc, SINK_TYPE)[source]#

Spark connector for an input source id. Returns a dataframe with one column named the sink/target ID.

commute_cohort_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]#

Spark connector for an input cohort id. Returns a dataframe with one column named the sink/target ID.

commute_record_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE)[source]#

Spark connector for an input cohort id. Returns a dataframe with one column named the sink/target ID.

commute_sink_id_to_spark(sc, sqlc, SINK_TYPE, QUERY_ID)[source]#

A pass through function for consistency

commute_source_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]#

Spark connector for an input source id. Returns a dataframe with one column named the sink/target ID.

commute_source_id_to_spark_query(sc, sqlc, WHERE_CLAUSE, SINK_TYPE)[source]#

Spark connector for an input source id. Returns a dataframe with one column named the sink/target ID.

create_id_lookup_table(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]#

Spark connector for an input source id. Returns a dataframe with two ID columns as specified by SOURCE_TYPE and SINK_TYPE.

create_id_lookup_table_where(sqlc, source, sink, r='ID_LINK|HAS_RECORD', WHERE_CLAUSE='')[source]#

Main method for creating lookup tables in a general manner.

Parameters
  • source – what ID type you are starting with (e.g. a dmp_patient_id)

  • sink – what ID you wish to map to (e.g. a SeriesInstanceUID)

  • r – optional. Allowed relationship types. Default is ID_LINK|HAS_RECORD, which contains the mapping within a patient subgraph

  • WHERE_CLAUSE – optional. source, sink (as nodes) and r (as relationships) become available for filtering in a WHERE clause

Returns

a dataframe with three columns, [ source | sink | pathspec ] where pathspec is a text-based representation of the path between the source and sink IDs

match_concept_node(QUERY_ID)[source]#

Get node(s) given a value input

query(query, db=None)[source]#

Runs a Cypher query against the initialized driver

test_connection()[source]#
test_count()[source]#

Get node(s) given a value input

luna.common.Neo4jConnection.pretty_path(path)[source]#

pyluna-common.luna.common.Node module#

class luna.common.Node.Node(node_type, node_name, properties=None)[source]#

Bases: object

Node object defines the type and attributes of a graph node.

Parameters
  • node_type – node type, e.g. scan

  • node_name – required node name, e.g. scan-123

  • properties – dict of key, value pairs for the node.

get_address()[source]#

Returns current node address

get_all_props()[source]#

Name is a required field, but it’s still a property of this node. Return the properties as a dict including the name property!

get_create_str()[source]#

Returns a string representation of the node with all properties

get_map_str()[source]#

Returns the properties as a cypher map

get_match_str()[source]#

Returns a string representation of the node with only the qualified_address as a property

static get_qualified_name(*args)[source]#

Returns the full name given a namespace and patient ID

static prop_str(fields, row)[source]#

Returns a kv string like ‘id: 123, …’ where prop values come from row.

static prop_str_repr(fields, row)[source]#

Returns a kv string like ‘ - id: 123 <newline> …’ where prop values come from row.
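
As a rough illustration of the two key-value helpers above, the sketch below builds both string forms from a list of field names and a row. It is not the library's implementation: the function bodies, the dict-like row, and the exact separators are assumptions inferred from the one-line descriptions.

```python
def prop_str(fields, row):
    """Join 'field: value' pairs with commas (assumed format)."""
    return ", ".join(f"{field}: {row[field]}" for field in fields)


def prop_str_repr(fields, row):
    """Put each ' - field: value' pair on its own line (assumed format)."""
    return "\n".join(f" - {field}: {row[field]}" for field in fields)


# Hypothetical row; any mapping indexed by field name would do here.
row = {"id": 123, "modality": "CT"}
inline = prop_str(["id", "modality"], row)
multiline = prop_str_repr(["id", "modality"], row)
print(inline)
print(multiline)
```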

set_aux(aux)[source]#
set_data(data)[source]#
set_namespace(namespace_id: str, subspace_id=None)[source]#

Sets the namespace for this Node commits

Parameters
  • namespace_id – namespace value

  • subspace_id – subspace value, optional

pyluna-common.luna.common.PipelineBuilder module#

luna.common.PipelineBuilder.load(stream)[source]#

pyluna-common.luna.common.config module#

Created on October 17, 2019

@author: pashaa@mskcc.org

class luna.common.config.ConfigSet(name=None, config_file=None, schema_file=None)[source]#

Bases: object

This is a singleton class that can load a collection of configurations from yaml files.

ConfigSet loads configurations from yaml files only once on first invocation of this class with the specified yaml file. The class then maintains the configuration in memory in a singleton instance. All new invocations of this class will serve up the same configuration.

Each configuration in the collection is identified by a logical name.

If a new invocation of this class is created with an existing logical name and a different yaml file, the singleton instance replaces the existing configuration with the newly specified yaml file for the given logical name.
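
The caching behaviour described above can be pictured with a small, self-contained sketch. ConfigRegistry, fake_loader, and the loader argument are hypothetical names introduced only for illustration; the real class reads YAML files, whereas this stand-in takes a loader callable so it runs without any files on disk.

```python
class ConfigRegistry:
    """Hypothetical stand-in for a singleton that caches configs by logical name."""

    _instance = None

    def __new__(cls, name=None, config_file=None, loader=None):
        # Create the shared instance only on the first invocation.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._files = {}    # logical name -> file it was loaded from
            cls._instance._configs = {}  # logical name -> parsed configuration
        self = cls._instance
        if name is not None and config_file is not None:
            # Load only when the name is new or now points at a different file.
            if self._files.get(name) != config_file:
                self._configs[name] = loader(config_file)
                self._files[name] = config_file
        return self

    def get_keys(self, name):
        if name not in self._configs:
            raise ValueError(f"configuration {name!r} was never loaded")
        return list(self._configs[name])


load_calls = []


def fake_loader(path):
    """Pretend to parse a file and record that a load happened."""
    load_calls.append(path)
    return {"source": path}


first = ConfigRegistry("APP", "app.yaml", fake_loader)
second = ConfigRegistry("APP", "app.yaml", fake_loader)   # served from memory
third = ConfigRegistry("APP", "other.yaml", fake_loader)  # replaces the APP entry
```

All three names refer to the same object, and the loader runs once per distinct file for a given logical name.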

clear()[source]#

clear the entire collection of configurations

get_config_set(name)[source]#
Parameters

name – logical name of the configuration

Returns

a dictionary of top-level keys in the config stored in this instance.

Raises

ValueError if a configuration with the specified name was never loaded

get_keys(name)[source]#
Parameters

name – logical name of the configuration

Returns

a list of top-level keys in the config stored in this instance.

Raises

ValueError if a configuration with the specified name was never loaded

get_names()[source]#
Returns

a list of logical names of the configs stored in this instance.

get_value(path)[source]#

Gets the value for the specified jsonpath from the specified configuration.

Parameters
  • path (str) – path to a value in a configuration. The path must be of the form “name::jsonpath”, where name is the logical name of the configuration and jsonpath is the jsonpath to the value. jsonpath expressions may be tested here: https://pypi.org/project/jsonpath-ng/ and https://jsonpath.com/

Returns

value from config file

Return type

str

Raises
  • ValueError – if no match is found for the specified expression, or a configuration with the specified name was never loaded
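
To make the “name::jsonpath” form concrete, here is a minimal sketch that splits such a path and resolves it against in-memory dictionaries. The helper names split_config_path and lookup are hypothetical, and the resolver handles only plain dotted paths such as $.spark.master; it is an illustrative subset and does not use or reproduce jsonpath-ng.

```python
def split_config_path(path):
    """Split 'name::jsonpath' into (name, jsonpath)."""
    name, sep, jsonpath = path.partition("::")
    if not sep or not name or not jsonpath:
        raise ValueError(f"expected 'name::jsonpath', got {path!r}")
    return name, jsonpath


def lookup(configs, path):
    """Resolve a simple dotted path inside the named configuration."""
    name, jsonpath = split_config_path(path)
    if name not in configs:
        raise ValueError(f"configuration {name!r} was never loaded")
    node = configs[name]
    # Drop the leading '$.' and walk one key at a time.
    for key in jsonpath.lstrip("$").strip(".").split("."):
        if not isinstance(node, dict) or key not in node:
            raise ValueError(f"no match for {jsonpath!r} in {name!r}")
        node = node[key]
    return node


# Hypothetical configuration content, used only for this example.
configs = {"APP": {"spark": {"master": "local[2]"}}}
master = lookup(configs, "APP::$.spark.master")
```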

has_value(path)[source]#
Parameters
  • path (str) – path to a value in a configuration. The path must be of the form "name::jsonpath", where name is the logical name of the configuration and jsonpath is the jsonpath to the value. jsonpath expressions may be tested here: https://pypi.org/project/jsonpath-ng/ and https://jsonpath.com/

Returns

true if value is not an empty string, else false.

Return type

boolean

Raises

ValueError – if a configuration with the specified name was never loaded

pyluna-common.luna.common.constants module#

Created on November 16, 2020

@author: rosed2@mskcc.org

luna.common.constants.CONFIG_LOCATION(cfg)[source]#
luna.common.constants.PROJECT_LOCATION(cfg)[source]#

ROOT_PATH is a path to mind data e.g. /gpfs/mind/data or hdfs://server:port/data

Parameters

cfg

Returns

ROOT_PATH/PROJECT_NAME

luna.common.constants.TABLE_LOCATION(cfg, is_source=False)[source]#
luna.common.constants.TABLE_NAME(cfg, is_source=False)[source]#

pyluna-common.luna.common.custom_logger module#

class luna.common.custom_logger.MultilineFormatter(fmt=None, datefmt=None, style='%')[source]#

Bases: logging.Formatter

format(record: logging.LogRecord)[source]#

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

luna.common.custom_logger.init_logger(filename='data-processing.log')[source]#

pyluna-common.luna.common.dask module#

exception luna.common.dask.JobExecutionError[source]#

Bases: Exception

luna.common.dask.dask_job(job_name)[source]#

The simpler version of a dask job decorator, which only provides the worker_client as a runner to the calling function

Examples

>>> @dask_job('my_job')
... def my_job(args, kwargs, runner=None):
...     runner.submit(sleep, 10)
luna.common.dask.get_local_dask_directory()[source]#
luna.common.dask.get_or_create_dask_client()[source]#
luna.common.dask.prune_empty_delayed(tasks)[source]#

A less-than-ideal method to prune empty tasks from dask tasks. Here we’re trading CPU and time for memory.

Parameters

tasks (list) – list of delayed dask tasks

Returns

a reduced list of delayed dask tasks

Return type

list[dask.delayed]

luna.common.dask.with_event_loop(func)[source]#

This method decorates functions run on dask workers with an async function call. Namely, this allows us to manage the execution of a function a bit better, and especially, to exit job execution if things take too long (1hr).

Here, the function func is run in a background thread, and has access to the dask scheduler through the ‘runner’. Critically, submission to this runner/client looks the same regardless of whether it occurs in a sub-process/thread.

Mostly, this is a workaround to implement some form of timeout when running very long tasks on dask. While one cannot (or should not) kill the running thread, Dask will clean up the child tasks eventually once all jobs finish.

Examples

>>> @with_event_loop
... def my_job(args, kwargs, runner=None):
...     runner.submit(sleep, 10)
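
The timeout idea described for this decorator can be sketched without dask at all. The version below is an assumption-laden illustration, not the library's code: with_timeout and JobTimeoutError are hypothetical names, and it uses a plain thread pool from the standard library rather than an async event loop. It shows the same trade-off noted above, namely that the caller stops waiting while the background thread is left to finish on its own.

```python
import concurrent.futures
import functools
import time


class JobTimeoutError(Exception):
    """Hypothetical error raised when the wrapped function runs too long."""


def with_timeout(seconds):
    """Run the decorated function in a background thread and stop waiting after `seconds`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = pool.submit(func, *args, **kwargs)
            try:
                return future.result(timeout=seconds)
            except concurrent.futures.TimeoutError:
                raise JobTimeoutError(f"{func.__name__} exceeded {seconds}s") from None
            finally:
                # The worker thread cannot be killed safely, so do not block on it.
                pool.shutdown(wait=False)
        return wrapper
    return decorator


@with_timeout(1)
def fast_job():
    return "done"


@with_timeout(0.05)
def slow_job():
    time.sleep(0.3)
    return "done"
```

Calling fast_job() returns normally, while slow_job() raises JobTimeoutError after roughly 0.05 seconds even though its thread keeps sleeping in the background.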

pyluna-common.luna.common.sparksession module#

class luna.common.sparksession.SparkConfig[source]#

Bases: object

spark_session(config_name, app_name)[source]#

Parameters
  • config_name – logical name of the configuration to use from the ConfigSet. See luna/common/config.py

  • app_name – application name to give to the spark session. This name will be used in the spark logs.

pyluna-common.luna.common.utils module#

luna.common.utils.apply_csv_filter(input_paths, subset_csv=None)[source]#

Filters a list of input_paths based on include/exclude logic given for either the full path, filename, or filestem

If using “include” logic, only matching entries with include=True are kept. If using “exclude” logic, only matching entries with exclude=True are removed.

The original list is returned if the given subset_csv is None or empty

Parameters
  • input_paths (list[str]) – list of input paths to filter

  • subset_csv (str) – path to a csv with subset/filter information/flags

Returns

list[str]: filtered list

Raises

RuntimeError – If the given subset_csv is invalid
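
The include/exclude rules above can be illustrated with a small sketch that works on already-parsed rows instead of a CSV file. Everything here is an assumption made for the example: the function name filter_paths, the row keys "key" and "flag", and the explicit mode argument are hypothetical, and the real CSV layout is not documented in this page.

```python
from pathlib import Path


def filter_paths(input_paths, rows, mode):
    """Filter paths using rows of {'key': str, 'flag': bool}; mode is 'include' or 'exclude'."""
    if not rows:
        # Nothing to filter on: return the original list unchanged.
        return list(input_paths)

    flagged = {row["key"] for row in rows if row["flag"]}

    def matches(path):
        p = Path(path)
        # A row may refer to the full path, the filename, or the filestem.
        return bool({str(path), p.name, p.stem} & flagged)

    if mode == "include":
        return [p for p in input_paths if matches(p)]
    if mode == "exclude":
        return [p for p in input_paths if not matches(p)]
    raise RuntimeError(f"unknown mode: {mode!r}")


paths = ["/data/a.svs", "/data/b.svs", "/data/c.svs"]
rows = [{"key": "a", "flag": True}, {"key": "b.svs", "flag": False}]
kept = filter_paths(paths, rows, "include")
remaining = filter_paths(paths, rows, "exclude")
```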

luna.common.utils.clean_nested_colname(s)[source]#

Removes map name for MapType columns. e.g. metadata.SeriesInstanceUID -> SeriesInstanceUID
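
A one-line sketch of this renaming, assuming the map name is everything before the first dot (the helper name strip_map_prefix is hypothetical):

```python
def strip_map_prefix(colname):
    """Drop the text up to and including the first '.', if there is one."""
    return colname.split(".", 1)[-1]


cleaned = strip_map_prefix("metadata.SeriesInstanceUID")
unchanged = strip_map_prefix("SeriesInstanceUID")
```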

luna.common.utils.cli_runner(cli_kwargs: dict, cli_params: List[tuple], cli_function: Callable[[...], dict], pass_keys: bool = False)[source]#

For special input_* parameters, see if we should infer the input given an output/result directory

Parameters
  • cli_kwargs (dict) – keyword arguments from the CLI call

  • cli_params (List[tuple]) – param list, where each element is the parameter (name, type)

  • cli_function (Callable[..., dict]) – cli_function entry point, should accept exactly the arguments given by cli_params

  • pass_keys (bool) – will pass found segment keys to transform function as ‘keys’ kwarg

Returns

None

luna.common.utils.does_not_contain(token, value)[source]#

Validate that token is not a substring of value

Parameters
  • token – string, e.g. : | .

  • value – dictionary, list, or str

luna.common.utils.expand_inputs(given_params: dict)[source]#

For special input_* parameters, see if we should infer the input given an output/result directory

Parameters

given_params (dict) – keyword arguments to check types

Returns

Input-expanded keyword argument dictionary

Return type

dict

luna.common.utils.generate_uuid(path, prefix)[source]#

Returns hash of the file given path, preceded by the prefix.

Parameters
  • path – file path, e.g. file:/path/to/file

  • prefix – list, e.g. ["SVGEOJSON", "default-label"]

Returns

string uuid

luna.common.utils.generate_uuid_binary(content, prefix)[source]#

Returns hash of the binary, preceded by the prefix.

Parameters
  • content – binary

  • prefix – list, e.g. ["FEATURE"]

Returns

string uuid

luna.common.utils.generate_uuid_dict(json_str, prefix)[source]#

Returns hash of the json string, preceded by the prefix.

Parameters
  • json_str – str representation of json

  • prefix – list, e.g. ["SVGEOJSON", "default-label"]

Returns

v
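
The three generate_uuid* helpers share one pattern: hash some content, then put the prefix in front. The sketch below shows that pattern under stated assumptions only. The hash algorithm (SHA-256), the "-" separator, and the name prefixed_hash are all guesses made for illustration; the page does not say which digest or layout the library uses.

```python
import hashlib


def prefixed_hash(content: bytes, prefix):
    """Return the prefix parts and a hex digest joined by '-' (assumed layout)."""
    digest = hashlib.sha256(content).hexdigest()
    return "-".join([*prefix, digest])


uid = prefixed_hash(b"feature-bytes", ["FEATURE"])
```

Because the identifier depends only on the content and the prefix, hashing the same bytes twice yields the same string.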

luna.common.utils.get_absolute_path(module_path, relative_path)[source]#

Given the path to a module file and the path, relative to the module file, of another file that needs to be referenced in the module, this method returns the absolute path of the file that needs to be referenced.

This method makes it possible to resolve absolute paths to files in any environment a module and the referenced files are deployed to.

Parameters
  • module_path – path to the module. Use ‘__file__’ from the module.

  • relative_path – path to the file that needs to be referenced by the module. The path must be relative to the module.

Returns

absolute path to file with the specified relative_path
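
One plausible way to implement this resolution with the standard library is sketched below. The function name absolute_from_module is hypothetical and the body is an assumption based on the description, not a copy of the library's code.

```python
import os


def absolute_from_module(module_path, relative_path):
    """Resolve relative_path against the directory that contains module_path."""
    module_dir = os.path.dirname(os.path.abspath(module_path))
    return os.path.normpath(os.path.join(module_dir, relative_path))


# Inside a real module, pass __file__ as the first argument.
resolved = absolute_from_module(os.path.join("pkg", "mod.py"), os.path.join("..", "conf", "app.yaml"))
```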

luna.common.utils.get_dataset_url()[source]#

Retrieve a “dataset URL” from the environment, may look like http://localhost:6077 or file:///absolute/path/to/dataset/dir

luna.common.utils.get_method_data(cohort_id, method_id)[source]#

Return method dict

Parameters
  • cohort_id – string

  • method_id – string

luna.common.utils.grouper(iterable, n)[source]#

Turn an iterable into an iterable of iterables

‘None’ should not be a member of the input iterable as it is removed to handle the fillvalues

Parameters
  • iterable (iterable) – an iterable

  • n (int) – size of chunks

  • fillvalue

Returns

iterable[iterable]
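
The note about ‘None’ follows from the usual zip_longest chunking recipe, sketched here under the assumption that the function works this way (the name chunked is hypothetical). The last chunk is padded with None and the padding is then filtered out, so a genuine None in the input would be dropped as well.

```python
from itertools import zip_longest


def chunked(iterable, n):
    """Yield lists of up to n items; None padding from the last chunk is removed."""
    args = [iter(iterable)] * n  # n references to the same iterator
    for group in zip_longest(*args, fillvalue=None):
        yield [item for item in group if item is not None]


chunks = list(chunked(range(5), 2))
```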

luna.common.utils.load_func(dotpath: str)[source]#

load function in module from a parsed yaml string

Parameters

dotpath (str) – module/function name written as a string (i.e. torchvision.models.resnet34)

Returns

The inferred module itself, not the string representation
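
Resolving a dotted name to an object is commonly done with importlib. The sketch below shows that approach; load_by_dotpath is a hypothetical name and the body is an assumption about how such a loader could work, demonstrated with a standard-library function so it runs anywhere.

```python
import importlib


def load_by_dotpath(dotpath):
    """Import the module part of 'pkg.module.attr' and return the named attribute."""
    module_name, _, attr = dotpath.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


sqrt = load_by_dotpath("math.sqrt")
root = sqrt(9)
```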

luna.common.utils.post_to_dataset(input_feature_data, waystation_url, dataset_id, keys)[source]#

Interface feature data to a parquet dataset

Parameters
  • input_feature_data (str) – path to input data

  • waystation_url (str) – URL of dataset root (either file or using waystation)

  • dataset_id (str) – Dataset name/ID

  • keys (dict) – corresponding segment keys

luna.common.utils.rebase_schema_numeric(df)[source]#

Tries to convert all columns in a dataframe to numeric types, if possible, with integer types taking precedence

Note: this is an in-place operation

Parameters

df (pd.DataFrame) – dataframe to convert columns

luna.common.utils.replace_token(token, token_replacement, value)[source]#

Replace token with token_replacement in value

Parameters
  • token – string, e.g. : | .

  • token_replacement – string, e.g. _ -

  • value – dictionary, list, or str
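
Since value may be a dictionary, list, or str, a recursive walk is one natural way to apply the replacement. The sketch below assumes that only dictionary values (not keys) are rewritten and that other types pass through untouched; both choices, and the name replace_in, are assumptions rather than documented behaviour.

```python
def replace_in(token, token_replacement, value):
    """Replace token in a str, or recursively in the items of a list or dict."""
    if isinstance(value, str):
        return value.replace(token, token_replacement)
    if isinstance(value, list):
        return [replace_in(token, token_replacement, item) for item in value]
    if isinstance(value, dict):
        # Keys are left as-is in this sketch; only values are rewritten.
        return {key: replace_in(token, token_replacement, item) for key, item in value.items()}
    return value


cleaned = replace_in(":", "_", {"id": "a:b", "tags": ["x:y", 3]})
```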

luna.common.utils.to_sql_field(s)[source]#
luna.common.utils.to_sql_value(s)[source]#
luna.common.utils.validate_params(given_params: dict, params_list: List[tuple])[source]#

Ensure that a dictionary of params or keyword arguments is correct given a parameter list

Checks that necessary parameters exist, and that their types can be cast correctly. There’s special logic for list and dictionary types. JSON arguments are parsed as dict types.

Parameters
  • given_params (dict) – keyword arguments to check types

  • params_list (List[tuple]) – param list, where each element is the parameter (param, type)

Returns

Validated and cast keyword argument dictionary

Return type

dict
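
A minimal sketch of this kind of validation is shown below. The name check_params, the choice of ValueError for a missing parameter, and the JSON handling for list and dict types are assumptions made for illustration; the library's own error types and special cases may differ.

```python
import json


def check_params(given_params, params_list):
    """Cast each required parameter to its declared type and return the results."""
    validated = {}
    for name, expected_type in params_list:
        if name not in given_params:
            raise ValueError(f"missing required parameter: {name}")
        raw = given_params[name]
        if expected_type in (dict, list) and isinstance(raw, str):
            # Container types given as text are parsed as JSON.
            value = json.loads(raw)
            if not isinstance(value, expected_type):
                raise ValueError(f"{name} is not a JSON {expected_type.__name__}")
        else:
            value = expected_type(raw)
        validated[name] = value
    return validated


# Hypothetical parameters, as they might arrive from a command line.
given = {"tile_size": "256", "scale": "0.5", "opts": '{"a": 1}'}
spec = [("tile_size", int), ("scale", float), ("opts", dict)]
result = check_params(given, spec)
```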

Module contents#