Source code for luna.common.DataStore

from luna.common.Neo4jConnection import Neo4jConnection
from luna.common.Node import Node, CONTAINER_TYPES
from luna.common.config import ConfigSet

import os, socket, pathlib, logging, shutil
from minio import Minio

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

logger = logging.getLogger(__name__)

class DataStore_v2:
    """
    File-system backed data store with optional metadata registration in a graph DB.

    Data is laid out on disk as
    ``<store_location>/<store_id>/<namespace_id>/<data_type>/<data_tag>``.
    Graph connection settings and the GRAPH_STORE_ENABLED switch are read from the
    "STORE_CFG" config set in ``$LUNA_HOME/conf/datastore.cfg``.
    """

    def __init__(self, store_location):
        """
        Load the datastore configuration and make sure the file backend exists.

        :params: store_location - root directory of the file backend (created if missing)
        :raises: RuntimeError - if $LUNA_HOME is unset or empty
        """
        # .get() so an unset variable produces the intended RuntimeError rather than KeyError
        luna_home = os.environ.get('LUNA_HOME')
        if not luna_home:
            raise RuntimeError(
                "$LUNA_HOME is not set. Make sure you have set $LUNA_HOME and $LUNA_HOME/conf/datastore.cfg")

        self.params = ConfigSet(name='STORE_CFG',
                                config_file=os.path.join(luna_home, 'conf', 'datastore.cfg')) \
            .get_config_set("STORE_CFG")
        logger.info(f"Configured datastore with {self.params}")

        self.backend = store_location
        os.makedirs(self.backend, exist_ok=True)
        logger.info(f"Datstore file backend= {self.backend}")

    def _graph_connection(self):
        """Open a Neo4jConnection using the GRAPH_URI / GRAPH_USER / GRAPH_PASSWORD settings."""
        return Neo4jConnection(uri=self.params['GRAPH_URI'],
                               user=self.params['GRAPH_USER'],
                               pwd=self.params['GRAPH_PASSWORD'])

    def ensure_datastore(self, datastore_id, datastore_type):
        """
        Create (or match) the datastore node in the graph DB.

        Invalid input is logged as a warning and the method returns without querying.

        :params: datastore_id - unique container ID (converted to str)
        :params: datastore_type - the type of the container, must be in CONTAINER_TYPES
        """
        datastore_id = str(datastore_id)

        if datastore_type not in CONTAINER_TYPES:
            logger.warning(f"DataStore type [{datastore_type}] invalid, please choose from [{CONTAINER_TYPES}]")
            return

        if ":" in datastore_id:
            logger.warning(f"Invalid datastore_id [{datastore_id}], only use alphanumeric characters")
            return

        conn = self._graph_connection()
        # NOTE(review): datastore_type and datastore_id are interpolated directly into Cypher;
        # a quote in datastore_id will break the query. Use query parameters if
        # Neo4jConnection.query supports them.
        res = conn.query(f"""
            MERGE (datastore:globals:{datastore_type}{{qualified_address:'{datastore_id}'}})
            RETURN count(datastore)""")

        # query() may return None on failure, so guard before indexing
        if res and res[0]['count(datastore)'] == 1:
            logger.info(f"DataStore [{datastore_id}] of type [{datastore_type}] created or matched successfully!")
        else:
            logger.error(f"The datastore [{datastore_id}] could not be created or found")

    def _write_to_graph_store(self, node, store_id):
        """
        Save 'node' as a HAS_DATA child of the datastore whose qualified_address is store_id.

        Best effort: failures are logged and swallowed so file writes are not rolled back.
        """
        try:
            conn = self._graph_connection()
            res = conn.query(f"""
                MATCH (datastore) WHERE datastore.qualified_address = '{store_id}'
                MERGE (datastore)-[:HAS_DATA]->(da:{node.get_match_str()})
                    ON MATCH SET da = {node.get_map_str()}
                    ON CREATE SET da = {node.get_map_str()}
                RETURN count(datastore)""")
            if res is None:
                logger.error(f"Tried adding data to {store_id}, however query failed, this data will not be available!",
                             extra={'store_id': store_id})
                return
            if not res[0]['count(datastore)'] == 1:
                logger.warning(f"Tried adding data to {store_id}, however datastore did not exist, this data will not be available!",
                               extra={'store_id': store_id})
                return
        except Exception as exc:
            logger.exception(f"On write, encountered {exc}, continuing...", extra={'store_id': store_id})

    def _register_in_graph(self, data_type, data_tag, metadata, namespace_id, store_id):
        """Record a data node for store_id in the graph DB when GRAPH_STORE_ENABLED is truthy."""
        if not self.params.get('GRAPH_STORE_ENABLED', False):
            return
        # fresh dict per call: avoids sharing a mutable default between calls
        node = Node(data_type, data_tag, {} if metadata is None else metadata)
        node.set_namespace(namespace_id, store_id)
        logger.info(f"Adding: {node}")
        self._write_to_graph_store(node, store_id)

    def get(self, store_id, namespace_id, data_type, data_tag='data', realpath=True):
        """
        Look up and return the path of data given store_id, namespace_id, data_type and data_tag.

        :params: realpath - if True and the stored entry is a symlink, return the link target
        :raises: RuntimeWarning - if nothing (not even a dangling symlink) is stored there
        """
        dest_dir = os.path.join(self.backend, store_id, namespace_id, data_type, data_tag)

        # lexists() so a dangling symlink still counts as stored data
        if not os.path.lexists(dest_dir):
            raise RuntimeWarning(f"Data not found at {dest_dir}")

        # if realpath is true, return path to data instead of symlink location;
        # islink() avoids calling readlink on a real directory (OSError)
        if realpath and os.path.islink(dest_dir):
            dest_dir = os.readlink(dest_dir)

        return dest_dir

    def put(self, filepath, store_id, namespace_id, data_type, data_tag='data', metadata=None, symlink=False):
        """
        Store the file at filepath under store_id/namespace_id/data_type/data_tag and
        register its metadata in the graph DB (if enabled).

        :params: symlink - if True, replace any existing entry with a symlink to filepath;
                 otherwise copy filepath into the data_tag directory
        :params: metadata - optional dict of properties for the graph node
        :returns: the destination path
        """
        dest_dir = os.path.join(self.backend, store_id, namespace_id, data_type, data_tag)

        if symlink:
            os.makedirs(pathlib.Path(dest_dir).parent, exist_ok=True)
            if os.path.lexists(dest_dir):
                os.remove(dest_dir)
            logger.info(f"Create symlink {dest_dir} -> {filepath}")
            # a relative target would be resolved against the link's directory, not the cwd
            os.symlink(os.path.abspath(filepath), dest_dir)
        else:
            os.makedirs(dest_dir, exist_ok=True)
            logger.info(f"Save {filepath} -> {dest_dir}")
            shutil.copy(filepath, dest_dir)

        self._register_in_graph(data_type, data_tag, metadata, namespace_id, store_id)
        return dest_dir

    def write(self, iostream, store_id, namespace_id, data_type, data_tag, metadata=None, dtype='w'):
        """
        Write iostream to store_id/namespace_id/data_type/data_tag and register its
        metadata in the graph DB (if enabled).

        :params: iostream - content passed to file.write()
        :params: dtype - mode used to open the destination file (e.g. 'w' or 'wb')
        :returns: the destination file path
        """
        dest_dir = os.path.join(self.backend, store_id, namespace_id, data_type)
        dest_file = os.path.join(dest_dir, data_tag)

        os.makedirs(dest_dir, exist_ok=True)
        logger.info(f"Save -> {dest_file}")
        with open(dest_file, dtype) as fp:
            fp.write(iostream)

        self._register_in_graph(data_type, data_tag, metadata, namespace_id, store_id)
        return dest_file
