# Source code for luna.pathology.point_annotation.proxy_table.generate


import os, json
import shutil, logging

import click
from pyspark.sql.functions import lit, udf, explode, array, to_json
from pyspark.sql.types import ArrayType, StringType, IntegerType, MapType, StructType, StructField

from luna.common.CodeTimer import CodeTimer
from luna.common.config import ConfigSet
from luna.common.custom_logger import init_logger
from luna.common.sparksession import SparkConfig
from luna.common.utils import get_absolute_path
from luna.pathology.common.slideviewer_client import fetch_slide_ids
import luna.common.constants as const

# Cap OpenBLAS at a single thread for this process.
# NOTE(review): presumably done to avoid CPU oversubscription when many Spark
# python workers run side by side -- confirm the original motivation.
os.environ['OPENBLAS_NUM_THREADS'] = '1'


def download_point_annotation(slideviewer_url, slideviewer_path, project_id, user):
    """Fetch the point-click nuclear annotations of one slide from SlideViewer.

    Builds the ``getSVGLabels/nucleus`` endpoint for the given user, project
    and slide, prints it, and delegates the request to
    ``download_sv_point_annotation``.

    Args:
        slideviewer_url (string): slideviewer base url e.g. https://slideviewer-url.com
        slideviewer_path (string): slide path in slideviewer
        project_id (string): slideviewer project id
        user (string): username used to create the expert annotation

    Returns:
        json: point-click nuclear annotations, i.e. whatever
        ``download_sv_point_annotation`` returns for the endpoint
    """
    # Imported at call time under its bare module name. create_proxy_table
    # ships slideviewer_client.py to Spark via addPyFile, which is presumably
    # why the package-qualified import is not used here.
    from slideviewer_client import download_sv_point_annotation

    print (f" >>>>>>> Processing [{slideviewer_path}] <<<<<<<<")

    # <base>/slides/<user>@mskcc.org/projects;<project_id>;<slide path>/getSVGLabels/nucleus
    endpoint = "".join((
        slideviewer_url,
        "/slides/",
        str(user),
        "@mskcc.org/projects;",
        str(project_id),
        ';',
        slideviewer_path,
        "/getSVGLabels/nucleus",
    ))
    print(endpoint)

    return download_sv_point_annotation(endpoint)
@click.command()
@click.option('-d', '--data_config_file', required=True, type=click.Path(exists=True),
              help="path to yaml file containing data input and output parameters. "
                   "See data_config.yaml.template")
@click.option('-a', '--app_config_file', default='config.yaml', type=click.Path(exists=True),
              help="path to yaml file containing application runtime parameters. "
                   "See config.yaml.template")
def cli(data_config_file, app_config_file):
    """This module generates a parquet table of point-click nuclear annotation jsons.

    The configuration files are copied to your project/configs/table_name folder
    to persist the metadata used to generate the proxy table.

    INPUT PARAMETERS

    app_config_file - path to yaml file containing application runtime parameters.
    See config.yaml.template

    data_config_file - path to yaml file containing data input and output parameters
    (required). See data_config.yaml.template

    - ROOT_PATH: path to output data

    - DATA_TYPE: data type used in table name e.g. POINT_RAW_JSON

    - PROJECT: your project name. used in table path

    - DATASET_NAME: optional, dataset name to version your table

    - PROJECT_ID: Slideviewer project id

    - SLIDEVIEWER_URL: SlideViewer base url the annotations are downloaded from

    - USERS: list of users that provide expert annotations for this project

    - SLIDEVIEWER_CSV_FILE: an optional path to a SlideViewer csv file that lists
      the names of the whole slide images for which the point annotation proxy
      table generator should download point annotations. If this field is left
      blank, the generator will download this file from SlideViewer.

    TABLE SCHEMA

    - slideviewer_path: path to original slide image in slideviewer platform

    - slide_id: id for the slide. synonymous with image_id

    - sv_project_id: same as the PROJECT_ID from data_config_file, refers to the
      SlideViewer project number.

    - sv_json: json annotation file downloaded from slideviewer.

    - user: username of the annotator for a given annotation

    - sv_json_record_uuid: hash of raw json annotation file from slideviewer,
      format: SVPTJSON-{json_hash}
    """
    logger = init_logger()

    with CodeTimer(logger, 'generate POINT_RAW_JSON table'):
        # Lazy %-formatting: never raises on a non-string value, unlike the
        # previous string concatenation which crashed when the data config
        # option was omitted (None).
        logger.info('data config file: %s', data_config_file)
        logger.info('app config file: %s', app_config_file)

        # load configs
        # Both calls bind `cfg`; create_proxy_table() later calls ConfigSet()
        # with no arguments, so the loaded configs are presumably kept in
        # shared state inside ConfigSet -- TODO confirm.
        cfg = ConfigSet(name=const.DATA_CFG, config_file=data_config_file)
        cfg = ConfigSet(name=const.APP_CFG, config_file=app_config_file)

        # copy app and data configuration to destination config dir
        config_location = const.CONFIG_LOCATION(cfg)
        os.makedirs(config_location, exist_ok=True)

        shutil.copy(app_config_file, os.path.join(config_location, "app_config.yaml"))
        shutil.copy(data_config_file, os.path.join(config_location, "data_config.yaml"))
        logger.info("config files copied to %s", config_location)

        create_proxy_table()
