retuve.app.utils
1import asyncio 2import hashlib 3import json 4import os 5import secrets 6from datetime import datetime 7 8from fastapi import ( 9 APIRouter, 10 Depends, 11 Header, 12 HTTPException, 13 Request, 14 Response, 15) 16from httpx import AsyncClient 17 18from retuve.funcs import retuve_run 19 20 21def generate_token(): 22 return secrets.token_urlsafe(32) 23 24 25def consistent_hash(value, mod): 26 # Convert the input to a string and encode it 27 encoded_value = str(value).encode() 28 # Use a reproducible hash function (e.g., SHA-256) 29 hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16) 30 return hash_value % mod 31 32 33TOKEN_STORE = {} 34API_TOKEN_STORE = {} 35 36 37async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config): 38 # Save the DICOM file in the appropriate directory 39 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 40 if dicom_content is not None: 41 # if path already exists, return 42 if os.path.exists(dicom_path): 43 return 44 with open(dicom_path, "wb") as f: 45 f.write(dicom_content) 46 47 # Process the DICOM 48 result = await asyncio.to_thread( 49 retuve_run, 50 hip_mode=config.batch.hip_mode, 51 config=config, 52 modes_func=config.batch.mode_func, 53 modes_func_kwargs_dict={}, 54 file=dicom_path, 55 ) 56 57 return result 58 59 60async def save_results(instance_id, savedir, result=None, just_paths=False): 61 """ 62 Saves DICOM content, video, image, and metrics results for a given instance ID. 63 64 :param instance_id: The unique ID of the DICOM instance. 65 :param result: The result object returned by `retuve_run`. 66 :param savedir: The base directory for saving the results. 67 :param just_paths: Whether to return just the paths of the saved results. 68 """ 69 result_dir = f"{savedir}/{instance_id}" 70 os.makedirs(result_dir, exist_ok=True) 71 72 video_path = f"{result_dir}/video.mp4" 73 img_path = f"{result_dir}/img.jpg" 74 metrics_path = f"{result_dir}/metrics.json" 75 76 if just_paths: 77 return video_path, img_path, metrics_path 78 79 # Save video result if available 80 if result.video_clip: 81 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 82 83 # Save image result if available 84 if result.image: 85 await asyncio.to_thread(result.image.save, img_path) 86 87 # Save metrics if available 88 if result.metrics: 89 with open(metrics_path, "w") as f: 90 json.dump(result.metrics, f) 91 92 return video_path, img_path, metrics_path 93 94 95async def get_sorted_dicom_images( 96 orthanc_url, username=None, password=None, latest_time=None 97): 98 """ 99 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 100 101 :param orthanc_url: The Orthanc server URL. 102 :param username: Username for authentication (optional). 103 :param password: Password for authentication (optional). 104 :param latest_time: Filter for acquisition times after this datetime. 105 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 106 """ 107 108 auth = (username, password) if username and password else None 109 images_with_dates = [] 110 latest_acq_time = latest_time or datetime.min 111 112 async with AsyncClient() as client: 113 patients_response = await client.get(f"{orthanc_url}/patients", auth=auth) 114 for patient_id in patients_response.json(): 115 studies_response = await client.get( 116 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 117 ) 118 for study in studies_response.json(): 119 study_id = study["ID"] 120 series_response = await client.get( 121 f"{orthanc_url}/studies/{study_id}/series", auth=auth 122 ) 123 for series in series_response.json(): 124 series_id = series["ID"] 125 instances_response = await client.get( 126 f"{orthanc_url}/series/{series_id}/instances", 127 auth=auth, 128 ) 129 for instance in instances_response.json(): 130 instance_id = instance["ID"] 131 metadata_response = await client.get( 132 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 133 auth=auth, 134 ) 135 metadata = metadata_response.json() 136 137 # Remove files that are not multiframe US's 138 if not ( 139 metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1" 140 and int(metadata.get("NumberOfFrames", 0)) > 1 141 ): 142 continue 143 144 acq_date, acq_time = metadata.get( 145 "AcquisitionDate" 146 ), metadata.get("AcquisitionTime") 147 if acq_date and acq_time: 148 acq_datetime = datetime.strptime( 149 f"{acq_date}{acq_time.split('.')[0]}", 150 "%Y%m%d%H%M%S", 151 ) 152 153 images_with_dates.append( 154 (acq_datetime, instance_id, metadata) 155 ) 156 157 final_images_with_dates = [] 158 159 if len(images_with_dates) == 0: 160 return [], latest_acq_time 161 162 for acq_datetime, instance_id, metadata in images_with_dates: 163 if acq_datetime > latest_acq_time: 164 file_response = await client.get( 165 f"{orthanc_url}/instances/{instance_id}/file", 166 auth=auth, 167 ) 168 169 # hash the main instance_id to be smaller 170 small_id = consistent_hash(instance_id, 10**8) 171 patient_id_real = metadata.get("PatientID") 172 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 173 174 latest_acq_time = max(latest_acq_time, acq_datetime) 175 176 final_images_with_dates.append( 177 (acq_datetime, file_response.content, instance_id) 178 ) 179 180 images_with_dates.sort(key=lambda x: x[0]) 181 latest_image = images_with_dates[-1] 182 183 # inject file_response.content into latest_image 184 file_response = await client.get( 185 f"{orthanc_url}/instances/{latest_image[1]}/file", 186 auth=auth, 187 ) 188 189 instance_id = latest_image[1] 190 acq_datetime = latest_image[0] 191 metadata = latest_image[2] 192 small_id = consistent_hash(instance_id, 10**8) 193 patient_id_real = metadata.get("PatientID") 194 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 195 196 latest_image = ( 197 acq_datetime, 198 file_response.content, 199 instance_id, 200 ) 201 202 if not final_images_with_dates: 203 final_images_with_dates = [latest_image] 204 205 return [ 206 (image, instance_id) for _, image, instance_id in final_images_with_dates 207 ], latest_acq_time 208 209 210class UnauthorizedException(Exception): 211 """Raised when the user is unauthorized and should be redirected.""" 212 213 pass 214 215 216def validate_auth_token(auth_token: str): 217 token_data = TOKEN_STORE.get(auth_token) 218 if not token_data or token_data["expires"] < datetime.utcnow(): 219 raise UnauthorizedException 220 return token_data["username"] 221 222 223def validate_api_token(api_token: str): 224 token_data = API_TOKEN_STORE.get(api_token) 225 if not token_data or token_data["expires"] < datetime.utcnow(): 226 raise HTTPException(status_code=403, detail="Invalid API token.") 227 return token_data["username"]
def
generate_token():
def
consistent_hash(value, mod):
TOKEN_STORE =
{}
API_TOKEN_STORE =
{}
async def
save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
38async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config): 39 # Save the DICOM file in the appropriate directory 40 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 41 if dicom_content is not None: 42 # if path already exists, return 43 if os.path.exists(dicom_path): 44 return 45 with open(dicom_path, "wb") as f: 46 f.write(dicom_content) 47 48 # Process the DICOM 49 result = await asyncio.to_thread( 50 retuve_run, 51 hip_mode=config.batch.hip_mode, 52 config=config, 53 modes_func=config.batch.mode_func, 54 modes_func_kwargs_dict={}, 55 file=dicom_path, 56 ) 57 58 return result
async def
save_results(instance_id, savedir, result=None, just_paths=False):
61async def save_results(instance_id, savedir, result=None, just_paths=False): 62 """ 63 Saves DICOM content, video, image, and metrics results for a given instance ID. 64 65 :param instance_id: The unique ID of the DICOM instance. 66 :param result: The result object returned by `retuve_run`. 67 :param savedir: The base directory for saving the results. 68 :param just_paths: Whether to return just the paths of the saved results. 69 """ 70 result_dir = f"{savedir}/{instance_id}" 71 os.makedirs(result_dir, exist_ok=True) 72 73 video_path = f"{result_dir}/video.mp4" 74 img_path = f"{result_dir}/img.jpg" 75 metrics_path = f"{result_dir}/metrics.json" 76 77 if just_paths: 78 return video_path, img_path, metrics_path 79 80 # Save video result if available 81 if result.video_clip: 82 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 83 84 # Save image result if available 85 if result.image: 86 await asyncio.to_thread(result.image.save, img_path) 87 88 # Save metrics if available 89 if result.metrics: 90 with open(metrics_path, "w") as f: 91 json.dump(result.metrics, f) 92 93 return video_path, img_path, metrics_path
Saves DICOM content, video, image, and metrics results for a given instance ID.
Parameters
- instance_id: The unique ID of the DICOM instance.
- result: The result object returned by
retuve_run
. - savedir: The base directory for saving the results.
- just_paths: Whether to return just the paths of the saved results.
async def
get_sorted_dicom_images(orthanc_url, username=None, password=None, latest_time=None):
96async def get_sorted_dicom_images( 97 orthanc_url, username=None, password=None, latest_time=None 98): 99 """ 100 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 101 102 :param orthanc_url: The Orthanc server URL. 103 :param username: Username for authentication (optional). 104 :param password: Password for authentication (optional). 105 :param latest_time: Filter for acquisition times after this datetime. 106 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 107 """ 108 109 auth = (username, password) if username and password else None 110 images_with_dates = [] 111 latest_acq_time = latest_time or datetime.min 112 113 async with AsyncClient() as client: 114 patients_response = await client.get(f"{orthanc_url}/patients", auth=auth) 115 for patient_id in patients_response.json(): 116 studies_response = await client.get( 117 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 118 ) 119 for study in studies_response.json(): 120 study_id = study["ID"] 121 series_response = await client.get( 122 f"{orthanc_url}/studies/{study_id}/series", auth=auth 123 ) 124 for series in series_response.json(): 125 series_id = series["ID"] 126 instances_response = await client.get( 127 f"{orthanc_url}/series/{series_id}/instances", 128 auth=auth, 129 ) 130 for instance in instances_response.json(): 131 instance_id = instance["ID"] 132 metadata_response = await client.get( 133 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 134 auth=auth, 135 ) 136 metadata = metadata_response.json() 137 138 # Remove files that are not multiframe US's 139 if not ( 140 metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1" 141 and int(metadata.get("NumberOfFrames", 0)) > 1 142 ): 143 continue 144 145 acq_date, acq_time = metadata.get( 146 "AcquisitionDate" 147 ), metadata.get("AcquisitionTime") 148 if acq_date and acq_time: 149 acq_datetime = datetime.strptime( 150 f"{acq_date}{acq_time.split('.')[0]}", 151 "%Y%m%d%H%M%S", 152 ) 153 154 images_with_dates.append( 155 (acq_datetime, instance_id, metadata) 156 ) 157 158 final_images_with_dates = [] 159 160 if len(images_with_dates) == 0: 161 return [], latest_acq_time 162 163 for acq_datetime, instance_id, metadata in images_with_dates: 164 if acq_datetime > latest_acq_time: 165 file_response = await client.get( 166 f"{orthanc_url}/instances/{instance_id}/file", 167 auth=auth, 168 ) 169 170 # hash the main instance_id to be smaller 171 small_id = consistent_hash(instance_id, 10**8) 172 patient_id_real = metadata.get("PatientID") 173 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 174 175 latest_acq_time = max(latest_acq_time, acq_datetime) 176 177 final_images_with_dates.append( 178 (acq_datetime, file_response.content, instance_id) 179 ) 180 181 images_with_dates.sort(key=lambda x: x[0]) 182 latest_image = images_with_dates[-1] 183 184 # inject file_response.content into latest_image 185 file_response = await client.get( 186 f"{orthanc_url}/instances/{latest_image[1]}/file", 187 auth=auth, 188 ) 189 190 instance_id = latest_image[1] 191 acq_datetime = latest_image[0] 192 metadata = latest_image[2] 193 small_id = consistent_hash(instance_id, 10**8) 194 patient_id_real = metadata.get("PatientID") 195 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 196 197 latest_image = ( 198 acq_datetime, 199 file_response.content, 200 instance_id, 201 ) 202 203 if not final_images_with_dates: 204 final_images_with_dates = [latest_image] 205 206 return [ 207 (image, instance_id) for _, image, instance_id in final_images_with_dates 208 ], latest_acq_time
Fetch and sort DICOM images from an Orthanc server based on acquisition time.
Parameters
- orthanc_url: The Orthanc server URL.
- username: Username for authentication (optional).
- password: Password for authentication (optional).
- latest_time: Filter for acquisition times after this datetime.
Returns
A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
def
validate_auth_token(auth_token: str):
def
validate_api_token(api_token: str):