diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd6fb0e2..4ddfcf3c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ jobs: # glymur needs dynamic library on macos and windows, and causing error # os: [ubuntu-22.04, macos-latest, windows-latest] os: [ubuntu-22.04] - + runs-on: ${{ matrix.os }} steps: - uses: actions/checkout@v4 @@ -34,7 +34,7 @@ jobs: path: ./.venv key: venv-${{ hashFiles('poetry.lock') }} - name: Install the project dependencies - run: poetry install --with dev,lsm,df,wk + run: poetry install --with dev --extras all - name: Run the automated tests run: poetry run pytest -v working-directory: ./tests diff --git a/linc_convert/utils/dandifs.py b/linc_convert/utils/dandifs.py new file mode 100755 index 00000000..4e5cc9c7 --- /dev/null +++ b/linc_convert/utils/dandifs.py @@ -0,0 +1,536 @@ +"""A `fsspec` File System for (remote) DANDI.""" +# stdlib +import json +import logging +import os +import re +from os import PathLike +from typing import Iterator +from urllib.parse import unquote as url_unquote + +# externals +import requests +from dandi.dandiapi import ( + BaseRemoteAsset, + DandiAPIClient, + DandiInstance, + NotFoundError, + RemoteAsset, + RemoteDandiset, +) +from dandi.utils import get_instance +from fsspec import register_implementation +from fsspec.implementations.http import HTTPFileSystem +from fsspec.registry import known_implementations +from fsspec.spec import AbstractBufferedFile, AbstractFileSystem +from fsspec.utils import stringify_path, tokenize + +LOG = logging.getLogger(__name__) + + +class RemoteDandiFileSystem(AbstractFileSystem): + """ + A file system that browses through a remote dandiset. + + Examples + -------- + Load and parse a remote file + ```python + from dandi.fs import RemoteDandiFileSystem + import json + fs = RemoteDandiFileSystem() + with fs.open('dandi://dandi/000026/rawdata/sub-I38/ses-MRI/anat/' + 'sub-I38_ses-MRI-echo-4_flip-4_VFA.json') as f: + info = json.load(f) + ``` + + The 'dandi://' protocol is registered with fsspec, so the same + result can be achived by + ```python + import fsspec + import json + with fsspec.open('dandi://dandi/000026/rawdata/sub-I38/ses-MRI/anat/' + 'sub-I38_ses-MRI-echo-4_flip-4_VFA.json') as f: + info = json.load(f) + ``` + + Browse a dataset + ```python + from dandi.fs import RemoteDandiFileSystem + fs = RemoteDandiFileSystem('000026') + fs.glob('**/anat/*.json') + ``` + + """ + + def __init__( + self, + dandiset: str | RemoteDandiset | None = None, + version: str | None = None, + client: str | DandiInstance | DandiAPIClient | None = None, + **http_kwargs + ) -> None: + """ + Initialise a remote DANDI file system. + + The root of a DANDI file system is a dandiset at a given version. + The file system can be initialized from + - a `RemoteDandiset` instance; or + - the name of a dandiset [+ version]; and + . a DandiAPIClient instance + . a DandiInstance instance + . the name of a known DANDI instance + . the url of a DANDI server + + Parameters + ---------- + dandiset : str or dandi.RemoteDandiset, optional + An instantiated dandiset, or the identifier of a dandiset + (e.g., `'000026'`). See `RemoteDandiset` for more info. + version : str, optional + The version of the dandiset to query (e.g., `'draft'`) + client : str or dandi.DandiInstance or dandi.DandiAPIClient + An instantiated dandi instance (or its identifier) or + an instantiated dandi client (or its url). + Default: `'dandi'`. See `DandiAPIClient` for more info. + + Other Parameters + ---------------- + http_kwargs: key-value + Any other parameters passed on to the HTTP file system + """ + self._httpfs = HTTPFileSystem(**http_kwargs) + super().__init__() + if not isinstance(dandiset, RemoteDandiset): + if isinstance(client, str): + if not client.startswith('http'): + client = get_instance(client) + if isinstance(client, DandiInstance): + client = DandiAPIClient.for_dandi_instance(client) + else: + client = DandiAPIClient(client) + if dandiset: + dandiset = self.get_dandiset(client, dandiset, version) + self._dandiset = dandiset + self._client = None if dandiset else client + + # ------------------------------------------------------------------ + # DANDI-specific helpers + # ------------------------------------------------------------------ + + @property + def dandiset(self) -> RemoteDandiset: + """Access dandiset.""" + return self._dandiset + + @dandiset.setter + def dandiset(self, x: RemoteDandiset) -> None: + """Assign dandiset.""" + if x: + self._client = None + elif self._dandiset: + self._client = self._dandiset.client + self._dandiset = x + + @property + def client(self) -> DandiAPIClient: + """Access dandi client.""" + return self.dandiset.client if self.dandiset else self._client + + @client.setter + def client(self, x: DandiAPIClient) -> None: + """Assign dandi client.""" + if self.dandiset: + raise ValueError('Cannot assign a DANDI client to a FileSystem ' + 'that is already linked to a dandiset. ' + 'Unassign the dandiset first.') + self._client = x + + @classmethod + def for_url(cls, url: str) -> "RemoteDandiFileSystem": + """ + Instantiate a FileSystem that interacts with the correct + DANDI instance for a given url. + """ + instance, dandiset, version, *_ = split_dandi_url(url) + return cls(dandiset, version, instance) + + @classmethod + def _auth_apply( + cls, fn: callable, client: DandiAPIClient, auth: bool | None = None + ) -> object: + is_401 = lambda e: ( # noqa: E731 + e.response is not None + and e.response.status_code == 401 + and auth is not False + ) + level = logging.getLogger("dandi").getEffectiveLevel() + logging.getLogger("dandi").setLevel(1000) + try: + if auth is not False: + client.dandi_authenticate() + return fn() + except requests.HTTPError as e: + if not is_401(e): + raise e + try: + client.authenticate(os.environ.get("LINCBRAIN_API_KEY", "")) + return fn() + except requests.HTTPError as e: + if not is_401(e): + raise e + exc = e + raise exc + finally: + logging.getLogger("dandi").setLevel(level) + + @classmethod + def _get_dandiset( + cls, client: DandiAPIClient, *args, **kwargs + ) -> RemoteDandiset: + auth = kwargs.pop("auth", None) + return cls._auth_apply( + lambda: client.get_dandiset(*args, **kwargs), + client, auth + ) + + @classmethod + def _get_asset(cls, client: DandiAPIClient, *a, **k) -> BaseRemoteAsset: + auth = k.pop("auth", None) + return cls._auth_apply( + lambda: client.get_asset(*a, **k), + client, auth + ) + + def get_dandiset( + self, path: str, auth: bool | None = None, + dandiset: RemoteDandiset | None = None, + ) -> tuple[RemoteDandiset, str]: + """ + If path is a relative path, return (self.dandiset, path) + Else, the path is an absolute URL and we instantiate the correct + remote dandiset and spit out the relative path. + + Returns: (dandiset, path) or (dandiset, asset) + """ + dandiset = dandiset or self.dandiset + if path.startswith(('http://', 'https://', 'dandi://', 'DANDI:')): + instance, dandiset_id, version_id, path, asset_id \ + = split_dandi_url(path) + api_url = get_instance(instance) + if self.client.api_url == api_url.api: + client = self.client + else: + client = DandiAPIClient.for_dandi_instance(instance) + dandiset = None + if not asset_id: + args = (dandiset_id, version_id) + if not dandiset or dandiset.identifier != dandiset_id: + dandiset = self._get_dandiset(client, *args, auth=auth) + if not dandiset or dandiset.version_id != version_id: + dandiset = self._get_dandiset(client, *args, auth=auth) + else: + asset = self._get_asset(client, asset_id, auth=auth) + return dandiset, asset + elif not dandiset: + raise ValueError('File system must be linked to a dandiset to ' + 'use relative paths.') + return dandiset, path + + def _get_json(self, url: str) -> dict: + with self._httpfs.open(url, "rt") as f: + info = json.load(f) + return info + + def _s3_url_from_asset(self, asset: RemoteAsset) -> str: + info = self._get_json(asset.api_url) + # info = requests.request(url=asset.api_url, method='get').json() + url = '' + if "neuroglancerUrl" in info: + # LINC: use neuroglancer url but drop format protocol + url = info["neuroglancerUrl"] + url = url.split("://") + url = "://".join(url[1:]) + if 'neuroglancer.lincbrain.org' not in url: + raise NotFoundError(asset, url) + return url + for url in info['contentUrl']: + if "s3.amazonaws.com" in url: + return url + raise NotFoundError(asset, url) + + def _maybe_to_s3(self, url: str) -> str: + url = stringify_path(url) + # FIXME: not very generic test + is_s3 = ( + 's3.amazonaws.com' in url or + 'neuroglancer.lincbrain.org' in url + ) + if not is_s3: + url = self.s3_url(url) + return url + + def s3_url(self, path: str, **kwargs) -> str: + """Get the the asset url on AWS S3.""" + dandiset, asset = self.get_dandiset(path, **kwargs) + if not isinstance(path, RemoteAsset): + try: + asset = self._auth_apply( + lambda: dandiset.get_asset_by_path(asset), + dandiset.client, kwargs.get("auth", None) + ) + + except NotFoundError: + path = asset.rstrip("/") + path_prefix, path_suffix = path, '' + while path_prefix: + + *path_prefix, new_suffix = path_prefix.split('/') + if not path_suffix: + path_suffix = new_suffix + else: + path_suffix = path_suffix.split('/') + path_suffix = '/'.join([new_suffix, *path_suffix]) + path_prefix = '/'.join(path_prefix) + + try: + asset = self._auth_apply( + lambda: dandiset.get_asset_by_path(path_prefix), + dandiset.client, kwargs.get("auth", None) + ) + url = self._s3_url_from_asset(asset).rstrip("/") + url += "/" + path_suffix + return url + except NotFoundError: + continue + raise NotFoundError(path) + + return self._s3_url_from_asset(asset) + + # ------------------------------------------------------------------ + # FileSystem API + # ------------------------------------------------------------------ + + def ls( # noqa: D102 + self, + path: str | PathLike, + detail: bool = True, + **kwargs + ) -> list[str] | list[dict]: + path = stringify_path(path).strip('/') + assets = kwargs.pop('assets', None) + if assets is None: + dandiset = kwargs.pop('dandiset', None) + if not dandiset: + dandiset, path = self.get_dandiset(path) + assets = dandiset.get_assets_with_path_prefix(path) + + entries = [] + full_dirs = set() + + def getdate(asset: RemoteAsset, field: str) -> str: + return getattr(getattr(asset, field, None), + 'isoformat', lambda: None)() + + assets, assets_in = [], assets + for asset in assets_in: + size = getattr(asset, 'size', None) + created = getdate(asset, 'created') + modified = getdate(asset, 'modified') + identifier = getattr(asset, 'identifer', None) + asset = getattr(asset, 'path', asset) + # 1) is the input path exactly this asset? + asset = asset[len(path):].strip('/') + if not asset: + entries.append({ + 'name': path, + 'size': size, + 'created': created, + 'modified': modified, + 'identifier': identifier, + 'type': 'file', + }) + continue + # 2) look at the first level under `path` + name = asset.split('/')[0] + fullpath = path + '/' + name + if '/' not in asset: + # 3) this asset is a file directly under `path` + entries.append({ + 'name': fullpath, + 'size': size, + 'created': created, + 'modified': modified, + 'identifier': identifier, + 'type': 'file', + }) + continue + else: + # 4) this asset is a file a few levels under `path` + # -> we do not list the path but list the directory + if fullpath not in full_dirs: + entries.append({ + 'name': fullpath, + 'size': None, + 'type': 'directory', + }) + full_dirs.add(fullpath) + assets.append(path + '/' + asset) + + if detail: + return entries + else: + return [entry['name'] for entry in entries] + + def checksum(self, path: str, **kwargs) -> str: # noqa: D102 + # we override fsspec's default implementation when path is a + # directory (since in this case there is no created/modified date) + dandiset = kwargs.pop('dandiset', None) + if not dandiset: + dandiset, path = self.get_dandiset(path) + assets = dandiset.get_assets_with_path_prefix(path) + return tokenize(assets) + + def glob( # noqa: D102 + self, + path: str, + order: str | None = None, + **kwargs + ) -> Iterator[str]: + # we override fsspec's default implementation (which uses find) + # to leverage the more efficient `get_assets_by_glob` from dandi + # + # order : [-]{created, modified, path} + # + # TODO: implement fsspec `maxdepth` keyword + dandiset = kwargs.pop('dandiset', None) + if not dandiset: + dandiset, path = self.get_dandiset(path) + assets = dandiset.get_assets_by_glob(path, order) + for asset in assets: + yield asset.path + + def exists(self, path: str, **kwargs) -> bool: # noqa: D102 + # we override fsspec's default implementation (which uses info) + # to avoid calls to ls (which calls get_assets_by_path on the + # *parent* and is therefore slower) + dandiset = kwargs.pop('dandiset', None) + if not dandiset: + dandiset, path = self.get_dandiset(path) + if isinstance(path, BaseRemoteAsset): + return True + # check if it is a file + try: + dandiset.get_asset_by_path(path) + return True + except NotFoundError: + pass + # check if it is a directory + path = path.rstrip('/') + '/' + assets = dandiset.get_assets_with_path_prefix(path) + try: + next(assets) + return True + except StopIteration: + pass + # it might be a path to something inside a zarr -- let's try to find it + try: + return self._httpfs.exists(self.s3_url(path, dandiset=dandiset)) + except NotFoundError: + pass + return False + + def open( # noqa: D102 + self, + path: str, + *args, + **kwargs + ) -> AbstractBufferedFile: + path = self._maybe_to_s3(path) + return self._httpfs.open(path, *args, **kwargs) + + +def split_dandi_url(url: str) -> tuple[str, str, str, str, str]: + """ + Split a valid dandi url into its subparts. + Returns: (instance, dandiset_id, version_id, path, asset_id) + where instance can be an instance_id or an URL. + """ + instance = None + server = None + dandiset_id = None + version = None + path = '' + asset_id = None + if url.startswith('dandi://'): + # dandi:///[@][/] + ptrn = r'dandi://(?P[^/]+)/(?P\d+)(@(?P[^/]+))?(?P

