"""luna.common.Neo4jConnection: Neo4j query helpers that return Spark DataFrames."""

from neo4j import GraphDatabase
from neo4j import __version__ as neo4j_version

from pyspark.sql.types import StringType,StructType,StructField
import re

def pretty_path(path):
    """Render a path exported by ``neo4j.Record.data()`` as one line of text.

    ``path`` is a sequence alternating node property dicts and relationship-type
    strings, e.g. ``[{'value': 'a'}, 'ID_LINK', {'value': 'b'}]``. Nodes render as
    ``(key:value,...)`` and relationships as ``-[TYPE]-``. The first node is
    prefixed with ``SOURCE:`` and the last node with ``SINK:``. Elements that are
    neither dicts nor strings are skipped.

    Args:
        path: sequence of node dicts and relationship-type strings.

    Returns:
        str: the textual representation ("" for an empty path).
    """
    last_index = len(path) - 1
    parts = []
    for i, element in enumerate(path):
        if isinstance(element, dict):
            # Format values via f-string so non-string properties (ints, dates)
            # do not raise TypeError as plain string concatenation would.
            node_desc = "(" + ",".join(f"{key}:{value}" for key, value in element.items()) + ")"
            if i == 0:
                parts.append("SOURCE:" + node_desc)
            elif i == last_index:
                parts.append("SINK:" + node_desc)
            else:
                parts.append(node_desc)
        elif isinstance(element, str):
            parts.append("-[" + element + "]-")
    return "".join(parts)