def bootstrap(container_id):
    """
    Log the start of a pipeline for container_id and return the placeholder value 1.

    DataStore.runDaskDistributed submits this first and chains each pipeline step on
    the resulting future.
    """
    message = f"Bootstrapping pipeline for {container_id}"
    logger.info(message)
    return 1
class DataStore(object):
    """
    DataStore: an abstraction with an id, name, namespace, type, and a list of associated data nodes

    Interfaces with a metadata store (graph DB) and, optionally, a MinIO object store.
    Handles the matching and creation of metadata.

    Example usage (illustrative):
        $ container = DataStore( params ).setNamespace("test").setDatastore("1.2.840...")
        $ node = Node("dicom", "DCM-0123", {"Modality":"CT"})
        $ container.put(node)
        $ container.get("dicom", "DCM-0123")
    """
    # TODO: worried about schema issues? like making sure name, namespace, type and qualified path are present, neo4j offers schema enforcment.
    # TODO: testing
    # TODO: error checking

    def __init__(self, params):
        """
        Connect to the graph DB (and, if enabled, the MinIO object store) and record
        which host this code is running on.

        :params: params - dict-like configuration, or a ConfigSet (its "APP_CFG" set is used).
                 Needs GRAPH_URI, GRAPH_USER, GRAPH_PASSWORD; when OBJECT_STORE_ENABLED is
                 truthy also MINIO_URI, MINIO_USER, MINIO_PASSWORD.
                 Note: params['OBJECT_STORE_ENABLED'] is overwritten to reflect whether the
                 object store could actually be reached.
        """
        if isinstance(params, ConfigSet):
            params = params.get_config_set("APP_CFG")

        # Connect to graph DB
        logger.debug ("Connecting to: %s", params['GRAPH_URI'])
        self._conn = Neo4jConnection(uri=params['GRAPH_URI'], user=params['GRAPH_USER'], pwd=params['GRAPH_PASSWORD'])
        logger.debug ("Connection test: %s", self._conn.test_connection())

        if params.get('OBJECT_STORE_ENABLED', False):
            logger.debug ("Connecting to: %s", params['MINIO_URI'])
            self._client = Minio(params['MINIO_URI'], access_key=params['MINIO_USER'], secret_key=params['MINIO_PASSWORD'], secure=False)
            try:
                for bucket in self._client.list_buckets():
                    logger.debug("Found bucket %s", bucket.name )
                logger.debug("OBJECT_STORE_ENABLED=True")
                params['OBJECT_STORE_ENABLED'] = True
            except Exception:
                # Best effort: fall back to graph-only mode if the object store is unreachable
                logger.warning("Could not connect to object store")
                logger.warning("Set OBJECT_STORE_ENABLED=False")
                params['OBJECT_STORE_ENABLED'] = False

        self._host = socket.gethostname() # portable to *docker* containers
        logger.debug ("Running on: %s", self._host)

        self.params = params
        self._attached = False

    def createNamespace(self, namespace_id: str):
        """
        Create a namespace (cohort node) if it doesn't exist; MERGE matches an existing one.

        :params: namespace_id - namespace value
        :returns: self, for chaining
        """
        cohort = Node("cohort", namespace_id)
        create_res = self._conn.query(f"""
            MERGE (co:{cohort.get_create_str()})
            RETURN co""")
        # query() may return None on failure
        if create_res and len(create_res) == 1:
            logger.info(f"Namespace [{namespace_id}] created successfully")
        else:
            logger.warning(f"Namespace [{namespace_id}] could not be created or matched")
        return self

    def setNamespace(self, namespace_id: str):
        """
        Set the namespace for this container's commits, if it exists. When the object
        store is enabled, also make sure the matching bucket exists.

        :params: namespace_id - namespace value
        :returns: self, for chaining
        :raises: RuntimeError - if the namespace does not exist (or the lookup failed)
        """
        self._namespace_id = namespace_id
        self._namespace_node = Node("cohort", namespace_id)
        # bucket names: lowercase, '-' instead of '_'
        self._bucket_id = namespace_id.lower().replace('_','-')

        logger.debug(f"Checking if [{namespace_id}] exists...")
        match_res = self._conn.query(f"""
            MATCH (co:{self._namespace_node.get_match_str()})
            RETURN co""")

        if not match_res or len(match_res) != 1:
            raise RuntimeError(f"Namespace [{namespace_id}] does not exist, call .createNamespace() first!")

        if self.params.get('OBJECT_STORE_ENABLED', False):
            if not self._client.bucket_exists(self._bucket_id):
                self._client.make_bucket(self._bucket_id)

        return self

    def createDatastore(self, container_id, container_type):
        """
        Create (or match) a container node of container_type in the current namespace.

        Invalid type / id values are only warned about; the node is still merged.

        :params: container_id - unique container ID
        :params: container_type - the type of the container
        :returns: self, for chaining
        """
        if container_type not in ['generic', 'patient', 'accession', 'scan', 'slide', 'parquet']:
            logger.warning (f"DataStore type [{container_type}] invalid, please choose from ['generic', 'patient', 'accession', 'scan', 'slide', 'parquet']" )

        # str() so a non-string id does not raise TypeError on the membership test
        if ":" in str(container_id):
            logger.warning (f"Invalid container_id [{container_id}], only use alphanumeric characters")

        node = Node(container_type, container_id)
        node.set_namespace( self._namespace_id )

        create_res = self._conn.query(f"""
            MERGE (container:{node.get_create_str()})
            RETURN container""")

        if create_res and len(create_res) == 1:
            logger.info(f"DataStore [{container_id}] of type [{container_type}] created or matched successfully!")
        else:
            logger.error("The container does not exists")

        return self

    def setDatastore(self, container_id):
        """
        Attach to an existing container node and load its metadata.

        :params: container_id - either a string qualified name (resolved within the current
                 namespace), a string "uid://<neo4j id>", or an int neo4j id
        :returns: self; check isAttached() to see whether attaching succeeded
        :raises: RuntimeError - if container_id is neither str nor int
        """
        self._attached = False
        logger.info ("Lookup ID: %s", container_id)

        # Figure out how to match the node
        if isinstance(container_id, str) and "uid://" not in container_id:
            node = Node("generic", container_id)
            node.set_namespace( self._namespace_id )
            logger.debug("Qualified address: %s", node.get_address())
            match_clause = f"""WHERE container.qualified_address = '{node.get_address()}' """
        elif isinstance(container_id, str):
            # Only splice a validated integer id into the Cypher query
            try:
                uid = int(container_id.replace('uid://', ''))
            except ValueError:
                logger.warning(f"Invalid uid [{container_id}], expected uid://<integer id>")
                return self
            match_clause = f"""WHERE id(container) = {uid} """
        elif isinstance(container_id, int):
            match_clause = f"""WHERE id(container) = {container_id} """
        else:
            raise RuntimeError("Invalid container_id type not (str, int)")

        # Run query
        res = self._conn.query(f"""
            MATCH (container) {match_clause}
            RETURN id(container), labels(container), container.type, container.name, container.namespace, container.qualified_address"""
        )

        # None means the query failed; an empty result means no such container
        if not res:
            logger.warning (f"DataStore [{container_id}] does not exist, you can try creating it first with createDatastore()")
            return self

        # Set some potentially import parameters
        record = res[0]
        self._datastore_id = record["id(container)"]
        self._name = record["container.name"]
        self._qualifiedpath = record["container.qualified_address"]
        self._type = record["container.type"]
        self._labels = record["labels(container)"]
        self.address = record["container.qualified_address"]

        # Containers need to have a qualified path
        if self._qualifiedpath is None:
            logger.warning ("Found, however not valid container object, containers must have a name, namespace, and qualified path")
            return self

        # Let us know attaching was a success! :)
        logger.info ("Successfully attached to %s container id=%s @ %s", self._type, self._datastore_id, self.address)
        self._attached = True

        return self

    def isAttached(self):
        """
        Returns true if container was properly attached (i.e. checks in setDatastore succeeded), else False
        """
        logger.debug ("Attached: %s", self._attached)
        return self._attached

    def get(self, type, name):
        """
        Query the graph DB for one data node attached to this container and return it.

        Matches data nodes with label `type`, data.name == "<type>-<name>" and
        data.namespace == the current namespace. The returned node's data/aux are set
        from its 'data' and 'aux' properties.

        :params: type - the type of data, e.g. radiomics, mha, dicom, png, svs, geojson
        :params: name - name of the node in this container's subspace (e.g. generate-mhd)
        :returns: a Node, or None if the query failed or did not match exactly one node
        :example: get("mhd", "generate-mhd")
        """
        assert self.isAttached()

        # NOTE(review): type/name are interpolated into Cypher unescaped
        query = f"""MATCH (container)-[:HAS_DATA]-(data:{type}) WHERE id(container) = {self._datastore_id} AND data.name='{type}-{name}' AND data.namespace='{self._namespace_id}' RETURN data"""

        logger.debug(query)
        res = self._conn.query(query)

        if res is None:
            logger.warning(f"get() query failed, data.name='{type}-{name}' returning None")
            return None
        if len(res) == 0:
            logger.warning(f"get() found no nodes, data.name='{type}-{name}' returning None")
            return None
        if len(res) > 1:
            logger.warning(f"get() found many nodes (?), data.name='{type}-{name}' returning None")
            return None

        # Exactly one match: reconstruct a Node object
        data = res[0]['data']
        node = Node(data['type'], data['name'], dict(data.items()))
        logger.debug ("Query Successful:")
        logger.debug (node)
        node.set_data(node.properties.get('data', None))
        node.set_aux (node.properties.get('aux', None))
        return node

    @staticmethod
    def run(namespace, container_id, pipeline):
        """
        Run each (function, params) step of pipeline in order, as
        function(cohort_id=namespace, container_id=container_id, method_data=params).
        """
        for module, params in pipeline:
            module (cohort_id=namespace, container_id=container_id, method_data=params)

    def runLocal(self, pipeline):
        """
        Run a pipeline in the main thread, blocking.
        :params: pipeline - an ordered list of (function, params) tuples to execute
        """
        self.run (self._namespace_id, self._name, pipeline)

    def runProcessPoolExecutor(self, pipeline, executor):
        """
        Use a process pool executor to run full pipelines in background
        :params: pipeline - an ordered list of (function, params) tuples to execute
        :params: executor - a ProcessPoolExecutor passed from a parent script
        :returns: the Future returned by executor.submit
        """
        assert isinstance(executor, ProcessPoolExecutor)
        return executor.submit(self.run, self._namespace_id, self._name, pipeline)

    def runDaskDistributed(self, pipeline, client):
        """
        Submit functions to dask workers, chaining each step on the previous step's
        future via the semaphore keyword.
        :params: pipeline - an ordered list of (function, params) tuples to execute
        :params: client - a dask client
        :returns: the future of the last submitted step
        """
        from dask.distributed import Client
        assert isinstance(client, Client)

        future = client.submit (bootstrap, self._name)
        for module, params in pipeline:
            future = client.submit (module, self._namespace_id, self._name, params, semaphore=future)
        return future

    @staticmethod
    def _etag_of(result):
        """
        Extract the etag from a minio fput_object() result.

        Newer minio clients return an object with an .etag attribute; the original code
        indexed result[0], i.e. a tuple whose first item is the etag; a plain string is
        returned unchanged.
        """
        etag = getattr(result, "etag", None)
        if etag is not None:
            return etag
        if isinstance(result, (tuple, list)):
            return result[0] if result else None
        return result

    def _upload_objects(self, node):
        """Upload node.objects to node's object_bucket/object_folder, 4 at a time, logging progress."""
        object_bucket = node.properties.get("object_bucket")
        object_folder = node.properties.get("object_folder")

        with ThreadPoolExecutor(max_workers=4) as executor:
            future_uploads = [
                executor.submit(self._client.fput_object, object_bucket, f"{object_folder}/{p.name}", p, part_size=250000000)
                for p in node.objects
            ]

            n_count_futures = 0
            n_total_futures = len (future_uploads)
            for future in as_completed(future_uploads):
                try:
                    data = future.result()
                except Exception:
                    logger.exception('Bad upload: generated an exception:')
                    continue
                n_count_futures += 1
                if n_count_futures < 10:
                    logger.info("Upload successful with etag: %s", self._etag_of(data))
                if n_count_futures < 1000 and n_count_futures % 100 == 0:
                    logger.info("Uploaded [%s/%s]", n_count_futures, n_total_futures)
                if n_count_futures % 1000 == 0:
                    logger.info("Uploaded [%s/%s]", n_count_futures, n_total_futures)

            logger.info("Uploaded [%s/%s]", n_count_futures, n_total_futures)
            logger.info("Shutdown executor %s", executor)
        # leaving the with-block waits for the executor to shut down
        logger.info("Done saving all records!!")

    def put(self, node: "Node"):
        """
        Save a data node under the attached container and, when the object store is
        enabled, upload its data/aux files.

        The node is decorated with this container's namespace and name; MERGE semantics
        mean putting the same node again updates its properties in place.

        :param: node - Node object; node.data / node.aux, if set, are file or directory paths
        """
        assert isinstance(node, Node)
        assert self.isAttached()

        logger.info(f"Adding node: {node}")

        # Decorate with the container namespace
        node.set_namespace( self._namespace_id, self._name )
        node._datastore_id = self._datastore_id

        object_store_enabled = self.params.get("OBJECT_STORE_ENABLED", False)

        # Collect data object(s) only if there is a path and the object store is enabled
        node.objects = []
        if node.data is not None and object_store_enabled:
            node.properties['object_bucket'] = f"{self._bucket_id}"
            node.properties['object_folder'] = f"{self._name}/{node.name}"
            data_path = pathlib.Path( node.data )
            if data_path.is_file():
                node.objects.append( data_path )
            if data_path.is_dir():
                # TODO: enable extention in glob via something?
                # is_file(): directories whose names contain a dot cannot be uploaded
                node.objects.extend(path for path in data_path.glob("*.*") if path.is_file())
            logger.info ("Node has %s pending object commits",  len(node.objects))

        # Collect the aux object only if there is a path and the object store is enabled
        if node.aux is not None and object_store_enabled:
            node.properties['object_bucket'] = f"{self._bucket_id}"
            node.properties['object_folder'] = f"{self._name}/{node.name}"
            node.objects.append( pathlib.Path( node.aux ))
            logger.info ("Node has %s pending object commits",  len(node.objects))

        logger.info ("Adding: %s", node.get_address())
        self._conn.query( f"""
            MATCH (container) WHERE id(container) = {node._datastore_id}
            MERGE (container)-[:HAS_DATA]->(da:{node.get_match_str()})
                ON MATCH SET da = {node.get_map_str()}
                ON CREATE SET da = {node.get_map_str()}
            """
        )

        if object_store_enabled:
            self._upload_objects(node)

    def add(self, *args):
        """Deprecated no-op; only logs a warning."""
        logger.warning ("Datastore.add() has been depreciated")

    def saveAll(self, *args):
        """Deprecated no-op; only logs a warning."""
        logger.warning ("Datastore.saveAll() has been depreciated")