.*)' + match = re.match(ptrn, url) + if not match: + raise SyntaxError('Wrong dandi url') + instance = match.group('i') + dandiset_id = match.group('d') + version = match.group('v') + path = match.group('p') + elif url.startswith(('DANDI:', 'https://identifiers.org/DANDI:')): + # DANDI:[/] + # https://identifiers.org/DANDI:[/] + ptrn = r'(https://identifiers.org/)?DANDI:(?P\d+)(/(?P[^/]+))?' + match = re.match(ptrn, url) + if not match: + raise SyntaxError('Wrong dandi url') + dandiset_id = match.group('d') + version = match.group('v') + instance = 'DANDI' + else: + ptrn = r'https://(?P[^/]+)(/api)?(/#)?(?P.*)' + match = re.match(ptrn, url) + if not match: + raise SyntaxError('Wrong dandi url') + server = match.group('s') + url = match.group('u') + if url.startswith('/dandisets/'): + # https://[/api]/dandisets/ + # . [/versions[/]] + # . /versions//assets/[/download] + # . /versions//assets/?path= + ptrn = r'/dandisets/(?P\d+)(/versions/(?P[^/]+))?(?P.*)' + match = re.match(ptrn, url) + if not match: + raise SyntaxError('Wrong dandi url') + dandiset_id = match.group('d') + version = match.group('v') + url = match.group('u') + ptrn = r'/assets/((\?path=(?P

