retuve.app.utils

  1import asyncio
  2import hashlib
  3import json
  4import os
  5import secrets
  6from datetime import datetime
  7
  8from fastapi import (
  9    APIRouter,
 10    Depends,
 11    Header,
 12    HTTPException,
 13    Request,
 14    Response,
 15)
 16from httpx import AsyncClient
 17
 18from retuve.funcs import retuve_run
 19
 20
 21def generate_token():
 22    return secrets.token_urlsafe(32)
 23
 24
 25def consistent_hash(value, mod):
 26    # Convert the input to a string and encode it
 27    encoded_value = str(value).encode()
 28    # Use a reproducible hash function (e.g., SHA-256)
 29    hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16)
 30    return hash_value % mod
 31
 32
 33TOKEN_STORE = {}
 34API_TOKEN_STORE = {}
 35
 36
 37async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
 38    # Save the DICOM file in the appropriate directory
 39    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
 40    if dicom_content is not None:
 41        # if path already exists, return
 42        if os.path.exists(dicom_path):
 43            return
 44        with open(dicom_path, "wb") as f:
 45            f.write(dicom_content)
 46
 47    # Process the DICOM
 48    result = await asyncio.to_thread(
 49        retuve_run,
 50        hip_mode=config.batch.hip_mode,
 51        config=config,
 52        modes_func=config.batch.mode_func,
 53        modes_func_kwargs_dict={},
 54        file=dicom_path,
 55    )
 56
 57    return result
 58
 59
 60async def save_results(instance_id, savedir, result=None, just_paths=False):
 61    """
 62    Saves DICOM content, video, image, and metrics results for a given instance ID.
 63
 64    :param instance_id: The unique ID of the DICOM instance.
 65    :param result: The result object returned by `retuve_run`.
 66    :param savedir: The base directory for saving the results.
 67    :param just_paths: Whether to return just the paths of the saved results.
 68    """
 69    result_dir = f"{savedir}/{instance_id}"
 70    os.makedirs(result_dir, exist_ok=True)
 71
 72    video_path = f"{result_dir}/video.mp4"
 73    img_path = f"{result_dir}/img.jpg"
 74    metrics_path = f"{result_dir}/metrics.json"
 75
 76    if just_paths:
 77        return video_path, img_path, metrics_path
 78
 79    # Save video result if available
 80    if result.video_clip:
 81        await asyncio.to_thread(result.video_clip.write_videofile, video_path)
 82
 83    # Save image result if available
 84    if result.image:
 85        await asyncio.to_thread(result.image.save, img_path)
 86
 87    # Save metrics if available
 88    if result.metrics:
 89        with open(metrics_path, "w") as f:
 90            json.dump(result.metrics, f)
 91
 92    return video_path, img_path, metrics_path
 93
 94
 95async def get_sorted_dicom_images(
 96    orthanc_url, username=None, password=None, latest_time=None
 97):
 98    """
 99    Fetch and sort DICOM images from an Orthanc server based on acquisition time.
100
101    :param orthanc_url: The Orthanc server URL.
102    :param username: Username for authentication (optional).
103    :param password: Password for authentication (optional).
104    :param latest_time: Filter for acquisition times after this datetime.
105    :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
106    """
107
108    auth = (username, password) if username and password else None
109    images_with_dates = []
110    latest_acq_time = latest_time or datetime.min
111
112    async with AsyncClient() as client:
113        patients_response = await client.get(f"{orthanc_url}/patients", auth=auth)
114        for patient_id in patients_response.json():
115            studies_response = await client.get(
116                f"{orthanc_url}/patients/{patient_id}/studies", auth=auth
117            )
118            for study in studies_response.json():
119                study_id = study["ID"]
120                series_response = await client.get(
121                    f"{orthanc_url}/studies/{study_id}/series", auth=auth
122                )
123                for series in series_response.json():
124                    series_id = series["ID"]
125                    instances_response = await client.get(
126                        f"{orthanc_url}/series/{series_id}/instances",
127                        auth=auth,
128                    )
129                    for instance in instances_response.json():
130                        instance_id = instance["ID"]
131                        metadata_response = await client.get(
132                            f"{orthanc_url}/instances/{instance_id}/simplified-tags",
133                            auth=auth,
134                        )
135                        metadata = metadata_response.json()
136
137                        # Remove files that are not multiframe US's
138                        if not (
139                            metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1"
140                            and int(metadata.get("NumberOfFrames", 0)) > 1
141                        ):
142                            continue
143
144                        acq_date, acq_time = metadata.get(
145                            "AcquisitionDate"
146                        ), metadata.get("AcquisitionTime")
147                        if acq_date and acq_time:
148                            acq_datetime = datetime.strptime(
149                                f"{acq_date}{acq_time.split('.')[0]}",
150                                "%Y%m%d%H%M%S",
151                            )
152
153                            images_with_dates.append(
154                                (acq_datetime, instance_id, metadata)
155                            )
156
157        final_images_with_dates = []
158
159        if len(images_with_dates) == 0:
160            return [], latest_acq_time
161
162        for acq_datetime, instance_id, metadata in images_with_dates:
163            if acq_datetime > latest_acq_time:
164                file_response = await client.get(
165                    f"{orthanc_url}/instances/{instance_id}/file",
166                    auth=auth,
167                )
168
169                # hash the main instance_id to be smaller
170                small_id = consistent_hash(instance_id, 10**8)
171                patient_id_real = metadata.get("PatientID")
172                instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
173
174                latest_acq_time = max(latest_acq_time, acq_datetime)
175
176                final_images_with_dates.append(
177                    (acq_datetime, file_response.content, instance_id)
178                )
179
180        images_with_dates.sort(key=lambda x: x[0])
181        latest_image = images_with_dates[-1]
182
183        # inject file_response.content into latest_image
184        file_response = await client.get(
185            f"{orthanc_url}/instances/{latest_image[1]}/file",
186            auth=auth,
187        )
188
189        instance_id = latest_image[1]
190        acq_datetime = latest_image[0]
191        metadata = latest_image[2]
192        small_id = consistent_hash(instance_id, 10**8)
193        patient_id_real = metadata.get("PatientID")
194        instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
195
196        latest_image = (
197            acq_datetime,
198            file_response.content,
199            instance_id,
200        )
201
202        if not final_images_with_dates:
203            final_images_with_dates = [latest_image]
204
205    return [
206        (image, instance_id) for _, image, instance_id in final_images_with_dates
207    ], latest_acq_time
208
209
210class UnauthorizedException(Exception):
211    """Raised when the user is unauthorized and should be redirected."""
212
213    pass
214
215
216def validate_auth_token(auth_token: str):
217    token_data = TOKEN_STORE.get(auth_token)
218    if not token_data or token_data["expires"] < datetime.utcnow():
219        raise UnauthorizedException
220    return token_data["username"]
221
222
223def validate_api_token(api_token: str):
224    token_data = API_TOKEN_STORE.get(api_token)
225    if not token_data or token_data["expires"] < datetime.utcnow():
226        raise HTTPException(status_code=403, detail="Invalid API token.")
227    return token_data["username"]
def generate_token():
22def generate_token():
23    return secrets.token_urlsafe(32)
def consistent_hash(value, mod):
26def consistent_hash(value, mod):
27    # Convert the input to a string and encode it
28    encoded_value = str(value).encode()
29    # Use a reproducible hash function (e.g., SHA-256)
30    hash_value = int(hashlib.sha256(encoded_value).hexdigest(), 16)
31    return hash_value % mod
TOKEN_STORE = {}
API_TOKEN_STORE = {}
async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
38async def save_dicom_and_get_results(live_batchdir, instance_id, dicom_content, config):
39    # Save the DICOM file in the appropriate directory
40    dicom_path = f"{live_batchdir}/{instance_id}.dcm"
41    if dicom_content is not None:
42        # if path already exists, return
43        if os.path.exists(dicom_path):
44            return
45        with open(dicom_path, "wb") as f:
46            f.write(dicom_content)
47
48    # Process the DICOM
49    result = await asyncio.to_thread(
50        retuve_run,
51        hip_mode=config.batch.hip_mode,
52        config=config,
53        modes_func=config.batch.mode_func,
54        modes_func_kwargs_dict={},
55        file=dicom_path,
56    )
57
58    return result
async def save_results(instance_id, savedir, result=None, just_paths=False):
61async def save_results(instance_id, savedir, result=None, just_paths=False):
62    """
63    Saves DICOM content, video, image, and metrics results for a given instance ID.
64
65    :param instance_id: The unique ID of the DICOM instance.
66    :param result: The result object returned by `retuve_run`.
67    :param savedir: The base directory for saving the results.
68    :param just_paths: Whether to return just the paths of the saved results.
69    """
70    result_dir = f"{savedir}/{instance_id}"
71    os.makedirs(result_dir, exist_ok=True)
72
73    video_path = f"{result_dir}/video.mp4"
74    img_path = f"{result_dir}/img.jpg"
75    metrics_path = f"{result_dir}/metrics.json"
76
77    if just_paths:
78        return video_path, img_path, metrics_path
79
80    # Save video result if available
81    if result.video_clip:
82        await asyncio.to_thread(result.video_clip.write_videofile, video_path)
83
84    # Save image result if available
85    if result.image:
86        await asyncio.to_thread(result.image.save, img_path)
87
88    # Save metrics if available
89    if result.metrics:
90        with open(metrics_path, "w") as f:
91            json.dump(result.metrics, f)
92
93    return video_path, img_path, metrics_path