def create_proxy_table():
    """Build the point annotation proxy table from SlideViewer json downloads.

    Every row of the table is the point annotation json that one user created
    for one slide. Rows whose ``sv_json`` is null are dropped, a
    ``sv_json_record_uuid`` column is added, and the result is written as
    parquet (overwrite mode) to the configured table location.

    Returns:
        None
    """
    cfg = ConfigSet()
    logger = logging.getLogger(__name__)

    spark = SparkConfig().spark_session(
        config_name=const.APP_CFG,
        app_name="luna.pathology.point_annotation.proxy_table.generate")
    spark_context = spark.sparkContext

    # resolve the output location and SlideViewer settings from the configs
    table_path = const.TABLE_LOCATION(cfg)
    sv_project_id = cfg.get_value(path=const.DATA_CFG+'::PROJECT_ID')
    sv_base_url = cfg.get_value(path=const.DATA_CFG+'::SLIDEVIEWER_URL')

    # slide list to process; the project configs dir is handed to
    # fetch_slide_ids together with the optional SlideViewer csv file
    config_dir = const.CONFIG_LOCATION(cfg)
    csv_file = cfg.get_value(path=const.DATA_CFG+'::SLIDEVIEWER_CSV_FILE')
    slides = fetch_slide_ids(sv_base_url, sv_project_id, config_dir, csv_file)
    logger.info(slides)

    slide_schema = StructType([
        StructField("slideviewer_path", StringType()),
        StructField("slide_id", StringType()),
        StructField("sv_project_id", IntegerType()),
    ])
    slides_df = spark.createDataFrame(slides, slide_schema)

    # fan out to one row per (slide, user)
    annotators = cfg.get_value(const.DATA_CFG+'::USERS')
    user_literals = [lit(annotator) for annotator in annotators]
    slides_df = slides_df.withColumn("users", array(user_literals))
    per_user_df = slides_df.select(
        "slideviewer_path",
        "slide_id",
        "sv_project_id",
        explode("users").alias("user"))

    # download slide point annotation jsons
    # example point json:
    # [{"project_id":"8","image_id":"123.svs","label_type":"nucleus","x":"1440","y":"747","class":"0","classname":"Tissue 1"},{"project_id":"8","image_id":"123.svs","label_type":"nucleus","x":"1424","y":"774","class":"3","classname":"Tissue 4"}]
    point_json_type = ArrayType(MapType(StringType(), StringType()))

    # the download function imports slideviewer_client by its bare name
    spark_context.addPyFile(get_absolute_path(__file__, "../../common/slideviewer_client.py"))
    download_udf = udf(download_point_annotation, point_json_type)

    sv_json_column = download_udf(lit(sv_base_url), "slideviewer_path", "sv_project_id", "user")
    annotated_df = per_user_df.withColumn("sv_json", sv_json_column)
    # cached before the show() and write actions below (presumably so the
    # download is not repeated per action)
    annotated_df = annotated_df.cache()

    # discard rows for which no annotation json came back
    annotated_df = annotated_df.dropna(subset=["sv_json"])

    # populate "sv_json_record_uuid"
    spark_context.addPyFile(get_absolute_path(__file__, "../../common/EnsureByteContext.py"))
    spark_context.addPyFile(get_absolute_path(__file__, "../../common/utils.py"))
    from luna.common.utils import generate_uuid_dict
    uuid_udf = udf(generate_uuid_dict, StringType())
    uuid_prefix = array(lit("SVPTJSON"))
    result_df = annotated_df.withColumn(
        "sv_json_record_uuid",
        uuid_udf(to_json("sv_json"), uuid_prefix))

    result_df.show(10, False)

    writer = result_df.write.format("parquet").mode("overwrite")
    writer.save(table_path)
# Run the click command when this module is executed as a script.
if __name__ == "__main__":
    cli()