[.*]+))|(?P[^/]+))' + match = re.match(ptrn, url) + if match: + path = match.group('p') + asset_id = match.group('a') + elif url.startswith('/dandiset/'): + # https://[/api]/[#/]dandiset/ + # [/][/files[?location=]] + ptrn = r'(/(?P[^/]+))?/files(\?location=(?P

.*))?' + ptrn = r'/dandiset/(?P\d+)' + ptrn + match = re.match(ptrn, url) + dandiset_id = match.group('d') + version = match.group('v') + path = match.group('p') + elif url.startswith('/assets/'): + # https://[/api]/assets/[/download] + ptrn = r'/assets/(?P[^/]+)' + match = re.match(ptrn, url) + if not match: + raise SyntaxError('Wrong dandi url') + asset_id = match.group('a') + + path = url_unquote(path) + path = (path or '').strip('/') + + if instance is None: + instance = 'https://' + server + + return instance, dandiset_id, version, path, asset_id + + +if "dandi" not in known_implementations: + register_implementation("dandi", RemoteDandiFileSystem) diff --git a/linc_convert/utils/opener.py b/linc_convert/utils/opener.py new file mode 100755 index 00000000..89390dd1 --- /dev/null +++ b/linc_convert/utils/opener.py @@ -0,0 +1,594 @@ +"""General-purpose byte stream opener. + +Classes +------- +open + General-purpose stream opener. +async_open + Asymchronous version of `open`. + +Functions +--------- +stringify_path + Ensure that the input is a str or a file-like object. +fsopen + Open a file with fsspec (authentify if needed). +filesystem + Return the fsspec filesystem corresponding to a protocol or URI. +exists + Check that the file or directory pointed by a URI exists. +async_exists + Asychronous version of `exists`. +linc_auth_opt + Create fsspec authentication options to access lincbrain data. +dandi_auth_opt + Create dandi authentication options to access dandi data. +read_json + Read local or remote JSON file. +write_json + Write local or remote JSON file. +update_json + Update local or remote JSON file. +""" +# stdlib +import json +import logging +import time +from bz2 import BZ2File +from inspect import isawaitable +from io import BufferedReader +from os import PathLike, environ +from pathlib import Path +from types import TracebackType +from typing import IO + +# externals +import fsspec +import fsspec.asyn +import requests +from indexed_gzip import IndexedGzipFile + +LOG = logging.getLogger(__name__) + +DANDI_API = { + "linc": "https://api.lincbrain.org/api", + "dandi": "https://api.dandiarchive.org/api", +} + +# Cache for LINC cookies +_LINC_AUTH_CACHE = {} +_DANDI_AUTH_CACHE = {} + +# Cache for fsspec filesystems +_FILESYSTEMS_CACHE = {} + +# Hints +URILike = PathLike | str +FileLike = IO | URILike +FileSystem = fsspec.AbstractFileSystem + + +def stringify_path(filename: FileLike) -> IO | str: + """Ensure that the input is a str or a file-like object.""" + if isinstance(filename, PathLike): + return filename.__fspath__() + if isinstance(filename, Path): + return str(filename) + return filename + + +def linc_auth_opt(token: str | None = None) -> dict: + """ + Create fsspec authentication options to access lincbrain data. + + These options should only be used when accessing data behind + `neuroglancer.lincbrain.org`. + + Parameters + ---------- + token : str + Your LINCBRAIN_API_KEY + + Returns + ------- + opt : dict + options to pass to `fsspec`'s `HTTPFileSystem`. + """ + API = DANDI_API['linc'] + token = token or environ.get('LINCBRAIN_API_KEY', None) + if not token: + return {} + if token in _LINC_AUTH_CACHE: + return _LINC_AUTH_CACHE[token] + headers = {"Authorization": f"token {token}"} + # Check that credential is correct + session = requests.Session() + session.get(f"{API}/auth/token", headers=headers) + # Get cookies + response = session.get(f"{API}/permissions/s3/", headers=headers) + cookies = response.cookies.get_dict() + # Pass cookies to FileSystem + opt = {'cookies': cookies} + _LINC_AUTH_CACHE[token] = opt + return opt + + +def dandi_auth_opt(token: str | None = None, instance: str = "dandi") -> dict: + """ + Create fsspec authentication options to access the dandi api. + + These options should only be used when accessing data behind + `dandi://`. + + Parameters + ---------- + token : str + Your DANDI_API_KEY or LINCBRAIN_API_KEY + instance : {"dandi", "linc"} + + Returns + ------- + opt : dict + options to pass to `fsspec`'s `HTTPFileSystem`. + """ + prefix = {"dandi": "DANDI", "linc": "LINCBRAIN"}[instance] + token = ( + token + or environ.get(f'{prefix}_API_KEY', None) + or environ.get('DANDI_API_KEY', None) + ) + if not token: + return {} + if token in _DANDI_AUTH_CACHE: + return _DANDI_AUTH_CACHE[token] + headers = {"Authorization": f"token {token}"} + # Check that credential is correct + session = requests.Session() + session.get(f"{DANDI_API[instance]}/auth/token", headers=headers) + # Pass cookies to FileSystem + opt = {"headers": headers} + _DANDI_AUTH_CACHE[token] = opt + return opt + + +def filesystem(protocol: URILike | FileSystem, **opt: dict) -> FileSystem: + """Return the filesystem corresponding to a protocol or URI.""" + if isinstance(protocol, fsspec.AbstractFileSystem): + return protocol + protocol = url = stringify_path(protocol) + if "://" in protocol: + protocol = protocol.split("://")[0] + + # --- LINC/DANDI authentification --- + linc_auth = opt.pop("linc_auth", None) + if linc_auth is None: + linc_auth = "neuroglancer.lincbrain.org" in url + linc_auth = linc_auth or "dandi://linc" in url + dandi_auth = opt.pop("dandi_auth", None) + if dandi_auth is None: + dandi_auth = "dandi://" in url + # ----------------------------------- + + if (protocol, linc_auth, dandi_auth) in _FILESYSTEMS_CACHE: + return _FILESYSTEMS_CACHE[(protocol, linc_auth, dandi_auth)] + + # --- LINC/DANDI authentification --- + opt.setdefault("client_kwargs", {}) + if linc_auth: + LOG.debug(f"linc_auth - {url}") + opt["client_kwargs"].update(linc_auth_opt()) + if dandi_auth: + LOG.debug(f"dandi_auth - {url}") + instance = "linc" if linc_auth else "dandi" + opt["client_kwargs"].update(dandi_auth_opt(instance=instance)) + # ----------------------------------- + + fs = fsspec.filesystem(protocol, **opt) + _FILESYSTEMS_CACHE[(protocol, linc_auth, dandi_auth)] = fs + return fs + + +def exists(uri: URILike, **opt) -> bool: + """Check that the file or directory pointed by a URI exists.""" + tic = time.time() + fs = filesystem(uri, **opt) + exists = fs.exists(uri) + toc = time.time() + LOG.debug(f"exists({uri}): {exists} | {toc-tic} s") + return exists + + +async def async_exists(uri: URILike, **opt) -> bool: + """Check that the file or directory pointed by a URI exists.""" + tic = time.time() + fs = filesystem(uri, **opt) + if isinstance(fs, fsspec.asyn.AsyncFileSystem): + exists = await fs._exists(uri) + else: + exists = fs.exists(uri) + toc = time.time() + LOG.debug(f"exists({uri}): {exists} | {toc-tic} s") + return exists + + +def read_json(fileobj: FileLike, *args, **kwargs) -> dict: + """Read local or remote JSON file.""" + tic = time.time() + with open(fileobj, "rb") as f: + data = json.load(f, *args, **kwargs) + toc = time.time() + LOG.debug(f"read_json({fileobj}): {toc-tic} s") + return data + + +def write_json(obj: dict, fileobj: FileLike, *args, **kwargs) -> None: + """Write local or remote JSON file.""" + tic = time.time() + with open(fileobj, "w") as f: + json.dump(obj, f, *args, **kwargs) + toc = time.time() + LOG.debug(f"write_json(..., {fileobj}): {toc-tic} s") + return + + +def update_json(obj: dict, fileobj: FileLike, *args, **kwargs) -> dict: + """Read local or remote JSON file.""" + write_json(read_json(fileobj).update(obj), fileobj, *args, **kwargs) + + +def fsopen( + uri: URILike, + mode: str = "rb", + compression: str | None = None +) -> fsspec.spec.AbstractBufferedFile: + """Open a file with fsspec (authentify if needed).""" + fs = filesystem(uri) + return fs.open(uri, mode, compression=compression) + + +async def fsopen_async( + uri: URILike, + mode: str = "rb", + compression: str | None = None +) -> fsspec.spec.AbstractBufferedFile: + """Open a file with fsspec (authentify if needed).""" + fs = filesystem(uri) + if hasattr(fs, "open_async"): + return await fs.open_async(uri, mode, compression=compression) + else: + return fs.open(uri, mode, compression=compression) + + +class open: + """ + General-purpose stream opener. + + Handles: + + * paths (local or url) + * opened file-objects + * anything that `fsspec` handles + * compressed streams + """ + + def __init__( + self, + fileobj: FileLike, + mode: str = 'rb', + compression: str | None = None, + open: bool = True, + ) -> None: + """ + Open a file from a path or url or an opened file object. + + Parameters + ---------- + fileobj : path or url or file-object + Input file + mode : str + Opening mode + compression : {'zip', 'bz2', 'gzip', 'lzma', 'xz', 'infer'} + Compression mode. + If 'infer', guess from magic number (if mode 'r') or + filename (if mode 'w'). + auth : dict + Authentification options to pass to `fsspec`. + + Returns + ------- + fileobj + Opened file + """ + self.fileobj: str | IO = stringify_path(fileobj) + """The input path or file-like object""" + self.fileobjs: list[IO] = [] + """Additional file-like objects that may be created upon opening.""" + self.mode: str = mode + """Opening mode.""" + self.compression: str = compression + """Compression mode.""" + + # open + self._is_open = False + if open: + self._open() + + @property + def _effective_fileobj(self) -> IO: + # This is the most final file-like object + return self.fileobjs[-1] if self.fileobjs else self.fileobj + + def _close(self) -> None: + # close all the file-like objects that we've created + for fileobj in reversed(self.fileobjs): + fileobj.close() + self.fileobjs = [] + self._is_open = False + + def _open(self) -> None: + if self._is_open: + return + if isinstance(self.fileobj, str): + self._fsspec_open() + if self.compression == 'infer' and 'r' in self.mode: + self._infer() + self._is_open = True + + def _fsspec_open(self) -> None: + # Take the last file object (most likely the input fileobj), which + # must be a path, and open it with `fsspec.open()`. + uri = stringify_path(self._effective_fileobj) + # Ensure that input is a string + if not isinstance(uri, str): + raise TypeError("Expected a URI, but got:", uri) + # Set compression option + opt = dict() + if not (self.compression == 'infer' and 'r' in self.mode): + opt['compression'] = self.compression + # Open with fsspec + fileobj = fsopen(uri, mode=self.mode, **opt) + self.fileobjs.append(fileobj) + # -> returns a fsspec object that's not always fully opened + # so open again to be sure. + if hasattr(fileobj, 'open'): + fileobj = fileobj.open() + self.fileobjs.append(fileobj) + + def _infer(self) -> None: + # Take the last file object (which at this point must be a byte + # stream) and infer its compression from its magic bytes. + fileobj = self._effective_fileobj + if not hasattr(fileobj, "peek"): + fileobj = BufferedReader(fileobj) + self.fileobjs.append(fileobj) + try: + magic = fileobj.peek(2) + except Exception: + return + if magic == b'\x1f\x8b': + fileobj = IndexedGzipFile(fileobj) + self.fileobjs.append(fileobj) + if magic == b'BZh': + fileobj = BZ2File(fileobj) + self.fileobjs.append(fileobj) + + # ------------------------------------------------------------------ + # open() / IO API + # ------------------------------------------------------------------ + + def __enter__(self) -> IO: # noqa: D105 + self._open() + return self._effective_fileobj + + def __exit__( # noqa: D105 + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: TracebackType | None, + ) -> None: + self._close() + + def __del__(self) -> None: + """Close all owned streams on delete.""" + self._close() + + def open(self) -> IO: + """Open the file.""" + if not self._is_open: + self._open() + return self + + def close(self) -> None: + """Close the file object.""" + self._close() + + def read(self, *a, **k) -> bytes | str: + """Read characters ("t") or byes ("b").""" + return self._effective_fileobj.read(*a, **k) + + def readline(self, *a, **k) -> bytes | str: + """Read lines ("t").""" + return self._effective_fileobj.readline(*a, **k) + + def readinto(self, *a, **k) -> None: + """Read bytes into an existing buffer.""" + return self._effective_fileobj.readinto(*a, **k) + + def write(self, *a, **k) -> int: + """Write characters ("t") or bytes ("b").""" + return self._effective_fileobj.write(*a, **k) + + def writeline(self, *a, **k) -> int: + """Write lines.""" + return self._effective_fileobj.writeline(*a, **k) + + def peek(self, *a, **k) -> bytes | str: + """Read the first bytes without moving the cursor.""" + return self._effective_fileobj.peek(*a, **k) + + def seek(self, *a, **k) -> int: + """Move the cursor.""" + return self._effective_fileobj.seek(*a, **k) + + def tell(self, *a, **k) -> int: + """Return the cursor position.""" + return self._effective_fileobj.tell(*a, **k) + + def readable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "readable", False) for x in self.fileobjs) + + def writable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "writable", False) for x in self.fileobjs) + + def seekable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "seekable", False) for x in self.fileobjs) + + +async def _maybe_await(obj: object) -> object: + if isawaitable(obj): + return await obj + else: + return obj + + +class async_open(open): + """Async version of open.""" + + async def _close_async(self) -> None: + # close all the file-like objects that we've created + for fileobj in reversed(self.fileobjs): + if isinstance(fileobj, fsspec.asyn.AbstractAsyncStreamedFile): + await fileobj.close() + else: + fileobj.close() + self.fileobjs = [] + self._is_open = False + + async def _open_async(self) -> None: + if self._is_open: + return + if isinstance(self.fileobj, str): + await self._fsspec_open_async() + if self.compression == 'infer' and 'r' in self.mode: + await self._infer_async() + self._is_open = True + + async def _fsspec_open_async(self) -> None: + # Take the last file object (most likely the input fileobj), which + # must be a path, and open it with `fsspec.open()`. + uri = stringify_path(self._effective_fileobj) + # Ensure that input is a string + if not isinstance(uri, str): + raise TypeError("Expected a URI, but got:", uri) + # Set compression option + opt = dict() + if not (self.compression == 'infer' and 'r' in self.mode): + opt['compression'] = self.compression + # Open with fsspec + fileobj = await fsopen_async(uri, mode=self.mode, **opt) + self.fileobjs.append(fileobj) + # -> returns a fsspec object that's not always fully opened + # so open again to be sure. + if hasattr(fileobj, 'open_async'): + fileobj = await fileobj.open_async() + self.fileobjs.append(fileobj) + elif hasattr(fileobj, 'open'): + fileobj = fileobj.open() + self.fileobjs.append(fileobj) + + async def _infer_async(self) -> None: + # Take the last file object (which at this point must be a byte + # stream) and infer its compression from its magic bytes. + fileobj = self._effective_fileobj + if not hasattr(fileobj, "peek"): + fileobj = BufferedReader(fileobj) + self.fileobjs.append(fileobj) + try: + if hasattr(fileobj, "peek_async"): + magic = await fileobj.peek(2) + else: + magic = fileobj.peek(2) + except Exception: + return + if magic == b'\x1f\x8b': + fileobj = IndexedGzipFile(fileobj) + self.fileobjs.append(fileobj) + if magic == b'BZh': + fileobj = BZ2File(fileobj) + self.fileobjs.append(fileobj) + + # ------------------------------------------------------------------ + # open() / IO API + # ------------------------------------------------------------------ + + async def __aenter__(self) -> IO: # noqa: D105 + await self._open_async() + return self._effective_fileobj + + async def __aexit__( # noqa: D105 + self, + exc_type: type[BaseException] | None, + exc: BaseException | None, + traceback: TracebackType | None, + ) -> None: + await self._close_async() + + async def __del__(self) -> None: + """Close all owned streams on delete.""" + await self._close_async() + + async def open(self) -> IO: + """Open the file.""" + await self._open_async() + return self + + async def close(self) -> None: + """Close the file object.""" + await self._close_async() + + async def read(self, *a, **k) -> bytes | str: + """Read characters ("t") or byes ("b").""" + return await _maybe_await(self._effective_fileobj.read(*a, **k)) + + async def readline(self, *a, **k) -> bytes | str: + """Read lines ("t").""" + return await _maybe_await(self._effective_fileobj.readline(*a, **k)) + + async def readinto(self, *a, **k) -> None: + """Read bytes into an existing buffer.""" + return await _maybe_await(self._effective_fileobj.readinto(*a, **k)) + + async def write(self, *a, **k) -> int: + """Write characters ("t") or bytes ("b").""" + return await _maybe_await(self._effective_fileobj.write(*a, **k)) + + async def writeline(self, *a, **k) -> int: + """Write lines.""" + return await _maybe_await(self._effective_fileobj.writeline(*a, **k)) + + async def peek(self, *a, **k) -> bytes | str: + """Read the first bytes without moving the cursor.""" + return await _maybe_await(self._effective_fileobj.peek(*a, **k)) + + async def seek(self, *a, **k) -> int: + """Move the cursor.""" + return await _maybe_await(self._effective_fileobj.seek(*a, **k)) + + async def tell(self, *a, **k) -> int: + """Return the cursor position.""" + return await _maybe_await(self._effective_fileobj.tell(*a, **k)) + + def readable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "readable", False) for x in self.fileobjs) + + def writable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "writable", False) for x in self.fileobjs) + + def seekable(self) -> bool: + """Is stream readable.""" + return all(getattr(x, "seekable", False) for x in self.fileobjs) diff --git a/pyproject.toml b/pyproject.toml index a703051c..f8cda203 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,28 +28,22 @@ cyclopts = "^3.0.0" numpy = "*" nibabel = "*" zarr = "^2.0.0" - - -[tool.poetry.group.df] -optional = true -[tool.poetry.group.df.dependencies] -glymur = "*" - -[tool.poetry.group.lsm] -optional = true -[tool.poetry.group.lsm.dependencies] -tifffile = "*" - -[tool.poetry.group.psoct] -optional = true -[tool.poetry.group.psoct.dependencies] -h5py = "*" -scipy = "*" - -[tool.poetry.group.wk] -optional = true -[tool.poetry.group.wk.dependencies] -wkw = "*" +# optionals +glymur = { version = "*", optional = true } +tifffile = { version = "*", optional = true } +h5py = { version = "*", optional = true } +scipy = { version = "*", optional = true } +wkw = { version = "*", optional = true } +dandi = { version = "^0.44", optional = true } +fsspec = { version = ">= 2021.0.0", extras = ["full"], optional = true } + +[tool.poetry.extras] +remote = ["dandi", "fsspec"] +df = ["glymur"] +lsm = ["tifffile"] +psoct = ["h5py", "scipy"] +wk = ["wkw"] +all = ["dandi", "fsspec", "glymur", "tifffile", "h5py", "scipy", "wkw"] [tool.poetry.group.dev] optional = true @@ -108,6 +102,8 @@ target-version = "py310" [tool.ruff.lint] select = ["ANN", "D", "E", "F", "I"] ignore = [ + "ANN002", # args should not be annotated + "ANN003", # kwargs should not be annotated "ANN101", # self should not be annotated. "ANN102" # cls should not be annotated. ] @@ -138,5 +134,5 @@ bump = true pattern = "default-unprefixed" [build-system] -requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"] +requires = ["poetry-core>=1.0.0,<2.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"] build-backend = "poetry_dynamic_versioning.backend"