class Neo4jConnection:
    """Thin wrapper around a neo4j ``GraphDatabase`` driver plus Spark lookup helpers.

    Every call to :meth:`query` opens and closes its own session. Node labels and
    relationship types passed to the lookup helpers are interpolated directly into
    Cypher (Cypher cannot parameterize labels), so they must come from trusted code.
    ID values are sent as query parameters. Instances can be used as a context
    manager; the driver is closed on exit.
    """

    # Cypher write keywords rejected in caller-supplied WHERE clauses. The word
    # boundaries keep identifiers such as "dataset", "offset" or "created_at" from
    # matching. This is a coarse safety net, not a security boundary.
    _WRITE_KEYWORDS = re.compile(r"\b(?:delete|merge|create|set|remove)\b", re.IGNORECASE)

    def __init__(self, uri, user, pwd):
        """Create a driver for ``uri`` using basic auth ``(user, pwd)``.

        Errors raised while creating the driver are printed, not re-raised. The
        instance is then left without a driver: :meth:`query` fails its
        "Driver not initialized!" assertion and :meth:`test_connection` returns False.
        """
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
        except Exception as ex:
            print("Failed to create the driver: ", ex)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never suppress exceptions from the with-body

    def close(self):
        """Close the underlying driver, if one was created."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, db=None, parameters=None):
        """Run a Cypher query against the initialized driver.

        Args:
            query: Cypher query text.
            db: optional database name; when None the driver's default database is used.
            parameters: optional dict of values referenced as ``$name`` in ``query``.

        Returns:
            list of neo4j records, or None if the query raised (the error is printed).

        Raises:
            AssertionError: if no driver was created.
        """
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            print("Query failed:", e)
        finally:
            if session is not None:
                session.close()
        return response

    def test_connection(self):
        """Return True if a trivial query runs, False on any error (including no driver)."""
        try:
            # Close the probe session so repeated checks don't leak pooled connections.
            with self.__driver.session() as session:
                session.run("MATCH () RETURN 1 LIMIT 1").consume()
            return True
        except Exception:
            return False

    @staticmethod
    def _failed(result):
        """Return True (and print a notice) when ``query`` signalled failure with None."""
        if result is None:
            print("Improper query returning null")
            return True
        return False

    @classmethod
    def _reject_write_clause(cls, where_clause):
        """Raise ValueError if ``where_clause`` contains a Cypher write keyword."""
        if cls._WRITE_KEYWORDS.search(where_clause):
            raise ValueError("You tried to alter the database, goodbye")

    # ==========================================================================================================
    # Simple methods

    def test_count(self):
        """Print and return the total number of nodes in the graph.

        Returns:
            int node count, or None if the query failed.
        """
        # Count server-side instead of pulling every node into memory.
        result = self.query("MATCH (n) RETURN count(n) AS node_count")
        if self._failed(result) or not result:
            return None
        node_count = result[0]["node_count"]
        print(f"Successfully connected to {node_count} nodes!")
        return node_count

    def match_concept_node(self, QUERY_ID):
        """Get the node(s) whose ``value`` property equals ``str(QUERY_ID)``.

        Returns:
            list of 1-tuples holding each record's ``data()`` dict, or None if the
            query failed.
        """
        # Parameterized (not interpolated) to prevent Cypher injection; str() keeps
        # the original string comparison semantics.
        result = self.query(
            "MATCH (concept_node) WHERE concept_node.value = $query_id RETURN concept_node",
            parameters={"query_id": str(QUERY_ID)},
        )
        if self._failed(result):
            return None
        return [(record.data(),) for record in result]

    # ==========================================================================================================
    # This should become the main helper method.
    # The others are left in for now for backwards compatibility.

    def create_id_lookup_table_where(self, sqlc, source, sink, r="ID_LINK|HAS_RECORD", WHERE_CLAUSE=""):
        """Main method for creating lookup tables in a general manner.

        Args:
            sqlc: object providing ``createDataFrame`` (presumably a SQLContext or
                SparkSession -- confirm against callers).
            source: what ID type you are starting with (e.g. a dmp_patient_id).
            sink: what ID you wish to map to (e.g. a SeriesInstanceUID).
            r: optional. Allowed relationship types. Default is ID_LINK|HAS_RECORD to
                contain mapping within a patient subgraph.
            WHERE_CLAUSE: optional. source, sink (as nodes) and r (as relationships)
                become available for filtering in a WHERE clause.

        Returns:
            a dataframe with three columns, [ source | sink | pathspec ], where pathspec
            is a text-based representation of the path between the source and sink IDs;
            None if the query failed.

        Raises:
            ValueError: if WHERE_CLAUSE contains a Cypher write keyword.
        """
        self._reject_write_clause(WHERE_CLAUSE)

        cypher = (
            f"MATCH (source:{source})-[r:{r}*]-(sink:{sink}), "
            f"path=shortestPath( (source)-[:{r}*..15]-(sink) ) "
            f"{WHERE_CLAUSE} "
            "RETURN DISTINCT source,sink,path"
        )
        # Print exactly the text that is executed.
        print(f">>> QUERY >>> \n\t{cypher}")
        result = self.query(cypher)
        if self._failed(result):
            return None

        cSchema = StructType([StructField(source, StringType(), True),
                              StructField(sink, StringType(), True),
                              StructField("pathspec", StringType(), True)])
        rows = []
        for record in result:
            data = record.data()  # convert once per record
            rows.append((data["source"]["value"], data["sink"]["value"], pretty_path(data["path"])))
        return sqlc.createDataFrame(rows, schema=cSchema)

    # ==========================================================================================================
    # >>>>>>>> To deprecate over time <<<<<<<<

    def commute_source_id_to_spark(self, sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID):
        """Spark connector for an input source id.

        Returns a dataframe with one column named after SINK_TYPE, holding the ``value``
        of every SINK_TYPE node reachable via ID_LINK edges from the SOURCE_TYPE node
        whose value is ``str(QUERY_ID)``; None if the query failed.
        """
        result = self.query(
            f"MATCH (source:{SOURCE_TYPE})-[:ID_LINK*]-(sink:{SINK_TYPE}) "
            "WHERE source.value = $query_id "
            "RETURN source,sink",
            parameters={"query_id": str(QUERY_ID)},
        )
        if self._failed(result):
            return None
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(
            sc.parallelize([(record.data()["sink"]["value"],) for record in result]), schema=cSchema
        )

    def commute_source_id_to_spark_query(self, sc, sqlc, WHERE_CLAUSE, SINK_TYPE):
        """Spark connector for SINK_TYPE nodes matched by a caller-supplied WHERE clause.

        Prints each matched path, then returns a dataframe with one column named after
        SINK_TYPE; None if the query failed. ``sc`` is unused (kept for compatibility).

        Raises:
            ValueError: if WHERE_CLAUSE contains a Cypher write keyword.
        """
        self._reject_write_clause(WHERE_CLAUSE)
        result = self.query(
            f"MATCH path=(source)-[r*]-(sink:{SINK_TYPE}) "
            f"{WHERE_CLAUSE} "
            "RETURN source,sink,path"
        )
        if self._failed(result):
            return None
        sink_values = []
        for record in result:
            data = record.data()
            print(pretty_path(data["path"]))
            sink_values.append((data["sink"]["value"],))
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(sink_values, schema=cSchema)

    def create_id_lookup_table(self, sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID):
        """Spark connector for an input source id.

        Returns a dataframe with two ID columns named SOURCE_TYPE and SINK_TYPE, pairing
        the source node whose value is ``str(QUERY_ID)`` with each SINK_TYPE node reachable
        via ID_LINK/HAS_RECORD edges; None if the query failed.
        """
        result = self.query(
            f"MATCH (source:{SOURCE_TYPE})-[:ID_LINK|HAS_RECORD*]-(sink:{SINK_TYPE}) "
            "WHERE source.value = $query_id "
            "RETURN source,sink",
            parameters={"query_id": str(QUERY_ID)},
        )
        if self._failed(result):
            return None
        cSchema = StructType([StructField(SOURCE_TYPE, StringType(), True),
                              StructField(SINK_TYPE, StringType(), True)])
        rows = []
        for record in result:
            data = record.data()
            rows.append((data["source"]["value"], data["sink"]["value"]))
        return sqlc.createDataFrame(sc.parallelize(rows), schema=cSchema)

    def commute_cohort_id_to_spark(self, sc, sqlc, SOURCE_TYPE, SINK_TYPE, QUERY_ID):
        """Spark connector for an input cohort id.

        Follows one COHORT_LINK hop from the SOURCE_TYPE node whose value is
        ``str(QUERY_ID)``, then ID_LINK edges to SINK_TYPE nodes. Returns a dataframe with
        one column named after SINK_TYPE; None if the query failed.
        """
        result = self.query(
            f"MATCH (source:{SOURCE_TYPE})-[:COHORT_LINK*1..1]-(entry_node) "
            "WHERE source.value = $query_id "
            f"MATCH (entry_node)-[:ID_LINK*]-(sink:{SINK_TYPE}) "
            "RETURN source,sink",
            parameters={"query_id": str(QUERY_ID)},
        )
        if self._failed(result):
            return None
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(
            sc.parallelize([(record.data()["sink"]["value"],) for record in result]), schema=cSchema
        )

    def commute_record_id_to_spark(self, sc, sqlc, SOURCE_TYPE, SINK_TYPE):
        """Spark connector for record ids.

        Returns a dataframe with one column named after SINK_TYPE, holding every
        SINK_TYPE node one HAS_RECORD hop from any SOURCE_TYPE node (no ID filter);
        None if the query failed.
        """
        result = self.query(
            f"MATCH (source:{SOURCE_TYPE})-[:HAS_RECORD*1..1]-(sink:{SINK_TYPE}) "
            "RETURN source,sink"
        )
        if self._failed(result):
            return None
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(
            sc.parallelize([(record.data()["sink"]["value"],) for record in result]), schema=cSchema
        )

    def commute_sink_id_to_spark(self, sc, sqlc, SINK_TYPE, QUERY_ID):
        """A pass-through for consistency: a one-row dataframe holding QUERY_ID under SINK_TYPE."""
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(sc.parallelize([(QUERY_ID,)]), schema=cSchema)

    def commute_all_sink_id(self, sc, sqlc, SINK_TYPE):
        """Spark connector for all nodes of a type.

        Returns a dataframe with one column named after SINK_TYPE, holding the ``value``
        of every SINK_TYPE node; None if the query failed.
        """
        result = self.query(f"MATCH (sink:{SINK_TYPE}) RETURN sink")
        if self._failed(result):
            return None
        cSchema = StructType([StructField(SINK_TYPE, StringType(), True)])
        return sqlc.createDataFrame(
            sc.parallelize([(record.data()["sink"]["value"],) for record in result]), schema=cSchema
        )