pyluna-common.luna.common package

Submodules

pyluna-common.luna.common.CodeTimer module

Created on November 02, 2020

@author: pashaa@mskcc.org

class luna.common.CodeTimer.CodeTimer(logger, name=None)[source]

Bases: object

pyluna-common.luna.common.DataStore module

class luna.common.DataStore.DataStore(params)[source]

Bases: object

DataStore: an abstraction with an id, name, namespace, type, and a list of associated data nodes

Interfaces with a metadata store (graph DB) and raw file stores (gpfs, potentially others)

Handles the matching and creation of metadata

Example usage:

$ container = luna.common.GraphEnum.DataStore( params ).setNamespace(“test”).setContainer(“1.2.840…”) > Connecting to: neo4j://localhost:7687 > Connection successfull: True > Running on: localhost > Lookup ID: 1.2.840… > Found: [<Record id(container)=7091 labels(container)=[‘scan’] container.type=’scan’ container.name=’1.2.840…>] > Match on: WHERE id(container) = 7091 > Successfully attached to: scan 1.2.840…

$ node = Node(“dicom”, “DCM-0123”, {“Modality”:”CT”, “path”:”file:/some/path/1.dcm”})

$ container.put(node) > Adding: test-0000 DataStore has 1 pending commits

$ > Committing dicom:globals{ hash: ‘abc123’ name: ‘my-dicom’, qualified_address: ‘test::1.2.840::my-dicom’, namespace: ‘test’, type: ‘dicom’ , path: ‘file:/some/path/1.dcm’}

$ container.get(“dicom”, “my-dicom”).path > /some/path/1.dcm

$ container.get(“dicom”, “my-dicom”).properties[‘Modality’] > ‘CT’

The container includes a logging method: $ container.logger.info(“I am processing the CT”) > ‘yyyy-mm-dd h:m:s,ms - DataStore [1] - INFO - I am processing the CT’

add(*args)[source]
createDatastore(container_id, container_type)[source]

Checks if the node referenced by container_id is a valid container, queries the metastore for relevant metadata

Params

container_id - unique container ID

Params

type - the type of the container

createNamespace(namespace_id: str)[source]

Creates a namesapce, if it doesn’t exist, else, tells you it exists

Params

namespace_id - namespace value

get(type, name)[source]

Query graph DB container node for dependent data nodes, and return one Parses the path field URL for various cases, and sets the node.data an node.aux attribute with a corrected path Note: namespace is not a default filter for get nodes, but is for adding them (i.e., one can write data under a different namespace)

Params

type - the type of data designed e.g. radiomics, mha, dicom, png, svs, geojson, etc.

Params

name - can be used to filter nodes e.g. name of the node in the subspace of the container (e.g. generate-mhd)

Example

get(“mhd”, “generate-mhd”) gets data of type “mhd” generated from the method “generate-mhd” in this container’s context/subspace

isAttached()[source]

Returns true if container was properly attached (i.e. checks in setDatastore succeeded), else False

put(node: luna.common.Node.Node)[source]

Adds a node to a temporary dictonary that will be used to save/commit nodes to the relevant databases If you add the same node under the same name, no change as the Decorates the node with the container’s namespace

Param

node - node object

static run(namespace, container_id, pipeline)[source]

Runner for pipelined jobs

runDaskDistributed(pipeline, client)[source]

Submit functions to dask workers. Dask can track dependencies via a semaphore future, so we pass that explicitly and submit each function individually

Params

pipeline - an ordered list of (function, params) tuples to execute

Params

client - a dask client

runLocal(pipeline)[source]

Run a pipeline in the main thread, blocking.

Params

pipeline - an ordered list of (function, params) tuples to execute

runProcessPoolExecutor(pipeline, executor)[source]

Use a process pool executor to run full pipelines in background

Params

pipeline - an ordered list of (function, params) tuples to execute

Params

executor - a ProcessPoolExecutor passed from a parent script

saveAll(*args)[source]
setDatastore(container_id)[source]