Saves DICOM content, video, image, and metrics results for a given instance ID.

Parameters
  • instance_id: The unique ID of the DICOM instance.
  • result: The result object returned by retuve_run.
  • savedir: The base directory for saving the results.
  • just_paths: Whether to return just the paths of the saved results.
async def get_sorted_dicom_images(orthanc_url, username=None, password=None, latest_time=None):
 96async def get_sorted_dicom_images(
 97    orthanc_url, username=None, password=None, latest_time=None
 98):
 99    """
100    Fetch and sort DICOM images from an Orthanc server based on acquisition time.
101
102    :param orthanc_url: The Orthanc server URL.
103    :param username: Username for authentication (optional).
104    :param password: Password for authentication (optional).
105    :param latest_time: Filter for acquisition times after this datetime.
106    :return: A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).
107    """
108
109    auth = (username, password) if username and password else None
110    images_with_dates = []
111    latest_acq_time = latest_time or datetime.min
112
113    async with AsyncClient() as client:
114        patients_response = await client.get(f"{orthanc_url}/patients", auth=auth)
115        for patient_id in patients_response.json():
116            studies_response = await client.get(
117                f"{orthanc_url}/patients/{patient_id}/studies", auth=auth
118            )
119            for study in studies_response.json():
120                study_id = study["ID"]
121                series_response = await client.get(
122                    f"{orthanc_url}/studies/{study_id}/series", auth=auth
123                )
124                for series in series_response.json():
125                    series_id = series["ID"]
126                    instances_response = await client.get(
127                        f"{orthanc_url}/series/{series_id}/instances",
128                        auth=auth,
129                    )
130                    for instance in instances_response.json():
131                        instance_id = instance["ID"]
132                        metadata_response = await client.get(
133                            f"{orthanc_url}/instances/{instance_id}/simplified-tags",
134                            auth=auth,
135                        )
136                        metadata = metadata_response.json()
137
138                        # Remove files that are not multiframe US's
139                        if not (
140                            metadata.get("SOPClassUID") == "1.2.840.10008.5.1.4.1.1.3.1"
141                            and int(metadata.get("NumberOfFrames", 0)) > 1
142                        ):
143                            continue
144
145                        acq_date, acq_time = metadata.get(
146                            "AcquisitionDate"
147                        ), metadata.get("AcquisitionTime")
148                        if acq_date and acq_time:
149                            acq_datetime = datetime.strptime(
150                                f"{acq_date}{acq_time.split('.')[0]}",
151                                "%Y%m%d%H%M%S",
152                            )
153
154                            images_with_dates.append(
155                                (acq_datetime, instance_id, metadata)
156                            )
157
158        final_images_with_dates = []
159
160        if len(images_with_dates) == 0:
161            return [], latest_acq_time
162
163        for acq_datetime, instance_id, metadata in images_with_dates:
164            if acq_datetime > latest_acq_time:
165                file_response = await client.get(
166                    f"{orthanc_url}/instances/{instance_id}/file",
167                    auth=auth,
168                )
169
170                # hash the main instance_id to be smaller
171                small_id = consistent_hash(instance_id, 10**8)
172                patient_id_real = metadata.get("PatientID")
173                instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
174
175                latest_acq_time = max(latest_acq_time, acq_datetime)
176
177                final_images_with_dates.append(
178                    (acq_datetime, file_response.content, instance_id)
179                )
180
181        images_with_dates.sort(key=lambda x: x[0])
182        latest_image = images_with_dates[-1]
183
184        # inject file_response.content into latest_image
185        file_response = await client.get(
186            f"{orthanc_url}/instances/{latest_image[1]}/file",
187            auth=auth,
188        )
189
190        instance_id = latest_image[1]
191        acq_datetime = latest_image[0]
192        metadata = latest_image[2]
193        small_id = consistent_hash(instance_id, 10**8)
194        patient_id_real = metadata.get("PatientID")
195        instance_id = f"{acq_datetime.strftime('%Y-%m-%d-%H:%M:%S')} Patient-{patient_id_real} ID-{small_id}"
196
197        latest_image = (
198            acq_datetime,
199            file_response.content,
200            instance_id,
201        )
202
203        if not final_images_with_dates:
204            final_images_with_dates = [latest_image]
205
206    return [
207        (image, instance_id) for _, image, instance_id in final_images_with_dates
208    ], latest_acq_time

Fetch and sort DICOM images from an Orthanc server based on acquisition time.

Parameters
  • orthanc_url: The Orthanc server URL.
  • username: Username for authentication (optional).
  • password: Password for authentication (optional).
  • latest_time: Filter for acquisition times after this datetime.
Returns

A tuple of (sorted list of (DICOM content, instance ID), latest acquisition datetime).

class UnauthorizedException(builtins.Exception):
211class UnauthorizedException(Exception):
212    """Raised when the user is unauthorized and should be redirected."""
213
214    pass

Raised when the user is unauthorized and should be redirected.

def validate_auth_token(auth_token: str):
217def validate_auth_token(auth_token: str):
218    token_data = TOKEN_STORE.get(auth_token)
219    if not token_data or token_data["expires"] < datetime.utcnow():
220        raise UnauthorizedException
221    return token_data["username"]
def validate_api_token(api_token: str):
224def validate_api_token(api_token: str):
225    token_data = API_TOKEN_STORE.get(api_token)
226    if not token_data or token_data["expires"] < datetime.utcnow():
227        raise HTTPException(status_code=403, detail="Invalid API token.")
228    return token_data["username"]