pyluna-common.luna.common package
Submodules
pyluna-common.luna.common.CodeTimer module
Created on November 02, 2020
@author: pashaa@mskcc.org
pyluna-common.luna.common.DataStore module
- class luna.common.DataStore.DataStore(params)
Bases: object
DataStore: an abstraction with an ID, name, namespace, type, and a list of associated data nodes.
Interfaces with a metadata store (graph DB) and raw file stores (GPFS, potentially others).
Handles the matching and creation of metadata.
Example usage:
$ container = luna.common.GraphEnum.DataStore(params).setNamespace("test").setContainer("1.2.840…")
> Connecting to: neo4j://localhost:7687
> Connection successfull: True
> Running on: localhost
> Lookup ID: 1.2.840…
> Found: [<Record id(container)=7091 labels(container)=['scan'] container.type='scan' container.name='1.2.840…>]
> Match on: WHERE id(container) = 7091
> Successfully attached to: scan 1.2.840…
$ node = Node("dicom", "DCM-0123", {"Modality": "CT", "path": "file:/some/path/1.dcm"})
$ container.put(node)
> Adding: test-0000
> DataStore has 1 pending commits
> Committing dicom:globals{ hash: 'abc123', name: 'my-dicom', qualified_address: 'test::1.2.840::my-dicom', namespace: 'test', type: 'dicom', path: 'file:/some/path/1.dcm'}
$ container.get("dicom", "my-dicom").path
> /some/path/1.dcm
$ container.get("dicom", "my-dicom").properties['Modality']
> 'CT'
The container includes a logging method:
$ container.logger.info("I am processing the CT")
> 'yyyy-mm-dd h:m:s,ms - DataStore [1] - INFO - I am processing the CT'
- createDatastore(container_id, container_type)
Checks whether the node referenced by container_id is a valid container, and queries the metastore for relevant metadata.
- Params
container_id - unique container ID
- Params
container_type - the type of the container
- createNamespace(namespace_id: str)
Creates a namespace if it doesn't exist; otherwise, reports that it already exists.
- Params
namespace_id - namespace value
- get(type, name)
Queries the graph DB container node for dependent data nodes and returns one. Parses the path field URL for various cases, and sets the node.data and node.aux attributes with a corrected path. Note: namespace is not a default filter when getting nodes, but it is when adding them (i.e., one can write data under a different namespace).
- Params
type - the type of data desired, e.g. radiomics, mha, dicom, png, svs, geojson, etc.
- Params
name - can be used to filter nodes, e.g. the name of the node in the subspace of the container (e.g. generate-mhd)
- Example
get("mhd", "generate-mhd") gets data of type "mhd" generated by the method "generate-mhd" in this container's context/subspace.
- isAttached()
Returns True if the container was properly attached (i.e., the checks in setDatastore succeeded), else False.
- put(node: luna.common.Node.Node)
Adds a node to a temporary dictionary that is used to save/commit nodes to the relevant databases. If the same node is added again under the same name, nothing changes. Decorates the node with the container's namespace.
- Param
node - node object
- runDaskDistributed(pipeline, client)
Submits functions to Dask workers. Dask can track dependencies via a semaphore future, so we pass that future explicitly and submit each function individually.
- Params
pipeline - an ordered list of (function, params) tuples to execute
- Params
client - a dask client
- runLocal(pipeline)
Run a pipeline in the main thread, blocking.
- Params
pipeline - an ordered list of (function, params) tuples to execute
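To illustrate what running a pipeline "in the main thread, blocking" means, here is a minimal sketch. It assumes, hypothetically, that each (function, params) tuple is invoked as function(**params) and that results are collected in order; run_local_sketch is an illustrative name, not part of luna.common, and the real calling convention of runLocal is not documented on this page.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical pipeline shape: an ordered list of (function, params) tuples.
Pipeline = List[Tuple[Callable[..., Any], Dict[str, Any]]]


def run_local_sketch(pipeline: Pipeline) -> List[Any]:
    """Run each step in order in the calling thread; each call blocks until it returns."""
    results = []
    for func, params in pipeline:
        # Assumption: params are passed to the function as keyword arguments.
        results.append(func(**params))
    return results
```

Because nothing is handed off to another thread or process, a slow step delays every step after it, which is the trade-off runProcessPoolExecutor and runDaskDistributed address.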
- runProcessPoolExecutor(pipeline, executor)
Uses a process pool executor to run full pipelines in the background.
- Params
pipeline - an ordered list of (function, params) tuples to execute
- Params
executor - a ProcessPoolExecutor passed from a parent script
- class luna.common.DataStore.DataStore_v2(store_location)
Bases: object
- ensure_datastore(datastore_id, datastore_type)
- Params
datastore_id - unique container ID
- Params
datastore_type - the type of the container
- get(store_id, namespace_id, data_type, data_tag='data', realpath=True)
Looks up and returns the path of data given the store_id, namespace_id, data_type, and data_tag.
pyluna-common.luna.common.GraphEnum module
pyluna-common.luna.common.Neo4jConnection module
- class luna.common.Neo4jConnection.Neo4jConnection(uri, user, pwd)
Bases: object
- commute_all_sink_id(sc, sqlc, SINK_TYPE)
Spark connector for an input source ID. Returns a dataframe with one column, named after the sink/target ID.
- commute_cohort_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)
Spark connector for an input cohort ID. Returns a dataframe with one column, named after the sink/target ID.
- commute_record_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE)
Spark connector for an input cohort ID. Returns a dataframe with one column, named after the sink/target ID.
- commute_sink_id_to_spark(sc, sqlc, SINK_TYPE, QUERY_ID)
A pass-through function for consistency.
- commute_source_id_to_spark(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)
Spark connector for an input source ID. Returns a dataframe with one column, named after the sink/target ID.
- commute_source_id_to_spark_query(sc, sqlc, WHERE_CLAUSE, SINK_TYPE)
Spark connector for an input source ID. Returns a dataframe with one column, named after the sink/target ID.
- create_id_lookup_table(sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID)
Spark connector for an input source ID. Returns a dataframe with two ID columns, as specified by SOURCE_TYPE and SINK_TYPE.
- create_id_lookup_table_where(sqlc, source, sink, r='ID_LINK|HAS_RECORD', WHERE_CLAUSE='')
Main method for creating lookup tables in a general manner.
- Parameters
source – what ID type you are starting with (e.g. a dmp_patient_id)
sink – what ID type you wish to map to (e.g. a SeriesInstanceUID)
r – optional. Allowed relationship types. Default is ID_LINK|HAS_RECORD, which keeps the mapping within a patient subgraph
WHERE_CLAUSE – optional. source, sink (as nodes) and r (as relationships) become available for filtering in a WHERE clause
- Returns
a dataframe with three columns, [ source | sink | pathspec ], where pathspec is a text-based representation of the path between the source and sink IDs
pyluna-common.luna.common.Node module
- class luna.common.Node.Node(node_type, node_name, properties=None)
Bases: object
Node object defines the type and attributes of a graph node.
- Param
node_type: node type, e.g. scan
- Param
node_name: required node name, e.g. scan-123
- Param
properties: dict of key-value pairs for the node.
- get_all_props()
Returns the properties as a dict, including the name property. (Name is a required field, but it is still a property of this node.)
- get_match_str()
Returns a string representation of the node with only the qualified_address as a property.
- static prop_str(fields, row)
Returns a key-value string like 'id: 123, …', where property values come from row.
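The prop_str description suggests a simple join of "key: value" pairs. A standalone sketch, assuming row is a mapping and values are rendered with str(); the real method may quote or escape values differently (for example, for use inside a Cypher property map), and prop_str_sketch is a hypothetical name:

```python
from typing import Any, Iterable, Mapping


def prop_str_sketch(fields: Iterable[str], row: Mapping[str, Any]) -> str:
    """Build a 'key: value, ...' string, taking each value from row."""
    return ", ".join(f"{field}: {row[field]}" for field in fields)


print(prop_str_sketch(["id", "type"], {"id": 123, "type": "scan"}))
# id: 123, type: scan
```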
pyluna-common.luna.common.PipelineBuilder module
pyluna-common.luna.common.config module
Created on October 17, 2019
@author: pashaa@mskcc.org
- class luna.common.config.ConfigSet(name=None, config_file=None, schema_file=None)
Bases: object
This is a singleton class that can load a collection of configurations from yaml files.
ConfigSet loads configurations from yaml files only once on first invocation of this class with the specified yaml file. The class then maintains the configuration in memory in a singleton instance. All new invocations of this class will serve up the same configuration.
Each configuration in the collection is identified by a logical name.
If a new invocation of this class is created with an existing logical name and a different yaml file, the singleton instance replaces the existing configuration with the newly specified yaml file for the given logical name.
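The caching rules above (load once per logical name, serve the same instance everywhere, reload only when a name is re-registered with a different file) can be sketched as follows. This is a hypothetical illustration, not ConfigSet itself: to stay self-contained it takes a loader callable instead of parsing YAML, and it skips schema validation.

```python
from typing import Any, Callable, Dict, List


class ConfigCacheSketch:
    """Hypothetical singleton that caches configurations by logical name."""

    _instance = None

    def __new__(cls):
        # Every instantiation returns the same object, so all callers share one cache.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._configs = {}  # name -> (config_file, parsed config)
        return cls._instance

    def load(self, name: str, config_file: str,
             loader: Callable[[str], Dict[str, Any]]) -> None:
        """Load config_file under name only if name is new or now points at another file."""
        cached = self._configs.get(name)
        if cached is None or cached[0] != config_file:
            self._configs[name] = (config_file, loader(config_file))

    def get_keys(self, name: str) -> List[str]:
        """Return the top-level keys of a loaded configuration."""
        if name not in self._configs:
            raise ValueError(f"configuration '{name}' was never loaded")
        return list(self._configs[name][1].keys())
```

Keying the cache by logical name rather than by file path is what lets a new file replace an existing configuration under the same name.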
- get_config_set(name)
- Parameters
name – logical name of the configuration
- Returns
a dictionary of top-level keys in the config stored in this instance.
- Raises
ValueError if a configuration with the specified name was never loaded
- get_keys(name)
- Parameters
name – logical name of the configuration
- Returns
a list of top-level keys in the config stored in this instance.
- Raises
ValueError if a configuration with the specified name was never loaded
- get_value(path)
Gets the value for the specified jsonpath from the specified configuration.
- Parameters
path (str) – path to a value in a configuration. The path must be of the form "name::jsonpath", where name is the logical name of the configuration and jsonpath is the jsonpath to the value. jsonpath expressions may be tested here: https://pypi.org/project/jsonpath-ng/ and https://jsonpath.com/
- Returns
value from config file
- Return type
str
- Raises
ValueError – if no match is found for the specified path, or if a configuration with the specified name was never loaded
- has_value(path)
- Parameters
path (str) – path to a value in a configuration. The path must be of the form "name::jsonpath", where name is the logical name of the configuration and jsonpath is the jsonpath to the value. jsonpath expressions may be tested here: https://pypi.org/project/jsonpath-ng/ and https://jsonpath.com/
- Returns
True if the value is not an empty string, else False.
- Return type
boolean
- Raises
ValueError – if a configuration with the specified name was never loaded
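The "name::jsonpath" convention shared by get_value and has_value can be sketched as below. This is a simplified, hypothetical illustration: resolve_dotted only understands plain "$.a.b" paths, whereas the real methods accept full jsonpath expressions (see the jsonpath-ng link above). Treating an unmatched path as "no value" in has_value_sketch is also an assumption.

```python
from typing import Any, Dict, Tuple


def split_config_path(path: str) -> Tuple[str, str]:
    """Split 'name::jsonpath' into (logical name, jsonpath expression)."""
    name, sep, expr = path.partition("::")
    if not sep or not name or not expr:
        raise ValueError(f"expected 'name::jsonpath', got {path!r}")
    return name, expr


def resolve_dotted(config: Dict[str, Any], expr: str) -> Any:
    """Resolve a plain '$.a.b' path; a simplified stand-in for real jsonpath matching."""
    keys = expr[2:] if expr.startswith("$.") else expr
    node: Any = config
    for key in keys.split("."):
        if not isinstance(node, dict) or key not in node:
            raise ValueError(f"no match found for {expr!r}")
        node = node[key]
    return node


def has_value_sketch(configs: Dict[str, Dict[str, Any]], path: str) -> bool:
    """True if the path resolves to something other than an empty string."""
    name, expr = split_config_path(path)
    if name not in configs:
        raise ValueError(f"configuration '{name}' was never loaded")
    try:
        value = resolve_dotted(configs[name], expr)
    except ValueError:
        # Assumption: an unmatched path counts as "no value".
        return False
    return value != ""
```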
pyluna-common.luna.common.constants module
Created on November 16, 2020
@author: rosed2@mskcc.org
pyluna-common.luna.common.custom_logger module
- class luna.common.custom_logger.MultilineFormatter(fmt=None, datefmt=None, style='%')
Bases: logging.Formatter
- format(record: logging.LogRecord)
Format the specified record as text.
The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
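The class name suggests special handling of multi-line messages, but this page does not say how MultilineFormatter differs from logging.Formatter. One plausible behavior, shown here as a hypothetical sketch (IndentingFormatterSketch and its indent parameter are illustrative, not the luna API), is to indent continuation lines so a multi-line message stays visually grouped under its header:

```python
import logging


class IndentingFormatterSketch(logging.Formatter):
    """Hypothetical formatter that indents every line after the first."""

    def __init__(self, fmt=None, datefmt=None, style="%", indent="    "):
        super().__init__(fmt, datefmt, style)
        self.indent = indent

    def format(self, record: logging.LogRecord) -> str:
        # Let logging.Formatter build the full text (message, time, exception info).
        text = super().format(record)
        # Prefix each continuation line, including any appended traceback lines.
        return text.replace("\n", "\n" + self.indent)


handler = logging.StreamHandler()
handler.setFormatter(IndentingFormatterSketch("%(levelname)s - %(message)s"))
```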
pyluna-common.luna.common.dask module
- luna.common.dask.dask_job(job_name)
A simpler version of a Dask job decorator, which only provides the worker_client as a runner to the calling function.
Examples
>>> from time import sleep
>>> @dask_job('my_job')
... def my_job(*args, runner=None, **kwargs):
...     runner.submit(sleep, 10)
- luna.common.dask.prune_empty_delayed(tasks)
A less-than-ideal method to prune empty tasks from a list of Dask tasks. Here we're trading CPU and time for memory.
- Parameters
tasks (list) – list of delayed dask tasks
- Returns
a reduced list of delayed dask tasks
- Return type
list[dask.delayed]
- luna.common.dask.with_event_loop(func)
This method decorates functions run on Dask workers with an async function call. Namely, this allows us to manage the execution of a function a bit better and, especially, to exit job execution if things take too long (1 hr).
Here, the function func is run in a background thread and has access to the Dask scheduler through the 'runner'. Critically, submission to this runner/client looks the same regardless of whether it occurs in a sub-process or a thread.
Mostly, this is a workaround to implement some form of timeout when running very long tasks on Dask. While one cannot (or should not) kill the running thread, Dask will eventually clean up the child tasks once all jobs finish.
Examples
>>> from time import sleep
>>> @with_event_loop
... def my_job(*args, runner=None, **kwargs):
...     runner.submit(sleep, 10)
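with_event_loop relies on a Dask worker client, which this page does not document. The timeout idea it describes (run func in a background thread, stop waiting after a deadline, and leave the thread to finish on its own because it cannot be killed) can be sketched with the standard library's concurrent.futures instead. with_timeout is a hypothetical name; this is not the luna implementation and involves no Dask.

```python
import concurrent.futures
import functools
from typing import Any, Callable


def with_timeout(seconds: float) -> Callable:
    """Run the decorated function in a background thread and stop waiting after `seconds`."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            future = executor.submit(func, *args, **kwargs)
            try:
                # Raises concurrent.futures.TimeoutError if no result arrives in time.
                return future.result(timeout=seconds)
            finally:
                # Do not block on a still-running thread: Python cannot kill it,
                # so it is left to finish in the background.
                executor.shutdown(wait=False)

        return wrapper

    return decorator
```

As with the Dask version, a timeout only releases the caller; the abandoned work keeps consuming resources until it completes.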
pyluna-common.luna.common.sparksession module
pyluna-common.luna.common.utils module
- luna.common.utils.apply_csv_filter(input_paths, subset_csv=None)
Filters a list of input_paths based on include/exclude logic given for either the full path, filename, or filestem.
If using "include" logic, only matching entries with include=True are kept. If using "exclude" logic, only matching entries with exclude=True are removed.
The original list is returned if the given subset_csv is None or empty.
- Parameters
input_paths (list[str]) – list of input paths to filter
subset_csv (str) – path to a csv with subset/filter information/flags
- Returns
list[str]: filtered list
- Raises
RuntimeError – If the given subset_csv is invalid
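The include/exclude rules above can be sketched as follows. The CSV layout is not documented here, so this hypothetical sketch assumes rows (e.g. from csv.DictReader) with a "path" column holding a full path, filename, or file stem, plus either an "include" or an "exclude" column of "True"/"False" strings; filter_paths_sketch is an illustrative name.

```python
from pathlib import Path
from typing import Dict, Iterable, List, Optional


def filter_paths_sketch(input_paths: List[str],
                        rows: Optional[Iterable[Dict[str, str]]] = None) -> List[str]:
    """Include/exclude input paths matched by full path, filename, or file stem."""
    rows = list(rows) if rows is not None else []
    if not rows:
        # Nothing to filter on: return the original list unchanged.
        return list(input_paths)

    columns = rows[0].keys()
    if "include" in columns:
        mode = "include"
    elif "exclude" in columns:
        mode = "exclude"
    else:
        raise RuntimeError("subset rows need an 'include' or 'exclude' column")

    # Keys flagged True in the chosen column (hypothetical 'path' column).
    flagged = {row["path"] for row in rows if row[mode].strip().lower() == "true"}

    def matches(p: str) -> bool:
        candidate = Path(p)
        return p in flagged or candidate.name in flagged or candidate.stem in flagged

    if mode == "include":
        return [p for p in input_paths if matches(p)]
    return [p for p in input_paths if not matches(p)]
```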
- luna.common.utils.clean_nested_colname(s)
Removes map name for MapType columns. e.g. metadata.SeriesInstanceUID -> SeriesInstanceUID
- luna.common.utils.cli_runner(cli_kwargs: dict, cli_params: List[tuple], cli_function: Callable[[...], dict], pass_keys: bool = False)
For special input_* parameters, see if we should infer the input given an output/result directory
- Parameters
cli_kwargs (dict) – keyword arguments from the CLI call
cli_params (List[tuple]) – param list, where each element is the parameter (name, type)
cli_function (Callable[..., dict]) – cli_function entry point, should accept exactly the arguments given by cli_params
pass_keys (bool) – will pass found segment keys to transform function as ‘keys’ kwarg
- Returns
None
- luna.common.utils.does_not_contain(token, value)
Validate that token is not a substring of value
- Param
token: string e.g. : | .
- Param
value: dictionary, list, or str
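Because value may be a dictionary, list, or string, the check naturally recurses. A hypothetical sketch (does_not_contain_sketch is illustrative): it returns a bool rather than raising, and it checks dictionary keys as well as values; both choices are assumptions, since this page does not say how the real function reports a failure.

```python
from typing import Any


def does_not_contain_sketch(token: str, value: Any) -> bool:
    """Return True if token appears in no string inside value."""
    if isinstance(value, str):
        return token not in value
    if isinstance(value, dict):
        # Assumption: both keys and values are checked.
        return all(does_not_contain_sketch(token, k) and does_not_contain_sketch(token, v)
                   for k, v in value.items())
    if isinstance(value, list):
        return all(does_not_contain_sketch(token, item) for item in value)
    # Non-string leaves (numbers, None, ...) cannot contain a substring.
    return True
```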
- luna.common.utils.expand_inputs(given_params: dict)
For special input_* parameters, see if we should infer the input given an output/result directory
- Parameters
given_params (dict) – keyword arguments to check types
- Returns
Input-expanded keyword argument dictionary
- Return type
dict
- luna.common.utils.generate_uuid(path, prefix)
Returns a hash of the file at the given path, preceded by the prefix.
- Param
path: file path, e.g. file:/path/to/file
- Param
prefix: list, e.g. ["SVGEOJSON", "default-label"]
- Returns
string uuid
- luna.common.utils.generate_uuid_binary(content, prefix)
Returns a hash of the binary content, preceded by the prefix.
- Param
content: binary
- Param
prefix: list, e.g. ["FEATURE"]
- Returns
string uuid
- luna.common.utils.generate_uuid_dict(json_str, prefix)
Returns a hash of the JSON string, preceded by the prefix.
- Param
json_str: str representation of JSON
- Param
prefix: list, e.g. ["SVGEOJSON", "default-label"]
- Returns
v
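The three generate_uuid* functions share one idea: hash some content and put the prefix parts in front of the digest. A hypothetical sketch of that idea follows. The hash algorithm (SHA-256 here), the "-" separator, and the function names are all assumptions, so these IDs will not match ones produced by luna.common.utils.

```python
import hashlib
from typing import List


def uuid_from_bytes(content: bytes, prefix: List[str]) -> str:
    """Hash content and prepend the prefix parts, joined with '-' (format is assumed)."""
    digest = hashlib.sha256(content).hexdigest()
    return "-".join(prefix + [digest])


def uuid_from_file(path: str, prefix: List[str]) -> str:
    """Hash the file at path; a leading 'file:' scheme, as in file:/path/to/file, is stripped."""
    if path.startswith("file:"):
        path = path[len("file:"):]
    with open(path, "rb") as handle:
        return uuid_from_bytes(handle.read(), prefix)
```

Hashing the content rather than the path means identical files receive identical IDs regardless of where they are stored.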
- luna.common.utils.get_absolute_path(module_path, relative_path)
Given the path to a module file and the path, relative to the module file, of another file that needs to be referenced in the module, this method returns the absolute path of the file that needs to be referenced.
This method makes it possible to resolve absolute paths to files in any environment that a module and the referenced files are deployed to.
- Param
module_path: path to the module. Use '__file__' from the module.
- Param
relative_path: path to the file that needs to be referenced by the module. The path must be relative to the module.
- Returns
absolute path to the file with the specified relative_path
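The usual standard-library idiom for this resolution is to join the relative path onto the directory containing the module file. A minimal sketch of that idiom (get_absolute_path_sketch is an illustrative name; the luna implementation may differ in details such as symlink handling):

```python
import os


def get_absolute_path_sketch(module_path: str, relative_path: str) -> str:
    """Resolve relative_path against the directory that contains module_path."""
    module_dir = os.path.dirname(os.path.abspath(module_path))
    # abspath also normalizes '..' segments and path separators.
    return os.path.abspath(os.path.join(module_dir, relative_path))


# Typical call from inside a module:
# schema = get_absolute_path_sketch(__file__, "schemas/config.yml")
```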
- luna.common.utils.get_dataset_url()
Retrieves a "dataset URL" from the environment, which may look like http://localhost:6077 or file:///absolute/path/to/dataset/dir
- luna.common.utils.get_method_data(cohort_id, method_id)
Returns the method dict.
- Param
cohort_id: string
- Param
method_id: string
- luna.common.utils.grouper(iterable, n)
Turns an iterable into an iterable of iterables (chunks of size n).
'None' should not be a member of the input iterable, as it is removed to handle the fill values.
- Parameters
iterable (iterable) – an iterable
n (int) – size of chunks
fillvalue –
iterable[iterable]
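The note about None matches the classic itertools "grouper" recipe, where the last chunk is padded with a fill value that is then stripped. A sketch of that recipe (grouper_sketch is an illustrative name; the real function may return tuples or another iterable type):

```python
from itertools import zip_longest
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def grouper_sketch(iterable: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield successive chunks of size n; the last chunk may be shorter."""
    # n references to one iterator make zip_longest pull n consecutive items per tuple.
    args = [iter(iterable)] * n
    for group in zip_longest(*args, fillvalue=None):
        # Drop the None padding of the final chunk (so None must not appear in the input).
        yield [item for item in group if item is not None]
```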
- luna.common.utils.load_func(dotpath: str)
Loads a function in a module from a parsed YAML string.
- Parameters
dotpath (str) – module/function name written as a string (e.g. torchvision.models.resnet34)
- Returns
The inferred module itself, not the string representation
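Loading a callable from a dotted path is typically done with importlib: split off the last component, import the module, and fetch the attribute. A sketch of that approach (load_func_sketch is an illustrative name, not necessarily how luna implements it):

```python
import importlib
from typing import Any


def load_func_sketch(dotpath: str) -> Any:
    """Import 'package.module.attr' and return the attribute object itself."""
    module_name, _, attr_name = dotpath.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path like 'pkg.module.func', got {dotpath!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# load_func_sketch("torchvision.models.resnet34") would return the model constructor,
# provided torchvision is installed.
```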
- luna.common.utils.post_to_dataset(input_feature_data, waystation_url, dataset_id, keys)
Interface feature data to a parquet dataset
- Parameters
input_feature_data (str) – path to input data
waystation_url (str) – URL of dataset root (either file or using waystation)
dataset_id (str) – Dataset name/ID
keys (dict) – corresponding segment keys
- luna.common.utils.rebase_schema_numeric(df)
Tries to convert all columns in a dataframe to numeric types, if possible, with integer types taking precedence.
Note: this is an in-place operation.
- Parameters
df (pd.DataFrame) – dataframe to convert columns
- luna.common.utils.replace_token(token, token_replacement, value)
Replaces token with token_replacement in value.
- Param
token: string e.g. : | .
- Param
token_replacement: string e.g. _ -
- Param
value: dictionary, list, or str
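Like does_not_contain, replacement over a dictionary, list, or string is naturally recursive. A hypothetical sketch (replace_token_sketch is illustrative) that returns a new value instead of mutating the input and rewrites dictionary keys as well as values; whether the real function touches keys is an assumption.

```python
from typing import Any


def replace_token_sketch(token: str, token_replacement: str, value: Any) -> Any:
    """Recursively replace token in strings, dict keys/values, and list items."""
    if isinstance(value, str):
        return value.replace(token, token_replacement)
    if isinstance(value, dict):
        # Assumption: keys are rewritten too (e.g. to sanitize column names).
        return {
            replace_token_sketch(token, token_replacement, k):
                replace_token_sketch(token, token_replacement, v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [replace_token_sketch(token, token_replacement, item) for item in value]
    # Numbers, None, and other leaves are returned unchanged.
    return value
```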
- luna.common.utils.validate_params(given_params: dict, params_list: List[tuple])
Ensures that a dictionary of params or keyword arguments is correct, given a parameter list.
Checks that necessary parameters exist, and that their types can be cast correctly. There's special logic for list and dictionary types. JSON arguments are parsed as dict types.
- Parameters
given_params (dict) – keyword arguments to check types
params_list (List[tuple]) – param list, where each element is the parameter (param, type)
- Returns
Validated and cast keyword argument dictionary
- Return type
dict
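The validation steps described above (required parameters must exist, values are cast to their declared type, JSON strings are parsed for list and dict types) can be sketched as follows. This is a hypothetical illustration: validate_params_sketch, its error messages, and its handling of plain Python types only are assumptions, not the luna implementation.

```python
import json
from typing import Any, Dict, List, Tuple


def validate_params_sketch(given_params: Dict[str, Any],
                           params_list: List[Tuple[str, type]]) -> Dict[str, Any]:
    """Check that required params exist and cast each to its declared type."""
    validated = {}
    for name, param_type in params_list:
        if name not in given_params:
            raise ValueError(f"missing required parameter: {name}")
        raw = given_params[name]
        if param_type in (dict, list) and isinstance(raw, str):
            # JSON strings such as '{"a": 1}' or '[1, 2]' are parsed first.
            raw = json.loads(raw)
        if not isinstance(raw, param_type):
            try:
                raw = param_type(raw)
            except (TypeError, ValueError) as err:
                raise ValueError(f"cannot cast {name}={raw!r} to {param_type.__name__}") from err
        validated[name] = raw
    return validated
```

Two design notes on this sketch: keys not listed in params_list are dropped from the result, and casting strings to bool with bool() would treat "False" as True, so boolean flags would need dedicated parsing.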