Checks if the node referenced by container_id is a valid datastore, queries the metastore for relevant metadata

Params

container_id - the unique container ID, either as an integer (neo4j autopopulated ID) or as a string (the Qualified Path)

setNamespace(namespace_id: str)[source]

Sets the namespace for this container’s commits, if it exists

Params

namespace_id - namespace value

class luna.common.DataStore.DataStore_v2(store_location)[source]

Bases: object

ensure_datastore(datastore_id, datastore_type)[source]
Params

datastore_id - unique container ID

Params

datastore_type - the type of the container

get(store_id, namespace_id, data_type, data_tag='data', realpath=True)[source]

Looks up and returns the path of data given the store_id, namespace_id, data_type, and data_tag

put(filepath, store_id, namespace_id, data_type, data_tag='data', metadata={}, symlink=False)[source]

Puts the file at filepath at the proper location given a store_id, namespace_id, data_type, and data_tag, and save metadata to DB

write(iostream, store_id, namespace_id, data_type, data_tag, metadata={}, dtype='w')[source]

Writes iostream at the proper location given a store_id, namespace_id, data_type, and data_tag, and save metadata to DB

luna.common.DataStore.bootstrap(container_id)[source]

pyluna-common.luna.common.EnsureByteContext module

Created on November 04, 2020

@author: aukermaa@mskcc.org

class luna.common.EnsureByteContext.EnsureByteContext[source]

Bases: object

pyluna-common.luna.common.GraphEnum module

pyluna-common.luna.common.Neo4jConnection module

class luna.common.Neo4jConnection.Neo4jConnection(uri, user, pwd)[source]

Bases: object

close()[source]
commute_all_sink_id(sc, sqlc, SINK_TYPE)[source]

Spark connector for an input source id Returns a dataframe with one column named the sink/target ID

commute_cohort_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]

Spark connector for an input cohort id Returns a dataframe with one column named the sink/target ID

commute_record_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE)[source]

Spark connector for an input cohort id Returns a dataframe with one column named the sink/target ID

commute_sink_id_to_spark(sc, sqlc, SINK_TYPE, QUERY_ID)[source]

A pass through function for consistency

commute_source_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]

Spark connector for an input source id Returns a dataframe with one column named the sink/target ID

commute_source_id_to_spark_query(sc, sqlc, WHERE_CLAUSE, SINK_TYPE)[source]

Spark connector for an input source id Returns a dataframe with one column named the sink/target ID

create_id_lookup_table(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)[source]

Spark connector for an input source id Returns a dataframe with two ID columns as specified by SOURCE_TYPE and SINK_TYPE

create_id_lookup_table_where(sqlc, source, sink, r='ID_LINK|HAS_RECORD', WHERE_CLAUSE='')[source]

Main method for creating lookup tables in a general manner.

Parameters
  • source – what ID type you are starting with (e.g. a dmp_patient_id) sink: what ID you wish to map to (e.g. a SeriesInstanceUID)

  • r – optional. Allowed relationship types. Default is ID_LINK|HAS_RECORD to contain mapping within a patient subgraph WHERE_CLAUSE: optional. source, sink (as nodes) and r (as relationships) become availabe for filtering in a WHERE clause

Returns

a dataframe with three columns, [ source | sink | pathspec ] where pathspec is a text-based representation of the path between the source and sink IDs

match_concept_node(QUERY_ID)[source]

Get node(s) given a value input

query(query, db=None)[source]

Runs a cyper query against the initalized driver

test_connection()[source]
test_count()[source]

Get node(s) given a value input

luna.common.Neo4jConnection.pretty_path(path)[source]

pyluna-common.luna.common.Node module

class luna.common.Node.Node(node_type, node_name, properties=None)[source]

Bases: object

Node object defines the type and attributes of a graph node.

Param

node_type: node type. e.g. scan

Param

name: required node name. e.g. scan-123

Param

properties: dict of key, value pairs for the node.

get_address()[source]

