retuve.app.utils
1# Copyright 2024 Adam McArthur 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import asyncio 16import hashlib 17import json 18import os 19import secrets 20from datetime import datetime 21 22from fastapi import ( 23 APIRouter, 24 Depends, 25 Header, 26 HTTPException, 27 Request, 28 Response, 29) 30from httpx import AsyncClient 31 32from retuve.funcs import retuve_run 33 34 35def generate_token(): 36 return secrets.token_urlsafe(32) 37 38 39def consistent_hash(value, mod): 40 # Convert the input to a string and encode it 41 encoded_value = str(value).encode() 42 # Use a reproducible hash function (e.g., SHA-256) 43 hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16) 44 return hash_value % mod 45 46 47TOKEN_STORE = {} 48API_TOKEN_STORE = {} 49 50 51async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config): 52 # Save the DICOM file in the appropriate directory 53 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 54 if dicom_content is not None: 55 # if path already exists, return 56 if os.path.exists(dicom_path): 57 return 58 with open(dicom_path, "wb") as f: 59 f.write(dicom_content) 60 61 # Process the DICOM 62 result = await asyncio.to_thread( 63 retuve_run, 64 hip_mode=config.batch.hip_mode, 65 config=config, 66 modes_func=config.batch.mode_func, 67 modes_func_kwargs_dict={}, 68 file=dicom_path, 69 ) 70 71 return result 72 73 74async def save_results(instance_id, savedir, result=None, just_paths=False): 75 """ 76 Saves DICOM content, video, image, and metrics results for a given instance ID. 77 78 :param instance_id: The unique ID of the DICOM instance. 79 :param result: The result object returned by `retuve_run`. 80 :param savedir: The base directory for saving the results. 81 :param just_paths: Whether to return just the paths of the saved results. 82 """ 83 result_dir = f"{savedir}/{instance_id}" 84 os.makedirs(result_dir, exist_ok=True) 85 86 video_path = f"{result_dir}/video.mp4" 87 img_path = f"{result_dir}/img.jpg" 88 metrics_path = f"{result_dir}/metrics.json" 89 90 if just_paths: 91 return video_path, img_path, metrics_path 92 93 # Save video result if available 94 if result.video_clip: 95 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 96 97 # Save image result if available 98 if result.image: 99 await asyncio.to_thread(result.image.save, img_path) 100 101 # Save metrics if available 102 if result.metrics: 103 with open(metrics_path, "w") as f: 104 json.dump(result.metrics, f) 105 106 return video_path, img_path, metrics_path 107 108 109async def get_sorted_dicom_images( 110 orthanc_url, username=None, password=None, latest_time=None 111): 112 """ 113 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 114 115 :param orthanc_url: The Orthanc server URL. 116 :param username: Username for authentication (optional). 117 :param password: Password for authentication (optional). 118 :param latest_time: Filter for acquisition times after this datetime. 119 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 120 """ 121 122 auth = (username, password) if username and password else None 123 images_with_dates = [] 124 latest_acq_time = latest_time or datetime.min 125 126 async with AsyncClient() as client: 127 patients_response = await client.get(f"{orthanc_url}/patients", auth=auth) 128 for patient_id in patients_response.json(): 129 studies_response = await client.get( 130 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 131 ) 132 for study in studies_response.json(): 133 study_id = study["ID"] 134 series_response = await client.get( 135 f"{orthanc_url}/studies/{study_id}/series", auth=auth 136 ) 137 for series in series_response.json(): 138 series_id = series["ID"] 139 instances_response = await client.get( 140 f"{orthanc_url}/series/{series_id}/instances", 141 auth=auth, 142 ) 143 for instance in instances_response.json(): 144 instance_id = instance["ID"] 145 metadata_response = await client.get( 146 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 147 auth=auth, 148 ) 149 metadata = metadata_response.json() 150 151 # Remove files that are not multiframe US's 152 if not ( 153 metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1" 154 and int(metadata.get("NumberOfFrames", 0)) > 1 155 ): 156 continue 157 158 acq_date, acq_time = metadata.get( 159 "AcquisitionDate" 160 ), metadata.get("AcquisitionTime") 161 if acq_date and acq_time: 162 acq_datetime = datetime.strptime( 163 f"{acq_date}{acq_time.split('.')[0]}", 164 "%Y%m%d%H%M%S", 165 ) 166 167 images_with_dates.append( 168 (acq_datetime, instance_id, metadata) 169 ) 170 171 final_images_with_dates = [] 172 173 if len(images_with_dates) == 0: 174 return [], latest_acq_time 175 176 for acq_datetime, instance_id, metadata in images_with_dates: 177 if acq_datetime > latest_acq_time: 178 file_response = await client.get( 179 f"{orthanc_url}/instances/{instance_id}/file", 180 auth=auth, 181 ) 182 183 # hash the main instance_id to be smaller 184 small_id = consistent_hash(instance_id, 10**8) 185 patient_id_real = metadata.get("PatientID") 186 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 187 188 latest_acq_time = max(latest_acq_time, acq_datetime) 189 190 final_images_with_dates.append( 191 (acq_datetime, file_response.content, instance_id) 192 ) 193 194 images_with_dates.sort(key=lambda x: x[0]) 195 latest_image = images_with_dates[-1] 196 197 # inject file_response.content into latest_image 198 file_response = await client.get( 199 f"{orthanc_url}/instances/{latest_image[1]}/file", 200 auth=auth, 201 ) 202 203 instance_id = latest_image[1] 204 acq_datetime = latest_image[0] 205 metadata = latest_image[2] 206 small_id = consistent_hash(instance_id, 10**8) 207 patient_id_real = metadata.get("PatientID") 208 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 209 210 latest_image = ( 211 acq_datetime, 212 file_response.content, 213 instance_id, 214 ) 215 216 if not final_images_with_dates: 217 final_images_with_dates = [latest_image] 218 219 return [ 220 (image, instance_id) for _, image, instance_id in final_images_with_dates 221 ], latest_acq_time 222 223 224class UnauthorizedException(Exception): 225 """Raised when the user is unauthorized and should be redirected.""" 226 227 pass 228 229 230def validate_auth_token(auth_token: str): 231 token_data = TOKEN_STORE.get(auth_token) 232 if not token_data or token_data["expires"] < datetime.utcnow(): 233 raise UnauthorizedException 234 return token_data["username"] 235 236 237def validate_api_token(api_token: str): 238 token_data = API_TOKEN_STORE.get(api_token) 239 if not token_data or token_data["expires"] < datetime.utcnow(): 240 raise HTTPException(status_code=403, detail="Invalid API token.") 241 return token_data["username"]
def
generate_token():
def
consistent_hash(value, mod):
TOKEN_STORE =
{}
API_TOKEN_STORE =
{}
async def
save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
52async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config): 53 # Save the DICOM file in the appropriate directory 54 dicom_path = f"{live_batchdir}/{instance_id}.dcm" 55 if dicom_content is not None: 56 # if path already exists, return 57 if os.path.exists(dicom_path): 58 return 59 with open(dicom_path, "wb") as f: 60 f.write(dicom_content) 61 62 # Process the DICOM 63 result = await asyncio.to_thread( 64 retuve_run, 65 hip_mode=config.batch.hip_mode, 66 config=config, 67 modes_func=config.batch.mode_func, 68 modes_func_kwargs_dict={}, 69 file=dicom_path, 70 ) 71 72 return result
async def
save_results(instance_id, savedir, result=None, just_paths=False):
75async def save_results(instance_id, savedir, result=None, just_paths=False): 76 """ 77 Saves DICOM content, video, image, and metrics results for a given instance ID. 78 79 :param instance_id: The unique ID of the DICOM instance. 80 :param result: The result object returned by `retuve_run`. 81 :param savedir: The base directory for saving the results. 82 :param just_paths: Whether to return just the paths of the saved results. 83 """ 84 result_dir = f"{savedir}/{instance_id}" 85 os.makedirs(result_dir, exist_ok=True) 86 87 video_path = f"{result_dir}/video.mp4" 88 img_path = f"{result_dir}/img.jpg" 89 metrics_path = f"{result_dir}/metrics.json" 90 91 if just_paths: 92 return video_path, img_path, metrics_path 93 94 # Save video result if available 95 if result.video_clip: 96 await asyncio.to_thread(result.video_clip.write_videofile, video_path) 97 98 # Save image result if available 99 if result.image: 100 await asyncio.to_thread(result.image.save, img_path) 101 102 # Save metrics if available 103 if result.metrics: 104 with open(metrics_path, "w") as f: 105 json.dump(result.metrics, f) 106 107 return video_path, img_path, metrics_path
Saves DICOM content, video, image, and metrics results for a given instance ID.
Parameters
- instance_id: The unique ID of the DICOM instance.
- result: The result object returned by
retuve_run. - savedir: The base directory for saving the results.
- just_paths: Whether to return just the paths of the saved results.
async def
get_sorted_dicom_images(orthanc_url, username=None, password=None, latest_time=None):
110async def get_sorted_dicom_images( 111 orthanc_url, username=None, password=None, latest_time=None 112): 113 """ 114 Fetch and sort DICOM images from an Orthanc server based on acquisition time. 115 116 :param orthanc_url: The Orthanc server URL. 117 :param username: Username for authentication (optional). 118 :param password: Password for authentication (optional). 119 :param latest_time: Filter for acquisition times after this datetime. 120 :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime). 121 """ 122 123 auth = (username, password) if username and password else None 124 images_with_dates = [] 125 latest_acq_time = latest_time or datetime.min 126 127 async with AsyncClient() as client: 128 patients_response = await client.get(f"{orthanc_url}/patients", auth=auth) 129 for patient_id in patients_response.json(): 130 studies_response = await client.get( 131 f"{orthanc_url}/patients/{patient_id}/studies", auth=auth 132 ) 133 for study in studies_response.json(): 134 study_id = study["ID"] 135 series_response = await client.get( 136 f"{orthanc_url}/studies/{study_id}/series", auth=auth 137 ) 138 for series in series_response.json(): 139 series_id = series["ID"] 140 instances_response = await client.get( 141 f"{orthanc_url}/series/{series_id}/instances", 142 auth=auth, 143 ) 144 for instance in instances_response.json(): 145 instance_id = instance["ID"] 146 metadata_response = await client.get( 147 f"{orthanc_url}/instances/{instance_id}/simplified-tags", 148 auth=auth, 149 ) 150 metadata = metadata_response.json() 151 152 # Remove files that are not multiframe US's 153 if not ( 154 metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1" 155 and int(metadata.get("NumberOfFrames", 0)) > 1 156 ): 157 continue 158 159 acq_date, acq_time = metadata.get( 160 "AcquisitionDate" 161 ), metadata.get("AcquisitionTime") 162 if acq_date and acq_time: 163 acq_datetime = datetime.strptime( 164 f"{acq_date}{acq_time.split('.')[0]}", 165 "%Y%m%d%H%M%S", 166 ) 167 168 images_with_dates.append( 169 (acq_datetime, instance_id, metadata) 170 ) 171 172 final_images_with_dates = [] 173 174 if len(images_with_dates) == 0: 175 return [], latest_acq_time 176 177 for acq_datetime, instance_id, metadata in images_with_dates: 178 if acq_datetime > latest_acq_time: 179 file_response = await client.get( 180 f"{orthanc_url}/instances/{instance_id}/file", 181 auth=auth, 182 ) 183 184 # hash the main instance_id to be smaller 185 small_id = consistent_hash(instance_id, 10**8) 186 patient_id_real = metadata.get("PatientID") 187 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 188 189 latest_acq_time = max(latest_acq_time, acq_datetime) 190 191 final_images_with_dates.append( 192 (acq_datetime, file_response.content, instance_id) 193 ) 194 195 images_with_dates.sort(key=lambda x: x[0]) 196 latest_image = images_with_dates[-1] 197 198 # inject file_response.content into latest_image 199 file_response = await client.get( 200 f"{orthanc_url}/instances/{latest_image[1]}/file", 201 auth=auth, 202 ) 203 204 instance_id = latest_image[1] 205 acq_datetime = latest_image[0] 206 metadata = latest_image[2] 207 small_id = consistent_hash(instance_id, 10**8) 208 patient_id_real = metadata.get("PatientID") 209 instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}" 210 211 latest_image = ( 212 acq_datetime, 213 file_response.content, 214 instance_id, 215 ) 216 217 if not final_images_with_dates: 218 final_images_with_dates = [latest_image] 219 220 return [ 221 (image, instance_id) for _, image, instance_id in final_images_with_dates 222 ], latest_acq_time
Fetch and sort DICOM images from an Orthanc server based on acquisition time.
Parameters
- orthanc_url: The Orthanc server URL.
- username: Username for authentication (optional).
- password: Password for authentication (optional).
- latest_time: Filter for acquisition times after this datetime.
Returns
A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
def
validate_auth_token(auth_token: str):
def
validate_api_token(api_token: str):