Returns current node address

get_all_props()[source]

Name is a required field, but it’s still a property of this node. Return the properties as a dict including the name property!

get_create_str()[source]

Returns a string representation of the node with all properties

get_map_str()[source]

Returns the properties as a cypher map

get_match_str()[source]

Returns a string representation of the node with only the qualified_address as a property

static get_qualified_name(*args)[source]

Returns the full name given a namespace and patient ID

static prop_str(fields, row)[source]

Returns a kv string like ‘id: 123, …’ where prop values come from row.

static prop_str_repr(fields, row)[source]

Returns a kv string like ‘ - id: 123 <newline> …’ where prop values come from row.

set_aux(aux)[source]
set_data(data)[source]
set_namespace(namespace_id: str, subspace_id=None)[source]

Sets the namespace for this Node commits

Params

namespace_id - namespace value

Params

subspace_id - subspace value, optional

pyluna-common.luna.common.PipelineBuilder module

luna.common.PipelineBuilder.load(stream)[source]

pyluna-common.luna.common.config module

Created on October 17, 2019

@author: pashaa@mskcc.org

class luna.common.config.ConfigSet(name=None, config_file=None, schema_file=None)[source]

Bases: object

This is a singleton class that can load a collection of configurations from yaml files.

ConfigSet loads configurations from yaml files only once on first invocation of this class with the specified yaml file. The class then maintains the configuration in memory in a singleton instance. All new invocations of this class will serve up the same configuration.

Each configuration in the collection is identified by a logical name.

If a new invocation of this class is created with an existing logical name and a different yaml file, the singleton instance replaces the existing configuration with the newly specified yaml file for the given logical name.

clear()[source]

clear the entire collection of configurations

get_config_set(name)[source]
Parameters

name – logical name of the configuration

Returns

a dictonary of top-level keys in the config stored in this instance.

Raises

ValueError if a configuration with the specified name was never loaded

get_keys(name)[source]
Parameters

name – logical name of the configuration

Returns

a list of top-level keys in the config stored in this instance.

Raises

ValueError if a configuration with the specified name was never loaded

get_names()[source]
Returns

a list of logical names of the configs stored in this instance.

get_value(path)[source]

Gets the value for the specified jsonpath from the specified configuration.

Parameters
  • path (str) – path to a value in a configuration. The path must be of the form “name::jsonpath”

  • value. (where name is the logical name of the configuration and jsonpath is the jsonpath to) –

  • https (jsonpath expressions may be tested here -) – //pypi.org/project/jsonpath-ng/

  • https – //jsonpath.com/

Returns

value from config file

Return type

str

Raises
  • ValueError – if no match is found for the specified exception or a configuration with

  • the specified name was never loaded

has_value(path)[source]
Parameters
  • path (str) – path to a value in a configuration. The path must be of the form

  • value. ("name::jsonpath" where name is the logical name of the configuration and jsonpath is the jsonpath to) –

  • https (may be tested here -) – //pypi.org/project/jsonpath-ng/ jsonpath expressions

  • https – //jsonpath.com/

Returns

true if value is not an empty string, else false.

Return type

boolean

Raises

ValueError – if a configuration with the specified name was never loaded

pyluna-common.luna.common.constants module

Created on November 16, 2020

@author: rosed2@mskcc.org

luna.common.constants.CONFIG_LOCATION(cfg)[source]
luna.common.constants.PROJECT_LOCATION(cfg)[source]

ROOT_PATH is a path to mind data e.g. /gpfs/mind/data or hdfs://server:port/data

Parameters

cfg

Returns

ROOT_PATH/PROJECT_NAME

luna.common.constants.TABLE_LOCATION(cfg, is_source=False)[source]
luna.common.constants.TABLE_NAME(cfg, is_source=False)[source]

pyluna-common.luna.common.custom_logger module

class luna.common.custom_logger.MultilineFormatter(fmt=None, datefmt=None, style='%')[source]

Bases: logging.Formatter

format(record: logging.LogRecord)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

luna.common.custom_logger.init_logger(filename='data-processing.log')[source]

pyluna-common.luna.common.dask module

exception luna.common.dask.JobExecutionError[source]

Bases: Exception

luna.common.dask.dask_job(job_name)[source]

The simplier version of a dask job decorator, which only provides the worker_client as a runner to the calling function

Examples

>>> @dask_job('my_job')
>>> my_job(args, kwargs, runner=None):
>>>     runner.submit(sleep, 10)
luna.common.dask.get_local_dask_directory()[source]
luna.common.dask.get_or_create_dask_client()[source]
luna.common.dask.prune_empty_delayed(tasks)[source]

A less-than-ideal method to prune empty tasks from dask tasks Here we’re trading CPU and time for memory.

Parameters

tasks (list) – list of delayed dask tasks

Returns

a reduced list of delayed dask tasks

Return type

list[dask.delayed]

luna.common.dask.with_event_loop(func)[source]

This method decorates functions run on dask workers with an async function call Namely, this allows us to manage the execution of a function a bit better, and especially, to exit job execution if things take too long (1hr)

Here, the function func is run in a background thread, and has access to the dask schedular through the ‘runner’. Critically, sumbission to this runner/client looks the same regardless of if it occurs in a sub-process/thread

Mostly, this is a workaround to impliment some form of timeout when running very long-tasks on dask. While one cannot (or should not) kill the running thread, Dask will cleanup the child tasks eventually once all jobs finish.

Examples

>>> @with_dask_event_loop
>>> my_job(args, kwargs, runner=None):
>>>     runner.submit(sleep, 10)

pyluna-common.luna.common.sparksession module

class luna.common.sparksession.SparkConfig[source]

Bases: object

spark_session(config_name, app_name)[source]

@:param config_name logical name of the configuration to use from the ConfigSet. See luna/common/config.py @:param app_name application name to give to the spark session. This name will be used in the spark logs.

pyluna-common.luna.common.utils module

luna.common.utils.clean_nested_colname(s)[source]

Removes map name for MapType columns. e.g. metadata.SeriesInstanceUID -> SeriesInstanceUID

luna.common.utils.does_not_contain(token, value)[source]

Validate that token is not a substring of value

Param

token: string e.g. : | .

Param

value: dictionary, list, or str

luna.common.utils.generate_uuid(path, prefix)[source]

Returns hash of the file given path, preceded by the prefix. :param path: file path e.g. file:/path/to/file :param prefix: list e.g. [“SVGEOJSON”,”default-label”] :return: string uuid

luna.common.utils.generate_uuid_binary(content, prefix)[source]

Returns hash of the binary, preceded by the prefix. :param content: binary :param prefix: list e.g. [“FEATURE”] :return: string uuid

luna.common.utils.generate_uuid_dict(json_str, prefix)[source]

Returns hash of the json string, preceded by the prefix. :param json_str: str representation of json :param prefix: list e.g. [“SVGEOJSON”,”default-label”] :return: v

luna.common.utils.get_absolute_path(module_path, relative_path)[source]

Given the path to a module file and the path, relative to the module file, of another file that needs to be referenced in the module, this method returns the absolute path of the file that needs to be referenced.

This method makes it possible to resolve absolute paths to files in any environment a module and the referenced files are deployed to.

:param module_path path to the module. Use ‘__file__’ from the module. :param relative_path path to the file that needs to be referenced by the module. The path must be relative to the module. :return absolute path to file with the specified relative_path

luna.common.utils.get_method_data(cohort_id, method_id)[source]

Return method dict

Param

cohort_id: string

Param

method_id: string

luna.common.utils.replace_token(token, token_replacement, value)[source]

Replace token with token_replacement in value

Param

token: string e.g. : | .

Param

token_replacement: string e.g. _ -

Param

value: dictionary, list, or str

luna.common.utils.to_sql_field(s)[source]
luna.common.utils.to_sql_value(s)[source]

Module contents